196 lines
5.5 KiB
C++
196 lines
5.5 KiB
C++
#ifndef ALSK_ALSK_EXECUTOR_IMPL_FIRSTLEVEL_EQUI_H
|
|
#define ALSK_ALSK_EXECUTOR_IMPL_FIRSTLEVEL_EQUI_H
|
|
|
|
#include <cmath>
|
|
#include <thread>
|
|
#include <set>
|
|
#include <vector>
|
|
|
|
#include "../../executorbase.h"
|
|
#include "../../executorstate.h"
|
|
#include "../../../skeleton/traits.h"
|
|
|
|
namespace alsk {
|
|
namespace exec {
|
|
|
|
template<typename S>
|
|
struct FirstLevelEqui: ExecutorBase {
|
|
using Tag = alsk::tag::Parallel;
|
|
|
|
public:
|
|
/**
 * Per-task execution information handed down through nested skeleton levels.
 */
struct Info {
	/// Number of parallel levels entered so far (0 at the outermost level).
	unsigned int parDepth;

	/// Implicit on purpose: callers may build an Info from a bare depth.
	Info(unsigned int parDepth = 0) noexcept: parDepth{parDepth} {}

	/// Info for work started from a parallel level: one level deeper.
	Info par() const noexcept { return Info{parDepth + 1}; }

