Tutorial 9: 自媒体内容工作流

项目概述

本教程将整合前面所学的所有知识,构建一个完整的自媒体内容生产工作流:

┌─────────────────────────────────────────────────────────────────┐
│                  Complete Content Workflow                       │
│                                                                  │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐     │
│   │ 话题发现 │───►│ 内容策划 │───►│ 内容创作 │───►│ 质量审核 │     │
│   └─────────┘    └─────────┘    └─────────┘    └────┬────┘     │
│                                                      │          │
│                    ┌─────────────────────────────────┘          │
│                    │                                            │
│                    ▼                                            │
│              ┌──────────┐                                       │
│              │ 人工审核  │  ◄── Human-in-the-Loop               │
│              └────┬─────┘                                       │
│                   │                                             │
│         ┌─────────┴─────────┐                                   │
│         ▼                   ▼                                   │
│   ┌──────────┐        ┌──────────┐                             │
│   │ 平台适配  │        │   修改   │                             │
│   └────┬─────┘        └──────────┘                             │
│        │                                                        │
│        ▼                                                        │
│   ┌──────────┐                                                  │
│   │ 多平台发布 │                                                 │
│   └────┬─────┘                                                  │
│        │                                                        │
│        ▼                                                        │
│   ┌──────────┐                                                  │
│   │ 数据追踪  │                                                  │
│   └──────────┘                                                  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

完整实现

状态定义

from typing import TypedDict, List, Dict, Optional, Annotated, Literal
from operator import add
from datetime import datetime
from enum import Enum

class WorkflowStage(str, Enum):
    """Named stages of the content workflow.

    The class mixes in ``str``, so every member is also a string equal to
    its value.
    """

    # Value the demo puts into the initial state before the graph runs.
    INIT = "init"
    # Stages reported by the individual nodes through ``log(...)``.
    DISCOVERY = "discovery"
    PLANNING = "planning"
    WRITING = "writing"
    AI_REVIEW = "ai_review"
    HUMAN_REVIEW = "human_review"
    ADAPTING = "adapting"
    PUBLISHING = "publishing"
    TRACKING = "tracking"
    # Set by ``track_performance`` once the last node has run.
    COMPLETED = "completed"
    # Declared for failures; no node in this file assigns it.
    FAILED = "failed"

class ContentWorkflowState(TypedDict):
    """Shared state handed to every node of the content workflow graph.

    Nodes read the keys they need and return a partial dict containing only
    the keys they update.
    """

    # ===== Task configuration =====
    task_id: str
    topic: str
    target_platforms: List[str]
    style: str
    requirements: Optional[str]

    # ===== Workflow control =====
    stage: WorkflowStage
    # presumably the rewrite-loop counter and its cap - confirm against the
    # nodes and routers that read them
    iteration: int
    max_iterations: int
    created_at: str
    updated_at: str

    # ===== Topic discovery =====
    trending_topics: List[dict]
    selected_topic: Optional[dict]
    topic_analysis: Optional[str]

    # ===== Content planning =====
    content_angle: str
    outline: Optional[dict]
    title_candidates: List[str]
    selected_title: str

    # ===== Content writing =====
    draft_content: str
    word_count: int

    # ===== Quality review =====
    ai_review_result: Optional[dict]
    quality_score: int
    issues: List[str]

    # ===== Human review =====
    human_review: Optional[dict]
    human_approved: bool
    human_feedback: str

    # ===== Platform adaptation =====
    # Keyed by platform name; see ``adapt_content`` for the value layout.
    adapted_contents: Dict[str, dict]

    # ===== Publish results =====
    publish_results: List[dict]

    # ===== Logging =====
    # Annotated with ``operator.add``: LangGraph uses the annotation as a
    # reducer, so each node's one-element list is appended to the history
    # instead of replacing it.
    logs: Annotated[List[str], add]
    # No reducer annotation here, so a node update would replace the list.
    errors: List[str]

节点实现

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import json
import uuid

# Module-level chat model shared by the LLM-backed nodes in this file.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

