3.1. FastAPI 实战
FastAPI 是现代、高性能的 Python Web 框架，基于标准 Python 类型提示构建 API。
3.1.1. 快速入门
3.1.1.1. 基础应用
from typing import Optional

from fastapi import FastAPI
# Application instance; title, description and version are passed as API metadata.
app = FastAPI(
    title="My API",
    description="A sample API",
    version="1.0.0",
)
@app.get("/")
async def root():
    """Handle ``GET /`` and return a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Optional[str] = None):
    """Echo the requested item id together with the optional query string.

    Args:
        item_id: Path parameter, declared as an integer.
        q: Optional query parameter; ``None`` when the client omits it.

    Returns:
        A dict with the keys ``item_id`` and ``q``.
    """
    # The annotation is Optional[str] (not bare ``str``) because the default
    # is None; implicit Optional is rejected by type checkers.
    return {"item_id": item_id, "q": q}
# 运行: uvicorn main:app --reload
# 文档: http://localhost:8000/docs
3.1.1.2. 请求体与 Pydantic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime
# Application instance for the request-body examples below.
app = FastAPI()
class UserCreate(BaseModel):
    """Request payload accepted when creating a user."""

    # Required; between 1 and 50 characters.
    name: str = Field(..., min_length=1, max_length=50)
    email: EmailStr
    # Required; between 0 and 150 inclusive.
    age: int = Field(..., ge=0, le=150)
    # Optional free-text biography.
    bio: Optional[str] = None

    class Config:
        # Sample payload attached to the model's schema.
        schema_extra = {
            "example": {
                "name": "Alice",
                "email": "alice@example.com",
                "age": 25,
                "bio": "A developer",
            },
        }
class UserResponse(BaseModel):
    """Shape of a user as returned to API clients."""

    id: int
    name: str
    email: EmailStr
    created_at: datetime

    class Config:
        # Allow the model to be populated from ORM objects.
        orm_mode = True
@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Create a user from a payload that has already been validated.

    Args:
        user: The request body, parsed into ``UserCreate``.

    Returns:
        A dict matching ``UserResponse``; the id is a fixed placeholder
        because this example does not persist anything.
    """
    # Local import keeps this block self-contained; only ``datetime`` is
    # imported at module level.
    from datetime import timezone

    return {
        "id": 1,
        "name": user.name,
        "email": user.email,
        # Timezone-aware UTC timestamp: a naive ``datetime.now()`` is
        # ambiguous once serialized and sent to clients in other zones.
        "created_at": datetime.now(timezone.utc),
    }
3.1.2. 依赖注入
FastAPI 的依赖注入系统是其核心特性之一。
3.1.2.1. 基础依赖
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Application instance for the dependency-injection examples.
app = FastAPI()
# Bearer-token scheme object; get_current_user depends on it for credentials.
security = HTTPBearer()
# 数据库连接依赖
async def get_db():
    """Yield a ``DatabaseSession`` and close it when the generator is finalized."""
    session = DatabaseSession()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised while using the session.
        session.close()
