
第84天:企业知识库-问答系统

学习目标

  • 掌握检索引擎实现
  • 学习问答引擎实现
  • 理解答案生成实现
  • 掌握多轮对话实现
  • 学习答案溯源实现

检索引擎实现

查询处理器

python
from typing import Dict, List, Optional
import jieba

class QueryProcessor:
    """Preprocesses a user query for retrieval: normalization, tokenization,
    keyword extraction, intent detection and lightweight entity spotting.

    Chinese-oriented; tokenization and keywords rely on jieba.
    """

    def __init__(self):
        # Eagerly load jieba's dictionary so the first real query does not
        # pay the lazy-initialization cost.
        jieba.initialize()

    async def process_query(
        self,
        query: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Run the full preprocessing pipeline over *query*.

        Args:
            query: raw user query string.
            context: optional conversation context; currently unused here —
                kept for interface compatibility (TODO confirm intended use).

        Returns:
            Dict with keys: original, normalized, tokens, keywords,
            intent, entities.
        """
        processed_query = {
            "original": query,
            "normalized": self._normalize_query(query),
            "tokens": self._tokenize(query),
            "keywords": self._extract_keywords(query),
            "intent": await self._detect_intent(query),
            "entities": self._extract_entities(query)
        }

        return processed_query

    def _normalize_query(self, query: str) -> str:
        """Trim, lowercase, and map fullwidth ?/! to their ASCII forms."""
        normalized = query.strip().lower()
        # BUGFIX: the replacements previously mapped a character onto itself
        # (a no-op, likely an encoding artifact); normalize the fullwidth
        # punctuation to ASCII as intended.
        normalized = normalized.replace("？", "?").replace("！", "!")
        return normalized

    def _tokenize(self, query: str) -> List[str]:
        """Segment *query* with jieba, dropping whitespace-only tokens."""
        tokens = jieba.lcut(query)
        return [token for token in tokens if token.strip()]

    def _extract_keywords(self, query: str) -> List[str]:
        """Return up to 5 TF-IDF keywords via jieba.analyse."""
        import jieba.analyse

        keywords = jieba.analyse.extract_tags(
            query,
            topK=5,
            withWeight=False
        )

        return keywords

    async def _detect_intent(self, query: str) -> str:
        """Rule-based intent classification by trigger phrase.

        Returns one of: definition, how_to, why, comparison, general.
        """
        # BUGFIX: the definition list contained "什么是" twice; deduplicated.
        if any(word in query for word in ["什么是", "定义"]):
            return "definition"
        elif any(word in query for word in ["如何", "怎么", "怎样"]):
            return "how_to"
        elif any(word in query for word in ["为什么", "原因"]):
            return "why"
        elif any(word in query for word in ["比较", "区别", "差异"]):
            return "comparison"
        else:
            return "general"

    def _extract_entities(self, query: str) -> List[Dict]:
        """Heuristic entity spotting over the token stream.

        Pure digits are tagged as numbers; alphabetic tokens longer than two
        characters are tagged as potential entities. Positions are token
        indices, not character offsets.
        """
        entities = []

        tokens = self._tokenize(query)

        for i, token in enumerate(tokens):
            if token.isdigit():
                entities.append({
                    "text": token,
                    "type": "number",
                    "position": i
                })
            elif token.isalpha() and len(token) > 2:
                entities.append({
                    "text": token,
                    "type": "potential_entity",
                    "position": i
                })

        return entities

混合检索器

python
from typing import List, Dict, Optional
import math

class HybridRetriever:
    """Hybrid retrieval: blends dense vector search with sparse keyword
    search into one ranked result list."""

    def __init__(
        self,
        vector_store,
        keyword_index
    ):
        self.vector_store = vector_store
        self.keyword_index = keyword_index

    async def retrieve(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict]:
        """Return the top_k documents ranked by the blended score.

        Args:
            query: raw query text, fed to the keyword index.
            query_embedding: dense embedding of the query.
            top_k: number of results to return.
            alpha: weight of the vector score; (1 - alpha) weights the
                (normalized) keyword score.
        """
        # Over-fetch from each retriever so the merge has enough candidates.
        vector_results = await self._vector_retrieve(
            query_embedding,
            top_k * 2
        )

        keyword_results = await self._keyword_retrieve(
            query,
            top_k * 2
        )

        hybrid_results = self._merge_results(
            vector_results,
            keyword_results,
            alpha
        )

        return hybrid_results[:top_k]

    async def _vector_retrieve(
        self,
        query_embedding: List[float],
        top_k: int
    ) -> List[Dict]:
        """Dense search; best-effort — returns [] on any failure."""
        try:
            results = self.vector_store.search(
                query_embedding,
                top_k=top_k
            )

            return results

        except Exception as e:
            print(f"向量检索失败: {e}")
            return []

    async def _keyword_retrieve(
        self,
        query: str,
        top_k: int
    ) -> List[Dict]:
        """Sparse (keyword) search; best-effort — returns [] on any failure."""
        try:
            results = self.keyword_index.search(query, top_k)

            return results

        except Exception as e:
            print(f"关键词检索失败: {e}")
            return []

    def _merge_results(
        self,
        vector_results: List[Dict],
        keyword_results: List[Dict],
        alpha: float
    ) -> List[Dict]:
        """Blend the two result lists into one ranking.

        BUGFIX: keyword scores (e.g. BM25) are unbounded while vector
        scores live in [0, 1]; mixing the raw values let the keyword side
        dominate regardless of alpha. Keyword scores are now max-normalized
        into [0, 1] before blending.
        """
        merged = {}

        for result in vector_results:
            merged[result["id"]] = {
                **result,
                # Convert a distance (0 = identical) into a similarity.
                # NOTE(review): assumes distance is in [0, 1] — confirm
                # against the vector store's metric.
                "vector_score": 1 - result.get("distance", 1),
                "keyword_score": 0.0
            }

        # Normalize by the best keyword score so both signals share a scale.
        max_keyword = max(
            (r.get("score", 0) for r in keyword_results),
            default=0
        )

        for result in keyword_results:
            raw_score = result.get("score", 0)
            norm_score = raw_score / max_keyword if max_keyword > 0 else 0.0

            doc_id = result["id"]

            if doc_id in merged:
                merged[doc_id]["keyword_score"] = norm_score
            else:
                merged[doc_id] = {
                    **result,
                    "vector_score": 0,
                    "keyword_score": norm_score
                }

        for result in merged.values():
            result["hybrid_score"] = (
                alpha * result["vector_score"] +
                (1 - alpha) * result["keyword_score"]
            )

        sorted_results = sorted(
            merged.values(),
            key=lambda x: x["hybrid_score"],
            reverse=True
        )

        return sorted_results

重排序器

python
import openai
from typing import List, Dict

class Reranker:
    """LLM-based listwise reranker for retrieved documents."""

    def __init__(self, llm_client: "openai.OpenAI"):
        # String annotation: avoids evaluating the openai name at class
        # definition time (no hard import-time dependency); runtime
        # behavior for callers is unchanged.
        self.llm_client = llm_client

    async def rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int = 5
    ) -> List[Dict]:
        """Return up to top_k documents, reordered by LLM-judged relevance.

        Skips the (expensive) LLM call when there is nothing to prune.
        """
        if len(documents) <= top_k:
            return documents

        reranked_docs = await self._llm_rerank(query, documents)

        return reranked_docs[:top_k]

    async def _llm_rerank(
        self,
        query: str,
        documents: List[Dict]
    ) -> List[Dict]:
        """Ask the LLM for a ranking; falls back to input order on error.

        BUGFIX: scores from the model are coerced to float (the prompt
        template invites string values), and documents the model omits from
        its ranking are appended at the end instead of silently dropped.
        """
        prompt = self._build_rerank_prompt(query, documents)

        try:
            completion = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的重排序器"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = completion.choices[0].message.content

            import json
            reranking = json.loads(result)

            doc_map = {
                doc["id"]: doc
                for doc in documents
            }

            reranked_docs = []

            for item in reranking.get("rankings", []):
                doc_id = item.get("document_id")
                if doc_id in doc_map:
                    doc = doc_map.pop(doc_id)
                    try:
                        doc["rerank_score"] = float(item.get("score", 0))
                    except (TypeError, ValueError):
                        # Unparseable score from the model — rank last.
                        doc["rerank_score"] = 0.0
                    reranked_docs.append(doc)

            # Preserve any documents the model failed to rank (lowest
            # priority, original relative order).
            reranked_docs.extend(doc_map.values())

            return reranked_docs

        except Exception as e:
            print(f"重排序失败: {e}")
            return documents

    def _build_rerank_prompt(
        self,
        query: str,
        documents: List[Dict]
    ) -> str:
        """Render the reranking prompt; document text is capped at 500 chars."""
        docs_text = "\n\n".join([
            f"文档{i+1} (ID: {doc['id']}):\n{doc['text'][:500]}"
            for i, doc in enumerate(documents)
        ])

        prompt = f"""请根据以下查询对文档进行重排序:

查询:{query}

文档:
{docs_text}

请返回JSON格式的重排序结果:
{{
  "rankings": [
    {{
      "document_id": "文档ID",
      "score": "相关性评分(0-1)",
      "reason": "排序理由"
    }}
  ]
}}"""

        return prompt

问答引擎实现

问题理解

python
import openai
from typing import Dict, Optional

class QuestionUnderstanding:
    """LLM-backed question analysis: intent, key entities, question type,
    required information and complexity."""

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def understand_question(
        self,
        question: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Analyze *question* with the LLM.

        Returns {"success": True, "understanding": <parsed JSON>} on
        success, or {"success": False, "error": <message>} on any failure
        (API error or unparseable response).
        """
        prompt = self._build_understanding_prompt(question, context)

        try:
            import json

            reply = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的问题理解器"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                response_format={"type": "json_object"},
            )

            parsed = json.loads(reply.choices[0].message.content)

            return {"success": True, "understanding": parsed}

        except Exception as exc:
            return {"success": False, "error": str(exc)}

    def _build_understanding_prompt(
        self,
        question: str,
        context: Optional[Dict]
    ) -> str:
        """Render the analysis prompt, embedding the question and context."""
        context_text = context if context else '无'

        return f"""请分析以下问题:

问题:{question}

上下文:
{context_text}

请返回JSON格式的分析结果:
{{
  "intent": "问题意图",
  "key_entities": ["关键实体"],
  "question_type": "问题类型",
  "required_info": ["需要的信息"],
  "complexity": "复杂度(simple/medium/complex)"
}}"""

