Chapter 30: Concurrency and Asynchronous Programming

Concurrency Fundamentals

Challenges of Concurrent Programming

Concurrent programming is a core part of modern C++ development. It lets applications exploit multi-core processors, but it also brings a set of challenges:

  1. Race conditions: unpredictable behavior caused by multiple threads accessing shared state at the same time
  2. Deadlocks: permanent blocking caused by threads waiting on each other to release resources
  3. Livelocks: threads keep changing state in response to each other but make no forward progress
  4. Starvation: a thread is unable to obtain the resources it needs for an extended period
  5. Memory-ordering problems: visibility issues caused by compiler and processor reordering

The C++ Memory Model

The C++ memory model defines the visibility and ordering of memory operations in multithreaded programs, and it is the foundation for understanding concurrency:

  1. Sequential consistency (seq_cst): the strongest ordering; all threads observe atomic operations in a single total order consistent with program order
  2. Acquire-release (acquire/release): writes made before a release store are visible to a thread after its acquire load reads that store
  3. Relaxed: the weakest ordering; guarantees atomicity only, with no ordering constraints
  4. Consume: ordering along data-dependency chains; a weaker form of acquire (implementations typically promote it to acquire)

When to Use Each Memory Order

Memory order             Typical use cases                                       Performance        Safety
Sequential consistency   simple cases where performance is not critical          lowest             strongest guarantees
Acquire-release          most synchronization scenarios                          good               strong when paired correctly
Relaxed                  counters, statistics, other dependency-free cases       highest            atomicity only
Consume                  dependency chains (rarely used)                         high in theory     subtle; usually treated as acquire

Thread Management

The std::thread Class

std::thread, introduced in C++11, is the class used to create and manage threads:

#include <iostream>
#include <thread>
#include <vector>

void thread_function(int id) {
    std::cout << "Thread " << id << " executing" << std::endl;
}

int main() {
    // Create a single thread
    std::thread t1(thread_function, 1);
    t1.join(); // wait for the thread to finish

    // Create several threads
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(thread_function, i);
    }

    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

The std::jthread Class (C++20)

std::jthread, introduced in C++20, extends std::thread with automatic joining and stop-token support:

#include <iostream>
#include <thread>
#include <stop_token>
#include <syncstream>
#include <chrono>

void long_running_task(std::stop_token st, int id) {
    int count = 0;
    while (!st.stop_requested() && count < 100) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        count++;
    }
    std::osyncstream(std::cout) << "Task " << id << " completed after " << count << " iterations" << std::endl;
}

int main() {
    // Create jthreads; a stop_token is passed as the first argument automatically
    std::jthread t1(long_running_task, 1);
    std::jthread t2(long_running_task, 2);

    // Main thread does some work
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // jthread destructors request a stop and join automatically; no manual join needed
    std::cout << "Main thread exiting" << std::endl;
    return 0;
}

Implementing a Thread Pool

A thread pool manages thread lifetimes centrally and amortizes the cost of creating and destroying threads:

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <chrono>

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    // std::invoke_result_t replaces std::result_of, which was deprecated in
    // C++17 and removed in C++20
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// Usage example
int main() {
    ThreadPool pool(4);

    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "Processing " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i * i;
            })
        );
    }

    for (auto &&result : results) {
        std::cout << "Result: " << result.get() << std::endl;
    }

    return 0;
}

Synchronization Primitives

Mutexes

The mutex is the most basic synchronization primitive, used to protect shared resources:

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;
int shared_counter = 0;

void increment_counter(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        shared_counter++;
        // the lock is released automatically at the end of the scope
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment_counter, i, 10000);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Final counter value: " << shared_counter << std::endl;
    return 0;
}

Read-Write Locks (std::shared_mutex)

A read-write lock allows multiple threads to read concurrently but only one thread to write:

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <vector>
#include <chrono>

std::shared_mutex rw_mutex;
int shared_data = 0;

