Tutorial 8: 多 Agent 协作

多 Agent 系统概述

多 Agent 系统让多个专业化的 Agent 协同工作,各司其职:

┌─────────────────────────────────────────────────────────────┐
│                    Multi-Agent System                        │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                   Supervisor Agent                    │   │
│  │              (协调者:分配任务、汇总结果)               │   │
│  └──────────────────────────┬───────────────────────────┘   │
│                             │                                │
│         ┌───────────────────┼───────────────────┐           │
│         │                   │                   │           │
│         ▼                   ▼                   ▼           │
│  ┌────────────┐     ┌────────────┐     ┌────────────┐      │
│  │ Researcher │     │   Writer   │     │  Reviewer  │      │
│  │  研究员    │     │   写手     │     │   审核员   │      │
│  └────────────┘     └────────────┘     └────────────┘      │
│                                                              │
└─────────────────────────────────────────────────────────────┘

多 Agent 模式

1. Supervisor 模式

一个主管 Agent 协调多个工作 Agent:

from typing import TypedDict, Literal, List
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

class TeamState(TypedDict):
    task: str
    current_agent: str
    results: dict
    final_output: str

# Supervisor 决定下一个执行的 Agent
def supervisor(state: TeamState) -> dict:
    llm = ChatOpenAI(model="gpt-4o-mini")

    response = llm.invoke(f"""
任务: {state['task']}
已完成: {list(state['results'].keys())}

决定下一步由谁执行:
- researcher: 需要研究信息
- writer: 需要写内容
- reviewer: 需要审核
- FINISH: 任务完成

只输出一个词。
""")

    return {"current_agent": response.content.strip().lower()}

def route_to_agent(state: TeamState) -> str:
    return state["current_agent"]

2. 协作模式

Agent 之间直接传递工作:

# Agent A 完成后传给 Agent B
graph.add_edge("agent_a", "agent_b")
graph.add_edge("agent_b", "agent_c")

3. 竞争模式

多个 Agent 并行工作,选择最佳结果:

# 并行执行
graph.add_edge(START, "agent_1")
graph.add_edge(START, "agent_2")
graph.add_edge(START, "agent_3")

# 汇总选择
graph.add_edge("agent_1", "selector")
graph.add_edge("agent_2", "selector")
graph.add_edge("agent_3", "selector")

实战:自媒体内容团队

from typing import TypedDict, List, Literal, Optional, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# ========== 状态定义 ==========

class TeamState(TypedDict):
    # 任务信息
    topic: str
    platform: str
    requirements: str

    # Agent 输出
    research_output: Optional[str]
    outline_output: Optional[dict]
    draft_output: Optional[str]
    review_output: Optional[dict]
    final_output: Optional[str]

    # 协调状态
    next_agent: str
    iteration: int
    max_iterations: int

    # 通信日志
    messages: Annotated[List[str], add]

# ========== Agent 定义 ==========

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

class ResearcherAgent:
    """研究员 Agent:负责话题研究和资料收集"""

    name = "researcher"

    @staticmethod
    def run(state: TeamState) -> dict:
        topic = state["topic"]
        platform = state["platform"]

        prompt = ChatPromptTemplate.from_template("""
你是专业的内容研究员。

任务:研究话题「{topic}
目标平台:{platform}

请提供:
1. 话题背景和重要性
2. 目标受众分析
3. 竞品内容分析
4. 推荐的内容角度
5. 关键数据和案例

输出详细的研究报告。
""")

        chain = prompt | llm
        response = chain.invoke({"topic": topic, "platform": platform})

        return {
            "research_output": response.content,
            "messages": [f"[Researcher] 完成话题研究: {topic}"]
        }

class PlannerAgent:
    """策划 Agent:负责内容规划和大纲设计"""

    name = "planner"

    @staticmethod
    def run(state: TeamState) -> dict:
        research = state["research_output"]
        platform = state["platform"]

        prompt = ChatPromptTemplate.from_template("""
你是资深的内容策划师。

研究报告:
{research}

目标平台:{platform}

请创建详细的内容大纲:
1. 标题(吸引人、适合平台)
2. 开头钩子
3. 主体章节(3-5个)
4. 结尾和CTA

输出JSON格式:
{{"title": "标题", "hook": "钩子", "sections": [{{"heading": "章节", "points": ["要点"]}}], "cta": "行动号召"}}
""")

        chain = prompt | llm
        response = chain.invoke({"research": research, "platform": platform})

        import json
        try:
            outline = json.loads(response.content)
        except:
            outline = {"title": state["topic"], "sections": []}

        return {
            "outline_output": outline,
            "messages": [f"[Planner] 完成内容大纲: {outline.get('title', 'N/A')}"]
        }

