# 第二十六章:API 安全设计 > "API 是现代应用的前门,也是攻击者最喜欢的入口。" ```{mermaid} mindmap root((API 安全)) OWASP API Top 10 BOLA 认证失败 过度数据暴露 资源消耗 设计原则 输入验证 输出编码 速率限制 幂等性 协议安全 REST GraphQL gRPC 测试 DAST Fuzzing 渗透测试 ``` ## 26.1 OWASP API Security Top 10(2023) | 排名 | 风险 | 描述 | 防御 | |------|------|------|------| | API1 | BOLA | 对象级授权失效 | 每个请求检查资源所有权 | | API2 | 认证失效 | 认证机制缺陷 | OAuth2 + MFA | | API3 | 对象属性级授权 | 返回过多属性 | 响应过滤、DTO | | API4 | 不受限资源消耗 | 无速率限制 | Rate Limiting | | API5 | 功能级授权失效 | 缺少功能权限检查 | RBAC/ABAC | | API6 | 服务端请求伪造 | SSRF | URL 白名单 | | API7 | 安全配置错误 | 默认配置不安全 | 安全基线 | | API8 | 缺乏自动化威胁防护 | 无 Bot 防护 | WAF + 行为分析 | | API9 | 资产管理不当 | 影子 API、过时版本 | API 清单管理 | | API10 | 不安全的 API 消费 | 信任第三方 API | 输入验证 | ### BOLA(Broken Object Level Authorization) 最常见的 API 安全问题: ```python # ❌ 危险:没有检查资源所有权 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): order = db.get_order(order_id) return order # 任何用户都能查看任何订单! # ✅ 安全:检查资源所有权 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): order = db.get_order(order_id) if order.user_id != user.id: raise HTTPException(403, "Access denied") return order # ✅ 更好:使用 OpenFGA 检查权限 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): await authz.require(user.id, "can_view", f"order:{order_id}") order = db.get_order(order_id) return order ``` ## 26.2 输入验证 ```python from pydantic import BaseModel, validator, Field from typing import Annotated import re class CreateUserRequest(BaseModel): username: Annotated[str, Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_]+$')] email: Annotated[str, Field(max_length=255)] age: Annotated[int, Field(ge=0, le=150)] @validator('email') def validate_email(cls, v): if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', v): raise ValueError('Invalid email format') return v.lower() @validator('username') def no_sql_injection(cls, v): dangerous_patterns = ["'", '"', ';', '--', '/*', '*/', 'DROP', 'DELETE', 'UPDATE'] for pattern in dangerous_patterns: if pattern.upper() in v.upper(): raise ValueError('Invalid characters in username') return v ``` ## 26.3 速率限制 ```python from fastapi import FastAPI, Request from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded limiter = Limiter(key_func=get_remote_address) app = FastAPI() app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) @app.get("/api/search") @limiter.limit("30/minute") # 每分钟 30 次 async def search(request: Request, q: str): return {"results": [...]} @app.post("/api/login") @limiter.limit("5/minute") # 登录更严格 async def login(request: Request): return {"token": "..."} ``` ## 26.4 GraphQL 安全 ```python # GraphQL 特有的安全问题和防御 # 1. 查询深度限制 # 防止:{ user { friends { friends { friends { ... } } } } } MAX_DEPTH = 5 # 2. 查询复杂度限制 # 防止:{ users(first: 1000000) { posts { comments { ... } } } } MAX_COMPLEXITY = 1000 # 3. 禁用内省(生产环境) # 防止攻击者发现所有 API 端点 INTROSPECTION_ENABLED = False # 生产环境禁用 ``` ## 26.5 gRPC 安全 ```python import grpc # mTLS gRPC 服务端 server_credentials = grpc.ssl_server_credentials( [(server_key, server_cert)], root_certificates=ca_cert, require_client_auth=True, # 要求客户端证书 ) server = grpc.server(futures.ThreadPoolExecutor()) server.add_secure_port('[::]:50051', server_credentials) # Token 拦截器 class AuthInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): metadata = dict(handler_call_details.invocation_metadata) token = metadata.get('authorization', '') if not verify_token(token): return grpc.unary_unary_rpc_method_handler( lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, 'Invalid token') ) return continuation(handler_call_details) ``` ## 26.6 小结 - **BOLA** 是最常见的 API 安全问题,必须在每个请求中检查资源所有权 - **输入验证** 使用 Pydantic 等库进行严格的类型和格式检查 - **速率限制** 防止暴力破解和资源耗尽 - **GraphQL** 需要额外的深度限制、复杂度限制和内省控制 - **gRPC** 通过 mTLS 和拦截器实现安全通信