Skip to content

第74天:智能客服系统-后端开发(上)

学习目标

  • 掌握对话服务开发
  • 学习意图识别实现
  • 理解情感分析实现
  • 掌握上下文管理
  • 学习RAG集成

对话服务开发

对话服务基础

python
import json
import uuid
from datetime import datetime
from typing import List, Optional, Dict

import openai
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field

app = FastAPI(title="智能客服系统")

class Message(BaseModel):
    role: str = Field(..., description="消息角色:user/assistant/system")
    content: str = Field(..., description="消息内容")
    timestamp: Optional[datetime] = None

class ConversationRequest(BaseModel):
    session_id: Optional[str] = None
    message: str = Field(..., description="用户消息")
    user_id: Optional[str] = None

class ConversationResponse(BaseModel):
    session_id: str
    message_id: str
    response: str
    intent: Optional[str] = None
    sentiment: Optional[str] = None
    confidence: Optional[float] = None

class ConversationService:
    """Manages chat sessions: keeps per-session history and produces LLM replies.

    History is held in process memory only (lost on restart; not safe across
    multiple workers) — acceptable for a tutorial, noted for production use.
    """

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client
        # session_id -> ordered message history
        self.conversations: Dict[str, List[Message]] = {}

    async def process_message(
        self,
        request: ConversationRequest
    ) -> ConversationResponse:
        """Record the user's message, generate a reply, and return it.

        Raises:
            HTTPException: 500 when response generation fails. BUG FIX: the
                user's message is rolled back on failure so a client retry
                does not accumulate duplicate user turns in the history.
        """
        session_id = request.session_id or str(uuid.uuid4())
        history = self.conversations.setdefault(session_id, [])

        user_message = Message(
            role="user",
            content=request.message,
            timestamp=datetime.now()
        )
        history.append(user_message)

        messages = [
            {"role": msg.role, "content": msg.content}
            for msg in history
        ]

        try:
            response = await self._generate_response(messages)
        except Exception as e:
            # Roll back the just-appended user message (identity check guards
            # against concurrent appends to the same session).
            if history and history[-1] is user_message:
                history.pop()
            raise HTTPException(status_code=500, detail=str(e))

        history.append(Message(
            role="assistant",
            content=response,
            timestamp=datetime.now()
        ))

        return ConversationResponse(
            session_id=session_id,
            message_id=str(uuid.uuid4()),
            response=response
        )

    async def _generate_response(self, messages: List[Dict]) -> str:
        """Call the chat model with the customer-service system prompt prepended.

        NOTE(review): this uses the synchronous OpenAI client inside an async
        method, which blocks the event loop for the duration of the API call —
        consider openai.AsyncOpenAI. Left unchanged to avoid an interface change.
        """
        system_prompt = """你是一个专业的客服助手,请根据用户的问题提供准确、友好的回答。
        
回答要求:
1. 准确理解用户问题
2. 提供清晰、简洁的回答
3. 如果不确定,诚实地说明
4. 保持友好和专业的语气
5. 必要时引导用户提供更多信息"""

        full_messages = [
            {"role": "system", "content": system_prompt}
        ] + messages

        completion = self.llm_client.chat.completions.create(
            model="gpt-4o",
            messages=full_messages,
            temperature=0.7,
            max_tokens=1000
        )

        return completion.choices[0].message.content

    def get_conversation_history(
        self,
        session_id: str
    ) -> List[Message]:
        """Return the stored history for *session_id* ([] if unknown)."""
        return self.conversations.get(session_id, [])

    def clear_conversation(self, session_id: str):
        """Drop all stored messages for *session_id* (no-op if unknown)."""
        self.conversations.pop(session_id, None)

对话路由

