rosa/inc/alsk/executor/utility/pool.h
2021-05-10 18:14:24 +02:00

155 lines
3.0 KiB
C++

#ifndef ALSK_ALSK_EXECUTOR_UTILITY_POOL_H
#define ALSK_ALSK_EXECUTOR_UTILITY_POOL_H
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <list>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include <tmp/traits.h>
namespace alsk {
namespace exec {
namespace util {
/**
 * Simple thread pool in which the calling thread takes part in the work.
 *
 * `config(cores)` spawns `cores - 1` worker threads; the remaining "core" is
 * the thread that calls `wait()`, which drains the queue itself before
 * blocking on the futures it was given.
 *
 * Copying, moving or assigning a Pool never transfers queued tasks or
 * threads: the destination is simply (re)configured for the same number of
 * cores as the source.
 */
struct Pool {
	/// Unit of work executed by the pool.
	using Task = std::function<void()>;
	/// A queued task together with the promise fulfilled once it has run.
	using TaskInfo = std::tuple<Task, std::promise<void>>;

private:
	/// False while the pool is shutting down; workers exit when they see it.
	std::atomic_bool _running;

	/// Worker threads (does not include the calling thread).
	std::vector<std::thread> _threads;

	/// Pending tasks, guarded by `_mutexTasks`.
	std::list<TaskInfo> _tasks;
	std::mutex _mutexTasks;
	/// Signalled when a task is queued or when the pool is terminated.
	std::condition_variable _cvTasks;

	/// Not locked anywhere in this struct.
	std::mutex _mutexProcessed;
	/// Notified after each processed task; nothing in this struct waits on it.
	std::condition_variable _cvProcessed;

	/**
	 * Value to hand to `config()` so that it recreates as many workers as
	 * `o` currently owns. `config(n)` spawns `n - 1` threads, hence the `+ 1`.
	 */
	static unsigned int equivalentCores(Pool const& o) {
		return static_cast<unsigned int>(o._threads.size()) + 1;
	}

public:
	/// Creates a pool without worker threads; call `config()` to spawn some.
	Pool(): _running{true} {}

	/// Creates a pool with as many workers as `o`; tasks are not copied.
	Pool(Pool const& o): _running{true} {
		config(equivalentCores(o));
	}

	/// Same as the copy constructor: `o` keeps its threads and tasks.
	Pool(Pool&& o): _running{true} {
		config(equivalentCores(o));
	}

	/// Stops and joins every worker thread.
	~Pool() {
		terminate();
	}

	/// Reconfigures this pool with as many workers as `o`.
	Pool const& operator=(Pool const& o) {
		if(this == &o) return *this;
		config(equivalentCores(o));
		return *this;
	}

	/// Same as copy assignment: `o` keeps its threads and tasks.
	Pool const& operator=(Pool&& o) {
		if(this == &o) return *this;
		config(equivalentCores(o));
		return *this;
	}

	/**
	 * Stops the current workers and starts `cores - 1` new ones.
	 *
	 * @param cores total number of threads expected to work, including the
	 *              caller; 0 and 1 both result in no worker thread.
	 */
	void config(unsigned int cores) {
		terminate();
		_running = true;

		if(cores == 0) return;
		--cores; // main thread will work too

		_threads.reserve(cores);
		while(cores--)
			_threads.emplace_back([this]{ worker(); });
	}

	/**
	 * Queues a task returning void.
	 *
	 * @param task callable invoked with no argument.
	 * @return future that becomes ready once the task has been processed; it
	 *         holds the exception if the task threw.
	 */
	template<typename F, typename R = tmp::invoke_result_t<F>, std::enable_if_t<std::is_same<R, void>{}>* = nullptr>
	std::future<void> run(F&& task) {
		std::future<void> future;

		{
			std::lock_guard<std::mutex> lg{_mutexTasks};
			_tasks.emplace_back(std::forward<F>(task), std::promise<void>{});
			future = std::get<1>(_tasks.back()).get_future();
		}

		_cvTasks.notify_one();

		return future;
	}

	/**
	 * Queues a task returning a value, delivered through `promise`.
	 *
	 * @param task    callable invoked with no argument.
	 * @param promise receives the result (or the exception thrown by the
	 *                task). It is captured by reference and must therefore
	 *                outlive the execution of the task.
	 * @return the future associated with `promise`.
	 */
	template<typename F, typename R = tmp::invoke_result_t<F>, std::enable_if_t<not std::is_same<R, void>{}>* = nullptr>
	std::future<R> run(F&& task, std::promise<R>& promise) {
		std::future<R> future = promise.get_future();

		run([task=std::forward<F>(task), &promise]{
			// Forward a failure to the caller's promise, otherwise whoever
			// waits on the returned future would block forever.
			try {
				promise.set_value(task());
			} catch(...) {
				promise.set_exception(std::current_exception());
			}
		});

		return future;
	}

	/**
	 * Runs queued tasks on the calling thread until the queue is empty, then
	 * blocks until every future in `futures` is ready.
	 *
	 * @param futures iterable range of futures exposing `wait()`.
	 */
	template<typename Futures>
	void wait(Futures& futures) {
		while(tryProcessOne());
		for(auto& future: futures) future.wait();
	}

protected:
	/**
	 * Asks the workers to stop, joins them and forgets them.
	 * Tasks still queued at that point are left in the queue.
	 */
	void terminate() {
		{
			// Flip the flag under the task mutex so that a worker cannot miss
			// the notification between testing its predicate and sleeping.
			std::lock_guard<std::mutex> lk{_mutexTasks};
			_running = false;
		}

		_cvTasks.notify_all();

		for(auto& thread: _threads) thread.join();
		_threads.clear();
	}

	/// Worker thread body: pops and processes tasks until the pool stops.
	void worker() {
		auto wakeUp = [this]{ return !_running || !_tasks.empty(); };

		for(;;) {
			TaskInfo taskInfo;

			{
				std::unique_lock<std::mutex> lk{_mutexTasks};
				// The predicate overload tests before sleeping, so no prior
				// explicit check is needed.
				_cvTasks.wait(lk, wakeUp);

				// Shutdown takes precedence over pending tasks.
				if(!_running) return;

				taskInfo = std::move(_tasks.front());
				_tasks.pop_front();
			}

			process(taskInfo);
		}
	}

	/**
	 * Pops and processes one task on the calling thread, if any is queued.
	 *
	 * @return true if a task was processed, false if the queue was empty.
	 */
	bool tryProcessOne() {
		TaskInfo taskInfo;

		{
			std::unique_lock<std::mutex> lk{_mutexTasks};
			if(_tasks.empty()) return false;

			taskInfo = std::move(_tasks.front());
			_tasks.pop_front();
		}

		process(taskInfo);
		return true;
	}

	/**
	 * Executes the task and fulfils its promise.
	 *
	 * An exception thrown by the task is stored in the promise instead of
	 * propagating: escaping a worker thread would call std::terminate, and in
	 * either case the promise would be left unfulfilled.
	 */
	void process(TaskInfo& taskInfo) {
		auto& promise = std::get<1>(taskInfo);

		std::exception_ptr failure;
		try {
			std::get<0>(taskInfo)();
		} catch(...) {
			failure = std::current_exception();
		}

		if(failure) promise.set_exception(failure);
		else promise.set_value();

		_cvProcessed.notify_all();
	}
};
}
}
}
#endif