# 第十二章:API 认证模式 > "在微服务时代,API 认证不再是可选项,而是每个请求的必经之路。" ```{mermaid} mindmap root((API 认证)) 认证方式 API Key Bearer Token mTLS HMAC 签名 Gateway 模式 集中认证 分布式认证 Token 内省 服务间认证 Service Account Service Mesh SPIFFE/SPIRE 最佳实践 生命周期管理 速率限制 审计日志 ``` ## 12.1 API 认证的特殊性 API 认证与传统 Web 认证有本质区别: | 维度 | Web 认证 | API 认证 | |------|---------|---------| | 交互方式 | 用户交互(浏览器) | 程序化(HTTP 客户端) | | 状态 | 有状态(Session) | 无状态(Token) | | 凭证类型 | Cookie | Authorization Header | | 调用者 | 人类 | 人类 + 机器 | | 频率 | 低(页面级) | 高(API 级) | ## 12.2 API Key 认证 最简单的 API 认证方式,适合低安全要求的场景: ```python from fastapi import FastAPI, Header, HTTPException app = FastAPI() VALID_API_KEYS = { "key-abc123": {"name": "Service A", "permissions": ["read"]}, "key-def456": {"name": "Service B", "permissions": ["read", "write"]}, } @app.get("/api/data") async def get_data(x_api_key: str = Header(...)): if x_api_key not in VALID_API_KEYS: raise HTTPException(status_code=401, detail="Invalid API key") client = VALID_API_KEYS[x_api_key] if "read" not in client["permissions"]: raise HTTPException(status_code=403, detail="Insufficient permissions") return {"data": "sensitive information", "client": client["name"]} ``` **API Key 的问题**: - ❌ 无过期时间(除非手动管理) - ❌ 无法携带用户身份信息 - ❌ 容易泄露(日志、URL、代码仓库) - ❌ 无标准的轮换机制 ## 12.3 Bearer Token 认证 使用 OAuth2 Access Token,是最推荐的 API 认证方式: ``` 请求格式: GET /api/resource HTTP/1.1 Host: api.example.com Authorization: Bearer eyJhbGciOiJSUzI1NiIs... ``` ```python from fastapi import FastAPI, Depends, HTTPException from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import jwt app = FastAPI() security = HTTPBearer() JWKS_URL = "https://auth.example.com/.well-known/jwks.json" async def verify_bearer_token( credentials: HTTPAuthorizationCredentials = Depends(security) ) -> dict: """验证 Bearer Token""" token = credentials.credentials try: # 从 JWKS 获取公钥验证 payload = jwt.decode( token, get_public_key(token), # 从 JWKS 获取 algorithms=["RS256"], audience="my-api", issuer="https://auth.example.com", ) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") @app.get("/api/profile") async def get_profile(user: dict = Depends(verify_bearer_token)): return {"user_id": user["sub"], "roles": user.get("roles", [])} ``` ## 12.4 mTLS 客户端证书认证 最安全的 API 认证方式,适合服务间通信: ``` 请求流程: ┌──────────┐ ┌──────────┐ │ 客户端 │ TLS 握手(双向证书验证) │ 服务器 │ │ (带证书) │◀────────────────────────▶│ (带证书) │ │ │ │ │ │ │ HTTP 请求(加密通道内) │ │ │ │─────────────────────────▶│ │ │ │ 无需额外 Token! │ 从证书中 │ │ │ │ 提取身份 │ └──────────┘ └──────────┘ ``` ```python import httpx # mTLS 客户端请求 async def call_service_with_mtls(): async with httpx.AsyncClient( cert=("client.crt", "client.key"), # 客户端证书 verify="ca.crt", # CA 证书 ) as client: response = await client.get("https://service-b:8443/api/data") return response.json() ``` ## 12.5 HMAC 签名认证 类似 AWS Signature V4,通过对请求内容签名来验证身份和完整性: ```python import hmac import hashlib import time from urllib.parse import urlencode def sign_request(method: str, path: str, body: str, access_key: str, secret_key: str) -> dict: """对 API 请求进行 HMAC 签名""" timestamp = str(int(time.time())) # 构造签名字符串 string_to_sign = f"{method}\n{path}\n{timestamp}\n{hashlib.sha256(body.encode()).hexdigest()}" # 计算签名 signature = hmac.new( secret_key.encode(), string_to_sign.encode(), hashlib.sha256 ).hexdigest() return { "X-Access-Key": access_key, "X-Timestamp": timestamp, "X-Signature": signature, } # 服务端验证 def verify_signature(method, path, body, headers, secret_key) -> bool: timestamp = headers.get("X-Timestamp", "") signature = headers.get("X-Signature", "") # 检查时间戳(防重放,5分钟窗口) if abs(time.time() - int(timestamp)) > 300: return False # 重新计算签名 string_to_sign = f"{method}\n{path}\n{timestamp}\n{hashlib.sha256(body.encode()).hexdigest()}" expected = hmac.new(secret_key.encode(), string_to_sign.encode(), hashlib.sha256).hexdigest() return hmac.compare_digest(signature, expected) ``` ## 12.6 API Gateway 认证模式 ### 集中认证 vs 分布式认证 ``` 集中认证(推荐): ┌──────────┐ ┌──────────────┐ ┌──────────┐ │ 客户端 │───▶│ API Gateway │───▶│ 微服务 │ │ │ │ ① 验证 Token │ │ 信任 GW │ │ │ │ ② 速率限制 │ │ 的转发 │ │ │ │ ③ 注入用户信息│ │ │ └──────────┘ └──────────────┘ └──────────┘ 分布式认证: ┌──────────┐ ┌──────────────┐ ┌──────────┐ │ 客户端 │───▶│ API Gateway │───▶│ 微服务 │ │ │ │ 仅路由转发 │ │ 自己验证 │ │ │ │ │ │ Token │ └──────────┘ └──────────────┘ └──────────┘ ``` ### Token 内省 vs 本地验证 | 方式 | Token 内省(Introspection) | 本地验证(JWT) | |------|---------------------------|----------------| | 原理 | 调用授权服务器验证 Token | 本地用公钥验证签名 | | 延迟 | 高(网络调用) | 低(本地计算) | | 实时性 | 实时(可检查撤销) | 延迟(依赖过期时间) | | 依赖 | 授权服务器必须可用 | 仅需公钥(JWKS) | | 适用 | Opaque Token | JWT | ## 12.7 Service-to-Service 认证 ``` 认证方式演进: ┌─────────────────────────────────────────────┐ │ Level 1: 无认证(内网信任) │ │ → 不安全,任何内部服务都可以调用 │ ├─────────────────────────────────────────────┤ │ Level 2: Service Account + Token │ │ → 静态凭证,需要手动管理和轮换 │ ├─────────────────────────────────────────────┤ │ Level 3: Service Mesh mTLS │ │ → 自动证书管理,透明加密 │ ├─────────────────────────────────────────────┤ │ Level 4: SPIFFE/SPIRE │ │ → 统一工作负载身份,跨平台,自动轮换 │ │ → 这是我们在第19-21章要深入讨论的 │ └─────────────────────────────────────────────┘ ``` ## 12.8 API 认证方式对比 | 方式 | 安全性 | 复杂度 | 适用场景 | |------|--------|--------|---------| | API Key | ⭐⭐ | 低 | 公开 API、低安全要求 | | Bearer Token (JWT) | ⭐⭐⭐⭐ | 中 | Web/Mobile API | | mTLS | ⭐⭐⭐⭐⭐ | 高 | 服务间通信 | | HMAC 签名 | ⭐⭐⭐⭐ | 高 | 高安全 API(支付) | | SPIFFE SVID | ⭐⭐⭐⭐⭐ | 中 | 云原生服务间通信 | ## 12.9 小结 - **Bearer Token(JWT)** 是最通用的 API 认证方式 - **mTLS** 提供最强的安全保障,适合服务间通信 - **API Gateway 集中认证** 简化了微服务的认证逻辑 - **HMAC 签名** 适合需要请求完整性验证的高安全场景 - **SPIFFE/SPIRE** 是云原生时代服务间认证的最佳实践(详见第四部分) - 选择认证方式时需权衡:安全性、复杂度、性能、运维成本