3.1. FastAPI 实战

FastAPI 是现代、高性能的 Python Web 框架,基于标准 Python 类型提示构建 API。

3.1.1. 快速入门

3.1.1.1. 基础应用

from typing import Optional

from fastapi import FastAPI

# Application instance; title, description and version are handed to FastAPI
# as metadata (presumably surfaced in the generated docs — confirm).
app = FastAPI(
    title="My API",
    description="A sample API",
    version="1.0.0"
)

@app.get("/")
async def root():
    """Return the fixed greeting payload for the root path."""
    greeting = {"message": "Hello World"}
    return greeting

@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Optional[str] = None):
    """Echo the path parameter and the optional ``q`` query parameter.

    Args:
        item_id: Item identifier taken from the URL path.
        q: Optional query-string value; ``None`` when it is omitted.

    Returns:
        A dict holding the received ``item_id`` and ``q``.
    """
    return {"item_id": item_id, "q": q}

# 运行: uvicorn main:app --reload
# 文档: http://localhost:8000/docs

3.1.1.2. 请求体与 Pydantic

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime

app = FastAPI()

class UserCreate(BaseModel):
    """Request-body model for creating a user."""
    # Required; Field limits the length to 1..50 characters.
    name: str = Field(..., min_length=1, max_length=50)
    email: EmailStr
    # Required; Field limits the value to 0..150 inclusive.
    age: int = Field(..., ge=0, le=150)
    # Optional free text; defaults to None when omitted.
    bio: Optional[str] = None
    
    class Config:
        # Example payload attached to the model's schema.
        # NOTE(review): `schema_extra` looks like the Pydantic v1 spelling —
        # confirm against the installed Pydantic version.
        schema_extra = {
            "example": {
                "name": "Alice",
                "email": "alice@example.com",
                "age": 25,
                "bio": "A developer"
            }
        }

class UserResponse(BaseModel):
    """Response model describing a stored user."""
    id: int
    name: str
    email: EmailStr
    created_at: datetime
    
    class Config:
        # Allow the model to be populated from ORM objects.
        # NOTE(review): `orm_mode` looks like the Pydantic v1 spelling —
        # confirm against the installed Pydantic version.
        orm_mode = True

@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Build the response payload for a newly submitted user.

    ``user`` is declared as ``UserCreate``; the id is a fixed placeholder and
    ``created_at`` is taken from ``datetime.now()``.
    """
    payload = dict(
        id=1,
        name=user.name,
        email=user.email,
        created_at=datetime.now(),
    )
    return payload

3.1.2. 依赖注入

FastAPI 的依赖注入系统是其核心特性之一。

3.1.2.1. 基础依赖

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

# 数据库连接依赖
async def get_db():
    """Yield a ``DatabaseSession`` and close it once the consumer is done."""
    session = DatabaseSession()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised.
        session.close()

# 认证依赖
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Resolve the caller from the bearer token or reject the request.

    Args:
        credentials: Authorization credentials supplied by the ``security``
            dependency.

    Returns:
        Whatever ``decode_token`` returns for the token.

    Raises:
        HTTPException: 401 when ``decode_token`` yields a falsy result.
    """
    user = decode_token(credentials.credentials)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            # A 401 response should carry a challenge naming the auth scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

# 权限检查依赖
def require_permission(permission: str):
    """Build a dependency that admits only users holding ``permission``.

    The returned coroutine depends on ``get_current_user`` and raises a 403
    ``HTTPException`` when ``permission`` is absent from ``user.permissions``;
    otherwise it returns the user.
    """
    async def check_permission(
        user = Depends(get_current_user)
    ):
        if permission in user.permissions:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Permission denied"
        )
    return check_permission

# 使用依赖
@app.get("/admin/users")
async def list_users(
    db = Depends(get_db),
    user = Depends(require_permission("admin"))
):
    """Return every ``User`` row; ``user`` exists only to enforce the "admin" check."""
    all_users = db.query(User).all()
    return all_users

3.1.2.2. 类作为依赖

from fastapi import Depends, Query

class Pagination:
    """Page-based pagination parameters, usable as a class dependency."""

    def __init__(
        self,
        page: int = Query(1, ge=1),
        page_size: int = Query(20, ge=1, le=100)
    ):
        self.page, self.page_size = page, page_size
        # Number of rows to skip before the requested page starts.
        self.offset = page_size * (page - 1)

@app.get("/items/")
async def list_items(
    pagination: Pagination = Depends()
):
    """Return one page of items together with the paging parameters used."""
    page_items = get_items(offset=pagination.offset, limit=pagination.page_size)
    body = dict(
        page=pagination.page,
        page_size=pagination.page_size,
        items=page_items,
    )
    return body