class WriterAgent:
    """写手 Agent:负责内容创作"""

    name = "writer"

    @staticmethod
    def run(state: TeamState) -> dict:
        outline = state["outline_output"]
        research = state["research_output"]
        platform = state["platform"]

        prompt = ChatPromptTemplate.from_template("""
你是专业的内容写手。

大纲:
{outline}

研究资料:
{research}

目标平台:{platform}

请根据大纲撰写完整的文章:
- 开头要有吸引力
- 内容要有深度和价值
- 语言风格适合平台
- 结尾要有互动引导
""")

        chain = prompt | llm
        response = chain.invoke({
            "outline": str(outline),
            "research": research[:1000],  # 限制长度
            "platform": platform
        })

        return {
            "draft_output": response.content,
            "messages": [f"[Writer] 完成文章草稿"]
        }

class ReviewerAgent:
    """审核 Agent:负责内容审核和优化建议"""

    name = "reviewer"

    @staticmethod
    def run(state: TeamState) -> dict:
        draft = state["draft_output"]
        platform = state["platform"]

        prompt = ChatPromptTemplate.from_template("""
你是严格的内容审核专家。

文章内容:
{draft}

目标平台:{platform}

请审核并评分(0-100):
1. 内容质量
2. 结构清晰度
3. 平台适配度
4. 语言表达

输出格式:
总分:[数字]
通过:[是/否]
问题:[具体问题]
建议:[改进建议]
""")

        chain = prompt | llm
        response = chain.invoke({"draft": draft, "platform": platform})

        result = response.content
        passed = "是" in result and "通过:是" in result or int(''.join(filter(str.isdigit, result.split('\n')[0])) or 0) >= 80

        return {
            "review_output": {
                "result": result,
                "passed": passed
            },
            "messages": [f"[Reviewer] 审核完成: {'通过' if passed else '需修改'}"]
        }

class EditorAgent:
    """编辑 Agent:负责最终润色"""

    name = "editor"

    @staticmethod
    def run(state: TeamState) -> dict:
        draft = state["draft_output"]
        review = state["review_output"]

        suggestions = review.get("result", "") if review else ""

        prompt = ChatPromptTemplate.from_template("""
你是资深编辑。

原稿:
{draft}

审核意见:
{suggestions}

请进行最终润色:
1. 修正问题
2. 优化表达
3. 确保质量

输出最终版本。
""")

        chain = prompt | llm
        response = chain.invoke({"draft": draft, "suggestions": suggestions})

        return {
            "final_output": response.content,
            "messages": [f"[Editor] 完成最终润色"]
        }

# ========== Supervisor Agent ==========

class SupervisorAgent:
    """主管 Agent:协调团队工作"""

    @staticmethod
    def decide_next(state: TeamState) -> dict:
        """决定下一个执行的 Agent"""

        # 检查当前进度
        has_research = state.get("research_output") is not None
        has_outline = state.get("outline_output") is not None
        has_draft = state.get("draft_output") is not None
        has_review = state.get("review_output") is not None
        iteration = state.get("iteration", 0)
        max_iter = state.get("max_iterations", 3)

        # 决策逻辑
        if not has_research:
            next_agent = "researcher"
        elif not has_outline:
            next_agent = "planner"
        elif not has_draft:
            next_agent = "writer"
        elif not has_review:
            next_agent = "reviewer"
        elif has_review:
            review = state["review_output"]
            if review.get("passed") or iteration >= max_iter:
                next_agent = "editor"
            else:
                # 需要修改,回到写手
                next_agent = "writer"
        else:
            next_agent = "FINISH"

        return {
            "next_agent": next_agent,
            "iteration": iteration + 1,
            "messages": [f"[Supervisor] 下一步: {next_agent}"]
        }

