2.4. asyncio 异步编程
asyncio 是 Python 的异步 I/O 框架,提供了编写高性能并发代码的能力。
2.4.1. 基础概念
2.4.1.1. 协程(Coroutine)
import asyncio
# Define a coroutine with ``async def``
async def hello():
    """Print "Hello", wait one second, then print "World"."""
    print("Hello")
    # Awaiting asyncio.sleep is a non-blocking wait: only this coroutine pauses
    await asyncio.sleep(1)
    print("World")
# Run the coroutine
asyncio.run(hello())
# ⚠️ Common mistake
async def greet():
    """Return the greeting string; callers must await (or run) the coroutine."""
    greeting = "Hello"
    return greeting
# result = greet() # returns a coroutine object, not the result!
# print(result) # <coroutine object greet at ...>
# The correct way
result = asyncio.run(greet())
print(result) # Hello
2.4.1.2. 事件循环
import asyncio
async def task(name, delay):
    """Announce the start, sleep for ``delay`` seconds, announce completion.

    Args:
        name: Label used in the printed messages.
        delay: Number of seconds passed to ``asyncio.sleep``.

    Returns:
        ``name``, unchanged.
    """
    print(f"Task {name} starting")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return name
async def main():
    """Show the running event loop, then run three tasks concurrently."""
    # Fetch the event loop that is currently executing this coroutine
    current_loop = asyncio.get_running_loop()
    print(f"Running in loop: {current_loop}")

    # Run several coroutines concurrently; gather returns their results in
    # the order the awaitables were passed in
    jobs = [
        task(label, seconds)
        for label, seconds in (("A", 2), ("B", 1), ("C", 3))
    ]
    results = await asyncio.gather(*jobs)
    print(f"Results: {results}")
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.2. 任务管理
2.4.2.1. 创建任务
import asyncio
async def worker(name, delay):
    """Sleep for ``delay`` seconds, then report completion.

    Returns:
        The string ``"<name> done"``.
    """
    await asyncio.sleep(delay)
    message = f"{name} done"
    return message
async def main():
    """Schedule two workers as tasks, do other work, then collect results."""
    # create_task schedules each coroutine on the loop right away
    first_task = asyncio.create_task(worker("A", 2))
    second_task = asyncio.create_task(worker("B", 1))

    # The tasks make progress whenever this coroutine awaits
    print("Tasks created, doing other work...")

    # Wait for the tasks to finish
    result1 = await first_task
    result2 = await second_task
    print(f"Results: {result1}, {result2}")
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.2.2. gather vs wait
import asyncio
async def may_fail(n):
    """Sleep ``n`` seconds and return ``n``; fail immediately when ``n == 2``.

    Raises:
        ValueError: If ``n`` equals 2 (raised before any sleeping).
    """
    if n != 2:
        await asyncio.sleep(n)
        return n
    raise ValueError("Error!")
async def main():
    """Contrast ``asyncio.gather`` with ``asyncio.wait``."""
    # gather: wait for every awaitable. With return_exceptions=True a failure
    # is delivered as an item of the result list instead of being raised, so
    # no try/except is needed around this call (an except clause here would
    # be unreachable).
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    print(results)  # [1, ValueError('Error!'), 3]

    # wait: a more flexible way to wait
    tasks = [
        asyncio.create_task(may_fail(1)),
        asyncio.create_task(may_fail(3)),
    ]
    # Return as soon as the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    for finished in done:
        print(f"Completed: {finished.result()}")

    # Cancel the tasks that have not finished yet ...
    for unfinished in pending:
        unfinished.cancel()
    # ... and await them so the cancellation is actually processed before
    # main() returns. return_exceptions=True collects each CancelledError
    # instead of re-raising it here.
    await asyncio.gather(*pending, return_exceptions=True)
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.2.3. 超时控制
import asyncio
async def slow_operation():
    """Simulate a slow call: sleep ten seconds, then return ``"done"``."""
    await asyncio.sleep(10)
    outcome = "done"
    return outcome
async def main():
    """Apply a two-second timeout to ``slow_operation`` in two different ways."""
    # Option 1: wait_for wraps a single awaitable with a deadline
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Option 2: the asyncio.timeout context manager (Python 3.11+) puts a
    # deadline on everything awaited inside the block
    try:
        async with asyncio.timeout(2.0):
            await slow_operation()
    except asyncio.TimeoutError:
        print("Operation timed out!")
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.3. 同步原语
2.4.3.1. Lock
import asyncio
class SharedResource:
    """Counter whose read-modify-write update is guarded by an ``asyncio.Lock``."""

    def __init__(self):
        self.lock = asyncio.Lock()
        self.value = 0

    async def increment(self):
        """Add one to ``value`` while holding the lock."""
        async with self.lock:
            snapshot = self.value
            # Simulated I/O: the coroutine is suspended between the read and
            # the write, which is why the lock is held across both
            await asyncio.sleep(0.01)
            self.value = snapshot + 1
