Skip to content

第75天:智能客服系统-后端开发(下)

学习目标

  • 掌握知识库服务开发
  • 学习RAG实现
  • 理解工单服务开发
  • 掌握分析服务开发
  • 学习API集成

知识库服务开发

文档处理

python
from typing import List, Dict, Optional
import asyncio
from datetime import datetime
import aiofiles
from pathlib import Path
import hashlib

class DocumentProcessor:
    """Loads a document from disk and extracts its plain-text content.

    Supports .txt/.md (read directly), .pdf (via pypdf) and .docx
    (via python-docx).  ``process_document`` is the single entry point
    and returns the extracted text together with file metadata.
    """

    def __init__(self):
        # Recognized file extensions (compared case-insensitively).
        self.supported_formats = [".txt", ".md", ".pdf", ".docx"]

    async def process_document(
        self,
        file_path: str
    ) -> Dict:
        """Validate, hash and extract *file_path*.

        Returns:
            dict with ``content`` (extracted text) and ``metadata``
            (file name, path, size, sha256, type, processing timestamp).

        Raises:
            FileNotFoundError: if the path does not exist.
            ValueError: if the extension is not supported.
        """
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")

        # Lowercase so ".TXT", ".Pdf" etc. are accepted too.
        if path.suffix.lower() not in self.supported_formats:
            raise ValueError(f"不支持的文件格式: {path.suffix}")

        file_hash = await self._calculate_file_hash(file_path)
        content = await self._extract_content(file_path)

        metadata = {
            "file_name": path.name,
            "file_path": file_path,
            "file_size": path.stat().st_size,
            "file_hash": file_hash,
            "file_type": path.suffix,
            "processed_at": datetime.now().isoformat()
        }

        return {
            "content": content,
            "metadata": metadata
        }

    async def _calculate_file_hash(self, file_path: str) -> str:
        """Stream the file in 8 KiB chunks and return its SHA-256 hex digest."""
        sha256_hash = hashlib.sha256()

        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(8192):
                sha256_hash.update(chunk)

        return sha256_hash.hexdigest()

    async def _extract_content(self, file_path: str) -> str:
        """Dispatch to the extractor matching the (lowercased) extension."""
        path = Path(file_path)
        suffix = path.suffix.lower()

        if suffix in (".txt", ".md"):
            return await self._extract_text(file_path)
        if suffix == ".pdf":
            return await self._extract_pdf(file_path)
        if suffix == ".docx":
            return await self._extract_docx(file_path)
        raise ValueError(f"不支持的文件格式: {path.suffix}")

    async def _extract_text(self, file_path: str) -> str:
        """Read a UTF-8 text file asynchronously."""
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            return await f.read()

    async def _extract_pdf(self, file_path: str) -> str:
        """Extract text from a PDF without blocking the event loop."""
        def _read() -> str:
            import pypdf
            reader = pypdf.PdfReader(file_path)
            # join() avoids quadratic "+=" concatenation over many pages.
            return "".join(page.extract_text() + "\n" for page in reader.pages)

        try:
            # pypdf is synchronous; run it in a worker thread so other
            # coroutines keep running during the parse.
            return await asyncio.to_thread(_read)
        except Exception as e:
            # Chain the original error so the root cause stays visible.
            raise RuntimeError(f"PDF提取失败: {str(e)}") from e

    async def _extract_docx(self, file_path: str) -> str:
        """Extract paragraph text from a .docx without blocking the event loop."""
        def _read() -> str:
            from docx import Document
            doc = Document(file_path)
            return "".join(p.text + "\n" for p in doc.paragraphs)

        try:
            # python-docx is synchronous; off-load to a worker thread.
            return await asyncio.to_thread(_read)
        except Exception as e:
            raise RuntimeError(f"DOCX提取失败: {str(e)}") from e

文档切片