python
class ConversationRouter:
    """Routes a user message to an intent-specific handler.

    BUG FIX: handlers previously hard-coded ``confidence: 0.8`` in their
    results; the classifier's actual confidence is now propagated.
    """

    # minimum classifier confidence required to commit to a specific route
    CONFIDENCE_THRESHOLD = 0.6

    def __init__(self, intent_service, knowledge_service):
        self.intent_service = intent_service
        self.knowledge_service = knowledge_service

    async def route_message(
        self,
        message: str,
        context: Dict
    ) -> Dict:
        """Classify *message* and dispatch to the matching handler.

        Falls back to general chat when the confidence is below threshold or
        the intent has no dedicated handler.
        """
        intent_result = await self.intent_service.classify(message)

        intent = intent_result["intent"]
        confidence = intent_result["confidence"]

        if confidence < self.CONFIDENCE_THRESHOLD:
            return {
                "route": "general_chat",
                "confidence": confidence
            }

        handlers = {
            "product_inquiry": self._handle_product_inquiry,
            "order_status": self._handle_order_status,
            "technical_support": self._handle_technical_support,
            "complaint": self._handle_complaint,
        }
        handler = handlers.get(intent)
        if handler is None:
            return {
                "route": "general_chat",
                "confidence": confidence
            }
        return await handler(message, context, confidence)

    async def _handle_product_inquiry(
        self,
        message: str,
        context: Dict,
        confidence: float = 0.8
    ) -> Dict:
        """Answer product questions using top knowledge-base matches."""
        search_results = await self.knowledge_service.search(
            message,
            top_k=3
        )

        return {
            "route": "product_inquiry",
            "confidence": confidence,
            "context": search_results
        }

    async def _handle_order_status(
        self,
        message: str,
        context: Dict,
        confidence: float = 0.8
    ) -> Dict:
        """Route to the order-lookup action."""
        return {
            "route": "order_status",
            "confidence": confidence,
            "action": "check_order"
        }

    async def _handle_technical_support(
        self,
        message: str,
        context: Dict,
        confidence: float = 0.8
    ) -> Dict:
        """Answer support questions with a wider knowledge-base search."""
        search_results = await self.knowledge_service.search(
            message,
            top_k=5
        )

        return {
            "route": "technical_support",
            "confidence": confidence,
            "context": search_results
        }

    async def _handle_complaint(
        self,
        message: str,
        context: Dict,
        confidence: float = 0.8
    ) -> Dict:
        """Route complaints to ticket creation."""
        return {
            "route": "complaint",
            "confidence": confidence,
            "action": "create_ticket"
        }

意图识别实现

意图分类器