答案生成

python
import openai
from typing import List, Dict, Optional

class AnswerGenerator:
    """Generates grounded answers from retrieved documents via an LLM,
    optionally with inline [doc-id] citations."""

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def generate_answer(
        self,
        question: str,
        retrieved_docs: List[Dict],
        context: Optional[Dict] = None
    ) -> Dict:
        """Answer *question* using *retrieved_docs* as grounding.

        Returns {"success": True, "answer", "sources"} or
        {"success": False, "error"}.
        """
        prompt = self._build_answer_prompt(question, retrieved_docs, context)

        try:
            reply = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的问答系统"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.5,
                max_tokens=1000,
            )

            return {
                "success": True,
                "answer": reply.choices[0].message.content,
                "sources": [doc["id"] for doc in retrieved_docs],
            }

        except Exception as exc:
            return {"success": False, "error": str(exc)}

    def _build_answer_prompt(
        self,
        question: str,
        retrieved_docs: List[Dict],
        context: Optional[Dict]
    ) -> str:
        """Render the answer prompt with numbered, ID-tagged documents."""
        numbered = (
            f"文档{idx} (ID: {doc['id']}):\n{doc['text']}"
            for idx, doc in enumerate(retrieved_docs, start=1)
        )
        context_text = "\n\n".join(numbered)

        return f"""请根据以下文档回答问题:

问题:{question}

相关文档:
{context_text}

请提供准确、简洁的答案,并在答案中引用文档ID。

答案:"""

    async def generate_answer_with_citations(
        self,
        question: str,
        retrieved_docs: List[Dict],
        context: Optional[Dict] = None
    ) -> Dict:
        """Like generate_answer, but instructs the model to cite sources
        as [doc-id] and extracts those citations from the reply."""
        prompt = self._build_citation_prompt(question, retrieved_docs, context)

        try:
            reply = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的问答系统"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.5,
                max_tokens=1000,
            )

            answer_text = reply.choices[0].message.content

            return {
                "success": True,
                "answer": answer_text,
                "citations": self._extract_citations(answer_text),
                "sources": [doc["id"] for doc in retrieved_docs],
            }

        except Exception as exc:
            return {"success": False, "error": str(exc)}

    def _build_citation_prompt(
        self,
        question: str,
        retrieved_docs: List[Dict],
        context: Optional[Dict]
    ) -> str:
        """Render the citation-style prompt; each document prefixed [id]."""
        tagged = (f"[{doc['id']}] {doc['text']}" for doc in retrieved_docs)
        context_text = "\n\n".join(tagged)

        return f"""请根据以下文档回答问题,并在答案中使用[文档ID]格式引用来源:

问题:{question}

相关文档:
{context_text}

请提供准确、简洁的答案,并使用[文档ID]格式引用来源。

答案:"""

    def _extract_citations(self, answer: str) -> List[str]:
        """Return every bracketed [token] found in *answer*, in order."""
        import re

        return re.findall(r'\[([^\]]+)\]', answer)