void reader(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::shared_lock<std::shared_mutex> lock(rw_mutex); // shared (read) access
        std::cout << "Reader " << id << " read: " << shared_data << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void writer(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::unique_lock<std::shared_mutex> lock(rw_mutex); // exclusive (write) access
        shared_data++;
        std::cout << "Writer " << id << " wrote: " << shared_data << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

int main() {
    std::vector<std::thread> threads;

    // Create 3 reader threads
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(reader, i, 10);
    }

    // Create 1 writer thread
    threads.emplace_back(writer, 0, 5);

    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

Condition Variables

Condition variables are used for event notification between threads:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool done = false;

void producer(int id, int items) {
    for (int i = 0; i < items; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(mtx);
            data_queue.push(i);
            std::cout << "Producer " << id << " produced: " << i << std::endl;
        }
        cv.notify_one(); // wake one consumer
    }

    // Mark production as finished
    {
        std::lock_guard<std::mutex> lock(mtx);
        done = true;
    }
    cv.notify_all(); // wake all consumers
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] {
            return !data_queue.empty() || done;
        });

        // Exit once the queue is empty and production has finished
        if (data_queue.empty() && done) {
            break;
        }

        // Process one item
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumer " << id << " consumed: " << value << std::endl;
    }

    std::cout << "Consumer " << id << " exiting" << std::endl;
}

int main() {
    std::thread producer_thread(producer, 0, 10);
    std::thread consumer_thread1(consumer, 1);
    std::thread consumer_thread2(consumer, 2);

    producer_thread.join();
    consumer_thread1.join();
    consumer_thread2.join();

    return 0;
}

Semaphores

C++20 introduced standard semaphores:

#include <iostream>
#include <thread>
#include <semaphore>
#include <vector>
#include <syncstream>
#include <chrono>

// Counting semaphore: at most 3 threads inside the critical section at once
std::counting_semaphore<3> sem(3);

void worker(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        // Acquire the semaphore
        sem.acquire();

        // Critical section
        std::osyncstream(std::cout) << "Worker " << id << " acquired semaphore, iteration " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // Release the semaphore
        sem.release();
        std::osyncstream(std::cout) << "Worker " << id << " released semaphore, iteration " << i << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

int main() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 5; ++i) {
        workers.emplace_back(worker, i, 3);
    }

    for (auto& w : workers) {
        w.join();
    }

    return 0;
}

Latches and Barriers

C++20 introduced the latch and barrier synchronization primitives:

#include <iostream>
#include <thread>
#include <latch>
#include <barrier>
#include <syncstream>
#include <vector>
#include <chrono>

// Using a latch
void prepare_work(std::latch& ready_latch, int id) {
    std::osyncstream(std::cout) << "Worker " << id << " preparing" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * id));
    std::osyncstream(std::cout) << "Worker " << id << " ready" << std::endl;
    ready_latch.count_down();
}

// Completion callback run each time all threads reach the barrier.
// std::barrier requires its completion function to be nothrow invocable,
// and the completion type is part of the barrier's type, so a noexcept
// free function keeps the barrier's type nameable in a parameter list.
void on_phase_completion() noexcept {
    static int phase = 0;
    std::cout << "Barrier phase " << phase++ << " completed" << std::endl;
}
using PhaseBarrier = std::barrier<void (*)() noexcept>;

// Using a barrier
void process_work(PhaseBarrier& sync_barrier, int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::osyncstream(std::cout) << "Worker " << id << " processing iteration " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));

        // Wait until all threads have finished the current iteration
        sync_barrier.arrive_and_wait();
    }
}

int main() {
    std::cout << "=== Testing latch ===" << std::endl;
    std::latch ready_latch(3);
    std::vector<std::thread> prepare_threads;
    for (int i = 0; i < 3; ++i) {
        prepare_threads.emplace_back(prepare_work, std::ref(ready_latch), i);
    }

    std::cout << "Main thread waiting for all workers to be ready" << std::endl;
    ready_latch.wait();
    std::cout << "All workers are ready, main thread proceeding" << std::endl;

    for (auto& t : prepare_threads) {
        t.join();
    }

    std::cout << "\n=== Testing barrier ===" << std::endl;
    PhaseBarrier sync_barrier(3, &on_phase_completion);
    std::vector<std::thread> process_threads;
    for (int i = 0; i < 3; ++i) {
        process_threads.emplace_back(process_work, std::ref(sync_barrier), i, 3);
    }

    for (auto& t : process_threads) {
        t.join();
    }

    return 0;
}

