第13章 装饰器

学习目标

完成本章学习后,读者应能够:

  1. 理解闭包机制:掌握词法作用域、自由变量与闭包的底层实现原理,理解__closure__属性与cell对象的运作机制
  2. 精通装饰器模式:从函数装饰器到类装饰器,从无参装饰器到带参装饰器,全面掌握装饰器的实现范式
  3. 掌握元数据保留:深入理解functools.wraps__wrapped__与函数内省机制
  4. 运用标准库工具:熟练使用lru_cachesingledispatchcached_property等高级装饰器
  5. 设计装饰器架构:能够为实际项目设计可组合、可测试、可维护的装饰器体系
  6. 理解装饰器局限性:认知装饰器在调试、序列化、类型检查等方面的挑战与应对策略

13.1 闭包:装饰器的基石

13.1.1 作用域与自由变量

闭包(Closure)是装饰器的理论基础。理解闭包需要先理解Python的作用域规则与自由变量机制。

Python采用词法作用域(Lexical Scoping),函数定义时即确定了其可以访问的变量范围。LEGB规则定义了名称查找顺序:Local → Enclosing → Global → Built-in。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def demonstrate_scope():
x = 10

def inner():
y = 20
print(f"Local y: {y}")
print(f"Enclosing x: {x}")

inner()
try:
print(y)
except NameError as e:
print(f"Cannot access inner's local: {e}")

demonstrate_scope()

当一个函数引用了其外层作用域中的变量,且该变量既不是全局变量也不是局部变量时,它被称为自由变量(Free Variable)。Python编译器在编译时检测自由变量,并通过cell对象在运行时实现跨作用域的变量绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def make_counter():
count = 0

def increment():
nonlocal count
count += 1
return count

return increment

counter = make_counter()
print(counter())
print(counter())
print(counter())

print(f"闭包自由变量: {counter.__code__.co_freevars}")
print(f"闭包单元格: {counter.__closure__}")
print(f"单元格值: {counter.__closure__[0].cell_contents}")

__closure__属性是一个cell对象元组,每个cell对象持有一个自由变量的引用。nonlocal声明告诉解释器该变量是自由变量,应从外层作用域查找而非创建新的局部绑定。

13.1.2 闭包的语义与实现

闭包是一个函数对象,它记住了定义时的词法环境,即使该环境在函数调用时已经不可达。闭包 = 函数 + 环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def make_multiplier(factor):
def multiply(x):
return x * factor
return multiply

double = make_multiplier(2)
triple = make_multiplier(3)

print(double(5))
print(triple(5))

print(f"double的自由变量: {double.__code__.co_freevars}")
print(f"double的闭包值: {double.__closure__[0].cell_contents}")
print(f"triple的闭包值: {triple.__closure__[0].cell_contents}")

关键理解:doubletriple共享相同的函数代码,但各自持有不同的闭包环境。factor作为自由变量被绑定到不同的cell对象中。

闭包的延迟绑定陷阱:闭包中的自由变量是引用绑定而非值绑定,这意味着在闭包被调用时才解析变量的当前值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def create_functions():
functions = []
for i in range(5):
def func():
return i
functions.append(func)
return functions

funcs = create_functions()
print([f() for f in funcs])

def create_functions_fixed():
functions = []
for i in range(5):
def func(i=i):
return i
functions.append(func)
return functions

funcs = create_functions_fixed()
print([f() for f in funcs])

延迟绑定发生的原因是闭包捕获的是变量的引用,而非变量在定义时的值。当循环结束后,i的值为4,所有闭包都引用同一个i。通过默认参数i=i可以在定义时将当前值绑定到参数上。

13.1.3 闭包的实际应用

可变状态封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def make_averager():
total = 0
count = 0

def averager(new_value):
nonlocal total, count
total += new_value
count += 1
return total / count

return averager

avg = make_averager()
print(avg(10))
print(avg(20))
print(avg(30))
print(f"当前闭包状态: total={avg.__closure__[0].cell_contents}, count={avg.__closure__[1].cell_contents}")

记忆化缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def make_cache():
cache = {}

