rosa/inc/alsk/executor/impl/staticpool.h

#ifndef ALSK_ALSK_EXECUTOR_IMPL_STATICPOOL_H
#define ALSK_ALSK_EXECUTOR_IMPL_STATICPOOL_H
#include <algorithm>
#include <cmath>
#include <future>
#include <map>
#include <set>
#include <vector>
#include "../executorbase.h"
#include "../executorstate.h"
#include "../../skeleton/traits.h"
#include "../utility/staticpool.h"
namespace alsk {
namespace exec {
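// StaticPool: parallel executor backed by a fixed pool of worker threads
// (util::StaticPool). The task-to-thread assignment is precomputed from the
// skeleton for every core count listed in repeatability.coresList, so that
// context ids stay stable across those configurations.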
template<typename S>
struct StaticPool: ExecutorBase {
using Tag = alsk::tag::Parallel;
public:
struct Info {
std::size_t cores;
std::size_t offset;
Info(std::size_t cores = 0, std::size_t offset = 0):
cores{cores}, offset{offset} {}
};
private:
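// Computes, for the given number of cores, the mapping from task id to the
// worker thread that first handles it, by recursively splitting each level
// of the skeleton's parallel tasks over the threads available at that level.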
auto buildSplitFor(S& s, std::size_t cores) {
std::map<std::size_t, std::size_t> ms;
auto populateSplitImpl = [&ms, totalCores=cores](
auto& buildSplitImpl, auto& s,
std::size_t maxThreads, std::size_t thOffset,
std::size_t n, std::size_t idStep, std::size_t id, bool isRemainder
) {
std::size_t const nThreads = std::min(n, maxThreads);
if(nThreads > 0) {
std::size_t const step = n/nThreads;
std::size_t const remainBase = n - step*nThreads;
std::size_t remain = remainBase;
std::size_t const coresA = maxThreads/nThreads;
std::size_t const coresB = remainBase? maxThreads/remainBase : 1;
std::size_t start = 0;
for(std::size_t i = 0; i < nThreads; ++i) {
std::size_t thNum = thOffset + i*coresA;
std::size_t offset = !!remain;
remain -= offset;
if(!ms.count(id+start*idStep))
ms[id+start*idStep] = thNum;
for(std::size_t j = 0; j < step; ++j)
buildSplitImpl(s, coresA, thNum, id+(start+j)*idStep, false);
if(offset)
buildSplitImpl(s, coresB, thNum, id+(start+step)*idStep, true);
start += step+offset;
}
if(isRemainder) ms[id+start*idStep] = totalCores;
} else {
for(std::size_t i = 0; i < n; ++i)
buildSplitImpl(s, maxThreads, thOffset, id+i*idStep, false);
}
};
auto buildSplitImpl = makeRecursiveLambda(
[&populateSplitImpl](
auto buildSplitImpl,
auto& s, auto maxThreads, auto thOffset,
auto id, bool isRemainder
) {
auto idStep = skeletonStep(s);
auto populateSplit = [&](auto& s, std::size_t n) {
if(!idStep) return;
populateSplitImpl(buildSplitImpl, s, maxThreads, thOffset, n, idStep, id, isRemainder);
};
skeletonTraversal(s, populateSplit);
}
);
buildSplitImpl(s, cores, 0ul, 0ul, false);
return ms;
}
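// Fills state.executor.split with the task ids at which the assigned worker
// thread changes, for every core count in repeatability.coresList; these ids
// delimit the execution contexts.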
template<typename Impl>
void buildSplit(Impl& impl) {
typename Impl::State& state = impl.state;
auto& split = state.executor.split;
split.clear();
split.insert(0);
for(auto cores: repeatability.coresList) {
std::size_t curThread = 0;
for(auto p: buildSplitFor(impl.skeleton, cores)) { // TODO: C++17
if(std::get<1>(p) != curThread) {
curThread = std::get<1>(p);
split.insert(std::get<0>(p));
}
}
}
}
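// Thread budget at the current level: the count carried by the Info if set,
// otherwise the executor-wide core count.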
std::size_t threadLimit(Info const& info) const {
auto const& lCores = info.cores;
return lCores? lCores : cores;
}
public:
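// Configures the thread pool for the implementation and precomputes the
// split points used to derive context ids.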
template<typename Impl>
void config(Impl& impl) {
typename Impl::State& state = impl.state;
impl.executorInfo.cores = cores;
impl.executorInfo.offset = 0;
state.executor.config(cores);
state.executor.parTasksCount = impl.parallelTasksCount();
buildSplit(impl);
}
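// Number of distinct execution contexts (one per split point).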
template<typename Impl>
std::size_t contextIdCount(Impl& impl, std::size_t) {
typename Impl::State& state = impl.state;
return state.executor.split.size();
}
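// Maps a task id to its context id by locating the enclosing split interval.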
template<typename Impl>
std::size_t contextId(Impl& impl, std::size_t id) { // O(log(n))
typename Impl::State& state = impl.state;
auto& split = state.executor.split;
return std::distance(std::begin(split), split.upper_bound(id)) - 1;
}
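// Runs n independent tasks on at most threadLimit(...) worker threads: each
// thread executes a contiguous block of step tasks (plus one extra task while
// the remainder lasts) and passes down, through Info, the number of cores
// available to nested parallel levels.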
template<typename Task, typename Impl, typename BTask, typename Parameters>
void executeParallel(Impl& impl, BTask& task, Parameters const& parameters, std::size_t n) {
std::size_t const maxThreads = threadLimit(impl.executorInfo);
std::size_t const nThreads = std::min(n, maxThreads);
if(nThreads > 0) {
std::vector<std::future<void>> futures(nThreads);
std::size_t const step = n/nThreads;
std::size_t const remainBase = n - step*nThreads;
std::size_t remain = remainBase;
std::size_t const coresA = maxThreads/nThreads; // cores allotted to each regular sub-task
std::size_t const coresB = remainBase? maxThreads/remainBase : 1; // cores allotted to each remainder sub-task
typename Impl::State& state = impl.state;
auto run = [&](std::size_t b, std::size_t k, bool offset, std::size_t thOffset) {
Info infoA{coresA, thOffset}, infoB{coresB, thOffset};
std::size_t i;
for(i = 0; i < k; ++i)
Task::execute(impl, task, b+i, infoA, parameters, std::tuple<>{});
if(offset)
Task::execute(impl, task, b+i, infoB, parameters, std::tuple<>{});
};
for(std::size_t i = 0, start = 0; i < nThreads; ++i) {
std::size_t thNum = impl.executorInfo.offset + i*coresA;
std::size_t offset = !!remain;
remain -= offset;
auto task = [&run, start, step, offset, thNum]{ run(start, step, offset, thNum); };
futures[i] = state.executor.run(thNum, std::move(task));
start += step+offset;
}
state.executor.wait(futures);
} else {
Info info{impl.executorInfo};
for(std::size_t i = 0; i < n; ++i)
Task::execute(impl, task, i, info, parameters, std::tuple<>{});
}
}
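// Same distribution as executeParallel, but each task yields a Value: every
// thread reduces its block with Select into a local best, then the per-thread
// bests are reduced sequentially into the returned result.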
template<typename Value, typename Task, typename Select, typename Impl, typename BTask, typename BSelect, typename Parameters>
Value executeParallelAccumulate(Impl& impl, BTask& task, BSelect& select, Parameters const& parameters, std::size_t n) {
std::size_t const maxThreads = threadLimit(impl.executorInfo);
Value best{};
std::size_t const nThreads = std::min(n, maxThreads);
if(nThreads > 0) {
std::vector<std::future<void>> futures(nThreads);
std::size_t const step = n/nThreads;
std::size_t const remainBase = n - step*nThreads;
std::size_t remain = remainBase;
std::size_t const coresA = maxThreads/nThreads; // cores allotted to each regular sub-task
std::size_t const coresB = remainBase? maxThreads/remainBase : 1; // cores allotted to each remainder sub-task
typename Impl::State& state = impl.state;
auto run = [&](Value& out, std::size_t b, std::size_t k, bool offset, std::size_t thOffset) {
Value best{};
Info infoA{coresA, thOffset}, infoB{coresB, thOffset};
if(k) {
best = Task::execute(impl, task, b+0, infoA, parameters, std::tuple<>{});
std::size_t i;
for(i = 1; i < k; ++i) {
Value current = Task::execute(impl, task, b+i, infoA, parameters, std::tuple<>{});
best = Select::execute(impl, select, b+i, infoA, parameters, std::tuple<>{}, std::move(current), std::move(best));
}
if(offset) {
Value current = Task::execute(impl, task, b+i, infoB, parameters, std::tuple<>{});
best = Select::execute(impl, select, b+i, infoB, parameters, std::tuple<>{}, std::move(current), std::move(best));
}
}
out = std::move(best);
};
std::vector<Value> bests(nThreads);
for(std::size_t i = 0, start = 0; i < nThreads; ++i) {
std::size_t thNum = impl.executorInfo.offset + i*coresA;
std::size_t offset = !!remain;
remain -= offset;
auto task = [&, &best=bests[i], start, step, offset, thNum]{ run(best, start, step, offset, thNum); };
futures[i] = state.executor.run(thNum, std::move(task));
start += step+offset;
}
state.executor.wait(futures);
if(nThreads) best = std::move(bests[0]);
for(std::size_t i = 1; i < nThreads; ++i)
best = Select::execute(impl, select, i, impl.executorInfo, parameters, std::tuple<>{}, std::move(bests[i]), std::move(best));
} else {
Info info{impl.executorInfo};
if(n)
best = Task::execute(impl, task, 0, info, parameters, std::tuple<>{});
for(std::size_t i = 1; i < n; ++i) {
Value current = Task::execute(impl, task, i, info, parameters, std::tuple<>{});
best = Select::execute(impl, select, i, info, parameters, std::tuple<>{}, std::move(current), std::move(best));
}
}
return best;
}
};
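// Executor state attached to each implementation: the underlying thread pool,
// the number of parallel tasks and the set of split points used to derive
// context ids.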
template<typename S>
struct ExecutorState<StaticPool<S>>: util::StaticPool {
std::size_t parTasksCount;
std::set<std::size_t> split;
};
}
}
#endif