第二十五章:安全框架与库实战
“不要重新发明轮子 — 使用经过审计的安全库,站在巨人的肩膀上。”
mindmap
root((安全框架))
Python
FastAPI Security
Authlib
PyJWT
Java
Spring Security
Keycloak Adapter
Go
go-jose
casbin-go
Node.js
Passport.js
Helmet
选型原则
社区活跃度
安全审计
维护状态
25.1 FastAPI 安全实战
完整的认证授权中间件
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from pydantic import BaseModel
from functools import wraps
from typing import Callable
import httpx
app = FastAPI()
security = HTTPBearer()
# 配置
JWKS_URL = "https://auth.example.com/.well-known/jwks.json"
ISSUER = "https://auth.example.com"
AUDIENCE = "my-api"
class User(BaseModel):
id: str
email: str | None = None
roles: list[str] = []
# JWKS 缓存
_jwks_cache = None
async def get_jwks():
global _jwks_cache
if _jwks_cache is None:
async with httpx.AsyncClient() as client:
resp = await client.get(JWKS_URL)
_jwks_cache = resp.json()
return _jwks_cache
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> User:
"""从 JWT Token 中提取用户信息"""
token = credentials.credentials
try:
jwks = await get_jwks()
# 简化示例:实际应根据 kid 选择正确的密钥
payload = jwt.decode(
token,
jwks["keys"][0],
algorithms=["RS256"],
audience=AUDIENCE,
issuer=ISSUER,
)
return User(
id=payload["sub"],
email=payload.get("email"),
roles=payload.get("roles", []),
)
except JWTError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
def require_role(*roles: str):
"""角色授权装饰器"""
async def role_checker(user: User = Depends(get_current_user)):
if not any(role in user.roles for role in roles):
raise HTTPException(
status_code=403,
detail=f"Requires one of roles: {roles}"
)
return user
return role_checker
# API 端点
@app.get("/api/public")
async def public_endpoint():
return {"message": "Public data"}
@app.get("/api/profile")
async def get_profile(user: User = Depends(get_current_user)):
return {"user_id": user.id, "email": user.email}
@app.get("/api/admin/users")
async def list_users(user: User = Depends(require_role("admin"))):
return {"users": [...], "requested_by": user.id}
@app.delete("/api/admin/users/{user_id}")
async def delete_user(
user_id: str,
user: User = Depends(require_role("admin", "superadmin"))
):
return {"message": f"User {user_id} deleted by {user.id}"}
安全中间件
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.example.com"], # 不要用 *
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
allow_credentials=True,
max_age=3600,
)
# 可信主机
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["api.example.com", "*.example.com"]
)
# 安全响应头
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
app.add_middleware(SecurityHeadersMiddleware)
25.2 Spring Security 概览
@Configuration
@EnableWebSecurity
public class SecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(auth -> auth
.requestMatchers("/api/public/**").permitAll()
.requestMatchers("/api/admin/**").hasRole("ADMIN")
.anyRequest().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt
.jwkSetUri("https://auth.example.com/.well-known/jwks.json")
)
)
.csrf(csrf -> csrf.disable()) // API 不需要 CSRF
.cors(Customizer.withDefaults());
return http.build();
}
}
25.3 安全库选型原则
原则 |
说明 |
检查项 |
|---|---|---|
社区活跃度 |
活跃的社区意味着快速的漏洞修复 |
GitHub Stars、Issue 响应时间 |
安全审计 |
经过专业安全审计的库更可信 |
审计报告、CVE 历史 |
维护状态 |
持续维护的库才能及时修复漏洞 |
最近提交时间、发布频率 |
依赖链 |
依赖越少,攻击面越小 |
依赖数量、传递依赖 |
标准合规 |
遵循标准的库互操作性更好 |
RFC 合规、FIPS 认证 |
25.4 小结
FastAPI 通过 Depends 和 Security 提供灵活的认证授权机制
Spring Security 是 Java 生态最成熟的安全框架
选择安全库时优先考虑:社区活跃度、安全审计、维护状态
不要自己实现加密算法,使用经过审计的库