# Building a Basic RAG System

## 6.1 Building a Basic RAG Pipeline with LangChain

### 6.1.1 Introduction to LangChain

LangChain is a framework for building applications powered by language models. It provides a set of tools and components that make assembling a RAG system considerably simpler.
### 6.1.2 Core Components

- Document Loaders: load documents in various formats
- Text Splitters: split documents into smaller chunks
- Embeddings: convert text into vectors
- Vector Stores: store and retrieve vectors
- LLMs: generate answers
- Chains: wire the individual components together
- Agents: handle complex, multi-step tasks
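
Each item in the list above maps onto a concrete LangChain class. The sketch below shows one plausible mapping (import paths follow the pre-0.1 `langchain` layout used throughout this chapter; newer releases move most of these classes into `langchain_community`/`langchain_openai`, and the chapter additionally assumes packages such as `chromadb`, `sentence-transformers`, and `pypdf` are installed):

```python
# One possible class per component (many alternatives exist for each)
from langchain.document_loaders import PyPDFLoader                   # Document Loader
from langchain.text_splitter import RecursiveCharacterTextSplitter   # Text Splitter
from langchain.embeddings import HuggingFaceEmbeddings               # Embeddings
from langchain.vectorstores import Chroma                            # Vector Store
from langchain.chat_models import ChatOpenAI                         # LLM
from langchain.chains import RetrievalQA                             # Chain
from langchain.agents import initialize_agent                        # Agent
```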
### 6.1.3 Basic RAG Pipeline Architecture

```mermaid
flowchart TD
    A[User Query] --> B[RAG Chain]
    B --> C[Retriever]
    C --> D[Vector Store]
    D --> E[Embeddings]
    E --> F[Documents]
    C --> G[Retrieved Documents]
    G --> H[Prompt Template]
    H --> I[LLM]
    I --> J[Generated Answer]
    J --> K[Return to User]
```

### 6.1.4 Building the RAG Pipeline
```python
import os

from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# 1. Load documents
def load_documents(file_paths):
    documents = []
    for file_path in file_paths:
        if file_path.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
            docs = loader.load()
            documents.extend(docs)
        # Loaders for other formats can be added here
    return documents

# 2. Split documents
def split_documents(documents, chunk_size=1000, chunk_overlap=100):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunks = text_splitter.split_documents(documents)
    return chunks

# 3. Embed and store
def create_vector_store(chunks, persist_directory="./vector_db"):
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5"
    )
    vector_store = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_directory
    )
    vector_store.persist()
    return vector_store

# 4. Create the RAG chain
def create_rag_chain(vector_store, llm_model="gpt-3.5-turbo"):
    # Either an open-source model or an API model can be used;
    # the OpenAI API is used here as an example.
    llm = ChatOpenAI(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        model=llm_model,
        temperature=0.3
    )
    # Custom prompt
    template = """Use the following context to answer the user's question. If you do not know the answer, say that you do not know; do not make up an answer.
Context:
{context}
Question:
{question}
Answer:
"""
    prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"]
    )
    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=True
    )
    return chain

# 5. Run the RAG system
def run_rag_system(file_paths, query):
    # Load documents
    documents = load_documents(file_paths)
    print(f"Loaded {len(documents)} documents")
    # Split documents
    chunks = split_documents(documents)
    print(f"Split into {len(chunks)} chunks")
    # Create the vector store
    vector_store = create_vector_store(chunks)
    print("Vector store created")
    # Create the RAG chain
    rag_chain = create_rag_chain(vector_store)
    print("RAG chain created")
    # Run the query
    result = rag_chain({"query": query})
    print("\nAnswer:")
    print(result["result"])
    print("\nSource documents:")
    for i, doc in enumerate(result["source_documents"]):
        print(f"\nDocument {i+1}:")
        print(f"Content: {doc.page_content[:100]}...")
        print(f"Source: {doc.metadata}")

# Example
if __name__ == "__main__":
    file_paths = ["document1.pdf", "document2.pdf"]
    query = "What is RAG?"
    run_rag_system(file_paths, query)
```
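
`as_retriever()` without arguments returns a retriever that fetches the top-k most similar chunks (typically four). A minimal sketch of how the retriever built from `vector_store` above can be tuned before being passed to `RetrievalQA.from_chain_type`:

```python
# Retrieve more (or fewer) chunks per query
retriever = vector_store.as_retriever(search_kwargs={"k": 6})

# Or trade relevance against diversity with Maximal Marginal Relevance (MMR)
mmr_retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 6, "fetch_k": 20}  # keep 6 out of 20 candidate chunks
)
```

Either retriever can replace the plain `vector_store.as_retriever()` call in `create_rag_chain`.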
## 6.2 Reading and Parsing Local Documents

### 6.2.1 Parsing PDF Documents
```python
from langchain.document_loaders import PyPDFLoader, PyPDFDirectoryLoader

# Load a single PDF file
def load_single_pdf(file_path):
    loader = PyPDFLoader(file_path)
    documents = loader.load()
    return documents

# Load all PDF files in a directory
def load_pdf_directory(directory_path):
    loader = PyPDFDirectoryLoader(directory_path)
    documents = loader.load()
    return documents

# Example
pdf_docs = load_single_pdf("example.pdf")
print(f"The PDF file contains {len(pdf_docs)} pages")
print(f"First page content: {pdf_docs[0].page_content[:200]}...")
```

### 6.2.2 Parsing Word Documents
```python
from langchain.document_loaders import Docx2txtLoader, UnstructuredWordDocumentLoader

# Load a Word document
def load_word_document(file_path):
    # Method 1: use Docx2txtLoader
    loader = Docx2txtLoader(file_path)
    documents = loader.load()
    return documents

# Example
word_docs = load_word_document("example.docx")
print(f"Word document content: {word_docs[0].page_content[:200]}...")
```
### 6.2.3 Parsing Excel Documents

```python
import pandas as pd
from langchain.document_loaders import DataFrameLoader

# Load an Excel file
def load_excel_document(file_path, sheet_name=None):
    # Read the Excel file
    if sheet_name:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
    else:
        df = pd.read_excel(file_path)
    # Convert rows to documents (the first column becomes the page content,
    # the remaining columns are kept as metadata)
    loader = DataFrameLoader(df, page_content_column=df.columns[0])
    documents = loader.load()
    return documents

# Example
excel_docs = load_excel_document("example.xlsx")
print(f"The Excel file yielded {len(excel_docs)} records")
print(f"First record: {excel_docs[0].page_content[:200]}...")
```
### 6.2.4 Parsing Markdown Documents

```python
from langchain.document_loaders import UnstructuredMarkdownLoader, TextLoader

# Load a Markdown file
def load_markdown_document(file_path):
    # Method 1: use TextLoader
    loader = TextLoader(file_path)
    documents = loader.load()
    return documents

# Example
md_docs = load_markdown_document("example.md")
print(f"Markdown document content: {md_docs[0].page_content[:200]}...")
```
### 6.2.5 Batch Parsing Documents in Multiple Formats

```python
import os
from langchain.document_loaders import (
    PyPDFLoader, Docx2txtLoader, TextLoader, UnstructuredFileLoader
)

# Batch-load documents of different formats
def batch_load_documents(directory_path):
    documents = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                if file.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file.endswith('.docx'):
                    loader = Docx2txtLoader(file_path)
                elif file.endswith('.txt') or file.endswith('.md'):
                    loader = TextLoader(file_path)
                else:
                    # Fall back to the generic loader
                    loader = UnstructuredFileLoader(file_path)
                docs = loader.load()
                documents.extend(docs)
                print(f"Loaded: {file_path}")
            except Exception as e:
                print(f"Failed to load {file_path}: {str(e)}")
    return documents

# Example
docs = batch_load_documents("./documents")
print(f"Loaded {len(docs)} documents in total")
```

## 6.3 One-Click Vectorization and Ingestion
### 6.3.1 An Automated Vectorization Workflow

```python
import os
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# One-click vectorization and ingestion
def vectorize_and_store(directory_path, persist_directory="./vector_db"):
    # 1. Load all documents
    documents = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                if file.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file.endswith('.docx'):
                    loader = Docx2txtLoader(file_path)
                elif file.endswith('.txt') or file.endswith('.md'):
                    loader = TextLoader(file_path)
                else:
                    continue
                docs = loader.load()
                # Record the file path in the metadata
                for doc in docs:
                    doc.metadata["source"] = file_path
                documents.extend(docs)
                print(f"Loaded: {file_path}")
            except Exception as e:
                print(f"Failed to load {file_path}: {str(e)}")
    if not documents:
        print("No documents were loaded")
        return None
    print(f"Loaded {len(documents)} documents in total")
    # 2. Split the documents
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100
    )
    chunks = text_splitter.split_documents(documents)
    print(f"Split into {len(chunks)} chunks")
    # 3. Embed and store
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5"
    )
    # Check whether a vector store already exists
    if os.path.exists(persist_directory):
        # Load the existing vector store
        vector_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=embeddings
        )
        # Add the new documents
        vector_store.add_documents(chunks)
        print("Added the new documents to the existing vector store")
    else:
        # Create a new vector store
        vector_store = Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            persist_directory=persist_directory
        )
        print("Created a new vector store")
    vector_store.persist()
    print(f"Vector store persisted to: {persist_directory}")
    return vector_store

# Example
if __name__ == "__main__":
    vector_store = vectorize_and_store("./documents")
    if vector_store:
        print(f"The vector store contains {vector_store._collection.count()} vectors")
```
### 6.3.2 Incremental Updates

```python
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# Incrementally update the vector store
def update_vector_store(new_file_paths, vector_store, persist_directory="./vector_db"):
    # Load the new documents
    documents = []
    for file_path in new_file_paths:
        try:
            if file_path.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
            elif file_path.endswith('.docx'):
                loader = Docx2txtLoader(file_path)
            elif file_path.endswith('.txt') or file_path.endswith('.md'):
                loader = TextLoader(file_path)
            else:
                continue
            docs = loader.load()
            for doc in docs:
                doc.metadata["source"] = file_path
            documents.extend(docs)
            print(f"Loaded: {file_path}")
        except Exception as e:
            print(f"Failed to load {file_path}: {str(e)}")
    if not documents:
        print("No new documents were loaded")
        return vector_store
    # Split the documents
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100
    )
    chunks = text_splitter.split_documents(documents)
    print(f"Split into {len(chunks)} chunks")
    # Add the chunks to the vector store
    vector_store.add_documents(chunks)
    vector_store.persist()
    print("Vector store updated")
    print(f"The vector store contains {vector_store._collection.count()} vectors")
    return vector_store

# Example
if __name__ == "__main__":
    # Load the existing vector store
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5"
    )
    vector_store = Chroma(
        persist_directory="./vector_db",
        embedding_function=embeddings
    )
    # Add the new documents
    new_files = ["new_document.pdf", "new_document.docx"]
    vector_store = update_vector_store(new_files, vector_store)
```
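
Note that `update_vector_store` appends unconditionally: running it twice on the same file stores the same chunks twice. A minimal guard, sketched against the Chroma collection the chapter already reaches through `vector_store._collection` (the key names in the returned dict are Chroma's; verify them against your installed version):

```python
# Skip files whose source path already appears in the collection's metadata
def filter_new_files(file_paths, vector_store):
    existing = vector_store._collection.get(include=["metadatas"])
    known_sources = {m.get("source") for m in existing["metadatas"] if m}
    return [p for p in file_paths if p not in known_sources]
```

Passing `filter_new_files(new_files, vector_store)` into `update_vector_store` then avoids ingesting duplicates.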
## 6.4 Implementing a Simple Question-Answering Interface

### 6.4.1 Command-Line Interface

```python
import os
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Load the vector store
def load_vector_store(persist_directory="./vector_db"):
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5"
    )
    vector_store = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings
    )
    return vector_store

# Create the question-answering chain
def create_qa_chain(vector_store):
    llm = ChatOpenAI(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
        temperature=0.3
    )
    template = """Use the following context to answer the user's question. If you do not know the answer, say that you do not know; do not make up an answer.
Context:
{context}
Question:
{question}
Answer:
"""
    prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"]
    )
    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=True
    )
    return chain

# Command-line interaction loop
def cli_interface():
    print("Loading vector store...")
    vector_store = load_vector_store()
    print(f"Vector store loaded; it contains {vector_store._collection.count()} vectors")
    print("Creating QA chain...")
    qa_chain = create_qa_chain(vector_store)
    print("QA chain created")
    print("\nWelcome to the knowledge-base QA system! Type 'exit' to quit.")
    while True:
        query = input("\nEnter your question: ")
        if query.lower() == 'exit':
            print("Goodbye!")
            break
        try:
            print("\nThinking...")
            result = qa_chain({"query": query})
            print("\nAnswer:")
            print(result["result"])
            print("\nSource documents:")
            for i, doc in enumerate(result["source_documents"]):
                print(f"\nDocument {i+1}:")
                print(f"Content: {doc.page_content[:100]}...")
                print(f"Source: {doc.metadata.get('source', 'unknown')}")
        except Exception as e:
            print(f"An error occurred: {str(e)}")

# Example
if __name__ == "__main__":
    cli_interface()
```

### 6.4.2 Flask API Interface
```python
from flask import Flask, request, jsonify
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

app = Flask(__name__)

# Load the vector store
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-base-zh-v1.5"
)
vector_store = Chroma(
    persist_directory="./vector_db",
    embedding_function=embeddings
)

# Create the question-answering chain
llm = ChatOpenAI(
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
    temperature=0.3
)
template = """Use the following context to answer the user's question. If you do not know the answer, say that you do not know; do not make up an answer.
Context:
{context}
Question:
{question}
Answer:
"""
prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)

@app.route('/api/qa', methods=['POST'])
def qa():
    try:
        data = request.json
        query = data.get('query')
        if not query:
            return jsonify({"error": "Missing 'query' parameter"}), 400
        result = qa_chain({"query": query})
        # Build the response
        response = {
            "answer": result["result"],
            "sources": []
        }
        for doc in result["source_documents"]:
            source = {
                "content": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content,
                "metadata": doc.metadata
            }
            response["sources"].append(source)
        return jsonify(response), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/status', methods=['GET'])
def status():
    try:
        count = vector_store._collection.count()
        return jsonify({"status": "ok", "vector_count": count}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
```
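
With the server running, the endpoint can be exercised from any HTTP client. A small sketch using the `requests` library (host and port match the `app.run` call above):

```python
import requests

# Ask a question against the running Flask service
resp = requests.post(
    "http://localhost:5000/api/qa",
    json={"query": "What is RAG?"},
    timeout=120,
)
data = resp.json()
print(data["answer"])
for src in data["sources"]:
    print(src["metadata"])
```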
### 6.4.3 FastAPI Interface

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

app = FastAPI(title="Knowledge Base QA API")

# Load the vector store
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-base-zh-v1.5"
)
vector_store = Chroma(
    persist_directory="./vector_db",
    embedding_function=embeddings
)

# Create the question-answering chain
llm = ChatOpenAI(
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
    temperature=0.3
)
template = """Use the following context to answer the user's question. If you do not know the answer, say that you do not know; do not make up an answer.
Context:
{context}
Question:
{question}
Answer:
"""
prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)

# Request model
class QueryRequest(BaseModel):
    query: str

# Response models
class Source(BaseModel):
    content: str
    metadata: dict

class QAResponse(BaseModel):
    answer: str
    sources: list[Source]

class StatusResponse(BaseModel):
    status: str
    vector_count: int

@app.post("/api/qa", response_model=QAResponse)
def qa(request: QueryRequest):
    try:
        result = qa_chain({"query": request.query})
        # Build the response
        sources = []
        for doc in result["source_documents"]:
            source = Source(
                content=doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content,
                metadata=doc.metadata
            )
            sources.append(source)
        return QAResponse(
            answer=result["result"],
            sources=sources
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/status", response_model=StatusResponse)
def status():
    try:
        count = vector_store._collection.count()
        return StatusResponse(status="ok", vector_count=count)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
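
Once running, FastAPI serves interactive API documentation at `/docs`. The endpoint can also be exercised in-process for testing; a sketch assuming the code above is saved as `main.py`:

```python
from fastapi.testclient import TestClient

from main import app  # hypothetical module name for the FastAPI app above

client = TestClient(app)

def test_qa_endpoint():
    resp = client.post("/api/qa", json={"query": "What is RAG?"})
    assert resp.status_code == 200
    body = resp.json()
    assert "answer" in body and "sources" in body
```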
## 6.5 Chapter Summary

- Built a basic RAG pipeline with LangChain
- Learned how to read and parse local documents (PDF/Word/Excel/Markdown)
- Implemented one-click vectorization and ingestion
- Developed simple question-answering interfaces: a command-line tool, a Flask API, and a FastAPI service

This groundwork prepares us for building more sophisticated RAG systems. In the next chapter we will use Streamlit/Gradio to quickly build a visual knowledge-base application with upload, question answering, and conversation-history features.