async def main():
    """Run 100 concurrent increments against one shared resource."""
    resource = SharedResource()
    # Launch 100 increments concurrently
    increments = [resource.increment() for _ in range(100)]
    await asyncio.gather(*increments)
    print(f"Final value: {resource.value}")  # 100
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.3.2. Semaphore
import asyncio
async def fetch_with_limit(sem, url):
    """Pretend to fetch ``url`` while holding a slot of the semaphore ``sem``.

    Returns:
        The string ``"Data from <url>"``.
    """
    async with sem:
        print(f"Fetching {url}")
        # Stand-in for a network request
        await asyncio.sleep(1)
        payload = f"Data from {url}"
    return payload
async def main():
    """Fetch ten URLs with at most three requests in flight at a time."""
    # Cap the number of simultaneous requests
    sem = asyncio.Semaphore(3)
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await asyncio.gather(
        *(fetch_with_limit(sem, address) for address in urls)
    )
    print(f"Fetched {len(results)} pages")
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.3.3. Event
import asyncio
async def waiter(event, name):
    """Block until ``event`` is set, printing a message before and after."""
    print(f"{name} waiting for event")
    # Suspends here until some other coroutine calls event.set()
    await event.wait()
    print(f"{name} got event!")
async def setter(event):
    """Wait two seconds, then set ``event``."""
    await asyncio.sleep(2)
    print("Setting event")
    event.set()
async def main():
    """Run two waiters and one setter against a shared ``asyncio.Event``."""
    event = asyncio.Event()
    waiters = [waiter(event, label) for label in ("A", "B")]
    await asyncio.gather(*waiters, setter(event))
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.3.4. Queue
import asyncio
async def producer(queue):
    """Put the integers 0-9 on ``queue``, then a ``None`` stop signal."""
    for i in range(10):
        # put() waits here when a bounded queue is full
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(0.1)
    # Send the stop signal
    await queue.put(None)
async def consumer(queue, name):
    """Consume items from ``queue`` until the ``None`` stop signal arrives.

    The stop signal is put back on the queue before this consumer exits, so
    a single ``None`` shuts down every consumer sharing the queue. Without
    that, only the consumer that happened to receive the signal would stop
    and the others would wait on ``queue.get()`` forever.

    Note: after the last consumer exits, one ``None`` remains on the queue
    and is never marked done, so do not rely on ``queue.join()`` afterwards.

    Args:
        queue: The ``asyncio.Queue`` to read from.
        name: Label used in the printed messages.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                # Forward the stop signal to the next consumer. A slot was
                # just freed by get(), so this does not wait even when the
                # queue is bounded.
                await queue.put(None)
                break
            print(f"{name} consumed: {item}")
            await asyncio.sleep(0.2)
        finally:
            # Every get() is matched by exactly one task_done()
            queue.task_done()
async def main():
    """Run one producer and two consumers over a queue bounded to five items."""
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue),
        *(consumer(queue, label) for label in ("Consumer-1", "Consumer-2")),
    )
# Start an event loop and run main()
asyncio.run(main())
2.4.4. 异步迭代器和生成器
2.4.4.1. 异步迭代器
import asyncio
class AsyncRange:
    """Asynchronous iterator over the integers ``start`` .. ``stop - 1``."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def __aiter__(self):
        # Each ``async for`` restarts the iteration from ``start``
        self.current = self.start
        return self

    async def __anext__(self):
        """Return the next value, or raise ``StopAsyncIteration`` at the end."""
        if self.current < self.stop:
            # Simulated asynchronous work before each value is produced
            await asyncio.sleep(0.1)
            value = self.current
            self.current = value + 1
            return value
        raise StopAsyncIteration
async def main():
    """Print each value produced by ``AsyncRange(0, 5)``."""
    numbers = AsyncRange(0, 5)
    async for i in numbers:
        print(i)
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.4.2. 异步生成器
import asyncio
async def async_range(start, stop):
    """Asynchronous generator yielding ``range(start, stop)`` with a pause.

    Each value is preceded by a 0.1-second ``asyncio.sleep``.
    """
    for number in range(start, stop):
        await asyncio.sleep(0.1)
        yield number