python
class DocumentChunker:
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def chunk_document(
        self,
        content: str,
        metadata: Optional[Dict] = None
    ) -> List[Dict]:
        chunks = []
        
        paragraphs = self._split_into_paragraphs(content)
        
        current_chunk = ""
        chunk_index = 0
        
        for paragraph in paragraphs:
            if len(current_chunk) + len(paragraph) <= self.chunk_size:
                current_chunk += paragraph + "\n"
            else:
                if current_chunk:
                    chunks.append(self._create_chunk(
                        current_chunk,
                        chunk_index,
                        metadata
                    ))
                    chunk_index += 1
                
                current_chunk = paragraph + "\n"
        
        if current_chunk:
            chunks.append(self._create_chunk(
                current_chunk,
                chunk_index,
                metadata
            ))
        
        return chunks
    
    def _split_into_paragraphs(self, content: str) -> List[str]:
        paragraphs = content.split('\n\n')
        
        return [p.strip() for p in paragraphs if p.strip()]
    
    def _create_chunk(
        self,
        content: str,
        chunk_index: int,
        metadata: Optional[Dict] = None
    ) -> Dict:
        chunk_metadata = {
            "chunk_index": chunk_index,
            "chunk_size": len(content),
            "created_at": datetime.now().isoformat()
        }
        
        if metadata:
            chunk_metadata.update(metadata)
        
        return {
            "content": content,
            "metadata": chunk_metadata
        }
    
    def chunk_with_overlap(
        self,
        content: str,
        metadata: Optional[Dict] = None
    ) -> List[Dict]:
        chunks = []
        
        start = 0
        chunk_index = 0
        
        while start < len(content):
            end = start + self.chunk_size
            
            chunk_content = content[start:end]
            
            chunks.append(self._create_chunk(
                chunk_content,
                chunk_index,
                metadata
            ))
            
            chunk_index += 1
            start = end - self.chunk_overlap
        
        return chunks

向量化

python
import openai

class EmbeddingService:
    """Async-friendly wrapper around the OpenAI embeddings API."""

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        # Embedding model used for every request.
        self.model = "text-embedding-3-small"

    async def create_embedding(
        self,
        text: str
    ) -> List[float]:
        """Embed a single text and return its vector.

        Raises:
            RuntimeError: wrapping any API failure (cause is chained).
        """
        try:
            # The OpenAI client is synchronous; run the network call in
            # a worker thread so the event loop is not blocked.
            response = await asyncio.to_thread(
                self.client.embeddings.create,
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            raise RuntimeError(f"向量化失败: {str(e)}") from e

    async def create_embeddings_batch(
        self,
        texts: List[str],
        batch_size: int = 100
    ) -> List[List[float]]:
        """Embed *texts* in batches of ``batch_size`` requests per call.

        Returns vectors in the same order as *texts*; an empty input
        yields an empty list without any API call.
        """
        embeddings: List[List[float]] = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]

            try:
                response = await asyncio.to_thread(
                    self.client.embeddings.create,
                    model=self.model,
                    input=batch
                )
                embeddings.extend(item.embedding for item in response.data)
            except Exception as e:
                raise RuntimeError(f"批量向量化失败: {str(e)}") from e

        return embeddings

    async def create_embeddings_for_chunks(
        self,
        chunks: List[Dict]
    ) -> List[Dict]:
        """Attach an ``embedding`` key to each chunk dict (mutates in place)."""
        texts = [chunk["content"] for chunk in chunks]

        embeddings = await self.create_embeddings_batch(texts)

        for chunk, embedding in zip(chunks, embeddings):
            chunk["embedding"] = embedding

        return chunks

向量数据库集成

python
import chromadb
from chromadb.config import Settings

