第13章 装饰器 学习目标 完成本章学习后,读者应能够:
理解闭包机制 :掌握词法作用域、自由变量与闭包的底层实现原理,理解__closure__属性与cell对象的运作机制精通装饰器模式 :从函数装饰器到类装饰器,从无参装饰器到带参装饰器,全面掌握装饰器的实现范式掌握元数据保留 :深入理解functools.wraps、__wrapped__与函数内省机制运用标准库工具 :熟练使用lru_cache、singledispatch、cached_property等高级装饰器设计装饰器架构 :能够为实际项目设计可组合、可测试、可维护的装饰器体系理解装饰器局限性 :认知装饰器在调试、序列化、类型检查等方面的挑战与应对策略13.1 闭包:装饰器的基石 13.1.1 作用域与自由变量 闭包(Closure)是装饰器的理论基础。理解闭包需要先理解Python的作用域规则与自由变量机制。
Python采用词法作用域(Lexical Scoping),函数定义时即确定了其可以访问的变量范围。LEGB规则定义了名称查找顺序:L ocal → E nclosing → G lobal → B uilt-in。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def demonstrate_scope (): x = 10 def inner (): y = 20 print (f"Local y: {y} " ) print (f"Enclosing x: {x} " ) inner() try : print (y) except NameError as e: print (f"Cannot access inner's local: {e} " ) demonstrate_scope()
当一个函数引用了其外层作用域中的变量,且该变量既不是全局变量也不是局部变量时,它被称为自由变量 (Free Variable)。Python编译器在编译时检测自由变量,并通过cell对象在运行时实现跨作用域的变量绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def make_counter (): count = 0 def increment (): nonlocal count count += 1 return count return increment counter = make_counter() print (counter())print (counter())print (counter())print (f"闭包自由变量: {counter.__code__.co_freevars} " )print (f"闭包单元格: {counter.__closure__} " )print (f"单元格值: {counter.__closure__[0 ].cell_contents} " )
__closure__属性是一个cell对象元组,每个cell对象持有一个自由变量的引用。nonlocal声明告诉解释器该变量是自由变量,应从外层作用域查找而非创建新的局部绑定。
13.1.2 闭包的语义与实现 闭包是一个函数对象,它记住了定义时的词法环境,即使该环境在函数调用时已经不可达。闭包 = 函数 + 环境。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def make_multiplier (factor ): def multiply (x ): return x * factor return multiply double = make_multiplier(2 ) triple = make_multiplier(3 ) print (double(5 ))print (triple(5 ))print (f"double的自由变量: {double.__code__.co_freevars} " )print (f"double的闭包值: {double.__closure__[0 ].cell_contents} " )print (f"triple的闭包值: {triple.__closure__[0 ].cell_contents} " )
关键理解:double和triple共享相同的函数代码,但各自持有不同的闭包环境。factor作为自由变量被绑定到不同的cell对象中。
闭包的延迟绑定陷阱 :闭包中的自由变量是引用绑定而非值绑定,这意味着在闭包被调用时才解析变量的当前值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def create_functions (): functions = [] for i in range (5 ): def func (): return i functions.append(func) return functions funcs = create_functions() print ([f() for f in funcs])def create_functions_fixed (): functions = [] for i in range (5 ): def func (i=i ): return i functions.append(func) return functions funcs = create_functions_fixed() print ([f() for f in funcs])
延迟绑定发生的原因是闭包捕获的是变量的引用,而非变量在定义时的值。当循环结束后,i的值为4,所有闭包都引用同一个i。通过默认参数i=i可以在定义时将当前值绑定到参数上。
13.1.3 闭包的实际应用 可变状态封装 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def make_averager (): total = 0 count = 0 def averager (new_value ): nonlocal total, count total += new_value count += 1 return total / count return averager avg = make_averager() print (avg(10 ))print (avg(20 ))print (avg(30 ))print (f"当前闭包状态: total={avg.__closure__[0 ].cell_contents} , count={avg.__closure__[1 ].cell_contents} " )
记忆化缓存 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def make_cache (): cache = {} def get (key, compute ): if key not in cache: cache[key] = compute(key) return cache[key] def clear (): cache.clear() get.clear = clear get.cache = cache return get cached = make_cache() def expensive_sqrt (n ): print (f" Computing sqrt({n} )..." ) return n ** 0.5 print (cached(16 , expensive_sqrt))print (cached(16 , expensive_sqrt))print (cached(25 , expensive_sqrt))print (f"缓存内容: {cached.cache} " )cached.clear() print (f"清空后: {cached.cache} " )
函数工厂与策略模式 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def make_validator (field_name, min_val, max_val ): def validate (value ): if not isinstance (value, (int , float )): raise TypeError(f"{field_name} must be numeric" ) if value < min_val: raise ValueError(f"{field_name} must be >= {min_val} " ) if value > max_val: raise ValueError(f"{field_name} must be <= {max_val} " ) return True validate.field_name = field_name validate.min_val = min_val validate.max_val = max_val return validate validate_age = make_validator("age" , 0 , 150 ) validate_score = make_validator("score" , 0 , 100 ) validate_temp = make_validator("temperature" , -273.15 , float ("inf" )) print (validate_age(25 ))print (validate_score(85 ))print (validate_temp(36.5 ))try : validate_age(-5 ) except ValueError as e: print (e)
13.1.4 闭包与类的对比 闭包和类都可以封装状态与行为,但各有适用场景:
特性 闭包 类 状态封装 __closure__ + nonlocal实例属性 接口暴露 仅暴露返回的函数 通过属性和方法 继承 不支持 支持 内省 困难(需访问__closure__) 方便(属性访问) 内存 较轻量 较重(__dict__等) 可读性 简单场景更简洁 复杂场景更清晰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def counter_closure (start=0 ): count = start def increment (step=1 ): nonlocal count count += step return count def reset (): nonlocal count count = start increment.reset = reset return increment class CounterClass : def __init__ (self, start=0 ): self ._count = start self ._start = start def increment (self, step=1 ): self ._count += step return self ._count def reset (self ): self ._count = self ._start c1 = counter_closure(10 ) print (c1())print (c1())c1.reset() print (c1())c2 = CounterClass(10 ) print (c2.increment())print (c2.increment())c2.reset() print (c2.increment())
13.2 函数装饰器 13.2.1 装饰器的本质 装饰器(Decorator)是一种语法糖,本质是一个高阶函数,接受一个函数作为参数并返回一个新函数。@decorator语法等价于func = decorator(func)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def trace (func ): def wrapper (*args, **kwargs ): print (f"→ 调用 {func.__name__} ({args} , {kwargs} )" ) result = func(*args, **kwargs) print (f"← 返回 {func.__name__} => {result} " ) return result return wrapper @trace def add (a, b ): return a + b print (add(3 , 5 ))add_equivalent = trace(add) print (add is add_equivalent)
理解装饰器的关键在于:装饰器在模块加载时执行 ,而非在函数调用时。这意味着装饰器是声明式的——它在函数定义时就已经生效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 registry = [] def register (func ): print (f"注册函数: {func.__name__} " ) registry.append(func) return func @register def func_a (): return "A" @register def func_b (): return "B" print (f"注册表: {[f.__name__ for f in registry]} " )
13.2.2 通用装饰器模板 一个健壮的函数装饰器应遵循以下模板:
1 2 3 4 5 6 7 8 from functools import wrapsdef robust_decorator (func ): @wraps(func ) def wrapper (*args, **kwargs ): result = func(*args, **kwargs) return result return wrapper
关键要素:
*args, **kwargs:接受任意参数,确保装饰器适用于所有函数签名@wraps(func):保留原函数的元数据(__name__、__doc__、__module__等)返回result:确保原函数的返回值不被丢弃 13.2.3 常用装饰器模式 计时装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import timefrom functools import wrapsdef timer (func ): @wraps(func ) def wrapper (*args, **kwargs ): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start print (f"{func.__name__} 执行耗时: {elapsed:.6 f} 秒" ) return result return wrapper @timer def compute_sum (n ): return sum (range (n)) result = compute_sum(1_000_000 ) print (f"结果: {result} " )
日志装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import loggingfrom functools import wrapslogging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) def log_calls (level=logging.DEBUG ): def decorator (func ): @wraps(func ) def wrapper (*args, **kwargs ): args_repr = [repr (a) for a in args] kwargs_repr = [f"{k} ={v!r} " for k, v in kwargs.items()] signature = ", " .join(args_repr + kwargs_repr) logger.log(level, f"调用 {func.__name__} ({signature} )" ) try : result = func(*args, **kwargs) logger.log(level, f"{func.__name__} 返回 {result!r} " ) return result except Exception as e: logger.error(f"{func.__name__} 抛出 {type (e).__name__} : {e} " ) raise return wrapper return decorator @log_calls(level=logging.INFO ) def divide (a, b ): return a / b print (divide(10 , 3 ))try : divide(10 , 0 ) except ZeroDivisionError: pass
类型检查装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from functools import wrapsimport inspectdef typecheck (func ): sig = inspect.signature(func) @wraps(func ) def wrapper (*args, **kwargs ): bound = sig.bind(*args, **kwargs) bound.apply_defaults() for name, value in bound.arguments.items(): param = sig.parameters[name] if param.annotation is not param.empty: if not isinstance (value, param.annotation): raise TypeError( f"参数 '{name} ' 期望类型 {param.annotation.__name__} , " f"实际类型 {type (value).__name__} " ) result = func(*args, **kwargs) if sig.return_annotation is not sig.empty: if not isinstance (result, sig.return_annotation): raise TypeError( f"返回值期望类型 {sig.return_annotation.__name__} , " f"实际类型 {type (result).__name__} " ) return result return wrapper @typecheck def greet (name: str , times: int ) -> str : return (f"Hello, {name} ! " * times).strip() print (greet("Alice" , 2 ))try : greet(123 , 2 ) except TypeError as e: print (e)
重试装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import timeimport randomfrom functools import wrapsdef retry ( max_attempts=3 , delay=1.0 , backoff=2.0 , exceptions=(Exception, ), on_retry=None ): def decorator (func ): @wraps(func ) def wrapper (*args, **kwargs ): current_delay = delay for attempt in range (1 , max_attempts + 1 ): try : return func(*args, **kwargs) except exceptions as e: if attempt == max_attempts: raise if on_retry: on_retry(attempt, e) time.sleep(current_delay) current_delay *= backoff return wrapper return decorator @retry( max_attempts=5 , delay=0.1 , backoff=1.5 , exceptions=(ConnectionError, TimeoutError ), on_retry=lambda attempt, err: print (f"第{attempt} 次重试: {err} " ) )def fetch_url (url ): if random.random() < 0.7 : raise ConnectionError(f"连接失败: {url} " ) return f"内容来自 {url} " try : result = fetch_url("https://example.com" ) print (result) except ConnectionError as e: print (f"最终失败: {e} " )
速率限制装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import timefrom functools import wrapsfrom collections import dequedef rate_limit (calls_per_second=1 ): min_interval = 1.0 / calls_per_second def decorator (func ): timestamps = deque() @wraps(func ) def wrapper (*args, **kwargs ): now = time.monotonic() while timestamps and now - timestamps[0 ] >= 1.0 : timestamps.popleft() if len (timestamps) >= calls_per_second: sleep_time = min_interval - (now - timestamps[-1 ]) if sleep_time > 0 : time.sleep(sleep_time) timestamps.append(time.monotonic()) return func(*args, **kwargs) return wrapper return decorator @rate_limit(calls_per_second=3 ) def api_call (endpoint ): print (f"调用 {endpoint} @ {time.strftime('%H:%M:%S' )} " ) return f"响应: {endpoint} " for i in range (6 ): api_call(f"/api/data/{i} " )
13.2.4 保留函数元数据 functools.wraps不仅仅是复制__name__和__doc__,它还处理了多个关键属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from functools import wrapsdef show_wraps_behavior (func ): @wraps(func ) def wrapper (*args, **kwargs ): return func(*args, **kwargs) return wrapper @show_wraps_behavior def example (x: int , y: str = "default" ) -> str : """这是一个示例函数。""" return f"{x} : {y} " print (f"__name__: {example.__name__} " )print (f"__doc__: {example.__doc__} " )print (f"__module__: {example.__module__} " )print (f"__wrapped__: {example.__wrapped__} " )print (f"__annotations__: {example.__annotations__} " )print (f"__qualname__: {example.__qualname__} " )
__wrapped__属性特别重要——它保存了对原始函数的引用,使得装饰后的函数仍可访问原始函数。这在调试、内省和装饰器叠加时至关重要。
1 2 3 4 5 import inspectoriginal = example.__wrapped__ print (f"原始函数签名: {inspect.signature(original)} " )print (f"原始函数源码:\n{inspect.getsource(original)} " )
13.3 带参数的装饰器 13.3.1 三层嵌套模式 带参数的装饰器需要三层嵌套:最外层接收装饰器参数,中间层接收被装饰函数,最内层是实际包装函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from functools import wrapsdef repeat (times=1 ): def decorator (func ): @wraps(func ) def wrapper (*args, **kwargs ): result = None for _ in range (times): result = func(*args, **kwargs) return result return wrapper return decorator @repeat(times=3 ) def greet (name ): print (f"Hello, {name} !" ) return f"Greeted {name} " greet("Alice" )
执行流程:repeat(times=3) → 返回decorator → @decorator应用于greet → 返回wrapper。
13.3.2 可选参数装饰器 当装饰器既需要支持@decorator(无参数)又需要支持@decorator(option=value)(有参数)时,需要特殊处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from functools import wrapsdef flexible_decorator (func=None , *, option="default" ): def decorator (f ): @wraps(f ) def wrapper (*args, **kwargs ): print (f"[{option} ] 调用 {f.__name__} " ) return f(*args, **kwargs) return wrapper if func is not None : return decorator(func) return decorator @flexible_decorator def func_a (): return "A" @flexible_decorator(option="custom" ) def func_b (): return "B" print (func_a())print (func_b())
当@flexible_decorator不带括号时,func接收被装饰函数;当带括号时,func为None,返回真正的装饰器。
13.3.3 基于类的参数化装饰器 使用类实现带参数的装饰器通常更清晰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from functools import wrapsclass Retry : def __init__ (self, max_attempts=3 , delay=1.0 , backoff=2.0 , exceptions=(Exception, ) ): self .max_attempts = max_attempts self .delay = delay self .backoff = backoff self .exceptions = exceptions def __call__ (self, func ): @wraps(func ) def wrapper (*args, **kwargs ): current_delay = self .delay for attempt in range (1 , self .max_attempts + 1 ): try : return func(*args, **kwargs) except self .exceptions as e: if attempt == self .max_attempts: raise import time time.sleep(current_delay) current_delay *= self .backoff return wrapper @Retry(max_attempts=3 , delay=0.5 , backoff=2.0 , exceptions=(ValueError, ) ) def parse_int (value ): return int (value) try : print (parse_int("42" )) parse_int("abc" ) except ValueError: print ("解析失败" )
13.4 类装饰器 13.4.1 用类实现函数装饰器 类装饰器通过__call__方法使实例可调用,适合需要维护状态的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from functools import wrapsclass CallStats : def __init__ (self, func ): wraps(func)(self ) self .func = func self .call_count = 0 self .total_time = 0.0 self .last_result = None def __call__ (self, *args, **kwargs ): import time start = time.perf_counter() self .last_result = self .func(*args, **kwargs) elapsed = time.perf_counter() - start self .call_count += 1 self .total_time += elapsed return self .last_result def stats (self ): avg = self .total_time / self .call_count if self .call_count else 0 return { "function" : self .func.__name__, "calls" : self .call_count, "total_time" : self .total_time, "avg_time" : avg, } def reset (self ): self .call_count = 0 self .total_time = 0.0 @CallStats def compute (n ): return sum (i ** 2 for i in range (n)) compute(100_000 ) compute(200_000 ) compute(50_000 ) import jsonprint (json.dumps(compute.stats(), indent=2 ))compute.reset() print (f"重置后调用次数: {compute.call_count} " )
13.4.2 装饰类 类装饰器可以修改类的行为、添加方法或改变实例创建过程:
添加方法 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def add_repr (cls ): def __repr__ (self ): fields = ", " .join( f"{k} ={v!r} " for k, v in self .__dict__.items() ) return f"{cls.__name__} ({fields} )" cls.__repr__ = __repr__ return cls @add_repr class Point : def __init__ (self, x, y ): self .x = x self .y = y p = Point(3 , 5 ) print (p)
单例模式 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def singleton (cls ): instances = {} def get_instance (*args, **kwargs ): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] get_instance.__wrapped__ = cls get_instance._instances = instances return get_instance @singleton class DatabaseConnection : def __init__ (self, host="localhost" ): self .host = host print (f"连接数据库: {host} " ) db1 = DatabaseConnection("localhost" ) db2 = DatabaseConnection("remote-host" ) print (f"db1 is db2: {db1 is db2} " )print (f"原始类: {DatabaseConnection.__wrapped__} " )
数据验证装饰器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 from functools import wrapsdef validated (cls ): original_init = cls.__init__ @wraps(original_init ) def new_init (self, *args, **kwargs ): annotations = cls.__annotations__ bound = original_init.__code__ varnames = bound.co_varnames[1 :bound.co_argcount] for i, name in enumerate (varnames): if i < len (args): value = args[i] elif name in kwargs: value = kwargs[name] else : continue if name in annotations: expected = annotations[name] if not isinstance (value, expected): raise TypeError( f"{cls.__name__} .{name} 期望 {expected.__name__} , " f"得到 {type (value).__name__} " ) original_init(self , *args, **kwargs) cls.__init__ = new_init return cls @validated class User : name: str age: int def __init__ (self, name: str , age: int ): self .name = name self .age = age u1 = User("Alice" , 25 ) print (f"{u1.name} , {u1.age} " )try : User(123 , "twenty" ) except TypeError as e: print (e)
13.5 装饰器叠加 13.5.1 执行顺序 多个装饰器按从下到上的顺序应用,但执行时按从外到内的顺序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from functools import wrapsdef decorator_a (func ): @wraps(func ) def wrapper (*args, **kwargs ): print ("A: 函数调用前" ) result = func(*args, **kwargs) print ("A: 函数调用后" ) return result return wrapper def decorator_b (func ): @wraps(func ) def wrapper (*args, **kwargs ): print ("B: 函数调用前" ) result = func(*args, **kwargs) print ("B: 函数调用后" ) return result return wrapper @decorator_a @decorator_b def greet (name ): print (f"Hello, {name} !" ) greet("Alice" )
输出顺序:
1 2 3 4 5 A: 函数调用前 B: 函数调用前 Hello, Alice! B: 函数调用后 A: 函数调用后
等价于:greet = decorator_a(decorator_b(greet)),形成洋葱式调用栈。
13.5.2 装饰器组合器 为了管理复杂的装饰器组合,可以设计组合器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from functools import wrapsdef compose (*decorators ): def decorator (func ): for dec in reversed (decorators): func = dec(func) return func return decorator def trace (func ): @wraps(func ) def wrapper (*args, **kwargs ): print (f"[TRACE] {func.__name__} " ) return func(*args, **kwargs) return wrapper def timer (func ): @wraps(func ) def wrapper (*args, **kwargs ): import time start = time.perf_counter() result = func(*args, **kwargs) print (f"[TIMER] {func.__name__} : {time.perf_counter() - start:.6 f} s" ) return result return wrapper def validate_positive (func ): @wraps(func ) def wrapper (*args, **kwargs ): if any (arg < 0 for arg in args if isinstance (arg, (int , float ))): raise ValueError("参数不能为负数" ) return func(*args, **kwargs) return wrapper @compose(trace, timer, validate_positive ) def compute (a, b ): return a ** b print (compute(2 , 10 ))
13.6.1 lru_cache:最近最少使用缓存 lru_cache是Python标准库中最实用的装饰器之一,实现了LRU(Least Recently Used)缓存淘汰策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from functools import lru_cacheimport time@lru_cache(maxsize=128 ) def fibonacci (n ): if n <= 1 : return n return fibonacci(n - 1 ) + fibonacci(n - 2 ) start = time.perf_counter() print (f"fib(100) = {fibonacci(100 )} " )print (f"耗时: {time.perf_counter() - start:.6 f} 秒" )print (fibonacci.cache_info())fibonacci.cache_clear() print (f"清空后: {fibonacci.cache_info()} " )
缓存参数选择 :
maxsize=None:无限制缓存,适用于结果集有限且可全部缓存的情况maxsize=128:默认值,适合大多数场景maxsize=0:禁用缓存(实际上只是禁用了缓存效果,但仍保留装饰器接口)缓存键的计算 :lru_cache使用参数的值作为缓存键。参数必须是可哈希的(hashable),这意味着列表、字典等不可哈希类型不能直接作为参数。
1 2 3 4 5 6 7 8 @lru_cache(maxsize=32 ) def process_data (data_hash: int , threshold: float ) -> list : print (f" 处理数据: hash={data_hash} , threshold={threshold} " ) return [data_hash * threshold] print (process_data(100 , 0.5 ))print (process_data(100 , 0.5 ))print (process_data.cache_info())
typed参数 :typed=True时,不同类型的参数会产生不同的缓存条目:
1 2 3 4 5 6 7 8 @lru_cache(maxsize=128 , typed=True ) def typed_cache (x ): print (f" 计算: {x} (type={type (x).__name__} )" ) return x * 2 typed_cache(1 ) typed_cache(1.0 ) print (typed_cache.cache_info())
13.6.2 singledispatch:单分派泛型函数 singledispatch实现了基于第一个参数类型的运行时多态(单分派),是Python实现访问者模式的优雅方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from functools import singledispatchfrom collections.abc import Sequence from typing import Union @singledispatch def to_json (obj ) -> str : raise TypeError(f"不支持的类型: {type (obj)} " ) @to_json.register(str ) def _ (obj ): return f'"{obj} "' @to_json.register(int ) def _ (obj ): return str (obj) @to_json.register(float ) def _ (obj ): return str (obj) @to_json.register(bool ) def _ (obj ): return "true" if obj else "false" @to_json.register(type (None ) ) def _ (obj ): return "null" @to_json.register(dict ) def _ (obj ): pairs = ", " .join( f"{to_json(k)} : {to_json(v)} " for k, v in obj.items() ) return "{" + pairs + "}" @to_json.register(list ) def _ (obj ): items = ", " .join(to_json(item) for item in obj) return "[" + items + "]" print (to_json("hello" ))print (to_json(42 ))print (to_json(3.14 ))print (to_json(True ))print (to_json(None ))print (to_json({"name" : "Alice" , "age" : 25 }))print (to_json([1 , "two" , None , True ]))
继承与分派 :singledispatch支持继承关系,子类会使用最近父类的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @singledispatch def process (obj ): return f"默认处理: {type (obj).__name__} " @process.register(int ) def _ (obj ): return f"整数处理: {obj} " @process.register(float ) def _ (obj ): return f"浮点处理: {obj:.2 f} " class SpecialInt (int ): pass print (process(42 ))print (process(3.14 ))print (process("hello" ))print (process(SpecialInt(7 )))
13.6.3 cached_property与cached_method Python 3.8+引入了cached_property,3.9+引入了cached_method:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from functools import cached_propertyimport mathclass Circle : def __init__ (self, radius ): self .radius = radius @cached_property def area (self ): print (" 计算面积..." ) return math.pi * self .radius ** 2 @cached_property def circumference (self ): print (" 计算周长..." ) return 2 * math.pi * self .radius c = Circle(5 ) print (f"面积: {c.area} " )print (f"面积(缓存): {c.area} " )print (f"周长: {c.circumference} " )print (f"周长(缓存): {c.circumference} " )c.radius = 10 print (f"面积(未更新): {c.area} " )del c.areaprint (f"面积(重新计算): {c.area} " )
注意:cached_property在实例的__dict__中存储缓存值。修改radius不会自动使缓存失效,需要手动删除缓存属性。
13.6.4 partial与partialmethod 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from functools import partialdef power (base, exp ): return base ** exp square = partial(power, exp=2 ) cube = partial(power, exp=3 ) print (square(5 ))print (cube(5 ))print (f"partial属性: func={square.func} , keywords={square.keywords} " )from functools import partialmethodclass Cell : def __init__ (self ): self ._alive = False def set_state (self, state ): self ._alive = state set_alive = partialmethod(set_state, True ) set_dead = partialmethod(set_state, False ) cell = Cell() cell.set_alive() print (f"存活: {cell._alive} " )cell.set_dead() print (f"存活: {cell._alive} " )
13.6.5 total_ordering total_ordering装饰器根据已定义的部分比较方法自动补全其余比较方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from functools import total_ordering@total_ordering class Student : def __init__ (self, name, grade ): self .name = name self .grade = grade def __eq__ (self, other ): if not isinstance (other, Student): return NotImplemented return self .grade == other.grade def __lt__ (self, other ): if not isinstance (other, Student): return NotImplemented return self .grade < other.grade students = [ Student("Alice" , 90 ), Student("Bob" , 85 ), Student("Charlie" , 92 ), ] for s in sorted (students): print (f"{s.name} : {s.grade} " ) print (Student("A" , 90 ) <= Student("B" , 90 ))print (Student("A" , 85 ) > Student("B" , 90 ))
13.7 装饰器设计模式与最佳实践 13.7.1 装饰器与设计模式 装饰器模式(Decorator Pattern)是GoF设计模式之一,Python的装饰器语法是该模式的一种实现。但两者有区别:
GoF装饰器模式 :运行时动态添加职责,通过对象组合实现Python装饰器 :定义时静态添加行为,通过函数包装实现1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from functools import wrapsclass TextProcessor : def process (self, text: str ) -> str : return text class TrimDecorator : def __init__ (self, processor: TextProcessor ): self ._processor = processor def process (self, text: str ) -> str : return self ._processor.process(text.strip()) class UpperDecorator : def __init__ (self, processor: TextProcessor ): self ._processor = processor def process (self, text: str ) -> str : return self ._processor.process(text).upper() class ReverseDecorator : def __init__ (self, processor: TextProcessor ): self ._processor = processor def process (self, text: str ) -> str : return self ._processor.process(text)[::-1 ] processor = ReverseDecorator(UpperDecorator(TrimDecorator(TextProcessor()))) print (processor.process(" hello world " ))
13.7.2 装饰器可测试性 装饰器应该是可独立测试的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from functools import wrapsimport timedef timer (func ): @wraps(func ) def wrapper (*args, **kwargs ): start = time.perf_counter() result = func(*args, **kwargs) wrapper.last_elapsed = time.perf_counter() - start return result wrapper.last_elapsed = 0.0 return wrapper @timer def slow_add (a, b ): time.sleep(0.01 ) return a + b result = slow_add(1 , 2 ) print (f"结果: {result} , 耗时: {slow_add.last_elapsed:.6 f} 秒" )
13.7.3 装饰器与类型提示 Python 3.10+提供了ParamSpec和Concatenate用于精确描述装饰器的类型签名:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from functools import wrapsfrom typing import TypeVar, ParamSpec, Callable P = ParamSpec("P" ) R = TypeVar("R" ) def trace (func: Callable [P, R] ) -> Callable [P, R]: @wraps(func ) def wrapper (*args: P.args, **kwargs: P.kwargs ) -> R: print (f"调用 {func.__name__} " ) return func(*args, **kwargs) return wrapper @trace def add (a: int , b: int ) -> int : return a + b result: int = add(1 , 2 )
13.7.4 装饰器的局限性 调试困难 :装饰器改变了函数的调用栈,可能导致traceback不直观。使用@wraps和__wrapped__可以缓解。
序列化问题 :被装饰的函数可能无法被pickle:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import picklefrom functools import wrapsdef my_decorator (func ): @wraps(func ) def wrapper (*args, **kwargs ): return func(*args, **kwargs) return wrapper @my_decorator def greet (name ): return f"Hello, {name} " try : pickled = pickle.dumps(greet) print ("序列化成功" ) except (pickle.PicklingError, AttributeError) as e: print (f"序列化失败: {e} " ) import copyregimport typesdef pickle_wrapper (func ): return unpickle_wrapper, (func.__wrapped__,) def unpickle_wrapper (func ): def wrapper (*args, **kwargs ): return func(*args, **kwargs) return wrapper copyreg.pickle(types.FunctionType, pickle_wrapper)
装饰器顺序敏感 :不同装饰器的叠加顺序可能产生不同效果,需要仔细设计。
性能开销 :每层装饰器增加一次函数调用开销,在高频调用场景下需要考虑。
13.8 前沿技术动态 13.8.1 Python 3.12+的改进 Python 3.12对装饰器语法进行了增强,允许更灵活的装饰器表达式:
1 2 3 4 5 6 7 8 9 10 11 12 13 decorators = [trace, timer] @trace @timer def func_v1 (): pass
13.8.2 基于装饰器的依赖注入 现代Python框架广泛使用装饰器实现依赖注入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 from functools import wrapsfrom typing import get_type_hintsclass Container : _instances = {} _factories = {} @classmethod def register (cls, interface ): def decorator (implementation ): cls._factories[interface] = implementation return implementation return decorator @classmethod def get (cls, interface ): if interface not in cls._instances: if interface in cls._factories: cls._instances[interface] = cls._factories[interface]() else : raise KeyError(f"未注册的类型: {interface} " ) return cls._instances[interface] @classmethod def inject (cls, func ): hints = get_type_hints(func) @wraps(func ) def wrapper (*args, **kwargs ): for name, hint in hints.items(): if name not in kwargs and hint in cls._factories: kwargs[name] = cls.get(hint) return func(*args, **kwargs) return wrapper class Database : pass @Container.register(Database ) class PostgresDB (Database ): def query (self, sql ): return f"PostgreSQL: {sql} " @Container.inject def get_data (db: Database ): return db.query("SELECT * FROM users" ) print (get_data())
13.8.3 装饰器与AST转换 高级用法是利用装饰器触发AST(抽象语法树)转换,实现编译时优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import astimport inspectimport textwrapdef optimize (func ): source = inspect.getsource(func) source = textwrap.dedent(source) tree = ast.parse(source) class ConstantFolder (ast.NodeTransformer): def visit_BinOp (self, node ): self .generic_visit(node) if isinstance (node.left, ast.Constant) and isinstance (node.right, ast.Constant): if isinstance (node.op, ast.Add): return ast.Constant(value=node.left.value + node.right.value) if isinstance (node.op, ast.Mult): return ast.Constant(value=node.left.value * node.right.value) return node optimized = ConstantFolder().visit(tree) ast.fix_missing_locations(optimized) code = compile (optimized, func.__code__.co_filename, "exec" ) namespace = {} exec (code, func.__globals__, namespace) return namespace[func.__name__] @optimize def compute (): return 2 + 3 print (compute())
13.9 本章小结 本章深入探讨了Python装饰器机制:
闭包基础 :词法作用域、自由变量、__closure__与cell对象、延迟绑定陷阱函数装饰器 :装饰器本质、通用模板、常用模式(计时、日志、类型检查、重试、速率限制)带参装饰器 :三层嵌套、可选参数、基于类的参数化类装饰器 :__call__协议、装饰类、单例模式、数据验证装饰器叠加 :执行顺序、组合器模式functools工具 :lru_cache、singledispatch、cached_property、partial、total_ordering设计实践 :装饰器模式对比、可测试性、类型提示、局限性与应对13.10 习题与项目练习 基础习题 闭包分析 :分析以下代码的输出,并解释原因:
1 2 funcs = [lambda : i for i in range (5 )] print ([f() for f in funcs])
如何修改使其输出[0, 1, 2, 3, 4]?
计时装饰器 :实现一个@timeit装饰器,记录函数执行时间并支持配置输出格式(秒/毫秒/微秒)。
缓存装饰器 :不使用functools.lru_cache,手动实现一个带TTL(生存时间)的缓存装饰器。
进阶习题 权限检查装饰器 :实现一个@require_permission(permission)装饰器,用于Web框架中的权限控制。要求支持权限组合(AND/OR逻辑)。
异步装饰器 :实现一个适用于async函数的重试装饰器@async_retry,支持指数退避策略。
装饰器注册系统 :设计一个基于装饰器的插件注册系统,支持:
自动发现和注册插件 按名称/标签查询插件 插件依赖声明 项目练习 AOP框架 :使用装饰器实现一个轻量级面向切面编程(AOP)框架,支持:
Before通知:方法调用前执行 After通知:方法调用后执行 Around通知:包裹方法调用 AfterThrowing通知:异常时执行 切点(Pointcut)表达式匹配 性能监控中间件 :实现一个基于装饰器的性能监控系统,包括:
函数级耗时统计 调用频率统计 内存使用监控 统计数据导出(JSON/CSV) 可配置的告警阈值 13.11 延伸阅读 13.11.1 装饰器理论 13.11.3 类型提示与装饰器 13.11.4 实践案例 下一章:第14章 生成器与迭代器