async def main():
    """Consume ``async_range`` with ``async for`` and with a comprehension."""
    async for number in async_range(0, 5):
        print(number)

    # Asynchronous list comprehension
    result = [number async for number in async_range(0, 5)]
    print(result)
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.5. 异步上下文管理器
import asyncio
class AsyncResource:
    """Asynchronous context manager with simulated acquire/release delays."""

    async def __aenter__(self):
        """Acquire the resource (0.5 s simulated delay) and return ``self``."""
        print("Acquiring resource...")
        await asyncio.sleep(0.5)
        print("Resource acquired")
        return self

    async def do_work(self):
        """Simulate half a second of work with the resource."""
        print("Working...")
        await asyncio.sleep(0.5)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the resource (0.5 s simulated delay)."""
        print("Releasing resource...")
        await asyncio.sleep(0.5)
        print("Resource released")
        # Returning False lets any exception raised in the block propagate
        return False
async def main():
    """Acquire an ``AsyncResource``, do some work, and release it."""
    manager = AsyncResource()
    async with manager as resource:
        await resource.do_work()
# Start an event loop and run main() until it completes
asyncio.run(main())
# 使用 contextlib
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_open(filename):
    """Async context manager yielding a fake handle string for ``filename``.

    Yields:
        The string ``"handle to <filename>"``.
    """
    print(f"Opening {filename}")
    await asyncio.sleep(0.1)
    handle = f"handle to {filename}"
    try:
        yield handle
    finally:
        # Runs on exit from the ``async with`` block, even after an exception
        print(f"Closing {filename}")
        await asyncio.sleep(0.1)
async def main():
    """Open ``test.txt`` through ``async_open`` and print the handle."""
    async with async_open("test.txt") as f:
        print(f"Got: {f}")
# Start an event loop and run main() until it completes
asyncio.run(main())
2.4.6. 与阻塞代码集成
2.4.6.1. 运行阻塞代码
import asyncio
import time
def blocking_io():
    """Blocking I/O stand-in: sleep one second, then return ``"done"``."""
    time.sleep(1)
    status = "done"
    return status
def cpu_intensive(n):
    """CPU-bound stand-in: return the sum of ``i * i`` for ``i`` in ``range(n)``."""
    return sum(i * i for i in range(n))
async def main():
    """Offload blocking and CPU-bound calls from the event loop to executors."""
    from concurrent.futures import ProcessPoolExecutor

    loop = asyncio.get_running_loop()

    # Blocking I/O: passing None selects the loop's default executor
    # (a thread pool)
    result = await loop.run_in_executor(None, blocking_io)
    print(f"IO result: {result}")

    # CPU-bound work: run it in a process pool
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive, 1000000)
        print(f"CPU result: {result}")
# Guard the entry point: ProcessPoolExecutor workers started with the "spawn"
# method (the default on Windows and macOS) re-import this module, and an
# unguarded asyncio.run(main()) would then run again inside every worker.
if __name__ == "__main__":
    asyncio.run(main())
2.4.6.2. 从同步代码调用异步
import asyncio
async def async_function():
    """Sleep one second, then return ``"done"``."""
    await asyncio.sleep(1)
    outcome = "done"
    return outcome
# Approach 1: asyncio.run (recommended; creates a new event loop)
def sync_wrapper1():
    """Run ``async_function`` to completion from synchronous code."""
    return asyncio.run(async_function())
# Approach 2: works whether or not an event loop is already running in the
# calling thread (for framework integration)
def sync_wrapper2():
    """Run ``async_function`` synchronously and return its result.

    With no loop running in this thread, the coroutine runs via
    ``asyncio.run``. With a loop already running in this thread, it runs in
    a worker thread with its own loop. Blocking on a future scheduled on the
    caller's own loop (``run_coroutine_threadsafe(...).result()``) would
    deadlock, because that loop cannot advance while this call blocks.

    Note: in the second case the calling thread, and therefore its event
    loop, is blocked until the coroutine finishes.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one
        return asyncio.run(async_function())

    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, async_function()).result()
2.4.7. 最佳实践
设计原则
不要混用 sync 和 async:尽量保持代码一致
使用 `async with` 和 `async for`:正确处理资源
限制并发数:使用 Semaphore 避免资源耗尽
正确处理取消:设计可取消的协程
常见错误
# ❌ Forgetting await
async def main():
asyncio.sleep(1) # The coroutine is never executed!
# ✅ Using await correctly
async def main():
await asyncio.sleep(1)
# ❌ 在异步代码中使用 time.sleep
async def main():
time.sleep(1) # 阻塞整个事件循环!
# ✅ 使用 asyncio.sleep
async def main():
await asyncio.sleep(1)
# ❌ Creating a task without awaiting it
async def main():
asyncio.create_task(some_task()) # The task may be garbage-collected
# ✅ Keep a reference to the task, or await it
async def main():
task = asyncio.create_task(some_task())
await task
性能建议
批量操作:使用 `gather` 而非顺序 `await`
使用连接池:复用数据库/HTTP 连接
避免过多小任务:任务切换有开销
监控任务:记录长时间运行的任务