# Web 安全

Web 应用安全是开发中最重要的考量之一。本节介绍常见的安全问题和防护措施。

## 输入验证

### SQL 注入防护

```python
# ❌ Dangerous: building SQL by string interpolation
def get_user_unsafe(username):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    # With username = "'; DROP TABLE users; --"
    # the attacker-supplied SQL gets executed
    return db.execute(query)

# ✅ Safe: use parameterized queries
def get_user_safe(username):
    # SQLAlchemy ORM (parameters are bound automatically)
    user = User.query.filter_by(username=username).first()

    # SQLAlchemy Core
    from sqlalchemy import text
    result = db.execute(
        text("SELECT * FROM users WHERE username = :username"),
        {"username": username}
    )
    return result

# ✅ Pydantic validation (FastAPI)
from pydantic import BaseModel, validator
import re

class UserQuery(BaseModel):
    username: str

    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Invalid username format')
        return v
```

### XSS 防护

```python
# ❌ Dangerous: echoing user input directly
from flask import render_template_string

@app.route('/unsafe')
def unsafe():
    name = request.args.get('name', '')
    # With name = "<script>alert('XSS')</script>" the script runs in the browser
    return f"<h1>Hello {name}</h1>"  # XSS!

# ✅ Safe: let the template engine escape automatically
from flask import render_template

@app.route('/safe')
def safe():
    name = request.args.get('name', '')
    # Jinja2 escapes HTML by default
    return render_template('hello.html', name=name)

# ✅ Manual escaping
from markupsafe import escape

@app.route('/escaped')
def escaped():
    name = request.args.get('name', '')
    return f"<h1>Hello {escape(name)}</h1>"

# ✅ Content Security Policy
from flask import make_response

@app.after_request
def add_security_headers(response):
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self'; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data:;"
    )
    return response
```

### CSRF 防护

```python
# Flask-WTF handles CSRF automatically
from flask_wtf import FlaskForm, CSRFProtect
from wtforms import StringField, SubmitField

csrf = CSRFProtect(app)

class LoginForm(FlaskForm):
    username = StringField('Username')
    submit = SubmitField('Login')

# In the template:
# <form method="post">
#     {{ form.csrf_token }}
#     ...
# </form>

# CSRF token for APIs
@app.route('/api/csrf-token')
def get_csrf_token():
    from flask_wtf.csrf import generate_csrf
    return {'csrf_token': generate_csrf()}

# Use it in AJAX requests:
# headers: {'X-CSRF-Token': token}
```

## 认证安全

### 密码处理

```python
from werkzeug.security import generate_password_hash, check_password_hash
import secrets

class User:
    def set_password(self, password):
        # Use a strong hashing algorithm
        self.password_hash = generate_password_hash(
            password,
            method='pbkdf2:sha256',
            salt_length=16
        )

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

# Password strength validation
import re

def validate_password(password: str) -> list:
    """Validate password strength and return a list of error messages."""
    errors = []
    if len(password) < 8:
        errors.append("Password must be at least 8 characters")
    if not re.search(r'[A-Z]', password):
        errors.append("Password must contain uppercase letter")
    if not re.search(r'[a-z]', password):
        errors.append("Password must contain lowercase letter")
    if not re.search(r'\d', password):
        errors.append("Password must contain digit")
    if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
        errors.append("Password must contain special character")
    return errors
```

### JWT 安全

```python
import jwt
from datetime import datetime, timedelta, timezone
import os
import secrets

SECRET_KEY = os.environ.get('JWT_SECRET_KEY')  # read from the environment

# NOTE: AuthError is not defined in this snippet; it is assumed to be the
# application's own authentication exception, defined elsewhere.

def create_token(user_id: int, expires_in: int = 3600) -> str:
    # Take one timezone-aware timestamp so that 'iat' and 'exp' are consistent;
    # datetime.utcnow() is naive and deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'exp': now + timedelta(seconds=expires_in),
        'iat': now,
        'jti': secrets.token_urlsafe(16)  # unique ID, usable for revocation
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_token(token: str) -> dict:
    try:
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=['HS256'],
            options={
                'require': ['exp', 'iat', 'user_id'],
                'verify_exp': True
            }
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise AuthError("Token expired")
    except jwt.InvalidTokenError:
        raise AuthError("Invalid token")

# ⚠️ Security recommendations
# 1. Use a sufficiently long key (at least 256 bits)
# 2. Set a reasonable expiry time
# 3. Transmit tokens over HTTPS only
# 4. Consider a refresh-token mechanism
# 5. Implement token revocation
```

### Rate Limiting

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Keyword arguments only: the first positional parameter is `app` in
# Flask-Limiter 2.x but `key_func` in 3.x, so `Limiter(app, ...)` breaks on 3.x.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"]
)