def log(stage: str, message: str) -> dict:
    """Build a partial state update that records one log line.

    Args:
        stage: Stage label for the entry; a plain string or an enum member
            such as ``WorkflowStage.WRITING``.
        message: Human-readable text of the entry.

    Returns:
        A dict with ``stage`` (passed through unchanged), ``updated_at``
        (local time formatted as ``YYYY-MM-DD HH:MM:SS``) and ``logs`` (a
        one-element list holding the formatted line).
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Render enum members by their value. Formatting a ``(str, Enum)`` member
    # inside an f-string yields the value on older Python versions but
    # ``ClassName.MEMBER`` on newer ones, which made the log prefix depend on
    # the interpreter version.
    stage_label = getattr(stage, "value", stage)
    return {
        "stage": stage,
        "updated_at": timestamp,
        "logs": [f"[{timestamp}] [{stage_label}] {message}"],
    }

# ===== 话题发现节点 =====

def discover_topics(state: ContentWorkflowState) -> dict:
    """Ask the LLM for trending topics inside the requested topic area.

    Args:
        state: Workflow state; only ``topic`` is read.

    Returns:
        A partial state update with the log entry, ``trending_topics`` and
        ``selected_topic`` (the first dict entry of the list, or ``None``
        when there is none).
    """
    topic = state["topic"]

    response = llm.invoke(f"""
分析话题领域「{topic}」,找出5个当前热门的具体话题。

输出JSON格式:
[{{"topic": "话题名", "heat": 85, "reason": "热门原因"}}]
""")

    # Used whenever the model's answer is not usable JSON of the right shape.
    fallback = [{"topic": topic, "heat": 80, "reason": "用户指定"}]
    try:
        topics = json.loads(response.content)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers content that
        # is not text at all. Anything else (e.g. KeyboardInterrupt) must
        # propagate instead of being swallowed.
        topics = fallback
    else:
        # Valid JSON that is not a list (e.g. an object wrapping the list)
        # would crash on ``topics[0]`` below.
        if not isinstance(topics, list):
            topics = fallback

    # Downstream code indexes the selection with ["topic"], so only a dict
    # entry is a usable selection.
    selected = next((item for item in topics if isinstance(item, dict)), None)

    return {
        **log(WorkflowStage.DISCOVERY, f"发现 {len(topics)} 个热门话题"),
        "trending_topics": topics,
        "selected_topic": selected,
    }

# ===== 内容策划节点 =====

def plan_content(state: ContentWorkflowState) -> dict:
    """Ask the LLM for an angle, a title and an outline for the chosen topic.

    Args:
        state: Workflow state; reads ``selected_topic`` (falling back to
            ``topic``), ``target_platforms`` and ``style``.

    Returns:
        A partial state update with the log entry, ``content_angle``,
        ``selected_title`` and ``outline``.
    """
    selected = state.get("selected_topic")
    # Fall back to the user-supplied topic when nothing was selected or the
    # selection lacks a usable "topic" entry.
    topic = (selected.get("topic") if isinstance(selected, dict) else None) or state["topic"]
    platforms = state["target_platforms"]
    style = state["style"]

    response = llm.invoke(f"""
为话题「{topic}」策划内容。
目标平台:{', '.join(platforms)}
风格:{style}

输出JSON格式:
{{
    "angle": "内容角度",
    "title": "推荐标题",
    "outline": {{
        "hook": "开头钩子",
        "sections": [{{"heading": "章节标题", "points": ["要点1", "要点2"]}}],
        "cta": "行动号召"
    }}
}}
""")

    fallback = {"angle": "综合介绍", "title": topic, "outline": {}}
    try:
        plan = json.loads(response.content)
    except (TypeError, ValueError):
        # Only parsing failures trigger the fallback; json.JSONDecodeError is
        # a ValueError and TypeError covers non-text content.
        plan = fallback
    else:
        # Valid JSON of the wrong shape would break the ``plan.get`` calls.
        if not isinstance(plan, dict):
            plan = fallback

    return {
        **log(WorkflowStage.PLANNING, f"内容策划完成: {plan.get('title', 'N/A')}"),
        "content_angle": plan.get("angle", ""),
        "selected_title": plan.get("title", topic),
        "outline": plan.get("outline", {}),
    }

# ===== 内容创作节点 =====

def write_content(state: ContentWorkflowState) -> dict:
    """Have the LLM write (or rewrite) the article draft.

    Args:
        state: Workflow state; reads ``selected_title``, ``outline``,
            ``style`` and, when present, ``human_feedback`` and
            ``iteration``.

    Returns:
        A partial state update with the log entry, ``draft_content``,
        ``word_count`` (length of the draft in characters) and ``iteration``
        advanced by one, so the number of drafts written is recorded in
        state.
    """
    title = state["selected_title"]
    outline = state["outline"]
    style = state["style"]
    feedback = state.get("human_feedback", "")

    # Only mention feedback in the prompt when a reviewer actually left some.
    feedback_instruction = f"\n根据反馈进行修改:{feedback}" if feedback else ""

    response = llm.invoke(f"""
