Appearance
第75天:智能客服系统-后端开发(下)
学习目标
- 掌握知识库服务开发
- 学习RAG实现
- 理解工单服务开发
- 掌握分析服务开发
- 学习API集成
知识库服务开发
文档处理
python
from typing import List, Dict, Optional
import asyncio
from datetime import datetime
import aiofiles
from pathlib import Path
import hashlib
class DocumentProcessor:
def __init__(self):
self.supported_formats = [".txt", ".md", ".pdf", ".docx"]
async def process_document(
self,
file_path: str
) -> Dict:
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
if path.suffix not in self.supported_formats:
raise ValueError(f"不支持的文件格式: {path.suffix}")
file_hash = await self._calculate_file_hash(file_path)
content = await self._extract_content(file_path)
metadata = {
"file_name": path.name,
"file_path": file_path,
"file_size": path.stat().st_size,
"file_hash": file_hash,
"file_type": path.suffix,
"processed_at": datetime.now().isoformat()
}
return {
"content": content,
"metadata": metadata
}
async def _calculate_file_hash(self, file_path: str) -> str:
sha256_hash = hashlib.sha256()
async with aiofiles.open(file_path, 'rb') as f:
while chunk := await f.read(8192):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
async def _extract_content(self, file_path: str) -> str:
path = Path(file_path)
if path.suffix == ".txt":
return await self._extract_text(file_path)
elif path.suffix == ".md":
return await self._extract_text(file_path)
elif path.suffix == ".pdf":
return await self._extract_pdf(file_path)
elif path.suffix == ".docx":
return await self._extract_docx(file_path)
else:
raise ValueError(f"不支持的文件格式: {path.suffix}")
async def _extract_text(self, file_path: str) -> str:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
return await f.read()
async def _extract_pdf(self, file_path: str) -> str:
try:
import pypdf
reader = pypdf.PdfReader(file_path)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
return content
except Exception as e:
raise Exception(f"PDF提取失败: {str(e)}")
async def _extract_docx(self, file_path: str) -> str:
try:
from docx import Document
doc = Document(file_path)
content = ""
for paragraph in doc.paragraphs:
content += paragraph.text + "\n"
return content
except Exception as e:
raise Exception(f"DOCX提取失败: {str(e)}")文档切片
python
class DocumentChunker:
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def chunk_document(
self,
content: str,
metadata: Optional[Dict] = None
) -> List[Dict]:
chunks = []
paragraphs = self._split_into_paragraphs(content)
current_chunk = ""
chunk_index = 0
for paragraph in paragraphs:
if len(current_chunk) + len(paragraph) <= self.chunk_size:
current_chunk += paragraph + "\n"
else:
if current_chunk:
chunks.append(self._create_chunk(
current_chunk,
chunk_index,
metadata
))
chunk_index += 1
current_chunk = paragraph + "\n"
if current_chunk:
chunks.append(self._create_chunk(
current_chunk,
chunk_index,
metadata
))
return chunks
def _split_into_paragraphs(self, content: str) -> List[str]:
paragraphs = content.split('\n\n')
return [p.strip() for p in paragraphs if p.strip()]
def _create_chunk(
self,
content: str,
chunk_index: int,
metadata: Optional[Dict] = None
) -> Dict:
chunk_metadata = {
"chunk_index": chunk_index,
"chunk_size": len(content),
"created_at": datetime.now().isoformat()
}
if metadata:
chunk_metadata.update(metadata)
return {
"content": content,
"metadata": chunk_metadata
}
def chunk_with_overlap(
self,
content: str,
metadata: Optional[Dict] = None
) -> List[Dict]:
chunks = []
start = 0
chunk_index = 0
while start < len(content):
end = start + self.chunk_size
chunk_content = content[start:end]
chunks.append(self._create_chunk(
chunk_content,
chunk_index,
metadata
))
chunk_index += 1
start = end - self.chunk_overlap
return chunks向量化
python
import openai
class EmbeddingService:
    """Thin wrapper around the OpenAI embeddings endpoint.

    NOTE(review): the methods are declared ``async`` but call the synchronous
    OpenAI client, which blocks the event loop — confirm whether the async
    SDK client (``openai.AsyncOpenAI``) should be used instead.
    """

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        # Default embedding model used for all requests.
        self.model = "text-embedding-3-small"

    async def create_embedding(self, text: str) -> List[float]:
        """Embed a single text.

        Raises:
            RuntimeError: if the API call fails (subclass of Exception, so
                callers catching the previous bare ``Exception`` still work).
        """
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text,
            )
            return response.data[0].embedding
        except Exception as e:
            raise RuntimeError(f"向量化失败: {str(e)}") from e

    async def create_embeddings_batch(
        self,
        texts: List[str],
        batch_size: int = 100,
    ) -> List[List[float]]:
        """Embed *texts* in batches of *batch_size*; output order matches input."""
        embeddings: List[List[float]] = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            try:
                response = self.client.embeddings.create(
                    model=self.model,
                    input=batch,
                )
                embeddings.extend(item.embedding for item in response.data)
            except Exception as e:
                raise RuntimeError(f"批量向量化失败: {str(e)}") from e
        return embeddings

    async def create_embeddings_for_chunks(
        self,
        chunks: List[Dict],
    ) -> List[Dict]:
        """Attach an ``"embedding"`` key to every chunk dict, in place."""
        texts = [chunk["content"] for chunk in chunks]
        embeddings = await self.create_embeddings_batch(texts)
        for chunk, embedding in zip(chunks, embeddings):
            chunk["embedding"] = embedding
        return chunks
