2.4. asyncio 异步编程

asyncio 是 Python 的异步 I/O 框架,提供了编写高性能并发代码的能力。

2.4.1. 基础概念

2.4.1.1. 协程(Coroutine)

import asyncio

# Declare a coroutine function with ``async def``.
async def hello():
    """Print "Hello", pause for one second, then print "World"."""
    opening, closing = "Hello", "World"
    print(opening)
    # asyncio.sleep suspends only this coroutine; the event loop stays free.
    await asyncio.sleep(1)
    print(closing)

# Run the coroutine: asyncio.run() executes hello() on an event loop
# and returns once the coroutine has finished.
asyncio.run(hello())

# ⚠️ 常见错误
async def greet():
    """Return the greeting string "Hello"."""
    greeting = "Hello"
    return greeting

# result = greet()  # this yields a coroutine object, not the result!
# print(result)  # <coroutine object greet at ...>

# Correct approach: let asyncio.run() execute the coroutine and hand back
# its return value.
result = asyncio.run(greet())
print(result)  # Hello

2.4.1.2. 事件循环

import asyncio

async def task(name, delay):
    """Announce the start, sleep ``delay`` seconds, announce completion.

    Returns:
        ``name`` unchanged, so callers can tell which task produced a result.
    """
    banner = f"Task {name} starting"
    print(banner)
    await asyncio.sleep(delay)
    banner = f"Task {name} completed"
    print(banner)
    return name

async def main():
    """Show the running loop, then run three ``task`` coroutines concurrently."""
    # Fetch the event loop that is currently executing this coroutine.
    current_loop = asyncio.get_running_loop()
    print(f"Running in loop: {current_loop}")

    # Run all three concurrently; gather reports results in argument order.
    jobs = (task("A", 2), task("B", 1), task("C", 3))
    results = await asyncio.gather(*jobs)
    print(f"Results: {results}")

asyncio.run(main())

2.4.2. 任务管理

2.4.2.1. 创建任务

import asyncio

async def worker(name, delay):
    """Sleep for ``delay`` seconds, then return the string ``"<name> done"``."""
    await asyncio.sleep(delay)
    outcome = f"{name} done"
    return outcome

async def main():
    """Start two workers as tasks, do other work, then collect both results."""
    # create_task schedules each coroutine on the running loop right away.
    slow_task = asyncio.create_task(worker("A", 2))
    fast_task = asyncio.create_task(worker("B", 1))

    # This coroutine is free to carry on with other work in the meantime.
    print("Tasks created, doing other work...")

    # Awaiting a task yields the value its coroutine returned.
    result1 = await slow_task
    result2 = await fast_task

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

2.4.2.2. gather vs wait

import asyncio

async def may_fail(n):
    """Return ``n`` after sleeping ``n`` seconds.

    Raises:
        ValueError: immediately (before any sleep) when ``n == 2``.
    """
    if n != 2:
        await asyncio.sleep(n)
        return n
    raise ValueError("Error!")

async def main():
    """Contrast ``asyncio.gather`` with ``asyncio.wait``.

    First gathers three ``may_fail`` calls with ``return_exceptions=True`` and
    reports any failure found in the result list.  Then uses ``asyncio.wait``
    to react to the first finished task and cancels whatever is still pending.
    """
    # gather: wait for every awaitable.  With return_exceptions=True a failure
    # is *returned* inside the result list instead of being raised, so a
    # try/except around the call would never fire; inspect the results.
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    print(results)  # [1, ValueError('Error!'), 3]
    if any(isinstance(item, ValueError) for item in results):
        print("One task failed")

    # wait: a more flexible way of waiting.
    tasks = [
        asyncio.create_task(may_fail(1)),
        asyncio.create_task(may_fail(3)),
    ]

    # Return as soon as the first task finishes.
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )

    for finished in done:
        print(f"Completed: {finished.result()}")

    # Cancel what is still running, then await the cancelled tasks so the
    # cancellation has actually been processed before main() returns.
    for unfinished in pending:
        unfinished.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())

