线程

创建线程

创建线程非常的简单的,下面就是一个使用了多线程的Hello World示例:

// 01_hello_thread.cpp

#include <iostream>
#include <thread> // ①

using namespace std; // ②

void hello() { // ③
  cout << "Hello World from new thread." << endl;
}

int main() {
  thread t(hello); // ④
  t.join(); // ⑤

  return 0;
}

对于这段代码说明如下:

  1. 为了使用多线程的接口,我们需要#include 头文件。
  2. 为了简化声明,本文中的代码都将using namespace std;。
  3. 新建线程的入口是一个普通的函数,它并没有什么特别的地方。
  4. 创建线程的方式就是构造一个thread对象,并指定入口函数。与普通对象不一样的是,此时编译器便会为我们创建一个新的操作系统线程,并在新的线程中执行我们的入口函数。
  5. 关于join函数在下文中讲解。
int main() {
  thread t([] {
    cout << "Hello World from lambda thread." << endl;
  });

  t.join();

  return 0;
}

当然,你可以传递参数给入口函数,像下面这样:

void hello(string name) {
  cout << "Welcome to " << name << endl;
}

int main() {
  thread t(hello, "https://paul.pub");
  t.join();

  return 0;
}
  • 不过需要注意的是,参数是以拷贝的形式进行传递的。
  • 因此对于拷贝耗时的对象你可能需要传递指针或者引用类型作为参数。
  • 但是,如果是传递指针或者引用,你还需要考虑参数对象的生命周期。因为线程的运行长度很可能会超过参数的生命周期(见下文detach),这个时候如果线程还在访问一个已经被销毁的对象就会出现问题。

join与detach

  • 一旦启动线程之后,我们必须决定是要等待直接它结束(通过join),还是让它独立运行(通过detach),我们必须二者选其一。

  • 如果在thread对象销毁的时候我们还没有做决定,则thread对象在析构函数出将调用std::terminate()从而导致我们的进程异常退出。

  • join:调用此接口时,当前线程会一直阻塞,直到目标线程执行完成(当然,很可能目标线程在此处调用之前就已经执行完成了,不过这不要紧)。因此,如果目标线程的任务非常耗时,你就要考虑好是否需要在主线程上等待它了,因此这很可能会导致主线程卡住。

  • detach:detach是让目标线程成为守护线程(daemon threads)。一旦detach之后,目标线程将独立执行,即便其对应的thread对象销毁也不影响线程的执行。并且,你无法再与之通信。

管理当前线程

  • yield 通常用在自己的主要任务已经完成的时候,此时希望让出处理器给其他任务使用。
  • get_id 返回当前线程的id,可以以此来标识不同的线程。
  • sleep_for 是让当前线程停止一段时间。
  • sleep_until 和sleep_for类似,但是是以具体的时间点为参数。这两个API都以chrono API(由于篇幅所限,这里不展开这方面内容)为基础。
void print_time() {
  auto now = chrono::system_clock::now();
  auto in_time_t = chrono::system_clock::to_time_t(now);

  std::stringstream ss;
  ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
  cout << "now is: " << ss.str() << endl;
}

void sleep_thread() {
  this_thread::sleep_for(chrono::seconds(3));
  cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;
}

void loop_thread() {
  for (int i = 0; i < 10; i++) {
    cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
  }
}

int main() {
  print_time();

  thread t1(sleep_thread);
  thread t2(loop_thread);

  t1.join();
  t2.detach();

  print_time();
  return 0;
}

一次调用

在一些情况下,我们有些任务需要执行一次,并且我们只希望它执行一次,例如资源的初始化任务。这个时候就可以用到上面的接口。这个接口会保证,即便在多线程的环境下,相应的函数也只会调用一次。

下面就是一个示例:有三个线程都会使用init函数,但是只会有一个线程真正执行它。

void init() {
  cout << "Initialing..." << endl;
  // Do something...
}

void worker(once_flag* flag) {
  call_once(*flag, init);
}

int main() {
  once_flag flag;

  thread t1(worker, &flag);
  thread t2(worker, &flag);
  thread t3(worker, &flag);

  t1.join();
  t2.join();
  t3.join();

  return 0;
}

并发任务

  1. thread::hardware_concurrency()可以获取到当前硬件支持多少个线程并行执行。
  2. 根据处理器的情况决定线程的数量。
  3. 对于每一个线程都通过worker函数来完成任务,并划分一部分数据给它处理。
  4. 等待每一个线程执行结束。

互斥体与锁

mutex

开发并发系统的目的主要是为了提升性能:将任务分散到多个线程,然后在不同的处理器上同时执行。这些分散开来的线程通常会包含两类任务:

  1. 独立的对于划分给自己的数据的处理
  2. 对于处理结果的汇总

