Skip to content

RAG 核心流程实现

5.1 数据加载→切片→向量化→入库→检索→生成

5.1.1 数据加载

支持的文件格式

  • 文本文件:.txt, .md, .docx, .pdf
  • 表格文件:.csv, .xlsx
  • 其他格式:.json, .html

数据加载实现

python
from langchain.document_loaders import PyPDFLoader, TextLoader, Docx2txtLoader, UnstructuredFileLoader

# 加载PDF文件
def load_pdf(file_path):
    loader = PyPDFLoader(file_path)
    documents = loader.load()
    return documents

# 加载文本文件
def load_text(file_path):
    loader = TextLoader(file_path)
    documents = loader.load()
    return documents

# 加载Word文件
def load_docx(file_path):
    loader = Docx2txtLoader(file_path)
    documents = loader.load()
    return documents

# 通用文件加载
def load_file(file_path):
    loader = UnstructuredFileLoader(file_path)
    documents = loader.load()
    return documents

# 示例
pdf_docs = load_pdf("document.pdf")
print(f"PDF文件加载完成,共{len(pdf_docs)}个文档")
print(pdf_docs[0].page_content[:100])  # 显示前100个字符

5.1.2 文本切片(分块)

分块策略

  • 固定长度分块:按字符数或词数分块
  • 语义分块:基于句子或段落分块
  • 重叠分块:相邻块之间保留重叠内容

分块实现

python
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter, SentenceTransformersTokenTextSplitter

# 基于字符的递归分块
def split_by_recursive_characters(documents, chunk_size=1000, chunk_overlap=100):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    chunks = text_splitter.split_documents(documents)
    return chunks

# 基于字符的分块
def split_by_characters(documents, chunk_size=1000, chunk_overlap=100):
    text_splitter = CharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunks = text_splitter.split_documents(documents)
    return chunks

# 基于token的分块
def split_by_tokens(documents, chunk_size=512, chunk_overlap=64):
    text_splitter = SentenceTransformersTokenTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunks = text_splitter.split_documents(documents)
    return chunks

# 示例
documents = load_pdf("document.pdf")
chunks = split_by_recursive_characters(documents, chunk_size=800, chunk_overlap=100)
print(f"分块完成,共{len(chunks)}个分块")
print(chunks[0].page_content[:100])  # 显示第一个分块的前100个字符

5.1.3 向量化

Embedding 模型选择

  • 开源模型:BGE, M3E, text2vec
  • API 模型:OpenAI Embeddings, Google Embeddings

向量化实现

python
from langchain.embeddings import HuggingFaceEmbeddings, OpenAIEmbeddings

# 使用开源Embedding模型
def get_open_source_embeddings():
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-base-zh-v1.5",
        model_kwargs={"device": "cpu"},
        encode_kwargs={"normalize_embeddings": True}
    )
    return embeddings

# 使用OpenAI Embeddings
def get_openai_embeddings(api_key):
    embeddings = OpenAIEmbeddings(
        openai_api_key=api_key,
        model="text-embedding-ada-002"
    )
    return embeddings

# 示例
embeddings = get_open_source_embeddings()
test_text = "企业私有知识库"
vector = embeddings.embed_query(test_text)
print(f"向量化完成,向量维度: {len(vector)}")
print(f"向量前5个值: {vector[:5]}")

5.1.4 入库

向量数据库选择

  • 本地数据库:Chroma, FAISS
  • 分布式数据库:Milvus, Qdrant
  • 云服务:Pinecone, Weaviate

入库实现

python
from langchain.vectorstores import Chroma, FAISS

# 使用Chroma向量数据库
def store_in_chroma(chunks, embeddings, persist_directory="./chroma_db"):
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_directory
    )
    vectorstore.persist()
    return vectorstore

# 使用FAISS向量数据库
def store_in_faiss(chunks, embeddings, index_path="./faiss_index"):
    vectorstore = FAISS.from_documents(
        documents=chunks,
        embedding=embeddings
    )
    vectorstore.save_local(index_path)
    return vectorstore

# 示例
vectorstore = store_in_chroma(chunks, embeddings)
print(f"入库完成,向量库中共有{vectorstore._collection.count()}个向量")

5.1.5 检索

检索方式

  • 相似度检索:基于向量相似度
  • 混合检索:结合关键词和向量相似度
  • 过滤检索:基于元数据过滤

检索实现

python
# 相似度检索
def similarity_search(vectorstore, query, k=3):
    results = vectorstore.similarity_search(
        query=query,
        k=k
    )
    return results

# 带分数的相似度检索
def similarity_search_with_score(vectorstore, query, k=3):
    results = vectorstore.similarity_search_with_score(
        query=query,
        k=k
    )
    return results