class VectorDatabase:
    """Wrapper around a persistent ChromaDB collection."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(
                anonymized_telemetry=False
            )
        )
        # Set by create_collection(); every other method requires it.
        self.collection = None

    def create_collection(
        self,
        name: str,
        embedding_dimension: int = 1536
    ):
        """Create the named collection, or open it if it already exists.

        ``embedding_dimension`` is kept for interface compatibility only;
        Chroma infers the dimension from the first embedding added.
        """
        # get_or_create_collection replaces the fragile create-then-
        # string-match-"already exists" pattern, which breaks whenever
        # the library changes its error message.
        self.collection = self.client.get_or_create_collection(
            name=name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(
        self,
        documents: List[Dict]
    ):
        """Add documents to the collection.

        Each document dict needs ``embedding`` and ``content``;
        ``id`` and ``metadata`` are optional (ids default to ``doc_<i>``).
        """
        if not self.collection:
            raise RuntimeError("集合未创建")

        self.collection.add(
            ids=[doc.get("id", f"doc_{i}") for i, doc in enumerate(documents)],
            embeddings=[doc["embedding"] for doc in documents],
            documents=[doc["content"] for doc in documents],
            metadatas=[doc.get("metadata", {}) for doc in documents]
        )

    def search(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """Nearest-neighbour search; returns docs with cosine distances.

        ``filter_metadata`` is passed through as a Chroma ``where`` clause.
        """
        if not self.collection:
            raise RuntimeError("集合未创建")

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=filter_metadata
        )

        # Chroma returns column-oriented lists-of-lists (one inner list
        # per query); re-shape into one dict per hit.
        return [
            {
                "id": results["ids"][0][i],
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "distance": results["distances"][0][i]
            }
            for i in range(len(results["ids"][0]))
        ]

    def delete_document(self, document_id: str):
        """Delete a single document by id."""
        if not self.collection:
            raise RuntimeError("集合未创建")

        self.collection.delete(ids=[document_id])

    def clear_collection(self):
        """Delete every document in the collection."""
        if not self.collection:
            raise RuntimeError("集合未创建")

        # Chroma rejects delete(where={}) ("expected where to have
        # exactly one operator"); fetch all ids and delete those instead.
        existing_ids = self.collection.get()["ids"]
        if existing_ids:
            self.collection.delete(ids=existing_ids)

RAG实现

RAG引擎

python
class RAGEngine:
    """Retrieval-augmented generation: embed the question, retrieve
    similar chunks from the vector DB, then prompt the LLM with them."""

    def __init__(
        self,
        embedding_service: EmbeddingService,
        vector_db: VectorDatabase,
        llm_client: openai.OpenAI
    ):
        self.embedding_service = embedding_service
        self.vector_db = vector_db
        self.llm_client = llm_client

    async def query(
        self,
        question: str,
        top_k: int = 5,
        filter_metadata: Optional[Dict] = None
    ) -> Dict:
        """Answer *question* using the ``top_k`` most similar documents.

        Returns:
            dict with ``question``, ``answer``, ``context`` (retrieved
            docs) and ``sources`` (their metadata).
        """
        query_embedding = await self.embedding_service.create_embedding(
            question
        )

        relevant_docs = self.vector_db.search(
            query_embedding,
            top_k=top_k,
            filter_metadata=filter_metadata
        )

        context = self._build_context(relevant_docs)

        answer = await self._generate_answer(
            question,
            context
        )

        return {
            "question": question,
            "answer": answer,
            "context": relevant_docs,
            "sources": [doc["metadata"] for doc in relevant_docs]
        }

    def _build_context(self, documents: List[Dict]) -> str:
        """Join retrieved docs into a numbered context block for the prompt."""
        context_parts = [
            f"[来源{i}] {doc['content']}"
            for i, doc in enumerate(documents, 1)
        ]

        return "\n\n".join(context_parts)

    async def _generate_answer(
        self,
        question: str,
        context: str
    ) -> str:
        """Ask the LLM to answer *question* grounded in *context*.

        Raises:
            RuntimeError: wrapping any API failure (cause is chained).
        """
        prompt = f"""基于以下信息回答问题,如果信息不足,请诚实地说明。

参考信息:
{context}

问题:{question}

