1.4. Iterators and Generators
Iterators and generators are Python's core mechanisms for working with data streams; mastering them is essential for writing efficient, memory-friendly code.
1.4.1. The Iteration Protocol
1.4.1.1. Iterable vs. Iterator
# Iterable: implements the __iter__ method
# Iterator: implements both __iter__ and __next__
my_list = [1, 2, 3]        # iterable
iterator = iter(my_list)   # obtain an iterator
print(next(iterator))  # 1
print(next(iterator))  # 2
print(next(iterator))  # 3
# print(next(iterator))  # StopIteration
# Iterators are single-use
iterator = iter(my_list)
print(list(iterator))  # [1, 2, 3]
print(list(iterator))  # [] - exhausted
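To see how the two roles fit together, here is a minimal sketch of roughly what a for loop does under the hood (the variable names are illustrative, not part of the original example):

# Roughly what `for item in my_list: print(item)` expands to
it = iter(my_list)          # calls my_list.__iter__()
while True:
    try:
        item = next(it)     # calls it.__next__()
    except StopIteration:   # raised when the iterator is exhausted
        break
    print(item)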
1.4.1.2. A Custom Iterator
class CountDown:
    """Countdown iterator"""
    def __init__(self, start):
        self.start = start
    def __iter__(self):
        return self
    def __next__(self):
        if self.start <= 0:
            raise StopIteration
        self.start -= 1
        return self.start + 1
for num in CountDown(5):
    print(num, end=' ')  # 5 4 3 2 1
# ⚠️ Problem: this iterator can only be used once
countdown = CountDown(3)
print(list(countdown))  # [3, 2, 1]
print(list(countdown))  # [] - exhausted
1.4.1.3. A Reusable Iterable
class ReusableCountDown:
    """Reusable countdown (separates the iterable from its iterator)"""
    def __init__(self, start):
        self.start = start
    def __iter__(self):
        # Return a fresh iterator for every iteration
        return CountDownIterator(self.start)
class CountDownIterator:
    def __init__(self, start):
        self.current = start
    def __iter__(self):
        return self
    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        self.current -= 1
        return self.current + 1
countdown = ReusableCountDown(3)
print(list(countdown))  # [3, 2, 1]
print(list(countdown))  # [3, 2, 1] - usable again
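A common shortcut, shown here as a sketch rather than part of the original example, is to write __iter__ as a generator function; each call then builds a fresh generator, so no separate iterator class is needed (GeneratorCountDown is an illustrative name):

class GeneratorCountDown:
    """Same behaviour, but __iter__ is a generator function"""
    def __init__(self, start):
        self.start = start
    def __iter__(self):
        # Every call to __iter__ creates a brand-new generator,
        # so the object stays reusable
        n = self.start
        while n > 0:
            yield n
            n -= 1
countdown = GeneratorCountDown(3)
print(list(countdown))  # [3, 2, 1]
print(list(countdown))  # [3, 2, 1] - still reusable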
1.4.2. Generators
Generators are a concise way to create iterators, built around the yield keyword.
1.4.2.1. A Basic Generator
def countdown(n):
    """Generator function"""
    while n > 0:
        yield n  # pause and hand back a value
        n -= 1
# A generator function returns a generator object
gen = countdown(5)
print(type(gen))  # <class 'generator'>
print(next(gen))  # 5
print(next(gen))  # 4
# Use it in a loop
for num in countdown(3):
    print(num, end=' ')  # 3 2 1
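The generator body does not run at all until the first next() call; a small sketch with print statements (traced_countdown is an illustrative name) makes the pause/resume behaviour visible:

def traced_countdown(n):
    print("started")           # runs only on the first next()
    while n > 0:
        print(f"yielding {n}")
        yield n
        n -= 1
    print("finished")
gen = traced_countdown(2)      # nothing is printed yet
print(next(gen))               # started / yielding 2 / 2
print(next(gen))               # yielding 1 / 1
# next(gen)                    # would print "finished", then raise StopIteration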
1.4.2.2. 生成器 vs 列表的内存对比
import sys
# 列表:立即生成所有元素,占用大量内存
list_comp = [x ** 2 for x in range(1000000)]
print(f"List size: {sys.getsizeof(list_comp):,} bytes") # 约 8 MB
# 生成器:惰性生成,几乎不占内存
gen_exp = (x ** 2 for x in range(1000000))
print(f"Generator size: {sys.getsizeof(gen_exp)} bytes") # 约 200 bytes
# 实际应用:处理大文件
def read_large_file(file_path):
"""惰性读取大文件"""
with open(file_path) as f:
for line in f: # 逐行读取,不加载整个文件
yield line.strip()
# 处理每一行而不需要全部加载到内存
# for line in read_large_file("huge_file.txt"):
# process(line)
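Because the file is consumed lazily, further generator stages can be stacked on top without materializing anything. A hedged sketch under the same assumptions as above (huge_file.txt and the "ERROR" filter are made-up examples):

def error_lines(lines):
    """Keep only lines that mention ERROR"""
    return (line for line in lines if "ERROR" in line)
# The whole chain still reads one line at a time, no matter how large the file is
# n_errors = sum(1 for _ in error_lines(read_large_file("huge_file.txt")))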
1.4.2.3. yield from
def chain(*iterables):
    """Chain several iterables together"""
    for it in iterables:
        yield from it  # equivalent to: for item in it: yield item
result = list(chain([1, 2], [3, 4], [5, 6]))
print(result)  # [1, 2, 3, 4, 5, 6]
# A recursive generator
def flatten(nested_list):
    """Flatten a nested list"""
    for item in nested_list:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item
nested = [1, [2, 3, [4, 5]], 6, [7, 8]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6, 7, 8]
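The version above only recognizes list. As a sketch of a more general variant (my own extension, not from the original), one can flatten any nested iterable while treating strings as atoms to avoid infinite recursion on single characters:

from collections.abc import Iterable
def flatten_any(nested):
    """Flatten arbitrary nested iterables, treating strings as atoms"""
    for item in nested:
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            yield from flatten_any(item)
        else:
            yield item
print(list(flatten_any([1, (2, 3), ["ab", {4}]])))  # [1, 2, 3, 'ab', 4]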
1.4.2.4. A Generator's send and throw
def accumulator():
    """A generator that can receive values (coroutine style)"""
    total = 0
    while True:
        value = yield total  # receives the value passed to send()
        if value is None:
            break
        total += value
acc = accumulator()
print(next(acc))     # 0 - prime the generator
print(acc.send(10))  # 10
print(acc.send(20))  # 30
print(acc.send(5))   # 35
# Sending None makes the generator return, which surfaces as StopIteration
try:
    acc.send(None)
except StopIteration:
    pass
# close() and exception handling in generators
def managed_generator():
    try:
        while True:
            yield "running"
    except GeneratorExit:
        print("Generator closed")
    finally:
        print("Cleanup")
gen = managed_generator()
print(next(gen))  # running
gen.close()       # Generator closed \n Cleanup
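The heading also mentions throw(), which is not demonstrated above. It raises an exception inside the generator at the yield where it is paused; a minimal sketch (resilient_counter is an illustrative name):

def resilient_counter():
    n = 0
    while True:
        try:
            yield n
            n += 1
        except ValueError:
            print("resetting")  # the thrown exception surfaces at the yield
            n = 0
gen = resilient_counter()
print(next(gen))              # 0
print(next(gen))              # 1
print(gen.throw(ValueError))  # prints "resetting", then yields 0 again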
1.4.3. The itertools Module
The Python standard library provides powerful iterator tools.
1.4.3.1. Infinite Iterators
from itertools import count, cycle, repeat
# count: count forever
for i in count(10, 2):  # start at 10, step 2
    if i > 20:
        break
    print(i, end=' ')  # 10 12 14 16 18 20
# cycle: loop forever
colors = cycle(['red', 'green', 'blue'])
for _ in range(7):
    print(next(colors), end=' ')  # red green blue red green blue red
# repeat: repeat a value
list(repeat('A', 5))  # ['A', 'A', 'A', 'A', 'A']
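Infinite iterators are normally bounded by whatever consumes them. A small sketch (the task names are made up): zip stops at its shortest input, so pairing a finite list with cycle is safe:

tasks = ['build', 'test', 'deploy', 'monitor']
print(list(zip(tasks, cycle(['red', 'green', 'blue']))))
# [('build', 'red'), ('test', 'green'), ('deploy', 'blue'), ('monitor', 'red')]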
1.4.3.2. Combinatoric Iterators
from itertools import chain, zip_longest, product, permutations, combinations
# chain: concatenate iterators
list(chain([1, 2], [3, 4]))  # [1, 2, 3, 4]
# zip_longest: zip padded to the longest input
list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue='-'))
# [(1, 'a'), (2, 'b'), (3, '-')]
# product: Cartesian product
list(product('AB', [1, 2]))  # [('A', 1), ('A', 2), ('B', 1), ('B', 2)]
# permutations
list(permutations('ABC', 2))  # [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]
# combinations
list(combinations('ABC', 2))  # [('A', 'B'), ('A', 'C'), ('B', 'C')]
1.4.3.3. Filtering and Grouping
from itertools import filterfalse, takewhile, dropwhile, groupby
# filterfalse: the opposite of filter
list(filterfalse(lambda x: x % 2, range(10)))  # [0, 2, 4, 6, 8]
# takewhile: take elements while the predicate is true
list(takewhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [1, 3]
# dropwhile: skip elements while the predicate is true
list(dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [5, 2, 1]
# groupby: group consecutive items (the data must be sorted first)
data = [('a', 1), ('a', 2), ('b', 3), ('b', 4), ('a', 5)]
sorted_data = sorted(data, key=lambda x: x[0])
for key, group in groupby(sorted_data, key=lambda x: x[0]):
    print(f"{key}: {list(group)}")
# a: [('a', 1), ('a', 2), ('a', 5)]
# b: [('b', 3), ('b', 4)]
1.4.3.4. Utility Tools
from itertools import islice, tee, accumulate
# islice: slice an iterator
gen = (x ** 2 for x in range(10))
list(islice(gen, 2, 5))  # [4, 9, 16] - indices 2, 3, 4
# tee: duplicate an iterator
original = iter([1, 2, 3, 4, 5])
copy1, copy2 = tee(original, 2)
print(list(copy1))  # [1, 2, 3, 4, 5]
print(list(copy2))  # [1, 2, 3, 4, 5]
# accumulate: running totals
list(accumulate([1, 2, 3, 4]))  # [1, 3, 6, 10]
list(accumulate([1, 2, 3, 4], lambda a, b: a * b))  # [1, 2, 6, 24]
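Two caveats about tee worth noting (my own addition, following the standard library documentation): once tee() has been called, the original iterator should not be used again, and tee buffers whatever one copy has consumed but the other has not, as this sketch shows:

a, b = tee(iter(range(5)), 2)
print(next(a), next(a), next(a))  # 0 1 2 - these values are buffered for b
print(next(b))                    # 0 - b lags behind and reads from the buffer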
1.4.4. Practical Applications
1.4.4.1. Chunked Processing
from itertools import islice
def chunked(iterable, size):
    """Split an iterable into chunks"""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk
# Process large data in batches
data = range(10)
for chunk in chunked(data, 3):
    print(chunk)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]
# Practical use: batched API calls (process_batch is a placeholder for your own handler)
def batch_api_call(items, batch_size=100):
    for batch in chunked(items, batch_size):
        # Handle one batch at a time
        yield process_batch(batch)
1.4.4.2. Sliding Window
from collections import deque
from itertools import islice
def sliding_window(iterable, n):
    """Sliding window of size n"""
    it = iter(iterable)
    window = deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for item in it:
        window.append(item)
        yield tuple(window)
# Moving average
data = [1, 2, 3, 4, 5, 6, 7]
for window in sliding_window(data, 3):
    avg = sum(window) / len(window)
    print(f"{window} -> avg: {avg:.2f}")
# (1, 2, 3) -> avg: 2.00
# (2, 3, 4) -> avg: 3.00
# (3, 4, 5) -> avg: 4.00
# ...
1.4.4.3. Pipeline Processing
from itertools import islice
def pipeline(*steps):
    """Compose functions into a pipeline"""
    def process(data):
        for step in steps:
            data = step(data)
        return data
    return process
# Data-processing stages
def read_data(source):
    return (x for x in source)
def filter_positive(data):
    return (x for x in data if x > 0)
def square(data):
    return (x ** 2 for x in data)
def take(n):
    def _take(data):
        return islice(data, n)
    return _take
# Assemble the pipeline
process = pipeline(
    read_data,
    filter_positive,
    square,
    take(5)
)
data = [-2, -1, 0, 1, 2, 3, 4, 5, 6]
result = list(process(data))
print(result)  # [1, 4, 9, 16, 25]
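Every stage returns a generator, so nothing is computed until list() consumes the result. As a sketch of why that matters, the same pipeline also works on an infinite source, because take(5) bounds it:

from itertools import count
print(list(process(count(-3))))  # [1, 4, 9, 16, 25] - take(5) stops the infinite stream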
1.4.5. Best Practices
When to use generators
Large data: files, database result sets
Infinite sequences: streaming or real-time data
Lazy evaluation: compute values only when they are needed
Pipelines: chains of data transformations
Caveats
A generator can only be traversed once: convert it to a list, or use tee, when you need multiple passes (see the sketch after this list)
Do not repeatedly materialize a generator with list() inside a loop: the first call exhausts it, and rebuilding it every time defeats lazy evaluation
Watch memory: tee buffers data, so use it with care on large inputs
Debugging is harder: during development, start with lists and switch to generators once the logic is confirmed correct
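As a closing illustration of the first caveat, a short sketch of generator exhaustion and the two workarounds mentioned above (materializing into a list, or duplicating with tee):

from itertools import tee
squares = (x ** 2 for x in range(4))
print(list(squares))  # [0, 1, 4, 9]
print(list(squares))  # [] - exhausted after the first pass
first, second = tee((x ** 2 for x in range(4)), 2)
print(list(first))    # [0, 1, 4, 9]
print(list(second))   # [0, 1, 4, 9] - an independent copy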