Skip to content

基础 RAG 系统搭建

6.1 基于 LangChain 构建基础 RAG pipeline

6.1.1 LangChain 简介

LangChain 是一个用于构建基于语言模型的应用程序的框架,它提供了一套工具和组件,使得构建 RAG 系统变得更加简单。

6.1.2 核心组件

  • Document Loaders:用于加载不同格式的文档
  • Text Splitters:用于将文档分割成小块
  • Embeddings:用于将文本转换为向量
  • Vector Stores:用于存储和检索向量
  • LLMs:用于生成回答
  • Chains:用于将各个组件连接起来
  • Agents:用于处理复杂任务

6.1.3 基础 RAG pipeline 架构

mermaid
flowchart TD
    A[用户查询] --> B[RAG Chain]
    B --> C[Retriever]
    C --> D[Vector Store]
    D --> E[Embeddings]
    E --> F[Documents]
    C --> G[Retrieved Documents]
    G --> H[Prompt Template]
    H --> I[LLM]
    I --> J[Generated Answer]
    J --> K[Return to User]

6.1.4 构建 RAG pipeline

python
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

# 1. 加载文档
def load_documents(file_paths):
    """Load documents from the given file paths.

    Only PDF files are handled at the moment; paths with other extensions
    are skipped silently (additional loaders can be added below).

    Args:
        file_paths: iterable of file-path strings.

    Returns:
        A flat list of LangChain Document objects (one per PDF page).
    """
    documents = []
    for file_path in file_paths:
        # Compare case-insensitively so files named '.PDF' are not skipped.
        if file_path.lower().endswith('.pdf'):
            loader = PyPDFLoader(file_path)
            documents.extend(loader.load())
        # Other formats: add more loaders here.
    return documents

# 2. 分割文档
def split_documents(documents, chunk_size=1000, chunk_overlap=100):
    """Split loaded documents into overlapping chunks for embedding.

    Args:
        documents: list of LangChain Document objects.
        chunk_size: target chunk length in characters.
        chunk_overlap: number of characters shared by adjacent chunks.

    Returns:
        List of chunked Document objects.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return splitter.split_documents(documents)

# 3. 向量化和存储
def create_vector_store(chunks, persist_directory="./vector_db"):
    """Embed chunks with a Chinese BGE model and persist them in Chroma.

    Args:
        chunks: chunked Document objects to embed.
        persist_directory: on-disk location for the Chroma store.

    Returns:
        The persisted Chroma vector store.
    """
    embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh-v1.5")
    store = Chroma.from_documents(
        documents=chunks,
        embedding=embedding_model,
        persist_directory=persist_directory,
    )
    store.persist()
    return store

# 4. 创建 RAG 链
def create_rag_chain(vector_store, llm_model="gpt-3.5-turbo"):
    """Wire a retriever and an LLM into a RetrievalQA chain.

    An OpenAI API model is used here as an example; an open-source model
    could be substituted.

    Args:
        vector_store: Chroma store whose retriever supplies the context.
        llm_model: OpenAI chat model name.

    Returns:
        A RetrievalQA chain that also returns its source documents.
    """
    # Custom prompt (Chinese): answer from the given context only and
    # admit ignorance instead of fabricating an answer.
    template = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

    上下文:
    {context}

    问题:
    {question}

    回答:
    """
    qa_prompt = PromptTemplate(
        template=template, input_variables=["context", "question"]
    )

    chat_model = ChatOpenAI(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        model=llm_model,
        temperature=0.3,
    )

    return RetrievalQA.from_chain_type(
        llm=chat_model,
        chain_type="stuff",
        retriever=vector_store.as_retriever(),
        chain_type_kwargs={"prompt": qa_prompt},
        return_source_documents=True,
    )