python
class IntentClassifier:
    """LLM-based intent classifier for customer-service messages.

    BUG FIX: the model's JSON output is now validated (known intent label,
    confidence clamped to [0, 1]) instead of being trusted blindly.
    """

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client
        # intent key -> Chinese label shown to the model in the prompt
        self.intents = {
            "product_inquiry": "产品咨询",
            "order_status": "订单查询",
            "technical_support": "技术支持",
            "complaint": "投诉建议",
            "pricing": "价格咨询",
            "shipping": "物流咨询",
            "return_refund": "退换货",
            "account": "账户问题",
            "general_chat": "一般对话"
        }

    async def classify(
        self,
        message: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Classify *message* into one of ``self.intents``.

        Returns a dict with ``intent``, ``confidence`` and ``reasoning``;
        falls back to general_chat (confidence 0.5) when the API call or
        JSON parsing fails. *context* is currently unused.
        """
        intent_prompt = f"""请分析以下用户消息的意图,从以下意图类别中选择最合适的一个:

意图类别:
{self._format_intents()}

用户消息:{message}

请以JSON格式返回结果,包含以下字段:
- intent: 意图类别
- confidence: 置信度(0-1之间的浮点数)
- reasoning: 判断理由"""

        try:
            completion = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的意图分类器"},
                    {"role": "user", "content": intent_prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = json.loads(completion.choices[0].message.content)
            return self._validate(result)

        except Exception as e:
            return {
                "intent": "general_chat",
                "confidence": 0.5,
                "reasoning": f"分类失败: {str(e)}"
            }

    def _validate(self, result: Dict) -> Dict:
        """Coerce an LLM result into a safe shape: known intent, confidence in [0, 1]."""
        if result.get("intent") not in self.intents:
            result["intent"] = "general_chat"
        try:
            confidence = float(result.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5
        result["confidence"] = max(0.0, min(1.0, confidence))
        return result

    def _format_intents(self) -> str:
        """Render the intent catalogue as '- key: label' lines for the prompt."""
        return "\n".join([
            f"- {key}: {value}"
            for key, value in self.intents.items()
        ])

    async def classify_batch(
        self,
        messages: List[str]
    ) -> List[Dict]:
        """Classify each message sequentially; one result dict per input."""
        results = []

        for message in messages:
            result = await self.classify(message)
            results.append(result)

        return results

意图训练

python
class IntentTrainer:
    """Collects labelled examples for building few-shot intent-classification prompts."""

    def __init__(self):
        # List of {"message", "intent", "confidence"} records, in insertion order.
        self.training_data = []

    def add_training_example(
        self,
        message: str,
        intent: str,
        confidence: float = 1.0
    ):
        """Record one labelled example."""
        example = {
            "message": message,
            "intent": intent,
            "confidence": confidence,
        }
        self.training_data.append(example)

    def generate_training_prompt(self) -> str:
        """Render all collected examples into a few-shot prompt string."""
        rendered = [
            f"消息:{item['message']}\n意图:{item['intent']}\n置信度:{item['confidence']}"
            for item in self.training_data
        ]
        examples = "\n\n".join(rendered)

        return f"""以下是一些意图分类的训练示例:

{examples}

请根据这些示例,学习如何准确分类用户意图。"""

    def export_training_data(self, filepath: str):
        """Write the examples to *filepath* as pretty-printed UTF-8 JSON."""
        import json

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.training_data, f, ensure_ascii=False, indent=2)

    def import_training_data(self, filepath: str):
        """Replace the current examples with those loaded from *filepath*."""
        import json

        with open(filepath, 'r', encoding='utf-8') as f:
            self.training_data = json.load(f)

情感分析实现

情感分析器

python
class SentimentAnalyzer:
    """LLM-based sentiment analyzer for customer-service messages.

    BUG FIX: the model's JSON output is now validated (known sentiment label,
    confidence clamped to [0, 1], intensity clamped to [1, 5]) instead of
    being trusted blindly.
    """

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client
        # sentiment key -> Chinese label shown to the model in the prompt
        self.sentiments = {
            "positive": "积极",
            "neutral": "中性",
            "negative": "消极",
            "angry": "愤怒",
            "frustrated": "沮丧",
            "satisfied": "满意"
        }

    async def analyze(
        self,
        message: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Analyze the sentiment of *message*.

        Returns a dict with ``sentiment``, ``confidence``, ``reasoning`` and
        ``intensity``; falls back to neutral when the API call or JSON parsing
        fails. *context* is currently unused.
        """
        sentiment_prompt = f"""请分析以下用户消息的情感,从以下情感类别中选择最合适的一个:

情感类别:
{self._format_sentiments()}

用户消息:{message}

请以JSON格式返回结果,包含以下字段:
- sentiment: 情感类别
- confidence: 置信度(0-1之间的浮点数)
- reasoning: 判断理由
- intensity: 情感强度(1-5之间的整数)"""

        try:
            completion = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的情感分析器"},
                    {"role": "user", "content": sentiment_prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            result = json.loads(completion.choices[0].message.content)
            return self._validate(result)

        except Exception as e:
            return {
                "sentiment": "neutral",
                "confidence": 0.5,
                "reasoning": f"分析失败: {str(e)}",
                "intensity": 3
            }

    def _validate(self, result: Dict) -> Dict:
        """Coerce an LLM result into a safe shape: known label and in-range numbers."""
        if result.get("sentiment") not in self.sentiments:
            result["sentiment"] = "neutral"
        try:
            confidence = float(result.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5
        result["confidence"] = max(0.0, min(1.0, confidence))
        try:
            intensity = int(result.get("intensity", 3))
        except (TypeError, ValueError):
            intensity = 3
        result["intensity"] = max(1, min(5, intensity))
        return result

    def _format_sentiments(self) -> str:
        """Render the sentiment catalogue as '- key: label' lines for the prompt."""
        return "\n".join([
            f"- {key}: {value}"
            for key, value in self.sentiments.items()
        ])

    async def analyze_batch(
        self,
        messages: List[str]
    ) -> List[Dict]:
        """Analyze each message sequentially; one result dict per input."""
        results = []

        for message in messages:
            result = await self.analyze(message)
            results.append(result)

        return results

情感趋势分析

python
class SentimentTrendAnalyzer:
    """Accumulates sentiment readings and reports the recent emotional trend."""

    def __init__(self):
        # Chronological list of {"sentiment", "confidence", "intensity", "timestamp"} records.
        self.sentiment_history = []

    def add_sentiment(
        self,
        sentiment: str,
        confidence: float,
        intensity: int,
        timestamp: datetime
    ):
        """Append one sentiment reading to the history."""
        record = {
            "sentiment": sentiment,
            "confidence": confidence,
            "intensity": intensity,
            "timestamp": timestamp,
        }
        self.sentiment_history.append(record)

    def analyze_trend(self, window_size: int = 10) -> Dict:
        """Summarize the most recent *window_size* readings.

        Returns the trend direction, dominant sentiment, sentiment
        distribution and average intensity; returns a placeholder result when
        fewer than *window_size* readings exist.
        """
        history = self.sentiment_history
        if len(history) < window_size:
            return {
                "trend": "insufficient_data",
                "recent_sentiments": history,
            }

        window = history[-window_size:]

        distribution = {}
        total_intensity = 0
        for entry in window:
            label = entry["sentiment"]
            distribution[label] = distribution.get(label, 0) + 1
            total_intensity += entry["intensity"]

        return {
            "trend": self._determine_trend(window),
            "dominant_sentiment": max(distribution, key=distribution.get),
            "sentiment_distribution": distribution,
            "average_intensity": total_intensity / len(window),
            "window_size": window_size,
        }

    def _determine_trend(self, recent: List[Dict]) -> str:
        """Classify the window as improving, deteriorating, or stable."""
        positive_labels = ("positive", "satisfied")
        negative_labels = ("negative", "angry", "frustrated")

        positives = sum(1 for entry in recent if entry["sentiment"] in positive_labels)
        negatives = sum(1 for entry in recent if entry["sentiment"] in negative_labels)

        # A side "wins" only with a clear (1.5x) majority over the other.
        if positives > negatives * 1.5:
            return "improving"
        if negatives > positives * 1.5:
            return "deteriorating"
        return "stable"

上下文管理

上下文管理器

python
class ContextManager:
    """Keeps a bounded, per-session message history for prompt construction."""

    def __init__(self, max_context_length: int = 10):
        self.max_context_length = max_context_length
        # session_id -> list of message dicts, oldest first
        self.contexts: Dict[str, List[Dict]] = {}

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """Record one message; oldest entries are dropped beyond the limit."""
        entry = {
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {},
        }

        history = self.contexts.setdefault(session_id, [])
        history.append(entry)

        # Trim to the most recent max_context_length messages.
        if len(history) > self.max_context_length:
            self.contexts[session_id] = history[-self.max_context_length:]

    def get_context(
        self,
        session_id: str,
        include_system: bool = True
    ) -> List[Dict]:
        """Return the session's messages, optionally prefixed with a system message."""
        history = self.contexts.get(session_id, [])
        if not include_system:
            return history

        preamble = {
            "role": "system",
            "content": "你是一个专业的客服助手",
            "timestamp": datetime.now().isoformat()
        }
        return [preamble, *history]

    def get_summary(
        self,
        session_id: str,
        llm_client: openai.OpenAI
    ) -> str:
        """Summarize the session via the LLM; returns an error string on failure."""
        history = self.get_context(session_id, include_system=False)
        if not history:
            return "暂无对话历史"

        summary_prompt = f"""请总结以下对话的要点:

对话历史:
{self._format_context(history)}

请用简洁的语言总结对话的主要内容和关键信息。"""

        try:
            completion = llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的对话总结器"},
                    {"role": "user", "content": summary_prompt}
                ],
                temperature=0.3,
                max_tokens=500
            )
        except Exception as e:
            return f"总结失败: {str(e)}"

        return completion.choices[0].message.content

    def clear_context(self, session_id: str):
        """Forget the session entirely (no-op if unknown)."""
        self.contexts.pop(session_id, None)

    def _format_context(self, context: List[Dict]) -> str:
        """Render messages as 'role: content' lines."""
        lines = [f"{entry['role']}: {entry['content']}" for entry in context]
        return "\n".join(lines)

