# 第二十五章:安全框架与库实战 > "不要重新发明轮子 — 使用经过审计的安全库,站在巨人的肩膀上。" ```{mermaid} mindmap root((安全框架)) Python FastAPI Security Authlib PyJWT Java Spring Security Keycloak Adapter Go go-jose casbin-go Node.js Passport.js Helmet 选型原则 社区活跃度 安全审计 维护状态 ``` ## 25.1 FastAPI 安全实战 ### 完整的认证授权中间件 ```python from fastapi import FastAPI, Depends, HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError from pydantic import BaseModel from functools import wraps from typing import Callable import httpx app = FastAPI() security = HTTPBearer() # 配置 JWKS_URL = "https://auth.example.com/.well-known/jwks.json" ISSUER = "https://auth.example.com" AUDIENCE = "my-api" class User(BaseModel): id: str email: str | None = None roles: list[str] = [] # JWKS 缓存 _jwks_cache = None async def get_jwks(): global _jwks_cache if _jwks_cache is None: async with httpx.AsyncClient() as client: resp = await client.get(JWKS_URL) _jwks_cache = resp.json() return _jwks_cache async def get_current_user( credentials: HTTPAuthorizationCredentials = Security(security) ) -> User: """从 JWT Token 中提取用户信息""" token = credentials.credentials try: jwks = await get_jwks() # 简化示例:实际应根据 kid 选择正确的密钥 payload = jwt.decode( token, jwks["keys"][0], algorithms=["RS256"], audience=AUDIENCE, issuer=ISSUER, ) return User( id=payload["sub"], email=payload.get("email"), roles=payload.get("roles", []), ) except JWTError as e: raise HTTPException(status_code=401, detail=f"Invalid token: {e}") def require_role(*roles: str): """角色授权装饰器""" async def role_checker(user: User = Depends(get_current_user)): if not any(role in user.roles for role in roles): raise HTTPException( status_code=403, detail=f"Requires one of roles: {roles}" ) return user return role_checker # API 端点 @app.get("/api/public") async def public_endpoint(): return {"message": "Public data"} @app.get("/api/profile") async def get_profile(user: User = Depends(get_current_user)): return {"user_id": user.id, "email": user.email} @app.get("/api/admin/users") async def list_users(user: User = Depends(require_role("admin"))): return {"users": [...], "requested_by": user.id} @app.delete("/api/admin/users/{user_id}") async def delete_user( user_id: str, user: User = Depends(require_role("admin", "superadmin")) ): return {"message": f"User {user_id} deleted by {user.id}"} ``` ### 安全中间件 ```python from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware # CORS app.add_middleware( CORSMiddleware, allow_origins=["https://app.example.com"], # 不要用 * allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], allow_credentials=True, max_age=3600, ) # 可信主机 app.add_middleware( TrustedHostMiddleware, allowed_hosts=["api.example.com", "*.example.com"] ) # 安全响应头 from starlette.middleware.base import BaseHTTPMiddleware class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" response.headers["Content-Security-Policy"] = "default-src 'self'" return response app.add_middleware(SecurityHeadersMiddleware) ``` ## 25.2 Spring Security 概览 ```java @Configuration @EnableWebSecurity public class SecurityConfig { @Bean public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { http .authorizeHttpRequests(auth -> auth .requestMatchers("/api/public/**").permitAll() .requestMatchers("/api/admin/**").hasRole("ADMIN") .anyRequest().authenticated() ) .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> jwt .jwkSetUri("https://auth.example.com/.well-known/jwks.json") ) ) .csrf(csrf -> csrf.disable()) // API 不需要 CSRF .cors(Customizer.withDefaults()); return http.build(); } } ``` ## 25.3 安全库选型原则 | 原则 | 说明 | 检查项 | |------|------|--------| | 社区活跃度 | 活跃的社区意味着快速的漏洞修复 | GitHub Stars、Issue 响应时间 | | 安全审计 | 经过专业安全审计的库更可信 | 审计报告、CVE 历史 | | 维护状态 | 持续维护的库才能及时修复漏洞 | 最近提交时间、发布频率 | | 依赖链 | 依赖越少,攻击面越小 | 依赖数量、传递依赖 | | 标准合规 | 遵循标准的库互操作性更好 | RFC 合规、FIPS 认证 | ## 25.4 小结 - **FastAPI** 通过 Depends 和 Security 提供灵活的认证授权机制 - **Spring Security** 是 Java 生态最成熟的安全框架 - 选择安全库时优先考虑:社区活跃度、安全审计、维护状态 - **不要自己实现加密算法**,使用经过审计的库