# 混合检索
def hybrid_search(vectorstore, query, k=3):
    # 这里使用Chroma的混合检索功能
    results = vectorstore.search(
        query=query,
        search_type="hybrid",
        k=k
    )
    return results

# 示例
query = "什么是 RAG 技术?"
results = similarity_search(vectorstore, query, k=3)
print(f"检索完成,找到{len(results)}个相关文档")
for i, result in enumerate(results):
    print(f"\n结果 {i+1}:")
    print(f"内容: {result.page_content[:100]}...")
    print(f"来源: {result.metadata}")

5.1.6 生成

大模型选择

  • 开源模型:Llama 3, Mistral, Qwen
  • API 模型:GPT-4, Claude, 文心一言

生成实现

python
from langchain.llms import OpenAI, HuggingFacePipeline
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# 使用OpenAI模型
def get_openai_llm(api_key):
    llm = ChatOpenAI(
        openai_api_key=api_key,
        model="gpt-3.5-turbo",
        temperature=0.3
    )
    return llm

# 使用开源模型
def get_open_source_llm(model_name="mistralai/Mistral-7B-Instruct-v0.2"):
    from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=1024,
        temperature=0.3
    )
    
    llm = HuggingFacePipeline(pipeline=pipe)
    return llm

# 创建RAG链
def create_rag_chain(vectorstore, llm):
    # 自定义Prompt
    template = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

    上下文:
    {context}

    问题:
    {question}

    回答:
    """
    
    prompt = PromptTemplate(
        template=template,
        input_variables=["context", "question"]
    )
    
    chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(),
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=True
    )
    
    return chain

# 示例
# llm = get_openai_llm("YOUR_API_KEY")
llm = get_open_source_llm()  # 使用开源模型
rag_chain = create_rag_chain(vectorstore, llm)

query = "什么是 RAG 技术?"
result = rag_chain({"query": query})
print("\n生成回答:")
print(result["result"])
print("\n来源文档:")
for i, doc in enumerate(result["source_documents"]):
    print(f"\n文档 {i+1}:")
    print(f"内容: {doc.page_content[:100]}...")
    print(f"来源: {doc.metadata}")

5.2 召回、排序、重排(Rerank)机制

5.2.1 召回机制

召回策略

  • 向量召回:基于向量相似度
  • 关键词召回:基于关键词匹配
  • 混合召回:结合向量和关键词

召回实现

python
# 向量召回
def vector_recall(vectorstore, query, k=10):
    results = vectorstore.similarity_search(
        query=query,
        k=k
    )
    return results

# 关键词召回
def keyword_recall(vectorstore, query, k=10):
    # 使用Chroma的关键词搜索
    results = vectorstore.search(
        query=query,
        search_type="mmr",
        k=k
    )
    return results

# 混合召回
def hybrid_recall(vectorstore, query, k=10):
    # 使用Chroma的混合搜索
    results = vectorstore.search(
        query=query,
        search_type="hybrid",
        k=k
    )
    return results

5.2.2 排序机制

排序策略

  • 相似度排序:按向量相似度排序
  • 相关性排序:按综合相关性排序
  • 多样性排序:使用MMR(Maximum Marginal Relevance)

排序实现

python
from langchain.vectorstores import Chroma

# 使用MMR排序
def mmr_ranking(vectorstore, query, k=5, fetch_k=20):
    results = vectorstore.max_marginal_relevance_search(
        query=query,
        k=k,
        fetch_k=fetch_k
    )
    return results

# 示例
query = "什么是 RAG 技术?"
results = mmr_ranking(vectorstore, query, k=3, fetch_k=10)
print(f"排序完成,找到{len(results)}个相关文档")
for i, result in enumerate(results):
    print(f"\n结果 {i+1}:")
    print(f"内容: {result.page_content[:100]}...")

5.2.3 重排(Rerank)机制

重排模型

  • Cross-Encoder:交叉编码器,如 BAAI/bge-reranker-base
  • ColBERT:上下文感知的重排模型
  • RankBERT:基于BERT的重排模型

重排实现

python
from sentence_transformers import CrossEncoder

# 初始化重排模型
def get_reranker_model(model_name="BAAI/bge-reranker-base"):
    model = CrossEncoder(model_name)
    return model

# 重排函数
def rerank_documents(query, documents, reranker_model, top_k=3):
    # 准备输入对
    pairs = [(query, doc.page_content) for doc in documents]
    
    # 计算相关性分数
    scores = reranker_model.predict(pairs)
    
    # 按分数排序
    sorted_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    sorted_documents = [documents[i] for i in sorted_indices[:top_k]]
    sorted_scores = [scores[i] for i in sorted_indices[:top_k]]
    
    return sorted_documents, sorted_scores

# 示例
reranker = get_reranker_model()
query = "什么是 RAG 技术?"
initial_results = similarity_search(vectorstore, query, k=10)
reranked_docs, scores = rerank_documents(query, initial_results, reranker, top_k=3)

print("重排结果:")
for i, (doc, score) in enumerate(zip(reranked_docs, scores)):
    print(f"\n结果 {i+1} (分数: {score:.4f}):")
    print(f"内容: {doc.page_content[:100]}...")

5.3 Prompt 工程在 RAG 中的应用

5.3.1 基础 Prompt 设计

Prompt 结构

  • 指令:告诉模型要做什么
  • 上下文:提供检索到的相关信息
  • 问题:用户的具体问题
  • 格式要求:指定输出格式

基础 Prompt 示例

python
basic_prompt = """使用以下上下文来回答用户的问题。如果你不知道答案,就说你不知道,不要编造答案。

