# asyncio 异步编程

asyncio 是 Python 的异步 I/O 框架,提供了编写高性能并发代码的能力。

## 基础概念

### 协程(Coroutine)

```python
import asyncio

# Define a coroutine with async def
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # non-blocking wait
    print("World")

# Run the coroutine
asyncio.run(hello())

# ⚠️ Common mistake
async def greet():
    return "Hello"

# result = greet()  # returns a coroutine object, not the result!
# print(result)     # <coroutine object greet at 0x...>

# Correct way
result = asyncio.run(greet())
print(result)  # Hello
```

### 事件循环

```python
import asyncio

async def task(name, delay):
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name

async def main():
    # Get the currently running event loop
    loop = asyncio.get_running_loop()
    print(f"Running in loop: {loop}")

    # Run several tasks concurrently
    results = await asyncio.gather(
        task("A", 2),
        task("B", 1),
        task("C", 3)
    )
    print(f"Results: {results}")

asyncio.run(main())
```

## 任务管理

### 创建任务

```python
import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # Create tasks (they are scheduled to run right away)
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))

    # Free to do other work in the meantime
    print("Tasks created, doing other work...")

    # Wait for the tasks to finish
    result1 = await task1
    result2 = await task2
    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```

### gather vs wait

```python
import asyncio

async def may_fail(n):
    if n == 2:
        raise ValueError("Error!")
    await asyncio.sleep(n)
    return n

async def main():
    # gather: wait for all tasks
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True  # return exception objects instead of raising
    )
    print(results)  # [1, ValueError('Error!'), 3]

    # Without return_exceptions the first exception propagates to the caller
    # (the other awaitables are NOT cancelled and keep running)
    try:
        await asyncio.gather(may_fail(1), may_fail(2))
    except ValueError:
        print("One task failed")

    # wait: a more flexible way to wait
    tasks = [
        asyncio.create_task(may_fail(1)),
        asyncio.create_task(may_fail(3)),
    ]

    # Wait until the first one completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    for task in done:
        print(f"Completed: {task.result()}")

    # Cancel the tasks that have not finished
    for task in pending:
        task.cancel()

asyncio.run(main())
```

### 超时控制

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    # Using wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Using timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            await slow_operation()
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
```

## 同步原语

### Lock

```python
import asyncio

class SharedResource:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            temp = self.value
            await asyncio.sleep(0.01)  # simulate some I/O
            self.value = temp + 1

async def main():
    resource = SharedResource()

    # Run 100 increments concurrently
    await asyncio.gather(*[resource.increment() for _ in range(100)])
    print(f"Final value: {resource.value}")  # 100

asyncio.run(main())
```

### Semaphore

```python
import asyncio

async def fetch_with_limit(sem, url):
    async with sem:
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # simulate a network request
        return f"Data from {url}"

async def main():
    # Limit the number of requests in flight at the same time
    sem = asyncio.Semaphore(3)
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    tasks = [fetch_with_limit(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} pages")

asyncio.run(main())
```

### Event

```python
import asyncio

async def waiter(event, name):
    print(f"{name} waiting for event")
    await event.wait()
    print(f"{name} got event!")

async def setter(event):
    await asyncio.sleep(2)
    print("Setting event")
    event.set()

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, "A"),
        waiter(event, "B"),
        setter(event)
    )

asyncio.run(main())
```

### Queue

```python
import asyncio

async def producer(queue, num_consumers=1):
    for i in range(10):
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(0.1)
    # Send one stop signal PER consumer: each consumer exits after taking a
    # single None, so fewer sentinels than consumers would leave the rest
    # blocked on queue.get() forever.
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, num_consumers=2),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2")
    )

asyncio.run(main())
```

## 异步迭代器和生成器

### 异步迭代器

```python
import asyncio

class AsyncRange:
    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def __aiter__(self):
        self.current = self.start
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate an asynchronous operation
        value = self.current
        self.current += 1
        return value

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())
```

### 异步生成器

```python
import asyncio

