3.4. Web 安全

Web 应用安全是开发中最重要的考量之一。本节介绍常见的安全问题和防护措施。

3.4.1. 输入验证

3.4.1.1. SQL 注入防护

# ❌ 危险:字符串拼接 SQL
def get_user_unsafe(username):
    """Counter-example: look up a user with SQL built by string interpolation.

    Deliberately vulnerable to SQL injection; shown only to contrast with
    the parameterized form.
    """
    query = f"SELECT * FROM users WHERE username = '{username}'"
    # e.g. username = "'; DROP TABLE users; --" closes the quoted literal
    # and appends attacker-controlled SQL to the statement text.
    return db.execute(query)

# ✅ 安全:使用参数化查询
def get_user_safe(username):
    """Look up a user by name with bound parameters instead of string-built SQL.

    Two alternative forms are shown back to back; only the SQLAlchemy Core
    result is returned.
    """
    # SQLAlchemy ORM (parameterizes automatically)
    # NOTE(review): `user` is not used below; it only illustrates the ORM form.
    user = User.query.filter_by(username=username).first()

    # SQLAlchemy Core: the value travels as a bound parameter, never as SQL text
    from sqlalchemy import text
    result = db.execute(
        text("SELECT * FROM users WHERE username = :username"),
        {"username": username}
    )
    return result

# ✅ Pydantic 验证(FastAPI)
from pydantic import BaseModel, validator
import re

class UserQuery(BaseModel):
    """Request model whose ``username`` may only contain ``[a-zA-Z0-9_]``."""

    username: str

    @validator('username')
    def validate_username(cls, v):
        """Accept ``v`` only if it consists solely of ASCII letters, digits or ``_``.

        Returns:
            The unchanged value when it is valid.

        Raises:
            ValueError: if ``v`` is empty or contains any other character.
        """
        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, so "admin\n" would otherwise pass validation.
        if not re.fullmatch(r'[a-zA-Z0-9_]+', v):
            raise ValueError('Invalid username format')
        return v

3.4.1.2. XSS 防护

# ❌ 危险:直接输出用户输入
from flask import render_template_string

@app.route('/unsafe')
def unsafe():
    """Counter-example: echo the ``name`` query parameter into HTML unescaped."""
    name = request.args.get('name', '')
    # e.g. name = "<script>alert('XSS')</script>" is emitted verbatim
    return f"<h1>Hello {name}</h1>"  # XSS!

# ✅ 安全:使用模板引擎自动转义
from flask import render_template

@app.route('/safe')
def safe():
    """Greet the caller by rendering ``hello.html`` with the ``name`` query parameter."""
    # The value reaches the template as data; Jinja2 escapes HTML by default.
    return render_template('hello.html', name=request.args.get('name', ''))

# ✅ 手动转义
from markupsafe import escape

@app.route('/escaped')
def escaped():
    """Greet the caller, escaping the ``name`` query parameter by hand."""
    # Escape first, then interpolate only the escaped value into the markup.
    safe_name = escape(request.args.get('name', ''))
    return f"<h1>Hello {safe_name}</h1>"

# ✅ Content Security Policy
from flask import make_response

@app.after_request
def add_security_headers(response):
    """Set a Content-Security-Policy header on the response and return it."""
    # One directive per entry; joined with single spaces into the header value.
    directives = (
        "default-src 'self';",
        "script-src 'self';",
        "style-src 'self' 'unsafe-inline';",
        "img-src 'self' data:;",
    )
    response.headers['Content-Security-Policy'] = " ".join(directives)
    return response

3.4.1.3. CSRF 防护

# Flask-WTF 自动处理 CSRF
from flask_wtf import FlaskForm, CSRFProtect
from wtforms import StringField, SubmitField

# Wrap the Flask app with Flask-WTF's CSRFProtect extension.
csrf = CSRFProtect(app)

class LoginForm(FlaskForm):
    """Login form with a ``username`` text field and a ``submit`` button."""
    username = StringField('Username')
    submit = SubmitField('Login')

# 模板中
# <form method="post">
#     {{ form.csrf_token }}
#     ...
# </form>

