# Web 安全
Web 应用安全是开发中最重要的考量之一。本节介绍常见的安全问题和防护措施。
## 输入验证
### SQL 注入防护
```python
# ❌ 危险:字符串拼接 SQL
def get_user_unsafe(username):
    """Look up a user by interpolating ``username`` straight into the SQL text.

    Deliberately vulnerable counter-example: do not copy this pattern.
    """
    # An input such as "'; DROP TABLE users; --" closes the string literal
    # and smuggles in a second, attacker-chosen statement.
    sql = f"SELECT * FROM users WHERE username = '{username}'"
    return db.execute(sql)
# ✅ 安全:使用参数化查询
def get_user_safe(username):
    """Fetch a user with bound parameters instead of string-built SQL.

    Two approaches are shown back to back; only the Core result is returned.
    """
    # Option 1 - SQLAlchemy ORM (parameterised automatically).
    orm_user = User.query.filter_by(username=username).first()

    # Option 2 - SQLAlchemy Core: named placeholder plus a parameter mapping.
    from sqlalchemy import text

    statement = text("SELECT * FROM users WHERE username = :username")
    rows = db.execute(statement, {"username": username})
    return rows
# ✅ Pydantic 验证(FastAPI)
from pydantic import BaseModel, validator
import re
class UserQuery(BaseModel):
    """Request model whose ``username`` is restricted to ``[a-zA-Z0-9_]``."""

    username: str

    @validator('username')
    def validate_username(cls, v):
        """Accept only ASCII letters, digits and underscores.

        Args:
            v: The submitted username.

        Returns:
            ``v`` unchanged when it is valid.

        Raises:
            ValueError: If ``v`` is empty or contains any other character.
        """
        # fullmatch instead of match(r'^...$'): '$' also matches just before
        # a trailing newline, so "admin\n" would otherwise be accepted.
        if not re.fullmatch(r'[a-zA-Z0-9_]+', v):
            raise ValueError('Invalid username format')
        return v
```
### XSS 防护
```python
# ❌ 危险:直接输出用户输入
from flask import render_template_string
@app.route('/unsafe')
def unsafe():
    """Echo the ``name`` query parameter into HTML without escaping.

    Deliberately vulnerable counter-example: do not copy this pattern.
    """
    name = request.args.get('name', '')
    # e.g. name = "<script>alert('xss')</script>" is sent back verbatim and
    # executed by the browser.
    return f"<h1>Hello {name}</h1>"  # XSS!
# ✅ 安全:使用模板引擎自动转义
from flask import render_template
@app.route('/safe')
def safe():
    """Render ``hello.html`` with the ``name`` query parameter."""
    requested_name = request.args.get('name', '')
    # Jinja2 escapes HTML by default, so the value is passed through as-is.
    return render_template('hello.html', name=requested_name)
# ✅ 手动转义
from markupsafe import escape
@app.route('/escaped')
def escaped():
    """Greet the caller with the ``name`` query parameter HTML-escaped."""
    name = request.args.get('name', '')
    # escape() is applied to the user-supplied value before it is placed in
    # the markup, so tags inside ``name`` are shown as text.
    return f"<h1>Hello {escape(name)}</h1>"
# ✅ Content Security Policy
from flask import make_response
@app.after_request
def add_security_headers(response):
    """Set the Content-Security-Policy header on every response.

    Returns the same response object that was passed in.
    """
    csp_directives = [
        "default-src 'self';",
        "script-src 'self';",
        "style-src 'self' 'unsafe-inline';",
        "img-src 'self' data:;",
    ]
    # Directives are separated by a single space after each ';'.
    response.headers['Content-Security-Policy'] = " ".join(csp_directives)
    return response
```
### CSRF 防护
```python
# Flask-WTF 自动处理 CSRF
from flask_wtf import FlaskForm, CSRFProtect
from wtforms import StringField, SubmitField
# Register Flask-WTF's CSRFProtect extension on the application.
csrf = CSRFProtect(app)
class LoginForm(FlaskForm):
    """Login form with a username field and a submit button."""

    username = StringField('Username')
    submit = SubmitField('Login')
# 模板中:在表单内渲染隐藏的 CSRF 字段
# <form method="post">{{ form.hidden_tag() }} ... </form>
# API 的 CSRF Token
@app.route('/api/csrf-token')
def get_csrf_token():
    """Return a CSRF token as ``{'csrf_token': ...}`` for API clients."""
    from flask_wtf.csrf import generate_csrf

    token = generate_csrf()
    return {'csrf_token': token}
# 在 AJAX 请求中使用
# headers: {'X-CSRF-Token': token}
```
## 认证安全
### 密码处理
```python
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
class User:
    """User model fragment showing how the password hash is set and checked."""

    def set_password(self, password):
        """Hash ``password`` and store the result in ``self.password_hash``."""
        # Strong hashing settings: PBKDF2-SHA256 with a 16-character salt.
        hash_settings = {'method': 'pbkdf2:sha256', 'salt_length': 16}
        self.password_hash = generate_password_hash(password, **hash_settings)

    def check_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
# 密码强度验证
import re
def validate_password(password: str, *, min_length: int = 8) -> list:
    """Check a password against the strength rules and collect violations.

    Args:
        password: Candidate password to check.
        min_length: Minimum number of characters required (default 8).

    Returns:
        A list of error messages in rule order; empty if every rule passes.
    """
    # (pattern that must be found, message reported when it is not)
    required_patterns = (
        (r'[A-Z]', "Password must contain uppercase letter"),
        (r'[a-z]', "Password must contain lowercase letter"),
        (r'\d', "Password must contain digit"),
        # Any punctuation/symbol counts rather than a fixed shortlist, so
        # characters such as '-', '_' or '+' are recognised as special.
        (r'[^\w\s]|_', "Password must contain special character"),
    )

    errors = []
    if len(password) < min_length:
        errors.append(f"Password must be at least {min_length} characters")
    errors.extend(
        message
        for pattern, message in required_patterns
        if not re.search(pattern, password)
    )
    return errors
```
### JWT 安全
```python
import jwt
from datetime import datetime, timedelta
import os
# Signing key for JWTs.
# NOTE(review): os.environ.get() yields None when JWT_SECRET_KEY is unset -
# consider failing fast at startup instead of at first use.
SECRET_KEY = os.environ.get('JWT_SECRET_KEY') # read from an environment variable
def create_token(user_id: int, expires_in: int = 3600) -> str:
    """Issue a signed HS256 JWT for ``user_id``.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        expires_in: Token lifetime in seconds (default 3600).

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Imported locally so this snippet is self-contained.
    import secrets
    from datetime import timezone

    # Take one timezone-aware timestamp so ``iat`` and ``exp`` differ by
    # exactly ``expires_in``; datetime.utcnow() is deprecated since 3.12.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'exp': issued_at + timedelta(seconds=expires_in),
        'iat': issued_at,
        'jti': secrets.token_urlsafe(16),  # unique ID, usable for revocation
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def verify_token(token: str) -> dict:
    """Decode and validate an HS256 JWT, returning its payload.

    Raises:
        AuthError: "Token expired" on ``jwt.ExpiredSignatureError``,
            "Invalid token" on any other ``jwt.InvalidTokenError``.
    """
    decode_options = {
        'require': ['exp', 'iat', 'user_id'],
        'verify_exp': True,
    }
    try:
        return jwt.decode(
            token,
            SECRET_KEY,
            algorithms=['HS256'],
            options=decode_options,
        )
    except jwt.ExpiredSignatureError:
        raise AuthError("Token expired")
    except jwt.InvalidTokenError:
        raise AuthError("Invalid token")
# ⚠️ 安全建议
# 1. 使用足够长的密钥(至少 256 位)
# 2. 设置合理的过期时间
# 3. 使用 HTTPS 传输
# 4. 考虑使用 refresh token 机制
# 5. 实现 token 撤销机制
```
### Rate Limiting
```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Pass the app by keyword: Flask-Limiter 3.x takes key_func as its first
# positional parameter, so Limiter(app, key_func=...) fails there, while the
# keyword form is accepted by both older and newer releases.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"],
)
@app.route("/api/login", methods=["POST"])
@limiter.limit("5 per minute")  # stricter limit for the login endpoint
def login():
    """Placeholder login endpoint, rate-limited to "5 per minute"."""
    return None
@app.route("/api/data")
@limiter.limit("100 per minute")
def get_data():
    """Placeholder data endpoint, rate-limited to "100 per minute"."""
    return None
# FastAPI 版本
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Build a slowapi limiter keyed by get_remote_address.
limiter = Limiter(key_func=get_remote_address)
# Expose the limiter on app.state and register slowapi's handler for
# RateLimitExceeded errors.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    """Placeholder data endpoint, rate-limited to "100/minute"."""
    return None
```
## 安全响应头
```python
from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix
app = Flask(__name__)
# Wrap the WSGI app in ProxyFix.
# NOTE(review): presumably the app is deployed behind a reverse proxy - confirm
# the number of trusted proxies matches the deployment.
app.wsgi_app = ProxyFix(app.wsgi_app)
@app.after_request
def add_security_headers(response):
    """Attach baseline security headers to every outgoing response.

    Args:
        response: The response object about to be sent.

    Returns:
        The same response object with the headers set.
    """
    # Stop browsers from MIME-sniffing a different content type.
    response.headers['X-Content-Type-Options'] = 'nosniff'
    # Forbid rendering the site inside frames (clickjacking defence).
    response.headers['X-Frame-Options'] = 'DENY'
    # The legacy browser XSS auditor is deprecated and '1; mode=block' can
    # itself be abused; disable it explicitly and rely on the CSP below.
    response.headers['X-XSS-Protection'] = '0'
    # Force HTTPS for one year, subdomains included.
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    # Content Security Policy: same-origin resources only.
    response.headers['Content-Security-Policy'] = "default-src 'self'"
    # Blank the Server header to avoid advertising the server software.
    # NOTE(review): the WSGI server may still add its own Server header after
    # this hook runs - confirm at the server/proxy level.
    response.headers['Server'] = ''
    return response
# FastAPI 使用中间件
from starlette.middleware.trustedhost import TrustedHostMiddleware
# Register Starlette's TrustedHostMiddleware with the host names this app
# should answer for (the bare domain plus any subdomain).
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "*.example.com"]
)
```
## 敏感数据处理
### 日志脱敏
```python
import logging
import re
class SensitiveDataFilter(logging.Filter):
    """Logging filter that masks passwords, tokens and 16-digit numbers.

    Records are scrubbed in place and always allowed through.
    """

    # (compiled pattern, replacement) pairs, applied in order. Key names are
    # matched case-insensitively so "Password: x" is masked as well.
    patterns = [
        (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), 'password=***'),
        (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), 'token=***'),
        (re.compile(r'\b\d{16}\b'), '****'),  # credit card number
    ]

    def _scrub(self, text):
        """Return ``text`` with every sensitive pattern replaced."""
        for pattern, replacement in self.patterns:
            text = pattern.sub(replacement, text)
        return text

    def filter(self, record):
        """Mask sensitive data in ``record`` and keep the record.

        String messages are first merged with their ``%``-style arguments so
        that secrets passed as arguments (``logger.info("password=%s", pw)``)
        are masked too; the merged text replaces ``record.msg`` and
        ``record.args`` is cleared. Non-string messages are left untouched.

        Args:
            record: The ``logging.LogRecord`` being processed.

        Returns:
            Always True, so the record is never dropped.
        """
        if not isinstance(record.msg, str):
            return True

        if record.args:
            try:
                merged = record.getMessage()
            except (TypeError, ValueError, KeyError, IndexError):
                # Malformed format string/arguments: a filter must not raise
                # into the caller, so fall through and scrub the template
                # only, leaving the formatting error to the handler.
                merged = None
            if merged is not None:
                record.msg = self._scrub(merged)
                record.args = ()
                return True

        record.msg = self._scrub(record.msg)
        return True
# 应用过滤器
# Attach the filter to this module's logger.
# NOTE(review): a filter added to a logger is not consulted for records
# emitted on other loggers - attach it to the handlers if it should apply
# application-wide.
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())
```
### 响应数据过滤
```python
from pydantic import BaseModel
class UserInternal(BaseModel):
    """Complete user model for internal use."""

    id: int
    email: str
    password_hash: str
    is_admin: bool
    internal_notes: str
class UserResponse(BaseModel):
    """User model used for API responses."""

    id: int
    email: str
    # Sensitive fields are deliberately left out.

    class Config:
        orm_mode = True
@app.get("/users/{id}", response_model=UserResponse)
async def get_user(id: int):
    """Return the user with the given ``id``, shaped by ``UserResponse``."""
    # db.get_user() returns a UserInternal; response_model filters it down
    # to the fields declared on UserResponse.
    return db.get_user(id)
```
## 文件上传安全
```python
import os
from werkzeug.utils import secure_filename
import magic # python-magic
# File extensions accepted for upload.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
# Upload size cap: 16 MB.
MAX_CONTENT_LENGTH = 16 * 1024 * 1024
# Register the cap with Flask; defining the constant alone enforces nothing.
# Once configured, Flask rejects larger request bodies (413).
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH
def allowed_file(filename):
    """Return True if ``filename`` has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    _, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
def validate_file_content(file_stream):
    """Check the content type detected from the start of ``file_stream``.

    Reads up to 2048 bytes, rewinds the stream to offset 0, and returns True
    if the MIME type reported by ``magic`` is one of the permitted types.
    """
    permitted_types = {
        'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
    }
    head = file_stream.read(2048)
    file_stream.seek(0)  # rewind so the caller can still read the whole file
    detected = magic.from_buffer(head, mime=True)
    return detected in permitted_types
@app.route('/upload', methods=['POST'])
def upload_file():
    """Validate an uploaded file and store it outside the web root.

    Returns:
        A JSON success message, or a JSON error with status 400 when the
        file is missing, unnamed, of a disallowed type, or has content that
        does not match an allowed MIME type.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file'}), 400

    file = request.files['file']
    if not file.filename:
        return jsonify({'error': 'No filename'}), 400

    # Sanitise first and validate the name that will actually reach the disk:
    # secure_filename() may return an empty string, or drop characters so
    # that no usable extension remains.
    filename = secure_filename(file.filename)
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400
    if not allowed_file(filename):
        return jsonify({'error': 'File type not allowed'}), 400
    if not validate_file_content(file.stream):
        return jsonify({'error': 'Invalid file content'}), 400

    # Store outside the web root.
    upload_path = os.path.join('/var/uploads', filename)
    file.save(upload_path)
    return jsonify({'message': 'Uploaded successfully'})
```
## 安全检查清单
::::{grid} 1
:gutter: 2
:::{grid-item-card} 输入处理
- [ ] 所有用户输入都经过验证
- [ ] 使用参数化查询防止 SQL 注入
- [ ] 输出编码防止 XSS
- [ ] 文件上传验证类型和大小
:::
:::{grid-item-card} 认证授权
- [ ] 密码使用强哈希存储
- [ ] 实现账户锁定机制
- [ ] 敏感操作需要重新认证
- [ ] 实现最小权限原则
:::
:::{grid-item-card} 会话管理
- [ ] 使用安全的 session ID
- [ ] 设置合理的会话超时
- [ ] 登录后重新生成 session ID
- [ ] 注销时销毁服务端 session
:::
:::{grid-item-card} 传输安全
- [ ] 全站 HTTPS
- [ ] 设置 HSTS 头
- [ ] Cookie 设置 Secure 和 HttpOnly
- [ ] 敏感数据不在 URL 中传输
:::
::::