其中第1项任务因为每个线程是独立的,不存在竞争条件的问题。而第2项任务,由于所有线程都可能往总结果(例如上面的sum变量)汇总,这就需要做保护了。在某一个具体的时刻,只应当有一个线程更新总结果,即:保证每个线程对于共享数据的访问是“互斥”的。

mutex是最基础的API。其他类都是在它的基础上的改进。所以这些类都提供了下面三个方法,并且它们的功能是一样的:

| 方法| 说明 | | lock|锁定互斥体,如果不可用,则阻塞 | | try_lock |尝试锁定互斥体,如果不可用,直接返回 | |unlock | 解锁互斥体|

  • 超时:

    timed_mutex,

    recursive_timed_mutex,

    shared_timed_mutex

    名称都带有timed,这意味着它们都支持超时功能。它们都提供了try_lock_for和try_lock_until方法,这两个方法分别可以指定超时的时间长度和时间点。如果在超时的时间范围内没有能获取到锁,则直接返回,不再继续等待。

  • 可重入:

    recursive_mutex和recursive_timed_mutex的名称都带有recursive。可重入或者叫做可递归,是指在同一个线程中,同一把锁可以锁定多次。这就避免了一些不必要的死锁。

  • 共享:(其实就是读写锁)

    shared_timed_mutex和shared_mutex提供了共享功能。对于这类互斥体,实际上是提供了两把锁:一把是共享锁,一把是互斥锁。一旦某个线程获取了互斥锁,任何其他线程都无法再获取互斥锁和共享锁;但是如果有某个线程获取到了共享锁,其他线程无法再获取到互斥锁,但是还有获取到共享锁。这里互斥锁的使用和其他的互斥体接口和功能一样。而共享锁可以同时被多个线程同时获取到(使用共享锁的接口见下面的表格)。共享锁通常用在读者写者模型上。

使用共享锁的接口如下:

| 方法| 说明 | |lock_shared | 获取互斥体的共享锁,如果无法获取则阻塞 | | try_lock_shared| 尝试获取共享锁,如果不可用,直接返回 | | unlock_shared| 解锁共享锁 |

通用互斥管理

标准库就提供了上面的这些API。它们都使用了叫做RAII的编程技巧,来简化我们手动加锁和解锁的“体力活”。


#include <thread>
#include <mutex>
#include <iostream>
 
int g_i = 0;
std::mutex g_i_mutex;  // ①
 
void safe_increment()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);  // ②
  ++g_i;

  std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
  // ③
}
 
int main()
{
  std::cout << "main: " << g_i << '\n';
 
  std::thread t1(safe_increment); // ④
  std::thread t2(safe_increment);
 
  t1.join();
  t2.join();
 
  std::cout << "main: " << g_i << '\n';
}

这段代码中:

  1. 全局的互斥体g_i_mutex用来保护全局变量g_i
  2. 这是一个设计为可以被多线程环境使用的方法。因此需要通过互斥体来进行保护。这里没有调用lock方法,而是直接使用lock_guard来锁定互斥体。
  3. 在方法结束的时候,局部变量std::lock_guard< std::mutex > lock会被销毁,它对互斥体的锁定也就解除了。
  4. 在多个线程中使用这个方法。

std::lock_guard

  • std::lock_guard 在构造函数中进行加锁,析构函数中进行解锁。
  • 锁在多线程编程中,使用较多,因此c++11提供了lock_guard模板类;在实际编程中,我们也可以根据自己的场景编写resource_guard RAII类,避免忘掉释放资源。

std::unique_lock

  • 类 unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用
  • unique_lock比lock_guard使用更加灵活,功能更加强大。
  • 使用unique_lock需要付出更多的时间、性能成本。

条件变量

| API | C++标准 | 说明 | | condition_variable | C++ 11 | 提供与 std::unique_lock 关联的条件变量 | | condition_variable_any | C++ 11 |提供与任何锁类型关联的条件变量 | | notify_all_at_thread_exit |C++ 11 | 安排到在此线程完全结束时对 notify_all 的调用 | | cv_status | C++ 11 |列出条件变量上定时等待的可能结果 |

条件变量提供了一个可以让多个线程间同步协作的功能。这对于生产者-消费者模型很有意义。在这个模型下:

  • 生产者和消费者共享一个工作区。这个区间的大小是有限的。
  • 生产者总是产生数据放入工作区中,当工作区满了。它就停下来等消费者消费一部分数据,然后继续工作。
  • 消费者总是从工作区中拿出数据使用。当工作区中的数据全部被消费空了之后,它也会停下来等待生产者往工作区中放入新的数据。