请提供准确、清晰的回答。"""

        try:
            # The OpenAI client is synchronous; run the call in a worker
            # thread so the event loop stays responsive.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的客服助手"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=1000
            )

            return completion.choices[0].message.content

        except Exception as e:
            raise RuntimeError(f"生成答案失败: {str(e)}") from e

RAG优化

python
class RAGOptimizer:
    """Adds an LLM-based rerank step on top of RAGEngine retrieval."""

    def __init__(self, rag_engine: RAGEngine):
        self.rag_engine = rag_engine

    async def query_with_rerank(
        self,
        question: str,
        initial_k: int = 10,
        final_k: int = 5
    ) -> Dict:
        """Retrieve ``initial_k`` candidates, rerank them with the LLM,
        then answer from the top ``final_k``.

        Retrieval is done directly against the engine's embedding
        service and vector DB: the original implementation called
        ``rag_engine.query()``, which generated a full LLM answer for
        the candidate set only to discard it — a wasted, paid API call.
        """
        query_embedding = await self.rag_engine.embedding_service.create_embedding(
            question
        )

        candidates = self.rag_engine.vector_db.search(
            query_embedding,
            top_k=initial_k
        )

        reranked_docs = await self._rerank_documents(
            question,
            candidates
        )

        final_docs = reranked_docs[:final_k]

        context = self.rag_engine._build_context(final_docs)

        answer = await self.rag_engine._generate_answer(
            question,
            context
        )

        return {
            "question": question,
            "answer": answer,
            "context": final_docs,
            "sources": [doc["metadata"] for doc in final_docs]
        }

    async def _rerank_documents(
        self,
        question: str,
        documents: List[Dict]
    ) -> List[Dict]:
        """Ask the LLM for a relevance ordering of *documents*.

        Best-effort: if the API call fails or the reply cannot be parsed
        into valid indices, the original ordering is returned unchanged.
        """
        rerank_prompt = f"""请根据以下问题对文档进行重新排序,返回文档索引的排序结果。

问题:{question}

文档:
{self._format_documents_for_rerank(documents)}