# 5. 运行 RAG 系统
def run_rag_system(file_paths, query):
    """End-to-end demo: ingest files, build the chain, answer one query."""
    # Ingest: load, chunk, embed.
    docs = load_documents(file_paths)
    print(f"加载了 {len(docs)} 个文档")

    doc_chunks = split_documents(docs)
    print(f"分割成 {len(doc_chunks)} 个分块")

    store = create_vector_store(doc_chunks)
    print("创建了向量存储")

    chain = create_rag_chain(store)
    print("创建了 RAG 链")

    # Answer the query and show the supporting sources.
    answer = chain({"query": query})
    print("\n回答:")
    print(answer["result"])

    print("\n来源文档:")
    for idx, source_doc in enumerate(answer["source_documents"], start=1):
        print(f"\n文档 {idx}:")
        print(f"内容: {source_doc.page_content[:100]}...")
        print(f"来源: {source_doc.metadata}")

# Example: build the full pipeline over two PDFs and ask one question.
if __name__ == "__main__":
    file_paths = ["document1.pdf", "document2.pdf"]
    query = "什么是 RAG 技术?"
    run_rag_system(file_paths, query)

6.2 本地文档读取解析

6.2.1 PDF 文档解析

python
from langchain.document_loaders import PyPDFLoader, PyPDFDirectoryLoader

# 加载单个PDF文件
def load_single_pdf(file_path):
    """Load one PDF file; each page becomes a separate Document."""
    return PyPDFLoader(file_path).load()

# 加载目录中的所有PDF文件
def load_pdf_directory(directory_path):
    """Load every PDF file found in the given directory."""
    return PyPDFDirectoryLoader(directory_path).load()

# Example: load a single PDF and preview the first page.
pdf_docs = load_single_pdf("example.pdf")
print(f"PDF文件包含 {len(pdf_docs)} 页")
print(f"第一页内容: {pdf_docs[0].page_content[:200]}...")

6.2.2 Word 文档解析

python
from langchain.document_loaders import Docx2txtLoader, UnstructuredWordDocumentLoader

# 加载Word文件
def load_word_document(file_path):
    """Load a .docx file as plain text via Docx2txtLoader."""
    # Alternative: UnstructuredWordDocumentLoader (imported above) also
    # handles Word documents, at the cost of a heavier dependency.
    return Docx2txtLoader(file_path).load()

# Example: load a Word document and preview its text.
word_docs = load_word_document("example.docx")
print(f"Word文档内容: {word_docs[0].page_content[:200]}...")

6.2.3 Excel 文档解析

python
import pandas as pd
from langchain.document_loaders import DataFrameLoader

# 加载Excel文件
def load_excel_document(file_path, sheet_name=None):
    """Load one Excel sheet and turn each row into a Document.

    Args:
        file_path: path to the .xlsx/.xls file.
        sheet_name: pandas sheet selector (name or integer index). When
            None, the first sheet is read. Note: passing None straight to
            pandas would load *all* sheets as a dict, hence the guard.

    Returns:
        List of Documents; the first column becomes the page content and
        the remaining columns become metadata.
    """
    # Use `is not None` (not truthiness) so valid falsy selectors such as
    # sheet index 0 are not silently replaced by the default sheet.
    if sheet_name is not None:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
    else:
        df = pd.read_excel(file_path)

    # DataFrameLoader puts the chosen column into page_content and the
    # rest of each row into metadata.
    loader = DataFrameLoader(df, page_content_column=df.columns[0])
    return loader.load()

# Example: turn Excel rows into documents and preview the first record.
excel_docs = load_excel_document("example.xlsx")
print(f"Excel文档包含 {len(excel_docs)} 条记录")
print(f"第一条记录: {excel_docs[0].page_content[:200]}...")

6.2.4 Markdown 文档解析

python
from langchain.document_loaders import UnstructuredMarkdownLoader, TextLoader

# 加载Markdown文件
def load_markdown_document(file_path):
    """Load a Markdown file as raw text (TextLoader keeps the markup)."""
    # Alternative: UnstructuredMarkdownLoader (imported above) parses the
    # Markdown structure instead of returning the raw source.
    return TextLoader(file_path).load()