撰写文章。

标题:{title}
大纲:{json.dumps(outline, ensure_ascii=False)}
风格:{style}
{feedback_instruction}

要求:
1. 开头要有吸引力
2. 内容有深度和价值
3. 结构清晰
4. 字数1000-2000字
""")

    content = response.content
    word_count = len(content)

    return {
        **log(WorkflowStage.WRITING, f"内容撰写完成,字数: {word_count}"),
        "draft_content": content,
        "word_count": word_count,
        # Every pass through this node is one draft; a missing or None
        # counter is treated as zero.
        "iteration": (state.get("iteration") or 0) + 1,
    }

# ===== AI 审核节点 =====

def ai_review(state: ContentWorkflowState) -> dict:
    """Have the LLM score the current draft.

    Args:
        state: Workflow state; only ``draft_content`` is read, and only its
            first 2000 characters are sent to the model.

    Returns:
        A partial state update with the log entry, ``ai_review_result`` (the
        parsed review), ``quality_score`` (always an ``int``) and ``issues``.
    """
    content = state["draft_content"]

    response = llm.invoke(f"""
审核以下内容的质量。

内容:
{content[:2000]}

评估维度(每项0-25分,总分100):
1. 内容价值
2. 结构清晰度
3. 语言表达
4. 平台适配度

输出JSON格式:
{{
    "total_score": 85,
    "dimensions": {{"value": 22, "structure": 20, "language": 21, "adaptation": 22}},
    "issues": ["问题1", "问题2"],
    "passed": true
}}
""")

    fallback = {"total_score": 75, "issues": [], "passed": True}
    try:
        review = json.loads(response.content)
    except (TypeError, ValueError):
        # Only parsing failures trigger the fallback; json.JSONDecodeError is
        # a ValueError and TypeError covers non-text content.
        review = fallback
    else:
        # Valid JSON of the wrong shape would break the ``review.get`` calls.
        if not isinstance(review, dict):
            review = fallback

    # The router compares the score numerically, so a value such as "85" or
    # None must not reach the state as-is.
    try:
        quality_score = int(review.get("total_score", 75))
    except (TypeError, ValueError):
        quality_score = 75

    return {
        **log(WorkflowStage.AI_REVIEW, f"AI审核完成,分数: {review.get('total_score', 'N/A')}"),
        "ai_review_result": review,
        "quality_score": quality_score,
        "issues": review.get("issues", []),
    }

# ===== 人工审核节点 =====

def human_review_node(state: ContentWorkflowState) -> dict:
    """Record the reviewer's decision, if one has been supplied.

    Reads ``human_review`` from the state. Without a decision only a
    "waiting" log entry is returned; with one, ``human_approved`` and
    ``human_feedback`` are copied out of it alongside the log entry.
    """
    decision = state.get("human_review")

    # Nothing submitted yet: just note that the workflow is waiting.
    if not decision:
        return log(WorkflowStage.HUMAN_REVIEW, "等待人工审核...")

    is_approved = decision.get("approved", False)
    verdict = "通过" if is_approved else "需修改"

    update = log(WorkflowStage.HUMAN_REVIEW, f"人工审核: {verdict}")
    update["human_approved"] = is_approved
    update["human_feedback"] = decision.get("feedback", "")
    return update

# ===== 平台适配节点 =====

def adapt_content(state: ContentWorkflowState) -> dict:
    """Have the LLM adapt the draft for each target platform.

    Args:
        state: Workflow state; reads ``draft_content``, ``selected_title``
            and ``target_platforms``. Only the first 1500 characters of the
            draft are sent to the model.

    Returns:
        A partial state update with the log entry and ``adapted_contents``,
        a dict mapping each platform name to a dict (``title``, ``content``
        and ``tags`` in the fallback case).
    """
    content = state["draft_content"]
    title = state["selected_title"]
    platforms = state["target_platforms"]

    adapted = {}

    for platform in platforms:
        response = llm.invoke(f"""
将以下内容适配到「{platform}」平台。

原标题:{title}
原内容:{content[:1500]}

根据平台特点调整:
- 标题风格
- 内容长度
- 语言风格
- 格式要求