Memory Model and Atomic Operations

Atomic Types

C++11 introduced the std::atomic template for creating atomic types:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

std::atomic<int> counter(0);
std::atomic<bool> ready(false);

void increment_counter(int id, int iterations) {
    // Wait for the start signal
    while (!ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }

    for (int i = 0; i < iterations; ++i) {
        // Relaxed ordering: only atomicity is needed here
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::vector<std::thread> threads;
    const int thread_count = 4;
    const int iterations_per_thread = 10000;

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(increment_counter, i, iterations_per_thread);
    }

    // Signal all threads to start
    ready.store(true, std::memory_order_release);

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Expected counter value: " << thread_count * iterations_per_thread << std::endl;
    std::cout << "Actual counter value: " << counter.load() << std::endl;

    return 0;
}

Advanced Atomic Operations

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

// Examples of more advanced atomic operations
class AtomicCounter {
private:
    std::atomic<int> value;
public:
    AtomicCounter() : value(0) {}

    // Increment and return the previous value
    int increment() {
        return value.fetch_add(1, std::memory_order_relaxed);
    }

    // Decrement and return the previous value
    int decrement() {
        return value.fetch_sub(1, std::memory_order_relaxed);
    }

    // Compare-and-swap
    bool compare_and_swap(int expected, int desired) {
        return value.compare_exchange_strong(expected, desired,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
    }

    // Read the current value
    int get() const {
        return value.load(std::memory_order_relaxed);
    }
};

AtomicCounter counter;

void worker(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        int old_value = counter.increment();
        if (old_value % 10000 == 0) {
            std::cout << "Thread " << id << " incremented counter to " << old_value + 1 << std::endl;
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    const int thread_count = 4;
    const int iterations_per_thread = 100000;

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(worker, i, iterations_per_thread);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Final counter value: " << counter.get() << std::endl;
    std::cout << "Expected value: " << thread_count * iterations_per_thread << std::endl;

    return 0;
}

Lock-Free Programming

A Lock-Free Queue

The lock-free queue is an important data structure in concurrent programming:

#include <atomic>
#include <memory>

// Illustrative lock-free queue. Note: safe memory reclamation (hazard
// pointers, epoch-based reclamation, etc.) is deliberately omitted for
// brevity, so with multiple concurrent consumers a dequeued node can in
// principle be freed while another thread still holds a pointer to it
// (the classic ABA / use-after-free hazard). Production code needs a
// reclamation scheme.
template <typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;

        Node(const T& value) : data(value), next(nullptr) {}
        Node() : next(nullptr) {} // dummy node for the initial state
    };

    std::atomic<Node*> head;
    std::atomic<Node*> tail;

public:
    LockFreeQueue() : head(new Node()), tail(head.load()) {}

    ~LockFreeQueue() {
        while (Node* oldHead = head.load()) {
            head.store(oldHead->next.load());
            delete oldHead;
        }
    }

    void enqueue(const T& value) {
        Node* newNode = new Node(value);
        Node* oldTail = tail.exchange(newNode, std::memory_order_acq_rel);
        oldTail->next.store(newNode, std::memory_order_release);
    }