# Example: load a Markdown file and preview its content.
md_docs = load_markdown_document("example.md")
print(f"Markdown文档内容: {md_docs[0].page_content[:200]}...")

6.2.5 多格式文档批量解析

python
import os
from langchain.document_loaders import (
    PyPDFLoader, Docx2txtLoader, TextLoader, UnstructuredFileLoader
)

# 批量加载不同格式的文档
def batch_load_documents(directory_path):
    documents = []
    
    for root, _, files in os.walk(directory_path):
        for file in files:
            file_path = os.path.join(root, file)
            
            try:
                if file.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file.endswith('.docx'):
                    loader = Docx2txtLoader(file_path)
                elif file.endswith('.txt') or file.endswith('.md'):
                    loader = TextLoader(file_path)
                else:
                    # 尝试使用通用加载器
                    loader = UnstructuredFileLoader(file_path)
                
                docs = loader.load()
                documents.extend(docs)
                print(f"成功加载: {file_path}")
            except Exception as e:
                print(f"加载失败: {file_path}, 错误: {str(e)}")
    
    return documents

# Example: batch-load every supported file under ./documents.
docs = batch_load_documents("./documents")
print(f"共加载了 {len(docs)} 个文档")

6.3 一键向量化与入库

6.3.1 自动化向量化流程

python
import os
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

# 一键向量化与入库函数
def vectorize_and_store(directory_path, persist_directory="./vector_db"):
    """Walk a directory, load supported files, chunk them, store vectors.

    Supported formats are PDF (.pdf), Word (.docx) and plain text
    (.txt / .md); anything else is skipped. Chunks are embedded with
    BAAI/bge-base-zh-v1.5 and written to a persistent Chroma store. If a
    store already exists at *persist_directory*, the new chunks are
    appended; otherwise a fresh store is created.

    Args:
        directory_path: root directory to scan for documents.
        persist_directory: on-disk location of the Chroma store.

    Returns:
        The Chroma vector store, or None when no documents were loaded.
    """
    # 1. Load every supported file under the directory tree.
    documents = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            file_path = os.path.join(root, file)

            try:
                if file.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file.endswith('.docx'):
                    loader = Docx2txtLoader(file_path)
                elif file.endswith(('.txt', '.md')):
                    loader = TextLoader(file_path)
                else:
                    continue  # unsupported extension

                loaded = loader.load()
                # Record the originating file in each document's metadata.
                for doc in loaded:
                    doc.metadata["source"] = file_path
                documents.extend(loaded)
                print(f"成功加载: {file_path}")
            except Exception as e:
                print(f"加载失败: {file_path}, 错误: {str(e)}")

    if not documents:
        print("没有加载到文档")
        return None

    print(f"共加载了 {len(documents)} 个文档")

    # 2. Chunk the documents.
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = splitter.split_documents(documents)
    print(f"分割成 {len(chunks)} 个分块")

    # 3. Embed and store.
    embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh-v1.5")

    if os.path.exists(persist_directory):
        # A store already exists on disk: open it and append.
        vector_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=embedding_model,
        )
        vector_store.add_documents(chunks)
        print("向现有向量存储添加了新文档")
    else:
        # First run: build a brand-new store from the chunks.
        vector_store = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_model,
            persist_directory=persist_directory,
        )
        print("创建了新的向量存储")

    vector_store.persist()
    print(f"向量存储已保存到: {persist_directory}")

    return vector_store

# Example: one-shot ingestion of ./documents into the vector store.
if __name__ == "__main__":
    vector_store = vectorize_and_store("./documents")
    if vector_store:
        # _collection is Chroma-private API; used here only for a quick count.
        print(f"向量存储中共有 {vector_store._collection.count()} 个向量")

6.3.2 增量更新