向量数据库集成
python
import chromadb
from chromadb.config import Settings
class VectorDatabase:
def __init__(self, persist_directory: str = "./chroma_db"):
self.client = chromadb.PersistentClient(
path=persist_directory,
settings=Settings(
anonymized_telemetry=False
)
)
self.collection = None
def create_collection(
self,
name: str,
embedding_dimension: int = 1536
):
try:
self.collection = self.client.create_collection(
name=name,
metadata={"hnsw:space": "cosine"}
)
except Exception as e:
if "already exists" in str(e):
self.collection = self.client.get_collection(name)
else:
raise e
def add_documents(
self,
documents: List[Dict]
):
if not self.collection:
raise Exception("集合未创建")
ids = []
embeddings = []
texts = []
metadatas = []
for i, doc in enumerate(documents):
ids.append(doc.get("id", f"doc_{i}"))
embeddings.append(doc["embedding"])
texts.append(doc["content"])
metadatas.append(doc.get("metadata", {}))
self.collection.add(
ids=ids,
embeddings=embeddings,
documents=texts,
metadatas=metadatas
)
def search(
self,
query_embedding: List[float],
top_k: int = 5,
filter_metadata: Optional[Dict] = None
) -> List[Dict]:
if not self.collection:
raise Exception("集合未创建")
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=filter_metadata
)
documents = []
for i in range(len(results["ids"][0])):
documents.append({
"id": results["ids"][0][i],
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i]
})
return documents
def delete_document(self, document_id: str):
if not self.collection:
raise Exception("集合未创建")
self.collection.delete(ids=[document_id])
def clear_collection(self):
if not self.collection:
raise Exception("集合未创建")
self.collection.delete(where={})RAG实现
RAG引擎
python
class RAGEngine:
def __init__(
self,
embedding_service: EmbeddingService,
vector_db: VectorDatabase,
llm_client: openai.OpenAI
):
self.embedding_service = embedding_service
self.vector_db = vector_db
self.llm_client = llm_client
async def query(
self,
question: str,
top_k: int = 5,
filter_metadata: Optional[Dict] = None
) -> Dict:
query_embedding = await self.embedding_service.create_embedding(
question
)
relevant_docs = self.vector_db.search(
query_embedding,
top_k=top_k,
filter_metadata=filter_metadata
)
context = self._build_context(relevant_docs)
answer = await self._generate_answer(
question,
context
)
return {
"question": question,
"answer": answer,
"context": relevant_docs,
"sources": [doc["metadata"] for doc in relevant_docs]
}
def _build_context(self, documents: List[Dict]) -> str:
context_parts = []
for i, doc in enumerate(documents, 1):
context_parts.append(
f"[来源{i}] {doc['content']}"
)
return "\n\n".join(context_parts)
async def _generate_answer(
self,
question: str,
context: str
) -> str:
prompt = f"""基于以下信息回答问题,如果信息不足,请诚实地说明。
参考信息:
{context}
问题:{question}
请提供准确、清晰的回答。"""
try:
completion = self.llm_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个专业的客服助手"},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=1000
)
return completion.choices[0].message.content
except Exception as e:
raise Exception(f"生成答案失败: {str(e)}")RAG优化
python
class RAGOptimizer:
def __init__(self, rag_engine: RAGEngine):
self.rag_engine = rag_engine
async def query_with_rerank(
self,
question: str,
initial_k: int = 10,
final_k: int = 5
) -> Dict:
initial_results = await self.rag_engine.query(
question,
top_k=initial_k
)
reranked_docs = await self._rerank_documents(
question,
initial_results["context"]
)
final_docs = reranked_docs[:final_k]
context = self.rag_engine._build_context(final_docs)
answer = await self.rag_engine._generate_answer(
question,
context
)
return {
"question": question,
"answer": answer,
"context": final_docs,
"sources": [doc["metadata"] for doc in final_docs]
}
async def _rerank_documents(
self,
question: str,
documents: List[Dict]
) -> List[Dict]:
rerank_prompt = f"""请根据以下问题对文档进行重新排序,返回文档索引的排序结果。
问题:{question}
文档:
{self._format_documents_for_rerank(documents)}
请返回排序后的文档索引列表,用逗号分隔。"""
try:
completion = self.rag_engine.llm_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个专业的文档排序器"},
{"role": "user", "content": rerank_prompt}
],
temperature=0.3,
max_tokens=100
)
result = completion.choices[0].message.content
indices = [int(idx.strip()) for idx in result.split(',')]
reranked = []
for idx in indices:
if 0 <= idx < len(documents):
reranked.append(documents[idx])
return reranked
except Exception as e:
return documents
def _format_documents_for_rerank(self, documents: List[Dict]) -> str:
return "\n\n".join([
f"[{i}] {doc['content']}"
for i, doc in enumerate(documents)
])工单服务开发
工单管理
python
from typing import Optional
from datetime import datetime
import uuid
class TicketService:
def __init__(self):
self.tickets: Dict[str, Dict] = {}
def create_ticket(
self,
user_id: str,
title: str,
description: str,
priority: str = "medium",
conversation_id: Optional[str] = None
) -> Dict:
ticket_id = str(uuid.uuid4())
ticket = {
"id": ticket_id,
"user_id": user_id,
"conversation_id": conversation_id,
"title": title,
"description": description,
"priority": priority,
"status": "open",
"agent_id": None,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"resolved_at": None,
"resolution": None
}
self.tickets[ticket_id] = ticket
return ticket
def assign_ticket(
self,
ticket_id: str,
agent_id: str
) -> Dict:
if ticket_id not in self.tickets:
raise ValueError(f"工单不存在: {ticket_id}")
ticket = self.tickets[ticket_id]
ticket["agent_id"] = agent_id
ticket["status"] = "in_progress"
ticket["updated_at"] = datetime.now().isoformat()
return ticket
def update_ticket(
self,
ticket_id: str,
**updates
) -> Dict:
if ticket_id not in self.tickets:
raise ValueError(f"工单不存在: {ticket_id}")
ticket = self.tickets[ticket_id]
allowed_updates = [
"title",
"description",
"priority",
"status",
"resolution"
]
for key, value in updates.items():
if key in allowed_updates:
ticket[key] = value
ticket["updated_at"] = datetime.now().isoformat()
if ticket["status"] == "resolved":
ticket["resolved_at"] = datetime.now().isoformat()
return ticket
def get_ticket(self, ticket_id: str) -> Optional[Dict]:
return self.tickets.get(ticket_id)
def get_user_tickets(
self,
user_id: str,
status: Optional[str] = None
) -> List[Dict]:
tickets = [
ticket
for ticket in self.tickets.values()
if ticket["user_id"] == user_id
]
if status:
tickets = [
ticket
for ticket in tickets
if ticket["status"] == status
]
return sorted(
tickets,
key=lambda x: x["created_at"],
reverse=True
)
def get_agent_tickets(
self,
agent_id: str,
status: Optional[str] = None
) -> List[Dict]:
tickets = [
ticket
for ticket in self.tickets.values()
if ticket["agent_id"] == agent_id
]
if status:
tickets = [
ticket
for ticket in tickets
if ticket["status"] == status
]
return sorted(
tickets,
key=lambda x: x["created_at"],
reverse=True
)工单自动创建
python
class AutoTicketCreator:
def __init__(self, ticket_service: TicketService):
self.ticket_service = ticket_service
async def should_create_ticket(
self,
message: str,
sentiment: str,
intent: str,
conversation_history: List[Dict]
) -> bool:
if sentiment in ["negative", "angry", "frustrated"]:
return True
if intent == "complaint":
return True
if await self._is_repeated_issue(conversation_history):
return True
return False
async def create_ticket_from_conversation(
self,
user_id: str,
conversation_id: str,
message: str,
sentiment: str,
intent: str,
conversation_history: List[Dict]
) -> Optional[Dict]:
if not await self.should_create_ticket(
message,
sentiment,
intent,
conversation_history
):
return None
title = await self._generate_ticket_title(
message,
intent
)
description = await self._generate_ticket_description(
conversation_history
)
priority = self._determine_priority(
sentiment,
intent
)
ticket = self.ticket_service.create_ticket(
user_id=user_id,
title=title,
description=description,
priority=priority,
conversation_id=conversation_id
)
return ticket
async def _generate_ticket_title(
self,
message: str,
intent: str
) -> str:
prompt = f"""请根据以下信息生成一个简洁的工单标题:
用户消息:{message}
意图:{intent}
请生成一个10-20字的工单标题。"""
try:
import openai
client = openai.OpenAI()
completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个专业的工单标题生成器"},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=50
)
return completion.choices[0].message.content.strip()
except Exception as e:
return f"{intent} - {message[:20]}"
async def _generate_ticket_description(
self,
conversation_history: List[Dict]
) -> str:
prompt = f"""请总结以下对话,生成工单描述:
对话历史:
{self._format_conversation(conversation_history)}
请生成详细的工单描述,包括问题背景和具体需求。"""
try:
import openai
client = openai.OpenAI()
completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个专业的工单描述生成器"},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=500
)
return completion.choices[0].message.content.strip()
except Exception as e:
return self._format_conversation(conversation_history)
def _determine_priority(
self,
sentiment: str,
intent: str
) -> str:
if sentiment in ["angry", "frustrated"]:
return "high"
if intent == "complaint":
return "high"
if sentiment == "negative":
return "medium"
return "low"
async def _is_repeated_issue(
self,
conversation_history: List[Dict]
) -> bool:
if len(conversation_history) < 3:
return False
recent_messages = [
msg["content"]
for msg in conversation_history[-3:]
]
unique_messages = set(recent_messages)
return len(unique_messages) < 3
def _format_conversation(self, conversation_history: List[Dict]) -> str:
return "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in conversation_history
])实践练习
练习1:实现知识库服务
python
def implement_knowledge_service():
    """Exercise 1: wire up the embedding service and the vector store."""
    embeddings = EmbeddingService(api_key="your-api-key")
    store = VectorDatabase(persist_directory="./chroma_db")
    return embeddings, store
练习2:实现RAG引擎
python
def implement_rag_engine():
    """Exercise 2: assemble a complete RAG engine from its three parts."""
    svc = EmbeddingService(api_key="your-api-key")
    db = VectorDatabase(persist_directory="./chroma_db")
    client = openai.OpenAI(api_key="your-api-key")
    return RAGEngine(svc, db, client)
练习3:实现工单服务
python
def implement_ticket_service():
    """Exercise 3: build the ticket service plus its auto-creator."""
    service = TicketService()
    return service, AutoTicketCreator(service)
总结
本节我们学习了智能客服系统后端开发(下):
- 知识库服务开发
- RAG实现
- 工单服务开发
这些是智能客服系统的重要功能模块。