# API 的 CSRF Token
@app.route('/api/csrf-token')
def get_csrf_token():
    """Return the value of ``generate_csrf()`` as ``{'csrf_token': ...}``."""
    from flask_wtf.csrf import generate_csrf

    token = generate_csrf()
    return {'csrf_token': token}

# 在 AJAX 请求中使用
# headers: {'X-CSRF-Token': token}

3.4.2. 认证安全

3.4.2.1. 密码处理

from werkzeug.security import generate_password_hash, check_password_hash
import secrets

class User:
    """User model fragment showing password hashing via Werkzeug helpers."""

    def set_password(self, password):
        """Hash ``password`` and store the result in ``self.password_hash``."""
        # Use a strong hashing algorithm (PBKDF2-SHA256 with a 16-character salt)
        self.password_hash = generate_password_hash(
            password,
            method='pbkdf2:sha256',
            salt_length=16
        )

    def check_password(self, password):
        """Return the result of checking ``password`` against ``self.password_hash``."""
        return check_password_hash(self.password_hash, password)

# 密码强度验证
import re

def validate_password(password: str) -> list:
    """Check password strength and return a list of error messages.

    An empty list means every rule passed. The length rule is reported
    first, followed by the character-class rules in the order
    uppercase, lowercase, digit, special character.
    """
    # (pattern that must be found, message reported when it is missing)
    required_patterns = (
        (r'[A-Z]', "Password must contain uppercase letter"),
        (r'[a-z]', "Password must contain lowercase letter"),
        (r'\d', "Password must contain digit"),
        (r'[!@#$%^&*(),.?":{}|<>]', "Password must contain special character"),
    )

    problems = []
    if len(password) < 8:
        problems.append("Password must be at least 8 characters")

    problems.extend(
        message
        for pattern, message in required_patterns
        if re.search(pattern, password) is None
    )
    return problems

3.4.2.2. JWT 安全

import jwt
from datetime import datetime, timedelta
import os

# Signing key read from the environment; os.environ.get yields None when
# JWT_SECRET_KEY is not set.
SECRET_KEY = os.environ.get('JWT_SECRET_KEY')  # read from an environment variable

def create_token(user_id: int, expires_in: int = 3600) -> str:
    """Create an HS256-signed JWT for ``user_id``.

    Args:
        user_id: value stored in the ``user_id`` claim.
        expires_in: token lifetime in seconds, counted from the issue time.

    Returns:
        The token produced by ``jwt.encode``.
    """
    # Local imports keep the function self-contained within this snippet.
    import secrets
    from datetime import timezone

    # Take one timezone-aware timestamp so that `iat` and `exp` share the
    # same base instant; naive datetime.utcnow() is deprecated and easy to
    # misinterpret as local time.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'exp': issued_at + timedelta(seconds=expires_in),
        'iat': issued_at,
        'jti': secrets.token_urlsafe(16),  # unique id, usable for revocation
    }
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')

def verify_token(token: str) -> dict:
    """Decode and validate an HS256 token and return its payload.

    The ``exp``, ``iat`` and ``user_id`` claims are required and the
    expiry is verified.

    Raises:
        AuthError: "Token expired" when the signature has expired,
            "Invalid token" for any other invalid-token error.
    """
    decode_options = {
        'require': ['exp', 'iat', 'user_id'],
        'verify_exp': True
    }
    try:
        return jwt.decode(
            token,
            SECRET_KEY,
            algorithms=['HS256'],  # pin the accepted algorithm
            options=decode_options
        )
    # The expired case is handled before the general invalid-token case.
    except jwt.ExpiredSignatureError:
        raise AuthError("Token expired")
    except jwt.InvalidTokenError:
        raise AuthError("Invalid token")

# ⚠️ 安全建议
# 1. 使用足够长的密钥(至少 256 位)
# 2. 设置合理的过期时间
# 3. 使用 HTTPS 传输
# 4. 考虑使用 refresh token 机制
# 5. 实现 token 撤销机制