def get(key, compute):
if key not in cache:
cache[key] = compute(key)
return cache[key]

def clear():
cache.clear()

get.clear = clear
get.cache = cache
return get

cached = make_cache()

def expensive_sqrt(n):
print(f" Computing sqrt({n})...")
return n ** 0.5

print(cached(16, expensive_sqrt))
print(cached(16, expensive_sqrt))
print(cached(25, expensive_sqrt))
print(f"缓存内容: {cached.cache}")
cached.clear()
print(f"清空后: {cached.cache}")

函数工厂与策略模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def make_validator(field_name, min_val, max_val):
def validate(value):
if not isinstance(value, (int, float)):
raise TypeError(f"{field_name} must be numeric")
if value < min_val:
raise ValueError(f"{field_name} must be >= {min_val}")
if value > max_val:
raise ValueError(f"{field_name} must be <= {max_val}")
return True

validate.field_name = field_name
validate.min_val = min_val
validate.max_val = max_val
return validate

validate_age = make_validator("age", 0, 150)
validate_score = make_validator("score", 0, 100)
validate_temp = make_validator("temperature", -273.15, float("inf"))

print(validate_age(25))
print(validate_score(85))
print(validate_temp(36.5))

try:
validate_age(-5)
except ValueError as e:
print(e)

13.1.4 闭包与类的对比

闭包和类都可以封装状态与行为,但各有适用场景:

特性闭包
状态封装__closure__ + nonlocal实例属性
接口暴露仅暴露返回的函数通过属性和方法
继承不支持支持
内省困难(需访问__closure__方便(属性访问)
内存较轻量较重(__dict__等)
可读性简单场景更简洁复杂场景更清晰
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def counter_closure(start=0):
count = start

def increment(step=1):
nonlocal count
count += step
return count

def reset():
nonlocal count
count = start

increment.reset = reset
return increment

class CounterClass:
def __init__(self, start=0):
self._count = start
self._start = start

def increment(self, step=1):
self._count += step
return self._count

def reset(self):
self._count = self._start

c1 = counter_closure(10)
print(c1())
print(c1())
c1.reset()
print(c1())

c2 = CounterClass(10)
print(c2.increment())
print(c2.increment())
c2.reset()
print(c2.increment())

13.2 函数装饰器

13.2.1 装饰器的本质

装饰器(Decorator)是一种语法糖,本质是一个高阶函数,接受一个函数作为参数并返回一个新函数。@decorator语法等价于func = decorator(func)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def trace(func):
def wrapper(*args, **kwargs):
print(f"→ 调用 {func.__name__}({args}, {kwargs})")
result = func(*args, **kwargs)
print(f"← 返回 {func.__name__} => {result}")
return result
return wrapper

@trace
def add(a, b):
return a + b

print(add(3, 5))

add_equivalent = trace(add)
print(add is add_equivalent)

理解装饰器的关键在于:装饰器在模块加载时执行,而非在函数调用时。这意味着装饰器是声明式的——它在函数定义时就已经生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
registry = []

def register(func):
print(f"注册函数: {func.__name__}")
registry.append(func)
return func

@register
def func_a():
return "A"

@register
def func_b():
return "B"

print(f"注册表: {[f.__name__ for f in registry]}")

13.2.2 通用装饰器模板

一个健壮的函数装饰器应遵循以下模板:

1
2
3
4
5
6
7
8
from functools import wraps

def robust_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
return result
return wrapper

关键要素:

  • *args, **kwargs:接受任意参数,确保装饰器适用于所有函数签名
  • @wraps(func):保留原函数的元数据(__name____doc____module__等)
  • 返回result:确保原函数的返回值不被丢弃

13.2.3 常用装饰器模式

计时装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from functools import wraps

def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} 执行耗时: {elapsed:.6f}秒")
return result
return wrapper

@timer
def compute_sum(n):
return sum(range(n))

result = compute_sum(1_000_000)
print(f"结果: {result}")