上下文:
{context}

问题:
{question}

回答:
"""

5.3.2 高级 Prompt 设计

结构化 Prompt

python
structured_prompt = """你是一个专业的知识库助手,负责基于提供的上下文回答用户问题。

请按照以下步骤处理:
1. 仔细阅读上下文,理解其中的信息
2. 分析用户的问题,确定需要回答的核心内容
3. 基于上下文提供准确、详细的回答
4. 如果上下文没有相关信息,请明确说明
5. 回答要简洁明了,使用专业术语

上下文:
{context}

问题:
{question}

回答:
"""

多轮对话 Prompt

python
conversation_prompt = """你是一个专业的知识库助手,负责基于提供的上下文回答用户问题。

对话历史:
{chat_history}

当前上下文:
{context}

用户当前问题:
{question}

请根据对话历史和当前上下文,提供自然、连贯的回答。如果不知道答案,请明确说明。

回答:
"""

5.3.3 Prompt 优化技巧

  • 明确任务:清楚说明模型的角色和任务
  • 提供示例:给出期望的输出示例
  • 限制范围:明确告知模型只能基于提供的上下文回答
  • 格式规范:指定输出的格式要求
  • 追问机制:当信息不足时,引导模型向用户提问

5.4 RAG 效果评估指标

5.4.1 评估指标

  • 精准率(Precision):检索结果中相关文档的比例
  • 召回率(Recall):检索到的相关文档占所有相关文档的比例
  • F1 分数:精准率和召回率的调和平均值
  • 幻觉率(Hallucination Rate):生成回答中包含错误信息的比例
  • 答案相关性:回答与问题的相关程度
  • 回答质量:回答的准确性、完整性、清晰度

5.4.2 评估方法

人工评估

  • 专家评估:由领域专家评估回答质量
  • 用户反馈:收集用户对回答的评价
  • 对比评估:与人工回答或其他系统的回答进行对比

自动评估

  • 基于规则的评估:使用规则判断回答是否正确
  • 基于模型的评估:使用大模型评估回答质量
  • 基于相似度的评估:计算回答与标准答案的相似度

5.4.3 评估实现

python
# 计算精准率和召回率
def calculate_precision_recall(retrieved_docs, relevant_docs):
    retrieved_ids = [doc.metadata.get('id') for doc in retrieved_docs]
    relevant_ids = [doc.metadata.get('id') for doc in relevant_docs]
    
    true_positives = len(set(retrieved_ids) & set(relevant_ids))
    precision = true_positives / len(retrieved_ids) if retrieved_ids else 0
    recall = true_positives / len(relevant_ids) if relevant_ids else 0
    
    return precision, recall

# 计算F1分数
def calculate_f1(precision, recall):
    if precision + recall == 0:
        return 0
    return 2 * (precision * recall) / (precision + recall)

# 示例
retrieved_docs = similarity_search(vectorstore, "什么是 RAG 技术?", k=5)
# 假设我们知道哪些文档是相关的
relevant_docs = [doc for doc in retrieved_docs if "RAG" in doc.page_content]

precision, recall = calculate_precision_recall(retrieved_docs, relevant_docs)
f1 = calculate_f1(precision, recall)

print(f"精准率: {precision:.4f}")
print(f"召回率: {recall:.4f}")
print(f"F1分数: {f1:.4f}")

5.5 本章小结

  • 学习了 RAG 核心流程的完整实现,包括数据加载、切片、向量化、入库、检索、生成
  • 掌握了召回、排序、重排机制,提高检索效果
  • 了解了 Prompt 工程在 RAG 中的应用,优化生成质量
  • 熟悉了 RAG 效果评估指标,确保系统性能

这些知识为我们后续构建完整的 RAG 系统打下了基础。在接下来的章节中,我们将学习如何基于 LangChain 构建基础 RAG pipeline,实现本地文档的读取解析和简单问答接口。