class Account {
public:
  Account(string name, double money): mName(name), mMoney(money) {};

public:
  void changeMoney(double amount) {
    unique_lock lock(mMoneyLock); // ②
    mConditionVar.wait(lock, [this, amount] { // ③
      return mMoney + amount > 0; // ④
    });
    mMoney += amount;
    mConditionVar.notify_all(); // ⑤
  }

  string getName() {
    return mName;
  }

  double getMoney() {
    return mMoney;
  }

private:
  string mName;
  double mMoney;
  mutex mMoneyLock;
  condition_variable mConditionVar; // ①
};

这几处改动说明如下:

  1. 这里声明了一个条件变量,用来在多个线程之间协作。
  2. 这里使用的是unique_lock,这是为了与条件变量相配合。因为条件变量会解锁和重新锁定互斥体。
  3. 这里是比较重要的一个地方:通过条件变量进行等待。此时:会通过后面的lambda表达式判断条件是否满足。如果满足则继续;如果不满足,则此处会解锁互斥体,并让当前线程等待。解锁这一点非常重要,因为只有这样,才能让其他线程获取互斥体。
  4. 这里是条件变量等待的条件。如果你不熟悉lambda表达式,请自行网上学习,或者阅读我之前写的文章。
  5. 此处也很重要。当金额发生变动之后,我们需要通知所有在条件变量上等待的其他线程。此时所有调用wait线程都会再次唤醒,然后尝试获取锁(当然,只有一个能获取到)并再次判断条件是否满足。除了notify_all还有notify_one,它只通知一个等待的线程。wait和notify就构成了线程间互相协作的工具。

future

api 标准 说明
async 11 异步运行一个函数,并返回保有结果的std::future
future 11 等待被异步设置的值
packaged_task 11 打包一个函数,存储其返回值进行异步获取
promise 11 存储一个值进行异步获取
shared_future 11 等待被异步设置的值

async

static const int MAX = 10e8;
static double sum = 0;

void worker(int min, int max) {
  for (int i = min; i <= max; i++) {
    sum += sqrt(i);
  }
}

int main() {
  sum = 0;
  auto f1 = async(worker, 0, MAX);
  cout << "Async task triggered" << endl;
  f1.wait();
  cout << "Async task finish, result: " << sum << endl << endl;
}

这仍然是我们之前熟悉的例子。这里有两个地方需要说明:

  1. 这里以异步的方式启动了任务。它会返回一个future对象。future用来存储异步任务的执行结果,关于future我们在后面packaged_task的例子中再详细说明。在这个例子中我们仅仅用它来等待任务执行完成。
  2. 此处是等待异步任务执行完成。

需要注意的是,默认情况下,async是启动一个新的线程,还是以同步的方式(不启动新的线程)运行任务,这一点标准是没有指定的,由具体的编译器决定。如果希望一定要以新的线程来异步执行任务,可以通过launch::async来明确说明。launch中有两个常量:

  • async:运行新线程,以异步执行任务。
  • deferred:调用方线程上第一次请求其结果时才执行任务,即惰性求值。

packaged_task

在一些业务中,我们可能会有很多的任务需要调度。这时我们常常会设计出任务队列和线程池的结构。此时,就可以使用packaged_task来包装任务。

packaged_task绑定到一个函数或者可调用对象上。当它被调用时,它就会调用其绑定的函数或者可调用对象。并且,可以通过与之相关联的future来获取任务的结果。调度程序只需要处理packaged_task,而非各个函数。

packaged_task对象是一个可调用对象,它可以被封装成一个std::fucntion,或者作为线程函数传递给std::thread,或者直接调用。


double concurrent_worker(int min, int max) {
  double sum = 0;
  for (int i = min; i <= max; i++) {
    sum += sqrt(i);
  }
  return sum;
}

double concurrent_task(int min, int max) {
  vector<future<double>> results; // ①

  unsigned concurrent_count = thread::hardware_concurrency();
  min = 0;
  for (int i = 0; i < concurrent_count; i++) { // ②
    packaged_task<double(int, int)> task(concurrent_worker); // ③
    results.push_back(task.get_future()); // ④

    int range = max / concurrent_count * (i + 1);
    thread t(std::move(task), min, range); // ⑤
    t.detach();

    min = range + 1;
  }

  cout << "threads create finish" << endl;
  double sum = 0;
  for (auto& r : results) {
    sum += r.get(); 
  }
  return sum;
}

int main() {
  auto start_time = chrono::steady_clock::now();

  double r = concurrent_task(0, MAX);

  auto end_time = chrono::steady_clock::now();
  auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
  cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << r << endl;
  return 0;
}

