第14章 生成器与迭代器

学习目标

完成本章学习后,读者应能够:

  1. 理解迭代协议:掌握迭代器协议(__iter__/__next__)与可迭代协议(__iter__)的设计原理与实现机制
  2. 精通生成器函数:深入理解yield的执行模型、生成器状态机、协程式通信(send/throw/close)
  3. 运用yield from:理解子生成器委托机制及其在异步编程中的基础作用
  4. 掌握生成器表达式:理解惰性求值的内存优势与适用场景
  5. 运用itertools:熟练使用标准库迭代器工具构建高效的数据处理管道
  6. 设计迭代器架构:能够为实际项目设计自定义迭代器,处理无限序列、数据流管道等场景

14.1 迭代协议与底层机制

14.1.1 迭代器协议

Python的迭代基于两个核心协议:

  • 可迭代协议(Iterable Protocol):对象实现__iter__()方法,返回一个迭代器
  • 迭代器协议(Iterator Protocol):对象实现__next__()方法,返回下一个元素;元素耗尽时抛出StopIteration
from collections.abc import Iterable, Iterator

class Counter:
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.count < self.max_count:
            self.count += 1
            return self.count
        raise StopIteration

counter = Counter(5)
print(f"是否可迭代: {isinstance(counter, Iterable)}")
print(f"是否迭代器: {isinstance(counter, Iterator)}")

for num in counter:
    print(num, end=" ")
print()

counter2 = Counter(3)
print(next(counter2))
print(next(counter2))
print(next(counter2))
try:
    next(counter2)
except StopIteration:
    print("迭代结束")

关键理解:迭代器自身也是可迭代的(__iter__返回self),但可迭代对象不一定是迭代器。例如列表是可迭代的,但不是迭代器——每次调用iter(list)都返回一个新的迭代器对象。

my_list = [1, 2, 3]
it1 = iter(my_list)
it2 = iter(my_list)
print(f"it1和it2是同一对象: {it1 is it2}")
print(f"it1: {next(it1)}, it2: {next(it2)}")

counter = Counter(3)
it3 = iter(counter)
it4 = iter(counter)
print(f"it3和it4是同一对象: {it3 is it4}")

14.1.2 for循环的底层机制

for循环的本质是先调用iter()获取迭代器,再反复调用next(),直到捕获StopIteration为止:

def manual_for(iterable, func):
    iterator = iter(iterable)
    while True:
        try:
            item = next(iterator)
            func(item)
        except StopIteration:
            break

manual_for([1, 2, 3], lambda x: print(x * 2, end=" "))
print()

def for_loop_equivalent(iterable):
    result = []
    iterator = iter(iterable)
    while True:
        try:
            item = next(iterator)
            result.append(item)
        except StopIteration:
            break
    return result

print(for_loop_equivalent(range(5)))

14.1.3 分离可迭代对象与迭代器

最佳实践是将可迭代对象和迭代器分离为两个类。这样可迭代对象可以多次迭代,每次创建独立的迭代器:

class Range:
    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self.start = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        return RangeIterator(self.start, self.stop, self.step)

    def __len__(self):
        if self.step > 0:
            return max(0, (self.stop - self.start + self.step - 1) // self.step)
        return max(0, (self.start - self.stop - self.step - 1) // (-self.step))

    def __contains__(self, value):
        if self.step > 0:
            return self.start <= value < self.stop and (value - self.start) % self.step == 0
        return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0

class RangeIterator:
    def __init__(self, start, stop, step):
        self.current = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        return self

    def __next__(self):
        if self.step > 0 and self.current >= self.stop:
            raise StopIteration
        if self.step < 0 and self.current <= self.stop:
            raise StopIteration
        result = self.current
        self.current += self.step
        return result

r = Range(2, 10, 2)
print(f"Range(2,10,2): {list(r)}")
print(f"再次迭代: {list(r)}")
print(f"长度: {len(r)}")
print(f"6 in Range: {6 in r}")

14.1.4 迭代器的常见陷阱

迭代器只能消费一次

it = iter([1, 2, 3])
print(f"第一次: {list(it)}")
print(f"第二次: {list(it)}")

def process_twice(iterable):
    data = list(iterable)
    print(f"第一遍: {data}")
    print(f"第二遍: {data}")

process_twice(iter([1, 2, 3]))

迭代器不是序列:迭代器不支持len()、索引和切片,因为它可能是无限的或长度未知的。

it = iter([1, 2, 3])
try:
    print(len(it))
except TypeError as e:
    print(f"迭代器不支持len: {e}")

try:
    print(it[0])
except TypeError as e:
    print(f"迭代器不支持索引: {e}")

14.2 生成器函数

14.2.1 yield的执行模型

生成器函数是包含yield语句的函数。调用生成器函数不会执行函数体,而是返回一个生成器对象。每次调用next()时,函数从上次暂停的位置恢复执行,直到遇到下一个yield或函数结束。

from collections.abc import Iterator

def simple_generator():
    print(" 第1步: 开始")
    yield 1
    print(" 第2步: 继续")
    yield 2
    print(" 第3步: 结束")
    yield 3

gen = simple_generator()
print(f"生成器类型: {type(gen)}")
print(f"是否迭代器: {isinstance(gen, Iterator)}")

print(f"获取: {next(gen)}")
print(f"获取: {next(gen)}")
print(f"获取: {next(gen)}")
try:
    next(gen)
except StopIteration:
    print("生成器耗尽")

生成器的内部状态

import inspect

def my_generator():
    yield 1
    yield 2
    yield 3

gen = my_generator()
print(f"创建后: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"第一次next后: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"第二次next后: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"第三次next后: {inspect.getgeneratorstate(gen)}")

try:
    next(gen)
except StopIteration:
    pass
print(f"耗尽后: {inspect.getgeneratorstate(gen)}")

生成器状态有四种:

  • GEN_CREATED:已创建但尚未启动
  • GEN_RUNNING:正在执行(多线程中可见)
  • GEN_SUSPENDED:在yield处暂停
  • GEN_CLOSED:已关闭
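GEN_RUNNING状态通常只能在生成器执行期间从内部观察到。下面是一个示意性的小技巧(通过send把生成器自身的引用传进去,在其运行时查询状态;函数名self_inspect为本例自拟):

```python
import inspect

def self_inspect():
    # 先通过send接收自身的生成器引用,再在运行过程中查询状态
    me = yield
    yield inspect.getgeneratorstate(me)

gen = self_inspect()
next(gen)              # 启动,暂停在第一个yield处
print(gen.send(gen))   # GEN_RUNNING:查询发生在生成器执行期间
```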

14.2.2 生成器与协程

生成器不仅是迭代器,还是协程(Coroutine)的基础。通过send()、throw()、close()方法,生成器可以接收外部输入、处理异常和响应关闭请求。

send方法:向生成器发送值,该值成为yield表达式的结果:

def accumulator():
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value

gen = accumulator()
next(gen)

print(gen.send(10))
print(gen.send(20))
print(gen.send(30))

gen.close()

协程式生成器模式:send使生成器成为数据消费者而非仅是生产者:

def processor():
    buffer = []
    while True:
        data = yield buffer
        if data is None:
            break
        buffer.append(data.upper())

gen = processor()
next(gen)

print(gen.send("hello"))
print(gen.send("world"))
print(gen.send("python"))

gen.close()

throw方法:在生成器暂停处抛出指定异常:

def error_handler():
    try:
        while True:
            value = yield
            print(f" 处理: {value}")
    except ValueError as e:
        print(f" 捕获ValueError: {e}")
        yield "已恢复"
    except Exception as e:
        print(f" 捕获异常: {type(e).__name__}: {e}")

gen = error_handler()
next(gen)

gen.send("data1")
gen.send("data2")

result = gen.throw(ValueError, "无效值")
print(f"throw返回: {result}")

gen.close()

close方法:在生成器暂停处抛出GeneratorExit异常:

import inspect

def cleanup_generator():
    try:
        while True:
            value = yield
            print(f" 处理: {value}")
    except GeneratorExit:
        print(" 清理资源...")
    finally:
        print(" 生成器已关闭")

gen = cleanup_generator()
next(gen)
gen.send("data")
gen.close()
print(f"关闭后状态: {inspect.getgeneratorstate(gen)}")

14.2.3 生成器的高级应用

无限序列:生成器天然适合表示无限序列:

def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(f"前10个斐波那契数: {first_10}")

def primes():
    seen = []
    n = 2
    while True:
        if all(n % p != 0 for p in seen):
            seen.append(n)
            yield n
        n += 1

prime_gen = primes()
first_20_primes = [next(prime_gen) for _ in range(20)]
print(f"前20个素数: {first_20_primes}")

def count(start=0, step=1):
    n = start
    while True:
        yield n
        n += step

evens = count(0, 2)
print(f"前5个偶数: {[next(evens) for _ in range(5)]}")
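对于这类无限生成器,用标准库itertools.islice截取有限片段比手写计数循环更简洁(islice会在14.5节详细介绍;naturals为本例自拟的示例生成器):

```python
from itertools import islice

def naturals():
    # 无限自然数生成器
    n = 0
    while True:
        yield n
        n += 1

# islice安全地从无限生成器中截取前5个元素,不会陷入死循环
print(list(islice(naturals(), 5)))  # [0, 1, 2, 3, 4]
```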

数据管道:生成器可以串联形成数据处理管道:

def read_lines(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def filter_lines(lines, predicate):
    for line in lines:
        if predicate(line):
            yield line

def transform_lines(lines, transform):
    for line in lines:
        yield transform(line)

def limit(lines, n):
    for i, line in enumerate(lines):
        if i >= n:
            break
        yield line

def strip(lines):
    for line in lines:
        yield line.strip()

def non_empty(lines):
    for line in lines:
        if line:
            yield line

data = [
    " Hello World ",
    "",
    " Python Programming ",
    " ",
    " Generator Pipeline ",
    "",
]

pipeline = limit(
    transform_lines(
        filter_lines(data, bool),
        str.upper
    ),
    3
)
print(list(pipeline))

递归生成器:生成器可以递归调用自身处理嵌套结构:

def flatten(nested):
    for item in nested:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)
        else:
            yield item

nested = [1, [2, 3], [4, [5, 6, [7]]], 8, (9, 10)]
print(f"展平: {list(flatten(nested))}")

def tree_walker(node, depth=0):
    if isinstance(node, dict):
        for key, value in node.items():
            yield (depth, key)
            yield from tree_walker(value, depth + 1)
    elif isinstance(node, list):
        for item in node:
            yield from tree_walker(item, depth)
    else:
        yield (depth, node)

tree = {
    "root": {
        "branch1": ["leaf1", "leaf2"],
        "branch2": {
            "sub1": "leaf3",
            "sub2": ["leaf4", "leaf5"]
        }
    }
}

for depth, value in tree_walker(tree):
    print(" " * depth + f"→ {value}")

14.3 生成器表达式

14.3.1 语法与性能

生成器表达式是列表推导式的惰性版本,使用圆括号而非方括号:

import sys

list_comp = [x ** 2 for x in range(10000)]
gen_exp = (x ** 2 for x in range(10000))

print(f"列表推导式内存: {sys.getsizeof(list_comp)} 字节")
print(f"生成器表达式内存: {sys.getsizeof(gen_exp)} 字节")
print(f"内存比: {sys.getsizeof(list_comp) / sys.getsizeof(gen_exp):.1f}x")

total = sum(x ** 2 for x in range(1000000))
print(f"0到999999的平方和: {total}")

max_val = max(x ** 2 for x in range(1000))
print(f"最大平方值: {max_val}")

has_match = any(x > 100 for x in range(50, 150))
print(f"存在大于100的数: {has_match}")

all_positive = all(x > 0 for x in [1, 2, 3, 4, 5])
print(f"全部为正: {all_positive}")

14.3.2 嵌套生成器表达式

matrix = [
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9],
]

flat = (x for row in matrix for x in row)
print(f"展平矩阵: {list(flat)}")

diagonal = (matrix[i][i] for i in range(len(matrix)))
print(f"对角线: {list(diagonal)}")

transposed = (
    [row[i] for row in matrix]
    for i in range(len(matrix[0]))
)
print(f"转置: {list(transposed)}")
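作为补充,转置也可以用内置的zip(*matrix)惰性完成,无需手写索引:

```python
matrix = [
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9],
]

# zip(*matrix)把每行解包为zip的参数,逐列配对,惰性产出元组
print([list(col) for col in zip(*matrix)])  # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
```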

14.3.3 生成器表达式的局限

生成器表达式只能包含单个表达式,不能包含复杂语句。对于复杂逻辑,应使用生成器函数:

# 生成器表达式:简单转换
squares = (x ** 2 for x in range(10))

# 生成器函数:复杂逻辑
def process_data(data):
    for item in data:
        cleaned = item.strip()
        if not cleaned:
            continue
        try:
            value = float(cleaned)
        except ValueError:
            continue
        yield value * 2

raw_data = [" 1.5 ", "", "abc", " 2.3 ", " ", "4.0"]
print(f"处理结果: {list(process_data(raw_data))}")

14.4 yield from:子生成器委托

14.4.1 基本机制

yield from是Python 3.3引入的语法,用于将生成器的部分操作委托给子生成器。它不仅简化了嵌套生成器的写法,还建立了主生成器与子生成器之间的双向通信通道。

def chain(*iterables):
    for iterable in iterables:
        yield from iterable

result = chain([1, 2], "ab", range(3))
print(f"链接: {list(result)}")

def flatten(nested):
    for item in nested:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)
        else:
            yield item

nested = [1, [2, [3, 4]], [5], 6]
print(f"展平: {list(flatten(nested))}")
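在只做迭代(不涉及send/throw)的场景下,yield from iterable的效果等价于显式的for循环。下面的对照示意可以验证这一点(两个函数名为本例自拟):

```python
def chain_manual(*iterables):
    # 显式for循环逐个产出
    for iterable in iterables:
        for item in iterable:
            yield item

def chain_delegate(*iterables):
    # yield from委托给子可迭代对象
    for iterable in iterables:
        yield from iterable

assert list(chain_manual([1, 2], "ab")) == list(chain_delegate([1, 2], "ab"))
print(list(chain_delegate([1, 2], "ab")))  # [1, 2, 'a', 'b']
```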

14.4.2 yield from的完整语义

yield from不仅仅是for item in subgen: yield item的语法糖,它还处理了以下细节:

  1. send()的值传递给子生成器
  2. throw()的异常传递给子生成器
  3. 当子生成器完成时,其return值成为yield from表达式的值
  4. 正确处理StopIteration
def subgen():
    yield 1
    yield 2
    return "subgen完成"

def main_gen():
    result = yield from subgen()
    print(f" 子生成器返回: {result!r}")
    yield "main继续"

gen = main_gen()
print(next(gen))
print(next(gen))
print(next(gen))

双向通信示例

def echo_subgen():
    while True:
        received = yield
        if received is None:
            break
        yield f"子生成器处理: {received}"

def main_gen():
    yield from echo_subgen()
    yield "主生成器结束"

gen = main_gen()
next(gen)

print(gen.send("hello"))
next(gen)
print(gen.send("world"))
next(gen)
print(gen.send(None))

14.4.3 yield from与归约模式

yield from的一个重要应用是归约(Reduction)模式——子生成器产生部分结果,主生成器汇总:

def average_gen():
    total = 0
    count = 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count if count else 0

def aggregator():
    avg = yield from average_gen()
    print(f" 平均值: {avg}")
    return avg

gen = aggregator()
next(gen)
for v in [10, 20, 30, 40, 50]:
    gen.send(v)
try:
    gen.send(None)
except StopIteration as e:
    print(f"最终结果: {e.value}")

14.5 itertools模块详解

14.5.1 无限迭代器

from itertools import count, cycle, repeat

# count: 无限计数
for i in count(10, 2):
    if i > 20:
        break
    print(i, end=" ")
print()

# cycle: 无限循环
colors = cycle(["red", "green", "blue"])
for i, color in enumerate(colors):
    if i >= 7:
        break
    print(color, end=" ")
print()

# repeat: 重复
for val in repeat("hello", 3):
    print(val, end=" ")
print()

# 实际应用:为序列生成索引
items = ["a", "b", "c", "d"]
for idx, item in zip(count(), items):
    print(f" {idx}: {item}")

14.5.2 终止迭代器

from itertools import (
    accumulate, chain, compress, dropwhile,
    filterfalse, groupby, islice, starmap,
    takewhile, tee, zip_longest
)

# accumulate: 累积计算
numbers = [1, 2, 3, 4, 5]
print(f"累加: {list(accumulate(numbers))}")
print(f"累积最大: {list(accumulate(numbers, max))}")
print(f"累积乘积: {list(accumulate(numbers, lambda x, y: x * y))}")

# chain: 链接多个可迭代对象
print(f"链接: {list(chain([1, 2], [3, 4], [5, 6]))}")
print(f"展平: {list(chain.from_iterable([[1, 2], [3, 4], [5, 6]]))}")

# compress: 按选择器过滤
data = ["a", "b", "c", "d"]
selectors = [True, False, True, False]
print(f"压缩: {list(compress(data, selectors))}")

# dropwhile: 跳过元素,直到条件首次为False
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"跳过: {list(dropwhile(lambda x: x < 3, numbers))}")

# filterfalse: 反向过滤
print(f"奇数: {list(filterfalse(lambda x: x % 2 == 0, range(10)))}")

# groupby: 分组
data = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("c", 1)]
for key, group in groupby(data, lambda x: x[0]):
    print(f" 组 {key}: {list(group)}")

# islice: 切片
numbers = range(100)
print(f"前10个: {list(islice(numbers, 10))}")
print(f"10-20: {list(islice(numbers, 10, 20))}")
print(f"0-20步长2: {list(islice(numbers, 0, 20, 2))}")

# starmap: 解包参数后映射
data = [(2, 3), (4, 5), (6, 7)]
print(f"幂运算: {list(starmap(pow, data))}")

# takewhile: 持续取值,直到条件首次为False
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"取到: {list(takewhile(lambda x: x < 3, numbers))}")

# tee: 复制迭代器
numbers = iter([1, 2, 3, 4, 5])
it1, it2 = tee(numbers, 2)
print(f"副本1: {list(it1)}")
print(f"副本2: {list(it2)}")

# zip_longest: 不等长zip
print(f"不等长zip: {list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue=None))}")
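groupby有一个常见陷阱值得补充:它只合并相邻的相同键。因此对无序数据分组前,通常需要先按同一键排序,否则同一个键会产生多个分组。示例数据为本例自拟:

```python
from itertools import groupby

words = ["banana", "apple", "blueberry", "avocado", "cherry"]
# groupby只合并相邻元素,先按分组键排序才能得到完整分组
grouped = {k: list(g) for k, g in groupby(sorted(words), key=lambda w: w[0])}
print(grouped)  # {'a': ['apple', 'avocado'], 'b': ['banana', 'blueberry'], 'c': ['cherry']}
```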

14.5.3 组合迭代器

from itertools import combinations, combinations_with_replacement, permutations, product

# 排列
print(f"排列(3选2): {list(permutations([1, 2, 3], 2))}")

# 组合
print(f"组合(3选2): {list(combinations([1, 2, 3], 2))}")

# 可重复组合
print(f"可重复组合(3选2): {list(combinations_with_replacement([1, 2, 3], 2))}")

# 笛卡尔积
print(f"笛卡尔积: {list(product([1, 2], ['a', 'b']))}")

# 实际应用:密码组合生成
import string

def generate_passwords(chars, length):
    for combo in product(chars, repeat=length):
        yield "".join(combo)

digits = string.digits
total = 0
for pwd in generate_passwords(digits, 3):
    total += 1
print(f"3位数字密码总数: {total}")

14.5.4 构建数据处理管道

from collections import Counter

def analyze_text(text):
    words = text.lower().split()
    word_lengths = (len(w.strip(".,!?;:")) for w in words)
    length_dist = Counter(word_lengths)

    print("词长分布:")
    for length, count in sorted(length_dist.items()):
        print(f" {length}个字符: {'█' * count} ({count})")

    avg_length = sum(len(w.strip(".,!?;:")) for w in words) / len(words) if words else 0
    print(f"平均词长: {avg_length:.2f}")

    longest = max(words, key=lambda w: len(w.strip(".,!?;:")))
    print(f"最长词: {longest}")

text = """
Python is an interpreted high-level general-purpose programming language.
Its design philosophy emphasizes code readability with its use of significant indentation.
Its language constructs as well as its object-oriented approach aim to help programmers
write clear logical code for small and large-scale projects.
"""

analyze_text(text)

14.6 自定义迭代器设计模式

14.6.1 惰性求值迭代器

class LazyRange:
    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self.start = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        current = self.start
        if self.step > 0:
            while current < self.stop:
                yield current
                current += self.step
        elif self.step < 0:
            while current > self.stop:
                yield current
                current += self.step

    def __contains__(self, value):
        if self.step > 0:
            return self.start <= value < self.stop and (value - self.start) % self.step == 0
        return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0

    def __reversed__(self):
        if self.step > 0:
            last = self.stop - 1 - (self.stop - 1 - self.start) % self.step
            return LazyRange(last, self.start - 1, -self.step)
        first = self.stop + 1 + (self.start - self.stop - 1) % (-self.step)
        return LazyRange(first, self.start + 1, -self.step)

r = LazyRange(0, 10, 2)
print(f"正向: {list(r)}")
print(f"反向: {list(reversed(r))}")
print(f"6 in range: {6 in r}")

14.6.2 缓冲迭代器

class BufferedIterator:
    def __init__(self, iterable, buffer_size=1):
        self._iterator = iter(iterable)
        self._buffer = []
        self._buffer_size = buffer_size
        self._exhausted = False

    def __iter__(self):
        return self

    def __next__(self):
        if self._buffer:
            return self._buffer.pop(0)
        if self._exhausted:
            raise StopIteration
        try:
            for _ in range(self._buffer_size):
                self._buffer.append(next(self._iterator))
        except StopIteration:
            self._exhausted = True
        if self._buffer:
            return self._buffer.pop(0)
        raise StopIteration

    def peek(self):
        if not self._buffer and not self._exhausted:
            try:
                self._buffer.append(next(self._iterator))
            except StopIteration:
                self._exhausted = True
        if self._buffer:
            return self._buffer[0]
        raise StopIteration

    def has_next(self):
        try:
            self.peek()
            return True
        except StopIteration:
            return False

data = [1, 2, 3, 4, 5]
buf = BufferedIterator(data, buffer_size=2)
print(f"peek: {buf.peek()}")
print(f"next: {next(buf)}")
print(f"peek: {buf.peek()}")
print(f"has_next: {buf.has_next()}")
print(f"剩余: {list(buf)}")

14.6.3 分块迭代器

def chunked(iterable, size):
    it = iter(iterable)
    while True:
        chunk = []
        try:
            for _ in range(size):
                chunk.append(next(it))
        except StopIteration:
            if chunk:
                yield chunk
            break
        yield chunk

data = list(range(10))
for chunk in chunked(data, 3):
    print(f" 块: {chunk}")

def sliding_window(iterable, size):
    it = iter(iterable)
    window = []
    for _ in range(size):
        try:
            window.append(next(it))
        except StopIteration:
            return
    yield tuple(window)
    for item in it:
        window = window[1:] + [item]
        yield tuple(window)

data = [1, 2, 3, 4, 5, 6, 7]
print(f"滑动窗口(3): {list(sliding_window(data, 3))}")
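滑动窗口也可以用collections.deque的maxlen特性实现,避免每步复制整个列表。以下是一个参考itertools官方文档食谱思路的草稿实现(函数名sliding_window_deque为本例自拟):

```python
from collections import deque
from itertools import islice

def sliding_window_deque(iterable, size):
    it = iter(iterable)
    # maxlen=size的deque在append时自动挤出最旧元素,O(1)完成窗口滑动
    window = deque(islice(it, size), maxlen=size)
    if len(window) == size:
        yield tuple(window)
    for item in it:
        window.append(item)
        yield tuple(window)

print(list(sliding_window_deque([1, 2, 3, 4, 5], 3)))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
```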

14.6.4 合并迭代器

import heapq

def merge_sorted(*iterables, key=None):
    if key is None:
        # 注意:生成器函数中的return值不会产出元素,必须用yield from委托
        yield from heapq.merge(*iterables)
        return

    keyed = []
    for it in iterables:
        it = iter(it)
        try:
            first = next(it)
            keyed.append([key(first), first, it])
        except StopIteration:
            pass

    while keyed:
        keyed.sort(key=lambda x: x[0])
        k, value, it = keyed[0]
        yield value
        try:
            next_val = next(it)
            keyed[0] = [key(next_val), next_val, it]
        except StopIteration:
            keyed.pop(0)

a = [1, 3, 5, 7, 9]
b = [2, 4, 6, 8, 10]
c = [0, 11, 12]

print(f"合并: {list(merge_sorted(a, b, c))}")
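需要说明的是,标准库heapq.merge自Python 3.5起本身就支持key参数,多数场景可以直接使用,无需手写带键的回退逻辑(示例数据为本例自拟):

```python
import heapq

# heapq.merge(*iterables, key=None, reverse=False):按key合并多个有序流
pairs_a = [(1, "x"), (3, "y")]
pairs_b = [(2, "z")]
print(list(heapq.merge(pairs_a, pairs_b, key=lambda p: p[0])))
# [(1, 'x'), (2, 'z'), (3, 'y')]
```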

14.7 生成器与内存优化

14.7.1 大文件处理

import re

def read_large_file(filepath, chunk_size=8192):
    with open(filepath, "r", encoding="utf-8") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def read_lines_lazy(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def count_lines(filepath):
    return sum(1 for _ in read_lines_lazy(filepath))

def grep(pattern, filepath):
    regex = re.compile(pattern)
    for line in read_lines_lazy(filepath):
        if regex.search(line):
            yield line
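上述函数依赖磁盘上的文件。这里给出一个可独立运行的小演示:用tempfile生成临时日志文件(文件内容与ERROR过滤条件均为演示假设),再用惰性读取函数统计与过滤:

```python
import os
import re
import tempfile

def read_lines_lazy(filepath):
    # 与上文定义一致:逐行惰性读取
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

# 写入一个临时日志文件用于演示
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False, encoding="utf-8") as f:
    f.write("INFO start\nERROR disk full\nINFO done\n")
    path = f.name

print(sum(1 for _ in read_lines_lazy(path)))                        # 3
print([l for l in read_lines_lazy(path) if re.search("ERROR", l)])  # ['ERROR disk full']
os.remove(path)
```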

14.7.2 流式数据处理

def stream_processor(data_stream):
    pipeline = data_stream
    pipeline = (line.strip() for line in pipeline)
    pipeline = (line for line in pipeline if line)
    pipeline = (line.lower() for line in pipeline)
    pipeline = (line.split() for line in pipeline)
    pipeline = (len(words) for words in pipeline)
    return pipeline

sample_data = [
    " Hello World ",
    "",
    " Python Programming ",
    " Generator Pipeline ",
    "",
]

word_counts = stream_processor(iter(sample_data))
print(f"每行词数: {list(word_counts)}")

14.7.3 生成器与递归的对比

import time

def fibonacci_recursive(n):
    if n <= 1:
        return n
    return fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)

def fibonacci_generator(limit):
    a, b = 0, 1
    for _ in range(limit):
        yield a
        a, b = b, a + b

def fibonacci_memo(limit):
    cache = [0, 1]
    for i in range(2, limit):
        cache.append(cache[i-1] + cache[i-2])
    return cache[:limit]

n = 30
start = time.perf_counter()
result_rec = [fibonacci_recursive(i) for i in range(n)]
time_rec = time.perf_counter() - start

start = time.perf_counter()
result_gen = list(fibonacci_generator(n))
time_gen = time.perf_counter() - start

start = time.perf_counter()
result_memo = fibonacci_memo(n)
time_memo = time.perf_counter() - start

print(f"递归: {time_rec:.6f}秒")
print(f"生成器: {time_gen:.6f}秒")
print(f"记忆化: {time_memo:.6f}秒")
print(f"结果一致: {result_rec == result_gen == result_memo}")

14.8 前沿技术动态

14.8.1 生成器与异步编程

生成器是Python异步编程(async/await)的基础。在Python 3.5之前,协程就是基于生成器实现的:

import asyncio
import types

@types.coroutine
def legacy_coroutine():
    # 基于生成器的旧式协程:裸yield将控制权交还事件循环
    yield
    return "完成"

async def modern_coroutine():
    result = await legacy_coroutine()
    print(f" 现代协程结果: {result}")

asyncio.run(modern_coroutine())
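在现代Python(3.6+)中,与之对应的形态是异步生成器:在async def函数体内直接使用yield,并用async for消费。下面是一个最小示意(ticker、collect为本例自拟):

```python
import asyncio

async def ticker(n):
    # 异步生成器:生成器语法与async/await的结合
    for i in range(n):
        await asyncio.sleep(0)  # 让出控制权给事件循环
        yield i

async def collect():
    return [i async for i in ticker(3)]

print(asyncio.run(collect()))  # [0, 1, 2]
```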

14.8.2 生成器与结构化并发

Python 3.11+引入了TaskGroup,结合生成器可以构建更安全的并发模式:

import asyncio

async def fetch_item(item_id):
    await asyncio.sleep(0.1)
    return f"data-{item_id}"

async def fetch_all(item_ids):
    async def fetch_and_collect(item_id, results):
        result = await fetch_item(item_id)
        results.append(result)

    results = []
    async with asyncio.TaskGroup() as tg:
        for item_id in item_ids:
            tg.create_task(fetch_and_collect(item_id, results))
    return results

async def main():
    data = await fetch_all(range(5))
    print(f"获取结果: {data}")

asyncio.run(main())

14.8.3 生成器在数据科学中的应用

import random
from itertools import islice

def batch_generator(data, batch_size, shuffle=False):
    indices = list(range(len(data)))
    if shuffle:
        random.shuffle(indices)

    for start in range(0, len(indices), batch_size):
        batch_indices = indices[start:start + batch_size]
        yield [data[i] for i in batch_indices]

class DataStream:
    def __init__(self, source, transform=None, filter_fn=None):
        self.source = source
        self.transform = transform
        self.filter_fn = filter_fn

    def __iter__(self):
        for item in self.source:
            if self.filter_fn and not self.filter_fn(item):
                continue
            if self.transform:
                item = self.transform(item)
            yield item

    def map(self, transform):
        return DataStream(self, transform=transform)

    def filter(self, filter_fn):
        return DataStream(self, filter_fn=filter_fn)

    def take(self, n):
        return list(islice(self, n))

    def collect(self):
        return list(self)

data = list(range(100))
stream = DataStream(
    data,
    transform=lambda x: x ** 2,
    filter_fn=lambda x: x % 2 == 0
)
print(f"前5个偶数的平方: {stream.take(5)}")
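上面定义的batch_generator没有配套演示,这里补充一个用法示意(为便于独立运行,重复给出与上文一致的定义;数据为本例自拟):

```python
import random

def batch_generator(data, batch_size, shuffle=False):
    # 与上文定义一致:按索引切分小批次,可选打乱顺序
    indices = list(range(len(data)))
    if shuffle:
        random.shuffle(indices)
    for start in range(0, len(indices), batch_size):
        yield [data[i] for i in indices[start:start + batch_size]]

# 不打乱时按原顺序产出,最后一批可能不足batch_size
print(list(batch_generator(list(range(7)), batch_size=3)))
# [[0, 1, 2], [3, 4, 5], [6]]
```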

14.9 本章小结

本章深入探讨了Python生成器与迭代器机制:

  1. 迭代协议:可迭代协议与迭代器协议的区分、for循环的底层机制、分离可迭代对象与迭代器的最佳实践
  2. 生成器函数:yield的执行模型、生成器状态机、send/throw/close的协程式通信
  3. 生成器表达式:惰性求值的内存优势、嵌套表达式、与列表推导式的对比
  4. yield from:子生成器委托机制、双向通信、归约模式
  5. itertools:无限迭代器、终止迭代器、组合迭代器、数据处理管道
  6. 自定义迭代器:惰性求值、缓冲、分块、合并等设计模式
  7. 内存优化:大文件处理、流式数据处理、生成器与递归的对比

14.10 习题与项目练习

基础习题

  1. 迭代器实现:实现一个PermutationIterator类,给定一个序列,按字典序逐个返回所有排列。

  2. 生成器管道:使用生成器实现一个CSV文件处理管道,支持过滤、映射、聚合操作,且全程惰性求值。

  3. 素数筛:使用生成器实现埃拉托斯特尼筛法(Sieve of Eratosthenes),生成无限素数序列。

进阶习题

  1. 协程式生成器:实现一个基于send的词频统计生成器,支持动态添加文本、查询词频、重置统计。

  2. 合并K路有序流:实现merge_k_sorted(k_streams),合并K个已排序的生成器为一个有序生成器,要求空间复杂度为O(K)。

  3. 迭代器适配器:实现一组迭代器适配器:peekable(可预览)、partition(分流)、roundrobin(轮流取值)。

项目练习

  1. 日志分析引擎:使用生成器管道实现一个日志分析系统,支持:

    • 惰性读取大日志文件
    • 多条件过滤(时间范围、级别、关键词)
    • 实时统计(QPS、错误率、响应时间分布)
    • 结果导出(JSON/CSV)
    • 支持管道组合与复用
  2. 数据流处理框架:设计一个基于生成器的数据流处理框架,包括:

    • Source:数据源抽象(文件、网络、数据库)
    • Transform:数据转换算子(map、filter、flatmap、window)
    • Sink:数据输出(文件、数据库、控制台)
    • 支持背压(Backpressure)机制
    • 支持检查点(Checkpoint)与故障恢复

14.11 延伸阅读

14.11.1 迭代器与生成器理论

14.11.2 itertools与函数式编程

14.11.3 协程与异步编程

14.11.4 数据处理实践


下一章:第15章 异常处理