3.1.3. 路由组织

3.1.3.1. 使用 APIRouter

# routers/users.py
from fastapi import APIRouter, Depends

# Router for the user endpoints: every route registered on it gets the
# "/users" prefix, the "users" tag, the get_current_user dependency and the
# shared 404 response description.
router = APIRouter(
    prefix="/users",
    tags=["users"],
    dependencies=[Depends(get_current_user)],
    responses={404: {"description": "Not found"}}
)

@router.get("/")
async def list_users():
    """Return the user collection (an empty list in this example)."""
    users: list = []
    return users

@router.get("/{user_id}")
async def get_user(user_id: int):
    """Return a stub user record containing only the requested id."""
    record = dict(id=user_id)
    return record

# 201 Created matches the other create-user endpoint in this guide and the
# test that expects 201 from POST /users/.
@router.post("/", status_code=201)
async def create_user(user: UserCreate):
    """Accept a validated ``UserCreate`` body and echo it back.

    Args:
        user: Request body parsed as ``UserCreate``.

    Returns:
        The received ``user`` object unchanged.
    """
    return user

# main.py
from fastapi import FastAPI
from routers import users, items

# Application entry point that mounts the routers.
app = FastAPI()

# users.router is mounted as-is; items.router additionally gets "/api/v1".
app.include_router(users.router)
app.include_router(items.router, prefix="/api/v1")

3.1.4. 中间件

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import logging

app = FastAPI()

# CORS 中间件
# Wildcard origins must not be combined with allow_credentials=True: that
# would let any website issue credentialed requests. List the trusted
# front-end origins explicitly (the entries below are placeholders).
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost",
        "http://localhost:8080",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 自定义中间件
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the request handling time as an ``X-Process-Time`` header.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The downstream response with the extra header set (seconds, as text).
    """
    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    started = time.perf_counter()

    response = await call_next(request)

    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = str(elapsed)

    return response

# 请求日志中间件
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log each request's method and URL, then the response status code.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The downstream response, unmodified.
    """
    # Lazy %-style arguments: the text is only formatted when INFO is enabled.
    logging.info("%s %s", request.method, request.url)
    response = await call_next(request)
    logging.info("Response: %s", response.status_code)
    return response

3.1.5. 异常处理

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import ValidationError

app = FastAPI()

# 自定义异常
class ItemNotFoundError(Exception):
    """Raised when no item exists for the requested id.

    Attributes:
        item_id: Identifier that could not be resolved.
    """

    def __init__(self, item_id: int):
        # Initialise the base class explicitly; ``args`` stays ``(item_id,)``.
        super().__init__(item_id)
        self.item_id = item_id

    def __str__(self) -> str:
        # Readable text for logs and tracebacks instead of the bare id.
        return f"Item {self.item_id} not found"

# 异常处理器
@app.exception_handler(ItemNotFoundError)
async def item_not_found_handler(request: Request, exc: ItemNotFoundError):
    """Translate ``ItemNotFoundError`` into a 404 JSON response."""
    body = {"message": f"Item {exc.item_id} not found"}
    return JSONResponse(content=body, status_code=404)

# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an otherwise unhandled exception and answer with a generic 500.

    Args:
        request: Request being processed when the exception escaped.
        exc: The unhandled exception.

    Returns:
        A 500 ``JSONResponse`` that does not expose any error detail.
    """
    # Local import: this snippet's import header does not include logging.
    import logging

    # Record the traceback; without it the failure would vanish silently
    # behind the generic response.
    logging.getLogger(__name__).error(
        "Unhandled exception on %s %s",
        request.method,
        request.url,
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"message": "Internal server error"}
    )

# 使用
@app.get("/items/{item_id}")
async def get_item(item_id: int):
    """Return the item for ``item_id`` or raise ``ItemNotFoundError``."""
    found = find_item(item_id)
    if found:
        return found
    # A falsy lookup result is treated as "no such item".
    raise ItemNotFoundError(item_id)

3.1.6. 后台任务

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_email(email: str, message: str):
    """Simulate sending ``message`` to ``email`` and report it on stdout."""
    from time import sleep

    sleep(3)  # stand-in for the real delivery delay
    print(f"Email sent to {email}: {message}")

def write_log(message: str):
    """Append ``message`` as one line to ``log.txt`` in the working directory.

    Args:
        message: Text to record; a trailing newline is added.
    """
    # Explicit UTF-8 so non-ASCII text is written the same way on every
    # platform instead of depending on the locale's default encoding.
    with open("log.txt", "a", encoding="utf-8") as f:
        f.write(f"{message}\n")

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    """Queue the e-mail and log-write tasks, then answer immediately."""
    # Tasks are queued in this order: e-mail first, then the log entry.
    queued = (
        (send_email, (email, "Notification!")),
        (write_log, (f"Notification sent to {email}",)),
    )
    for task, task_args in queued:
        background_tasks.add_task(task, *task_args)

    # The response does not wait for the queued tasks.
    return {"message": "Notification scheduled"}

3.1.7. 数据库集成

3.1.7.1. SQLAlchemy 异步

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String
from fastapi import Depends

# Connection URL using the postgresql+asyncpg scheme.
# NOTE(review): credentials are embedded in the URL — presumably sample
# values; confirm real secrets come from configuration instead.
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

# NOTE(review): echo=True presumably enables SQL statement logging — confirm
# before using this setting outside development.
engine = create_async_engine(DATABASE_URL, echo=True)
# Session factory bound to the engine; it produces AsyncSession objects and
# disables expire_on_commit.
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
# Declarative base class that the ORM model classes inherit from.
Base = declarative_base()

class User(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = "users"
    
    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # String column limited to 50 characters.
    name = Column(String(50))
    # String column limited to 100 characters with a unique constraint.
    email = Column(String(100), unique=True)

async def get_db():
    """Yield an ``AsyncSession`` for the duration of one consumer.

    The ``async with`` block already closes the session on exit (including
    when the consumer raises), so no extra ``try``/``finally`` with
    ``session.close()`` is needed.

    Yields:
        A session created by ``AsyncSessionLocal``.
    """
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Load one ``User`` by primary key, or raise a 404 ``HTTPException``."""
    from sqlalchemy import select

    by_id = select(User).where(User.id == user_id)
    user = (await db.execute(by_id)).scalar_one_or_none()

    if user:
        return user

    raise HTTPException(status_code=404)

3.1.8. 测试

# test_main.py
from fastapi.testclient import TestClient
from httpx import AsyncClient
import pytest

from main import app

# 同步测试
client = TestClient(app)

def test_read_root():
    """GET / answers 200 with the Hello World payload."""
    expected_body = {"message": "Hello World"}
    resp = client.get("/")
    assert resp.status_code == 200
    assert resp.json() == expected_body

def test_create_user():
    """POST /users/ with a valid body answers 201."""
    new_user = {"name": "Alice", "email": "alice@example.com", "age": 25}
    resp = client.post("/users/", json=new_user)
    assert resp.status_code == 201

# 异步测试
@pytest.mark.asyncio
async def test_async_endpoint():
    """GET / through an in-process async client answers 200."""
    # Local import: ASGITransport is not part of this file's httpx import line.
    from httpx import ASGITransport

    # The ``app=`` shortcut on AsyncClient is deprecated and removed in
    # recent httpx releases; routing through ASGITransport is the supported way.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        response = await ac.get("/")
    assert response.status_code == 200

# 带依赖覆盖的测试
def test_with_dependency_override():
    """GET /users/1 answers 200 while ``get_db`` is replaced by a ``MockDB``."""
    def override_get_db():
        return MockDB()

    app.dependency_overrides[get_db] = override_get_db
    try:
        response = client.get("/users/1")
        assert response.status_code == 200
    finally:
        # Always restore the real dependencies, even when the assertion
        # fails, so the override cannot leak into other tests.
        app.dependency_overrides.clear()

3.1.9. 最佳实践

Pydantic 模型设计
# ✅ 分离输入和输出模型
class UserCreate(BaseModel):
    """Input model: the fields a client submits, including the password."""
    name: str
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    """Output model: the fields returned to a client."""
    id: int
    name: str
    email: EmailStr
    # password is intentionally not part of the response model

# ✅ 使用 Field 添加验证和文档
class Item(BaseModel):
    """Item model showing Field-based validation and descriptions."""
    # Required; must contain at least one character.
    name: str = Field(..., min_length=1, description="Item name")
    # Required; must be strictly greater than zero.
    price: float = Field(..., gt=0, description="Price in USD")
性能优化
  1. 使用连接池:SQLAlchemy 的 Engine 默认自带连接池并自动管理,可通过 pool_size、max_overflow 等参数按负载调整

  2. 异步数据库驱动:asyncpg, aiomysql

  3. 缓存:Redis, 内存缓存

  4. 分页:限制返回数据量

  5. 后台任务:非关键操作异步处理

安全实践
  1. 验证所有输入:Pydantic 会按模型声明自动校验类型与约束,业务规则(如唯一性、权限)仍需自行检查

  2. 参数化查询:ORM 默认支持;使用原生 SQL 时也应通过绑定参数传值,避免字符串拼接

  3. 限制请求大小:防止 DoS

  4. 使用 HTTPS:生产环境必须

  5. 速率限制:防止滥用