Tutorial 9: Building a Knowledge Base System
Knowledge Base System Overview
This tutorial integrates all the techniques covered so far to build a complete, enterprise-grade knowledge base system.
Knowledge base system architecture:
┌─────────────────────────────────────────────────────────────┐
│                  Enterprise Knowledge Base                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Data Layer          Index Layer         Query Layer        │
│  ┌───────────┐       ┌───────────┐       ┌───────────┐      │
│  │ Document  │──────►│ Vector    │──────►│ Retrieval │      │
│  │ Loading   │       │ Index     │       │ Engine    │      │
│  │ (PDF/Doc) │       │ (Chroma)  │       │           │      │
│  └───────────┘       └───────────┘       │           │      │
│  ┌───────────┐       ┌───────────┐       │           │      │
│  │ Database  │──────►│ Keyword   │──────►│ Query     │      │
│  │ (MySQL)   │       │ Index     │       │ Engine    │      │
│  └───────────┘       └───────────┘       └───────────┘      │
│  ┌───────────┐                                 │            │
│  │ API/Web   │                                 ▼            │
│  └───────────┘                           ┌───────────┐      │
│                                          │ Agent     │      │
│                                          │ (Q&A)     │      │
│                                          └───────────┘      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
System Design
Core Components

| Component | Responsibility | Technology |
|---|---|---|
| Document management | Load, parse, and store documents | SimpleDirectoryReader, LlamaParse |
| Index management | Create and maintain indexes | VectorStoreIndex, Chroma |
| Retrieval engine | Hybrid retrieval and reranking | HybridRetriever, Reranker |
| Query engine | Generate answers | QueryEngine, Agent |
| API service | Expose the external interface | FastAPI |
Complete Implementation
Project Structure
knowledge_base/
├── __init__.py
├── config.py             # configuration management
├── document_manager.py   # document management
├── index_manager.py      # index management
├── retriever.py          # retrievers
├── query_engine.py       # query engine
├── agent.py              # intelligent agent
├── api.py                # API service
└── main.py               # entry point
Configuration Management
# config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """System configuration."""
    # OpenAI
    openai_api_key: str
    openai_model: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"

    # Storage
    chroma_persist_dir: str = "./chroma_db"
    document_dir: str = "./documents"

    # Retrieval
    similarity_top_k: int = 5
    rerank_top_n: int = 3

    # Service
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    class Config:
        env_file = ".env"

settings = Settings()
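Settings fills in any values not hardcoded above from a `.env` file in the working directory; pydantic-settings matches field names to environment variables case-insensitively. A minimal sketch with placeholder values (supply your own key):

```ini
# .env (placeholders only)
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4o-mini
CHROMA_PERSIST_DIR=./chroma_db
```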
Document Manager
# document_manager.py
from llama_index.core import SimpleDirectoryReader, Document
from llama_index.core.node_parser import SentenceSplitter
from typing import List, Optional
import os
import hashlib

class DocumentManager:
    """Document manager."""

    def __init__(self, document_dir: str):
        self.document_dir = document_dir
        self.node_parser = SentenceSplitter(
            chunk_size=512,
            chunk_overlap=50
        )
        self._document_hashes = {}

    def load_documents(self, subdirectory: Optional[str] = None) -> List[Document]:
        """Load documents from the target directory."""
        target_dir = self.document_dir
        if subdirectory:
            target_dir = os.path.join(self.document_dir, subdirectory)
        if not os.path.exists(target_dir):
            raise ValueError(f"Directory does not exist: {target_dir}")
        reader = SimpleDirectoryReader(
            input_dir=target_dir,
            recursive=True,
            required_exts=[".pdf", ".md", ".txt", ".docx"]
        )
        documents = reader.load_data()
        # Attach metadata
        for doc in documents:
            doc.metadata["source_dir"] = subdirectory or "root"
            doc.metadata["doc_hash"] = self._compute_hash(doc.text)
        return documents

    def _compute_hash(self, text: str) -> str:
        """Compute a content hash for the document."""
        return hashlib.md5(text.encode()).hexdigest()

    def get_nodes(self, documents: List[Document]) -> List:
        """Parse documents into nodes."""
        return self.node_parser.get_nodes_from_documents(documents)

    def add_document(self, text: str, metadata: Optional[dict] = None) -> Document:
        """Add a single document."""
        doc = Document(
            text=text,
            metadata=metadata or {}
        )
        doc.metadata["doc_hash"] = self._compute_hash(text)
        return doc
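Because every document carries a `doc_hash`, identical content can be detected before it is re-indexed. A standalone sketch of the same MD5 scheme (the `is_duplicate` helper is illustrative, not part of `DocumentManager`):

```python
import hashlib

def compute_hash(text: str) -> str:
    """Same hashing scheme as DocumentManager._compute_hash."""
    return hashlib.md5(text.encode()).hexdigest()

seen = set()

def is_duplicate(text: str) -> bool:
    """Return True if identical text was already ingested."""
    h = compute_hash(text)
    if h in seen:
        return True
    seen.add(h)
    return False

print(is_duplicate("hello"))  # first time: False
print(is_duplicate("hello"))  # identical text: True
```

Note that hashing the raw text only catches exact duplicates; a changed document gets a new hash and would be indexed again alongside the old version unless you also delete the stale nodes.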
Index Manager
# index_manager.py
import chromadb
from llama_index.core import VectorStoreIndex, StorageContext, Settings
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from typing import List, Optional

class IndexManager:
    """Index manager."""

    def __init__(self, persist_dir: str, embedding_model: str):
        self.persist_dir = persist_dir
        Settings.embed_model = OpenAIEmbedding(model=embedding_model)
        # Initialize Chroma
        self.chroma_client = chromadb.PersistentClient(path=persist_dir)
        self.indexes = {}

    def create_index(
        self,
        collection_name: str,
        nodes: List,
        overwrite: bool = False
    ) -> VectorStoreIndex:
        """Create an index."""
        # Get or create the collection
        if overwrite:
            try:
                self.chroma_client.delete_collection(collection_name)
            except Exception:
                pass
        collection = self.chroma_client.get_or_create_collection(
            name=collection_name
        )
        # Create the vector store
        vector_store = ChromaVectorStore(chroma_collection=collection)
        storage_context = StorageContext.from_defaults(
            vector_store=vector_store
        )
        # Build the index
        index = VectorStoreIndex(
            nodes,
            storage_context=storage_context,
            show_progress=True
        )
        self.indexes[collection_name] = index
        return index

    def load_index(self, collection_name: str) -> Optional[VectorStoreIndex]:
        """Load an existing index."""
        try:
            collection = self.chroma_client.get_collection(collection_name)
            vector_store = ChromaVectorStore(chroma_collection=collection)
            index = VectorStoreIndex.from_vector_store(vector_store)
            self.indexes[collection_name] = index
            return index
        except Exception as e:
            print(f"Failed to load index: {e}")
            return None

    def get_index(self, collection_name: str) -> Optional[VectorStoreIndex]:
        """Get an index, loading it on demand."""
        if collection_name in self.indexes:
            return self.indexes[collection_name]
        return self.load_index(collection_name)

    def list_collections(self) -> List[str]:
        """List all collections."""
        collections = self.chroma_client.list_collections()
        return [c.name for c in collections]

    def delete_collection(self, collection_name: str):
        """Delete a collection."""
        self.chroma_client.delete_collection(collection_name)
        if collection_name in self.indexes:
            del self.indexes[collection_name]
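`get_index` is a load-through cache: check the in-memory dict first, fall back to `load_index`, and memoize the result. The pattern in isolation, with a stubbed loader standing in for Chroma:

```python
class IndexCache:
    """Load-through cache, mirroring IndexManager.get_index."""

    def __init__(self, loader):
        self._loader = loader   # callable: name -> index object, or None
        self._cache = {}

    def get(self, name):
        if name in self._cache:
            return self._cache[name]        # cache hit
        index = self._loader(name)          # slow path
        if index is not None:
            self._cache[name] = index       # memoize for next time
        return index

calls = []
def fake_loader(name):
    """Stub that records how often the slow path runs."""
    calls.append(name)
    return f"index:{name}"

cache = IndexCache(fake_loader)
cache.get("default")
cache.get("default")
print(len(calls))  # loader ran only once: 1
```

One caveat of this design: the cache never invalidates, so if another process rebuilds a collection, this manager keeps serving the stale in-memory index until `delete_collection` or a restart.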
Hybrid Retriever
# retriever.py
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore, QueryBundle
from llama_index.retrievers.bm25 import BM25Retriever
from typing import List, Optional

class HybridRetriever(BaseRetriever):
    """Hybrid retriever combining vector and BM25 results."""

    def __init__(
        self,
        vector_retriever,
        bm25_retriever: Optional[BM25Retriever] = None,
        vector_weight: float = 0.6,
        top_k: int = 10
    ):
        self.vector_retriever = vector_retriever
        self.bm25_retriever = bm25_retriever
        self.vector_weight = vector_weight
        self.bm25_weight = 1 - vector_weight
        self.top_k = top_k
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # Vector retrieval
        vector_nodes = self.vector_retriever.retrieve(query_bundle)
        if not self.bm25_retriever:
            return vector_nodes[:self.top_k]
        # BM25 retrieval
        bm25_nodes = self.bm25_retriever.retrieve(query_bundle)
        # Merge the two result sets
        return self._merge_results(vector_nodes, bm25_nodes)

    def _merge_results(
        self,
        vector_nodes: List[NodeWithScore],
        bm25_nodes: List[NodeWithScore]
    ) -> List[NodeWithScore]:
        """Merge results and rescore."""
        all_nodes = {}
        # Normalize vector scores
        v_scores = self._normalize([n.score for n in vector_nodes])
        for node, score in zip(vector_nodes, v_scores):
            all_nodes[node.node.id_] = {
                "node": node,
                "v_score": score,
                "b_score": 0
            }
        # Normalize BM25 scores
        b_scores = self._normalize([n.score for n in bm25_nodes])
        for node, score in zip(bm25_nodes, b_scores):
            if node.node.id_ in all_nodes:
                all_nodes[node.node.id_]["b_score"] = score
            else:
                all_nodes[node.node.id_] = {
                    "node": node,
                    "v_score": 0,
                    "b_score": score
                }
        # Compute the final weighted scores
        results = []
        for data in all_nodes.values():
            final_score = (
                data["v_score"] * self.vector_weight +
                data["b_score"] * self.bm25_weight
            )
            node = data["node"]
            node.score = final_score
            results.append(node)
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:self.top_k]

    def _normalize(self, scores: List[float]) -> List[float]:
        """Min-max normalize scores to [0, 1]."""
        if not scores:
            return []
        min_s, max_s = min(scores), max(scores)
        if max_s == min_s:
            return [1.0] * len(scores)
        return [(s - min_s) / (max_s - min_s) for s in scores]
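The merge step is just min-max normalization followed by a weighted sum. The same arithmetic on plain `{doc_id: score}` dicts, with made-up scores:

```python
def normalize(scores):
    """Min-max normalize a list of scores to [0, 1]."""
    if not scores:
        return []
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [1.0] * len(scores)
    return [(s - lo) / (hi - lo) for s in scores]

def fuse(vector_hits, bm25_hits, vector_weight=0.6):
    """Weighted fusion of two {doc_id: score} result sets."""
    v = dict(zip(vector_hits, normalize(list(vector_hits.values()))))
    b = dict(zip(bm25_hits, normalize(list(bm25_hits.values()))))
    fused = {
        i: vector_weight * v.get(i, 0.0) + (1 - vector_weight) * b.get(i, 0.0)
        for i in set(v) | set(b)
    }
    # Highest fused score first
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

# Hypothetical scores: "a" tops both lists, "b" and "c" appear in only one
ranking = fuse({"a": 0.9, "b": 0.5}, {"a": 12.0, "c": 7.0})
print(ranking[0][0])  # "a" wins
```

Normalizing first matters because raw BM25 scores are unbounded while cosine similarities sit in a fixed range; without it one retriever's scale would dominate the weighted sum regardless of the weights.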
Query Engine
# query_engine.py
from llama_index.core import Settings
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI

class KnowledgeQueryEngine:
    """Knowledge base query engine."""

    def __init__(self, retriever, llm_model: str = "gpt-4o-mini"):
        self.retriever = retriever
        Settings.llm = OpenAI(model=llm_model)

        # Custom prompt template
        self.qa_template = PromptTemplate(
            """You are a knowledge base assistant. Answer the user's question based on the reference content below.

Reference content:
{context_str}

User question: {query_str}

Answer requirements:
1. If the reference content contains the answer, answer accurately and cite the source.
2. If the reference content is insufficient to answer, say so explicitly.
3. Do not fabricate information.
4. Keep the answer concise and clear.

Answer:"""
        )

        # Create the response synthesizer
        self.synthesizer = get_response_synthesizer(
            response_mode="compact",
            text_qa_template=self.qa_template
        )

        # Create the query engine
        self.query_engine = RetrieverQueryEngine(
            retriever=self.retriever,
            response_synthesizer=self.synthesizer
        )

    def query(self, question: str) -> dict:
        """Run a query and return the answer with its sources."""
        response = self.query_engine.query(question)

        # Extract source information
        sources = []
        for node in response.source_nodes:
            sources.append({
                "text": node.text[:200] + "..." if len(node.text) > 200 else node.text,
                "score": float(node.score) if node.score else 0,
                "metadata": node.metadata
            })
        return {
            "answer": str(response),
            "sources": sources
        }

    def query_stream(self, question: str):
        """Streaming query (requires a streaming-enabled synthesizer)."""
        streaming_engine = RetrieverQueryEngine(
            retriever=self.retriever,
            response_synthesizer=get_response_synthesizer(
                response_mode="compact",
                text_qa_template=self.qa_template,
                streaming=True
            )
        )
        streaming_response = streaming_engine.query(question)
        for text in streaming_response.response_gen:
            yield text
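The `{context_str}` and `{query_str}` placeholders are filled in by the synthesizer at query time, once per batch of retrieved nodes. Plain `str.format` shows the substitution (the template text here is abbreviated):

```python
QA_TEMPLATE = (
    "You are a knowledge base assistant. "
    "Answer based on the reference content below.\n"
    "Reference content:\n{context_str}\n"
    "User question: {query_str}\n"
    "Answer:"
)

# The synthesizer performs this substitution with the retrieved node text
prompt = QA_TEMPLATE.format(
    context_str="Machine learning is a subfield of AI.",
    query_str="What is machine learning?",
)
print("What is machine learning?" in prompt)  # True
```

With `response_mode="compact"`, retrieved chunks are packed into as few LLM calls as the context window allows, so this template may be invoked more than once for a single query.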
Intelligent Agent
# agent.py
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import QueryEngineTool, FunctionTool, ToolMetadata
from llama_index.llms.openai import OpenAI
from datetime import datetime
from typing import Any, Dict

class KnowledgeAgent:
    """Knowledge base agent."""

    def __init__(self, query_engines: Dict[str, Any], llm_model: str = "gpt-4o-mini"):
        self.llm = OpenAI(model=llm_model)
        self.tools = []
        self._setup_tools(query_engines)
        self._build_agent()

    def _setup_tools(self, query_engines: Dict):
        """Set up tools."""
        # Add one query tool per knowledge base
        for name, engine in query_engines.items():
            tool = QueryEngineTool(
                query_engine=engine.query_engine,
                metadata=ToolMetadata(
                    name=f"query_{name}",
                    description=f"Query the contents of the {name} knowledge base"
                )
            )
            self.tools.append(tool)

        # Add auxiliary tools
        def get_current_time() -> str:
            """Get the current time."""
            return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        self.tools.append(FunctionTool.from_defaults(
            fn=get_current_time,
            name="get_time",
            description="Get the current date and time"
        ))

    def _build_agent(self):
        """Build the agent."""
        self.agent = ReActAgent.from_tools(
            self.tools,
            llm=self.llm,
            verbose=True,
            system_prompt="""You are an intelligent knowledge base assistant.
You can query multiple knowledge bases to answer the user's questions.
Choose the appropriate knowledge base for each question.
Answer accurately and concisely, and cite the source of your information."""
        )

    def chat(self, message: str) -> str:
        """Chat with the agent."""
        response = self.agent.chat(message)
        return str(response)

    def reset(self):
        """Reset the conversation."""
        self.agent.reset()
API Service
# api.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List

app = FastAPI(title="Knowledge Base API")

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global variable (a real application should use dependency injection)
knowledge_system = None

class QueryRequest(BaseModel):
    question: str
    collection: Optional[str] = "default"

class QueryResponse(BaseModel):
    answer: str
    sources: List[dict]

class ChatRequest(BaseModel):
    message: str

class DocumentRequest(BaseModel):
    text: str
    metadata: Optional[dict] = None
    collection: str = "default"

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Query the knowledge base."""
    if not knowledge_system:
        raise HTTPException(status_code=500, detail="System not initialized")
    try:
        result = knowledge_system.query(
            request.question,
            request.collection
        )
        return QueryResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat with the agent."""
    if not knowledge_system:
        raise HTTPException(status_code=500, detail="System not initialized")
    try:
        response = knowledge_system.chat(request.message)
        return {"response": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/documents")
async def add_document(request: DocumentRequest):
    """Add a document."""
    if not knowledge_system:
        raise HTTPException(status_code=500, detail="System not initialized")
    try:
        knowledge_system.add_document(
            request.text,
            request.metadata,
            request.collection
        )
        return {"message": "Document added successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/collections")
async def list_collections():
    """List all collections."""
    if not knowledge_system:
        raise HTTPException(status_code=500, detail="System not initialized")
    collections = knowledge_system.list_collections()
    return {"collections": collections}

@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "healthy"}
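A minimal client for these endpoints might look like the following, assuming the service is running on the default host and port. `post_query` needs a live server, so only the payload helper is exercised here:

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumes the default api_host/api_port

def build_query_payload(question: str, collection: str = "default") -> bytes:
    """Serialize a request body matching the QueryRequest model."""
    return json.dumps({"question": question, "collection": collection}).encode("utf-8")

def post_query(question: str, collection: str = "default") -> dict:
    """POST /query and decode the QueryResponse JSON."""
    req = urllib.request.Request(
        BASE_URL + "/query",
        data=build_query_payload(question, collection),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

Because the API returns plain JSON, any HTTP client works the same way; `urllib` is used here only to keep the sketch dependency-free.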
Main Program
# main.py
from config import settings
from document_manager import DocumentManager
from index_manager import IndexManager
from retriever import HybridRetriever
from query_engine import KnowledgeQueryEngine
from agent import KnowledgeAgent
import os

class KnowledgeBaseSystem:
    """Knowledge base system."""

    def __init__(self):
        self.doc_manager = DocumentManager(settings.document_dir)
        self.index_manager = IndexManager(
            settings.chroma_persist_dir,
            settings.embedding_model
        )
        self.query_engines = {}
        self.agent = None

    def initialize(self, collections: list = None):
        """Initialize the system."""
        if collections is None:
            collections = ["default"]
        for collection in collections:
            self._initialize_collection(collection)

        # Initialize the agent
        if self.query_engines:
            self.agent = KnowledgeAgent(
                self.query_engines,
                settings.openai_model
            )

    def _initialize_collection(self, collection: str):
        """Initialize a single collection."""
        # Try to load an existing index first
        index = self.index_manager.load_index(collection)
        if index is None:
            # Build a new index from the documents on disk
            doc_path = os.path.join(settings.document_dir, collection)
            if os.path.exists(doc_path):
                documents = self.doc_manager.load_documents(collection)
                nodes = self.doc_manager.get_nodes(documents)
                index = self.index_manager.create_index(collection, nodes)
        if index:
            # Create the retriever
            vector_retriever = index.as_retriever(
                similarity_top_k=settings.similarity_top_k
            )
            retriever = HybridRetriever(
                vector_retriever=vector_retriever,
                top_k=settings.similarity_top_k
            )
            # Create the query engine
            self.query_engines[collection] = KnowledgeQueryEngine(
                retriever,
                settings.openai_model
            )

    def query(self, question: str, collection: str = "default") -> dict:
        """Query a collection."""
        if collection not in self.query_engines:
            raise ValueError(f"Collection does not exist: {collection}")
        return self.query_engines[collection].query(question)

    def chat(self, message: str) -> str:
        """Chat with the agent."""
        if not self.agent:
            raise ValueError("Agent not initialized")
        return self.agent.chat(message)

    def add_document(self, text: str, metadata: dict, collection: str):
        """Add a document."""
        doc = self.doc_manager.add_document(text, metadata)
        nodes = self.doc_manager.get_nodes([doc])
        index = self.index_manager.get_index(collection)
        if index:
            index.insert_nodes(nodes)

    def list_collections(self) -> list:
        """List collections."""
        return self.index_manager.list_collections()

def main():
    # Initialize the system
    system = KnowledgeBaseSystem()
    system.initialize(["default", "products", "faq"])

    # Start the API service
    import api
    api.knowledge_system = system

    import uvicorn
    uvicorn.run(api.app, host=settings.api_host, port=settings.api_port)

if __name__ == "__main__":
    main()
Usage Example
# Using the knowledge base system
# 1. Initialization
system = KnowledgeBaseSystem()
system.initialize()

# 2. Query
result = system.query("What is machine learning?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")

# 3. Agent chat
response = system.chat("Tell me about deep learning")
print(response)

# 4. Add a new document
system.add_document(
    text="This is newly added knowledge content...",
    metadata={"source": "manual", "topic": "ai"},
    collection="default"
)
Summary
This tutorial demonstrated:
Designing a complete knowledge base system architecture
Implementing modular components
Applying a hybrid retrieval strategy
Integrating an intelligent Agent
Building a RESTful API service
Organizing code for production use
Next Steps
In the next tutorial, we will learn how to deploy the knowledge base system to production, covering performance optimization, monitoring, and security.
Exercises
Add user authentication to the system
Implement document update and deletion
Add a query caching mechanism
Implement multi-tenant support