2.4.2.3. 超时控制

import asyncio

async def slow_operation():
    """Simulate a slow call: sleep for ten seconds, then return ``"done"``."""
    duration = 10
    await asyncio.sleep(duration)
    return "done"

async def main():
    """Show two ways to bound ``slow_operation`` with a two-second timeout."""
    limit = 2.0

    # Option 1: wait_for wraps a single awaitable with a deadline.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=limit)
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Option 2: the asyncio.timeout() context manager (Python 3.11+).
    try:
        async with asyncio.timeout(limit):
            await slow_operation()
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

2.4.3. 同步原语

2.4.3.1. Lock

import asyncio

class SharedResource:
    """Counter whose read-modify-write update is serialized by an asyncio.Lock."""

    def __init__(self):
        self.value = 0  # current counter value
        self.lock = asyncio.Lock()  # held for the whole update in increment()

    async def increment(self):
        """Add one to ``value`` while holding ``lock``."""
        async with self.lock:
            snapshot = self.value
            # Simulated I/O between the read and the write; the lock keeps
            # other increment() calls from reading the same snapshot.
            await asyncio.sleep(0.01)
            self.value = snapshot + 1

async def main():
    """Run 100 concurrent increments and print the resulting counter."""
    resource = SharedResource()

    # Launch 100 increment() coroutines at the same time.
    pending_increments = [resource.increment() for _ in range(100)]
    await asyncio.gather(*pending_increments)

    print(f"Final value: {resource.value}")  # 100

asyncio.run(main())

2.4.3.2. Semaphore

import asyncio

async def fetch_with_limit(sem, url):
    """Pretend to fetch ``url`` while holding one slot of semaphore ``sem``.

    Returns:
        The string ``"Data from <url>"``.
    """
    async with sem:
        print(f"Fetching {url}")
        # Stand-in for a real network request.
        await asyncio.sleep(1)
        payload = f"Data from {url}"
        return payload

async def main():
    """Fetch ten example URLs with at most three requests in flight."""
    # Cap the number of simultaneous requests.
    sem = asyncio.Semaphore(3)

    urls = [f"https://api.example.com/{i}" for i in range(10)]

    # One coroutine per URL; the shared semaphore does the throttling.
    results = await asyncio.gather(
        *(fetch_with_limit(sem, url) for url in urls)
    )

    print(f"Fetched {len(results)} pages")

asyncio.run(main())

2.4.3.3. Event

import asyncio

async def waiter(event, name):
    """Wait until ``event`` is set, printing a message before and after."""
    print(f"{name} waiting for event")
    # Suspends here until some other coroutine calls event.set().
    is_set = await event.wait()
    print(f"{name} got event!")

async def setter(event):
    """Sleep two seconds, then set ``event`` to release everyone waiting on it."""
    delay = 2
    await asyncio.sleep(delay)
    print("Setting event")
    event.set()

async def main():
    """Run two waiters and one setter that share a single Event."""
    event = asyncio.Event()

    participants = [waiter(event, "A"), waiter(event, "B"), setter(event)]
    await asyncio.gather(*participants)

asyncio.run(main())

2.4.3.4. Queue

import asyncio

async def producer(queue):
    """Put the integers 0..9 on ``queue``, pausing 0.1 s after each one,
    then put a single ``None`` as the stop marker."""
    for number in range(10):
        await queue.put(number)
        print(f"Produced: {number}")
        await asyncio.sleep(0.1)

    # Signal the end of the stream.
    await queue.put(None)

async def consumer(queue, name):
    """Consume items from ``queue`` until the ``None`` stop marker arrives.

    Each regular item is printed, followed by a 0.2 s pause that simulates
    processing, and is then acknowledged with ``queue.task_done()``.

    A single stop marker would end only the consumer that happens to receive
    it, leaving every other consumer blocked in ``queue.get()`` forever.  To
    shut down any number of consumers with one marker, a consumer that sees
    ``None`` puts it back on the queue before exiting.

    Note: as a consequence one ``None`` remains on the queue after the last
    consumer has stopped, so ``queue.join()`` must not be awaited after
    shutdown.
    """
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            # Pass the stop marker on to the next consumer sharing the queue.
            await queue.put(None)
            break

        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    """Connect one producer and two consumers through a bounded queue."""
    # With maxsize=5, put() waits while five items are already queued.
    queue = asyncio.Queue(maxsize=5)

    consumer_names = ("Consumer-1", "Consumer-2")
    await asyncio.gather(
        producer(queue),
        *(consumer(queue, label) for label in consumer_names),
    )