python
# 增量更新向量存储
def update_vector_store(new_file_paths, vector_store, persist_directory="./vector_db"):
    # 加载新文档
    documents = []
    for file_path in new_file_paths:
        try:
            if file_path.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
            elif file_path.endswith('.docx'):
                loader = Docx2txtLoader(file_path)
            elif file_path.endswith('.txt') or file_path.endswith('.md'):
                loader = TextLoader(file_path)
            else:
                continue
            
            docs = loader.load()
            for doc in docs:
                doc.metadata["source"] = file_path
            documents.extend(docs)
            print(f"成功加载: {file_path}")
        except Exception as e:
            print(f"加载失败: {file_path}, 错误: {str(e)}")
    
    if not documents:
        print("没有加载到新文档")
        return vector_store
    
    # 分割文档
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100
    )
    chunks = text_splitter.split_documents(documents)
    print(f"分割成 {len(chunks)} 个分块")
    
    # 添加到向量存储
    vector_store.add_documents(chunks)
    vector_store.persist()
    print("向量存储已更新")
    print(f"向量存储中共有 {vector_store._collection.count()} 个向量")
    
    return vector_store

# Example: append two new files to an existing store.
if __name__ == "__main__":
    # Reopen the persisted store with the same embedding model it was
    # built with (mixing embedding models would break similarity search).
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5"
    )
    vector_store = Chroma(
        persist_directory="./vector_db",
        embedding_function=embeddings
    )
    
    # Ingest the new documents.
    new_files = ["new_document.pdf", "new_document.docx"]
    vector_store = update_vector_store(new_files, vector_store)

6.4 简单问答接口实现

6.4.1 命令行接口

python
import os
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# 加载向量存储
def load_vector_store(persist_directory="./vector_db"):
    """Open the persisted Chroma store with the matching embedding model."""
    embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh-v1.5")
    return Chroma(
        persist_directory=persist_directory,
        embedding_function=embedding_model,
    )

# 创建问答链
def create_qa_chain(vector_store):
    llm = ChatOpenAI(
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
        temperature=0.3
    )
    
    template = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

    上下文:
    {context}

    问题:
    {question}

    回答:
    """
    
    prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"]
    )
    
    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vector_store.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=True
    )
    
    return chain

# 命令行交互
def cli_interface():
    print("加载向量存储...")
    vector_store = load_vector_store()
    print(f"向量存储加载完成,共有 {vector_store._collection.count()} 个向量")
    
    print("创建问答链...")
    qa_chain = create_qa_chain(vector_store)
    print("问答链创建完成")
    
    print("\n欢迎使用知识库问答系统!输入 'exit' 退出。")
    
    while True:
        query = input("\n请输入你的问题: ")
        
        if query.lower() == 'exit':
            print("再见!")
            break
        
        try:
            print("\n正在思考...")
            result = qa_chain({"query": query})
            
            print("\n回答:")
            print(result["result"])
            
            print("\n来源文档:")
            for i, doc in enumerate(result["source_documents"]):
                print(f"\n文档 {i+1}:")
                print(f"内容: {doc.page_content[:100]}...")
                print(f"来源: {doc.metadata.get('source', '未知')}")
        except Exception as e:
            print(f"发生错误: {str(e)}")

# Example: start the interactive command-line session.
if __name__ == "__main__":
    cli_interface()

6.4.2 Flask API 接口

python
from flask import Flask, request, jsonify
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

app = Flask(__name__)

# Load the persisted vector store once at startup; it is shared by all
# request handlers below (read-only usage from the handlers' side).
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-base-zh-v1.5"
)
vector_store = Chroma(
    persist_directory="./vector_db",
    embedding_function=embeddings
)

# Build the LLM client at import time.
# NOTE(review): this reads OPENAI_API_KEY from the environment; behavior
# with the variable unset is not shown here — confirm before deploying.
llm = ChatOpenAI(
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
    temperature=0.3
)

# Prompt template (Chinese): answer strictly from the supplied context and
# admit ignorance rather than fabricate. Runtime string — left untouched.
template = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

上下文:
{context}

问题:
{question}

回答:
"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# "stuff" chain type: all retrieved chunks are concatenated into one prompt.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)

@app.route('/api/qa', methods=['POST'])
def qa():
    """POST /api/qa — answer a question. Expects JSON body {"query": "..."}.

    Returns 200 with the answer and source previews, 400 when the query is
    missing, and 500 on any internal failure.
    """
    try:
        payload = request.json
        query = payload.get('query')

        if not query:
            return jsonify({"error": "缺少查询参数"}), 400

        result = qa_chain({"query": query})

        # Each source is previewed to at most 200 characters.
        sources = [
            {
                "content": (
                    doc.page_content[:200] + "..."
                    if len(doc.page_content) > 200
                    else doc.page_content
                ),
                "metadata": doc.metadata,
            }
            for doc in result["source_documents"]
        ]

        return jsonify({"answer": result["result"], "sources": sources}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/status', methods=['GET'])
def status():
    """GET /api/status — report health and the number of stored vectors."""
    try:
        # _collection is Chroma-private API; used only for a quick count.
        vector_count = vector_store._collection.count()
        return jsonify({"status": "ok", "vector_count": vector_count}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    # debug=True is for development only; disable it in production.
    app.run(host='0.0.0.0', port=5000, debug=True)

6.4.3 FastAPI 接口

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

app = FastAPI(title="知识库问答 API")

# Load the persisted vector store once at startup (shared, read-only use).
embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-base-zh-v1.5"
)
vector_store = Chroma(
    persist_directory="./vector_db",
    embedding_function=embeddings
)

# Build the LLM client at import time.
# NOTE(review): reads OPENAI_API_KEY from the environment — confirm it is
# set before the service starts.
llm = ChatOpenAI(
    openai_api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
    temperature=0.3
)

# Prompt template (Chinese): answer from the supplied context only and
# admit ignorance rather than fabricate. Runtime string — left untouched.
template = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

上下文:
{context}

问题:
{question}

回答:
"""

