Chapter 16: Concurrent Programming

Learning Objectives

After completing this chapter, readers should be able to:

- Theory: understand the essential difference between concurrency and parallelism, master the implementation of Python's GIL and its impact on multithreaded performance, and understand OS-level thread and process scheduling.
- Technique: apply the four concurrency models — threading, multiprocessing, concurrent.futures, and asyncio — and choose the optimal strategy based on task characteristics.
- Engineering: design thread-safe data structures and concurrency-control mechanisms, implement high-performance asynchronous I/O systems, and build scalable concurrent application architectures.
- Frontier: follow emerging developments such as structured concurrency (Structured Concurrency), the Python 3.11+ TaskGroup, and subinterpreters.

16.1 Foundations of Concurrent Programming

16.1.1 Concurrency vs. Parallelism

Concurrency and parallelism are two concepts that are often confused but fundamentally different:
| Dimension | Concurrency | Parallelism |
|---|---|---|
| Definition | Multiple tasks make progress within overlapping time periods | Multiple tasks execute at the same instant |
| Core idea | Interleaved execution (time slicing) | Truly simultaneous execution |
| Hardware | A single core suffices | Requires multiple cores/processors |
| Typical workload | I/O-bound tasks | CPU-bound tasks |
| Analogy | One person alternating between several jobs | Several people each handling one job |
The benchmark below contrasts sequential, threaded, and process-based execution for both task types:

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound_task(duration):
    time.sleep(duration)
    return duration

def benchmark_sequential(tasks, task_func):
    start = time.perf_counter()
    results = [task_func(t) for t in tasks]
    return time.perf_counter() - start

def benchmark_threaded(tasks, task_func):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

def benchmark_process(tasks, task_func):
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

if __name__ == "__main__":
    cpu_tasks = [10_000_000] * 4
    io_tasks = [1.0] * 4

    print("=== CPU-bound tasks ===")
    print(f"Sequential: {benchmark_sequential(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"Threads:    {benchmark_threaded(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"Processes:  {benchmark_process(cpu_tasks, cpu_bound_task):.2f}s")

    print("\n=== I/O-bound tasks ===")
    print(f"Sequential: {benchmark_sequential(io_tasks, io_bound_task):.2f}s")
    print(f"Threads:    {benchmark_threaded(io_tasks, io_bound_task):.2f}s")
    print(f"Processes:  {benchmark_process(io_tasks, io_bound_task):.2f}s")
```
Strategy decision tree:

```
Task type?
├── I/O-bound (network requests, file I/O, database queries)
│   ├── Must interoperate with synchronous code → threading + ThreadPoolExecutor
│   └── High concurrency, low resource usage   → asyncio
├── CPU-bound (numeric computation, image processing, cryptography)
│   └── Multiple processes → multiprocessing + ProcessPoolExecutor
└── Mixed
    ├── Mostly I/O + some computation → asyncio + run_in_executor
    └── Mostly computation + some I/O → processes + thread pool
```
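For the mixed case, `loop.run_in_executor` lets a coroutine offload a blocking call to a pool without stalling the event loop. A minimal sketch, assuming an illustrative `heavy_compute` function (pass a `ProcessPoolExecutor` instead of `None` for truly CPU-bound work):

```python
import asyncio

def heavy_compute(n):
    # blocking/CPU-heavy work; with None below it runs in the default thread pool
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    # offload the blocking call and await its result like any coroutine
    result = await loop.run_in_executor(None, heavy_compute, 100_000)
    print(f"result = {result}")
    return result

asyncio.run(main())
```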
16.1.2 A Deep Dive into the Python GIL

The Global Interpreter Lock (GIL) is one of the most controversial designs in CPython. Understanding the GIL is essential for choosing the right concurrency model.
What the GIL is:

The GIL is a mutex inside the CPython interpreter that ensures only one thread executes Python bytecode at any given moment. It exists for three reasons:

- Memory-management safety: CPython manages memory with reference counting; without the GIL, threads updating refcounts concurrently would cause memory leaks or premature frees.
- C-extension compatibility: a large body of C extensions assumes the GIL exists; removing it would break the ecosystem.
- Single-thread performance: the GIL simplifies the interpreter implementation, making single-threaded Python code run faster.

How the GIL is scheduled:
```python
import sys

print(f"Switch interval: {sys.getswitchinterval()}s")
print("Default: 5 ms = 0.005 s")

sys.setswitchinterval(0.005)
```
The GIL's switching rules:

- Python 3.2+: a fixed time-slice mechanism forces the GIL to be released every 5 ms by default.
- The GIL is released voluntarily during I/O operations (e.g. time.sleep(), file I/O, network requests).
- C extensions may release the GIL manually while executing compute-intensive sections.

The GIL's impact on performance:
```python
import threading
import time

def pure_python_count(n):
    start = time.perf_counter()
    total = 0
    for i in range(n):
        total += i
    return time.perf_counter() - start

def threaded_python_count(n, num_threads):
    def worker(count):
        total = 0
        for i in range(count):
            total += i

    start = time.perf_counter()
    threads = []
    chunk = n // num_threads
    for i in range(num_threads):
        count = chunk if i < num_threads - 1 else n - chunk * (num_threads - 1)
        t = threading.Thread(target=worker, args=(count,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    N = 50_000_000
    single = pure_python_count(N)
    multi = threaded_python_count(N, 4)
    print(f"Single thread: {single:.2f}s")
    print(f"4 threads:     {multi:.2f}s")
    print(f"Speedup: {single/multi:.2f}x")
    print("Note: the threaded version may be slower due to GIL switching overhead")
```
Strategies for working around the GIL:

```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def numpy_computation(size):
    # NumPy releases the GIL inside its C-level loops
    arr = np.random.rand(size)
    return np.sum(arr ** 2)

def parallel_numpy(size, workers=4):
    chunk = size // workers
    # the mapped function must be defined at module level so it can be
    # pickled and sent to the worker processes
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(numpy_computation, [chunk] * workers))
    return sum(results)
```
Cutting edge: Python 3.13 introduces an experimental free-threaded mode (PEP 703); a GIL-free interpreter can be built with the --disable-gil compile option. This marks a decisive step by the Python community toward removing the GIL.
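Whether the running interpreter was built free-threaded can be probed at runtime. A small sketch; note that `sys._is_gil_enabled()` only exists on 3.13+, so the call is guarded:

```python
import sys
import sysconfig

# Py_GIL_DISABLED is set in the build config of free-threaded interpreters
free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
print(f"Free-threaded build: {free_threaded_build}")

# sys._is_gil_enabled() was added in 3.13; guard for older interpreters
if hasattr(sys, "_is_gil_enabled"):
    print(f"GIL currently enabled: {sys._is_gil_enabled()}")
else:
    print("GIL status API not available (Python < 3.13)")
```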
16.2 Multithreading

16.2.1 Thread Basics and Lifecycle

A Python thread wraps a native operating-system thread; each Python thread corresponds to one OS thread.

```python
import threading
import time

def worker(name, duration):
    print(f"[{threading.current_thread().name}] {name} started")
    time.sleep(duration)
    print(f"[{threading.current_thread().name}] {name} finished")
    return f"result of {name}"

t = threading.Thread(
    target=worker,
    args=("Task-A", 2),
    name="WorkerThread-1",
    daemon=False
)

print(f"Thread name: {t.name}")
print(f"Daemon: {t.daemon}")
print(f"Ident: {t.ident}")
print(f"Alive: {t.is_alive()}")

t.start()
t.join(timeout=5)

if t.is_alive():
    print("Thread did not finish within the timeout")
else:
    print(f"Thread finished, ident: {t.ident}")
```
Thread lifecycle state diagram:

```
New → start() → Runnable → acquires GIL → Running
                   ↑                         ↓
                   ←──── releases GIL ←── blocking operation
                                             ↓
                                    Blocked (I/O / Lock)
                                             ↓
                                 wait completes → Runnable

Running → function returns / raises → Terminated
```
Daemon threads:

```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

def background_monitor():
    while True:
        logging.info("monitor heartbeat...")
        time.sleep(1)

def main_task():
    logging.info("main task started")
    time.sleep(3)
    logging.info("main task finished")

monitor = threading.Thread(target=background_monitor, name="Monitor", daemon=True)
monitor.start()

main_task()
print("Main thread exiting; the daemon thread terminates automatically")
```
16.2.2 Thread Synchronization Primitives

Threads share their process's memory, so synchronization primitives are required to prevent race conditions.

A race condition in action:

```python
import threading

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        # read-modify-write is not atomic: a thread switch between these
        # steps loses updates
        temp = self.value
        temp += 1
        self.value = temp

counter = UnsafeCounter()
threads = []
for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()
for t in threads:
    t.join()

print(f"Expected: 1000000, actual: {counter.value}")
```
Lock (mutex):

```python
import threading

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

counter = SafeCounter()
threads = []
for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()
for t in threads:
    t.join()

print(f"Expected: 1000000, actual: {counter.value}")
```
The difference between Lock and RLock:

```python
import threading

lock = threading.Lock()
lock.acquire()
# a second blocking acquire on a plain Lock deadlocks the thread rather
# than raising; a non-blocking attempt lets us observe the behavior safely
acquired_again = lock.acquire(blocking=False)
print(f"Re-acquiring a non-reentrant lock in the same thread: {acquired_again}")  # False
lock.release()

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
print("Nested acquisition of an RLock succeeded; recursion count: 2")
rlock.release()
rlock.release()
print("RLock fully released")
```
A use case for RLock — recursive calls:

```python
import threading

class TreeWalker:
    def __init__(self):
        self._lock = threading.RLock()
        self.visited = []

    def walk(self, node):
        # walk() re-acquires the lock recursively; a plain Lock would deadlock
        with self._lock:
            self.visited.append(node.value)
            for child in node.children:
                self.walk(child)

class TreeNode:
    def __init__(self, value):
        self.value = value
        self.children = []

root = TreeNode("root")
root.children = [TreeNode("left"), TreeNode("right")]
root.children[0].children = [TreeNode("left-left")]

walker = TreeWalker()
walker.walk(root)
print(f"Traversal order: {walker.visited}")
```
Semaphore:

```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

class ConnectionPool:
    def __init__(self, max_connections=3):
        self._semaphore = threading.Semaphore(max_connections)
        self._connections = []
        self._lock = threading.Lock()

    def acquire(self):
        self._semaphore.acquire()   # blocks while the pool is exhausted
        with self._lock:
            conn = f"Connection-{len(self._connections)}"
            self._connections.append(conn)
            logging.info(f"acquired {conn}")
            return conn

    def release(self, conn):
        with self._lock:
            self._connections.remove(conn)
            logging.info(f"released {conn}")
        self._semaphore.release()

    def __enter__(self):
        self._conn = self.acquire()
        return self._conn

    def __exit__(self, *args):
        self.release(self._conn)

pool = ConnectionPool(max_connections=2)

def use_connection(name, duration):
    with pool as conn:
        logging.info(f"{name} using {conn}")
        time.sleep(duration)

threads = [
    threading.Thread(target=use_connection,
                     args=(f"Client-{i}", i * 0.5 + 0.5),
                     name=f"Client-{i}")
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Event:

```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

class ServiceManager:
    def __init__(self):
        self._ready = threading.Event()
        self._shutdown = threading.Event()

    def wait_ready(self, timeout=None):
        return self._ready.wait(timeout)

    def mark_ready(self):
        self._ready.set()
        logging.info("service ready")

    def request_shutdown(self):
        self._shutdown.set()
        logging.info("shutdown requested")

    def is_shutdown_requested(self):
        return self._shutdown.is_set()

manager = ServiceManager()

def service_worker():
    logging.info("waiting for the service to become ready...")
    manager.wait_ready()
    logging.info("service ready, starting work")
    while not manager.is_shutdown_requested():
        time.sleep(0.5)
        logging.info("working...")
    logging.info("shutdown signal received, exiting")

def service_initializer():
    logging.info("initializing service...")
    time.sleep(2)
    manager.mark_ready()
    time.sleep(3)
    manager.request_shutdown()

threads = [
    threading.Thread(target=service_worker, name="Worker"),
    threading.Thread(target=service_initializer, name="Initializer"),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Condition (condition variable):

```python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

class BoundedBuffer:
    def __init__(self, capacity):
        self._buffer = []
        self._capacity = capacity
        self._condition = threading.Condition()

    def put(self, item):
        with self._condition:
            # always re-check the predicate in a loop: wait() can return early
            while len(self._buffer) >= self._capacity:
                logging.info(f"buffer full, waiting for a consumer... (size={len(self._buffer)})")
                self._condition.wait()
            self._buffer.append(item)
            logging.info(f"produced {item} (size={len(self._buffer)})")
            self._condition.notify_all()

    def get(self):
        with self._condition:
            while not self._buffer:
                logging.info("buffer empty, waiting for a producer...")
                self._condition.wait()
            item = self._buffer.pop(0)
            logging.info(f"consumed {item} (size={len(self._buffer)})")
            self._condition.notify_all()
            return item

buffer = BoundedBuffer(capacity=3)

def producer(pid):
    for i in range(5):
        item = f"P{pid}-{i}"
        buffer.put(item)
        time.sleep(random.uniform(0.1, 0.3))

def consumer(cid):
    for _ in range(5):
        item = buffer.get()
        time.sleep(random.uniform(0.2, 0.5))

threads = []
for i in range(2):
    threads.append(threading.Thread(target=producer, args=(i,), name=f"Producer-{i}"))
for i in range(2):
    threads.append(threading.Thread(target=consumer, args=(i,), name=f"Consumer-{i}"))
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Barrier:

```python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

def parallel_computation(worker_id, barrier):
    phase1_time = random.uniform(0.5, 1.5)
    logging.info(f"Worker-{worker_id} phase 1 ({phase1_time:.1f}s)")
    time.sleep(phase1_time)

    logging.info(f"Worker-{worker_id} waiting for the others to finish phase 1")
    barrier.wait()   # no thread proceeds until all have arrived

    phase2_time = random.uniform(0.5, 1.0)
    logging.info(f"Worker-{worker_id} phase 2 ({phase2_time:.1f}s)")
    time.sleep(phase2_time)
    barrier.wait()
    logging.info(f"Worker-{worker_id} all done")

num_workers = 4
barrier = threading.Barrier(num_workers)
threads = [
    threading.Thread(target=parallel_computation, args=(i, barrier), name=f"Worker-{i}")
    for i in range(num_workers)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
16.2.3 Thread-Safe Data Structures

A producer-consumer pattern built on Queue:

```python
import threading
import queue
import time
import random
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

class TaskQueue:
    def __init__(self, maxsize=0):
        self._queue = queue.Queue(maxsize=maxsize)
        self._shutdown = threading.Event()

    def submit(self, task):
        if not self._shutdown.is_set():
            self._queue.put(task)
            return True
        return False

    def get(self, timeout=1.0):
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def task_done(self):
        self._queue.task_done()

    def put_sentinel(self):
        # bypasses submit() so sentinels can still be enqueued after shutdown
        self._queue.put(None)

    def shutdown(self):
        self._shutdown.set()

    def is_shutdown(self):
        return self._shutdown.is_set()

    def join(self):
        self._queue.join()

    @property
    def size(self):
        return self._queue.qsize()

class WorkerPool:
    def __init__(self, num_workers, task_handler):
        self._task_queue = TaskQueue(maxsize=100)
        self._workers = []
        self._handler = task_handler
        for i in range(num_workers):
            t = threading.Thread(target=self._worker_loop,
                                 name=f"Worker-{i}", daemon=True)
            t.start()
            self._workers.append(t)

    def _worker_loop(self):
        while True:
            task = self._task_queue.get()
            if task is None:
                # None means either a get() timeout or a shutdown sentinel;
                # only exit once shutdown has actually been requested
                if self._task_queue.is_shutdown():
                    break
                continue
            try:
                self._handler(task)
            except Exception as e:
                logging.error(f"Task failed: {e}")
            finally:
                self._task_queue.task_done()

    def submit(self, task):
        self._task_queue.submit(task)

    def wait_completion(self):
        self._task_queue.join()

    def shutdown(self):
        self._task_queue.shutdown()
        for _ in self._workers:
            self._task_queue.put_sentinel()
        for t in self._workers:
            t.join()

def process_task(task):
    task_id, data = task
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)
    result = data * 2
    logging.info(f"Task {task_id}: {data} → {result}")

pool = WorkerPool(num_workers=3, task_handler=process_task)
for i in range(10):
    pool.submit((i, i * 10))

pool.wait_completion()
logging.info("All tasks completed")
pool.shutdown()
```
PriorityQueue and LifoQueue:

```python
import queue
import threading
import time

priority_q = queue.PriorityQueue()

def priority_producer():
    tasks = [
        (3, "low-priority task"),
        (1, "high-priority task"),
        (2, "medium-priority task"),
        (1, "another high-priority task"),
        (3, "another low-priority task"),
    ]
    for priority, task in tasks:
        priority_q.put((priority, task))
        time.sleep(0.1)

def priority_consumer():
    while True:
        try:
            priority, task = priority_q.get(timeout=2)
            print(f"Processing: [{priority}] {task}")
            priority_q.task_done()
        except queue.Empty:
            break

t1 = threading.Thread(target=priority_producer)
t2 = threading.Thread(target=priority_consumer)
t1.start()
t1.join()
t2.start()
t2.join()

lifo_q = queue.LifoQueue()
for i in range(5):
    lifo_q.put(f"Item-{i}")

print("\nLIFO order:")
while not lifo_q.empty():
    print(f"  got: {lifo_q.get()}")
```
16.2.4 Thread-Local Storage

```python
import threading
import random
import time

class RequestContext:
    _local = threading.local()

    @classmethod
    def set_request_id(cls, request_id):
        cls._local.request_id = request_id

    @classmethod
    def get_request_id(cls):
        return getattr(cls._local, 'request_id', 'unknown')

    @classmethod
    def set_user(cls, user):
        cls._local.user = user

    @classmethod
    def get_user(cls):
        return getattr(cls._local, 'user', None)

def handle_request(request_id, user):
    RequestContext.set_request_id(request_id)
    RequestContext.set_user(user)
    time.sleep(random.uniform(0.1, 0.3))
    print(f"RequestID={RequestContext.get_request_id()}, "
          f"User={RequestContext.get_user()}, "
          f"Thread={threading.current_thread().name}")

threads = []
for i in range(5):
    t = threading.Thread(
        target=handle_request,
        args=(f"REQ-{i:04d}", f"user_{i}"),
        name=f"Handler-{i}"
    )
    threads.append(t)
    t.start()
for t in threads:
    t.join()
```
16.2.5 Thread Pools in Depth

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
import threading
import time
import logging

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(threadName)s] %(message)s")

class ManagedThreadPool:
    def __init__(self, max_workers, thread_name_prefix="Worker"):
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix
        )
        self._futures = []
        self._lock = threading.Lock()

    def submit(self, fn, *args, **kwargs) -> Future:
        future = self._executor.submit(fn, *args, **kwargs)
        with self._lock:
            self._futures.append(future)
        future.add_done_callback(self._on_task_done)
        return future

    def _on_task_done(self, future):
        try:
            result = future.result()
            logging.info(f"task done: {result}")
        except Exception as e:
            logging.error(f"task failed: {e}")

    def map_as_completed(self, fn, iterables):
        futures = {self._executor.submit(fn, item): item for item in iterables}
        results = []
        for future in as_completed(futures):
            item = futures[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                results.append((item, e))
        return results

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

def fetch_url(url):
    time.sleep(1)
    return f"Response from {url}"

urls = [f"http://api.example.com/data/{i}" for i in range(8)]
pool = ManagedThreadPool(max_workers=4)
futures = [pool.submit(fetch_url, url) for url in urls]
for future in as_completed(futures):
    try:
        print(future.result())
    except Exception as e:
        print(f"Error: {e}")
pool.shutdown()
```
Chaining Future callbacks:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(source):
    time.sleep(1)
    return f"raw_data_from_{source}"

def parse_data(raw_data):
    time.sleep(0.5)
    return raw_data.replace("raw_data_from_", "parsed: ")

def store_data(parsed_data):
    time.sleep(0.3)
    return f"stored[{parsed_data}]"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(fetch_data, "database")

    def on_fetch_done(f):
        raw = f.result()
        parse_future = executor.submit(parse_data, raw)
        parse_future.add_done_callback(on_parse_done)

    def on_parse_done(f):
        parsed = f.result()
        store_future = executor.submit(store_data, parsed)
        store_future.add_done_callback(on_store_done)

    def on_store_done(f):
        print(f"Pipeline finished: {f.result()}")

    future.add_done_callback(on_fetch_done)
    time.sleep(3)   # keep the pool alive until the callback chain completes
```
16.3 Multiprocessing

16.3.1 Process and Memory Model

Each process owns an independent memory space; processes do not share variables. This naturally rules out race conditions, but introduces the complexity of inter-process communication.

```python
import multiprocessing
import os

def show_process_info():
    print(f"PID: {os.getpid()}, PPID: {os.getppid()}")
    print(f"Process name: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    show_process_info()
    p = multiprocessing.Process(target=show_process_info, name="ChildProcess")
    p.start()
    p.join()
```
Process start methods:

```python
import multiprocessing as mp

available = mp.get_all_start_methods()
print(f"Available start methods: {available}")
print(f"Current method: {mp.get_start_method()}")

if 'spawn' in available:
    ctx = mp.get_context('spawn')
    print(f"spawn context: {ctx}")
```
| Start method | Platforms | Characteristics |
|---|---|---|
| spawn | All | Starts a fresh process and imports the main module; safe but slow |
| fork | Unix | Copies the parent process's memory; fast but potentially unsafe |
| forkserver | Unix | Starts a server process first, then forks from it; a middle ground |
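A start method can be selected explicitly. Since `mp.set_start_method()` may only be called once per program, a per-use context via `get_context` is often more convenient; a minimal sketch:

```python
import multiprocessing as mp

def child(q):
    q.put("hello from child")

if __name__ == "__main__":
    # an isolated context bound to the spawn start method
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(q.get())   # the child's message arrives via the context's queue
    p.join()
```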
Data isolation between processes:

```python
import multiprocessing

global_var = 0

def modify_global():
    global global_var
    global_var = 100
    print(f"Child process:  global_var = {global_var}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=modify_global)
    p.start()
    p.join()
    # the child's assignment does not affect the parent's copy
    print(f"Parent process: global_var = {global_var}")
```
16.3.2 Inter-Process Communication (IPC)

Queue:

```python
import multiprocessing
import time

def producer(q):
    for i in range(5):
        item = f"Item-{i}"
        q.put(item)
        print(f"produced {item}")
        time.sleep(0.1)
    q.put(None)   # sentinel: tells the consumer to stop

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"consumed {item}")
        time.sleep(0.2)

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=10)
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
Pipe:

```python
import multiprocessing

def sender(conn):
    messages = ["Hello", "World", "Python", "Concurrency"]
    for msg in messages:
        conn.send(msg)
        print(f"sent: {msg}")
    conn.send(None)   # sentinel
    conn.close()

def receiver(conn):
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"received: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
Shared memory (Value and Array):

```python
import multiprocessing
import ctypes

def worker(shared_value, shared_array, lock):
    with lock:
        shared_value.value += 1
        for i in range(len(shared_array)):
            shared_array[i] += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    value = multiprocessing.Value(ctypes.c_int, 0)
    array = multiprocessing.Array(ctypes.c_int, [0, 0, 0, 0, 0])

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(value, array, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

    print(f"Value: {value.value}")
    print(f"Array: {list(array)}")
```
Manager (managed objects):

```python
import multiprocessing
import time

def worker(namespace, shared_list, shared_dict, lock):
    with lock:
        namespace.count += 1
        shared_list.append(multiprocessing.current_process().name)
        shared_dict[multiprocessing.current_process().name] = time.time()

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        namespace = manager.Namespace()
        namespace.count = 0
        shared_list = manager.list()
        shared_dict = manager.dict()
        lock = manager.Lock()

        processes = []
        for i in range(3):
            p = multiprocessing.Process(
                target=worker,
                args=(namespace, shared_list, shared_dict, lock),
                name=f"Worker-{i}"
            )
            processes.append(p)
            p.start()
        for p in processes:
            p.join()

        print(f"Count: {namespace.count}")
        print(f"List:  {list(shared_list)}")
        print(f"Dict:  {dict(shared_dict)}")
```
16.3.3 Process Pools

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math

def is_prime(n):
    if n < 2:
        return False
    if n < 4:
        return True
    if n % 2 == 0 or n % 3 == 0:
        return False
    # 6k ± 1 trial division
    for i in range(5, int(math.sqrt(n)) + 1, 6):
        if n % i == 0 or n % (i + 2) == 0:
            return False
    return True

def find_primes_in_range(start, end):
    return [n for n in range(start, end) if is_prime(n)]

if __name__ == "__main__":
    ranges = [(i * 100000, (i + 1) * 100000) for i in range(8)]

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(find_primes_in_range, s, e): (s, e)
                   for s, e in ranges}
        total_primes = 0
        for future in as_completed(futures):
            r = futures[future]
            primes = future.result()
            total_primes += len(primes)
            print(f"Range {r}: found {len(primes)} primes")
    print(f"Total: {total_primes} primes, elapsed: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    all_primes = []
    for s, e in ranges:
        all_primes.extend(find_primes_in_range(s, e))
    print(f"Sequential: {len(all_primes)} primes, elapsed: {time.perf_counter() - start:.2f}s")
```
Pool initializers for large shared data:

```python
from concurrent.futures import ProcessPoolExecutor

def init_worker(shared_data):
    # runs once per worker process; stashes read-only data in a global
    global _shared_data
    _shared_data = shared_data

def process_chunk(chunk):
    return sum(x * _shared_data for x in chunk)

if __name__ == "__main__":
    data_chunks = [list(range(1000)) for _ in range(4)]
    shared_value = 2

    with ProcessPoolExecutor(
        max_workers=4,
        initializer=init_worker,
        initargs=(shared_value,)
    ) as executor:
        results = list(executor.map(process_chunk, data_chunks))

    print(f"Per-chunk results: {results}")
    print(f"Total: {sum(results)}")
```
16.4 Asynchronous Programming (asyncio)

16.4.1 Event Loops and Coroutines

asyncio is Python's asynchronous I/O framework; it achieves single-threaded concurrency through an event loop (Event Loop) and coroutines (Coroutines).
The nature of coroutines:

```python
import asyncio

async def modern_coroutine():
    await asyncio.sleep(1)
    return "result"

coro = modern_coroutine()
print(f"Type: {type(coro)}")
print(f"Is coroutine: {asyncio.iscoroutine(coro)}")
coro.close()   # close the unawaited coroutine to avoid a RuntimeWarning

async def demonstrate_await():
    result = await modern_coroutine()
    print(f"Result: {result}")

asyncio.run(demonstrate_await())
```
How the event loop works:

```
Event loop
┌──────────────────────────────────────┐
│ 1. Poll for ready I/O events         │
│ 2. Check for expired timers          │
│ 3. Run the ready callbacks           │
│ 4. Advance the suspended coroutines  │
│ 5. Repeat steps 1-4                  │
└──────────────────────────────────────┘
```
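The callback and timer steps can be observed directly through the loop's scheduling APIs; a minimal sketch:

```python
import asyncio

order = []

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(order.append, "callback")      # runs on the next loop pass
    loop.call_later(0.05, order.append, "timer")  # runs once the timer expires
    await asyncio.sleep(0.1)                      # yield so the loop can run both
    print(order)                                  # ['callback', 'timer']

asyncio.run(main())
```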
```python
import asyncio

async def task_lifecycle():
    print("1. coroutine starts")
    await asyncio.sleep(0)   # yields control back to the event loop
    print("2. resumed after yielding control")
    await asyncio.sleep(0)
    print("3. resumed again")
    return "done"

async def main():
    result = await task_lifecycle()
    print(f"Result: {result}")

asyncio.run(main())
```
16.4.2 Tasks and Scheduling

```python
import asyncio
import time

async def fetch_data(name, delay):
    print(f"{name} request started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"data from {name}"

async def gather_demo():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("API-A", 2),
        fetch_data("API-B", 1),
        fetch_data("API-C", 3),
    )
    elapsed = time.perf_counter() - start
    print(f"Total: {elapsed:.1f}s (concurrent; roughly the longest task)")
    print(f"Results: {results}")

asyncio.run(gather_demo())
```
create_task vs. gather:

```python
import asyncio

# fetch_data as defined in the previous example

async def task_demo():
    # create_task schedules the coroutine for execution immediately
    t1 = asyncio.create_task(fetch_data("Task-1", 2), name="Task-1")
    t2 = asyncio.create_task(fetch_data("Task-2", 1), name="Task-2")

    print(f"Task-1 done: {t1.done()}")
    print(f"Task-2 name: {t2.get_name()}")

    r1 = await t1
    r2 = await t2

    print(f"Task-1 done: {t1.done()}")
    print(f"Results: {r1}, {r2}")

asyncio.run(task_demo())
```
TaskGroup (Python 3.11+):

```python
import asyncio

# fetch_data as defined earlier

async def taskgroup_demo():
    results = []

    async with asyncio.TaskGroup() as tg:
        async def tracked_fetch(name, delay):
            result = await fetch_data(name, delay)
            results.append(result)

        tg.create_task(tracked_fetch("TG-A", 2))
        tg.create_task(tracked_fetch("TG-B", 1))
        tg.create_task(tracked_fetch("TG-C", 3))
    # the async with block exits only after every task has finished
    print(f"All tasks done: {results}")

asyncio.run(taskgroup_demo())
```
Exception handling compared:

```python
import asyncio

# fetch_data as defined earlier

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("task failed")

async def gather_with_error():
    # return_exceptions=True delivers exceptions as results instead of raising
    results = await asyncio.gather(
        fetch_data("OK-Task", 1),
        failing_task(),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Exception: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(gather_with_error())

async def taskgroup_with_error():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data("OK-Task", 1))
            tg.create_task(failing_task())
    except* ValueError as eg:
        print(f"TaskGroup caught ValueError: {eg.exceptions}")

asyncio.run(taskgroup_with_error())
```
16.4.3 Asynchronous Synchronization Primitives

asyncio provides asynchronous counterparts of the threading primitives, but they are scheduled cooperatively within the event loop and never truly block the thread.

```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            self._value += 1

    @property
    def value(self):
        return self._value

async def worker(counter, worker_id, increments):
    for _ in range(increments):
        await counter.increment()
    print(f"Worker-{worker_id} done")

async def lock_demo():
    counter = AsyncCounter()
    increments = 10000
    workers = 5
    await asyncio.gather(*[
        worker(counter, i, increments) for i in range(workers)
    ])
    expected = increments * workers
    print(f"Expected: {expected}, actual: {counter.value}")

asyncio.run(lock_demo())
```
asyncio.Event:

```python
import asyncio

class AsyncService:
    def __init__(self):
        self._ready = asyncio.Event()
        self._data = {}

    async def initialize(self):
        await asyncio.sleep(1)   # simulated startup work
        self._data = {"status": "ready", "version": "2.0"}
        self._ready.set()

    async def wait_ready(self):
        await self._ready.wait()
        return self._data

async def service_demo():
    service = AsyncService()

    async def client(name):
        print(f"{name}: waiting for the service...")
        data = await service.wait_ready()
        print(f"{name}: service ready, data={data}")

    await asyncio.gather(
        service.initialize(),
        client("Client-A"),
        client("Client-B"),
    )

asyncio.run(service_demo())
```
asyncio.Queue:

```python
import asyncio
import random

class AsyncProducerConsumer:
    def __init__(self, buffer_size=5):
        self._queue = asyncio.Queue(maxsize=buffer_size)
        self._produced = 0
        self._consumed = 0

    async def producer(self, name, count):
        for i in range(count):
            item = f"{name}-{i}"
            await self._queue.put(item)   # suspends while the queue is full
            self._produced += 1
            print(f"produced {item} (queue size: {self._queue.qsize()})")
            await asyncio.sleep(random.uniform(0.05, 0.15))

    async def consumer(self, name, count):
        for _ in range(count):
            item = await self._queue.get()
            self._consumed += 1
            print(f"consumed {item} by {name}")
            self._queue.task_done()
            await asyncio.sleep(random.uniform(0.1, 0.2))

    async def run(self, num_producers=2, num_consumers=2, items_per_producer=5):
        producers = [
            self.producer(f"P{i}", items_per_producer)
            for i in range(num_producers)
        ]
        consumers = [
            self.consumer(f"C{i}", num_producers * items_per_producer // num_consumers)
            for i in range(num_consumers)
        ]
        # producers and consumers must run concurrently: with a bounded
        # queue, running the producers alone would deadlock once it fills
        await asyncio.gather(*producers, *consumers)
        await self._queue.join()
        print(f"produced: {self._produced}, consumed: {self._consumed}")

asyncio.run(AsyncProducerConsumer().run())
```
asyncio.Semaphore and rate limiting:

```python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_concurrent=3):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        await self._semaphore.acquire()
        return self

    async def __aexit__(self, *args):
        self._semaphore.release()

async def rate_limited_fetch(url, limiter):
    async with limiter:
        print(f"request started: {url}")
        await asyncio.sleep(1)
        print(f"request finished: {url}")
        return f"Response from {url}"

async def rate_limiter_demo():
    limiter = RateLimiter(max_concurrent=2)
    urls = [f"http://api.example.com/{i}" for i in range(6)]

    start = time.perf_counter()
    results = await asyncio.gather(*[
        rate_limited_fetch(url, limiter) for url in urls
    ])
    elapsed = time.perf_counter() - start
    print(f"6 requests, concurrency limit 2, elapsed: {elapsed:.1f}s (about 3 batches x 1s)")

asyncio.run(rate_limiter_demo())
```
16.4.4 Asynchronous Iterators and Generators

```python
import asyncio

class AsyncPageIterator:
    def __init__(self, total_items, page_size=10):
        self._total = total_items
        self._page_size = page_size
        self._current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current >= self._total:
            raise StopAsyncIteration
        start = self._current
        end = min(self._current + self._page_size, self._total)
        self._current = end
        await asyncio.sleep(0.1)   # simulates an asynchronous fetch
        return list(range(start, end))

async def pagination_demo():
    async for page in AsyncPageIterator(total_items=35, page_size=10):
        print(f"fetched page: {page}")

asyncio.run(pagination_demo())

async def async_chunk_generator(iterable, chunk_size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            await asyncio.sleep(0)   # cooperative yield point
            yield chunk
            chunk = []
    if chunk:
        yield chunk

async def chunk_demo():
    data = range(20)
    async for chunk in async_chunk_generator(data, 5):
        print(f"processing batch: {chunk}")

asyncio.run(chunk_demo())
```
Async comprehensions:

```python
import asyncio

async def async_transform(x):
    await asyncio.sleep(0.01)
    return x ** 2

async def comprehension_demo():
    # Awaiting inside an ordinary list comprehension runs the coroutines one at a time
    result = [await async_transform(i) for i in range(10)]
    print(f"Sequential async comprehension: {result}")

    # gather runs the same coroutines concurrently
    result = await asyncio.gather(*[async_transform(i) for i in range(10)])
    print(f"Concurrent via gather: {result}")

asyncio.run(comprehension_demo())
```
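The comprehensions above await inside an ordinary list comprehension; Python also supports true async comprehensions that consume an asynchronous source with `async for`. A minimal sketch, where `aiter_numbers` is a hypothetical async generator standing in for any async iterable:

```python
import asyncio

async def aiter_numbers(n):
    # Hypothetical async data source; yields 0..n-1 with a cooperative pause
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def main():
    # Async list comprehension: iterates an async iterable with `async for`
    squares = [x ** 2 async for x in aiter_numbers(5)]
    print(squares)  # [0, 1, 4, 9, 16]
    # An `if` clause works as well
    evens = [x async for x in aiter_numbers(10) if x % 2 == 0]
    print(evens)  # [0, 2, 4, 6, 8]

asyncio.run(main())
```

Unlike the `gather` variant, an async comprehension still consumes the source sequentially; its purpose is iterating async sources, not parallelism.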
16.4.5 Asynchronous Context Managers and Subprocesses

```python
import asyncio
from contextlib import asynccontextmanager

class AsyncDatabaseConnection:
    def __init__(self, url):
        self._url = url
        self._connected = False

    async def __aenter__(self):
        await asyncio.sleep(0.1)  # simulate the connection handshake
        self._connected = True
        print(f"Connected to: {self._url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.05)
        self._connected = False
        print(f"Disconnected from: {self._url}")

    async def query(self, sql):
        if not self._connected:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.05)
        return f"[{sql}] -> result"

@asynccontextmanager
async def managed_connection(url):
    """Function-style alternative to writing __aenter__/__aexit__ by hand."""
    conn = AsyncDatabaseConnection(url)
    await conn.__aenter__()
    try:
        yield conn
    finally:
        await conn.__aexit__(None, None, None)

async def db_demo():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        result = await db.query("SELECT * FROM users")
        print(result)

    async with managed_connection("mysql://localhost/test") as db:
        result = await db.query("SELECT COUNT(*) FROM orders")
        print(result)

asyncio.run(db_demo())
```
Async subprocesses:

```python
import asyncio

async def run_command(cmd):
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    return process.returncode, stdout.decode(), stderr.decode()

async def subprocess_demo():
    returncode, stdout, stderr = await run_command(
        ["python", "-c", "print('Hello from subprocess')"]
    )
    print(f"Return code: {returncode}")
    print(f"Output: {stdout.strip()}")

async def parallel_commands():
    commands = [
        ["python", "-c", f"import time; time.sleep({i}); print('Task {i} done')"]
        for i in range(1, 4)
    ]
    # Three subprocesses run concurrently; total time ~= the slowest command
    results = await asyncio.gather(*[run_command(cmd) for cmd in commands])
    for i, (rc, out, err) in enumerate(results, 1):
        print(f"Command {i}: return code={rc}, output={out.strip()}")

asyncio.run(subprocess_demo())
asyncio.run(parallel_commands())
```
16.4.6 Asynchronous Network Programming

```python
import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"New connection: {addr}")
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break
            message = data.decode()
            print(f"Received from {addr}: {message}")
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()
    except ConnectionResetError:
        print(f"Connection reset: {addr}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"Closed connection: {addr}")

async def start_echo_server():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addrs = ', '.join(str(s.getsockname()) for s in server.sockets)
    print(f"Server running on {addrs}")
    async with server:
        await server.serve_forever()

async def tcp_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    messages = ["Hello", "World", "Python"]
    for msg in messages:
        writer.write(msg.encode())
        await writer.drain()
        data = await reader.read(1024)
        print(f"Received response: {data.decode()}")
    writer.close()
    await writer.wait_closed()

async def server_client_demo():
    server_task = asyncio.create_task(start_echo_server())
    await asyncio.sleep(0.5)  # give the server time to start
    await tcp_client()
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        print("Server shut down")

asyncio.run(server_client_demo())
```
16.4.7 Bridging Async and Sync Code

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io_task(duration):
    time.sleep(duration)
    return f"Blocking task finished ({duration}s)"

def cpu_heavy(n):
    return sum(i * i for i in range(n))

async def fetch_data(name, duration):
    # Simulated async I/O task, defined here so the example is self-contained
    await asyncio.sleep(duration)
    return f"{name} finished"

async def bridge_demo():
    loop = asyncio.get_running_loop()

    # Run blocking I/O in the default thread pool executor
    result = await loop.run_in_executor(None, blocking_io_task, 2)
    print(result)

    # Run CPU-bound work in a process pool alongside a native coroutine
    process_executor = ProcessPoolExecutor(max_workers=2)
    results = await asyncio.gather(
        loop.run_in_executor(process_executor, cpu_heavy, 10_000_000),
        loop.run_in_executor(process_executor, cpu_heavy, 20_000_000),
        fetch_data("async-task", 1),
    )
    print(f"Mixed execution results: {results}")
    process_executor.shutdown(wait=True)

if __name__ == "__main__":
    asyncio.run(bridge_demo())
```
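For the common thread-pool case, Python 3.9+ offers `asyncio.to_thread` as a shorthand for `loop.run_in_executor(None, ...)`. A minimal sketch showing two blocking calls overlapping in worker threads (`blocking_io` is an illustrative stand-in for any blocking function):

```python
import asyncio
import time

def blocking_io(duration):
    time.sleep(duration)  # stands in for any blocking call
    return f"done after {duration}s"

async def main():
    start = time.perf_counter()
    # to_thread runs each call on the default thread-pool executor
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    elapsed = time.perf_counter() - start
    print(results, f"in {elapsed:.2f}s")  # roughly 0.2s, not 0.4s
    return elapsed

asyncio.run(main())
```

Because the calls run in threads, this helps with blocking I/O but is still subject to the GIL for CPU-bound work, where the process-pool approach above remains the right tool.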
16.5 Advanced Concurrency Patterns

16.5.1 A Complete Producer-Consumer Implementation

```python
import asyncio
import random
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

class AsyncPipeline:
    def __init__(self, buffer_size=10):
        self._input_queue = asyncio.Queue(maxsize=buffer_size)
        self._output_queue = asyncio.Queue(maxsize=buffer_size)
        self._error_queue = asyncio.Queue()
        self._stats = {"produced": 0, "processed": 0, "errors": 0}

    async def producer(self, count):
        for i in range(count):
            item = {"id": i, "data": random.random(), "timestamp": time.time()}
            await self._input_queue.put(item)
            self._stats["produced"] += 1
            logging.info(f"Produced: Item-{i}")
            await asyncio.sleep(random.uniform(0.01, 0.05))
        await self._input_queue.put(None)  # sentinel: production finished

    async def processor(self, name):
        while True:
            item = await self._input_queue.get()
            if item is None:
                # Re-queue the sentinel so sibling processors also stop
                await self._input_queue.put(None)
                break
            try:
                if random.random() < 0.1:  # simulate a 10% failure rate
                    raise ValueError(f"Error processing Item-{item['id']}")
                processed = {
                    "id": item["id"],
                    "result": item["data"] ** 2,
                    "processed_by": name,
                }
                await self._output_queue.put(processed)
                self._stats["processed"] += 1
                logging.info(f"{name} processed: Item-{item['id']}")
            except Exception as e:
                await self._error_queue.put({"item_id": item["id"], "error": str(e)})
                self._stats["errors"] += 1
                logging.error(f"{name} error: {e}")
            finally:
                self._input_queue.task_done()
            await asyncio.sleep(random.uniform(0.02, 0.08))

    async def consumer(self):
        # Consume until the sentinel arrives. Failed items never reach this
        # queue, so counting to a fixed total would hang whenever errors occur.
        while True:
            item = await self._output_queue.get()
            if item is None:
                break
            logging.info(
                f"Consumed: Item-{item['id']} = {item['result']:.4f} "
                f"(by {item['processed_by']})"
            )
            self._output_queue.task_done()

    async def error_handler(self):
        while True:
            try:
                error = await asyncio.wait_for(self._error_queue.get(), timeout=0.5)
                logging.warning(f"Error logged: Item-{error['item_id']} - {error['error']}")
            except asyncio.TimeoutError:
                break

    async def run(self, num_items=20, num_processors=3):
        start = time.perf_counter()

        async def processors_then_signal():
            await asyncio.gather(
                *(self.processor(f"Proc-{i}") for i in range(num_processors))
            )
            await self._output_queue.put(None)  # all processors done: stop the consumer

        # All stages must run concurrently: the bounded queues apply
        # backpressure, so running the stages one after another would
        # deadlock as soon as a queue filled up.
        await asyncio.gather(
            self.producer(num_items),
            processors_then_signal(),
            self.consumer(),
        )
        await self.error_handler()  # drain any recorded errors

        elapsed = time.perf_counter() - start
        print(f"\nStats: {self._stats}")
        print(f"Elapsed: {elapsed:.2f}s")

asyncio.run(AsyncPipeline().run())
```
16.5.2 Rate Limiting and Backpressure

```python
import asyncio
import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self._rate = rate          # tokens added per second
        self._capacity = capacity  # maximum bucket size (allows bursts)
        self._tokens = capacity
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens=1):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
            self._last_refill = now
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

    async def wait_and_acquire(self, tokens=1):
        while True:
            if await self.acquire(tokens):
                return
            # Estimate how long until enough tokens have accumulated
            wait_time = (tokens - self._tokens) / self._rate
            await asyncio.sleep(wait_time)

class ThrottledClient:
    def __init__(self, requests_per_second=5):
        self._bucket = TokenBucket(
            rate=requests_per_second,
            capacity=requests_per_second * 2,
        )
        self._request_count = 0

    async def request(self, url):
        await self._bucket.wait_and_acquire()
        self._request_count += 1
        await asyncio.sleep(0.1)  # simulate network latency
        return f"Response-{self._request_count} from {url}"

async def throttled_demo():
    client = ThrottledClient(requests_per_second=3)
    urls = [f"http://api.example.com/{i}" for i in range(10)]
    start = time.perf_counter()
    results = await asyncio.gather(*[client.request(url) for url in urls])
    elapsed = time.perf_counter() - start
    for r in results:
        print(r)
    print(f"10 requests at a 3/s limit took {elapsed:.1f}s")

asyncio.run(throttled_demo())
```
16.5.3 Timeout and Retry Patterns

```python
import asyncio
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

class RetryPolicy:
    def __init__(self, max_retries=3, base_delay=1.0, max_delay=30.0, exponential=True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential = exponential

    def get_delay(self, attempt):
        if not self.exponential:
            return self.base_delay
        delay = self.base_delay * (2 ** attempt)  # exponential backoff
        jitter = random.uniform(0, delay * 0.1)   # jitter avoids retry stampedes
        return min(delay + jitter, self.max_delay)

async def retry_async(coro_factory, policy=None, retryable_exceptions=(Exception,)):
    policy = policy or RetryPolicy()
    last_exception = None
    for attempt in range(policy.max_retries + 1):
        try:
            # Each attempt also gets a hard 5-second timeout
            return await asyncio.wait_for(coro_factory(), timeout=5.0)
        except retryable_exceptions as e:
            last_exception = e
            if attempt < policy.max_retries:
                delay = policy.get_delay(attempt)
                logging.warning(f"Attempt {attempt + 1} failed: {e}, retrying in {delay:.1f}s...")
                await asyncio.sleep(delay)
            else:
                logging.error(f"Reached the retry limit ({policy.max_retries})")
    raise last_exception

async def unreliable_service():
    if random.random() < 0.7:  # fails 70% of the time
        raise ConnectionError("Service unavailable")
    return "Success"

async def retry_demo():
    try:
        result = await retry_async(
            unreliable_service,
            policy=RetryPolicy(max_retries=5, base_delay=0.5),
            retryable_exceptions=(ConnectionError,),
        )
        print(f"Final result: {result}")
    except ConnectionError as e:
        print(f"Gave up: {e}")

asyncio.run(retry_demo())
```
16.5.4 Structured Concurrency

Structured concurrency is an important recent paradigm in concurrent programming. Its core idea: the lifetime of every concurrent task must be confined to a syntactic scope. Python 3.11 introduced asyncio.TaskGroup for exactly this:

```python
import asyncio

async def structured_concurrency_demo():
    async def task(name, duration):
        print(f"{name} started")
        await asyncio.sleep(duration)
        print(f"{name} finished")
        return f"{name}-result"

    # All tasks created in the group are guaranteed to finish
    # before the `async with` block is exited
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task("A", 2))
        t2 = tg.create_task(task("B", 1))
        t3 = tg.create_task(task("C", 3))

    print(f"All results: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(structured_concurrency_demo())
```
Advantages of structured concurrency:

- Exception safety: a child task's exception propagates to the parent scope, so no "orphan" tasks are left behind
- Resource management: the scope is not exited until every task completes, ensuring resources are released correctly
- Reasonability: concurrency is confined to an explicit scope, which makes it easier to understand and debug

Timeout-and-cancel pattern:

```python
import asyncio

async def cancellable_task(name, duration):
    try:
        print(f"{name} started")
        await asyncio.sleep(duration)
        print(f"{name} finished")
        return f"{name}-result"
    except asyncio.CancelledError:
        print(f"{name} cancelled, cleaning up...")
        await asyncio.sleep(0.1)  # simulated cleanup work
        print(f"{name} cleanup done")
        raise  # always re-raise CancelledError after cleanup

async def timeout_pattern():
    try:
        result = await asyncio.wait_for(
            cancellable_task("SlowTask", 10),
            timeout=2.0,
        )
    except asyncio.TimeoutError:
        print("Task timed out and was cancelled")

asyncio.run(timeout_pattern())
```
16.5.5 The Actor Model

The Actor model is a message-passing concurrency model: each actor processes its own messages independently, avoiding shared state:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

class Actor:
    def __init__(self, name):
        self._name = name
        self._inbox = asyncio.Queue()
        self._task = None

    async def start(self):
        # Keep a reference to the task so it is not garbage-collected
        self._task = asyncio.create_task(self._process_messages())
        logging.info(f"Actor {self._name} started")

    async def stop(self):
        await self._inbox.put(None)  # sentinel: stop processing
        await self._task

    async def send(self, message):
        await self._inbox.put(message)

    async def _process_messages(self):
        while True:
            message = await self._inbox.get()
            if message is None:
                break
            await self.handle(message)

    async def handle(self, message):
        raise NotImplementedError

class CounterActor(Actor):
    def __init__(self, name):
        super().__init__(name)
        self._count = 0

    async def handle(self, message):
        # Structural pattern matching requires Python 3.10+
        match message:
            case {"action": "increment", "value": v}:
                self._count += v
                logging.info(f"{self._name}: count = {self._count}")
            case {"action": "decrement", "value": v}:
                self._count -= v
                logging.info(f"{self._name}: count = {self._count}")
            case {"action": "get", "reply_to": reply_queue}:
                await reply_queue.put(self._count)
            case _:
                logging.warning(f"{self._name}: unknown message {message}")

class PrinterActor(Actor):
    async def handle(self, message):
        logging.info(f"{self._name} prints: {message}")

async def actor_demo():
    counter = CounterActor("Counter")
    printer = PrinterActor("Printer")
    await counter.start()
    await printer.start()

    await asyncio.gather(
        counter.send({"action": "increment", "value": 5}),
        counter.send({"action": "increment", "value": 3}),
        counter.send({"action": "decrement", "value": 2}),
        printer.send("Hello from Actor system"),
    )
    await asyncio.sleep(0.5)

    # Request/reply: pass a queue for the actor to answer on
    reply_queue = asyncio.Queue()
    await counter.send({"action": "get", "reply_to": reply_queue})
    result = await reply_queue.get()
    print(f"Counter value: {result}")

    await counter.stop()
    await printer.stop()

asyncio.run(actor_demo())
```
16.6 Concurrency Debugging and Performance Analysis

16.6.1 Common Concurrency Problems

Deadlock:

```python
import threading
import time

def demonstrate_deadlock():
    lock_a = threading.Lock()
    lock_b = threading.Lock()

    def task1():
        with lock_a:
            time.sleep(0.1)
            print("Task1: waiting for lock_b...")
            with lock_b:
                print("Task1: acquired lock_b")

    def task2():
        with lock_b:
            time.sleep(0.1)
            print("Task2: waiting for lock_a...")
            with lock_a:
                print("Task2: acquired lock_a")

    # daemon=True so the deadlocked threads do not keep the process alive
    t1 = threading.Thread(target=task1, daemon=True)
    t2 = threading.Thread(target=task2, daemon=True)
    t1.start()
    t2.start()
    t1.join(timeout=3)
    t2.join(timeout=3)

    if t1.is_alive() or t2.is_alive():
        print("Deadlock detected!")
        return True
    return False

def avoid_deadlock_with_ordered_locks():
    lock_a = threading.Lock()
    lock_b = threading.Lock()

    # Both tasks acquire the locks in the same order: a, then b
    def task1():
        with lock_a:
            time.sleep(0.1)
            with lock_b:
                print("Task1: safely acquired both locks")

    def task2():
        with lock_a:
            time.sleep(0.1)
            with lock_b:
                print("Task2: safely acquired both locks")

    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Completed without deadlock")
```
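The timeout strategy can be seen directly on `threading.Lock`: a timed acquire returns False instead of blocking forever, giving the caller a chance to release its own locks, back off, and break a circular wait. A minimal single-thread sketch of the semantics:

```python
import threading

lock = threading.Lock()
lock.acquire()  # simulate the lock being held elsewhere

# A timed acquire fails fast (returns False after 0.1s) instead of
# waiting forever -- the caller can back off and retry later
got = lock.acquire(timeout=0.1)
print(f"acquired while held: {got}")  # False

lock.release()
got = lock.acquire(timeout=0.1)
print(f"acquired after release: {got}")  # True
lock.release()
```

In the two-lock scenario above, each task would release its first lock and retry whenever the timed acquire of the second lock fails, so neither thread waits indefinitely.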
Strategies for avoiding deadlock:

- Lock ordering: always acquire multiple locks in a fixed order
- Timeouts: use acquire(timeout=...) to avoid waiting indefinitely
- Lock granularity: keep critical sections small to minimize the time locks are held
- Avoid nesting: avoid acquiring a second lock while already holding one

16.6.2 Profiling and Tuning

```python
import asyncio
import time
import statistics

async def benchmark_async(coro_factory, iterations=100):
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        await coro_factory()
        times.append(time.perf_counter() - start)
    return {
        "mean": statistics.mean(times),
        "median": statistics.median(times),
        "stdev": statistics.stdev(times) if len(times) > 1 else 0,
        "min": min(times),
        "max": max(times),
    }

async def benchmark_comparison():
    async def async_sleep():
        await asyncio.sleep(0.001)

    async def async_compute():
        total = 0
        for i in range(1000):
            total += i * i
        return total

    sleep_stats = await benchmark_async(async_sleep, 50)
    compute_stats = await benchmark_async(async_compute, 50)

    print("Async sleep benchmark:")
    for k, v in sleep_stats.items():
        print(f"  {k}: {v*1000:.3f}ms")
    print("\nAsync compute benchmark:")
    for k, v in compute_stats.items():
        print(f"  {k}: {v*1000:.3f}ms")

asyncio.run(benchmark_comparison())
```
Choosing an optimal worker/process count:

```python
import os

def optimal_pool_size(task_type="mixed", cpu_count=None):
    """Heuristic pool sizing: CPU-bound work scales with core count,
    while I/O-bound work can safely oversubscribe the cores."""
    cpu_count = cpu_count or os.cpu_count() or 1
    if task_type == "cpu_bound":
        return cpu_count
    elif task_type == "io_bound":
        return min(32, cpu_count * 5)
    else:
        return min(16, cpu_count * 2)

print(f"CPU cores: {os.cpu_count()}")
print(f"CPU-bound pool size: {optimal_pool_size('cpu_bound')}")
print(f"I/O-bound pool size: {optimal_pool_size('io_bound')}")
print(f"Mixed pool size: {optimal_pool_size('mixed')}")
```
16.7 Recent Developments

16.7.1 New Concurrency Features in Recent Python Versions

asyncio.timeout (Python 3.11+):

```python
import asyncio

async def per_task_timeout_demo():
    async def risky_task(name, duration):
        await asyncio.sleep(duration)
        return f"{name} finished"

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(risky_task("Fast", 0.5))
        t2 = tg.create_task(risky_task("Slow", 5.0))

        try:
            # asyncio.timeout (3.11+) cancels the enclosing await on expiry
            async with asyncio.timeout(2.0):
                await t2
        except TimeoutError:
            print("Slow task timed out")
            t2.cancel()

        if t1.done():
            print(f"Fast task result: {t1.result()}")

asyncio.run(per_task_timeout_demo())
```
16.7.2 PEP 703: Free-Threaded CPython

Python 3.13 introduced an experimental free-threaded mode, the most far-reaching change to Python concurrency in years:

- It removes the GIL, allowing multiple threads to execute Python bytecode truly in parallel
- It is enabled by building CPython with the --disable-gil option
- Reference counting was redesigned around biased reference counting plus per-object locks
- It is expected to stabilize gradually from Python 3.14 onward

```python
import sys

# Detect whether the GIL is active (free-threaded builds, Python 3.13+)
if hasattr(sys, '_is_gil_enabled'):
    print(f"GIL enabled: {sys._is_gil_enabled()}")
else:
    print("This Python version cannot report GIL status")
```
16.7.3 PEP 734: Sub-interpreters

Sub-interpreters provide a concurrency model between threads and processes. Python 3.12 gave each sub-interpreter its own GIL at the C level (PEP 684), and PEP 734 exposes them from Python code:

- Each sub-interpreter has an independent GIL
- Lighter-weight and faster to start than a separate process
- Accessed through the interpreters module proposed by PEP 734 (shipped as concurrent.interpreters in Python 3.14; earlier versions expose only an internal module)
- Suited to workloads that need isolation without the cost of spawning processes

```python
# PEP 734 API sketch: in Python 3.14+ the module ships as
# concurrent.interpreters, and the execution method is exec()
from concurrent import interpreters

interp = interpreters.create()
interp.exec("print('Hello from sub-interpreter')")
```
16.8 Chapter Summary

This chapter covered the four main concurrency models in Python:

| Model | Best for | Strengths | Weaknesses |
|-------|----------|-----------|------------|
| threading | I/O-bound tasks | Simple to use; shared memory | Limited by the GIL; needs synchronization |
| multiprocessing | CPU-bound tasks | True parallelism; bypasses the GIL | Process overhead; complex IPC |
| concurrent.futures | General task submission | Unified interface; Future model | Less flexible |
| asyncio | High-concurrency I/O | Highly efficient; low resource usage | Steep learning curve; needs an async ecosystem |
Core principles:

- Prefer asyncio or threading for I/O-bound work
- Use multiprocessing for CPU-bound work
- For mixed workloads, combine asyncio with run_in_executor
- Always watch for deadlocks, race conditions, and resource leaks
- Prefer high-level abstractions (TaskGroup, pools) over low-level primitives

16.9 Exercises

Basics

1. Using threading.Lock, implement a thread-safe dictionary class ThreadSafeDict that supports get, set, and delete operations.
2. Using multiprocessing.Queue, implement a simple distributed task-dispatch system with one scheduler process and several worker processes.

3. Use asyncio.gather to request 5 URLs concurrently (simulated requests are fine) and measure the total elapsed time.
Advanced

1. Implement a connection-pool manager based on asyncio.Semaphore, supporting a maximum connection limit, connection timeouts, and automatic recycling.

2. Using ProcessPoolExecutor, implement a parallel MapReduce framework that can compute word frequencies over a large dataset.

3. Implement an asynchronous crawler framework that supports:
- Concurrency control (maximum concurrent requests)
- Request throttling (requests per second)
- Automatic retries (exponential backoff)
- Result persistence

Discussion

1. Why is the GIL a bottleneck for CPU-bound tasks but largely irrelevant for I/O-bound ones? Analyze this in terms of how the GIL is scheduled and released.

2. Compare structured concurrency (TaskGroup) with unstructured concurrency (create_task plus gather). In which scenarios should each be preferred?

3. As PEP 703 (free-threaded CPython) moves forward, how will the Python concurrency ecosystem change, and what adaptations will existing code need?
Projects

1. Concurrent web server: using asyncio, implement an HTTP server with routing, middleware, and static file serving that can handle at least 1000 concurrent connections.

2. Parallel data pipeline: design a multi-stage processing system that uses multiple processes for the loading, cleaning, transforming, and storing stages, with backpressure control and error recovery.

3. Distributed task queue: implement a lightweight Celery-like task queue supporting task priorities, retries, timeouts, and result storage.
16.10 Further Reading

16.10.1 Concurrency theory

- The Art of Multiprocessor Programming (Herlihy & Shavit): the theory of multiprocessor programming
- Java Concurrency in Practice (Brian Goetz): the Java concurrency classic; its principles apply broadly
- Seven Concurrency Models in Seven Weeks: a survey of concurrency models

16.10.2 Python concurrency

- Python Concurrency with asyncio (Matthew Fowler): an authoritative asyncio guide
- Fluent Python, chapters 19-21: in-depth coverage of concurrent programming
- PEP 703 — A Free-Threaded CPython (https://peps.python.org/pep-0703/): the no-GIL design

16.10.3 The asyncio ecosystem

16.10.4 Concurrency tools

Next chapter: Chapter 17 — Getting Started with Flask