请返回排序后的文档索引列表,用逗号分隔。"""

        try:
            completion = self.rag_engine.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的文档排序器"},
                    {"role": "user", "content": rerank_prompt}
                ],
                temperature=0.3,
                max_tokens=100
            )

            result = completion.choices[0].message.content

            indices = [int(idx.strip()) for idx in result.split(',')]

            # Keep only in-range indices, in the order the LLM gave them.
            return [
                documents[idx]
                for idx in indices
                if 0 <= idx < len(documents)
            ]

        except Exception:
            # Deliberate fallback: reranking is an optimization, not a
            # requirement — never fail the query because of it.
            return documents

    def _format_documents_for_rerank(self, documents: List[Dict]) -> str:
        """Number documents ``[0] ...`` so the LLM can reference indices."""
        return "\n\n".join([
            f"[{i}] {doc['content']}"
            for i, doc in enumerate(documents)
        ])

工单服务开发

工单管理

python
from typing import Optional
from datetime import datetime
import uuid

class TicketService:
    """In-memory CRUD store for support tickets, keyed by UUID."""

    # Fields callers are allowed to change through update_ticket().
    _ALLOWED_UPDATES = frozenset({
        "title",
        "description",
        "priority",
        "status",
        "resolution",
    })

    def __init__(self):
        # ticket_id -> ticket dict
        self.tickets: Dict[str, Dict] = {}

    def create_ticket(
        self,
        user_id: str,
        title: str,
        description: str,
        priority: str = "medium",
        conversation_id: Optional[str] = None
    ) -> Dict:
        """Create a new ticket in ``open`` state and return it."""
        ticket_id = str(uuid.uuid4())
        # Single timestamp so created_at and updated_at match exactly
        # (the original called datetime.now() twice).
        now = datetime.now().isoformat()

        ticket = {
            "id": ticket_id,
            "user_id": user_id,
            "conversation_id": conversation_id,
            "title": title,
            "description": description,
            "priority": priority,
            "status": "open",
            "agent_id": None,
            "created_at": now,
            "updated_at": now,
            "resolved_at": None,
            "resolution": None
        }

        self.tickets[ticket_id] = ticket

        return ticket

    def _require_ticket(self, ticket_id: str) -> Dict:
        """Return the ticket dict or raise ValueError if unknown."""
        if ticket_id not in self.tickets:
            raise ValueError(f"工单不存在: {ticket_id}")
        return self.tickets[ticket_id]

    def assign_ticket(
        self,
        ticket_id: str,
        agent_id: str
    ) -> Dict:
        """Assign the ticket to an agent and move it to ``in_progress``.

        Raises:
            ValueError: if the ticket does not exist.
        """
        ticket = self._require_ticket(ticket_id)

        ticket["agent_id"] = agent_id
        ticket["status"] = "in_progress"
        ticket["updated_at"] = datetime.now().isoformat()

        return ticket

    def update_ticket(
        self,
        ticket_id: str,
        **updates
    ) -> Dict:
        """Apply whitelisted field updates; unknown fields are ignored.

        ``resolved_at`` records the *first* time the ticket became
        ``resolved`` and is cleared if the ticket is reopened (the
        original overwrote it on every update while resolved and left
        it stale after reopening).

        Raises:
            ValueError: if the ticket does not exist.
        """
        ticket = self._require_ticket(ticket_id)

        for key, value in updates.items():
            if key in self._ALLOWED_UPDATES:
                ticket[key] = value

        ticket["updated_at"] = datetime.now().isoformat()

        if ticket["status"] == "resolved":
            if ticket["resolved_at"] is None:
                ticket["resolved_at"] = datetime.now().isoformat()
        else:
            ticket["resolved_at"] = None

        return ticket

    def get_ticket(self, ticket_id: str) -> Optional[Dict]:
        """Return the ticket dict, or None if unknown."""
        return self.tickets.get(ticket_id)

    def _filter_tickets(
        self,
        field: str,
        value: str,
        status: Optional[str]
    ) -> List[Dict]:
        """Tickets whose *field* equals *value*, optionally filtered by
        status, sorted newest-first by creation time."""
        matches = [
            ticket
            for ticket in self.tickets.values()
            if ticket[field] == value
            and (status is None or ticket["status"] == status)
        ]

        return sorted(
            matches,
            key=lambda t: t["created_at"],
            reverse=True
        )

    def get_user_tickets(
        self,
        user_id: str,
        status: Optional[str] = None
    ) -> List[Dict]:
        """All tickets opened by *user_id*, newest first."""
        return self._filter_tickets("user_id", user_id, status)

    def get_agent_tickets(
        self,
        agent_id: str,
        status: Optional[str] = None
    ) -> List[Dict]:
        """All tickets assigned to *agent_id*, newest first."""
        return self._filter_tickets("agent_id", agent_id, status)

工单自动创建

python
class AutoTicketCreator:
    """Decides when a conversation warrants a support ticket and
    creates one via TicketService, with LLM-generated title/description."""

    def __init__(self, ticket_service: TicketService):
        self.ticket_service = ticket_service

    async def should_create_ticket(
        self,
        message: str,
        sentiment: str,
        intent: str,
        conversation_history: List[Dict]
    ) -> bool:
        """True when sentiment is bad, the intent is a complaint, or the
        user keeps repeating the same issue."""
        if sentiment in ("negative", "angry", "frustrated"):
            return True

        if intent == "complaint":
            return True

        return await self._is_repeated_issue(conversation_history)

    async def create_ticket_from_conversation(
        self,
        user_id: str,
        conversation_id: str,
        message: str,
        sentiment: str,
        intent: str,
        conversation_history: List[Dict]
    ) -> Optional[Dict]:
        """Create a ticket for this conversation if warranted.

        Returns:
            The new ticket dict, or None when no ticket is needed.
        """
        if not await self.should_create_ticket(
            message,
            sentiment,
            intent,
            conversation_history
        ):
            return None

        title = await self._generate_ticket_title(message, intent)
        description = await self._generate_ticket_description(
            conversation_history
        )
        priority = self._determine_priority(sentiment, intent)

        return self.ticket_service.create_ticket(
            user_id=user_id,
            title=title,
            description=description,
            priority=priority,
            conversation_id=conversation_id
        )

    async def _llm_generate(
        self,
        system: str,
        prompt: str,
        max_tokens: int
    ) -> Optional[str]:
        """Best-effort LLM call shared by title/description generation.

        Returns None on any failure (missing client, API error) so the
        caller can fall back to a deterministic template.  The blocking
        OpenAI call runs in a worker thread to keep the event loop free.
        """
        try:
            import openai
            client = openai.OpenAI()

            completion = await asyncio.to_thread(
                client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=max_tokens
            )

            return completion.choices[0].message.content.strip()

        except Exception:
            return None

    async def _generate_ticket_title(
        self,
        message: str,
        intent: str
    ) -> str:
        """LLM-generated short title; falls back to a template on failure."""
        prompt = f"""请根据以下信息生成一个简洁的工单标题:

用户消息:{message}
意图:{intent}

请生成一个10-20字的工单标题。"""

        title = await self._llm_generate(
            "你是一个专业的工单标题生成器", prompt, 50
        )
        return title if title is not None else f"{intent} - {message[:20]}"

    async def _generate_ticket_description(
        self,
        conversation_history: List[Dict]
    ) -> str:
        """LLM summary of the conversation; falls back to the raw transcript."""
        prompt = f"""请总结以下对话,生成工单描述:

对话历史:
{self._format_conversation(conversation_history)}

请生成详细的工单描述,包括问题背景和具体需求。"""

        description = await self._llm_generate(
            "你是一个专业的工单描述生成器", prompt, 500
        )
        if description is not None:
            return description
        return self._format_conversation(conversation_history)

    def _determine_priority(
        self,
        sentiment: str,
        intent: str
    ) -> str:
        """Map sentiment/intent onto high/medium/low priority."""
        if sentiment in ("angry", "frustrated"):
            return "high"

        if intent == "complaint":
            return "high"

        if sentiment == "negative":
            return "medium"

        return "low"

    async def _is_repeated_issue(
        self,
        conversation_history: List[Dict]
    ) -> bool:
        """Heuristic: any duplicate among the last three messages means
        the user is repeating themselves."""
        if len(conversation_history) < 3:
            return False

        recent_messages = [
            msg["content"]
            for msg in conversation_history[-3:]
        ]

        return len(set(recent_messages)) < 3

    def _format_conversation(self, conversation_history: List[Dict]) -> str:
        """Render the history as ``role: content`` lines."""
        return "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history
        ])

实践练习

练习1:实现知识库服务

python
def implement_knowledge_service():
    """Exercise 1: wire up the knowledge-base services.

    Returns:
        An ``(EmbeddingService, VectorDatabase)`` pair backed by a
        local Chroma store.
    """
    return (
        EmbeddingService(api_key="your-api-key"),
        VectorDatabase(persist_directory="./chroma_db"),
    )

练习2:实现RAG引擎

python
def implement_rag_engine():
    """Exercise 2: assemble a RAGEngine from its three dependencies."""
    embeddings = EmbeddingService(api_key="your-api-key")
    store = VectorDatabase(persist_directory="./chroma_db")
    client = openai.OpenAI(api_key="your-api-key")

    return RAGEngine(embeddings, store, client)

练习3:实现工单服务

python
def implement_ticket_service():
    """Exercise 3: build the ticket service plus its auto-creator."""
    service = TicketService()
    return service, AutoTicketCreator(service)

总结

本节我们学习了智能客服系统后端开发(下):

  1. 知识库服务开发
  2. RAG实现
  3. 工单服务开发

这些是智能客服系统的重要功能模块。

参考资源