在这段代码中:

  1. 首先创建一个集合来存储future对象。我们将用它来获取任务的结果。
  2. 同样的,根据CPU的情况来创建线程的数量。
  3. 将任务包装成packaged_task。请注意,由于concurrent_worker被包装成了任务,我们无法直接获取它的return值。而是要通过future对象来获取。
  4. 获取任务关联的future对象,并将其存入集合中。
  5. 通过一个新的线程来执行任务,并传入需要的参数。
  6. 通过future集合,逐个获取每个任务的计算结果,将其累加。这里r.get()获取到的就是每个任务中concurrent_worker的返回值。

简易package_task例子


int p(int pro)
{
    sleep(5);
    return pro + pro;
}

int main()
{
    packaged_task< int(int) > task(p);
    cout <<"hello"<<endl; //hello

    future<int> ret = task.get_future();
    cout <<"hello"<<endl;//hello

    thread th(move(task), 10);
    cout <<"hello"<<endl;//hello

    cout << ret.get() << endl;//sleep(5) 20

    th.join();
    return 0;
}

promise与future

在上面的例子中,concurrent_task的结果是通过return返回的。但在一些时候,我们可能不能这么做:在得到任务结果之后,可能还有一些事情需要继续处理,例如清理工作。

这个时候,就可以将promise与future配对使用。这样就可以将返回结果和任务结束两个事情分开。

下面是对上面代码示例的改写:

double concurrent_worker(int min, int max) {
  double sum = 0;
  for (int i = min; i <= max; i++) {
    sum += sqrt(i);
  }
  return sum;
}

void concurrent_task(int min, int max, promise<double>* result) { // ①
  vector<future<double>> results;

  unsigned concurrent_count = thread::hardware_concurrency();
  min = 0;
  for (int i = 0; i < concurrent_count; i++) {
    packaged_task<double(int, int)> task(concurrent_worker);
    results.push_back(task.get_future()); 

    int range = max / concurrent_count * (i + 1);
    thread t(std::move(task), min, range);
    t.detach();

    min = range + 1;
  }

  cout << "threads create finish" << endl;
  double sum = 0;
  for (auto& r : results) {
    sum += r.get();
  }
  result->set_value(sum); // ②
  cout << "concurrent_task finish" << endl;
}

int main() {
  auto start_time = chrono::steady_clock::now();

  promise<double> sum; // ③
  concurrent_task(0, MAX, &sum);

  auto end_time = chrono::steady_clock::now();
  auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
  cout << "Concurrent task finish, " << ms << " ms consumed." << endl;
  cout << "Result: " << sum.get_future().get() << endl; // ④
  return 0;
}

这段代码和上面的示例在很大程度上是一样的。只有小部分内容做了改动:

  1. concurrent_task不再直接返回计算结果,而是增加了一个promise对象来存放结果。
  2. 在任务计算完成之后,将总结过设置到promise对象上。一旦这里调用了set_value,其相关联的future对象就会就绪。
  3. 这里是在main中创建一个promoise来存放结果,并以指针的形式传递进concurrent_task中。
  4. 通过sum.get_future().get()来获取结果。第2点中已经说了:一旦调用了set_value,其相关联的future对象就会就绪。

需要注意的是,future对象只有被一个线程获取值。并且在调用get()之后,就没有可以获取的值了。如果从多个线程调用get()会出现数据竞争,其结果是未定义的。

如果真的需要在多个线程中获取future的结果,可以使用shared_future。

简易promise、future例子

#include <thread>  
#include <iostream>  
#include <future>  
#include <chrono>  
  
struct _data  
{  
    int32_t value;  
};  
  
_data data = { 0 };  
  
int main()  
{  
    std::promise<_data> data_promise;      //创建一个承诺  
    std::future<_data> data_future = data_promise.get_future();     //得到这个承诺封装好的期望  
  
    std::thread prepare_data_thread([](std::promise<_data> &data_promise){  
        std::this_thread::sleep_for(std::chrono::seconds(2));    //模拟生产过程  
  
        data_promise.set_value({ 1 });       //通过set_value()反馈结果  
    }, std::ref(data_promise));  
  
    std::thread process_data_thread([](std::future<_data> &data_future){  
        std::cout << data_future.get().value << std::endl;    //通过get()获取结果  
    }, std::ref(data_future));  
  
    prepare_data_thread.join();  
    process_data_thread.join();  
  
    system("pause");  
    return 0;  
}  
thread ref ( [promise] ) thread ref ( [future] )
| |
| |
set_value get

注意

  • 不管是packaged_task.get_future() 还是 promise.get_future()都是不会阻塞的
  • 阻塞的点在get