Tutorial 8: 多 Agent 协作
多 Agent 系统概述
多 Agent 系统让多个专业化的 Agent 协同工作,各司其职:
┌─────────────────────────────────────────────────────────────┐
│ Multi-Agent System │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Supervisor Agent │ │
│ │ (协调者:分配任务、汇总结果) │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Researcher │ │ Writer │ │ Reviewer │ │
│ │ 研究员 │ │ 写手 │ │ 审核员 │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
多 Agent 模式
1. Supervisor 模式
一个主管 Agent 协调多个工作 Agent:
from typing import TypedDict, Literal, List
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
class TeamState(TypedDict):
task: str
current_agent: str
results: dict
final_output: str
# Supervisor 决定下一个执行的 Agent
def supervisor(state: TeamState) -> dict:
llm = ChatOpenAI(model="gpt-4o-mini")
response = llm.invoke(f"""
任务: {state['task']}
已完成: {list(state['results'].keys())}
决定下一步由谁执行:
- researcher: 需要研究信息
- writer: 需要写内容
- reviewer: 需要审核
- FINISH: 任务完成
只输出一个词。
""")
return {"current_agent": response.content.strip().lower()}
def route_to_agent(state: TeamState) -> str:
return state["current_agent"]
2. 协作模式
Agent 之间直接传递工作:
# Agent A 完成后传给 Agent B
graph.add_edge("agent_a", "agent_b")
graph.add_edge("agent_b", "agent_c")
3. 竞争模式
多个 Agent 并行工作,选择最佳结果:
# 并行执行
graph.add_edge(START, "agent_1")
graph.add_edge(START, "agent_2")
graph.add_edge(START, "agent_3")
# 汇总选择
graph.add_edge("agent_1", "selector")
graph.add_edge("agent_2", "selector")
graph.add_edge("agent_3", "selector")
实战:自媒体内容团队
from typing import TypedDict, List, Literal, Optional, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# ========== 状态定义 ==========
class TeamState(TypedDict):
# 任务信息
topic: str
platform: str
requirements: str
# Agent 输出
research_output: Optional[str]
outline_output: Optional[dict]
draft_output: Optional[str]
review_output: Optional[dict]
final_output: Optional[str]
# 协调状态
next_agent: str
iteration: int
max_iterations: int
# 通信日志
messages: Annotated[List[str], add]
# ========== Agent 定义 ==========
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
class ResearcherAgent:
"""研究员 Agent:负责话题研究和资料收集"""
name = "researcher"
@staticmethod
def run(state: TeamState) -> dict:
topic = state["topic"]
platform = state["platform"]
prompt = ChatPromptTemplate.from_template("""
你是专业的内容研究员。
任务:研究话题「{topic}」
目标平台:{platform}
请提供:
1. 话题背景和重要性
2. 目标受众分析
3. 竞品内容分析
4. 推荐的内容角度
5. 关键数据和案例
输出详细的研究报告。
""")
chain = prompt | llm
response = chain.invoke({"topic": topic, "platform": platform})
return {
"research_output": response.content,
"messages": [f"[Researcher] 完成话题研究: {topic}"]
}
class PlannerAgent:
"""策划 Agent:负责内容规划和大纲设计"""
name = "planner"
@staticmethod
def run(state: TeamState) -> dict:
research = state["research_output"]
platform = state["platform"]
prompt = ChatPromptTemplate.from_template("""
你是资深的内容策划师。
研究报告:
{research}
目标平台:{platform}
请创建详细的内容大纲:
1. 标题(吸引人、适合平台)
2. 开头钩子
3. 主体章节(3-5个)
4. 结尾和CTA
输出JSON格式:
{{"title": "标题", "hook": "钩子", "sections": [{{"heading": "章节", "points": ["要点"]}}], "cta": "行动号召"}}
""")
chain = prompt | llm
response = chain.invoke({"research": research, "platform": platform})
import json
try:
outline = json.loads(response.content)
except:
outline = {"title": state["topic"], "sections": []}
return {
"outline_output": outline,
"messages": [f"[Planner] 完成内容大纲: {outline.get('title', 'N/A')}"]
}
class WriterAgent:
"""写手 Agent:负责内容创作"""
name = "writer"
@staticmethod
def run(state: TeamState) -> dict:
outline = state["outline_output"]
research = state["research_output"]
platform = state["platform"]
prompt = ChatPromptTemplate.from_template("""
你是专业的内容写手。
大纲:
{outline}
研究资料:
{research}
目标平台:{platform}
请根据大纲撰写完整的文章:
- 开头要有吸引力
- 内容要有深度和价值
- 语言风格适合平台
- 结尾要有互动引导
""")
chain = prompt | llm
response = chain.invoke({
"outline": str(outline),
"research": research[:1000], # 限制长度
"platform": platform
})
return {
"draft_output": response.content,
"messages": [f"[Writer] 完成文章草稿"]
}
class ReviewerAgent:
"""审核 Agent:负责内容审核和优化建议"""
name = "reviewer"
@staticmethod
def run(state: TeamState) -> dict:
draft = state["draft_output"]
platform = state["platform"]
prompt = ChatPromptTemplate.from_template("""
你是严格的内容审核专家。
文章内容:
{draft}
目标平台:{platform}
请审核并评分(0-100):
1. 内容质量
2. 结构清晰度
3. 平台适配度
4. 语言表达
输出格式:
总分:[数字]
通过:[是/否]
问题:[具体问题]
建议:[改进建议]
""")
chain = prompt | llm
response = chain.invoke({"draft": draft, "platform": platform})
result = response.content
passed = "是" in result and "通过:是" in result or int(''.join(filter(str.isdigit, result.split('\n')[0])) or 0) >= 80
return {
"review_output": {
"result": result,
"passed": passed
},
"messages": [f"[Reviewer] 审核完成: {'通过' if passed else '需修改'}"]
}
class EditorAgent:
"""编辑 Agent:负责最终润色"""
name = "editor"
@staticmethod
def run(state: TeamState) -> dict:
draft = state["draft_output"]
review = state["review_output"]
suggestions = review.get("result", "") if review else ""
prompt = ChatPromptTemplate.from_template("""
你是资深编辑。
原稿:
{draft}
审核意见:
{suggestions}
请进行最终润色:
1. 修正问题
2. 优化表达
3. 确保质量
输出最终版本。
""")
chain = prompt | llm
response = chain.invoke({"draft": draft, "suggestions": suggestions})
return {
"final_output": response.content,
"messages": [f"[Editor] 完成最终润色"]
}
# ========== Supervisor Agent ==========
class SupervisorAgent:
"""主管 Agent:协调团队工作"""
@staticmethod
def decide_next(state: TeamState) -> dict:
"""决定下一个执行的 Agent"""
# 检查当前进度
has_research = state.get("research_output") is not None
has_outline = state.get("outline_output") is not None
has_draft = state.get("draft_output") is not None
has_review = state.get("review_output") is not None
iteration = state.get("iteration", 0)
max_iter = state.get("max_iterations", 3)
# 决策逻辑
if not has_research:
next_agent = "researcher"
elif not has_outline:
next_agent = "planner"
elif not has_draft:
next_agent = "writer"
elif not has_review:
next_agent = "reviewer"
elif has_review:
review = state["review_output"]
if review.get("passed") or iteration >= max_iter:
next_agent = "editor"
else:
# 需要修改,回到写手
next_agent = "writer"
else:
next_agent = "FINISH"
return {
"next_agent": next_agent,
"iteration": iteration + 1,
"messages": [f"[Supervisor] 下一步: {next_agent}"]
}
# ========== 路由函数 ==========
def route_to_agent(state: TeamState) -> str:
"""路由到对应的 Agent"""
next_agent = state["next_agent"]
if next_agent == "FINISH":
return "end"
return next_agent
# ========== 构建图 ==========
def create_content_team():
graph = StateGraph(TeamState)
# 添加 Agent 节点
graph.add_node("supervisor", SupervisorAgent.decide_next)
graph.add_node("researcher", ResearcherAgent.run)
graph.add_node("planner", PlannerAgent.run)
graph.add_node("writer", WriterAgent.run)
graph.add_node("reviewer", ReviewerAgent.run)
graph.add_node("editor", EditorAgent.run)
# 流程:从 Supervisor 开始
graph.add_edge(START, "supervisor")
# Supervisor 路由到各 Agent
graph.add_conditional_edges(
"supervisor",
route_to_agent,
{
"researcher": "researcher",
"planner": "planner",
"writer": "writer",
"reviewer": "reviewer",
"editor": "editor",
"end": END
}
)
# 各 Agent 完成后回到 Supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("planner", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
graph.add_edge("editor", END)
return graph.compile()
# ========== 运行 ==========
def run_content_team():
team = create_content_team()
initial_state = {
"topic": "AI Agent 开发入门",
"platform": "微信公众号",
"requirements": "深入浅出,适合初学者",
"iteration": 0,
"max_iterations": 2,
"messages": []
}
print("=" * 60)
print("自媒体内容团队开始工作")
print("=" * 60)
# 流式执行,观察每个 Agent 的工作
for event in team.stream(initial_state):
for node, output in event.items():
if "messages" in output and output["messages"]:
for msg in output["messages"]:
print(msg)
# 获取最终结果
result = team.invoke(initial_state)
print("\n" + "=" * 60)
print("最终输出")
print("=" * 60)
print(f"\n标题: {result['outline_output'].get('title', 'N/A')}")
print(f"\n内容预览:\n{result['final_output'][:500]}...")
return result
if __name__ == "__main__":
run_content_team()
Agent 通信模式
共享状态通信
class State(TypedDict):
# 共享的通信通道
shared_memory: dict
agent_outputs: dict
def agent_a(state):
# 写入共享内存
return {"shared_memory": {"agent_a_data": "..."}}
def agent_b(state):
# 读取 Agent A 的数据
a_data = state["shared_memory"].get("agent_a_data")
# 处理...
消息传递通信
from typing import Annotated
from operator import add
class State(TypedDict):
messages: Annotated[List[dict], add]
def agent_a(state):
return {
"messages": [{
"from": "agent_a",
"to": "agent_b",
"content": "请处理这个任务",
"data": {...}
}]
}
def agent_b(state):
# 找到发给自己的消息
my_messages = [m for m in state["messages"] if m["to"] == "agent_b"]
# 处理...
最佳实践
明确职责分工
# ✅ 好:每个 Agent 职责清晰
ResearcherAgent # 只负责研究
WriterAgent # 只负责写作
ReviewerAgent # 只负责审核
# ❌ 差:职责混乱
DoEverythingAgent # 什么都做
设计清晰的通信协议
class AgentMessage(TypedDict):
sender: str
receiver: str
type: str # request, response, notification
content: dict
timestamp: str
处理 Agent 失败
def safe_agent_run(agent_func, state, max_retries=3):
for i in range(max_retries):
try:
return agent_func(state)
except Exception as e:
if i == max_retries - 1:
return {"error": str(e), "status": "failed"}
time.sleep(1)
下一步
在下一个教程中,我们将构建完整的自媒体内容工作流。