第25章 并发与异步模式
并发编程概述
并发编程是指同时执行多个任务的编程方式,它可以提高程序的性能和响应速度。在现代计算机系统中,多核处理器已经非常普遍,并发编程可以充分利用多核资源,提高程序的执行效率。
并发的类型
- 多进程并发:使用多个进程来执行任务,进程之间通过进程间通信(IPC)机制进行通信
- 多线程并发:在同一个进程中使用多个线程来执行任务,线程之间共享进程的内存空间
- 异步编程:使用回调、Promise、Future等机制,在单线程中实现并发效果
并发的挑战
- 竞态条件:多个线程同时访问和修改共享数据,导致结果不确定(见下方示意代码)
- 死锁:多个线程互相等待对方释放资源,导致所有线程都无法继续执行
- 活锁:多个线程不断改变自己的状态以响应其他线程的变化,但没有任何线程能够继续执行
- 饥饿:一个线程一直无法获得所需的资源,导致无法继续执行
- 线程安全:确保多个线程访问共享数据时不会导致数据不一致或其他问题
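下面用一个最小的例子直观展示竞态条件(结果依赖于线程调度,每次运行都可能不同):两个线程在没有任何同步的情况下对同一个变量做自增,最终结果通常小于预期的200000。

```cpp
#include <iostream>
#include <thread>

int counter = 0;  // 共享数据,未加任何保护

void increment() {
    for (int i = 0; i < 100000; ++i) {
        ++counter;  // 读-改-写不是原子操作,两个线程的修改会互相覆盖
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    // 预期 200000,但由于竞态条件,实际结果往往更小
    std::cout << "Counter: " << counter << std::endl;
    return 0;
}
```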
多线程编程
线程的创建和管理
C++11引入了标准线程库std::thread,使得创建和管理线程变得更加简单和可移植。
创建线程
```cpp
#include <iostream>
#include <thread>

void function1() {
    std::cout << "Hello from thread!" << std::endl;
}

int main() {
    std::thread t1(function1);
    t1.join();

    std::thread t2([]() {
        std::cout << "Hello from lambda thread!" << std::endl;
    });
    t2.join();

    return 0;
}
```
线程的属性
```cpp
#include <iostream>
#include <thread>
#include <chrono>

void function(int id, int sleep_time) {
    std::cout << "Thread " << id << " started" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
    std::cout << "Thread " << id << " finished" << std::endl;
}

int main() {
    std::cout << "Hardware concurrency: " << std::thread::hardware_concurrency() << std::endl;

    std::thread t1(function, 1, 2);
    std::thread t2(function, 2, 1);

    std::cout << "Thread 1 ID: " << t1.get_id() << std::endl;
    std::cout << "Thread 2 ID: " << t2.get_id() << std::endl;
    std::cout << "Thread 1 joinable: " << t1.joinable() << std::endl;

    t1.join();
    t2.join();

    std::cout << "Thread 1 joinable after join: " << t1.joinable() << std::endl;

    return 0;
}
```
互斥量
互斥量(Mutex)是一种同步原语,用于保护共享数据,确保同一时间只有一个线程可以访问共享数据。
基本使用
```cpp
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int shared_data = 0;

void increment(int times) {
    for (int i = 0; i < times; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++shared_data;
    }
}

int main() {
    std::thread t1(increment, 100000);
    std::thread t2(increment, 100000);
    t1.join();
    t2.join();
    std::cout << "Shared data: " << shared_data << std::endl;
    return 0;
}
```
其他互斥量类型
- std::recursive_mutex:允许同一线程多次锁定
- std::timed_mutex:支持超时锁定(用法见下方示例)
- std::recursive_timed_mutex:结合了递归和超时特性
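下面是一个std::timed_mutex的简单示意(其中的等待和睡眠时长只是演示用的假设值):线程尝试在限定时间内获得锁,超时则放弃,避免无限期阻塞。

```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>

std::timed_mutex tmtx;

void worker(int id) {
    // 尝试在 100 毫秒内获得锁,拿不到就放弃
    if (tmtx.try_lock_for(std::chrono::milliseconds(100))) {
        std::cout << "Thread " << id << " got the lock" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        tmtx.unlock();
    } else {
        std::cout << "Thread " << id << " timed out" << std::endl;
    }
}

int main() {
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    t1.join();
    t2.join();
    return 0;
}
```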
条件变量
条件变量是一种同步原语,用于线程之间的通信,允许一个线程等待某个条件成立。
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return ready; });
    std::cout << "Worker thread started working" << std::endl;
}

int main() {
    std::thread t(worker);

    std::this_thread::sleep_for(std::chrono::seconds(2));

    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
        std::cout << "Main thread signals ready" << std::endl;
    }
    cv.notify_one();

    t.join();
    return 0;
}
```
原子操作
原子操作是不可分割的操作,不会被其他线程中断,用于无锁编程。
```cpp
#include <iostream>
#include <thread>
#include <atomic>

std::atomic<int> counter(0);

void increment(int times) {
    for (int i = 0; i < times; ++i) {
        ++counter;
    }
}

int main() {
    std::thread t1(increment, 100000);
    std::thread t2(increment, 100000);
    t1.join();
    t2.join();
    std::cout << "Counter: " << counter << std::endl;
    return 0;
}
```
线程局部存储
线程局部存储(Thread Local Storage,TLS)允许每个线程拥有自己的变量副本,避免线程之间的竞争。
```cpp
#include <iostream>
#include <thread>

thread_local int thread_id = 0;

void function(int id) {
    thread_id = id;
    std::cout << "Thread " << std::this_thread::get_id()
              << ": thread_id = " << thread_id << std::endl;
}

int main() {
    std::thread t1(function, 1);
    std::thread t2(function, 2);
    t1.join();
    t2.join();
    std::cout << "Main thread: thread_id = " << thread_id << std::endl;
    return 0;
}
```
异步编程
异步任务
C++11引入了std::async、std::future和std::promise等组件,用于异步编程。
std::async和std::future
```cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int calculate(int x, int y) {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return x + y;
}

int main() {
    std::future<int> fut = std::async(std::launch::async, calculate, 10, 20);
    std::cout << "Waiting for result..." << std::endl;
    int result = fut.get();
    std::cout << "Result: " << result << std::endl;
    return 0;
}
```
std::promise和std::future
```cpp
#include <iostream>
#include <future>
#include <thread>

void process_data(std::promise<int>&& prom, int data) {
    int result = data * 2;
    prom.set_value(result);
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();

    std::thread t(process_data, std::move(prom), 42);

    int result = fut.get();
    std::cout << "Result: " << result << std::endl;

    t.join();
    return 0;
}
```
任务并行
任务并行是指将一个问题分解为多个独立的任务,并行执行这些任务。
并行算法
C++17引入了并行算法库,提供了并行版本的标准算法。
```cpp
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <execution>
#include <cstdlib>

int main() {
    std::vector<int> v(1000000);
    std::generate(v.begin(), v.end(), []{ return rand() % 1000; });

    // 并行排序
    std::sort(std::execution::par, v.begin(), v.end());

    // 并行求和:不要在并行 for_each 中累加共享变量(会产生数据竞争),
    // 应改用 std::reduce 等归约算法
    long long sum = std::reduce(std::execution::par, v.begin(), v.end(), 0LL);

    std::cout << "Sum: " << sum << std::endl;
    return 0;
}
```
异步模式
回调模式
回调是一种常见的异步编程模式,通过传递函数指针或函数对象来处理异步操作的结果。
```cpp
#include <iostream>
#include <functional>
#include <thread>
#include <chrono>

void async_operation(int value, std::function<void(int)> callback) {
    std::thread t([value, callback]() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        int result = value * 2;
        callback(result);
    });
    t.detach();
}

int main() {
    async_operation(42, [](int result) {
        std::cout << "Callback received result: " << result << std::endl;
    });

    std::cout << "Main thread continues..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}
```
Future-Promise模式
Future-Promise模式是一种更现代的异步编程模式,通过Future对象来获取异步操作的结果。
```cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

std::future<int> async_operation(int value) {
    return std::async(std::launch::async, [value]() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        return value * 2;
    });
}

int main() {
    std::future<int> fut = async_operation(42);
    std::cout << "Main thread continues..." << std::endl;
    int result = fut.get();
    std::cout << "Received result: " << result << std::endl;
    return 0;
}
```
协程
C++20引入了协程(Coroutines),这是一种更高级的异步编程机制,允许函数在执行过程中暂停和恢复。
```cpp
#include <iostream>
#include <coroutine>
#include <exception>

template<typename T>
struct task {
    struct promise_type {
        T value;
        std::exception_ptr exception;

        task get_return_object() {
            return task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_never initial_suspend() { return {}; }
        // final_suspend 必须挂起,否则协程帧在结束时被销毁,
        // 之后再访问 promise 中的结果就是未定义行为
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_value(T v) { value = v; }
        void unhandled_exception() { exception = std::current_exception(); }
    };

    std::coroutine_handle<promise_type> handle;

    ~task() { if (handle) handle.destroy(); }

    T get() {
        if (handle.promise().exception) {
            std::rethrow_exception(handle.promise().exception);
        }
        return handle.promise().value;
    }
};

task<int> async_operation(int value) {
    co_await std::suspend_always{};  // 在此处挂起,等待外部恢复
    co_return value * 2;
}

int main() {
    auto t = async_operation(42);
    t.handle.resume();  // 恢复协程,执行到 co_return 并停在 final_suspend 处
    std::cout << "Result: " << t.get() << std::endl;
    return 0;
}
```
线程池
线程池是一种管理线程的机制,它预先创建一组线程,用于执行多个任务,避免频繁创建和销毁线程的开销。
简单的线程池实现
```cpp
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <chrono>
#include <stdexcept>
#include <type_traits>

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::invoke_result<F, Args...>::type> {
        using return_type = typename std::invoke_result<F, Args...>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::cout << "Task " << i << " started" << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "Task " << i << " finished" << std::endl;
                return i * i;
            })
        );
    }

    for (size_t i = 0; i < results.size(); ++i) {
        std::cout << "Result " << i << ": " << results[i].get() << std::endl;
    }

    return 0;
}
```
并发模式
生产者-消费者模式
生产者-消费者模式是一种常见的并发设计模式,用于解决多线程之间的协作问题。
```cpp
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <random>
#include <chrono>

class ProducerConsumer {
public:
    ProducerConsumer(size_t buffer_size) : buffer_size(buffer_size), stop(false) {}

    void produce(int value) {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return stop || buffer.size() < buffer_size; });
        if (stop) return;
        buffer.push(value);
        std::cout << "Produced: " << value << std::endl;
        lock.unlock();
        condition.notify_one();
    }

    int consume() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return stop || !buffer.empty(); });
        if (stop && buffer.empty()) return -1;
        int value = buffer.front();
        buffer.pop();
        std::cout << "Consumed: " << value << std::endl;
        lock.unlock();
        condition.notify_one();
        return value;
    }

    void stop_production() {
        std::unique_lock<std::mutex> lock(mutex);
        stop = true;
        lock.unlock();
        condition.notify_all();
    }

private:
    size_t buffer_size;
    std::queue<int> buffer;
    std::mutex mutex;
    std::condition_variable condition;
    bool stop;
};

int main() {
    ProducerConsumer pc(5);

    std::thread producer([&pc] {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(1, 100);
        for (int i = 0; i < 10; ++i) {
            int value = dis(gen);
            pc.produce(value);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        pc.stop_production();
    });

    std::thread consumer([&pc] {
        for (int i = 0; i < 10; ++i) {
            pc.consume();
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
```
读写锁模式
读写锁模式允许多个线程同时读取共享数据,但同一时间只能有一个线程写入共享数据。
```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <shared_mutex>
#include <chrono>

class ReadWriteLock {
public:
    void read() {
        std::shared_lock<std::shared_mutex> lock(mutex);
        std::cout << "Reading..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "Reading done" << std::endl;
    }

    void write() {
        std::unique_lock<std::shared_mutex> lock(mutex);
        std::cout << "Writing..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        std::cout << "Writing done" << std::endl;
    }

private:
    std::shared_mutex mutex;
};

int main() {
    ReadWriteLock rwlock;

    std::vector<std::thread> readers;
    for (int i = 0; i < 5; ++i) {
        readers.emplace_back([&rwlock, i] {
            std::cout << "Reader " << i << " started" << std::endl;
            rwlock.read();
        });
    }

    std::thread writer([&rwlock] {
        std::cout << "Writer started" << std::endl;
        rwlock.write();
    });

    for (auto& reader : readers) {
        reader.join();
    }
    writer.join();
    return 0;
}
```
屏障模式
屏障(Barrier)是一种同步原语,用于等待一组线程都到达某个点,然后继续执行。
```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <barrier>
#include <chrono>

int main() {
    // 屏障的完成函数要求不抛出异常,这里显式标记为 noexcept
    std::barrier sync_point(4, []() noexcept {
        std::cout << "All threads have reached the barrier" << std::endl;
    });

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back([&sync_point, i] {
            std::cout << "Thread " << i << " working" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "Thread " << i << " reached barrier" << std::endl;
            sync_point.arrive_and_wait();
            std::cout << "Thread " << i << " continuing" << std::endl;
        });
    }

    std::cout << "Main thread working" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main thread reached barrier" << std::endl;
    sync_point.arrive_and_wait();
    std::cout << "Main thread continuing" << std::endl;

    for (auto& thread : threads) {
        thread.join();
    }
    return 0;
}
```
并发工具库
C++11/14/17/20中的并发特性
- C++11:std::thread、std::mutex、std::condition_variable、std::future、std::promise、std::async、std::atomic
- C++14:std::shared_timed_mutex、std::shared_lock
- C++17:std::shared_mutex、并行算法、std::scoped_lock
- C++20:std::jthread、std::barrier、std::latch、std::counting_semaphore、协程(其中std::latch的用法见下方示例)
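作为补充,下面给出一个std::latch的最小示例(需要C++20,线程数等参数为演示用的假设值):主线程等待一组工作线程全部完成初始化后再继续执行。

```cpp
#include <iostream>
#include <thread>
#include <vector>
#include <latch>

int main() {
    const int num_workers = 3;
    std::latch done(num_workers);  // 计数器初始为工作线程数

    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; ++i) {
        workers.emplace_back([&done, i] {
            std::cout << "Worker " << i << " finished init" << std::endl;
            done.count_down();  // 每个线程完成后把计数减一
        });
    }

    done.wait();  // 计数归零前阻塞主线程
    std::cout << "All workers initialized" << std::endl;

    for (auto& w : workers) w.join();
    return 0;
}
```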
第三方并发库
- Intel TBB(Threading Building Blocks):提供高级并行算法和数据结构
- Boost.Thread:提供跨平台的线程库
- libuv:提供跨平台的异步I/O库
- folly:Facebook开源的C++库,包含大量并发工具
性能优化
并发性能优化
- 减少锁竞争:使用细粒度锁、无锁数据结构、原子操作等
- 正确处理虚假唤醒:使用条件变量时,总是在循环中检查条件,或使用带谓词的wait重载
- 合理设置线程数量:CPU密集型任务的线程数通常以std::thread::hardware_concurrency()为参考,过多的线程只会增加上下文切换开销
- 避免线程创建和销毁:使用线程池
- 减少线程间通信:最小化共享数据,使用消息传递等方式(见下方示例)
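下面用一个求和的小例子示意"最小化共享数据"的思路(仅为演示用的草图,数据规模和线程数均为假设值):每个线程先在自己的局部变量中累加,只在最后合并结果时才短暂加锁,从而显著减少锁竞争。

```cpp
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <numeric>

int main() {
    std::vector<int> data(1'000'000, 1);
    const int num_threads = 4;
    long long total = 0;
    std::mutex total_mutex;

    std::vector<std::thread> threads;
    size_t chunk = data.size() / num_threads;
    for (int t = 0; t < num_threads; ++t) {
        threads.emplace_back([&, t] {
            auto begin = data.begin() + t * chunk;
            auto end = (t == num_threads - 1) ? data.end() : begin + chunk;
            // 先在线程私有的局部变量中累加,不触碰共享状态
            long long local_sum = std::accumulate(begin, end, 0LL);
            // 只在合并结果时短暂持有锁
            std::lock_guard<std::mutex> lock(total_mutex);
            total += local_sum;
        });
    }
    for (auto& th : threads) th.join();

    std::cout << "Total: " << total << std::endl;  // 预期输出 1000000
    return 0;
}
```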
异步性能优化
- 使用非阻塞I/O:避免线程阻塞在I/O操作上
- 合理使用异步模式:根据具体场景选择合适的异步模式
- 避免过度使用异步:对于简单的操作,同步执行可能更高效
- 使用事件循环:在单线程中处理多个异步操作(下方给出一个极简的事件循环草图)
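下面是一个极简的单线程事件循环草图(仅为示意,未涉及真正的非阻塞I/O或多路复用):任务被投递到队列中,由同一个线程依次取出执行,回调之间无需加锁。

```cpp
#include <iostream>
#include <queue>
#include <functional>

// 一个最小化的单线程事件循环:只演示"投递任务 + 依次处理"的结构
class EventLoop {
public:
    void post(std::function<void()> task) {
        tasks.push(std::move(task));
    }

    void run() {
        while (!tasks.empty()) {
            auto task = std::move(tasks.front());
            tasks.pop();
            task();  // 所有回调都在同一个线程中执行
        }
    }

private:
    std::queue<std::function<void()>> tasks;
};

int main() {
    EventLoop loop;
    loop.post([] { std::cout << "Task 1" << std::endl; });
    loop.post([&loop] {
        std::cout << "Task 2, posting task 3" << std::endl;
        loop.post([] { std::cout << "Task 3" << std::endl; });
    });
    loop.run();
    return 0;
}
```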
安全性
并发安全
- 避免共享状态:尽量减少线程间的共享数据
- 使用同步原语:正确使用互斥量、条件变量等同步原语
- 注意容器的并发访问:标准库容器本身并不是线程安全的,多个线程同时读写同一个容器时需要自行加锁保护,或使用第三方库(如Intel TBB)提供的并发容器
- 避免死锁:按固定顺序获取锁、使用std::scoped_lock一次性锁定多个互斥量(见下方示例)、避免嵌套锁等
- 使用RAII:使用RAII模式管理锁和其他资源
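下面的草图演示用std::scoped_lock(C++17)同时锁定两个互斥量来避免死锁(转账场景是为说明而假设的):两个线程以相反的方向操作两个账户,如果手工按不同顺序加锁就可能互相等待,而std::scoped_lock内部采用了死锁避免算法。

```cpp
#include <iostream>
#include <thread>
#include <mutex>

struct Account {
    std::mutex m;
    int balance = 100;
};

void transfer(Account& from, Account& to, int amount) {
    // 一次性锁定两个互斥量;无论调用方以什么顺序传入账户都不会死锁
    std::scoped_lock lock(from.m, to.m);
    from.balance -= amount;
    to.balance += amount;
}

int main() {
    Account a, b;

    std::thread t1([&] { for (int i = 0; i < 1000; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < 1000; ++i) transfer(b, a, 1); });

    t1.join();
    t2.join();

    std::cout << "a = " << a.balance << ", b = " << b.balance << std::endl;  // 100 100
    return 0;
}
```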
异步安全
- 正确处理异常:确保异步操作中的异常能够被捕获和处理
- 避免悬空引用:确保异步操作中使用的变量在操作完成前仍然有效(见下方示例)
- 正确管理资源:确保异步操作中使用的资源能够被正确释放
- 避免回调地狱:使用Future-Promise模式或协程,避免深度嵌套的回调
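下面用一个小例子说明悬空引用问题和一种常见的处理方式(示例本身是为演示而构造的):如果异步任务按引用捕获局部变量,而任务在变量销毁后才执行,就会访问已失效的内存;改为按值捕获或用std::shared_ptr共享所有权即可避免。

```cpp
#include <iostream>
#include <memory>
#include <future>
#include <chrono>
#include <thread>

std::future<void> start_work() {
    // 错误示范(勿用):按引用捕获局部变量 data,
    // start_work 返回后 data 已销毁,异步任务再访问它就是悬空引用
    // int data = 42;
    // return std::async(std::launch::async, [&data] { std::cout << data; });

    // 正确做法:用 shared_ptr 延长生存期(或直接按值捕获)
    auto data = std::make_shared<int>(42);
    return std::async(std::launch::async, [data] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "Async task sees data = " << *data << std::endl;
    });
}

int main() {
    auto fut = start_work();
    fut.get();
    return 0;
}
```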
示例:并发服务器
简单的并发回声服务器
下面的示例基于独立版Asio网络库实现一个并发回声服务器:每个客户端连接由一个Session对象处理,io_context由线程池中的多个线程同时驱动,使不同连接的异步回调可以并发执行。

```cpp
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <memory>
#include <cstdlib>
#include <asio.hpp>

using asio::ip::tcp;

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty())
                            return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    template<class F>
    void enqueue(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            tasks.emplace(std::forward<F>(f));
        }
        condition.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

class Session : public std::enable_shared_from_this<Session> {
public:
    Session(tcp::socket socket) : socket(std::move(socket)) {}

    void start() { do_read(); }

private:
    void do_read() {
        auto self(shared_from_this());
        socket.async_read_some(asio::buffer(data, max_length),
            [this, self](std::error_code ec, std::size_t length) {
                if (!ec) {
                    do_write(length);
                }
            });
    }

    void do_write(std::size_t length) {
        auto self(shared_from_this());
        asio::async_write(socket, asio::buffer(data, length),
            [this, self](std::error_code ec, std::size_t /*length*/) {
                if (!ec) {
                    do_read();
                }
            });
    }

    tcp::socket socket;
    enum { max_length = 1024 };
    char data[max_length];
};

class Server {
public:
    Server(asio::io_context& io_context, short port)
        : acceptor(io_context, tcp::endpoint(tcp::v4(), port)) {
        do_accept();
    }

private:
    void do_accept() {
        acceptor.async_accept(
            [this](std::error_code ec, tcp::socket socket) {
                if (!ec) {
                    // 每个连接交给一个 Session 处理,只需启动一次
                    std::make_shared<Session>(std::move(socket))->start();
                }
                do_accept();
            });
    }

    tcp::acceptor acceptor;
};

int main(int argc, char* argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: server <port>" << std::endl;
            return 1;
        }

        asio::io_context io_context;
        Server server(io_context, std::atoi(argv[1]));

        // 让线程池中的多个线程同时运行 io_context,
        // 各连接的异步回调即可在不同线程上并发执行(生产代码通常还会配合 strand 使用)
        ThreadPool pool(3);
        for (int i = 0; i < 3; ++i) {
            pool.enqueue([&io_context] { io_context.run(); });
        }
        io_context.run();  // 主线程也参与事件循环
    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
    return 0;
}
```
总结
并发与异步编程是现代C++中的重要主题,它们可以提高程序的性能和响应速度。C++11及后续标准引入了大量的并发和异步编程工具,使得并发编程变得更加简单和安全。
在实际开发中,应该根据具体的应用场景选择合适的并发或异步模式:
- 对于CPU密集型任务:使用多线程并发执行
- 对于I/O密集型任务:使用异步I/O或事件循环
- 对于需要等待的任务:使用std::async和std::future
- 对于复杂的异步流程:使用协程
同时,应该注意并发编程中的常见问题,如竞态条件、死锁等,采用合适的同步原语和设计模式来避免这些问题。
通过合理使用并发和异步编程,可以充分利用现代计算机系统的资源,开发出高性能、响应迅速的应用程序。