简介

线程池(thread pool):一种线程的使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

实现

/*
 * example:
 * // create thread pool with 4 worker threads
 *  ThreadPool pool(4);
 *
 *  // enqueue and store future
 *  auto result = pool.enqueue([](int answer) { return answer; }, 42);
 *
 *  // get result from future
 *  std::cout << result.get() << std::endl;
 *
 */



class thread_pool {
public:
    explicit thread_pool(size_t);

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>;
    ~thread_pool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline thread_pool::thread_pool(size_t threads)
        :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
                [this]
                {
                    for(;;)
                    {
                        std::function<void()> task;

                        {
                            std::unique_lock<std::mutex> lock(this->queue_mutex);
                            this->condition.wait(lock,
                                                 [this]{ return this->stop || !this->tasks.empty(); });
                            if(this->stop && this->tasks.empty())
                                return;
                            task = std::move(this->tasks.front());
                            this->tasks.pop();
                        }

                        task();
                    }
                }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto thread_pool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped thread_pool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline thread_pool::~thread_pool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

详解

  • 构造一个线程数量的线程池

  • 使用互斥量和条件变量实现安全入队出队

  • 入队

    • -> std::future<typename std::result_of<F(Args...)>::type>
          //尾置返回类型
      
      • 返回std::future
      • result<>
      • 用于在编译的时候推导出一个可调用对象(函数,std::funciton或者重载了operator()操作的对象等)的返回值类型.主要用于模板编写中.
        • 模板参数: Fn
        • 可调用对象 ArgTypes…
        • 参数列表. 注意是类型(int , string等)
        • 成员: type
        • 可调用对象的返回类型.(主要用该成员来获得结果)
    • auto task = std::make_shared< std::packaged_task<return_type()> >(
                  std::bind(std::forward<F>(f), std::forward<Args>(args)...)
      
      • 构造智能指针

      • std::packaged_task<return_type()>
        
        • std::packaged_task<funcRetType(参数)>
    • std::future<return_type> res = task->get_future();
      
      • 返回future 由外界持有,可调用future.get()得到 funcRetType(参数)的返回值