上下文压缩

python
class ContextCompressor:
    """Shrinks a conversation context to fit a character budget.

    Strategy: summarize the middle of the conversation with the LLM (keeping
    the first and last messages); when summarization is unavailable or does
    not help, fall back to hard truncation of the oldest content.
    """

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def compress(
        self,
        context: List[Dict],
        target_length: int = 2000
    ) -> List[Dict]:
        """Return *context* reduced so total content length is near *target_length*.

        A context already within budget is returned unchanged. The input list
        itself is never mutated.
        """
        if self._calculate_length(context) <= target_length:
            return context

        # Separate the system message (if any) so it is always kept first.
        system_message = None
        compressed = []
        for msg in context:
            if msg["role"] == "system":
                system_message = msg
            else:
                compressed.append(msg)
        if system_message:
            compressed.insert(0, system_message)

        while len(compressed) > 2 and self._calculate_length(compressed) > target_length:
            if len(compressed) > 3:
                # BUG FIX: the original called this async helper without
                # `await`, so `compressed` became a coroutine object and the
                # next loop iteration crashed.
                shorter = await self._compress_middle_messages(compressed)
                if self._calculate_length(shorter) >= self._calculate_length(compressed):
                    # BUG FIX: summarization failed (the helper returns its
                    # input unchanged on error) or did not shrink anything —
                    # truncate instead of looping forever.
                    compressed = self._truncate_messages(compressed, target_length)
                    break
                compressed = shorter
            else:
                # Truncation enforces the budget in one pass; nothing left to loop on.
                compressed = self._truncate_messages(compressed, target_length)
                break

        return compressed

    def _calculate_length(self, context: List[Dict]) -> int:
        """Total character count of all message contents."""
        return sum(len(msg["content"]) for msg in context)

    async def _compress_middle_messages(
        self,
        context: List[Dict]
    ) -> List[Dict]:
        """Replace everything between the first and last message with an LLM summary.

        Returns the input unchanged when there is no middle or the API call fails.
        """
        if len(context) <= 3:
            return context

        middle_messages = context[1:-1]

        summary_prompt = f"""请总结以下对话的要点:

对话:
{self._format_messages(middle_messages)}

请用简洁的语言总结对话的主要内容和关键信息。"""

        try:
            completion = self.llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的对话总结器"},
                    {"role": "user", "content": summary_prompt}
                ],
                temperature=0.3,
                max_tokens=500
            )

            summary = completion.choices[0].message.content

            compressed_message = {
                "role": "assistant",
                "content": f"[对话摘要] {summary}",
                "timestamp": datetime.now().isoformat()
            }

            return [context[0], compressed_message, context[-1]]

        except Exception:
            # Best-effort: callers detect "no progress" and truncate instead.
            return context

    def _truncate_messages(
        self,
        context: List[Dict],
        target_length: int
    ) -> List[Dict]:
        """Keep the newest messages that fit within *target_length* characters.

        The system message (if first) is always preserved. A partially fitting
        message is cut and suffixed with "..." (may overshoot the budget by 3).
        """
        system_message = context[0] if context and context[0]["role"] == "system" else None
        other_messages = context[1:] if system_message else context

        available_length = target_length - (len(system_message["content"]) if system_message else 0)

        truncated = []
        current_length = 0

        # Walk newest-to-oldest so recent turns survive.
        for msg in reversed(other_messages):
            msg_length = len(msg["content"])

            if current_length + msg_length <= available_length:
                truncated.insert(0, msg)
                current_length += msg_length
            else:
                remaining_length = available_length - current_length
                if remaining_length > 0:
                    truncated_msg = msg.copy()
                    truncated_msg["content"] = msg["content"][:remaining_length] + "..."
                    truncated.insert(0, truncated_msg)
                break

        if system_message:
            truncated.insert(0, system_message)

        return truncated

    def _format_messages(self, messages: List[Dict]) -> str:
        """Render messages as 'role: content' lines."""
        return "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages
        ])

