简介
线程池(thread pool)
:一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
实现
/*
* example:
* // create thread pool with 4 worker threads
* ThreadPool pool(4);
*
* // enqueue and store future
* auto result = pool.enqueue([](int answer) { return answer; }, 42);
*
* // get result from future
* std::cout << result.get() << std::endl;
*
*/
class thread_pool {
public:
explicit thread_pool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~thread_pool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline thread_pool::thread_pool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto thread_pool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped thread_pool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline thread_pool::~thread_pool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
详解
-
构造一个线程数量的线程池
-
使用互斥量和条件变量实现安全入队出队
-
入队
-
-> std::future<typename std::result_of<F(Args...)>::type> //尾置返回类型
- 返回std::future
- result<>
- 用于在编译的时候推导出一个可调用对象(函数,std::funciton或者重载了operator()操作的对象等)的返回值类型.主要用于模板编写中.
- 模板参数: Fn
- 可调用对象 ArgTypes…
- 参数列表. 注意是类型(int , string等)
- 成员: type
- 可调用对象的返回类型.(主要用该成员来获得结果)
-
auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...)
-
构造智能指针
-
std::packaged_task<return_type()>
- std::packaged_task<funcRetType(参数)>
-
-
std::future<return_type> res = task->get_future();
- 返回future 由外界持有,可调用future.get()得到 funcRetType(参数)的返回值
-