多轮对话实现

对话管理器

python
from typing import Dict, List, Optional
from datetime import datetime

class ConversationManager:
    """In-memory store of multi-turn conversations keyed by conversation id.

    Not persistent and not thread-safe; suitable for a single-process demo.
    """

    def __init__(self):
        # conversation_id -> {"user_id", "messages", "context", ...}
        self.conversations = {}
        # Monotonic counter mixed into ids: a bare timestamp can collide
        # when two conversations are created within the same clock tick.
        self._id_counter = 0

    def create_conversation(
        self,
        user_id: str,
        initial_context: Optional[Dict] = None
    ) -> str:
        """Create a new conversation for *user_id* and return its id.

        BUGFIX: the id now includes a per-instance counter so that rapid
        consecutive calls cannot produce duplicate ids.
        """
        self._id_counter += 1
        conversation_id = (
            f"conv_{datetime.now().timestamp()}_{self._id_counter}"
        )

        self.conversations[conversation_id] = {
            "user_id": user_id,
            "messages": [],
            "context": initial_context or {},
            "created_at": datetime.now().isoformat(),
            "last_activity": datetime.now().isoformat()
        }

        return conversation_id

    def add_message(
        self,
        conversation_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """Append a message to the conversation and bump last_activity.

        Raises:
            ValueError: if the conversation id is unknown.
        """
        if conversation_id not in self.conversations:
            raise ValueError(f"对话不存在: {conversation_id}")

        conversation = self.conversations[conversation_id]

        message = {
            "role": role,
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat()
        }

        conversation["messages"].append(message)
        conversation["last_activity"] = datetime.now().isoformat()

    def get_conversation(
        self,
        conversation_id: str
    ) -> Optional[Dict]:
        """Return the full conversation record, or None if unknown."""
        return self.conversations.get(conversation_id)

    def get_messages(
        self,
        conversation_id: str,
        limit: int = 10
    ) -> List[Dict]:
        """Return the most recent *limit* messages ([] if unknown id)."""
        if conversation_id not in self.conversations:
            return []

        conversation = self.conversations[conversation_id]
        messages = conversation["messages"]

        return messages[-limit:]

    def update_context(
        self,
        conversation_id: str,
        context: Dict
    ):
        """Merge *context* into the conversation's context dict.

        Raises:
            ValueError: if the conversation id is unknown.
        """
        if conversation_id not in self.conversations:
            raise ValueError(f"对话不存在: {conversation_id}")

        conversation = self.conversations[conversation_id]
        conversation["context"].update(context)

    def get_context(
        self,
        conversation_id: str
    ) -> Dict:
        """Return the conversation's context dict ({} if unknown id)."""
        if conversation_id not in self.conversations:
            return {}

        return self.conversations[conversation_id]["context"]

上下文增强

python
import openai
from typing import Dict, List, Optional

class ContextEnhancer:
    """Rewrites a follow-up query into a self-contained one using the LLM,
    based on recent conversation history and accumulated context."""

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def enhance_query(
        self,
        query: str,
        conversation_history: List[Dict],
        context: Optional[Dict] = None
    ) -> Dict:
        """Enhance *query* with conversational context.

        Returns {"success": True, "enhanced_query", "context_updates",
        "reasoning"} on success; on any failure returns
        {"success": False, "error", "enhanced_query": <original query>}
        so callers can always fall back to the raw query.
        """
        prompt = self._build_enhancement_prompt(
            query, conversation_history, context
        )

        try:
            import json

            reply = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的查询增强器"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                response_format={"type": "json_object"},
            )

            enhancement = json.loads(reply.choices[0].message.content)

            return {
                "success": True,
                "enhanced_query": enhancement["enhanced_query"],
                "context_updates": enhancement.get("context_updates", {}),
                "reasoning": enhancement.get("reasoning", ""),
            }

        except Exception as exc:
            return {
                "success": False,
                "error": str(exc),
                "enhanced_query": query,
            }

    def _build_enhancement_prompt(
        self,
        query: str,
        conversation_history: List[Dict],
        context: Optional[Dict]
    ) -> str:
        """Render the enhancement prompt using the last 5 history turns."""
        recent = conversation_history[-5:]
        history_text = "\n".join(
            f"{turn['role']}: {turn['content']}" for turn in recent
        )

        context_text = context if context else '无'

        return f"""请根据对话历史和上下文增强当前查询:

当前查询:{query}

对话历史:
{history_text}

上下文:
{context_text}

请返回JSON格式的增强结果:
{{
  "enhanced_query": "增强后的查询",
  "context_updates": "需要更新的上下文",
  "reasoning": "增强理由"
}}"""