    bool dequeue(T& value) {
        Node* oldHead = head.load(std::memory_order_acquire);
        Node* newHead = oldHead->next.load(std::memory_order_acquire);

        if (!newHead) {
            return false; // queue is empty
        }

        if (!head.compare_exchange_strong(oldHead, newHead,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
            return false; // another consumer won the race; the caller may retry
        }

        value = newHead->data;
        delete oldHead;
        return true;
    }

    bool empty() const {
        return head.load(std::memory_order_acquire)->next.load(std::memory_order_acquire) == nullptr;
    }
};

// Usage example
#include <iostream>
#include <thread>
#include <vector>

LockFreeQueue<int> queue;
const int items_per_producer = 1000;
const int producer_count = 4;
const int consumer_count = 4;

void producer(int id) {
    for (int i = 0; i < items_per_producer; ++i) {
        int value = id * items_per_producer + i;
        queue.enqueue(value);
    }
}

void consumer(int id) {
    int value;
    int count = 0;
    while (count < items_per_producer) {
        if (queue.dequeue(value)) {
            count++;
        }
    }
    std::cout << "Consumer " << id << " processed " << count << " items" << std::endl;
}

int main() {
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    // Create producer threads
    for (int i = 0; i < producer_count; ++i) {
        producers.emplace_back(producer, i);
    }

    // Create consumer threads
    for (int i = 0; i < consumer_count; ++i) {
        consumers.emplace_back(consumer, i);
    }

    // Wait for all threads to finish
    for (auto& p : producers) {
        p.join();
    }

    for (auto& c : consumers) {
        c.join();
    }

    std::cout << "Queue empty: " << queue.empty() << std::endl;
    return 0;
}

A Lock-Free Stack

#include <atomic>
#include <thread> // std::this_thread::yield

// Illustrative lock-free stack. As with the queue above, safe memory
// reclamation is omitted: in pop(), another thread may delete the node
// that oldTop points to between the load and the CAS, so reading
// oldTop->next can be a use-after-free in general. Production code
// needs hazard pointers, reference counting, or a similar scheme.
template <typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        Node* next;

        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> top;

public:
    LockFreeStack() : top(nullptr) {}

    ~LockFreeStack() {
        while (Node* oldTop = top.load()) {
            if (top.compare_exchange_strong(oldTop, oldTop->next)) {
                delete oldTop;
            } else {
                std::this_thread::yield();
            }
        }
    }

    void push(const T& value) {
        Node* newNode = new Node(value);
        newNode->next = top.load(std::memory_order_relaxed);

        // On failure, compare_exchange_weak reloads the current top into
        // newNode->next, so the loop retries with fresh state.
        while (!top.compare_exchange_weak(newNode->next, newNode,
                                          std::memory_order_release,
                                          std::memory_order_relaxed)) {
            std::this_thread::yield();
        }
    }

    bool pop(T& value) {
        Node* oldTop = top.load(std::memory_order_acquire);

        if (!oldTop) {
            return false; // stack is empty
        }

        while (!top.compare_exchange_weak(oldTop, oldTop->next,
                                          std::memory_order_release,
                                          std::memory_order_acquire)) {
            if (!oldTop) {
                return false; // stack became empty
            }
            std::this_thread::yield();
        }

        value = oldTop->data;
        delete oldTop;
        return true;
    }

    bool empty() const {
        return top.load(std::memory_order_acquire) == nullptr;
    }
};

// Usage example
#include <iostream>
#include <vector>

LockFreeStack<int> stack;
const int items_per_thread = 1000;
const int thread_count = 4;

void worker(int id) {
    // Push first
    for (int i = 0; i < items_per_thread; ++i) {
        stack.push(id * items_per_thread + i);
    }

    // Then pop
    int value;
    int count = 0;
    while (count < items_per_thread) {
        if (stack.pop(value)) {
            count++;
        }
    }

    std::cout << "Worker " << id << " processed " << count << " items" << std::endl;
}

int main() {
    std::vector<std::thread> threads;

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(worker, i);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Stack empty: " << stack.empty() << std::endl;
    return 0;
}

Asynchronous Programming

std::future and std::promise

std::future and std::promise are used to pass results between threads:

#include <iostream>
#include <thread>
#include <future>
#include <chrono>

void compute_square(std::promise<int> promise, int value) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value(value * value);
}

int main() {
    // Create the promise and its future
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    // Start a thread to do the computation
    std::thread t(compute_square, std::move(promise), 42);

    std::cout << "Main thread waiting for result..." << std::endl;

    // Wait for the result
    int result = future.get(); // blocks until the value is available

    std::cout << "Result: " << result << std::endl;

    t.join();
    return 0;
}

std::async