日志装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import logging
from functools import wraps

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def log_calls(level=logging.DEBUG):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
args_repr = [repr(a) for a in args]
kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
logger.log(level, f"调用 {func.__name__}({signature})")
try:
result = func(*args, **kwargs)
logger.log(level, f"{func.__name__} 返回 {result!r}")
return result
except Exception as e:
logger.error(f"{func.__name__} 抛出 {type(e).__name__}: {e}")
raise
return wrapper
return decorator

@log_calls(level=logging.INFO)
def divide(a, b):
return a / b

print(divide(10, 3))
try:
divide(10, 0)
except ZeroDivisionError:
pass

类型检查装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from functools import wraps
import inspect

def typecheck(func):
sig = inspect.signature(func)

@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()

for name, value in bound.arguments.items():
param = sig.parameters[name]
if param.annotation is not param.empty:
if not isinstance(value, param.annotation):
raise TypeError(
f"参数 '{name}' 期望类型 {param.annotation.__name__}, "
f"实际类型 {type(value).__name__}"
)

result = func(*args, **kwargs)

if sig.return_annotation is not sig.empty:
if not isinstance(result, sig.return_annotation):
raise TypeError(
f"返回值期望类型 {sig.return_annotation.__name__}, "
f"实际类型 {type(result).__name__}"
)

return result
return wrapper

@typecheck
def greet(name: str, times: int) -> str:
return (f"Hello, {name}! " * times).strip()

print(greet("Alice", 2))

try:
greet(123, 2)
except TypeError as e:
print(e)

重试装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
import random
from functools import wraps

def retry(
max_attempts=3,
delay=1.0,
backoff=2.0,
exceptions=(Exception,),
on_retry=None
):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts:
raise
if on_retry:
on_retry(attempt, e)
time.sleep(current_delay)
current_delay *= backoff
return wrapper
return decorator

@retry(
max_attempts=5,
delay=0.1,
backoff=1.5,
exceptions=(ConnectionError, TimeoutError),
on_retry=lambda attempt, err: print(f"第{attempt}次重试: {err}")
)
def fetch_url(url):
if random.random() < 0.7:
raise ConnectionError(f"连接失败: {url}")
return f"内容来自 {url}"

try:
result = fetch_url("https://example.com")
print(result)
except ConnectionError as e:
print(f"最终失败: {e}")

速率限制装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
from functools import wraps
from collections import deque

def rate_limit(calls_per_second=1):
min_interval = 1.0 / calls_per_second

def decorator(func):
timestamps = deque()

@wraps(func)
def wrapper(*args, **kwargs):
now = time.monotonic()
while timestamps and now - timestamps[0] >= 1.0:
timestamps.popleft()

if len(timestamps) >= calls_per_second:
sleep_time = min_interval - (now - timestamps[-1])
if sleep_time > 0:
time.sleep(sleep_time)

timestamps.append(time.monotonic())
return func(*args, **kwargs)
return wrapper
return decorator

@rate_limit(calls_per_second=3)
def api_call(endpoint):
print(f"调用 {endpoint} @ {time.strftime('%H:%M:%S')}")
return f"响应: {endpoint}"

for i in range(6):
api_call(f"/api/data/{i}")

13.2.4 保留函数元数据

functools.wraps不仅仅是复制__name____doc__,它还处理了多个关键属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from functools import wraps

def show_wraps_behavior(func):
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper

@show_wraps_behavior
def example(x: int, y: str = "default") -> str:
"""这是一个示例函数。"""
return f"{x}: {y}"

print(f"__name__: {example.__name__}")
print(f"__doc__: {example.__doc__}")
print(f"__module__: {example.__module__}")
print(f"__wrapped__: {example.__wrapped__}")
print(f"__annotations__: {example.__annotations__}")
print(f"__qualname__: {example.__qualname__}")

__wrapped__属性特别重要——它保存了对原始函数的引用,使得装饰后的函数仍可访问原始函数。这在调试、内省和装饰器叠加时至关重要。

1
2
3
4
5
import inspect

original = example.__wrapped__
print(f"原始函数签名: {inspect.signature(original)}")
print(f"原始函数源码:\n{inspect.getsource(original)}")

13.3 带参数的装饰器

13.3.1 三层嵌套模式

带参数的装饰器需要三层嵌套:最外层接收装饰器参数,中间层接收被装饰函数,最内层是实际包装函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from functools import wraps

