第七章:OAuth 2.0 授权框架

“OAuth 2.0 不是认证协议,它是授权框架。这个区别至关重要。”

        mindmap
  root((OAuth 2.0))
    四个角色
      Resource Owner
      Client
      Auth Server
      Resource Server
    授权模式
      Authorization Code
      Client Credentials
      Device Code
      PKCE
    Token
      Access Token
      Refresh Token
      Scope
    安全
      PKCE
      Token 存储
      OAuth 2.1
    

7.1 OAuth 2.0 解决什么问题

OAuth 2.0 解决的核心问题是委托授权:允许第三方应用在不获取用户密码的情况下,访问用户在另一个服务上的资源。

没有 OAuth 的世界:
用户:"这是我的 GitHub 密码,请帮我读取仓库列表"
第三方应用:拿到密码后可以做任何事情 😱

有了 OAuth:
用户:"我授权这个应用只读取我的仓库列表"
第三方应用:只能做用户授权的事情 ✅

7.2 四个角色

角色

说明

示例

Resource Owner

资源拥有者(通常是用户)

GitHub 用户

Client

第三方应用

CI/CD 工具

Authorization Server

授权服务器

GitHub OAuth

Resource Server

资源服务器

GitHub API

7.3 授权码模式(Authorization Code)

最安全、最推荐的模式:

┌──────────┐                              ┌──────────────┐
│          │  1. 重定向到授权端点           │              │
│  用户     │─────────────────────────────▶│  授权服务器    │
│ (浏览器)  │                              │              │
│          │  2. 用户登录并授权             │              │
│          │◀─────────────────────────────│              │
│          │  3. 重定向回 Client           │              │
│          │     (携带 authorization_code) │              │
└────┬─────┘                              └──────┬───────┘
     │                                           │
     │ 4. 将 code 发送给 Client                   │
     ▼                                           │
┌──────────┐                                     │
│          │  5. 用 code 换取 token               │
│  Client   │────────────────────────────────────▶│
│ (后端)    │                                     │
│          │  6. 返回 access_token + refresh_token│
│          │◀────────────────────────────────────│
│          │                                     │
│          │  7. 用 access_token 访问资源         │
│          │────────────────────────────────────▶ Resource Server
└──────────┘

PKCE(Proof Key for Code Exchange)

PKCE 防止授权码被拦截:

import hashlib
import base64
import secrets

# 1. Client 生成 code_verifier(随机字符串)
code_verifier = secrets.token_urlsafe(32)

# 2. 计算 code_challenge
code_challenge = base64.urlsafe_b64encode(
    hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()

# 3. 授权请求携带 code_challenge
auth_url = (
    f"https://auth.example.com/authorize?"
    f"response_type=code&"
    f"client_id=my_app&"
    f"redirect_uri=https://myapp.com/callback&"
    f"scope=read:repos&"
    f"code_challenge={code_challenge}&"
    f"code_challenge_method=S256"
)

# 4. Token 请求携带 code_verifier
token_request = {
    "grant_type": "authorization_code",
    "code": "received_auth_code",
    "redirect_uri": "https://myapp.com/callback",
    "client_id": "my_app",
    "code_verifier": code_verifier,  # 授权服务器验证这个
}

7.4 客户端凭证模式(Client Credentials)

适用于机器对机器(M2M)通信,没有用户参与:

import httpx

async def get_m2m_token():
    """机器对机器认证"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": "service-a",
                "client_secret": "secret",
                "scope": "read:data write:data",
            }
        )
        token_data = response.json()
        return token_data["access_token"]

7.5 设备码模式(Device Code)

适用于输入受限的设备(智能电视、CLI 工具):

┌──────────┐                         ┌──────────────┐
│  设备     │  1. 请求 device_code    │  授权服务器    │
│ (无浏览器)│────────────────────────▶│              │
│          │  2. 返回 device_code +  │              │
│          │     user_code +         │              │
│          │     verification_uri    │              │
│          │◀────────────────────────│              │
│          │                         │              │
│  显示:   │                         │              │
│  请访问   │  3. 轮询 token 端点     │              │
│  xxx.com  │────────────────────────▶│              │
│  输入代码 │  (等待用户授权)          │              │
│  ABCD-1234│                         │              │
│          │  4. 用户在手机上授权后    │              │
│          │     返回 access_token   │              │
│          │◀────────────────────────│              │
└──────────┘                         └──────────────┘

7.6 Token 安全

Access Token vs Refresh Token

特性

Access Token

Refresh Token

用途

访问资源

获取新的 Access Token

生命周期

短(15分钟-1小时)

长(天-月)

存储位置

内存

安全存储(HttpOnly Cookie)

传输方式

Authorization Header

Token 端点

泄露影响

有限(短期)

严重(可持续获取访问)

Token 存储最佳实践

┌─────────────────────────────────────────────┐
│  存储方式          │ 安全性 │ 推荐场景       │
├───────────────────┼────────┼───────────────┤
│  HttpOnly Cookie  │  ✅ 高  │ Web 应用首选   │
│  内存(变量)      │  ✅ 高  │ SPA 应用      │
│  SessionStorage   │  ⚠️ 中  │ 可接受        │
│  LocalStorage     │  ❌ 低  │ 不推荐        │
│  URL 参数         │  ❌ 极低 │ 禁止          │
└─────────────────────────────────────────────┘

7.7 OAuth 2.1

OAuth 2.1 是 OAuth 2.0 的整合更新,主要变化:

  • PKCE 成为必须(所有公开客户端)

  • 移除 Implicit 模式(不安全)

  • 移除 Password 模式(不安全)

  • Refresh Token 轮换(每次使用后失效旧的)

  • 精确的 redirect_uri 匹配(禁止通配符)

7.8 FastAPI OAuth2 实现

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone

app = FastAPI()

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        return {"username": username, "roles": payload.get("roles", [])}
    except JWTError:
        raise credentials_exception

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 验证用户凭证(简化示例)
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect credentials")
    access_token = create_access_token(
        data={"sub": user["username"], "roles": user["roles"]},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
    return current_user

7.9 小结

  • OAuth 2.0 是授权框架,不是认证协议

  • 授权码 + PKCE 是最安全的模式,OAuth 2.1 将其设为必须

  • Client Credentials 适合机器对机器通信

  • Token 应存储在安全位置,Access Token 保持短生命周期

  • OAuth 2.1 移除了不安全的 Implicit 和 Password 模式