# 第七章:OAuth 2.0 授权框架 > "OAuth 2.0 不是认证协议,它是授权框架。这个区别至关重要。" ```{mermaid} mindmap root((OAuth 2.0)) 四个角色 Resource Owner Client Auth Server Resource Server 授权模式 Authorization Code Client Credentials Device Code PKCE Token Access Token Refresh Token Scope 安全 PKCE Token 存储 OAuth 2.1 ``` ## 7.1 OAuth 2.0 解决什么问题 OAuth 2.0 解决的核心问题是**委托授权**:允许第三方应用在不获取用户密码的情况下,访问用户在另一个服务上的资源。 ``` 没有 OAuth 的世界: 用户:"这是我的 GitHub 密码,请帮我读取仓库列表" 第三方应用:拿到密码后可以做任何事情 😱 有了 OAuth: 用户:"我授权这个应用只读取我的仓库列表" 第三方应用:只能做用户授权的事情 ✅ ``` ## 7.2 四个角色 | 角色 | 说明 | 示例 | |------|------|------| | Resource Owner | 资源拥有者(通常是用户) | GitHub 用户 | | Client | 第三方应用 | CI/CD 工具 | | Authorization Server | 授权服务器 | GitHub OAuth | | Resource Server | 资源服务器 | GitHub API | ## 7.3 授权码模式(Authorization Code) 最安全、最推荐的模式: ``` ┌──────────┐ ┌──────────────┐ │ │ 1. 重定向到授权端点 │ │ │ 用户 │─────────────────────────────▶│ 授权服务器 │ │ (浏览器) │ │ │ │ │ 2. 用户登录并授权 │ │ │ │◀─────────────────────────────│ │ │ │ 3. 重定向回 Client │ │ │ │ (携带 authorization_code) │ │ └────┬─────┘ └──────┬───────┘ │ │ │ 4. 将 code 发送给 Client │ ▼ │ ┌──────────┐ │ │ │ 5. 用 code 换取 token │ │ Client │────────────────────────────────────▶│ │ (后端) │ │ │ │ 6. 返回 access_token + refresh_token│ │ │◀────────────────────────────────────│ │ │ │ │ │ 7. 用 access_token 访问资源 │ │ │────────────────────────────────────▶ Resource Server └──────────┘ ``` ### PKCE(Proof Key for Code Exchange) PKCE 防止授权码被拦截: ```python import hashlib import base64 import secrets # 1. Client 生成 code_verifier(随机字符串) code_verifier = secrets.token_urlsafe(32) # 2. 计算 code_challenge code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).rstrip(b'=').decode() # 3. 授权请求携带 code_challenge auth_url = ( f"https://auth.example.com/authorize?" f"response_type=code&" f"client_id=my_app&" f"redirect_uri=https://myapp.com/callback&" f"scope=read:repos&" f"code_challenge={code_challenge}&" f"code_challenge_method=S256" ) # 4. Token 请求携带 code_verifier token_request = { "grant_type": "authorization_code", "code": "received_auth_code", "redirect_uri": "https://myapp.com/callback", "client_id": "my_app", "code_verifier": code_verifier, # 授权服务器验证这个 } ``` ## 7.4 客户端凭证模式(Client Credentials) 适用于**机器对机器**(M2M)通信,没有用户参与: ```python import httpx async def get_m2m_token(): """机器对机器认证""" async with httpx.AsyncClient() as client: response = await client.post( "https://auth.example.com/oauth/token", data={ "grant_type": "client_credentials", "client_id": "service-a", "client_secret": "secret", "scope": "read:data write:data", } ) token_data = response.json() return token_data["access_token"] ``` ## 7.5 设备码模式(Device Code) 适用于输入受限的设备(智能电视、CLI 工具): ``` ┌──────────┐ ┌──────────────┐ │ 设备 │ 1. 请求 device_code │ 授权服务器 │ │ (无浏览器)│────────────────────────▶│ │ │ │ 2. 返回 device_code + │ │ │ │ user_code + │ │ │ │ verification_uri │ │ │ │◀────────────────────────│ │ │ │ │ │ │ 显示: │ │ │ │ 请访问 │ 3. 轮询 token 端点 │ │ │ xxx.com │────────────────────────▶│ │ │ 输入代码 │ (等待用户授权) │ │ │ ABCD-1234│ │ │ │ │ 4. 用户在手机上授权后 │ │ │ │ 返回 access_token │ │ │ │◀────────────────────────│ │ └──────────┘ └──────────────┘ ``` ## 7.6 Token 安全 ### Access Token vs Refresh Token | 特性 | Access Token | Refresh Token | |------|-------------|---------------| | 用途 | 访问资源 | 获取新的 Access Token | | 生命周期 | 短(15分钟-1小时) | 长(天-月) | | 存储位置 | 内存 | 安全存储(HttpOnly Cookie) | | 传输方式 | Authorization Header | Token 端点 | | 泄露影响 | 有限(短期) | 严重(可持续获取访问) | ### Token 存储最佳实践 ``` ┌─────────────────────────────────────────────┐ │ 存储方式 │ 安全性 │ 推荐场景 │ ├───────────────────┼────────┼───────────────┤ │ HttpOnly Cookie │ ✅ 高 │ Web 应用首选 │ │ 内存(变量) │ ✅ 高 │ SPA 应用 │ │ SessionStorage │ ⚠️ 中 │ 可接受 │ │ LocalStorage │ ❌ 低 │ 不推荐 │ │ URL 参数 │ ❌ 极低 │ 禁止 │ └─────────────────────────────────────────────┘ ``` ## 7.7 OAuth 2.1 OAuth 2.1 是 OAuth 2.0 的整合更新,主要变化: - ✅ **PKCE 成为必须**(所有公开客户端) - ❌ **移除 Implicit 模式**(不安全) - ❌ **移除 Password 模式**(不安全) - ✅ **Refresh Token 轮换**(每次使用后失效旧的) - ✅ **精确的 redirect_uri 匹配**(禁止通配符) ## 7.8 FastAPI OAuth2 实现 ```python from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from datetime import datetime, timedelta, timezone app = FastAPI() SECRET_KEY = "your-secret-key" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15)) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception return {"username": username, "roles": payload.get("roles", [])} except JWTError: raise credentials_exception @app.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 验证用户凭证(简化示例) user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException(status_code=400, detail="Incorrect credentials") access_token = create_access_token( data={"sub": user["username"], "roles": user["roles"]}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me") async def read_users_me(current_user: dict = Depends(get_current_user)): return current_user ``` ## 7.9 小结 - OAuth 2.0 是**授权框架**,不是认证协议 - **授权码 + PKCE** 是最安全的模式,OAuth 2.1 将其设为必须 - **Client Credentials** 适合机器对机器通信 - Token 应存储在安全位置,Access Token 保持短生命周期 - OAuth 2.1 移除了不安全的 Implicit 和 Password 模式