第十八章:授权系统实战 — OpenFGA + FastAPI

“纸上得来终觉浅,绝知此事要躬行。”

        mindmap
  root((授权实战))
    需求分析
      用户
      组织
      团队
      文档
    OpenFGA 模型
      DSL 设计
      关系元组
      权限继承
    FastAPI 集成
      SDK 初始化
      授权中间件
      API 端点
    测试与部署
      单元测试
      集成测试
      Docker Compose
    

18.1 项目目标

构建一个文档管理系统的完整授权方案,支持:

  • 用户、组织、团队、文档四种实体

  • 文档的查看、编辑、删除、分享权限

  • 组织管理员和团队成员的权限继承

  • 细粒度的资源级访问控制

18.2 OpenFGA 授权模型

model
  schema 1.1

type user

type organization
  relations
    define admin: [user]
    define member: [user] or admin

type team
  relations
    define member: [user]
    define parent: [organization]
    define can_manage: admin from parent

type folder
  relations
    define owner: [user]
    define editor: [user, team#member, organization#member]
    define viewer: [user, team#member, organization#member] or editor or owner
    define parent: [folder]
    define can_edit: editor or owner or can_edit from parent
    define can_view: viewer or can_edit or can_view from parent

type document
  relations
    define owner: [user]
    define editor: [user, team#member]
    define viewer: [user, team#member, organization#member] or editor or owner
    define parent: [folder]
    define can_edit: editor or owner or can_edit from parent
    define can_view: viewer or can_edit or can_view from parent
    define can_delete: owner
    define can_share: owner

18.3 项目结构

doc-authz/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI 应用入口
│   ├── auth.py              # JWT 认证
│   ├── authz.py             # OpenFGA 授权
│   ├── models.py            # 数据模型
│   ├── routers/
│   │   ├── documents.py     # 文档 API
│   │   ├── organizations.py # 组织 API
│   │   └── permissions.py   # 权限管理 API
│   └── config.py            # 配置
├── tests/
│   ├── test_authz.py        # 授权测试
│   └── test_api.py          # API 测试
├── openfga/
│   └── model.fga            # 授权模型
├── docker-compose.yml
└── requirements.txt

18.4 核心代码实现

授权客户端(authz.py)

from openfga_sdk import ClientConfiguration, OpenFgaClient
from openfga_sdk.client.models import (
    ClientWriteRequest, ClientTupleKey,
    ClientCheckRequest, ClientListObjectsRequest,
)
from fastapi import HTTPException
from functools import lru_cache

class AuthzService:
    def __init__(self, api_url: str, store_id: str, model_id: str):
        self.config = ClientConfiguration(
            api_url=api_url,
            store_id=store_id,
            authorization_model_id=model_id,
        )
        self._client = None

    async def get_client(self) -> OpenFgaClient:
        if self._client is None:
            self._client = OpenFgaClient(self.config)
        return self._client

    async def check(self, user_id: str, relation: str, object_id: str) -> bool:
        """检查用户是否有权限"""
        client = await self.get_client()
        result = await client.check(ClientCheckRequest(
            user=f"user:{user_id}",
            relation=relation,
            object=object_id,
        ))
        return result.allowed

    async def grant(self, user: str, relation: str, object_id: str):
        """授予权限"""
        client = await self.get_client()
        await client.write(ClientWriteRequest(
            writes=[ClientTupleKey(user=user, relation=relation, object=object_id)]
        ))

    async def revoke(self, user: str, relation: str, object_id: str):
        """撤销权限"""
        client = await self.get_client()
        await client.write(ClientWriteRequest(
            deletes=[ClientTupleKey(user=user, relation=relation, object=object_id)]
        ))

    async def list_objects(self, user_id: str, relation: str, type: str) -> list[str]:
        """列出用户有权限的对象"""
        client = await self.get_client()
        result = await client.list_objects(ClientListObjectsRequest(
            user=f"user:{user_id}",
            relation=relation,
            type=type,
        ))
        return result.objects

    async def require(self, user_id: str, relation: str, object_id: str):
        """要求权限,无权限则抛 403"""
        allowed = await self.check(user_id, relation, object_id)
        if not allowed:
            raise HTTPException(
                status_code=403,
                detail=f"User {user_id} does not have {relation} on {object_id}"
            )

# 全局实例
authz = AuthzService(
    api_url="http://localhost:8080",
    store_id="your-store-id",
    model_id="your-model-id",
)

文档 API(routers/documents.py)

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from ..auth import get_current_user, User
from ..authz import authz

router = APIRouter(prefix="/api/documents", tags=["documents"])

class CreateDocumentRequest(BaseModel):
    title: str
    content: str
    folder_id: str | None = None

class ShareDocumentRequest(BaseModel):
    user_id: str
    relation: str  # "viewer" | "editor"

@router.post("/")
async def create_document(req: CreateDocumentRequest, user: User = Depends(get_current_user)):
    """创建文档"""
    doc_id = generate_id()
    
    # 保存文档到数据库
    doc = save_document(doc_id, req.title, req.content, user.id)
    
    # 设置权限:创建者是 owner
    await authz.grant(f"user:{user.id}", "owner", f"document:{doc_id}")
    
    # 如果指定了文件夹,设置父级关系
    if req.folder_id:
        await authz.grant(f"folder:{req.folder_id}", "parent", f"document:{doc_id}")
    
    return {"id": doc_id, "title": req.title}

@router.get("/{doc_id}")
async def get_document(doc_id: str, user: User = Depends(get_current_user)):
    """查看文档(需要 can_view 权限)"""
    await authz.require(user.id, "can_view", f"document:{doc_id}")
    doc = get_document_from_db(doc_id)
    return doc

@router.put("/{doc_id}")
async def update_document(doc_id: str, req: CreateDocumentRequest, user: User = Depends(get_current_user)):
    """编辑文档(需要 can_edit 权限)"""
    await authz.require(user.id, "can_edit", f"document:{doc_id}")
    doc = update_document_in_db(doc_id, req.title, req.content)
    return doc

@router.delete("/{doc_id}")
async def delete_document(doc_id: str, user: User = Depends(get_current_user)):
    """删除文档(需要 can_delete 权限,仅 owner)"""
    await authz.require(user.id, "can_delete", f"document:{doc_id}")
    delete_document_from_db(doc_id)
    # 清理权限关系
    await authz.revoke(f"user:{user.id}", "owner", f"document:{doc_id}")
    return {"message": "Document deleted"}

@router.post("/{doc_id}/share")
async def share_document(doc_id: str, req: ShareDocumentRequest, user: User = Depends(get_current_user)):
    """分享文档(需要 can_share 权限,仅 owner)"""
    await authz.require(user.id, "can_share", f"document:{doc_id}")
    
    if req.relation not in ("viewer", "editor"):
        raise HTTPException(400, "Invalid relation")
    
    await authz.grant(f"user:{req.user_id}", req.relation, f"document:{doc_id}")
    return {"message": f"Shared with {req.user_id} as {req.relation}"}

@router.get("/")
async def list_my_documents(user: User = Depends(get_current_user)):
    """列出我可以查看的所有文档"""
    objects = await authz.list_objects(user.id, "can_view", "document")
    return {"documents": objects}

18.5 测试

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_document_permissions():
    """测试文档权限流程"""
    # Alice 创建文档
    async with AsyncClient(app=app, base_url="http://test") as client:
        # 创建文档
        resp = await client.post("/api/documents/", 
            json={"title": "Budget", "content": "..."},
            headers={"Authorization": f"Bearer {alice_token}"}
        )
        doc_id = resp.json()["id"]
        
        # Alice(owner)可以查看
        resp = await client.get(f"/api/documents/{doc_id}",
            headers={"Authorization": f"Bearer {alice_token}"})
        assert resp.status_code == 200
        
        # Bob 不能查看
        resp = await client.get(f"/api/documents/{doc_id}",
            headers={"Authorization": f"Bearer {bob_token}"})
        assert resp.status_code == 403
        
        # Alice 分享给 Bob(viewer)
        resp = await client.post(f"/api/documents/{doc_id}/share",
            json={"user_id": "bob", "relation": "viewer"},
            headers={"Authorization": f"Bearer {alice_token}"})
        assert resp.status_code == 200
        
        # Bob 现在可以查看
        resp = await client.get(f"/api/documents/{doc_id}",
            headers={"Authorization": f"Bearer {bob_token}"})
        assert resp.status_code == 200
        
        # Bob 不能编辑
        resp = await client.put(f"/api/documents/{doc_id}",
            json={"title": "Hacked", "content": "..."},
            headers={"Authorization": f"Bearer {bob_token}"})
        assert resp.status_code == 403
        
        # Bob 不能删除
        resp = await client.delete(f"/api/documents/{doc_id}",
            headers={"Authorization": f"Bearer {bob_token}"})
        assert resp.status_code == 403

18.6 Docker Compose 部署

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: docapp
      POSTGRES_USER: docapp
      POSTGRES_PASSWORD: docapp_password

  openfga:
    image: openfga/openfga:latest
    command: run
    environment:
      OPENFGA_DATASTORE_ENGINE: postgres
      OPENFGA_DATASTORE_URI: postgres://docapp:docapp_password@postgres:5432/docapp?sslmode=disable
    ports:
      - "8080:8080"
      - "3000:3000"
    depends_on:
      - postgres

  app:
    build: .
    environment:
      DATABASE_URL: postgresql://docapp:docapp_password@postgres:5432/docapp
      OPENFGA_API_URL: http://openfga:8080
      JWT_SECRET: your-jwt-secret
    ports:
      - "8000:8000"
    depends_on:
      - postgres
      - openfga

18.7 小结

  • 通过 OpenFGA + FastAPI 构建了完整的文档管理系统授权

  • 授权模型支持用户、组织、团队、文档四种实体的权限继承

  • AuthzService 封装了 OpenFGA SDK,提供简洁的 check/grant/revoke API

  • 每个 API 端点通过 authz.require() 进行细粒度权限检查

  • 完整的测试覆盖了权限的授予、检查、继承和撤销

  • Docker Compose 一键部署整个系统