# ========== 路由函数 ==========

def route_to_agent(state: TeamState) -> str:
    """路由到对应的 Agent"""
    next_agent = state["next_agent"]

    if next_agent == "FINISH":
        return "end"
    return next_agent

# ========== 构建图 ==========

def create_content_team():
    graph = StateGraph(TeamState)

    # 添加 Agent 节点
    graph.add_node("supervisor", SupervisorAgent.decide_next)
    graph.add_node("researcher", ResearcherAgent.run)
    graph.add_node("planner", PlannerAgent.run)
    graph.add_node("writer", WriterAgent.run)
    graph.add_node("reviewer", ReviewerAgent.run)
    graph.add_node("editor", EditorAgent.run)

    # 流程:从 Supervisor 开始
    graph.add_edge(START, "supervisor")

    # Supervisor 路由到各 Agent
    graph.add_conditional_edges(
        "supervisor",
        route_to_agent,
        {
            "researcher": "researcher",
            "planner": "planner",
            "writer": "writer",
            "reviewer": "reviewer",
            "editor": "editor",
            "end": END
        }
    )

    # 各 Agent 完成后回到 Supervisor
    graph.add_edge("researcher", "supervisor")
    graph.add_edge("planner", "supervisor")
    graph.add_edge("writer", "supervisor")
    graph.add_edge("reviewer", "supervisor")
    graph.add_edge("editor", END)

    return graph.compile()

# ========== 运行 ==========

def run_content_team():
    team = create_content_team()

    initial_state = {
        "topic": "AI Agent 开发入门",
        "platform": "微信公众号",
        "requirements": "深入浅出,适合初学者",
        "iteration": 0,
        "max_iterations": 2,
        "messages": []
    }

    print("=" * 60)
    print("自媒体内容团队开始工作")
    print("=" * 60)

    # 流式执行,观察每个 Agent 的工作
    for event in team.stream(initial_state):
        for node, output in event.items():
            if "messages" in output and output["messages"]:
                for msg in output["messages"]:
                    print(msg)

    # 获取最终结果
    result = team.invoke(initial_state)

    print("\n" + "=" * 60)
    print("最终输出")
    print("=" * 60)
    print(f"\n标题: {result['outline_output'].get('title', 'N/A')}")
    print(f"\n内容预览:\n{result['final_output'][:500]}...")

    return result

if __name__ == "__main__":
    run_content_team()

Agent 通信模式

共享状态通信

class State(TypedDict):
    # 共享的通信通道
    shared_memory: dict
    agent_outputs: dict

def agent_a(state):
    # 写入共享内存
    return {"shared_memory": {"agent_a_data": "..."}}

def agent_b(state):
    # 读取 Agent A 的数据
    a_data = state["shared_memory"].get("agent_a_data")
    # 处理...

消息传递通信

from typing import Annotated
from operator import add

class State(TypedDict):
    messages: Annotated[List[dict], add]

def agent_a(state):
    return {
        "messages": [{
            "from": "agent_a",
            "to": "agent_b",
            "content": "请处理这个任务",
            "data": {...}
        }]
    }

def agent_b(state):
    # 找到发给自己的消息
    my_messages = [m for m in state["messages"] if m["to"] == "agent_b"]
    # 处理...

最佳实践

  1. 明确职责分工

# ✅ 好:每个 Agent 职责清晰
ResearcherAgent  # 只负责研究
WriterAgent      # 只负责写作
ReviewerAgent    # 只负责审核

# ❌ 差:职责混乱
DoEverythingAgent  # 什么都做
  1. 设计清晰的通信协议

class AgentMessage(TypedDict):
    sender: str
    receiver: str
    type: str  # request, response, notification
    content: dict
    timestamp: str
  1. 处理 Agent 失败

def safe_agent_run(agent_func, state, max_retries=3):
    for i in range(max_retries):
        try:
            return agent_func(state)
        except Exception as e:
            if i == max_retries - 1:
                return {"error": str(e), "status": "failed"}
            time.sleep(1)

下一步

在下一个教程中,我们将构建完整的自媒体内容工作流。

Tutorial 9: 自媒体内容工作流