# 第十八章:授权系统实战 — OpenFGA + FastAPI > "纸上得来终觉浅,绝知此事要躬行。" ```{mermaid} mindmap root((授权实战)) 需求分析 用户 组织 团队 文档 OpenFGA 模型 DSL 设计 关系元组 权限继承 FastAPI 集成 SDK 初始化 授权中间件 API 端点 测试与部署 单元测试 集成测试 Docker Compose ``` ## 18.1 项目目标 构建一个文档管理系统的完整授权方案,支持: - 用户、组织、团队、文档四种实体 - 文档的查看、编辑、删除、分享权限 - 组织管理员和团队成员的权限继承 - 细粒度的资源级访问控制 ## 18.2 OpenFGA 授权模型 ``` model schema 1.1 type user type organization relations define admin: [user] define member: [user] or admin type team relations define member: [user] define parent: [organization] define can_manage: admin from parent type folder relations define owner: [user] define editor: [user, team#member, organization#member] define viewer: [user, team#member, organization#member] or editor or owner define parent: [folder] define can_edit: editor or owner or can_edit from parent define can_view: viewer or can_edit or can_view from parent type document relations define owner: [user] define editor: [user, team#member] define viewer: [user, team#member, organization#member] or editor or owner define parent: [folder] define can_edit: editor or owner or can_edit from parent define can_view: viewer or can_edit or can_view from parent define can_delete: owner define can_share: owner ``` ## 18.3 项目结构 ``` doc-authz/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI 应用入口 │ ├── auth.py # JWT 认证 │ ├── authz.py # OpenFGA 授权 │ ├── models.py # 数据模型 │ ├── routers/ │ │ ├── documents.py # 文档 API │ │ ├── organizations.py # 组织 API │ │ └── permissions.py # 权限管理 API │ └── config.py # 配置 ├── tests/ │ ├── test_authz.py # 授权测试 │ └── test_api.py # API 测试 ├── openfga/ │ └── model.fga # 授权模型 ├── docker-compose.yml └── requirements.txt ``` ## 18.4 核心代码实现 ### 授权客户端(authz.py) ```python from openfga_sdk import ClientConfiguration, OpenFgaClient from openfga_sdk.client.models import ( ClientWriteRequest, ClientTupleKey, ClientCheckRequest, ClientListObjectsRequest, ) from fastapi import HTTPException from functools import lru_cache class AuthzService: def __init__(self, api_url: str, store_id: str, model_id: str): self.config = ClientConfiguration( api_url=api_url, store_id=store_id, authorization_model_id=model_id, ) self._client = None async def get_client(self) -> OpenFgaClient: if self._client is None: self._client = OpenFgaClient(self.config) return self._client async def check(self, user_id: str, relation: str, object_id: str) -> bool: """检查用户是否有权限""" client = await self.get_client() result = await client.check(ClientCheckRequest( user=f"user:{user_id}", relation=relation, object=object_id, )) return result.allowed async def grant(self, user: str, relation: str, object_id: str): """授予权限""" client = await self.get_client() await client.write(ClientWriteRequest( writes=[ClientTupleKey(user=user, relation=relation, object=object_id)] )) async def revoke(self, user: str, relation: str, object_id: str): """撤销权限""" client = await self.get_client() await client.write(ClientWriteRequest( deletes=[ClientTupleKey(user=user, relation=relation, object=object_id)] )) async def list_objects(self, user_id: str, relation: str, type: str) -> list[str]: """列出用户有权限的对象""" client = await self.get_client() result = await client.list_objects(ClientListObjectsRequest( user=f"user:{user_id}", relation=relation, type=type, )) return result.objects async def require(self, user_id: str, relation: str, object_id: str): """要求权限,无权限则抛 403""" allowed = await self.check(user_id, relation, object_id) if not allowed: raise HTTPException( status_code=403, detail=f"User {user_id} does not have {relation} on {object_id}" ) # 全局实例 authz = AuthzService( api_url="http://localhost:8080", store_id="your-store-id", model_id="your-model-id", ) ``` ### 文档 API(routers/documents.py) ```python from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from ..auth import get_current_user, User from ..authz import authz router = APIRouter(prefix="/api/documents", tags=["documents"]) class CreateDocumentRequest(BaseModel): title: str content: str folder_id: str | None = None class ShareDocumentRequest(BaseModel): user_id: str relation: str # "viewer" | "editor" @router.post("/") async def create_document(req: CreateDocumentRequest, user: User = Depends(get_current_user)): """创建文档""" doc_id = generate_id() # 保存文档到数据库 doc = save_document(doc_id, req.title, req.content, user.id) # 设置权限:创建者是 owner await authz.grant(f"user:{user.id}", "owner", f"document:{doc_id}") # 如果指定了文件夹,设置父级关系 if req.folder_id: await authz.grant(f"folder:{req.folder_id}", "parent", f"document:{doc_id}") return {"id": doc_id, "title": req.title} @router.get("/{doc_id}") async def get_document(doc_id: str, user: User = Depends(get_current_user)): """查看文档(需要 can_view 权限)""" await authz.require(user.id, "can_view", f"document:{doc_id}") doc = get_document_from_db(doc_id) return doc @router.put("/{doc_id}") async def update_document(doc_id: str, req: CreateDocumentRequest, user: User = Depends(get_current_user)): """编辑文档(需要 can_edit 权限)""" await authz.require(user.id, "can_edit", f"document:{doc_id}") doc = update_document_in_db(doc_id, req.title, req.content) return doc @router.delete("/{doc_id}") async def delete_document(doc_id: str, user: User = Depends(get_current_user)): """删除文档(需要 can_delete 权限,仅 owner)""" await authz.require(user.id, "can_delete", f"document:{doc_id}") delete_document_from_db(doc_id) # 清理权限关系 await authz.revoke(f"user:{user.id}", "owner", f"document:{doc_id}") return {"message": "Document deleted"} @router.post("/{doc_id}/share") async def share_document(doc_id: str, req: ShareDocumentRequest, user: User = Depends(get_current_user)): """分享文档(需要 can_share 权限,仅 owner)""" await authz.require(user.id, "can_share", f"document:{doc_id}") if req.relation not in ("viewer", "editor"): raise HTTPException(400, "Invalid relation") await authz.grant(f"user:{req.user_id}", req.relation, f"document:{doc_id}") return {"message": f"Shared with {req.user_id} as {req.relation}"} @router.get("/") async def list_my_documents(user: User = Depends(get_current_user)): """列出我可以查看的所有文档""" objects = await authz.list_objects(user.id, "can_view", "document") return {"documents": objects} ``` ## 18.5 测试 ```python import pytest from httpx import AsyncClient @pytest.mark.asyncio async def test_document_permissions(): """测试文档权限流程""" # Alice 创建文档 async with AsyncClient(app=app, base_url="http://test") as client: # 创建文档 resp = await client.post("/api/documents/", json={"title": "Budget", "content": "..."}, headers={"Authorization": f"Bearer {alice_token}"} ) doc_id = resp.json()["id"] # Alice(owner)可以查看 resp = await client.get(f"/api/documents/{doc_id}", headers={"Authorization": f"Bearer {alice_token}"}) assert resp.status_code == 200 # Bob 不能查看 resp = await client.get(f"/api/documents/{doc_id}", headers={"Authorization": f"Bearer {bob_token}"}) assert resp.status_code == 403 # Alice 分享给 Bob(viewer) resp = await client.post(f"/api/documents/{doc_id}/share", json={"user_id": "bob", "relation": "viewer"}, headers={"Authorization": f"Bearer {alice_token}"}) assert resp.status_code == 200 # Bob 现在可以查看 resp = await client.get(f"/api/documents/{doc_id}", headers={"Authorization": f"Bearer {bob_token}"}) assert resp.status_code == 200 # Bob 不能编辑 resp = await client.put(f"/api/documents/{doc_id}", json={"title": "Hacked", "content": "..."}, headers={"Authorization": f"Bearer {bob_token}"}) assert resp.status_code == 403 # Bob 不能删除 resp = await client.delete(f"/api/documents/{doc_id}", headers={"Authorization": f"Bearer {bob_token}"}) assert resp.status_code == 403 ``` ## 18.6 Docker Compose 部署 ```yaml services: postgres: image: postgres:16 environment: POSTGRES_DB: docapp POSTGRES_USER: docapp POSTGRES_PASSWORD: docapp_password openfga: image: openfga/openfga:latest command: run environment: OPENFGA_DATASTORE_ENGINE: postgres OPENFGA_DATASTORE_URI: postgres://docapp:docapp_password@postgres:5432/docapp?sslmode=disable ports: - "8080:8080" - "3000:3000" depends_on: - postgres app: build: . environment: DATABASE_URL: postgresql://docapp:docapp_password@postgres:5432/docapp OPENFGA_API_URL: http://openfga:8080 JWT_SECRET: your-jwt-secret ports: - "8000:8000" depends_on: - postgres - openfga ``` ## 18.7 小结 - 通过 OpenFGA + FastAPI 构建了完整的**文档管理系统授权** - 授权模型支持**用户、组织、团队、文档**四种实体的权限继承 - **AuthzService** 封装了 OpenFGA SDK,提供简洁的 check/grant/revoke API - 每个 API 端点通过 `authz.require()` 进行细粒度权限检查 - 完整的测试覆盖了权限的授予、检查、继承和撤销 - Docker Compose 一键部署整个系统