asyncio.run(main())

2.4.4. 异步迭代器和生成器

2.4.4.1. 异步迭代器

import asyncio

class AsyncRange:
    """Asynchronous counterpart of counting from ``start`` up to (not
    including) ``stop``, with a 0.1 s pause before each value."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def __aiter__(self):
        # (Re)start from ``start``; the object serves as its own iterator.
        self.current = self.start
        return self

    async def __anext__(self):
        """Return the next value, or raise StopAsyncIteration at ``stop``."""
        if self.current >= self.stop:
            raise StopAsyncIteration

        await asyncio.sleep(0.1)  # simulated asynchronous work
        produced, self.current = self.current, self.current + 1
        return produced

async def main():
    """Print every value produced by ``AsyncRange(0, 5)``."""
    numbers = AsyncRange(0, 5)
    async for number in numbers:
        print(number)

asyncio.run(main())

2.4.4.2. 异步生成器

import asyncio

async def async_range(start, stop):
    """Async generator yielding ``range(start, stop)`` values, pausing 0.1 s
    before each one."""
    pause = 0.1
    for value in range(start, stop):
        await asyncio.sleep(pause)
        yield value

async def main():
    """Consume ``async_range`` with ``async for`` and an async comprehension."""
    async for number in async_range(0, 5):
        print(number)

    # Asynchronous list comprehension.
    result = [number async for number in async_range(0, 5)]
    print(result)

asyncio.run(main())

2.4.5. 异步上下文管理器

import asyncio

class AsyncResource:
    """Resource whose acquisition and release are asynchronous, for use with
    ``async with``."""

    async def __aenter__(self):
        """Acquire the resource (simulated by a 0.5 s pause); return ``self``."""
        print("Acquiring resource...")
        await asyncio.sleep(0.5)
        print("Resource acquired")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the resource (simulated by a 0.5 s pause).

        Returns False, so an exception raised inside the ``async with`` body
        is not suppressed.
        """
        print("Releasing resource...")
        await asyncio.sleep(0.5)
        print("Resource released")
        suppress_exception = False
        return suppress_exception

    async def do_work(self):
        """Simulate half a second of work with the resource."""
        print("Working...")
        await asyncio.sleep(0.5)

async def main():
    """Acquire an ``AsyncResource``, do some work with it, then release it."""
    async with AsyncResource() as acquired:
        await acquired.do_work()

asyncio.run(main())

# 使用 contextlib
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_open(filename):
    """Async context manager that simulates opening and closing ``filename``.

    Yields:
        The placeholder string ``"handle to <filename>"``.
    """
    print(f"Opening {filename}")
    await asyncio.sleep(0.1)
    handle = f"handle to {filename}"
    try:
        yield handle
    finally:
        # Runs on exit from the ``async with`` body, even after an exception.
        print(f"Closing {filename}")
        await asyncio.sleep(0.1)

async def main():
    """Open the simulated file ``test.txt`` and print the handle received."""
    async with async_open("test.txt") as handle:
        print(f"Got: {handle}")

asyncio.run(main())

2.4.6. 与阻塞代码集成

2.4.6.1. 运行阻塞代码

import asyncio
import time

def blocking_io():
    """Simulate blocking I/O: stall the calling thread for one second and
    return ``"done"``."""
    pause_seconds = 1
    time.sleep(pause_seconds)
    return "done"

def cpu_intensive(n):
    """Return the sum of squares ``0**2 + 1**2 + ... + (n-1)**2``.

    Serves as a stand-in for CPU-bound work.  Returns 0 when ``n <= 0``.
    """
    # sum() over a generator replaces the manual accumulator loop.
    return sum(i * i for i in range(n))

