第二十五章:安全框架与库实战

“不要重新发明轮子 — 使用经过审计的安全库,站在巨人的肩膀上。”

        mindmap
  root((安全框架))
    Python
      FastAPI Security
      Authlib
      PyJWT
    Java
      Spring Security
      Keycloak Adapter
    Go
      go-jose
      casbin-go
    Node.js
      Passport.js
      Helmet
    选型原则
      社区活跃度
      安全审计
      维护状态
    

25.1 FastAPI 安全实战

完整的认证授权中间件

from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from pydantic import BaseModel
from functools import wraps
from typing import Callable
import httpx

app = FastAPI()
security = HTTPBearer()

# 配置
JWKS_URL = "https://auth.example.com/.well-known/jwks.json"
ISSUER = "https://auth.example.com"
AUDIENCE = "my-api"

class User(BaseModel):
    id: str
    email: str | None = None
    roles: list[str] = []

# JWKS 缓存
_jwks_cache = None

async def get_jwks():
    global _jwks_cache
    if _jwks_cache is None:
        async with httpx.AsyncClient() as client:
            resp = await client.get(JWKS_URL)
            _jwks_cache = resp.json()
    return _jwks_cache

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security)
) -> User:
    """从 JWT Token 中提取用户信息"""
    token = credentials.credentials
    try:
        jwks = await get_jwks()
        # 简化示例:实际应根据 kid 选择正确的密钥
        payload = jwt.decode(
            token,
            jwks["keys"][0],
            algorithms=["RS256"],
            audience=AUDIENCE,
            issuer=ISSUER,
        )
        return User(
            id=payload["sub"],
            email=payload.get("email"),
            roles=payload.get("roles", []),
        )
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

def require_role(*roles: str):
    """角色授权装饰器"""
    async def role_checker(user: User = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(
                status_code=403,
                detail=f"Requires one of roles: {roles}"
            )
        return user
    return role_checker

# API 端点
@app.get("/api/public")
async def public_endpoint():
    return {"message": "Public data"}

@app.get("/api/profile")
async def get_profile(user: User = Depends(get_current_user)):
    return {"user_id": user.id, "email": user.email}

@app.get("/api/admin/users")
async def list_users(user: User = Depends(require_role("admin"))):
    return {"users": [...], "requested_by": user.id}

@app.delete("/api/admin/users/{user_id}")
async def delete_user(
    user_id: str,
    user: User = Depends(require_role("admin", "superadmin"))
):
    return {"message": f"User {user_id} deleted by {user.id}"}

安全中间件

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # 不要用 *
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    allow_credentials=True,
    max_age=3600,
)

# 可信主机
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["api.example.com", "*.example.com"]
)

# 安全响应头
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        return response

app.add_middleware(SecurityHeadersMiddleware)

25.2 Spring Security 概览

@Configuration
@EnableWebSecurity
public class SecurityConfig {

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/public/**").permitAll()
                .requestMatchers("/api/admin/**").hasRole("ADMIN")
                .anyRequest().authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt
                    .jwkSetUri("https://auth.example.com/.well-known/jwks.json")
                )
            )
            .csrf(csrf -> csrf.disable())  // API 不需要 CSRF
            .cors(Customizer.withDefaults());
        return http.build();
    }
}

25.3 安全库选型原则

原则

说明

检查项

社区活跃度

活跃的社区意味着快速的漏洞修复

GitHub Stars、Issue 响应时间

安全审计

经过专业安全审计的库更可信

审计报告、CVE 历史

维护状态

持续维护的库才能及时修复漏洞

最近提交时间、发布频率

依赖链

依赖越少,攻击面越小

依赖数量、传递依赖

标准合规

遵循标准的库互操作性更好

RFC 合规、FIPS 认证

25.4 小结

  • FastAPI 通过 Depends 和 Security 提供灵活的认证授权机制

  • Spring Security 是 Java 生态最成熟的安全框架

  • 选择安全库时优先考虑:社区活跃度、安全审计、维护状态

  • 不要自己实现加密算法,使用经过审计的库