std::async offers a more concise way to create asynchronous tasks:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// long long: 15! is about 1.3 * 10^12, which overflows a 32-bit int
long long compute_factorial(int n) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    long long result = 1;
    for (int i = 2; i <= n; ++i) {
        result *= i;
    }
    return result;
}

int main() {
    // Launch asynchronous tasks
    std::future<long long> future1 = std::async(std::launch::async, compute_factorial, 10);
    std::future<long long> future2 = std::async(std::launch::async, compute_factorial, 15);

    std::cout << "Main thread doing work..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // Retrieve the results
    std::cout << "Factorial of 10: " << future1.get() << std::endl;
    std::cout << "Factorial of 15: " << future2.get() << std::endl;

    return 0;
}

Chaining Asynchronous Tasks

#include <iostream>
#include <future>
#include <string>

int compute(int value) {
    return value * 2;
}

std::string format_result(int value) {
    return "Result: " + std::to_string(value);
}

void print_result(const std::string& message) {
    std::cout << message << std::endl;
}

int main() {
    // Build a chain of asynchronous tasks; each stage blocks on the previous future
    std::future<int> future1 = std::async(std::launch::async, compute, 42);
    std::future<std::string> future2 = std::async(std::launch::async,
        [&future1]() {
            return format_result(future1.get());
        });
    std::future<void> future3 = std::async(std::launch::async,
        [&future2]() {
            print_result(future2.get());
        });

    // Wait for the whole chain to finish
    future3.wait();

    return 0;
}

Parallel Algorithms

C++17 Parallel Algorithms

C++17 introduced parallel execution policies, which let the standard algorithms run across multiple threads:

#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <chrono>
#include <random>
#include <cstdlib>

int main() {
    // Build a large vector
    std::vector<int> v(10000000);
    for (size_t i = 0; i < v.size(); ++i) {
        v[i] = rand() % 10000;
    }

    // Time the sequential sort
    auto start = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::seq, v.begin(), v.end());
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Serial sort time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    // Reshuffle the vector
    std::shuffle(v.begin(), v.end(), std::mt19937{std::random_device{}()});

    // Time the parallel sort
    start = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::par, v.begin(), v.end());
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Parallel sort time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    return 0;
}

Parallel Transform

#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <numeric>

int main() {
    std::vector<int> input(1000000);
    std::iota(input.begin(), input.end(), 1); // fill with 1, 2, ..., 1000000

    // long long: squares up to 10^12 overflow a 32-bit int
    std::vector<long long> output(input.size());

    // Parallel transform
    std::transform(std::execution::par,
                   input.begin(), input.end(),
                   output.begin(),
                   [](int x) { return static_cast<long long>(x) * x; });

    // Spot-check the result
    std::cout << "First 10 elements: " << std::endl;
    for (int i = 0; i < 10; ++i) {
        std::cout << input[i] << "^2 = " << output[i] << std::endl;
    }

    return 0;
}

Parallel Find

#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>

int main() {
    std::vector<int> v(10000000);
    for (size_t i = 0; i < v.size(); ++i) {
        v[i] = static_cast<int>(i);
    }

    // Value to search for
    int target = 9999999;

    // Parallel find
    auto it = std::find(std::execution::par, v.begin(), v.end(), target);

    if (it != v.end()) {
        std::cout << "Found " << target << " at position "
                  << std::distance(v.begin(), it) << std::endl;
    } else {
        std::cout << "Value not found" << std::endl;
    }

    return 0;
}

Performance Optimization

Performance Optimization Strategies for Concurrent Code

  1. Reduce lock contention

    • Use fine-grained locks
    • Keep critical sections small
    • Use lock-free data structures
  2. Improve memory locality

    • Use thread-local storage
    • Avoid false sharing
    • Align data appropriately
  3. Balance the load

    • Assign tasks dynamically
    • Use work stealing
    • Batch small tasks
  4. Tune the thread count

    • Size the thread count to the hardware concurrency
    • Avoid oversubscribing with too many threads

Avoiding False Sharing