@app.route("/api/login", methods=["POST"])
@limiter.limit("5 per minute")  # stricter limit for the login endpoint
def login():
    pass

@app.route("/api/data")
@limiter.limit("100 per minute")
def get_data():
    pass

# FastAPI version
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    pass
```

## 安全响应头

```python
from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)

@app.after_request
def add_security_headers(response):
    # Stop browsers from MIME-sniffing the response body
    response.headers['X-Content-Type-Options'] = 'nosniff'
    # Forbid framing of the site (clickjacking protection)
    response.headers['X-Frame-Options'] = 'DENY'
    # The legacy XSS auditor is deprecated and can itself introduce
    # vulnerabilities; disable it explicitly and rely on CSP instead.
    response.headers['X-XSS-Protection'] = '0'

    # Enforce HTTPS
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'

    # Content Security Policy
    response.headers['Content-Security-Policy'] = "default-src 'self'"

    # Avoid leaking server information
    response.headers['Server'] = ''

    return response

# FastAPI: use middleware
from starlette.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "*.example.com"]
)
```

## 敏感数据处理

### 日志脱敏

```python
import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Mask sensitive data in log messages."""

    patterns = [
        (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)'), 'password=***'),
        (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)'), 'token=***'),
        (re.compile(r'\b\d{16}\b'), '****'),  # credit card number
    ]

    def filter(self, record):
        # Render the message with its %-style args BEFORE masking. Masking
        # only record.msg would miss secrets passed through args, and
        # replacing a "%s" placeholder would make the later `msg % args`
        # formatting fail.
        message = record.getMessage()
        for pattern, replacement in self.patterns:
            message = pattern.sub(replacement, message)
        record.msg = message
        record.args = ()  # already interpolated above
        return True

# Attach the filter
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())
```

### 响应数据过滤

```python
from pydantic import BaseModel

class UserInternal(BaseModel):
    """Full user model for internal use."""
    id: int
    email: str
    password_hash: str
    is_admin: bool
    internal_notes: str

class UserResponse(BaseModel):
    """User model exposed in API responses."""
    id: int
    email: str
    # Sensitive fields are deliberately left out

    class Config:
        orm_mode = True

@app.get("/users/{id}", response_model=UserResponse)
async def get_user(id: int):
    user = db.get_user(id)  # returns a UserInternal
    return user  # filtered down to UserResponse automatically
```

## 文件上传安全

```python
import os
from werkzeug.utils import secure_filename
import magic  # python-magic

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB

# Actually enforce the size limit: Flask rejects larger request bodies
# with 413 Request Entity Too Large.
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def validate_file_content(file_stream):
    """Check the file's content type by sniffing its leading bytes."""
    header = file_stream.read(2048)
    file_stream.seek(0)  # rewind so the later save() writes the whole file

    mime = magic.from_buffer(header, mime=True)
    allowed_mimes = {
        'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
    }
    return mime in allowed_mimes

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file'}), 400

    file = request.files['file']

    # Sanitize first so every check runs on the name that is actually
    # stored. secure_filename() can return '' or drop the extension
    # (e.g. for non-ASCII names), which must not reach the filesystem.
    filename = secure_filename(file.filename or '')
    if not filename:
        return jsonify({'error': 'No filename'}), 400

    if not allowed_file(filename):
        return jsonify({'error': 'File type not allowed'}), 400

    if not validate_file_content(file.stream):
        return jsonify({'error': 'Invalid file content'}), 400

    # Store outside the web root
    upload_path = os.path.join('/var/uploads', filename)
    file.save(upload_path)

    return jsonify({'message': 'Uploaded successfully'})
```

## 安全检查清单

::::{grid} 1
:gutter: 2

:::{grid-item-card} 输入处理
- [ ] 所有用户输入都经过验证
- [ ] 使用参数化查询防止 SQL 注入
- [ ] 输出编码防止 XSS
- [ ] 文件上传验证类型和大小
:::

:::{grid-item-card} 认证授权
- [ ] 密码使用强哈希存储
- [ ] 实现账户锁定机制
- [ ] 敏感操作需要重新认证
- [ ] 实现最小权限原则
:::

:::{grid-item-card} 会话管理
- [ ] 使用安全的 session ID
- [ ] 设置合理的会话超时
- [ ] 登录后重新生成 session ID
- [ ] 注销时销毁服务端 session
:::

:::{grid-item-card} 传输安全
- [ ] 全站 HTTPS
- [ ] 设置 HSTS 头
- [ ] Cookie 设置 Secure 和 HttpOnly
- [ ] 敏感数据不在 URL 中传输
:::

::::