实践练习

练习1:实现对话服务

python
def implement_conversation_service(api_key: str = "your-api-key"):
    """Exercise 1: build a ConversationService backed by an OpenAI client.

    Args:
        api_key: OpenAI API key. The placeholder default is kept for backward
            compatibility; in practice pass a real key (e.g. read from the
            OPENAI_API_KEY environment variable).

    Returns:
        A ready-to-use ConversationService instance.
    """
    llm_client = openai.OpenAI(api_key=api_key)

    return ConversationService(llm_client)

练习2:实现意图识别

python
def implement_intent_classifier(api_key: str = "your-api-key"):
    """Exercise 2: build an IntentClassifier backed by an OpenAI client.

    Args:
        api_key: OpenAI API key. The placeholder default is kept for backward
            compatibility; in practice pass a real key (e.g. read from the
            OPENAI_API_KEY environment variable).

    Returns:
        A ready-to-use IntentClassifier instance.
    """
    llm_client = openai.OpenAI(api_key=api_key)

    return IntentClassifier(llm_client)

练习3:实现情感分析

python
def implement_sentiment_analyzer(api_key: str = "your-api-key"):
    """Exercise 3: build a SentimentAnalyzer backed by an OpenAI client.

    Args:
        api_key: OpenAI API key. The placeholder default is kept for backward
            compatibility; in practice pass a real key (e.g. read from the
            OPENAI_API_KEY environment variable).

    Returns:
        A ready-to-use SentimentAnalyzer instance.
    """
    llm_client = openai.OpenAI(api_key=api_key)

    return SentimentAnalyzer(llm_client)

总结

本节我们学习了智能客服系统后端开发(上):

  1. 对话服务开发
  2. 意图识别实现
  3. 情感分析实现
  4. 上下文管理

这些是智能客服系统的核心功能。

参考资源