# Chapter 30: Concurrency and Asynchronous Programming

## Concurrency Fundamentals

### The Challenges of Concurrent Programming

Concurrent programming is a core part of modern C++ development. It lets programs exploit multi-core processors, but it also introduces a distinct set of hazards:
- **Race conditions**: unpredictable behavior caused by multiple threads accessing shared state concurrently
- **Deadlocks**: permanent blocking when threads wait on each other to release resources
- **Livelocks**: threads keep changing state but make no forward progress
- **Starvation**: a thread is indefinitely denied the resources it needs
- **Memory-ordering problems**: visibility issues caused by compiler and processor reordering

### The C++ Memory Model

The C++ memory model defines the visibility and ordering of memory operations in multithreaded programs, and is the foundation for understanding concurrency:
- **Sequential consistency**: the strongest ordering; all operations appear to execute in a single total order consistent with program order, visible to every thread
- **Acquire-release**: everything written before a release store becomes visible to code after the matching acquire load
- **Relaxed**: the weakest ordering; guarantees atomicity only, with no ordering constraints
- **Consume**: ordering along data-dependency chains; a subset of acquire semantics (rarely used)

#### When to Use Each Memory Order

| Memory order | Typical use case | Performance | Safety |
| --- | --- | --- | --- |
| Sequential consistency | Simple code where performance is not critical | Low | High |
| Acquire-release | Most synchronization scenarios | Medium | High |
| Relaxed | Dependency-free counters and statistics | High | Medium |
| Consume | Dependency chains (rarely used) | High | Medium |
## Thread Management

### The std::thread Class

`std::thread`, introduced in C++11, creates and manages threads:
```cpp
#include <iostream>
#include <thread>
#include <vector>

void thread_function(int id) {
    std::cout << "Thread " << id << " executing" << std::endl;
}

int main() {
    // Launch a single thread and wait for it.
    std::thread t1(thread_function, 1);
    t1.join();

    // Launch a group of threads.
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(thread_function, i);
    }
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}
```
### The std::jthread Class (C++20)

`std::jthread`, introduced in C++20, extends `std::thread` with automatic joining on destruction and cooperative cancellation via stop tokens:
```cpp
#include <chrono>
#include <iostream>
#include <stop_token>
#include <syncstream>
#include <thread>

void long_running_task(std::stop_token st, int id) {
    int count = 0;
    while (!st.stop_requested() && count < 100) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        count++;
    }
    std::osyncstream(std::cout) << "Task " << id << " completed after "
                                << count << " iterations" << std::endl;
}

int main() {
    // jthread passes its stop_token as the first argument automatically.
    std::jthread t1(long_running_task, 1);
    std::jthread t2(long_running_task, 2);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Main thread exiting" << std::endl;
    // jthread destructors request stop and join automatically.
    return 0;
}
```
### Implementing a Thread Pool

A thread pool manages thread lifetimes centrally, amortizing the cost of creating and destroying threads:
```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class ThreadPool {
public:
    explicit ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    // std::result_of was removed in C++20; std::invoke_result is the
    // modern equivalent.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::invoke_result<F, Args...>::type> {
        using return_type = typename std::invoke_result<F, Args...>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(pool.enqueue([i] {
            std::cout << "Processing " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return i * i;
        }));
    }

    for (auto&& result : results) {
        std::cout << "Result: " << result.get() << std::endl;
    }
    return 0;
}
```
## Synchronization Primitives

### Mutexes

A mutex is the most basic synchronization primitive, used to protect shared resources:
```cpp
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
int shared_counter = 0;

void increment_counter(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        shared_counter++;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(increment_counter, i, 10000);
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Final counter value: " << shared_counter << std::endl;
    return 0;
}
```
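When code must hold two mutexes at once, locking them in an inconsistent order is a classic deadlock recipe. `std::scoped_lock` (C++17) acquires several mutexes with a built-in deadlock-avoidance algorithm. The `Account`/`transfer` names below are a hypothetical example, not from the original text:

```cpp
#include <mutex>

// Hypothetical two-account transfer used to illustrate std::scoped_lock.
struct Account {
    std::mutex m;
    int balance = 100;
};

void transfer(Account& from, Account& to, int amount) {
    // Locks both mutexes as if by std::lock: transfer(a, b, x) and a
    // concurrent transfer(b, a, y) cannot deadlock on lock ordering.
    std::scoped_lock lock(from.m, to.m);
    from.balance -= amount;
    to.balance += amount;
}
```

The equivalent pre-C++17 idiom is `std::lock(m1, m2)` followed by two `std::lock_guard`s with `std::adopt_lock`.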
### Read-Write Locks (std::shared_mutex)

A read-write lock allows many threads to read concurrently, but only one thread to write at a time:
```cpp
#include <chrono>
#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>

std::shared_mutex rw_mutex;
int shared_data = 0;

void reader(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::shared_lock<std::shared_mutex> lock(rw_mutex);  // shared access
        std::cout << "Reader " << id << " read: " << shared_data << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void writer(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::unique_lock<std::shared_mutex> lock(rw_mutex);  // exclusive access
        shared_data++;
        std::cout << "Writer " << id << " wrote: " << shared_data << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(reader, i, 10);
    }
    threads.emplace_back(writer, 0, 5);
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}
```
### Condition Variables

Condition variables signal events between threads:
```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool done = false;

void producer(int id, int items) {
    for (int i = 0; i < items; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(mtx);
            data_queue.push(i);
            std::cout << "Producer " << id << " produced: " << i << std::endl;
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        done = true;
    }
    cv.notify_all();
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        // The predicate guards against spurious wakeups.
        cv.wait(lock, [] { return !data_queue.empty() || done; });
        if (data_queue.empty() && done) {
            break;
        }
        int value = data_queue.front();
        data_queue.pop();
        std::cout << "Consumer " << id << " consumed: " << value << std::endl;
    }
    std::cout << "Consumer " << id << " exiting" << std::endl;
}

int main() {
    std::thread producer_thread(producer, 0, 10);
    std::thread consumer_thread1(consumer, 1);
    std::thread consumer_thread2(consumer, 2);

    producer_thread.join();
    consumer_thread1.join();
    consumer_thread2.join();
    return 0;
}
```
### Semaphores (C++20)

C++20 added standard semaphores:
```cpp
#include <chrono>
#include <iostream>
#include <semaphore>
#include <syncstream>
#include <thread>
#include <vector>

// At most 3 workers may hold the semaphore at once.
std::counting_semaphore<3> sem(3);

void worker(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        sem.acquire();
        std::osyncstream(std::cout) << "Worker " << id
                                    << " acquired semaphore, iteration " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        sem.release();
        std::osyncstream(std::cout) << "Worker " << id
                                    << " released semaphore, iteration " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

int main() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 5; ++i) {
        workers.emplace_back(worker, i, 3);
    }
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}
```
### Latches and Barriers (C++20)

C++20 also introduced the latch and barrier synchronization primitives:
```cpp
#include <barrier>
#include <chrono>
#include <iostream>
#include <latch>
#include <syncstream>
#include <thread>
#include <vector>

void prepare_work(std::latch& ready_latch, int id) {
    std::osyncstream(std::cout) << "Worker " << id << " preparing" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * id));
    std::osyncstream(std::cout) << "Worker " << id << " ready" << std::endl;
    ready_latch.count_down();
}

// Templated on the barrier type, since std::barrier's template argument is
// the completion-function type deduced at the construction site.
template <typename Barrier>
void process_work(Barrier& sync_barrier, int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::osyncstream(std::cout) << "Worker " << id
                                    << " processing iteration " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        sync_barrier.arrive_and_wait();
    }
}

int main() {
    std::cout << "=== Testing latch ===" << std::endl;
    std::latch ready_latch(3);
    std::vector<std::thread> prepare_threads;
    for (int i = 0; i < 3; ++i) {
        prepare_threads.emplace_back(prepare_work, std::ref(ready_latch), i);
    }
    std::cout << "Main thread waiting for all workers to be ready" << std::endl;
    ready_latch.wait();
    std::cout << "All workers are ready, main thread proceeding" << std::endl;
    for (auto& t : prepare_threads) {
        t.join();
    }

    std::cout << "\n=== Testing barrier ===" << std::endl;
    // The barrier's completion function must be nothrow invocable.
    auto completion = []() noexcept {
        static int phase = 0;
        std::cout << "Barrier phase " << phase++ << " completed" << std::endl;
    };
    std::barrier sync_barrier(3, completion);
    std::vector<std::thread> process_threads;
    for (int i = 0; i < 3; ++i) {
        process_threads.emplace_back(
            [&sync_barrier, i] { process_work(sync_barrier, i, 3); });
    }
    for (auto& t : process_threads) {
        t.join();
    }
    return 0;
}
```
## The Memory Model and Atomic Operations

### Atomic Types

C++11 introduced the `std::atomic` template for creating atomic types:
```cpp
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> counter(0);
std::atomic<bool> ready(false);

void increment_counter(int id, int iterations) {
    while (!ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }
    for (int i = 0; i < iterations; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::vector<std::thread> threads;
    const int thread_count = 4;
    const int iterations_per_thread = 10000;

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(increment_counter, i, iterations_per_thread);
    }

    ready.store(true, std::memory_order_release);

    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Expected counter value: " << thread_count * iterations_per_thread << std::endl;
    std::cout << "Actual counter value: " << counter.load() << std::endl;
    return 0;
}
```
### Advanced Atomic Operations

```cpp
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

class AtomicCounter {
private:
    std::atomic<int> value;

public:
    AtomicCounter() : value(0) {}

    int increment() {
        return value.fetch_add(1, std::memory_order_relaxed);
    }

    int decrement() {
        return value.fetch_sub(1, std::memory_order_relaxed);
    }

    bool compare_and_swap(int expected, int desired) {
        // compare_exchange_strong updates `expected` on failure, but since
        // `expected` is a by-value parameter here, that update is discarded.
        return value.compare_exchange_strong(expected, desired,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
    }

    int get() const {
        return value.load(std::memory_order_relaxed);
    }
};

AtomicCounter counter;

void worker(int id, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        int old_value = counter.increment();
        if (old_value % 10000 == 0) {
            std::cout << "Thread " << id << " incremented counter to "
                      << old_value + 1 << std::endl;
        }
    }
}

int main() {
    std::vector<std::thread> threads;
    const int thread_count = 4;
    const int iterations_per_thread = 100000;

    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(worker, i, iterations_per_thread);
    }
    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Final counter value: " << counter.get() << std::endl;
    std::cout << "Expected value: " << thread_count * iterations_per_thread << std::endl;
    return 0;
}
```
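Compare-and-swap is most often used inside a retry loop: read the current value, compute the desired value, and retry if another thread got there first. A minimal sketch is an atomic "fetch-max" (the name `atomic_fetch_max` is illustrative; the standard library itself only gains member `fetch_max` for atomic integers in C++26):

```cpp
#include <atomic>

// Atomically raise `target` to `value` if `value` is larger; returns the
// previously observed value.
int atomic_fetch_max(std::atomic<int>& target, int value) {
    int current = target.load(std::memory_order_relaxed);
    // On failure, compare_exchange_weak reloads `current` with the latest
    // value, so the loop re-examines fresh state. Weak (not strong) is
    // idiomatic in a loop because spurious failures are simply retried.
    while (current < value &&
           !target.compare_exchange_weak(current, value,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed)) {
    }
    return current;
}
```

The same read-modify-CAS-retry shape underlies the lock-free queue and stack in the next section.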
## Lock-Free Programming

### A Lock-Free Queue

Lock-free queues are an important data structure in concurrent programming:
```cpp
#include <atomic>
#include <memory>

// Simplified Michael-Scott-style queue for illustration. It does NOT solve
// safe memory reclamation: dequeue() deletes nodes that a concurrent
// dequeue() may still be reading (use-after-free / ABA). Production code
// needs hazard pointers, epoch reclamation, or reference counting.
template <typename T>
class LockFreeQueue {
private:
    struct Node {
        T data;
        std::atomic<Node*> next;
        Node(const T& value) : data(value), next(nullptr) {}
        Node() : next(nullptr) {}
    };

    std::atomic<Node*> head;  // points to a dummy node
    std::atomic<Node*> tail;

public:
    LockFreeQueue() : head(new Node()), tail(head.load()) {}

    ~LockFreeQueue() {
        while (Node* oldHead = head.load()) {
            head.store(oldHead->next.load());
            delete oldHead;
        }
    }

    void enqueue(const T& value) {
        Node* newNode = new Node(value);
        // Swing tail first, then link; a dequeuer may briefly observe an
        // unlinked node and report the queue empty.
        Node* oldTail = tail.exchange(newNode, std::memory_order_acq_rel);
        oldTail->next.store(newNode, std::memory_order_release);
    }

    bool dequeue(T& value) {
        Node* oldHead = head.load(std::memory_order_acquire);
        Node* newHead = oldHead->next.load(std::memory_order_acquire);
        if (!newHead) {
            return false;  // empty (or an enqueue is mid-flight)
        }
        if (!head.compare_exchange_strong(oldHead, newHead,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
            return false;  // lost the race; the caller may retry
        }
        value = newHead->data;
        delete oldHead;  // unsafe under contention; see the note above
        return true;
    }

    bool empty() const {
        return head.load(std::memory_order_acquire)
                   ->next.load(std::memory_order_acquire) == nullptr;
    }
};

#include <iostream>
#include <thread>
#include <vector>

LockFreeQueue<int> queue;
const int items_per_producer = 1000;
const int producer_count = 4;
const int consumer_count = 4;

void producer(int id) {
    for (int i = 0; i < items_per_producer; ++i) {
        queue.enqueue(id * items_per_producer + i);
    }
}

void consumer(int id) {
    int value;
    int count = 0;
    while (count < items_per_producer) {
        if (queue.dequeue(value)) {
            count++;
        }
    }
    std::cout << "Consumer " << id << " processed " << count << " items"
              << std::endl;
}

int main() {
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;
    for (int i = 0; i < producer_count; ++i) {
        producers.emplace_back(producer, i);
    }
    for (int i = 0; i < consumer_count; ++i) {
        consumers.emplace_back(consumer, i);
    }
    for (auto& p : producers) {
        p.join();
    }
    for (auto& c : consumers) {
        c.join();
    }
    std::cout << "Queue empty: " << queue.empty() << std::endl;
    return 0;
}
```
### A Lock-Free Stack

```cpp
#include <atomic>

// Treiber stack for illustration. Like the queue above, it sidesteps safe
// memory reclamation: pop() deletes nodes another thread may still be
// dereferencing, and the raw CAS loop is subject to the ABA problem.
template <typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        Node* next;
        Node(const T& value) : data(value), next(nullptr) {}
    };

    std::atomic<Node*> top;

public:
    LockFreeStack() : top(nullptr) {}

    ~LockFreeStack() {
        // Destruction is single-threaded, so plain traversal is enough.
        Node* node = top.load();
        while (node) {
            Node* next = node->next;
            delete node;
            node = next;
        }
    }

    void push(const T& value) {
        Node* newNode = new Node(value);
        newNode->next = top.load(std::memory_order_relaxed);
        // On failure, compare_exchange_weak refreshes newNode->next with
        // the current top, so the loop simply retries.
        while (!top.compare_exchange_weak(newNode->next, newNode,
                                          std::memory_order_release,
                                          std::memory_order_relaxed)) {
        }
    }

    bool pop(T& value) {
        Node* oldTop = top.load(std::memory_order_acquire);
        // On failure, oldTop is refreshed with the current top; stop when
        // the stack is empty or the CAS succeeds.
        while (oldTop &&
               !top.compare_exchange_weak(oldTop, oldTop->next,
                                          std::memory_order_acquire,
                                          std::memory_order_acquire)) {
        }
        if (!oldTop) {
            return false;
        }
        value = oldTop->data;
        delete oldTop;  // unsafe under contention; see the note above
        return true;
    }

    bool empty() const {
        return top.load(std::memory_order_acquire) == nullptr;
    }
};

#include <iostream>
#include <thread>
#include <vector>

LockFreeStack<int> stack;
const int items_per_thread = 1000;
const int thread_count = 4;

void worker(int id) {
    for (int i = 0; i < items_per_thread; ++i) {
        stack.push(id * items_per_thread + i);
    }
    int value;
    int count = 0;
    while (count < items_per_thread) {
        if (stack.pop(value)) {
            count++;
        }
    }
    std::cout << "Worker " << id << " processed " << count << " items"
              << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Stack empty: " << stack.empty() << std::endl;
    return 0;
}
```
## Asynchronous Programming

### std::future and std::promise

`std::future` and `std::promise` pass results between threads:
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

void compute_square(std::promise<int> promise, int value) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    promise.set_value(value * value);
}

int main() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    std::thread t(compute_square, std::move(promise), 42);

    std::cout << "Main thread waiting for result..." << std::endl;
    int result = future.get();
    std::cout << "Result: " << result << std::endl;

    t.join();
    return 0;
}
```
### std::async

`std::async` offers a more concise way to launch asynchronous tasks:
```cpp
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// long long: 15! = 1,307,674,368,000 would overflow a 32-bit int.
long long compute_factorial(int n) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    long long result = 1;
    for (int i = 2; i <= n; ++i) {
        result *= i;
    }
    return result;
}

int main() {
    auto future1 = std::async(std::launch::async, compute_factorial, 10);
    auto future2 = std::async(std::launch::async, compute_factorial, 15);

    std::cout << "Main thread doing work..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));

    std::cout << "Factorial of 10: " << future1.get() << std::endl;
    std::cout << "Factorial of 15: " << future2.get() << std::endl;
    return 0;
}
```
### Chaining Asynchronous Tasks

```cpp
#include <future>
#include <iostream>
#include <string>

int compute(int value) {
    return value * 2;
}

std::string format_result(int value) {
    return "Result: " + std::to_string(value);
}

void print_result(const std::string& message) {
    std::cout << message << std::endl;
}

int main() {
    std::future<int> future1 = std::async(std::launch::async, compute, 42);

    // Each stage blocks on the previous stage's future.
    std::future<std::string> future2 = std::async(std::launch::async, [&future1]() {
        return format_result(future1.get());
    });

    std::future<void> future3 = std::async(std::launch::async, [&future2]() {
        print_result(future2.get());
    });

    future3.wait();
    return 0;
}
```
## Parallel Algorithms

### C++17 Parallel Algorithms

C++17 added execution policies that let standard algorithms run across multiple threads:
```cpp
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <vector>

int main() {
    std::vector<int> v(10000000);
    for (size_t i = 0; i < v.size(); ++i) {
        v[i] = rand() % 10000;
    }

    auto start = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::seq, v.begin(), v.end());
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Serial sort time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    std::shuffle(v.begin(), v.end(), std::mt19937{std::random_device{}()});

    start = std::chrono::high_resolution_clock::now();
    std::sort(std::execution::par, v.begin(), v.end());
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Parallel sort time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;
    return 0;
}
```
### Parallel Transform

```cpp
#include <algorithm>
#include <execution>
#include <iostream>
#include <numeric>
#include <vector>

int main() {
    std::vector<int> input(1000000);
    std::iota(input.begin(), input.end(), 1);

    std::vector<int> output(input.size());
    std::transform(std::execution::par, input.begin(), input.end(),
                   output.begin(), [](int x) { return x * x; });

    std::cout << "First 10 elements: " << std::endl;
    for (int i = 0; i < 10; ++i) {
        std::cout << input[i] << "^2 = " << output[i] << std::endl;
    }
    return 0;
}
```
### Parallel Find

```cpp
#include <algorithm>
#include <execution>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> v(10000000);
    for (size_t i = 0; i < v.size(); ++i) {
        v[i] = static_cast<int>(i);
    }

    int target = 9999999;
    auto it = std::find(std::execution::par, v.begin(), v.end(), target);

    if (it != v.end()) {
        std::cout << "Found " << target << " at position "
                  << std::distance(v.begin(), it) << std::endl;
    } else {
        std::cout << "Value not found" << std::endl;
    }
    return 0;
}
```
## Performance Optimization

### Strategies for Optimizing Concurrent Code

- **Reduce lock contention**: shrink critical sections, shard data, or replace locks with atomics
- **Improve memory locality**: keep each thread's working set compact and cache-friendly
- **Balance the load**: distribute work evenly so no thread sits idle while others are saturated
- **Tune the thread count**: match the number of threads to the hardware and the workload
### Avoiding False Sharing

False sharing is a notorious performance killer in concurrent code: it occurs when different threads modify different variables that happen to sit on the same cache line:
```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Both counters share one cache line: updates from different threads
// bounce the line between cores.
struct SharedData {
    std::atomic<int> counter1{0};
    std::atomic<int> counter2{0};
};

// Padding pushes counter2 onto its own cache line (64 bytes assumed here;
// std::hardware_destructive_interference_size is the portable query).
struct AlignedData {
    std::atomic<int> counter1{0};
    char padding[64 - sizeof(std::atomic<int>)];
    std::atomic<int> counter2{0};
};

void increment_counter(std::atomic<int>& counter, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        counter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int iterations = 10000000;

    SharedData shared_data;
    auto start = std::chrono::high_resolution_clock::now();
    std::thread t1(increment_counter, std::ref(shared_data.counter1), iterations);
    std::thread t2(increment_counter, std::ref(shared_data.counter2), iterations);
    t1.join();
    t2.join();
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "With false sharing: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    AlignedData aligned_data;
    start = std::chrono::high_resolution_clock::now();
    std::thread t3(increment_counter, std::ref(aligned_data.counter1), iterations);
    std::thread t4(increment_counter, std::ref(aligned_data.counter2), iterations);
    t3.join();
    t4.join();
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Without false sharing: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;
    return 0;
}
```
### Thread-Local Storage

Thread-local storage reduces contention between threads:
```cpp
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

thread_local int thread_local_counter = 0;
std::atomic<int> global_counter{0};

void increment_counters(int iterations) {
    // Each thread increments its private copy with no synchronization,
    // then folds the result into the shared counter exactly once.
    for (int i = 0; i < iterations; ++i) {
        thread_local_counter++;
    }
    global_counter.fetch_add(thread_local_counter, std::memory_order_relaxed);
}

int main() {
    const int thread_count = 4;
    const int iterations_per_thread = 10000000;

    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back(increment_counters, iterations_per_thread);
    }
    for (auto& t : threads) {
        t.join();
    }

    std::cout << "Global counter: " << global_counter << std::endl;
    std::cout << "Expected: " << thread_count * iterations_per_thread << std::endl;
    return 0;
}
```
## Best Practices

### Concurrency Best Practices

**Prefer high-level facilities**

- Prefer std::async and std::future over raw threads
- Use std::jthread (C++20) for automatic thread lifetime management
- Consider parallel algorithms instead of hand-rolled thread management

**Use memory orderings correctly**

- Default to std::memory_order_seq_cst for correctness
- Relax the ordering only on performance-critical paths
- Understand the semantics and consequences of each ordering

**Avoid deadlocks**

- Always acquire multiple locks in the same order
- Use std::scoped_lock (C++17) to acquire several locks at once
- Use lock timeouts where possible

**Manage synchronization primitives with RAII**

- Let std::lock_guard and std::unique_lock release locks automatically
- Avoid calling lock() and unlock() by hand

**Monitor and profile**

- Use profiling tools to identify bottlenecks
- Watch the cost of thread creation and destruction
- Measure lock contention and wait times

**Design for scalability**

- Design concurrent algorithms that scale with core count
- Consider performance across different hardware configurations
- Avoid serialization bottlenecks

**Handle errors properly**

- Handle exceptions thrown inside asynchronous tasks
- Make sure a thread's exception cannot crash the whole program
- Use std::future::get() to retrieve and propagate exceptions

### Asynchronous Programming Best Practices

**Choose the right std::async launch policy**

- std::launch::async: start a new thread immediately
- std::launch::deferred: defer execution until get() is called
- Default policy: the implementation chooses

**Avoid blocking in future.get()**

- Use future.wait_for() and future.wait_until() for bounded waits
- Check readiness with std::future_status

**Compose asynchronous tasks**

- when_all and when_any exist in the Concurrency TS (std::experimental) but are not yet part of the standard; until then, compose futures manually
- Consider coroutines (C++20) for more flexible asynchronous code

**Manage resources**
- Ensure resources used by asynchronous tasks are released correctly
- Avoid dangling references and pointers

### Worked Example: Parallel Image Processing

```cpp
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

struct Image {
    int width, height;
    std::vector<int> pixels;

    Image(int w, int h) : width(w), height(h), pixels(w * h, 0) {}

    int& at(int x, int y) { return pixels[y * width + x]; }
    const int& at(int x, int y) const { return pixels[y * width + x]; }
};

// 3x3 box blur over rows [start_row, end_row).
void apply_filter(const Image& input, Image& output, int start_row, int end_row) {
    for (int y = start_row; y < end_row; ++y) {
        for (int x = 0; x < input.width; ++x) {
            int sum = 0;
            int count = 0;
            for (int dy = -1; dy <= 1; ++dy) {
                for (int dx = -1; dx <= 1; ++dx) {
                    int nx = x + dx;
                    int ny = y + dy;
                    if (nx >= 0 && nx < input.width && ny >= 0 && ny < input.height) {
                        sum += input.at(nx, ny);
                        count++;
                    }
                }
            }
            output.at(x, y) = sum / count;
        }
    }
}

void process_image_parallel(const Image& input, Image& output) {
    // hardware_concurrency() may return 0; guard against dividing by zero.
    const int thread_count = std::max(1u, std::thread::hardware_concurrency());
    const int rows_per_thread = input.height / thread_count;

    std::vector<std::future<void>> futures;
    for (int i = 0; i < thread_count; ++i) {
        int start_row = i * rows_per_thread;
        int end_row = (i == thread_count - 1) ? input.height
                                              : (i + 1) * rows_per_thread;
        futures.push_back(std::async(std::launch::async, apply_filter,
                                     std::cref(input), std::ref(output),
                                     start_row, end_row));
    }
    for (auto& future : futures) {
        future.wait();
    }
}

void process_image_serial(const Image& input, Image& output) {
    apply_filter(input, output, 0, input.height);
}

int main() {
    const int width = 1000;
    const int height = 1000;

    Image input(width, height);
    std::generate(input.pixels.begin(), input.pixels.end(),
                  []() { return rand() % 256; });

    Image output_serial(width, height);
    auto start = std::chrono::high_resolution_clock::now();
    process_image_serial(input, output_serial);
    auto end = std::chrono::high_resolution_clock::now();
    std::cout << "Serial processing time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    Image output_parallel(width, height);
    start = std::chrono::high_resolution_clock::now();
    process_image_parallel(input, output_parallel);
    end = std::chrono::high_resolution_clock::now();
    std::cout << "Parallel processing time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
              << " ms" << std::endl;

    bool results_match =
        std::equal(output_serial.pixels.begin(), output_serial.pixels.end(),
                   output_parallel.pixels.begin());
    std::cout << "Results match: " << (results_match ? "Yes" : "No") << std::endl;
    return 0;
}
```
## Summary

Concurrency and asynchronous programming are essential to modern C++: they exploit multi-core hardware to improve responsiveness and throughput. This chapter covered the core concepts and techniques:
- **Concurrency fundamentals**: the memory model, thread management, and synchronization primitives
- **Advanced synchronization**: lock-free programming, atomic operations, and memory orderings
- **Asynchronous programming**: std::future, std::promise, and std::async
- **Parallel algorithms**: the C++17 parallel standard algorithms
- **Performance optimization**: avoiding false sharing, thread-local storage, and load balancing
- **Best practices**: error handling, resource management, and scalability

Mastering these techniques lets developers write efficient, reliable concurrent and asynchronous code that makes full use of modern hardware. The toolbox keeps evolving with the language: C++20 added std::jthread, std::latch, std::barrier, and the semaphore types, and future standards are expected to bring further concurrency features.
In practice, choose the concurrency and asynchrony techniques that fit the scenario, balancing performance against complexity while keeping the code correct and maintainable.