# 迭代器与生成器 迭代器和生成器是 Python 中处理数据流的核心机制,掌握它们对于写出高效、内存友好的代码至关重要。 ## 迭代协议 ### 可迭代对象 vs 迭代器 ```python # 可迭代对象(Iterable):实现 __iter__ 方法 # 迭代器(Iterator):实现 __iter__ 和 __next__ 方法 my_list = [1, 2, 3] # 可迭代对象 iterator = iter(my_list) # 获取迭代器 print(next(iterator)) # 1 print(next(iterator)) # 2 print(next(iterator)) # 3 # print(next(iterator)) # StopIteration # 迭代器是一次性的 iterator = iter(my_list) print(list(iterator)) # [1, 2, 3] print(list(iterator)) # [] - 已耗尽 ``` ### 自定义迭代器 ```python class CountDown: """倒计时迭代器""" def __init__(self, start): self.start = start def __iter__(self): return self def __next__(self): if self.start <= 0: raise StopIteration self.start -= 1 return self.start + 1 for num in CountDown(5): print(num, end=' ') # 5 4 3 2 1 # ⚠️ 问题:这个迭代器只能使用一次 countdown = CountDown(3) print(list(countdown)) # [3, 2, 1] print(list(countdown)) # [] - 已耗尽 ``` ### 可重复使用的迭代器 ```python class ReusableCountDown: """可重复使用的倒计时(分离可迭代对象和迭代器)""" def __init__(self, start): self.start = start def __iter__(self): # 每次迭代返回新的迭代器 return CountDownIterator(self.start) class CountDownIterator: def __init__(self, start): self.current = start def __iter__(self): return self def __next__(self): if self.current <= 0: raise StopIteration self.current -= 1 return self.current + 1 countdown = ReusableCountDown(3) print(list(countdown)) # [3, 2, 1] print(list(countdown)) # [3, 2, 1] - 可以再次使用 ``` ## 生成器 生成器是创建迭代器的简洁方式,使用 `yield` 关键字。 ### 基本生成器 ```python def countdown(n): """生成器函数""" while n > 0: yield n # 暂停并返回值 n -= 1 # 生成器函数返回生成器对象 gen = countdown(5) print(type(gen)) # <class 'generator'> print(next(gen)) # 5 print(next(gen)) # 4 # 在循环中使用 for num in countdown(3): print(num, end=' ') # 3 2 1 ``` ### 生成器 vs 列表的内存对比 ```python import sys # 列表:立即生成所有元素,占用大量内存 list_comp = [x ** 2 for x in range(1000000)] print(f"List size: {sys.getsizeof(list_comp):,} bytes") # 约 8 MB # 生成器:惰性生成,几乎不占内存 gen_exp = (x ** 2 for x in range(1000000)) print(f"Generator size: {sys.getsizeof(gen_exp)} bytes") # 约 200 bytes # 实际应用:处理大文件 def
read_large_file(file_path): """惰性读取大文件""" with open(file_path) as f: for line in f: # 逐行读取,不加载整个文件 yield line.strip() # 处理每一行而不需要全部加载到内存 # for line in read_large_file("huge_file.txt"): # process(line) ``` ### yield from ```python def chain(*iterables): """连接多个可迭代对象""" for it in iterables: yield from it # 相当于 for item in it: yield item result = list(chain([1, 2], [3, 4], [5, 6])) print(result) # [1, 2, 3, 4, 5, 6] # 递归生成器 def flatten(nested_list): """展平嵌套列表""" for item in nested_list: if isinstance(item, list): yield from flatten(item) else: yield item nested = [1, [2, 3, [4, 5]], 6, [7, 8]] print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7, 8] ``` ### 生成器的 send 和 throw ```python def accumulator(): """可接收值的生成器(协程风格)""" total = 0 while True: value = yield total # 接收 send 的值 if value is None: break total += value acc = accumulator() print(next(acc)) # 0 - 启动生成器 print(acc.send(10)) # 10 print(acc.send(20)) # 30 print(acc.send(5)) # 35 acc.send(None) # 终止:生成器就此结束,这次 send 会抛出 StopIteration(实际使用时需捕获) # 生成器的 close 和异常处理 def managed_generator(): try: while True: yield "running" except GeneratorExit: print("Generator closed") finally: print("Cleanup") gen = managed_generator() print(next(gen)) # running gen.close() # Generator closed \n Cleanup ``` ## itertools 模块 Python 标准库提供了强大的迭代器工具。 ### 无限迭代器 ```python from itertools import count, cycle, repeat # count: 无限计数 for i in count(10, 2): # 从 10 开始,步长 2 if i > 20: break print(i, end=' ') # 10 12 14 16 18 20 # cycle: 无限循环 colors = cycle(['red', 'green', 'blue']) for _ in range(7): print(next(colors), end=' ') # red green blue red green blue red # repeat: 重复 list(repeat('A', 5)) # ['A', 'A', 'A', 'A', 'A'] ``` ### 组合迭代器 ```python from itertools import chain, zip_longest, product, permutations, combinations # chain: 连接迭代器 list(chain([1, 2], [3, 4])) # [1, 2, 3, 4] # zip_longest: 最长填充的 zip list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue='-')) # [(1, 'a'), (2, 'b'), (3, '-')] # product: 笛卡尔积 list(product('AB', [1, 2])) # [('A', 1), ('A', 2), ('B', 1), ('B', 2)] #
permutations: 排列 list(permutations('ABC', 2)) # [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')] # combinations: 组合 list(combinations('ABC', 2)) # [('A', 'B'), ('A', 'C'), ('B', 'C')] ``` ### 过滤和分组 ```python from itertools import filterfalse, takewhile, dropwhile, groupby # filterfalse: filter 的反面 list(filterfalse(lambda x: x % 2, range(10))) # [0, 2, 4, 6, 8] # takewhile: 条件为真时获取元素 list(takewhile(lambda x: x < 5, [1, 3, 5, 2, 1])) # [1, 3] # dropwhile: 条件为真时跳过元素 list(dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1])) # [5, 2, 1] # groupby: 分组(数据需要预先排序) data = [('a', 1), ('a', 2), ('b', 3), ('b', 4), ('a', 5)] sorted_data = sorted(data, key=lambda x: x[0]) for key, group in groupby(sorted_data, key=lambda x: x[0]): print(f"{key}: {list(group)}") # a: [('a', 1), ('a', 2), ('a', 5)] # b: [('b', 3), ('b', 4)] ``` ### 实用工具 ```python from itertools import islice, tee, accumulate # islice: 切片迭代器 gen = (x ** 2 for x in range(10)) list(islice(gen, 2, 5)) # [4, 9, 16] - 索引 2, 3, 4 # tee: 复制迭代器 original = iter([1, 2, 3, 4, 5]) copy1, copy2 = tee(original, 2) print(list(copy1)) # [1, 2, 3, 4, 5] print(list(copy2)) # [1, 2, 3, 4, 5] # accumulate: 累积 list(accumulate([1, 2, 3, 4])) # [1, 3, 6, 10] list(accumulate([1, 2, 3, 4], lambda a, b: a * b)) # [1, 2, 6, 24] ``` ## 实际应用 ### 分块处理 ```python from itertools import islice def chunked(iterable, size): """将可迭代对象分块""" it = iter(iterable) while True: chunk = list(islice(it, size)) if not chunk: break yield chunk # 分批处理大数据 data = range(10) for chunk in chunked(data, 3): print(chunk) # [0, 1, 2] # [3, 4, 5] # [6, 7, 8] # [9] # 实际应用:批量 API 调用 def batch_api_call(items, batch_size=100): for batch in chunked(items, batch_size): # 批量处理 yield process_batch(batch) ``` ### 滑动窗口 ```python from collections import deque from itertools import islice def sliding_window(iterable, n): """滑动窗口""" it = iter(iterable) window = deque(islice(it, n), maxlen=n) if len(window) == n: yield tuple(window) for item in it: window.append(item) 
yield tuple(window) # 移动平均 data = [1, 2, 3, 4, 5, 6, 7] for window in sliding_window(data, 3): avg = sum(window) / len(window) print(f"{window} -> avg: {avg:.2f}") # (1, 2, 3) -> avg: 2.00 # (2, 3, 4) -> avg: 3.00 # (3, 4, 5) -> avg: 4.00 # ... ``` ### 管道处理 ```python def pipeline(*steps): """函数管道""" def process(data): for step in steps: data = step(data) return data return process # 数据处理管道 def read_data(source): return (x for x in source) def filter_positive(data): return (x for x in data if x > 0) def square(data): return (x ** 2 for x in data) def take(n): def _take(data): return islice(data, n) return _take # 组合管道 process = pipeline( read_data, filter_positive, square, take(5) ) data = [-2, -1, 0, 1, 2, 3, 4, 5, 6] result = list(process(data)) print(result) # [1, 4, 9, 16, 25] ``` ## 最佳实践 ::::{grid} 1 :gutter: 2 :::{grid-item-card} 何时使用生成器 1. **处理大数据**:文件、数据库结果 2. **无限序列**:流数据、实时数据 3. **惰性计算**:只在需要时计算 4. **管道处理**:数据转换链 ::: :::{grid-item-card} 注意事项 1. **生成器只能遍历一次**:需要多次使用时转为列表或使用 `tee` 2. **不要在循环中重复调用 `list(generator)`**:每次调用都会完整遍历并把全部数据物化到内存;若传入的是同一个生成器对象,第一次之后只会得到空列表 3. **注意内存**:`tee` 会缓存数据,大数据时谨慎使用 4. **调试困难**:考虑在开发时先用列表,确认逻辑正确后改为生成器 ::: ::::