答案溯源实现

溯源管理器

python
from typing import Dict, List, Optional

class SourceTracker:
    """Records which documents (and chunks) supported each generated answer,
    keyed by answer id."""

    def __init__(self):
        # answer_id -> list of {"document_id", "chunk_id", "confidence"}
        self.sources = {}

    def add_source(
        self,
        answer_id: str,
        document_id: str,
        chunk_id: Optional[str] = None,
        confidence: float = 1.0
    ):
        """Attach one source document (optionally a chunk) to *answer_id*."""
        entry = {
            "document_id": document_id,
            "chunk_id": chunk_id,
            "confidence": confidence,
        }
        self.sources.setdefault(answer_id, []).append(entry)

    def get_sources(
        self,
        answer_id: str
    ) -> List[Dict]:
        """Return the recorded sources for *answer_id* ([] if none)."""
        return self.sources.get(answer_id, [])

    def format_sources(
        self,
        answer_id: str,
        document_storage
    ) -> List[Dict]:
        """Resolve each source against *document_storage* and attach the
        filename; sources whose document cannot be found are skipped."""
        formatted_sources = []

        for src in self.get_sources(answer_id):
            document = document_storage.get_document(src["document_id"])
            if not document:
                continue

            formatted_sources.append({
                "document_id": src["document_id"],
                "filename": document["filename"],
                "chunk_id": src["chunk_id"],
                "confidence": src["confidence"],
            })

        return formatted_sources