def repeat(times=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
result = None
for _ in range(times):
result = func(*args, **kwargs)
return result
return wrapper
return decorator

@repeat(times=3)
def greet(name):
print(f"Hello, {name}!")
return f"Greeted {name}"

greet("Alice")

执行流程:repeat(times=3) → 返回decorator@decorator应用于greet → 返回wrapper

13.3.2 可选参数装饰器

当装饰器既需要支持@decorator(无参数)又需要支持@decorator(option=value)(有参数)时,需要特殊处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from functools import wraps

def flexible_decorator(func=None, *, option="default"):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
print(f"[{option}] 调用 {f.__name__}")
return f(*args, **kwargs)
return wrapper

if func is not None:
return decorator(func)
return decorator

@flexible_decorator
def func_a():
return "A"

@flexible_decorator(option="custom")
def func_b():
return "B"

print(func_a())
print(func_b())

@flexible_decorator不带括号时,func接收被装饰函数;当带括号时,funcNone,返回真正的装饰器。

13.3.3 基于类的参数化装饰器

使用类实现带参数的装饰器通常更清晰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from functools import wraps

class Retry:
def __init__(self, max_attempts=3, delay=1.0, backoff=2.0, exceptions=(Exception,)):
self.max_attempts = max_attempts
self.delay = delay
self.backoff = backoff
self.exceptions = exceptions

def __call__(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = self.delay
for attempt in range(1, self.max_attempts + 1):
try:
return func(*args, **kwargs)
except self.exceptions as e:
if attempt == self.max_attempts:
raise
import time
time.sleep(current_delay)
current_delay *= self.backoff
return wrapper

@Retry(max_attempts=3, delay=0.5, backoff=2.0, exceptions=(ValueError,))
def parse_int(value):
return int(value)

try:
print(parse_int("42"))
parse_int("abc")
except ValueError:
print("解析失败")

13.4 类装饰器

13.4.1 用类实现函数装饰器

类装饰器通过__call__方法使实例可调用,适合需要维护状态的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from functools import wraps

class CallStats:
def __init__(self, func):
wraps(func)(self)
self.func = func
self.call_count = 0
self.total_time = 0.0
self.last_result = None

def __call__(self, *args, **kwargs):
import time
start = time.perf_counter()
self.last_result = self.func(*args, **kwargs)
elapsed = time.perf_counter() - start
self.call_count += 1
self.total_time += elapsed
return self.last_result

def stats(self):
avg = self.total_time / self.call_count if self.call_count else 0
return {
"function": self.func.__name__,
"calls": self.call_count,
"total_time": self.total_time,
"avg_time": avg,
}

def reset(self):
self.call_count = 0
self.total_time = 0.0

@CallStats
def compute(n):
return sum(i ** 2 for i in range(n))

compute(100_000)
compute(200_000)
compute(50_000)

import json
print(json.dumps(compute.stats(), indent=2))
compute.reset()
print(f"重置后调用次数: {compute.call_count}")

13.4.2 装饰类

类装饰器可以修改类的行为、添加方法或改变实例创建过程:

添加方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def add_repr(cls):
def __repr__(self):
fields = ", ".join(
f"{k}={v!r}" for k, v in self.__dict__.items()
)
return f"{cls.__name__}({fields})"
cls.__repr__ = __repr__
return cls

@add_repr
class Point:
def __init__(self, x, y):
self.x = x
self.y = y

p = Point(3, 5)
print(p)

单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def singleton(cls):
instances = {}

def get_instance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]

get_instance.__wrapped__ = cls
get_instance._instances = instances
return get_instance

@singleton
class DatabaseConnection:
def __init__(self, host="localhost"):
self.host = host
print(f"连接数据库: {host}")

db1 = DatabaseConnection("localhost")
db2 = DatabaseConnection("remote-host")
print(f"db1 is db2: {db1 is db2}")
print(f"原始类: {DatabaseConnection.__wrapped__}")

数据验证装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from functools import wraps

def validated(cls):
original_init = cls.__init__

@wraps(original_init)
def new_init(self, *args, **kwargs):
annotations = cls.__annotations__
bound = original_init.__code__

varnames = bound.co_varnames[1:bound.co_argcount]
for i, name in enumerate(varnames):
if i < len(args):
value = args[i]
elif name in kwargs:
value = kwargs[name]
else:
continue

if name in annotations:
expected = annotations[name]
if not isinstance(value, expected):
raise TypeError(
f"{cls.__name__}.{name} 期望 {expected.__name__}, "
f"得到 {type(value).__name__}"
)

original_init(self, *args, **kwargs)

cls.__init__ = new_init
return cls

@validated
class User:
name: str
age: int

def __init__(self, name: str, age: int):
self.name = name
self.age = age

u1 = User("Alice", 25)
print(f"{u1.name}, {u1.age}")

try:
User(123, "twenty")
except TypeError as e:
print(e)

13.5 装饰器叠加

13.5.1 执行顺序

多个装饰器按从下到上的顺序应用,但执行时按从外到内的顺序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from functools import wraps

def decorator_a(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("A: 函数调用前")
result = func(*args, **kwargs)
print("A: 函数调用后")
return result
return wrapper

def decorator_b(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("B: 函数调用前")
result = func(*args, **kwargs)
print("B: 函数调用后")
return result
return wrapper

@decorator_a
@decorator_b
def greet(name):
print(f"Hello, {name}!")

greet("Alice")

输出顺序:

1
2
3
4
5
A: 函数调用前
B: 函数调用前
Hello, Alice!
B: 函数调用后
A: 函数调用后

等价于:greet = decorator_a(decorator_b(greet)),形成洋葱式调用栈。

13.5.2 装饰器组合器

为了管理复杂的装饰器组合,可以设计组合器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from functools import wraps

def compose(*decorators):
def decorator(func):
for dec in reversed(decorators):
func = dec(func)
return func
return decorator

def trace(func):
@wraps(func)
def wrapper(*args, **kwargs):
print(f"[TRACE] {func.__name__}")
return func(*args, **kwargs)
return wrapper

def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
import time
start = time.perf_counter()
result = func(*args, **kwargs)
print(f"[TIMER] {func.__name__}: {time.perf_counter() - start:.6f}s")
return result
return wrapper

def validate_positive(func):
@wraps(func)
def wrapper(*args, **kwargs):
if any(arg < 0 for arg in args if isinstance(arg, (int, float))):
raise ValueError("参数不能为负数")
return func(*args, **kwargs)
return wrapper

@compose(trace, timer, validate_positive)
def compute(a, b):
return a ** b

print(compute(2, 10))

13.6 functools模块高级工具

13.6.1 lru_cache:最近最少使用缓存

lru_cache是Python标准库中最实用的装饰器之一,实现了LRU(Least Recently Used)缓存淘汰策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from functools import lru_cache
import time

@lru_cache(maxsize=128)
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n - 1) + fibonacci(n - 2)

start = time.perf_counter()
print(f"fib(100) = {fibonacci(100)}")
print(f"耗时: {time.perf_counter() - start:.6f}秒")

print(fibonacci.cache_info())

fibonacci.cache_clear()
print(f"清空后: {fibonacci.cache_info()}")

缓存参数选择

  • maxsize=None:无限制缓存,适用于结果集有限且可全部缓存的情况
  • maxsize=128:默认值,适合大多数场景
  • maxsize=0:禁用缓存(实际上只是禁用了缓存效果,但仍保留装饰器接口)

缓存键的计算lru_cache使用参数的值作为缓存键。参数必须是可哈希的(hashable),这意味着列表、字典等不可哈希类型不能直接作为参数。

1
2
3
4
5
6
7
8
@lru_cache(maxsize=32)
def process_data(data_hash: int, threshold: float) -> list:
print(f" 处理数据: hash={data_hash}, threshold={threshold}")
return [data_hash * threshold]

print(process_data(100, 0.5))
print(process_data(100, 0.5))
print(process_data.cache_info())

typed参数typed=True时,不同类型的参数会产生不同的缓存条目:

1
2
3
4
5
6
7
8
@lru_cache(maxsize=128, typed=True)
def typed_cache(x):
print(f" 计算: {x} (type={type(x).__name__})")
return x * 2

typed_cache(1)
typed_cache(1.0)
print(typed_cache.cache_info())

13.6.2 singledispatch:单分派泛型函数

singledispatch实现了基于第一个参数类型的运行时多态(单分派),是Python实现访问者模式的优雅方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from functools import singledispatch
from collections.abc import Sequence
from typing import Union

@singledispatch
def to_json(obj) -> str:
raise TypeError(f"不支持的类型: {type(obj)}")

@to_json.register(str)
def _(obj):
return f'"{obj}"'

@to_json.register(int)
def _(obj):
return str(obj)

@to_json.register(float)
def _(obj):
return str(obj)

@to_json.register(bool)
def _(obj):
return "true" if obj else "false"

@to_json.register(type(None))
def _(obj):
return "null"

@to_json.register(dict)
def _(obj):
pairs = ", ".join(
f"{to_json(k)}: {to_json(v)}" for k, v in obj.items()
)
return "{" + pairs + "}"

@to_json.register(list)
def _(obj):
items = ", ".join(to_json(item) for item in obj)
return "[" + items + "]"

print(to_json("hello"))
print(to_json(42))
print(to_json(3.14))
print(to_json(True))
print(to_json(None))
print(to_json({"name": "Alice", "age": 25}))
print(to_json([1, "two", None, True]))

继承与分派singledispatch支持继承关系,子类会使用最近父类的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@singledispatch
def process(obj):
return f"默认处理: {type(obj).__name__}"

@process.register(int)
def _(obj):
return f"整数处理: {obj}"

@process.register(float)
def _(obj):
return f"浮点处理: {obj:.2f}"

class SpecialInt(int):
pass

print(process(42))
print(process(3.14))
print(process("hello"))
print(process(SpecialInt(7)))

13.6.3 cached_property与cached_method

Python 3.8+引入了cached_property,3.9+引入了cached_method

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from functools import cached_property
import math

class Circle:
def __init__(self, radius):
self.radius = radius

@cached_property
def area(self):
print(" 计算面积...")
return math.pi * self.radius ** 2

@cached_property
def circumference(self):
print(" 计算周长...")
return 2 * math.pi * self.radius

c = Circle(5)
print(f"面积: {c.area}")
print(f"面积(缓存): {c.area}")
print(f"周长: {c.circumference}")
print(f"周长(缓存): {c.circumference}")

c.radius = 10
print(f"面积(未更新): {c.area}")

del c.area
print(f"面积(重新计算): {c.area}")

注意:cached_property在实例的__dict__中存储缓存值。修改radius不会自动使缓存失效,需要手动删除缓存属性。

13.6.4 partial与partialmethod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from functools import partial

def power(base, exp):
return base ** exp

square = partial(power, exp=2)
cube = partial(power, exp=3)

print(square(5))
print(cube(5))
print(f"partial属性: func={square.func}, keywords={square.keywords}")

from functools import partialmethod

class Cell:
def __init__(self):
self._alive = False

def set_state(self, state):
self._alive = state

set_alive = partialmethod(set_state, True)
set_dead = partialmethod(set_state, False)

cell = Cell()
cell.set_alive()
print(f"存活: {cell._alive}")
cell.set_dead()
print(f"存活: {cell._alive}")

13.6.5 total_ordering

total_ordering装饰器根据已定义的部分比较方法自动补全其余比较方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from functools import total_ordering

@total_ordering
class Student:
def __init__(self, name, grade):
self.name = name
self.grade = grade

def __eq__(self, other):
if not isinstance(other, Student):
return NotImplemented
return self.grade == other.grade

def __lt__(self, other):
if not isinstance(other, Student):
return NotImplemented
return self.grade < other.grade

students = [
Student("Alice", 90),
Student("Bob", 85),
Student("Charlie", 92),
]

for s in sorted(students):
print(f"{s.name}: {s.grade}")

print(Student("A", 90) <= Student("B", 90))
print(Student("A", 85) > Student("B", 90))

13.7 装饰器设计模式与最佳实践

13.7.1 装饰器与设计模式

装饰器模式(Decorator Pattern)是GoF设计模式之一,Python的装饰器语法是该模式的一种实现。但两者有区别:

  • GoF装饰器模式:运行时动态添加职责,通过对象组合实现
  • Python装饰器:定义时静态添加行为,通过函数包装实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from functools import wraps

class TextProcessor:
def process(self, text: str) -> str:
return text

class TrimDecorator:
def __init__(self, processor: TextProcessor):
self._processor = processor

def process(self, text: str) -> str:
return self._processor.process(text.strip())

class UpperDecorator:
def __init__(self, processor: TextProcessor):
self._processor = processor

def process(self, text: str) -> str:
return self._processor.process(text).upper()

class ReverseDecorator:
def __init__(self, processor: TextProcessor):
self._processor = processor

def process(self, text: str) -> str:
return self._processor.process(text)[::-1]

processor = ReverseDecorator(UpperDecorator(TrimDecorator(TextProcessor())))
print(processor.process(" hello world "))

13.7.2 装饰器可测试性

装饰器应该是可独立测试的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from functools import wraps
import time

def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
wrapper.last_elapsed = time.perf_counter() - start
return result
wrapper.last_elapsed = 0.0
return wrapper

@timer
def slow_add(a, b):
time.sleep(0.01)
return a + b

result = slow_add(1, 2)
print(f"结果: {result}, 耗时: {slow_add.last_elapsed:.6f}秒")

13.7.3 装饰器与类型提示

Python 3.10+提供了ParamSpecConcatenate用于精确描述装饰器的类型签名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from functools import wraps
from typing import TypeVar, ParamSpec, Callable

P = ParamSpec("P")
R = TypeVar("R")

def trace(func: Callable[P, R]) -> Callable[P, R]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
print(f"调用 {func.__name__}")
return func(*args, **kwargs)
return wrapper

@trace
def add(a: int, b: int) -> int:
return a + b

result: int = add(1, 2)

13.7.4 装饰器的局限性

调试困难:装饰器改变了函数的调用栈,可能导致traceback不直观。使用@wraps__wrapped__可以缓解。

序列化问题:被装饰的函数可能无法被pickle:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pickle
from functools import wraps

def my_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper

@my_decorator
def greet(name):
return f"Hello, {name}"

try:
pickled = pickle.dumps(greet)
print("序列化成功")
except (pickle.PicklingError, AttributeError) as e:
print(f"序列化失败: {e}")

import copyreg
import types

def pickle_wrapper(func):
return unpickle_wrapper, (func.__wrapped__,)

def unpickle_wrapper(func):
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper

copyreg.pickle(types.FunctionType, pickle_wrapper)

装饰器顺序敏感:不同装饰器的叠加顺序可能产生不同效果,需要仔细设计。

性能开销:每层装饰器增加一次函数调用开销,在高频调用场景下需要考虑。

13.8 前沿技术动态

13.8.1 Python 3.12+的改进

Python 3.12对装饰器语法进行了增强,允许更灵活的装饰器表达式:

1
2
3
4
5
6
7
8
9
10
11
12
13
# Python 3.12+ 支持更灵活的装饰器语法
# 允许在装饰器位置使用任意表达式

decorators = [trace, timer]

# 传统写法
@trace
@timer
def func_v1():
pass

# Python 3.12+ 可以动态选择装饰器
# @decorators[0] # 新语法(示例,需3.12+环境)

13.8.2 基于装饰器的依赖注入

现代Python框架广泛使用装饰器实现依赖注入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from functools import wraps
from typing import get_type_hints

class Container:
_instances = {}
_factories = {}

@classmethod
def register(cls, interface):
def decorator(implementation):
cls._factories[interface] = implementation
return implementation
return decorator

@classmethod
def get(cls, interface):
if interface not in cls._instances:
if interface in cls._factories:
cls._instances[interface] = cls._factories[interface]()
else:
raise KeyError(f"未注册的类型: {interface}")
return cls._instances[interface]

@classmethod
def inject(cls, func):
hints = get_type_hints(func)

@wraps(func)
def wrapper(*args, **kwargs):
for name, hint in hints.items():
if name not in kwargs and hint in cls._factories:
kwargs[name] = cls.get(hint)
return func(*args, **kwargs)
return wrapper

class Database:
pass

@Container.register(Database)
class PostgresDB(Database):
def query(self, sql):
return f"PostgreSQL: {sql}"

@Container.inject
def get_data(db: Database):
return db.query("SELECT * FROM users")

print(get_data())

13.8.3 装饰器与AST转换

高级用法是利用装饰器触发AST(抽象语法树)转换,实现编译时优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import ast
import inspect
import textwrap

def optimize(func):
source = inspect.getsource(func)
source = textwrap.dedent(source)
tree = ast.parse(source)

class ConstantFolder(ast.NodeTransformer):
def visit_BinOp(self, node):
self.generic_visit(node)
if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
if isinstance(node.op, ast.Add):
return ast.Constant(value=node.left.value + node.right.value)
if isinstance(node.op, ast.Mult):
return ast.Constant(value=node.left.value * node.right.value)
return node

optimized = ConstantFolder().visit(tree)
ast.fix_missing_locations(optimized)

code = compile(optimized, func.__code__.co_filename, "exec")
namespace = {}
exec(code, func.__globals__, namespace)
return namespace[func.__name__]

@optimize
def compute():
return 2 + 3

print(compute())

13.9 本章小结

本章深入探讨了Python装饰器机制:

  1. 闭包基础:词法作用域、自由变量、__closure__cell对象、延迟绑定陷阱
  2. 函数装饰器:装饰器本质、通用模板、常用模式(计时、日志、类型检查、重试、速率限制)
  3. 带参装饰器:三层嵌套、可选参数、基于类的参数化
  4. 类装饰器__call__协议、装饰类、单例模式、数据验证
  5. 装饰器叠加:执行顺序、组合器模式
  6. functools工具lru_cachesingledispatchcached_propertypartialtotal_ordering
  7. 设计实践:装饰器模式对比、可测试性、类型提示、局限性与应对

13.10 习题与项目练习

基础习题

  1. 闭包分析:分析以下代码的输出,并解释原因:

    1
    2
    funcs = [lambda: i for i in range(5)]
    print([f() for f in funcs])

    如何修改使其输出[0, 1, 2, 3, 4]

  2. 计时装饰器:实现一个@timeit装饰器,记录函数执行时间并支持配置输出格式(秒/毫秒/微秒)。

  3. 缓存装饰器:不使用functools.lru_cache,手动实现一个带TTL(生存时间)的缓存装饰器。

进阶习题

  1. 权限检查装饰器:实现一个@require_permission(permission)装饰器,用于Web框架中的权限控制。要求支持权限组合(AND/OR逻辑)。

  2. 异步装饰器:实现一个适用于async函数的重试装饰器@async_retry,支持指数退避策略。

  3. 装饰器注册系统:设计一个基于装饰器的插件注册系统,支持:

    • 自动发现和注册插件
    • 按名称/标签查询插件
    • 插件依赖声明

项目练习

  1. AOP框架:使用装饰器实现一个轻量级面向切面编程(AOP)框架,支持:

    • Before通知:方法调用前执行
    • After通知:方法调用后执行
    • Around通知:包裹方法调用
    • AfterThrowing通知:异常时执行
    • 切点(Pointcut)表达式匹配
  2. 性能监控中间件:实现一个基于装饰器的性能监控系统,包括:

    • 函数级耗时统计
    • 调用频率统计
    • 内存使用监控
    • 统计数据导出(JSON/CSV)
    • 可配置的告警阈值

13.11 延伸阅读

13.11.1 装饰器理论

  • PEP 318 — Decorators for Functions and Methods (https://peps.python.org/pep-0318/) — 装饰器语法的设计决策
  • 《Fluent Python》第7章 — 函数装饰器和闭包深度解析
  • 《Python Cookbook》第9章 — 元编程实用技巧

13.11.2 functools模块

13.11.3 类型提示与装饰器

13.11.4 实践案例


下一章:第14章 生成器与迭代器