3.4.2.3. Rate Limiting

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Pass everything by keyword: Flask-Limiter 3.x takes `key_func` as its first
# positional parameter, so a positional `app` collides with key_func= there.
# Keyword form is accepted by both the 2.x and 3.x constructors.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"],
)

@app.route("/api/login", methods=["POST"])
@limiter.limit("5 per minute")  # stricter limit for the login endpoint
def login():
    """Placeholder login handler; the body is intentionally left empty."""
    pass

@app.route("/api/data")
@limiter.limit("100 per minute")
def get_data():
    """Placeholder data handler limited to 100 requests per minute."""
    pass

# FastAPI 版本
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# slowapi setup: create the limiter, store it on the application state, and
# register the handler for RateLimitExceeded errors.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    """Placeholder data handler limited to 100 requests per minute."""
    # NOTE(review): `Request` is not imported in the visible snippet — confirm
    # it is brought into scope (e.g. from the web framework) before use.
    pass

3.4.3. 安全响应头

from flask import Flask
from werkzeug.middleware.proxy_fix import ProxyFix

# Create the application and wrap its WSGI callable with ProxyFix
# (default ProxyFix arguments).
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)

@app.after_request
def add_security_headers(response):
    """Attach hardening headers to the outgoing response.

    Args:
        response: the response object about to be sent.

    Returns:
        The same response object with the security headers set.
    """
    headers = response.headers

    # Stop browsers from MIME-sniffing a different content type.
    headers['X-Content-Type-Options'] = 'nosniff'
    # Forbid framing of the page (clickjacking mitigation).
    headers['X-Frame-Options'] = 'DENY'
    # The legacy browser XSS auditor is deprecated, and "1; mode=block" is
    # discouraged because the auditor itself could be abused. Disable it
    # explicitly and rely on Content-Security-Policy instead.
    headers['X-XSS-Protection'] = '0'

    # Force HTTPS for one year, including subdomains.
    headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'

    # Content security policy.
    headers['Content-Security-Policy'] = "default-src 'self'"

    # Avoid leaking server details by blanking the header at application level.
    # NOTE(review): the front-end/WSGI server may still add its own Server
    # header — verify in the deployed stack.
    headers['Server'] = ''

    return response

# FastAPI 使用中间件
from starlette.middleware.trustedhost import TrustedHostMiddleware

# Register TrustedHostMiddleware with an allow-list of host names:
# example.com and its subdomains.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["example.com", "*.example.com"]
)

3.4.4. 敏感数据处理

3.4.4.1. 日志脱敏

import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Logging filter that masks passwords, tokens and 16-digit numbers.

    The filter inspects the fully formatted message, so secrets passed as
    ``%``-style arguments (``logger.info("password=%s", pw)``) are masked
    as well as those embedded in the message template. Records that
    contain nothing sensitive are left untouched, keeping their original
    ``msg`` and ``args``.
    """

    # (compiled pattern, replacement) pairs, applied in order. Key names are
    # matched case-insensitively so "Password: x" is masked too.
    patterns = [
        (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), 'password=***'),
        (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\'\s,}]+)', re.IGNORECASE), 'token=***'),
        (re.compile(r'\b\d{16}\b'), '****'),  # 16-digit runs, e.g. credit card numbers
    ]

    def _mask(self, text):
        """Return ``text`` with every configured pattern replaced."""
        for pattern, replacement in self.patterns:
            text = pattern.sub(replacement, text)
        return text

    def filter(self, record):
        """Mask sensitive data in ``record`` in place; never drops the record.

        Returns:
            Always ``True`` so the record continues through the logging chain.
        """
        try:
            rendered = record.getMessage()
        except (TypeError, ValueError, KeyError):
            # Template and args do not fit together. Leave the formatting
            # error to the logging machinery and scrub the raw template only.
            if isinstance(record.msg, str):
                record.msg = self._mask(record.msg)
            return True

        masked = self._mask(rendered)
        if masked != rendered:
            # Replace the template with the already-formatted, masked text and
            # clear args so handlers do not re-apply %-formatting.
            record.msg = masked
            record.args = ()
        return True

# 应用过滤器
# Attach the filter to this module's logger.
# NOTE(review): a filter added to a logger is expected to apply only to
# records logged through that logger itself — confirm whether it should be
# attached to the handlers instead to cover other loggers.
logger = logging.getLogger(__name__)
logger.addFilter(SensitiveDataFilter())

3.4.4.2. 响应数据过滤

from pydantic import BaseModel

class UserInternal(BaseModel):
    """Full user model for internal use; includes sensitive fields."""
    id: int
    email: str
    password_hash: str
    is_admin: bool
    internal_notes: str

class UserResponse(BaseModel):
    """User model used for API responses; declares only ``id`` and ``email``."""
    id: int
    email: str
    # Sensitive fields are intentionally not declared here

    class Config:
        # Pydantic v1-style setting that allows building the model from objects
        orm_mode = True

@app.get("/users/{id}", response_model=UserResponse)
async def get_user(id: int):
    """Return the user with the given id, shaped by ``UserResponse``."""
    # db.get_user is expected to return a UserInternal; the declared
    # response_model limits the serialized output to UserResponse's fields.
    return db.get_user(id)

3.4.5. 文件上传安全

import os
from werkzeug.utils import secure_filename
import magic  # python-magic

# Allow-list of accepted file extensions (compared in lower case).
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
# NOTE(review): nothing in the visible code applies this limit; presumably it
# must be set on the application config (app.config['MAX_CONTENT_LENGTH'])
# to be enforced — confirm.
MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB

def allowed_file(filename):
    """Return True when ``filename`` has an extension in ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot, compared in lower case.
    Names without a dot are rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS

def validate_file_content(file_stream):
    """Check that the stream's detected MIME type is an accepted one.

    Reads up to 2048 bytes for detection, then seeks the stream back to
    offset 0 so the caller can read the whole file afterwards.

    Returns:
        True when the detected type is PNG, JPEG, GIF or PDF.
    """
    sample = file_stream.read(2048)
    file_stream.seek(0)

    detected_type = magic.from_buffer(sample, mime=True)
    return detected_type in {
        'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
    }

@app.route('/upload', methods=['POST'])
def upload_file():
    """Validate an uploaded file and store it under a server-generated name.

    The upload must be present in the ``file`` form field, carry a file
    name with an allowed extension, and have content whose detected type
    is accepted.

    The stored name is a random identifier plus the validated extension.
    The client-supplied name is never used on disk: sanitizing it can
    strip it down to nothing useful (a non-ASCII name such as "照片.png"
    is reduced to "png"), and two uploads with the same name would
    overwrite each other.

    Returns:
        A JSON body with ``message`` on success, or a JSON body with
        ``error`` and status 400 when a check fails.
    """
    import uuid  # local import keeps this snippet self-contained

    if 'file' not in request.files:
        return jsonify({'error': 'No file'}), 400

    file = request.files['file']

    if not file.filename:
        return jsonify({'error': 'No filename'}), 400

    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400

    if not validate_file_content(file.stream):
        return jsonify({'error': 'Invalid file content'}), 400

    # allowed_file() has confirmed the extension is on the allow-list, so it
    # is safe to reuse; everything else in the stored name is generated here.
    extension = file.filename.rsplit('.', 1)[1].lower()
    stored_name = f"{uuid.uuid4().hex}.{extension}"

    # Store outside the web root
    upload_path = os.path.join('/var/uploads', stored_name)
    file.save(upload_path)

    return jsonify({'message': 'Uploaded successfully'})

3.4.6. 安全检查清单

输入处理
  • 所有用户输入都经过验证

  • 使用参数化查询防止 SQL 注入

  • 输出编码防止 XSS

  • 文件上传验证类型和大小

认证授权
  • 密码使用强哈希存储

  • 实现账户锁定机制

  • 敏感操作需要重新认证

  • 实现最小权限原则

会话管理
  • 使用安全的 session ID

  • 设置合理的会话超时

  • 登录后重新生成 session ID

  • 注销时销毁服务端 session

传输安全
  • 全站 HTTPS

  • 设置 HSTS 头

  • Cookie 设置 Secure 和 HttpOnly

  • 敏感数据不在 URL 中传输