溯源验证

python
import openai
from typing import Dict, List

class SourceValidator:
    """Uses an LLM to check whether a generated answer is actually
    supported by its cited source passages."""

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def validate_answer(
        self,
        answer: str,
        sources: List[Dict]
    ) -> Dict:
        """Validate *answer* against *sources*.

        Returns {"success": True, "validation": <parsed JSON>} or
        {"success": False, "error": <message>}.
        """
        prompt = self._build_validation_prompt(answer, sources)

        try:
            import json

            reply = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的答案验证器"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                response_format={"type": "json_object"},
            )

            verdict = json.loads(reply.choices[0].message.content)

            return {"success": True, "validation": verdict}

        except Exception as exc:
            return {"success": False, "error": str(exc)}

    def _build_validation_prompt(
        self,
        answer: str,
        sources: List[Dict]
    ) -> str:
        """Render the validation prompt with numbered source passages."""
        numbered = (
            f"来源{idx}:\n{src['text']}"
            for idx, src in enumerate(sources, start=1)
        )
        sources_text = "\n\n".join(numbered)

        return f"""请验证以下答案是否准确:

答案:
{answer}

来源:
{sources_text}

请返回JSON格式的验证结果:
{{
  "is_accurate": true/false,
  "confidence": "置信度(0-1)",
  "issues": ["存在的问题"],
  "suggestions": ["改进建议"]
}}"""

实践练习

练习1:实现检索引擎

python
def implement_retrieval_engine():
    """Exercise 1: wire up the retrieval pipeline (processor, hybrid
    retriever, reranker) and return the three components as a tuple.

    NOTE(review): relies on `vector_store`, `keyword_index` and
    `llm_client` existing at module level — they are not defined in this
    snippet; confirm they are created by the surrounding setup code.
    """
    query_processor = QueryProcessor()
    hybrid_retriever = HybridRetriever(vector_store, keyword_index)
    reranker = Reranker(llm_client)

    return query_processor, hybrid_retriever, reranker

练习2:实现问答引擎

python
def implement_qa_engine():
    """Exercise 2: build the QA components and return them as a tuple.

    NOTE(review): relies on a module-level `llm_client` that is not
    defined in this snippet — confirm it is created by setup code.
    """
    question_understanding = QuestionUnderstanding(llm_client)
    answer_generator = AnswerGenerator(llm_client)

    return question_understanding, answer_generator

练习3:实现多轮对话

python
def implement_conversation():
    """Exercise 3: build the multi-turn dialogue components and return
    them as a tuple.

    NOTE(review): relies on a module-level `llm_client` that is not
    defined in this snippet — confirm it is created by setup code.
    """
    conversation_manager = ConversationManager()
    context_enhancer = ContextEnhancer(llm_client)

    return conversation_manager, context_enhancer

总结

本节我们学习了企业知识库的问答系统:

  1. 检索引擎实现
  2. 问答引擎实现
  3. 答案生成实现
  4. 多轮对话实现
  5. 答案溯源实现

问答系统是知识库的核心功能,需要准确、高效地回答用户问题。

参考资源