False sharing is a notorious performance killer in concurrent code. It occurs when different threads access different variables that happen to reside on the same cache line:

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
#include <chrono>

// A layout prone to false sharing
struct SharedData {
    std::atomic<int> counter1{0}; // likely shares a cache line with counter2
    std::atomic<int> counter2{0};
};

// A layout that avoids false sharing
struct AlignedData {
    std::atomic<int> counter1{0};
    char padding[64 - sizeof(std::atomic<int>)]; // pad to 64 bytes (a typical cache-line size)
    std::atomic<int> counter2{0};
};

void increment_counter(std::atomic<int>& counter, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int iterations = 10000000;

    // Measure with false sharing
    SharedData shared_data;
    auto start = std::chrono::high_resolution_clock::now();

    std::thread t1(increment_counter, std::ref(shared_data.counter1), iterations);
    std::thread t2(increment_counter, std::ref(shared_data.counter2), iterations);

    t1.join();
    t2.join();

    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "With false sharing: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    // Measure without false sharing
    AlignedData aligned_data;
    start = std::chrono::high_resolution_clock::now();

    std::thread t3(increment_counter, std::ref(aligned_data.counter1), iterations);
    std::thread t4(increment_counter, std::ref(aligned_data.counter2), iterations);

    t3.join();
    t4.join();

    end = std::chrono::high_resolution_clock::now();
    std::cout << "Without false sharing: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    return 0;
}

Thread-Local Storage

Thread-local storage reduces contention between threads:

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>

thread_local int thread_local_counter = 0;
std::atomic<int> global_counter{0};

void increment_counters(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        thread_local_counter++; // no synchronization needed: each thread has its own copy
    }
    // Fold the per-thread total into the global counter once at the end
    global_counter.fetch_add(thread_local_counter, std::memory_order_relaxed);
}

int main() {
    const int thread_count = 4;
    const int iterations_per_thread = 10000000;

    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(increment_counters, iterations_per_thread);
    }

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Global counter: " << global_counter << std::endl;
    std::cout << "Expected: " << thread_count * iterations_per_thread << std::endl;

    return 0;
}

Best Practices

Concurrency Best Practices

  1. Prefer high-level primitives

    • Prefer std::async and std::future over raw threads
    • Use std::jthread (C++20) to manage thread lifetimes automatically
    • Consider parallel algorithms instead of manual thread management
  2. Use memory orders correctly

    • Default to std::memory_order_seq_cst for correctness
    • Use weaker memory orders only on performance-critical paths
    • Understand the semantics and consequences of each memory order
  3. Avoid deadlocks

    • Always acquire multiple locks in the same order
    • Use std::scoped_lock (C++17) to acquire several locks at once
    • Set lock timeouts where possible
  4. Manage synchronization primitives with RAII

    • Use std::lock_guard and std::unique_lock to release locks automatically
    • Avoid calling lock() and unlock() by hand
  5. Monitor and profile

    • Use profiling tools to identify bottlenecks
    • Track the cost of thread creation and destruction
    • Measure lock contention and wait times
  6. Design for scalability

    • Design concurrent algorithms that scale
    • Consider performance across different hardware configurations
    • Avoid serialization bottlenecks
  7. Handle errors

    • Handle exceptions thrown inside asynchronous tasks
    • Make sure an exception on one thread cannot crash the whole program
    • Use std::future::get() to retrieve and propagate exceptions

Asynchronous Programming Best Practices

  1. Choose the right launch policy for std::async

    • std::launch::async: start a new thread immediately
    • std::launch::deferred: defer execution until get() is called
    • Default policy: implementation-defined (either of the above)
  2. Avoid blocking in future.get()

    • Consider future.wait_for() or future.wait_until() for bounded waits
    • Check the result state with std::future_status
  3. Compose asynchronous tasks

    • when_all and when_any (proposed in the Concurrency TS; not yet in the standard) can combine multiple futures
    • Consider coroutines (C++20) for more flexible asynchronous code
  4. Manage resources

    • Make sure resources used inside asynchronous tasks are released properly
    • Avoid dangling references and pointers

A Practical Example: Parallel Image Processing

