1.4. 迭代器与生成器

迭代器和生成器是 Python 中处理数据流的核心机制,掌握它们对于写出高效、内存友好的代码至关重要。

1.4.1. 迭代协议

1.4.1.1. 可迭代对象 vs 迭代器

# 可迭代对象(Iterable):实现 __iter__ 方法
# 迭代器(Iterator):实现 __iter__ 和 __next__ 方法

# An iterable implements __iter__; an iterator implements both
# __iter__ and __next__.

my_list = [1, 2, 3]  # an iterable
iterator = iter(my_list)  # ask it for an iterator

# Pull values one at a time with next().
for _ in range(3):
    print(next(iterator))  # 1, then 2, then 3
# print(next(iterator))  # would raise StopIteration

# Iterators are single-use: a second pass yields nothing.
iterator = iter(my_list)
print(list(iterator))  # [1, 2, 3]
print(list(iterator))  # [] - already exhausted

1.4.1.2. 自定义迭代器

class CountDown:
    """One-shot countdown iterator: yields start, start-1, ..., 1.

    Acts as its own iterator (__iter__ returns self), so once the
    count reaches zero the instance is permanently exhausted.
    """

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.start <= 0:
            raise StopIteration
        # Remember the value to emit, then step the counter down.
        value = self.start
        self.start = value - 1
        return value

for value in CountDown(5):
    print(value, end=' ')  # 5 4 3 2 1

# Caveat: a CountDown instance can only be iterated once.
cd = CountDown(3)
print(list(cd))  # [3, 2, 1]
print(list(cd))  # [] - exhausted

1.4.1.3. 可重复使用的迭代器

class ReusableCountDown:
    """Reusable countdown iterable.

    Separates the iterable from its iterator: every call to __iter__
    hands back a fresh CountDownIterator, so this object can be
    iterated any number of times.
    """

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        # A brand-new iterator per pass keeps this object reusable.
        return CountDownIterator(self.start)

