Tutorial 2: 数据加载
数据加载概述
LlamaIndex 提供了强大的数据加载能力,支持 100+ 种数据源。 数据加载是 RAG 管道的第一步,也是最关键的一步。
数据加载流程:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 原始数据 │───►│ Reader │───►│ Document │
│ (各种格式) │ │ (加载器) │ │ (标准格式) │
└─────────────┘ └─────────────┘ └─────────────┘
数据连接器类型
类型 |
示例 |
说明 |
|---|---|---|
本地文件 |
PDF, Word, Markdown |
最常用的数据源 |
数据库 |
MySQL, PostgreSQL |
结构化数据 |
云存储 |
S3, Google Drive |
远程文件 |
Web |
网页, API |
在线数据 |
知识库 |
Notion, Confluence |
协作平台 |
SimpleDirectoryReader
最常用的文件加载器,支持多种文件格式。
基本用法
from llama_index.core import SimpleDirectoryReader
# 加载整个目录
reader = SimpleDirectoryReader(input_dir="./data")
documents = reader.load_data()
print(f"加载了 {len(documents)} 个文档")
for doc in documents:
print(f"- {doc.metadata.get('file_name', 'unknown')}")
指定文件类型
# 只加载特定类型的文件
reader = SimpleDirectoryReader(
input_dir="./data",
required_exts=[".pdf", ".md", ".txt"] # 只加载这些类型
)
documents = reader.load_data()
递归加载子目录
# 递归加载所有子目录
reader = SimpleDirectoryReader(
input_dir="./data",
recursive=True, # 包含子目录
exclude=["*.log", "temp/*"] # 排除模式
)
documents = reader.load_data()
自定义元数据
def custom_metadata_func(file_path: str) -> dict:
"""自定义元数据提取函数"""
import os
from datetime import datetime
stat = os.stat(file_path)
return {
"file_name": os.path.basename(file_path),
"file_path": file_path,
"file_size": stat.st_size,
"created_at": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
}
reader = SimpleDirectoryReader(
input_dir="./data",
file_metadata=custom_metadata_func
)
documents = reader.load_data()
PDF 文件加载
基础 PDF 加载
from llama_index.readers.file import PDFReader
# 使用专门的 PDF 读取器
pdf_reader = PDFReader()
documents = pdf_reader.load_data(file="./documents/report.pdf")
for doc in documents:
print(f"页码: {doc.metadata.get('page_label', 'N/A')}")
print(f"内容预览: {doc.text[:200]}...")
高级 PDF 处理
# 安装高级 PDF 处理器
# pip install llama-index-readers-llama-parse
from llama_parse import LlamaParse
# LlamaParse 提供更好的 PDF 解析
parser = LlamaParse(
api_key="your-llama-cloud-api-key",
result_type="markdown", # 输出 Markdown 格式
language="zh", # 支持中文
verbose=True
)
documents = parser.load_data("./documents/complex_report.pdf")
处理扫描 PDF
# 对于扫描的 PDF,需要 OCR 支持
# pip install pytesseract pdf2image
from llama_index.readers.file import ImageReader
# 先将 PDF 转换为图片,再进行 OCR
image_reader = ImageReader(
text_type="plain_text",
parser_config={"lang": "chi_sim+eng"} # 中英文混合
)
数据库加载
SQL 数据库
from llama_index.readers.database import DatabaseReader
# 连接数据库
db_reader = DatabaseReader(
uri="mysql://user:password@localhost:3306/mydb"
)
# 执行 SQL 查询并加载结果
documents = db_reader.load_data(
query="SELECT id, title, content FROM articles WHERE status = 'published'"
)
print(f"从数据库加载了 {len(documents)} 条记录")
自定义数据库加载
from sqlalchemy import create_engine, text
from llama_index.core import Document
def load_from_database(connection_string: str, query: str) -> list:
"""自定义数据库加载函数"""
engine = create_engine(connection_string)
documents = []
with engine.connect() as conn:
result = conn.execute(text(query))
for row in result:
doc = Document(
text=str(row.content),
metadata={
"id": row.id,
"title": row.title,
"source": "database",
"table": "articles"
}
)
documents.append(doc)
return documents
# 使用自定义加载器
docs = load_from_database(
"postgresql://user:pass@localhost/db",
"SELECT * FROM knowledge_base"
)
Web 数据加载
网页爬取
from llama_index.readers.web import SimpleWebPageReader
# 加载单个网页
web_reader = SimpleWebPageReader(html_to_text=True)
documents = web_reader.load_data(
urls=["https://docs.llamaindex.ai/en/stable/"]
)
print(f"网页内容长度: {len(documents[0].text)}")
递归网页爬取
from llama_index.readers.web import WholeSiteReader
# 爬取整个网站(谨慎使用)
site_reader = WholeSiteReader(
prefix="https://docs.llamaindex.ai",
max_depth=2 # 最大深度
)
documents = site_reader.load_data(
base_url="https://docs.llamaindex.ai/en/stable/"
)
API 数据加载
import requests
from llama_index.core import Document
def load_from_api(api_url: str, headers: dict = None) -> list:
"""从 API 加载数据"""
response = requests.get(api_url, headers=headers)
data = response.json()
documents = []
for item in data.get("items", []):
doc = Document(
text=item.get("content", ""),
metadata={
"id": item.get("id"),
"title": item.get("title"),
"source": "api",
"url": api_url
}
)
documents.append(doc)
return documents
# 示例:从 GitHub API 加载
docs = load_from_api(
"https://api.github.com/repos/run-llama/llama_index/readme",
headers={"Accept": "application/vnd.github.v3+json"}
)
云存储加载
AWS S3
# pip install llama-index-readers-s3
from llama_index.readers.s3 import S3Reader
s3_reader = S3Reader(
bucket="my-bucket",
prefix="documents/", # 可选前缀
aws_access_id="your-access-key",
aws_access_secret="your-secret-key"
)
documents = s3_reader.load_data()
Google Drive
# pip install llama-index-readers-google
from llama_index.readers.google import GoogleDriveReader
drive_reader = GoogleDriveReader(
credentials_path="./credentials.json"
)
# 加载特定文件夹
documents = drive_reader.load_data(folder_id="your-folder-id")
知识库平台
Notion
# pip install llama-index-readers-notion
from llama_index.readers.notion import NotionPageReader
notion_reader = NotionPageReader(
integration_token="your-notion-token"
)
# 加载特定页面
documents = notion_reader.load_data(
page_ids=["page-id-1", "page-id-2"]
)
# 或加载整个数据库
documents = notion_reader.load_data(
database_id="your-database-id"
)
Confluence
# pip install llama-index-readers-confluence
from llama_index.readers.confluence import ConfluenceReader
confluence_reader = ConfluenceReader(
base_url="https://your-domain.atlassian.net/wiki",
user="your-email@example.com",
api_token="your-api-token"
)
# 加载特定空间
documents = confluence_reader.load_data(
space_key="MYSPACE",
include_attachments=True
)
自定义 Reader
创建自定义数据加载器
from llama_index.core.readers.base import BaseReader
from llama_index.core import Document
from typing import List
import json
class CustomJSONReader(BaseReader):
"""自定义 JSON 文件读取器"""
def __init__(self, text_field: str = "content"):
self.text_field = text_field
def load_data(self, file_path: str) -> List[Document]:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
documents = []
items = data if isinstance(data, list) else [data]
for item in items:
text = item.get(self.text_field, "")
metadata = {k: v for k, v in item.items() if k != self.text_field}
doc = Document(text=text, metadata=metadata)
documents.append(doc)
return documents
# 使用自定义读取器
reader = CustomJSONReader(text_field="body")
documents = reader.load_data("./data/articles.json")
批量加载多种格式
from pathlib import Path
from llama_index.core import Document
class MultiFormatLoader:
"""支持多种格式的批量加载器"""
def __init__(self):
self.readers = {}
def register_reader(self, extension: str, reader):
"""注册文件格式对应的读取器"""
self.readers[extension] = reader
def load_directory(self, directory: str) -> List[Document]:
"""加载目录下所有支持的文件"""
documents = []
dir_path = Path(directory)
for file_path in dir_path.rglob("*"):
if file_path.is_file():
ext = file_path.suffix.lower()
if ext in self.readers:
docs = self.readers[ext].load_data(str(file_path))
documents.extend(docs)
return documents
# 使用示例
loader = MultiFormatLoader()
loader.register_reader(".pdf", PDFReader())
loader.register_reader(".json", CustomJSONReader())
all_docs = loader.load_directory("./data")
数据预处理
文档清洗
import re
from llama_index.core import Document
def clean_document(doc: Document) -> Document:
"""清洗文档内容"""
text = doc.text
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除特殊字符
text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:\'\"()-]', '', text)
# 规范化标点
text = text.strip()
return Document(text=text, metadata=doc.metadata)
# 批量清洗
cleaned_docs = [clean_document(doc) for doc in documents]
添加全局元数据
def add_global_metadata(documents: List[Document], metadata: dict) -> List[Document]:
"""为所有文档添加全局元数据"""
for doc in documents:
doc.metadata.update(metadata)
return documents
# 添加项目信息
documents = add_global_metadata(documents, {
"project": "knowledge-base",
"version": "1.0",
"indexed_at": "2024-01-01"
})
实战示例:多源数据整合
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings
from llama_index.readers.database import DatabaseReader
from llama_index.readers.web import SimpleWebPageReader
def build_unified_knowledge_base():
"""构建统一的知识库"""
all_documents = []
# 1. 加载本地文档
print("加载本地文档...")
local_reader = SimpleDirectoryReader(
input_dir="./documents",
recursive=True,
required_exts=[".pdf", ".md", ".txt"]
)
local_docs = local_reader.load_data()
for doc in local_docs:
doc.metadata["source_type"] = "local_file"
all_documents.extend(local_docs)
print(f" - 加载了 {len(local_docs)} 个本地文档")
# 2. 加载数据库记录
print("加载数据库记录...")
db_reader = DatabaseReader(uri="sqlite:///knowledge.db")
db_docs = db_reader.load_data(
query="SELECT title, content FROM knowledge WHERE active = 1"
)
for doc in db_docs:
doc.metadata["source_type"] = "database"
all_documents.extend(db_docs)
print(f" - 加载了 {len(db_docs)} 条数据库记录")
# 3. 加载网页内容
print("加载网页内容...")
web_reader = SimpleWebPageReader(html_to_text=True)
web_docs = web_reader.load_data(urls=[
"https://example.com/doc1",
"https://example.com/doc2"
])
for doc in web_docs:
doc.metadata["source_type"] = "web"
all_documents.extend(web_docs)
print(f" - 加载了 {len(web_docs)} 个网页")
# 4. 构建统一索引
print(f"\n总共加载 {len(all_documents)} 个文档")
print("构建向量索引...")
index = VectorStoreIndex.from_documents(all_documents)
return index
# 构建知识库
knowledge_index = build_unified_knowledge_base()
# 查询
query_engine = knowledge_index.as_query_engine()
response = query_engine.query("总结我们的产品特点")
print(response)
小结
本教程介绍了:
SimpleDirectoryReader 的使用方法
各种文件格式的加载:PDF、数据库、Web 等
云存储和知识库平台的数据加载
自定义 Reader 的创建
数据预处理和清洗
多源数据整合的最佳实践
下一步
在下一个教程中,我们将学习节点解析(Node Parsing), 了解如何将文档智能地分割为更小的、可索引的单元。
练习
使用 SimpleDirectoryReader 加载你的本地文档
创建一个自定义 Reader 加载特定格式的数据
尝试从数据库或 API 加载数据
实现一个多源数据整合的知识库