输出JSON:
{{"title": "适配后标题", "content": "适配后内容", "tags": ["标签1", "标签2"]}}
""")

        # The unmodified draft is published when the answer is unusable.
        fallback = {"title": title, "content": content, "tags": []}
        try:
            parsed = json.loads(response.content)
        except (TypeError, ValueError):
            # Only parsing failures trigger the fallback; json.JSONDecodeError
            # is a ValueError and TypeError covers non-text content.
            parsed = fallback

        # Valid JSON that is not an object cannot serve as platform content.
        adapted[platform] = parsed if isinstance(parsed, dict) else fallback

    return {
        **log(WorkflowStage.ADAPTING, f"已适配 {len(adapted)} 个平台"),
        "adapted_contents": adapted,
    }

# ===== 发布节点 =====

def publish_content(state: ContentWorkflowState) -> dict:
    """Produce a simulated publish record for every adapted platform.

    Nothing is sent anywhere: each platform in ``adapted_contents`` gets a
    record marked successful, with a random 8-character post id, a
    placeholder URL and the current timestamp.
    """
    # Simulated publish: one always-successful record per platform name.
    outcomes = [
        {
            "platform": name,
            "success": True,
            "post_id": str(uuid.uuid4())[:8],
            "url": f"https://{name.lower().replace(' ', '')}.com/p/xxx",
            "published_at": datetime.now().isoformat(),
        }
        for name in state["adapted_contents"]
    ]

    succeeded = len([outcome for outcome in outcomes if outcome["success"]])
    update = log(WorkflowStage.PUBLISHING, f"发布完成: {succeeded}/{len(outcomes)} 成功")
    update["publish_results"] = outcomes
    return update

# ===== 数据追踪节点 =====

def track_performance(state: ContentWorkflowState) -> dict:
    """Attach placeholder statistics to every publish result.

    Args:
        state: Workflow state; only ``publish_results`` is read.

    Returns:
        A partial state update with the log entry, a new ``publish_results``
        list whose records carry a zeroed ``stats`` dict, and ``stage`` set
        to ``WorkflowStage.COMPLETED``.
    """
    # Simulated metrics. New records are built instead of writing into the
    # dicts held by the incoming state, so the node's only effect is the
    # update it returns.
    tracked = [
        {
            **result,
            "stats": {
                "views": 0,
                "likes": 0,
                "comments": 0,
                "shares": 0,
            },
        }
        for result in state["publish_results"]
    ]

    return {
        **log(WorkflowStage.TRACKING, "数据追踪已启动"),
        "publish_results": tracked,
        # Overrides the TRACKING stage that ``log`` put into the update.
        "stage": WorkflowStage.COMPLETED,
    }

路由函数

def ai_review_router(state: ContentWorkflowState) -> Literal["human_review", "rewrite"]:
    """Choose the step that follows the AI review.

    Args:
        state: Workflow state; reads ``quality_score`` and, when present,
            ``iteration`` and ``max_iterations``.

    Returns:
        ``"human_review"`` when the score is at least 70, or when
        ``iteration`` has reached ``max_iterations`` (so a low-scoring draft
        is escalated to a person instead of being rewritten without bound);
        ``"rewrite"`` otherwise. If either counter is missing, only the
        score is considered.
    """
    score = state.get("quality_score", 0)
    if score >= 70:
        return "human_review"

    iteration = state.get("iteration")
    max_iterations = state.get("max_iterations")
    budget_known = iteration is not None and max_iterations is not None
    if budget_known and iteration >= max_iterations:
        return "human_review"

    return "rewrite"

def human_review_router(state: ContentWorkflowState) -> Literal["adapt", "rewrite", "wait"]:
    """Choose the step that follows the human review node.

    Returns ``"wait"`` while the state holds no (or an empty)
    ``human_review`` entry, ``"adapt"`` when that entry has a truthy
    ``approved`` value, and ``"rewrite"`` otherwise.
    """
    decision = state.get("human_review")

    if decision:
        return "adapt" if decision.get("approved") else "rewrite"
    return "wait"

构建完整工作流

def create_content_workflow():
    """Assemble and compile the content workflow graph.

    The graph runs discover -> plan -> write -> ai_review, branches on the
    two router functions, and finishes with adapt -> publish -> track. It is
    compiled with an in-memory checkpointer and an interrupt before the
    ``human_review`` node.
    """
    builder = StateGraph(ContentWorkflowState)

    # Node name -> handler, registered in declaration order.
    handlers = {
        "discover": discover_topics,
        "plan": plan_content,
        "write": write_content,
        "ai_review": ai_review,
        "human_review": human_review_node,
        "adapt": adapt_content,
        "publish": publish_content,
        "track": track_performance,
    }
    for node_name, handler in handlers.items():
        builder.add_node(node_name, handler)

    # Main pipeline up to the AI review.
    authoring_chain = [START, "discover", "plan", "write", "ai_review"]
    for source, target in zip(authoring_chain, authoring_chain[1:]):
        builder.add_edge(source, target)

    # Branch after the AI review.
    after_ai_review = {"human_review": "human_review", "rewrite": "write"}
    builder.add_conditional_edges("ai_review", ai_review_router, after_ai_review)

    # Branch after the human review; "wait" ends the run until a reviewer
    # has supplied input.
    after_human_review = {"adapt": "adapt", "rewrite": "write", "wait": END}
    builder.add_conditional_edges("human_review", human_review_router, after_human_review)

    # Publishing pipeline.
    release_chain = ["adapt", "publish", "track", END]
    for source, target in zip(release_chain, release_chain[1:]):
        builder.add_edge(source, target)

    # Compile with a checkpointer and pause before the human review node.
    checkpointer = MemorySaver()
    return builder.compile(checkpointer=checkpointer, interrupt_before=["human_review"])

使用示例

def run_workflow_demo():
    """Run the workflow interactively from the console.

    The graph runs until it pauses before ``human_review``; the reviewer's
    decision is then read from stdin, written into the checkpointed state
    and the graph is resumed. A rejected draft is rewritten and the graph
    pauses before ``human_review`` again, so the review step repeats until
    no node is pending.

    Returns:
        The state returned by the last ``invoke`` call.
    """
    workflow = create_content_workflow()

    # Initial state: only the configuration and control keys are supplied;
    # the remaining keys are filled in by the nodes.
    started_at = datetime.now().isoformat()
    initial_state = {
        "task_id": str(uuid.uuid4())[:8],
        "topic": "AI编程",
        "target_platforms": ["微信公众号", "知乎"],
        "style": "专业但易懂",
        "requirements": None,
        "stage": WorkflowStage.INIT,
        "iteration": 0,
        "max_iterations": 3,
        "created_at": started_at,
        "updated_at": started_at,
        "logs": [],
        "errors": [],
    }

    # The thread id ties every invoke/update_state call to one checkpoint.
    config = {"configurable": {"thread_id": initial_state["task_id"]}}

    # Phase 1: run until the graph pauses before the human review.
    print("=" * 60)
    print("阶段一:自动化流程")
    print("=" * 60)

    result = workflow.invoke(initial_state, config)

    for log_entry in result.get("logs", []):
        print(log_entry)

    # Phase 2: review. ``get_state(config).next`` lists the nodes still
    # pending, so an empty value means the run has finished rather than
    # paused.
    while workflow.get_state(config).next:
        print(f"\n当前阶段: {result.get('stage')}")
        print(f"AI审核分数: {result.get('quality_score')}")
        print(f"内容预览: {result.get('draft_content', '')[:200]}...")

        print("\n" + "=" * 60)
        print("阶段二:人工审核")
        print("=" * 60)

        # Collect the reviewer's decision from the console.
        print("请审核内容...")
        approval = input("批准? (y/n): ").strip().lower() == 'y'
        feedback = ""
        if not approval:
            feedback = input("修改建议: ").strip()

        # Inject the decision into the checkpointed state.
        workflow.update_state(
            config,
            {
                "human_review": {
                    "approved": approval,
                    "feedback": feedback,
                    "reviewer": "human",
                    "timestamp": datetime.now().isoformat(),
                }
            },
        )

        # Passing None resumes from the checkpoint instead of restarting.
        result = workflow.invoke(None, config)

    final_result = result

    print("\n" + "=" * 60)
    print("最终结果")
    print("=" * 60)

    for log_entry in final_result.get("logs", [])[-5:]:
        print(log_entry)

    print(f"\n最终阶段: {final_result.get('stage')}")

    if final_result.get("publish_results"):
        print("\n发布结果:")
        for pr in final_result["publish_results"]:
            print(f"  - {pr['platform']}: {pr['url']}")

    return final_result

# Start the interactive demo only when this file is executed as a script.
if __name__ == "__main__":
    run_workflow_demo()

工作流可视化

graph TD
    START --> discover[话题发现]
    discover --> plan[内容策划]
    plan --> write[内容创作]
    write --> ai_review[AI审核]

    ai_review -->|分数>=70| human_review[人工审核]
    ai_review -->|分数<70| write

    human_review -->|批准| adapt[平台适配]
    human_review -->|拒绝| write
    human_review -->|等待| END_WAIT[等待输入]

    adapt --> publish[多平台发布]
    publish --> track[数据追踪]
    track --> END

下一步

在最后一个教程中,我们将学习如何将工作流部署到生产环境。

Tutorial 10: 生产环境部署