class CountDownIterator:
    """Iterator half of the reusable countdown: yields current, ..., 1."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # Iterators return themselves from __iter__.
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        result = self.current
        self.current = result - 1
        return result

reusable = ReusableCountDown(3)
print(list(reusable))  # [3, 2, 1]
print(list(reusable))  # [3, 2, 1] - works again

1.4.2. 生成器

生成器是创建迭代器的简洁方式,使用 yield 关键字。

1.4.2.1. 基本生成器

def countdown(n):
    """Generator that lazily yields n, n-1, ..., 1 (nothing for n <= 0)."""
    # range(n, 0, -1) walks the same values the manual while-loop did.
    for value in range(n, 0, -1):
        yield value

# 生成器函数返回生成器对象
# Calling a generator function returns a generator object;
# the body does not run until iteration starts.
gen = countdown(5)
print(type(gen))  # <class 'generator'>

print(next(gen))  # 5
print(next(gen))  # 4

# Generators plug straight into for loops.
for value in countdown(3):
    print(value, end=' ')  # 3 2 1

1.4.2.2. 生成器 vs 列表的内存对比

import sys

# List comprehension: all one million elements are built eagerly.
list_comp = [x ** 2 for x in range(1000000)]
print(f"List size: {sys.getsizeof(list_comp):,} bytes")  # ~8 MB

# Generator expression: lazy - the generator object itself stays tiny
# regardless of how many values it will eventually produce.
gen_exp = (x ** 2 for x in range(1000000))
print(f"Generator size: {sys.getsizeof(gen_exp)} bytes")  # ~200 bytes

# 实际应用:处理大文件
def read_large_file(file_path):
    """Lazily yield the stripped lines of a text file.

    The file is consumed one line at a time, so arbitrarily large
    files are processed in constant memory.
    """
    with open(file_path) as handle:
        for raw_line in handle:
            yield raw_line.strip()

# 处理每一行而不需要全部加载到内存
# for line in read_large_file("huge_file.txt"):
#     process(line)

1.4.2.3. yield from

def chain(*iterables):
    """Lazily concatenate any number of iterables into one stream."""
    for iterable in iterables:
        # Explicit inner loop - equivalent to `yield from iterable`.
        for item in iterable:
            yield item

combined = list(chain([1, 2], [3, 4], [5, 6]))
print(combined)  # [1, 2, 3, 4, 5, 6]

# 递归生成器
def flatten(nested_list):
    """Recursively flatten arbitrarily nested lists into a flat stream."""
    for element in nested_list:
        if not isinstance(element, list):
            yield element
        else:
            # Delegate to a recursive sub-generator for nested lists.
            yield from flatten(element)

# Handles arbitrary nesting depth.
print(list(flatten([1, [2, 3, [4, 5]], 6, [7, 8]])))  # [1, 2, 3, 4, 5, 6, 7, 8]

1.4.2.4. 生成器的 send 和 throw

def accumulator():
    """Coroutine-style generator yielding the running total of sent values.

    The priming next() yields 0; each send(value) adds to the total
    and yields the new total. Sending None ends the generator, which
    surfaces as StopIteration at the caller.
    """
    total = 0
    while True:
        received = yield total  # value delivered by send()
        if received is None:
            break
        total += received

acc = accumulator()
print(next(acc))       # 0 - prime the generator
print(acc.send(10))    # 10
print(acc.send(20))    # 30
print(acc.send(5))     # 35
# Fix: send(None) makes the generator body return, which raises
# StopIteration at this call site - the original code crashed here.
# Catch it (or call acc.close() instead).
try:
    acc.send(None)     # terminate
except StopIteration:
    pass

# 生成器的 close 和异常处理
def managed_generator():
    """Yield the string "running" indefinitely until closed.

    Calling close() raises GeneratorExit at the paused yield; the
    except block reports the shutdown and the finally clause runs
    unconditionally for cleanup.
    """
    try:
        while True:
            yield "running"
    except GeneratorExit:
        print("Generator closed")
    finally:
        print("Cleanup")

mg = managed_generator()
print(next(mg))   # running
mg.close()        # prints "Generator closed" then "Cleanup"

1.4.3. itertools 模块

Python 标准库提供了强大的迭代器工具。

1.4.3.1. 无限迭代器

from itertools import count, cycle, repeat

# count: an unbounded arithmetic progression.
for value in count(10, 2):  # start at 10, step 2
    if value > 20:
        break
    print(value, end=' ')  # 10 12 14 16 18 20

# cycle: loop over a sequence forever.
palette = cycle(['red', 'green', 'blue'])
for _ in range(7):
    print(next(palette), end=' ')  # red green blue red green blue red

# repeat: the same value a fixed number of times (result discarded here).
list(repeat('A', 5))  # ['A', 'A', 'A', 'A', 'A']

1.4.3.2. 组合迭代器

# NOTE: this import shadows the chain() generator defined earlier in the file.
from itertools import chain, zip_longest, product, permutations, combinations

# chain: concatenate iterators end to end
list(chain([1, 2], [3, 4]))  # [1, 2, 3, 4]

# zip_longest: like zip, but pads to the longest input
list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue='-'))
# [(1, 'a'), (2, 'b'), (3, '-')]

# product: Cartesian product
list(product('AB', [1, 2]))  # [('A', 1), ('A', 2), ('B', 1), ('B', 2)]

# permutations: ordered arrangements of length 2
list(permutations('ABC', 2))  # [('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]

# combinations: unordered selections of length 2
list(combinations('ABC', 2))  # [('A', 'B'), ('A', 'C'), ('B', 'C')]

1.4.3.3. 过滤和分组

from itertools import filterfalse, takewhile, dropwhile, groupby

# filterfalse: the complement of filter - keep items where the predicate is falsy
list(filterfalse(lambda x: x % 2, range(10)))  # [0, 2, 4, 6, 8]

# takewhile: take items while the predicate holds, stop at the first failure
list(takewhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [1, 3]

# dropwhile: skip items while the predicate holds, then yield everything after
list(dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [5, 2, 1]

# groupby: groups CONSECUTIVE items - input must be pre-sorted by the same key
data = [('a', 1), ('a', 2), ('b', 3), ('b', 4), ('a', 5)]
sorted_data = sorted(data, key=lambda x: x[0])

for key, group in groupby(sorted_data, key=lambda x: x[0]):
    print(f"{key}: {list(group)}")
# a: [('a', 1), ('a', 2), ('a', 5)]
# b: [('b', 3), ('b', 4)]

1.4.3.4. 实用工具

from itertools import islice, tee, accumulate

# islice: slice an iterator lazily, without materializing it
gen = (x ** 2 for x in range(10))
list(islice(gen, 2, 5))  # [4, 9, 16] - indices 2, 3, 4

# tee: split one iterator into n independent copies
# (tee buffers pending items internally - beware with large inputs)
original = iter([1, 2, 3, 4, 5])
copy1, copy2 = tee(original, 2)
print(list(copy1))  # [1, 2, 3, 4, 5]
print(list(copy2))  # [1, 2, 3, 4, 5]

# accumulate: running totals, or any binary reduction via func
list(accumulate([1, 2, 3, 4]))  # [1, 3, 6, 10]
list(accumulate([1, 2, 3, 4], lambda a, b: a * b))  # [1, 2, 6, 24]

1.4.4. 实际应用

1.4.4.1. 分块处理

from itertools import islice

def chunked(iterable, size):
    """Lazily split *iterable* into lists of at most *size* items.

    The final chunk may be shorter; only the current chunk is ever
    held in memory.
    """
    iterator = iter(iterable)
    while True:
        piece = list(islice(iterator, size))
        if not piece:
            return  # input exhausted
        yield piece

# 分批处理大数据
# Process a large dataset batch by batch.
for piece in chunked(range(10), 3):
    print(piece)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]

# 实际应用:批量 API 调用
def batch_api_call(items, batch_size=100):
    """Call process_batch once per batch of *items*, yielding each result.

    NOTE(review): process_batch is not defined in this file; it is
    assumed to be supplied by the surrounding application - confirm.
    """
    for group in chunked(items, batch_size):
        yield process_batch(group)

1.4.4.2. 滑动窗口

from collections import deque
from itertools import islice

def sliding_window(iterable, n):
    """Yield successive n-sized tuples sliding one step at a time.

    Emits nothing when the input holds fewer than n items.
    """
    stream = iter(iterable)
    # Prime the window with the first n items (maxlen keeps it fixed-size).
    buf = deque(islice(stream, n), maxlen=n)
    if len(buf) < n:
        return  # too few items for even one window
    yield tuple(buf)
    for element in stream:
        buf.append(element)  # oldest item falls off automatically
        yield tuple(buf)

# Moving average over a window of 3
data = [1, 2, 3, 4, 5, 6, 7]
for window in sliding_window(data, 3):
    avg = sum(window) / len(window)
    print(f"{window} -> avg: {avg:.2f}")
# (1, 2, 3) -> avg: 2.00
# (2, 3, 4) -> avg: 3.00
# (3, 4, 5) -> avg: 4.00
# ...

1.4.4.3. 管道处理

def pipeline(*steps):
    """Compose *steps* left-to-right into a single callable.

    pipeline(f, g, h)(x) is equivalent to h(g(f(x))).
    """
    def run(payload):
        # Thread the data through each stage in order.
        for stage in steps:
            payload = stage(payload)
        return payload
    return run

# 数据处理管道
def read_data(source):
    """Pipeline entry stage: lazily re-emit every element of *source*."""
    return (element for element in source)

def filter_positive(data):
    """Keep only elements strictly greater than zero."""
    return (value for value in data if value > 0)

def square(data):
    """Lazily square every element of *data*."""
    return (value ** 2 for value in data)

def take(n):
    """Build a pipeline stage that passes through only the first n items."""
    def _limit(data):
        return islice(data, n)
    return _limit

# Assemble the stages into one callable pipeline.
process = pipeline(
    read_data,
    filter_positive,
    square,
    take(5),
)

data = [-2, -1, 0, 1, 2, 3, 4, 5, 6]
result = list(process(data))
print(result)  # [1, 4, 9, 16, 25]

1.4.5. 最佳实践

何时使用生成器
  1. 处理大数据:文件、数据库结果

  2. 无限序列:流数据、实时数据

  3. 惰性计算:只在需要时计算

  4. 管道处理:数据转换链

注意事项
  1. 生成器只能遍历一次:需要多次使用时转为列表或使用 tee

  2. 不要对同一个生成器重复调用 list():生成器首次遍历后即耗尽,之后只会得到空列表

  3. 注意内存:tee 会缓存数据,大数据时谨慎使用

  4. 调试困难:考虑在开发时先用列表,确认逻辑正确后改为生成器