async def main():
    """Offload blocking and CPU-bound work from the event loop to executors."""
    from concurrent.futures import ProcessPoolExecutor

    loop = asyncio.get_running_loop()

    # Blocking I/O: passing None selects the loop's default executor
    # (a thread pool), so the event loop itself is not stalled.
    result = await loop.run_in_executor(None, blocking_io)
    print(f"IO result: {result}")

    # CPU-bound work: run it in a separate process.
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive, 1000000)
        print(f"CPU result: {result}")

# Worker processes created with the "spawn" start method re-import this
# module; without the guard every worker would try to run main() again.
if __name__ == "__main__":
    asyncio.run(main())

2.4.6.2. 从同步代码调用异步

import asyncio

async def async_function():
    """Sleep for one second, then return ``"done"``."""
    delay = 1
    await asyncio.sleep(delay)
    return "done"

# 方式1:asyncio.run(推荐,创建新事件循环)
def sync_wrapper1():
    """Run ``async_function`` to completion from synchronous code.

    ``asyncio.run`` creates a new event loop for the call, so this must not
    be invoked from a thread that is already running a loop.

    Returns:
        The value returned by ``async_function``.
    """
    return asyncio.run(async_function())

# 方式2:在已有事件循环中(用于框架集成)
def sync_wrapper2(loop=None):
    """Run ``async_function`` from synchronous code and return its result.

    Args:
        loop: Optional event loop owned by the surrounding framework.  If it
            is running in a *different* thread, the coroutine is submitted to
            it with ``asyncio.run_coroutine_threadsafe``.  If it is not
            running (and no loop runs in this thread), it is driven with
            ``run_until_complete``.

    Returns:
        The value returned by ``async_function``.

    Note:
        When called from a thread whose own event loop is running, the
        coroutine is executed on a fresh loop in a helper thread.  The call
        still blocks the calling loop until the result is available; prefer
        ``await async_function()`` in that situation.
    """
    import concurrent.futures

    try:
        current = asyncio.get_running_loop()
    except RuntimeError:
        current = None  # no event loop is running in this thread

    if loop is not None and loop is not current and loop.is_running():
        # Another thread drives the loop: hand the coroutine over and block
        # this thread until the result arrives.
        future = asyncio.run_coroutine_threadsafe(async_function(), loop)
        return future.result()

    if current is None:
        if loop is not None and not loop.is_running():
            return loop.run_until_complete(async_function())
        return asyncio.run(async_function())

    # This thread is itself running a loop.  Blocking on a future scheduled
    # on that same loop would deadlock (the loop cannot make progress while
    # we wait), so run the coroutine on its own loop in a helper thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, async_function()).result()

2.4.7. 最佳实践

设计原则
  1. 不要混用 sync 和 async:尽量保持代码一致

  2. 使用 async with / async for:正确处理资源

  3. 限制并发数:使用 Semaphore 避免资源耗尽

  4. 正确处理取消:设计可取消的协程

常见错误
# ❌ Forgetting await
async def main():
    asyncio.sleep(1)  # the coroutine object is created but never executed!

# ✅ Use await so the coroutine actually runs
async def main():
    await asyncio.sleep(1)

# ❌ Calling time.sleep inside async code
async def main():
    time.sleep(1)  # blocks the entire event loop!

# ✅ Use asyncio.sleep, which suspends only the current coroutine
async def main():
    await asyncio.sleep(1)

# ❌ Creating a task without keeping or awaiting it
async def main():
    asyncio.create_task(some_task())  # the task may be garbage-collected

# ✅ Keep a reference to the task, or await it
async def main():
    task = asyncio.create_task(some_task())
    await task
性能建议
  1. 批量操作:使用 gather 而非顺序 await

  2. 使用连接池:复用数据库/HTTP 连接

  3. 避免过多小任务:任务切换有开销

  4. 监控任务:记录长时间运行的任务