#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <execution>
#include <algorithm>
#include <chrono>
#include <cstdlib>

// A minimal image representation
struct Image {
    int width, height;
    std::vector<int> pixels;

    Image(int w, int h) : width(w), height(h), pixels(w * h, 0) {}

    int& at(int x, int y) {
        return pixels[y * width + x];
    }

    const int& at(int x, int y) const {
        return pixels[y * width + x];
    }
};

// Image filter: a simple 3x3 mean filter
void apply_filter(const Image& input, Image& output, int start_row, int end_row) {
    for (int y = start_row; y < end_row; ++y) {
        for (int x = 0; x < input.width; ++x) {
            int sum = 0;
            int count = 0;

            // Visit the 3x3 neighborhood
            for (int dy = -1; dy <= 1; ++dy) {
                for (int dx = -1; dx <= 1; ++dx) {
                    int nx = x + dx;
                    int ny = y + dy;

                    // Bounds check
                    if (nx >= 0 && nx < input.width && ny >= 0 && ny < input.height) {
                        sum += input.at(nx, ny);
                        count++;
                    }
                }
            }

            output.at(x, y) = sum / count;
        }
    }
}

// Parallel image processing
void process_image_parallel(const Image& input, Image& output) {
    // hardware_concurrency() may return 0; fall back to a single thread
    const int thread_count = std::max(1u, std::thread::hardware_concurrency());
    const int rows_per_thread = input.height / thread_count;

    std::vector<std::future<void>> futures;

    for (int i = 0; i < thread_count; ++i) {
        int start_row = i * rows_per_thread;
        int end_row = (i == thread_count - 1) ? input.height : (i + 1) * rows_per_thread;

        futures.push_back(std::async(std::launch::async,
                                     apply_filter,
                                     std::cref(input),
                                     std::ref(output),
                                     start_row,
                                     end_row));
    }

    // Wait for all tasks to finish
    for (auto& future : futures) {
        future.wait();
    }
}

// Serial image processing (for comparison)
void process_image_serial(const Image& input, Image& output) {
    apply_filter(input, output, 0, input.height);
}

int main() {
    // Create a test image
    const int width = 1000;
    const int height = 1000;
    Image input(width, height);

    // Fill with random data
    std::generate(input.pixels.begin(), input.pixels.end(), []() {
        return rand() % 256;
    });

    // Time the serial version
    Image output_serial(width, height);
    auto start = std::chrono::high_resolution_clock::now();
    process_image_serial(input, output_serial);
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Serial processing time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    // Time the parallel version
    Image output_parallel(width, height);
    start = std::chrono::high_resolution_clock::now();
    process_image_parallel(input, output_parallel);
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Parallel processing time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    // Verify the two results match
    bool results_match = true;
    for (int i = 0; i < width * height; ++i) {
        if (output_serial.pixels[i] != output_parallel.pixels[i]) {
            results_match = false;
            break;
        }
    }

    std::cout << "Results match: " << (results_match ? "Yes" : "No") << std::endl;

    return 0;
}

Summary

Concurrency and asynchronous programming are essential parts of modern C++ development: they exploit multi-core processors and improve an application's responsiveness and throughput. This chapter covered the core concepts and techniques, including:

  1. Concurrency fundamentals: the memory model, thread management, and synchronization primitives
  2. Advanced synchronization: lock-free programming, atomic operations, and memory orders
  3. Asynchronous programming: std::future, std::promise, and std::async
  4. Parallel algorithms: the C++17 parallel standard algorithms
  5. Performance optimization: avoiding false sharing, thread-local storage, and load balancing
  6. Best practices: error handling, resource management, and scalability

Mastering these techniques lets developers write efficient, reliable concurrent and asynchronous code that makes full use of modern hardware. The tooling also keeps improving as the standard evolves: C++20 added std::jthread, std::latch, std::barrier, and std::counting_semaphore, and later standards are expected to bring further concurrency features.

In practice, choose the concurrency and asynchrony techniques that fit the specific scenario, balance performance against complexity, and keep the code correct and maintainable.