	/// Info for work started sequentially: the depth is left untouched.
	Info seq() const noexcept { return *this; }
};
|
|
|
|
private:
|
|
template<typename Impl>
|
|
void buildSplit(Impl& impl) {
|
|
typename Impl::State& state = impl.state;
|
|
auto& split = state.executor.split;
|
|
|
|
split.clear();
|
|
|
|
auto traverser = [](std::size_t, auto&& skl, auto&&... values) {
|
|
using Skl = decltype(skl);
|
|
using Traits = SkeletonTraitsT<std::decay_t<Skl>>;
|
|
if(Traits::serial) return max(decltype(values)(values)...);
|
|
return Traits::parallelizability(std::forward<Skl>(skl));
|
|
};
|
|
|
|
auto firstLevelPar = SkeletonTraversal<S>::execute(impl.skeleton, traverser, 1ul);
|
|
|
|
split.insert(0);
|
|
for(auto const& k: repeatability.coresList) {
|
|
std::size_t start{};
|
|
std::size_t const step = firstLevelPar/k;
|
|
std::size_t remain = firstLevelPar - step*k;
|
|
|
|
for(unsigned int i = 0; i < k-1; ++i) {
|
|
std::size_t offset = !!remain;
|
|
remain -= offset;
|
|
start += step+offset;
|
|
split.insert(start * (state.executor.parTasksCount/firstLevelPar));
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Upper bound on the number of threads usable at the level described by info.
 * Only depth 0 gets `cores` threads; any deeper level is limited to one.
 */
unsigned int threadLimit(Info const& info) const noexcept {
	if(info.parDepth != 0) return 1;
	return cores;
}
|
|
|
|
public:
|
|
/**
 * Prepares the executor state of impl: records impl.parallelTasksCount() in
 * parTasksCount, then rebuilds the split boundaries (which read that value).
 */
template<typename Impl>
void config(Impl& impl) {
	typename Impl::State& state = impl.state;
	auto& executorState = state.executor;

	// Must be set before buildSplit, which scales boundaries by this count.
	executorState.parTasksCount = impl.parallelTasksCount();
	buildSplit(impl);
}
|
|
|
|
/**
 * Number of context ids: one per boundary currently held in the split set.
 * The second argument is ignored.
 */
template<typename Impl>
std::size_t contextIdCount(Impl& impl, std::size_t) {
	typename Impl::State& state = impl.state;
	auto const& boundaries = state.executor.split;
	return boundaries.size();
}
|
|
|
|
/**
 * Maps id to its context id: the zero-based rank, within the split set, of the
 * last boundary that is <= id.
 *
 * The upper_bound lookup is logarithmic, but std::distance has to walk the
 * set's bidirectional iterators, so the whole call is linear in the number of
 * boundaries (split is declared as a std::set in ExecutorState).
 *
 * NOTE(review): relies on a boundary <= id being present (buildSplit always
 * inserts 0); without one the "- 1" wraps around.
 */
template<typename Impl>
std::size_t contextId(Impl& impl, std::size_t id) {
	typename Impl::State& state = impl.state;
	auto& boundaries = state.executor.split;

	auto const firstAbove = boundaries.upper_bound(id);
	return std::distance(std::begin(boundaries), firstAbove) - 1;
}
|
|
|
|
template<typename Task, typename Impl, typename BTask, typename Parameters>
|
|
void executeParallel(Impl& impl, BTask& task, Parameters const& parameters, std::size_t n) {
|
|
std::size_t const maxThreads = threadLimit(impl.executorInfo);
|
|
std::size_t const nThreads = std::min(n, maxThreads);
|
|
|
|
if(nThreads > 1) {
|
|
Info info = impl.executorInfo.par();
|
|
std::vector<std::thread> threads(nThreads-1);
|
|
std::size_t const step = n/nThreads;
|
|
std::size_t const remainBase = n - step*nThreads;
|
|
std::size_t remain = remainBase;
|
|
|
|
auto run = [&](std::size_t b, std::size_t k) {
|
|
for(std::size_t i = 0; i < k; ++i)
|
|
Task::execute(impl, task, b+i, info, parameters, std::tuple<>{});
|
|
};
|
|
|
|
{
|
|
std::size_t start{};
|
|
for(std::size_t i = 0; i < nThreads-1; ++i) {
|
|
std::size_t offset = !!remain;
|
|
remain -= offset;
|
|
threads[i] = std::thread{run, start, step+offset};
|
|
start += step+offset;
|
|
}
|
|
|
|
run(start, step);
|
|
}
|
|
|
|
for(std::thread& thread: threads) thread.join();
|
|
} else {
|
|
Info info = impl.executorInfo.seq();
|
|
for(std::size_t i = 0; i < n; ++i)
|
|
Task::execute(impl, task, i, info, parameters, std::tuple<>{});
|
|
}
|
|
}
|
|
|
|
template<typename Value, typename Task, typename Select, typename Impl, typename BTask, typename BSelect, typename Parameters>
|
|
Value executeParallelAccumulate(Impl& impl, BTask& task, BSelect& select, Parameters const& parameters, std::size_t n) {
|
|
std::size_t const maxThreads = threadLimit(impl.executorInfo); // TODO? fix neighbours
|
|
|
|
Value best{};
|
|
|
|
std::size_t const nThreads = std::min(n, maxThreads);
|
|
if(nThreads > 1) {
|
|
Info info = impl.executorInfo.par();
|
|
std::vector<std::thread> threads(nThreads-1);
|
|
std::size_t const step = n/nThreads;
|
|
std::size_t const remainBase = n - step*nThreads;
|
|
std::size_t remain = remainBase;
|
|
|
|
auto run = [&](Value& out, std::size_t b, std::size_t k) {
|
|
Value best{};
|
|
|
|
if(k)
|
|
best = Task::execute(impl, task, b+0, info, parameters, std::tuple<>{});
|
|
for(std::size_t i = 1; i < k; ++i) {
|
|
Value current = Task::execute(impl, task, b+i, info, parameters, std::tuple<>{});
|
|
best = Select::execute(impl, select, b+i, info, parameters, std::tuple<>{}, std::move(current), std::move(best));
|
|
}
|
|
|
|
out = std::move(best);
|
|
};
|
|
|
|
std::size_t start{};
|
|
std::vector<Value> bests(nThreads);
|
|
|
|
{
|
|
std::size_t i;
|
|
for(i = 0; i < nThreads-1; ++i) {
|
|
std::size_t offset = !!remain;
|
|
remain -= offset;
|
|
threads[i] = std::thread{run, std::ref(bests[i]), start, step+offset};
|
|
start += step+offset;
|
|
}
|
|
|
|
run(bests[i], start, step);
|
|
}
|
|
|
|
for(std::thread& thread: threads) thread.join();
|
|
|
|
if(nThreads) best = std::move(bests[0]);
|
|
for(std::size_t i = 1; i < nThreads; ++i)
|
|
best = Select::execute(impl, select, i, info, parameters, std::tuple<>{}, std::move(bests[i]), std::move(best));
|
|
} else {
|
|
Info info = impl.executorInfo.seq();
|
|
if(n)
|
|
best = Task::execute(impl, task, 0, info, parameters, std::tuple<>{});
|
|
for(std::size_t i = 1; i < n; ++i) {
|
|
Value current = Task::execute(impl, task, i, info, parameters, std::tuple<>{});
|
|
best = Select::execute(impl, select, i, info, parameters, std::tuple<>{}, std::move(current), std::move(best));
|
|
}
|
|
}
|
|
|
|
return best;
|
|
}
|
|
};
|
|
|
|
template<typename S>
|
|
struct ExecutorState<FirstLevelEqui<S>> {
|
|
std::size_t parTasksCount;
|
|
std::set<std::size_t> split;
|
|
};
|
|
|
|
}
|
|
}
|
|
|
|
#endif
|