async def async_range(start, stop):
    """Asynchronous generator."""
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for i in async_range(0, 5):
        print(i)

    # Asynchronous list comprehension
    result = [i async for i in async_range(0, 5)]
    print(result)

asyncio.run(main())
```

## 异步上下文管理器

```python
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(0.5)
        print("Resource acquired")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(0.5)
        print("Resource released")
        return False

    async def do_work(self):
        print("Working...")
        await asyncio.sleep(0.5)

async def main():
    async with AsyncResource() as resource:
        await resource.do_work()

asyncio.run(main())

# Using contextlib
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_open(filename):
    print(f"Opening {filename}")
    await asyncio.sleep(0.1)
    try:
        yield f"handle to {filename}"
    finally:
        print(f"Closing {filename}")
        await asyncio.sleep(0.1)

async def main():
    async with async_open("test.txt") as f:
        print(f"Got: {f}")

asyncio.run(main())
```

## 与阻塞代码集成

### 运行阻塞代码

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io():
    """Blocking I/O operation."""
    time.sleep(1)
    return "done"

def cpu_intensive(n):
    """CPU-bound operation."""
    total = 0
    for i in range(n):
        total += i * i
    return total

async def main():
    loop = asyncio.get_running_loop()

    # Run blocking I/O in the default thread pool
    result = await loop.run_in_executor(None, blocking_io)
    print(f"IO result: {result}")

    # Run a CPU-bound task in a process pool
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive, 1000000)
        print(f"CPU result: {result}")

# The guard is required with ProcessPoolExecutor: under the "spawn" start
# method (Windows, macOS) worker processes re-import this module, and an
# unguarded asyncio.run(main()) would run again inside every worker.
if __name__ == "__main__":
    asyncio.run(main())
```

### 从同步代码调用异步

```python
import asyncio
import concurrent.futures

async def async_function():
    await asyncio.sleep(1)
    return "done"

# Approach 1: asyncio.run (recommended, creates a new event loop)
def sync_wrapper1():
    result = asyncio.run(async_function())
    return result

# Approach 2: works whether or not a loop is already running in this thread
def sync_wrapper2():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one
        return asyncio.run(async_function())

    # A loop is already running in THIS thread. Blocking here on
    # run_coroutine_threadsafe(...).result() would deadlock, because the loop
    # cannot make progress while its own thread is blocked. Run the coroutine
    # on a fresh loop in a worker thread instead. The calling thread still
    # blocks until it finishes, so prefer `await` when the caller can be async.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, async_function()).result()

# Approach 3: submit to a loop running in ANOTHER thread (framework integration)
def sync_wrapper3(loop):
    # Must be called from a thread other than the one running `loop`
    future = asyncio.run_coroutine_threadsafe(async_function(), loop)
    return future.result()
```

## 最佳实践

::::{grid} 1
:gutter: 2

:::{grid-item-card} 设计原则
1. **不要混用 sync 和 async**:尽量保持代码一致
2. **使用 `async with` 和 `async for`**:正确处理资源
3. **限制并发数**:使用 Semaphore 避免资源耗尽
4. **正确处理取消**:设计可取消的协程
:::

:::{grid-item-card} 常见错误
```python
# ❌ Forgetting await
async def main():
    asyncio.sleep(1)  # the coroutine never runs!

# ✅ Use await
async def main():
    await asyncio.sleep(1)

# ❌ Using time.sleep in async code
async def main():
    time.sleep(1)  # blocks the entire event loop!

# ✅ Use asyncio.sleep
async def main():
    await asyncio.sleep(1)

# ❌ Creating a task without awaiting it
async def main():
    asyncio.create_task(some_task())  # the task may be garbage-collected

# ✅ Keep a reference to the task or await it
async def main():
    task = asyncio.create_task(some_task())
    await task
```
:::

:::{grid-item-card} 性能建议
1. **批量操作**:使用 `gather` 而非顺序 `await`
2. **使用连接池**:复用数据库/HTTP 连接
3. **避免过多小任务**:任务切换有开销
4. **监控任务**:记录长时间运行的任务
:::

::::