第二十六章:API 安全设计

“API 是现代应用的前门,也是攻击者最喜欢的入口。”

        mindmap
  root((API 安全))
    OWASP API Top 10
      BOLA
      认证失败
      过度数据暴露
      资源消耗
    设计原则
      输入验证
      输出编码
      速率限制
      幂等性
    协议安全
      REST
      GraphQL
      gRPC
    测试
      DAST
      Fuzzing
      渗透测试
    

26.1 OWASP API Security Top 10(2023)

排名

风险

描述

防御

API1

BOLA

对象级授权失效

每个请求检查资源所有权

API2

认证失效

认证机制缺陷

OAuth2 + MFA

API3

对象属性级授权

返回过多属性

响应过滤、DTO

API4

不受限资源消耗

无速率限制

Rate Limiting

API5

功能级授权失效

缺少功能权限检查

RBAC/ABAC

API6

服务端请求伪造

SSRF

URL 白名单

API7

安全配置错误

默认配置不安全

安全基线

API8

缺乏自动化威胁防护

无 Bot 防护

WAF + 行为分析

API9

资产管理不当

影子 API、过时版本

API 清单管理

API10

不安全的 API 消费

信任第三方 API

输入验证

BOLA(Broken Object Level Authorization)

最常见的 API 安全问题:

# ❌ 危险:没有检查资源所有权
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    order = db.get_order(order_id)
    return order  # 任何用户都能查看任何订单!

# ✅ 安全:检查资源所有权
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    order = db.get_order(order_id)
    if order.user_id != user.id:
        raise HTTPException(403, "Access denied")
    return order

# ✅ 更好:使用 OpenFGA 检查权限
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    await authz.require(user.id, "can_view", f"order:{order_id}")
    order = db.get_order(order_id)
    return order

26.2 输入验证

from pydantic import BaseModel, validator, Field
from typing import Annotated
import re

class CreateUserRequest(BaseModel):
    username: Annotated[str, Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_]+$')]
    email: Annotated[str, Field(max_length=255)]
    age: Annotated[int, Field(ge=0, le=150)]
    
    @validator('email')
    def validate_email(cls, v):
        if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', v):
            raise ValueError('Invalid email format')
        return v.lower()

    @validator('username')
    def no_sql_injection(cls, v):
        dangerous_patterns = ["'", '"', ';', '--', '/*', '*/', 'DROP', 'DELETE', 'UPDATE']
        for pattern in dangerous_patterns:
            if pattern.upper() in v.upper():
                raise ValueError('Invalid characters in username')
        return v

26.3 速率限制

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/search")
@limiter.limit("30/minute")  # 每分钟 30 次
async def search(request: Request, q: str):
    return {"results": [...]}

@app.post("/api/login")
@limiter.limit("5/minute")  # 登录更严格
async def login(request: Request):
    return {"token": "..."}

26.4 GraphQL 安全

# GraphQL 特有的安全问题和防御

# 1. 查询深度限制
# 防止:{ user { friends { friends { friends { ... } } } } }
MAX_DEPTH = 5

# 2. 查询复杂度限制
# 防止:{ users(first: 1000000) { posts { comments { ... } } } }
MAX_COMPLEXITY = 1000

# 3. 禁用内省(生产环境)
# 防止攻击者发现所有 API 端点
INTROSPECTION_ENABLED = False  # 生产环境禁用

26.5 gRPC 安全

import grpc

# mTLS gRPC 服务端
server_credentials = grpc.ssl_server_credentials(
    [(server_key, server_cert)],
    root_certificates=ca_cert,
    require_client_auth=True,  # 要求客户端证书
)
server = grpc.server(futures.ThreadPoolExecutor())
server.add_secure_port('[::]:50051', server_credentials)

# Token 拦截器
class AuthInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization', '')
        if not verify_token(token):
            return grpc.unary_unary_rpc_method_handler(
                lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, 'Invalid token')
            )
        return continuation(handler_call_details)

26.6 小结

  • BOLA 是最常见的 API 安全问题,必须在每个请求中检查资源所有权

  • 输入验证 使用 Pydantic 等库进行严格的类型和格式检查

  • 速率限制 防止暴力破解和资源耗尽

  • GraphQL 需要额外的深度限制、复杂度限制和内省控制

  • gRPC 通过 mTLS 和拦截器实现安全通信