prompt = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# "stuff" chain type: all retrieved chunks are concatenated into one prompt.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(),
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True
)

# 请求模型
class QueryRequest(BaseModel):
    query: str

# 响应模型
class Source(BaseModel):
    content: str
    metadata: dict

class QAResponse(BaseModel):
    answer: str
    sources: list[Source]

class StatusResponse(BaseModel):
    status: str
    vector_count: int

@app.post("/api/qa", response_model=QAResponse)
def qa(request: QueryRequest):
    """Answer a question against the knowledge base and cite its sources.

    Returns a QAResponse on success; raises HTTP 500 on internal failure.
    """
    try:
        result = qa_chain({"query": request.query})

        # Each source is previewed to at most 200 characters.
        sources = [
            Source(
                content=(
                    doc.page_content[:200] + "..."
                    if len(doc.page_content) > 200
                    else doc.page_content
                ),
                metadata=doc.metadata,
            )
            for doc in result["source_documents"]
        ]

        return QAResponse(answer=result["result"], sources=sources)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/status", response_model=StatusResponse)
def status():
    """Report service health plus the number of vectors in the store."""
    try:
        # _collection is Chroma-private API; used only for a quick count.
        vector_count = vector_store._collection.count()
        return StatusResponse(status="ok", vector_count=vector_count)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    # Run with uvicorn when executed directly (development convenience).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

6.5 本章小结

  • 学习了基于 LangChain 构建基础 RAG pipeline
  • 掌握了本地文档(PDF/Word/Excel/Markdown)的读取解析
  • 实现了一键向量化与入库功能
  • 开发了简单的问答接口,包括命令行、Flask API 和 FastAPI 接口

这些知识为我们后续构建更复杂的 RAG 系统打下了基础。在接下来的章节中,我们将学习如何使用 Streamlit/Gradio 快速构建可视化知识库,实现上传、问答、历史记录等功能。