#ifndef ALSK_ALSK_EXECUTOR_UTILITY_STATICPOOL_H #define ALSK_ALSK_EXECUTOR_UTILITY_STATICPOOL_H #include #include #include #include #include #include #include #include #include #include namespace alsk { namespace exec { namespace util { struct StaticPool { using Task = std::function; using TaskInfo = std::tuple>; struct ThreadInfo { std::atomic_bool running; std::thread thread; std::list tasks; std::mutex mutex; std::condition_variable cv; ThreadInfo() {} ThreadInfo(ThreadInfo&&) {} }; private: std::vector _threads; std::unordered_map> _threadFromId; public: StaticPool() {} StaticPool(StaticPool const& o) { config(o._threads.size()); } StaticPool(StaticPool&& o) { config(o._threads.size()); } ~StaticPool() { terminate(); } StaticPool const& operator=(StaticPool const& o) { if(this == &o) return *this; config(o._threads.size()); return *this; } StaticPool const& operator=(StaticPool&& o) { if(this == &o) return *this; config(o._threads.size()); return *this; } void config(unsigned int cores) { terminate(); if(cores == 0) return; _threads.resize(cores); for(unsigned int i = 0; i < cores; ++i) { ThreadInfo& threadInfo = _threads[i]; threadInfo.running = true; threadInfo.thread = std::thread{[&,&threadInfo=threadInfo] { worker(threadInfo); }}; _threadFromId.emplace(threadInfo.thread.get_id(), threadInfo); } } template, std::enable_if_t{}>* = nullptr> std::future run(std::size_t i, F&& task) { ThreadInfo& threadInfo = _threads[i]; std::future future; { std::lock_guard lg{threadInfo.mutex}; threadInfo.tasks.emplace_back(std::forward(task), std::promise{}); future = std::get<1>(threadInfo.tasks.back()).get_future(); } threadInfo.cv.notify_one(); return future; } template void wait(Futures& futures) { auto const& id = std::this_thread::get_id(); if(_threadFromId.count(id)) { auto& threadInfo = _threadFromId.at(id); while(tryProcessOne(threadInfo)); } for(auto& future: futures) future.wait(); futures.clear(); } protected: void terminate() { for(auto& threadInfo: _threads) { { std::lock_guard lg{threadInfo.mutex}; threadInfo.running = false; } threadInfo.cv.notify_all(); threadInfo.thread.join(); } _threads.clear(); _threadFromId.clear(); } void worker(ThreadInfo& threadInfo) { auto test = [&]{ return !threadInfo.running || threadInfo.tasks.size(); }; for(;;) { TaskInfo taskInfo; { std::unique_lock lk{threadInfo.mutex}; if(!test()) threadInfo.cv.wait(lk, test); if(!threadInfo.running) return; taskInfo = std::move(threadInfo.tasks.front()); threadInfo.tasks.pop_front(); } process(taskInfo); } } bool tryProcessOne(ThreadInfo& threadInfo) { TaskInfo taskInfo; { std::unique_lock lk{threadInfo.mutex}; if(threadInfo.tasks.empty()) return false; taskInfo = std::move(threadInfo.tasks.front()); threadInfo.tasks.pop_front(); } process(taskInfo); return true; } void process(TaskInfo& taskInfo) { std::get<0>(taskInfo)(); std::get<1>(taskInfo).set_value(); } }; } } } #endif