# 认证依赖
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the bearer token from the request into a user.

    Args:
        credentials: Authorization credentials supplied by the ``security``
            dependency.

    Returns:
        Whatever ``decode_token`` returns for a valid token.

    Raises:
        HTTPException: 401 when ``decode_token`` returns a falsy value.
    """
    user = decode_token(credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            # A 401 response should carry a challenge telling the client
            # which authentication scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
# 权限检查依赖
def require_permission(permission: str):
    """Build a dependency that only admits users holding *permission*.

    The returned coroutine resolves the current user, returns it when
    ``permission`` is in ``user.permissions``, and raises a 403
    ``HTTPException`` otherwise.
    """

    async def check_permission(user=Depends(get_current_user)):
        granted = permission in user.permissions
        if granted:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Permission denied",
        )

    return check_permission
# 使用依赖
@app.get("/admin/users")
async def list_users(
    db=Depends(get_db),
    user=Depends(require_permission("admin")),
):
    """Return every ``User`` row for callers holding the ``admin`` permission."""
    # ``user`` is not read in the body; it is declared so the permission
    # dependency runs before the query does.
    all_users = db.query(User).all()
    return all_users
3.1.2.2. 类作为依赖
from fastapi import Depends, Query
class Pagination:
    """Query-parameter bundle describing one page of results.

    Attributes:
        page: 1-based page number (default 1, minimum 1).
        page_size: Rows per page (default 20, between 1 and 100).
        offset: Number of rows preceding the requested page.
    """

    def __init__(
        self,
        page: int = Query(1, ge=1),
        page_size: int = Query(20, ge=1, le=100),
    ):
        self.page, self.page_size = page, page_size
        # Rows to skip before the first row of this page.
        self.offset = page_size * (page - 1)
@app.get("/items/")
async def list_items(pagination: Pagination = Depends()):
    """Return one page of items plus the paging values that were applied."""
    page_items = get_items(
        offset=pagination.offset,
        limit=pagination.page_size,
    )
    payload = {
        "page": pagination.page,
        "page_size": pagination.page_size,
        "items": page_items,
    }
    return payload
3.1.3. 路由组织
3.1.3.1. 使用 APIRouter
# routers/users.py
from fastapi import APIRouter, Depends
# Router for the user endpoints: shared "/users" prefix and tag, the
# get_current_user dependency on every route, and a documented 404 response.
router = APIRouter(
    prefix="/users",
    tags=["users"],
    dependencies=[Depends(get_current_user)],
    responses={404: {"description": "Not found"}},
)
@router.get("/")
async def list_users():
    """Return the user collection (an empty list in this example)."""
    users = []
    return users
@router.get("/{user_id}")
async def get_user(user_id: int):
    """Return a minimal representation of the user with the given id."""
    user = {"id": user_id}
    return user
@router.post("/")
async def create_user(user: UserCreate):
    """Accept a ``UserCreate`` payload and return it unchanged."""
    created = user
    return created
# main.py
from fastapi import FastAPI
from routers import users, items
# Application instance that the routers are mounted on.
app = FastAPI()
# Mount the users router as declared in routers/users.py.
app.include_router(users.router)
# Mount the items router with an additional "/api/v1" prefix.
app.include_router(items.router, prefix="/api/v1")
3.1.4. 中间件
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import logging
# Application instance for the middleware examples.
app = FastAPI()
# CORS 中间件
# CORS policy. Wildcards must not be combined with allow_credentials=True:
# browsers refuse a "*" origin on credentialed requests, and echoing every
# origin would let any site call the API with the user's credentials.
# List the trusted origins, methods and headers explicitly instead.
ALLOWED_ORIGINS = [
    "http://localhost:3000",  # placeholder: replace with the real front-end origins
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["Authorization", "Content-Type"],
)
# 自定义中间件
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the request's processing time as an ``X-Process-Time`` header.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The downstream response with the extra header set (seconds, as text).
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = str(elapsed)
    return response
# 请求日志中间件
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the method and URL of each request and the status of its response.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The downstream response, unmodified.
    """
    # Pass values as %-args so the message is only formatted when the
    # INFO level is actually enabled.
    logging.info("%s %s", request.method, request.url)
    response = await call_next(request)
    logging.info("Response: %s", response.status_code)
    return response
3.1.5. 异常处理
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError
# Application instance for the exception-handling examples.
app = FastAPI()
# 自定义异常
class ItemNotFoundError(Exception):
    """Raised when no item exists for the requested id.

    Attributes:
        item_id: The id that could not be found.
    """

    def __init__(self, item_id: int):
        # Initialise the base class so str(exc), exc.args and tracebacks
        # carry a readable message instead of being empty.
        super().__init__(f"Item {item_id} not found")
        self.item_id = item_id
# 异常处理器
@app.exception_handler(ItemNotFoundError)
async def item_not_found_handler(request: Request, exc: ItemNotFoundError):
    """Turn an ``ItemNotFoundError`` into a 404 JSON response naming the item."""
    body = {"message": f"Item {exc.item_id} not found"}
    return JSONResponse(status_code=404, content=body)
# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: log the failure and answer with a generic 500.

    Args:
        request: The request being processed when the exception escaped.
        exc: The unhandled exception.

    Returns:
        A 500 JSON response that deliberately omits internal details.
    """
    # Local import keeps this block self-contained.
    import logging

    # Record the traceback; without this every unexpected error would be
    # reduced to an opaque 500 with nothing to debug from.
    logging.getLogger(__name__).error(
        "Unhandled exception during %s %s",
        request.method,
        request.url,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"message": "Internal server error"},
    )
# 使用
@app.get("/items/{item_id}")
async def get_item(item_id: int):
    """Return the item for *item_id*, raising ``ItemNotFoundError`` if it is falsy."""
    found = find_item(item_id)
    if found:
        return found
    raise ItemNotFoundError(item_id)
3.1.6. 后台任务
from fastapi import FastAPI, BackgroundTasks
# Application instance for the background-task examples.
app = FastAPI()
def send_email(email: str, message: str):
    """Pretend to send an email: block for three seconds, then print a notice."""
    import time

    # Stand-in for the latency of a real delivery.
    time.sleep(3)
    notice = f"Email sent to {email}: {message}"
    print(notice)
def write_log(message: str):
    """Append *message* as one line to ``log.txt`` in the working directory.

    Args:
        message: Text to record; a trailing newline is added.
    """
    # Explicit UTF-8: the platform default encoding varies and can fail on
    # non-ASCII text such as user-supplied addresses.
    with open("log.txt", "a", encoding="utf-8") as f:
        f.write(f"{message}\n")
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    """Queue the email and log-entry tasks, then return a confirmation."""
    log_line = f"Notification sent to {email}"
    # Both tasks are only registered here; the handler does not call them.
    background_tasks.add_task(send_email, email, "Notification!")
    background_tasks.add_task(write_log, log_line)
    return {"message": "Notification scheduled"}
3.1.7. 数据库集成
3.1.7.1. SQLAlchemy 异步
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String
from fastapi import Depends
import os

# Connection URL. It can be overridden through the DATABASE_URL environment
# variable so real credentials never have to live in source control; the
# fallback is the original tutorial DSN.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://user:pass@localhost/db",
)
# echo=True is passed through to the engine; NOTE(review): it presumably
# enables SQL statement logging — confirm before using outside development.
engine = create_async_engine(DATABASE_URL, echo=True)
# Factory for AsyncSession objects bound to the engine above.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# Declarative base class that the ORM models inherit from.
Base = declarative_base()
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Display name, up to 50 characters.
    name = Column(String(50))
    # Email address, up to 100 characters, unique across rows.
    email = Column(String(100), unique=True)
async def get_db():
    """Yield an ``AsyncSession`` for the duration of one use.

    Yields:
        A session created by ``AsyncSessionLocal``.
    """
    # ``async with`` closes the session on exit, on both the normal and the
    # error path, so an additional explicit ``session.close()`` is redundant.
    async with AsyncSessionLocal() as session:
        yield session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Fetch the user whose id is *user_id*, or raise a 404 when there is none."""
    from sqlalchemy import select

    query = select(User).where(User.id == user_id)
    outcome = await db.execute(query)
    found = outcome.scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404)
3.1.8. 测试
# test_main.py
from fastapi.testclient import TestClient
from httpx import AsyncClient
import pytest
from main import app
# 同步测试
# Shared synchronous test client bound to the application under test.
client = TestClient(app)
def test_read_root():
    """The root endpoint answers 200 with the greeting payload."""
    resp = client.get("/")
    expected = {"message": "Hello World"}
    assert resp.status_code == 200
    assert resp.json() == expected
def test_create_user():
    """Posting a valid user payload yields 201 Created."""
    payload = {"name": "Alice", "email": "alice@example.com", "age": 25}
    resp = client.post("/users/", json=payload)
    assert resp.status_code == 201
# 异步测试
@pytest.mark.asyncio
async def test_async_endpoint():
    """The root endpoint answers 200 when called through the async client."""
    # Local import keeps this block self-contained.
    from httpx import ASGITransport

    # ``AsyncClient(app=...)`` is deprecated and removed in current httpx;
    # the ASGI app must be wrapped in an explicit transport instead.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        response = await ac.get("/")
    assert response.status_code == 200
# 带依赖覆盖的测试
def test_with_dependency_override():
    """The user endpoint answers 200 when ``get_db`` is replaced by a mock."""

    def override_get_db():
        return MockDB()

    app.dependency_overrides[get_db] = override_get_db
    try:
        response = client.get("/users/1")
        assert response.status_code == 200
    finally:
        # Always restore the real dependencies; otherwise a failing
        # assertion would leak the override into every later test.
        app.dependency_overrides.clear()
3.1.9. 最佳实践
Pydantic 模型设计
# ✅ 分离输入和输出模型
class UserCreate(BaseModel):
    """Input model: the fields a client supplies when creating a user."""

    name: str
    email: EmailStr
    # Accepted on input only.
    password: str
class UserResponse(BaseModel):
    """Output model: the fields returned to clients."""

    id: int
    name: str
    email: EmailStr
    # No password field is declared on this model.
# ✅ 使用 Field 添加验证和文档
class Item(BaseModel):
    """Example model that uses ``Field`` for validation and documentation."""

    # Required; at least one character.
    name: str = Field(..., min_length=1, description="Item name")
    # Required; strictly greater than zero.
    price: float = Field(..., gt=0, description="Price in USD")
性能优化
使用连接池：SQLAlchemy 自动管理
异步数据库驱动：asyncpg、aiomysql
缓存：Redis、内存缓存
分页：限制返回数据量
后台任务：非关键操作异步处理
安全实践
验证所有输入：Pydantic 自动处理
参数化查询：ORM 默认支持
限制请求大小：防止 DoS
使用 HTTPS：生产环境必须
速率限制：防止滥用