Skip to content

第81天:个人助理Agent-集成与部署

学习目标

  • 掌握Agent集成
  • 学习前端集成
  • 理解部署配置
  • 掌握监控告警
  • 学习性能优化

Agent集成

Agent管理器

python
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Optional

class AgentManager:
    """Registry of agents plus the chat sessions opened against them.

    Everything lives in this object's dictionaries (process memory). Closed
    sessions are kept in ``active_sessions`` with ``status == "closed"``
    rather than being removed.
    """

    def __init__(self):
        # agent_id -> {"agent", "config", "status", "registered_at"}
        self.agents = {}
        # session_id -> session record built in create_session()
        self.active_sessions = {}

    def register_agent(
        self,
        agent_id: str,
        agent: "BaseAgent",
        config: Optional[Dict] = None
    ):
        """Register ``agent`` under ``agent_id``, replacing any previous entry.

        Args:
            agent_id: Identifier later passed to ``create_session``.
            agent: Object providing ``async process_input(message, context)``.
            config: Optional settings stored alongside the agent.
        """
        self.agents[agent_id] = {
            "agent": agent,
            "config": config or {},
            "status": "registered",
            "registered_at": datetime.now().isoformat()
        }

    async def create_session(
        self,
        agent_id: str,
        user_id: str,
        initial_context: Optional[Dict] = None
    ) -> Dict:
        """Open a new active session for ``user_id`` on a registered agent.

        Returns:
            Summary dict with ``session_id``, ``agent_id``, ``user_id`` and
            ``status``.

        Raises:
            ValueError: if ``agent_id`` has not been registered.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent不存在: {agent_id}")

        # A random UUID cannot collide the way a timestamp-based id does when
        # two sessions are created within the same clock tick (the second
        # would silently overwrite the first in active_sessions).
        session_id = f"session_{uuid.uuid4().hex}"
        now = datetime.now().isoformat()

        self.active_sessions[session_id] = {
            "agent_id": agent_id,
            "user_id": user_id,
            # Copy so later mutation of the caller's dict doesn't leak in.
            "context": dict(initial_context or {}),
            "created_at": now,
            "last_activity": now,
            "status": "active"
        }

        return {
            "session_id": session_id,
            "agent_id": agent_id,
            "user_id": user_id,
            "status": "active"
        }

    async def process_message(
        self,
        session_id: str,
        message: str,
        additional_context: Optional[Dict] = None
    ) -> Dict:
        """Send ``message`` to the session's agent and wrap the result.

        The session's stored context is merged with ``additional_context``
        (the latter wins on key clashes) and passed to ``agent.process_input``.
        Agent failures are returned as ``{"success": False, "error": ...}``
        rather than raised.

        Raises:
            ValueError: if the session does not exist or has been closed.
        """
        session = self.active_sessions.get(session_id)
        if session is None:
            raise ValueError(f"会话不存在: {session_id}")
        if session["status"] != "active":
            # Closed sessions must not keep talking to the agent.
            raise ValueError(f"会话已关闭: {session_id}")

        agent = self.agents[session["agent_id"]]["agent"]
        context = {
            **session["context"],
            **(additional_context or {})
        }

        try:
            response = await agent.process_input(message, context)
        except Exception as e:
            return {
                "session_id": session_id,
                "error": str(e),
                "success": False
            }

        # Only successful exchanges count as activity for idle cleanup.
        session["last_activity"] = datetime.now().isoformat()
        return {
            "session_id": session_id,
            "response": response,
            "success": True
        }

    async def close_session(self, session_id: str) -> Dict:
        """Mark a session as closed (the record is kept, not deleted).

        Calling this on an already-closed session is a no-op that keeps the
        original ``closed_at`` timestamp.

        Raises:
            ValueError: if the session does not exist.
        """
        session = self.active_sessions.get(session_id)
        if session is None:
            raise ValueError(f"会话不存在: {session_id}")

        if session["status"] != "closed":
            session["status"] = "closed"
            session["closed_at"] = datetime.now().isoformat()

        return {
            "session_id": session_id,
            "status": "closed"
        }

    def get_session_info(self, session_id: str) -> Optional[Dict]:
        """Return the raw session record, or None if the id is unknown."""
        return self.active_sessions.get(session_id)

    def get_active_sessions(self, user_id: Optional[str] = None) -> List[Dict]:
        """List records of active sessions, filtered by ``user_id`` if truthy."""
        sessions = [
            session
            for session in self.active_sessions.values()
            if session["status"] == "active"
        ]

        if user_id:
            sessions = [
                session
                for session in sessions
                if session["user_id"] == user_id
            ]

        return sessions

    async def cleanup_inactive_sessions(self, timeout_minutes: int = 30) -> Dict:
        """Close active sessions idle for more than ``timeout_minutes``.

        Idle time is measured from ``last_activity``, which is set at creation
        and refreshed by each successful ``process_message`` call.

        Returns:
            ``{"cleaned_sessions": <count>, "session_ids": [<ids>]}``.
        """
        cutoff = datetime.now().timestamp() - timeout_minutes * 60

        inactive_sessions = [
            session_id
            for session_id, session in self.active_sessions.items()
            if session["status"] == "active"
            and datetime.fromisoformat(session["last_activity"]).timestamp() < cutoff
        ]

        for session_id in inactive_sessions:
            await self.close_session(session_id)

        return {
            "cleaned_sessions": len(inactive_sessions),
            "session_ids": inactive_sessions
        }

Agent服务

python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional

app = FastAPI(title="个人助理Agent API")


class CreateSessionRequest(BaseModel):
    """Request body for POST /api/sessions."""

    agent_id: str = Field(..., description="Agent ID")
    user_id: str = Field(..., description="用户ID")
    # Builtin ``dict``: this snippet imports only ``Optional`` from typing,
    # so ``Dict`` would be an undefined name here.
    initial_context: Optional[dict] = None


class ProcessMessageRequest(BaseModel):
    """Request body for POST /api/messages."""

    session_id: str = Field(..., description="会话ID")
    message: str = Field(..., description="用户消息")
    additional_context: Optional[dict] = None


# One manager per process: its sessions are held in this process's memory.
agent_manager = AgentManager()


@app.get("/health")
async def health():
    """Liveness/readiness endpoint hit by the Kubernetes probes (path /health).

    Only reports that the process is serving requests; it does not check
    downstream dependencies such as Redis or the LLM provider.
    """
    return {"status": "ok"}

@app.post("/api/sessions")
async def create_session(request: CreateSessionRequest):
    """Open a session for the requested agent/user pair.

    ValueError from the manager (unknown agent) maps to HTTP 400; any other
    failure maps to HTTP 500.
    """
    try:
        return await agent_manager.create_session(
            request.agent_id,
            request.user_id,
            request.initial_context,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.post("/api/messages")
async def process_message(request: ProcessMessageRequest):
    """Forward a user message to the agent behind ``request.session_id``.

    ValueError from the manager (e.g. unknown session) maps to HTTP 400;
    any other failure maps to HTTP 500.
    """
    try:
        result = await agent_manager.process_message(
            request.session_id,
            request.message,
            request.additional_context,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return result

@app.delete("/api/sessions/{session_id}")
async def close_session(session_id: str):
    """Close the given session; unknown ids map to HTTP 400, other errors to 500."""
    try:
        outcome = await agent_manager.close_session(session_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    else:
        return outcome

@app.get("/api/sessions/{session_id}")
async def get_session(session_id: str):
    """Return the stored session record, or HTTP 404 when none is found."""
    if record := agent_manager.get_session_info(session_id):
        return record
    raise HTTPException(status_code=404, detail="会话不存在")

@app.get("/api/sessions")
async def list_sessions(user_id: Optional[str] = None):
    """List active sessions, restricted to ``user_id`` when one is given."""
    active = agent_manager.get_active_sessions(user_id)
    return {"sessions": active, "count": len(active)}

@app.post("/api/admin/cleanup")
async def cleanup_sessions(timeout_minutes: int = 30):
    """Close sessions that have been idle longer than ``timeout_minutes``.

    Non-positive timeouts are rejected with HTTP 400. With
    ``timeout_minutes <= 0``, (virtually) every active session would exceed
    the idle threshold and be closed at once.

    NOTE(review): this admin endpoint has no authentication in this snippet;
    protect it before exposing the API publicly.
    """
    if timeout_minutes <= 0:
        raise HTTPException(status_code=400, detail="timeout_minutes必须大于0")

    return await agent_manager.cleanup_inactive_sessions(timeout_minutes)

前端集成

React组件

typescript
import React, { useState, useEffect, useRef } from 'react';
import { Input, Button, Card, Space, Typography, Spin } from 'antd';
import { SendOutlined, RobotOutlined, UserOutlined } from '@ant-design/icons';

const { TextArea } = Input;
const { Text, Title } = Typography;

/** One chat bubble shown in the conversation pane. */
interface Message {
  role: 'user' | 'assistant';
  content: string;
  timestamp: string; // ISO-8601 string from new Date().toISOString()
}

interface AgentChatProps {
  agentId: string;
  userId: string;
  /** Base URL of the agent REST API (the FastAPI service). */
  apiBaseUrl?: string;
}

/** Animation delays for the three "agent is typing" dots. */
const TYPING_DOT_DELAYS = ['0s', '0.16s', '0.32s'];

/**
 * Turn the `response` field of POST /messages into display text.
 * The backend forwards whatever the agent returned, so it may be a plain
 * string or an object carrying a `response` string. Anything else is
 * JSON-stringified so React never receives an object as a child.
 */
const toDisplayText = (payload: unknown): string => {
  if (typeof payload === 'string') return payload;
  if (payload !== null && typeof payload === 'object') {
    const inner = (payload as { response?: unknown }).response;
    if (typeof inner === 'string') return inner;
  }
  return JSON.stringify(payload) ?? '';
};

const AgentChat: React.FC<AgentChatProps> = ({
  agentId,
  userId,
  apiBaseUrl = 'http://localhost:8000/api'
}) => {
  const [sessionId, setSessionId] = useState<string | null>(null);
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);
  const [isTyping, setIsTyping] = useState(false);

  const messagesEndRef = useRef<HTMLDivElement>(null);

  // Create a backend session for this agent/user. Close it again when the
  // component unmounts or agent/user/API changes, so sessions don't pile up.
  useEffect(() => {
    let cancelled = false;
    let createdId: string | null = null;

    const closeRemoteSession = (id: string) => {
      fetch(`${apiBaseUrl}/sessions/${encodeURIComponent(id)}`, { method: 'DELETE' })
        .catch((error) => console.error('关闭会话失败:', error));
    };

    const initializeSession = async () => {
      try {
        const response = await fetch(`${apiBaseUrl}/sessions`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            agent_id: agentId,
            user_id: userId
          })
        });

        if (!response.ok) {
          throw new Error(`HTTP ${response.status}`);
        }

        const data = await response.json();

        if (cancelled) {
          // The effect was torn down while the request was in flight.
          closeRemoteSession(data.session_id);
          return;
        }

        createdId = data.session_id;
        setSessionId(data.session_id);
      } catch (error) {
        console.error('初始化会话失败:', error);
      }
    };

    initializeSession();

    return () => {
      cancelled = true;
      if (createdId) closeRemoteSession(createdId);
      setSessionId(null);
    };
  }, [agentId, userId, apiBaseUrl]);

  useEffect(() => {
    scrollToBottom();
  }, [messages, isTyping]);

  const sendMessage = async () => {
    if (!input.trim() || !sessionId || loading) return;

    const text = input;
    const userMessage: Message = {
      role: 'user',
      content: text,
      timestamp: new Date().toISOString()
    };

    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setLoading(true);
    setIsTyping(true);

    try {
      const response = await fetch(`${apiBaseUrl}/messages`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          session_id: sessionId,
          message: text
        })
      });

      const data = await response.json();

      // Non-2xx responses carry FastAPI's {detail}. A 200 with
      // success=false carries the agent's {error}. Both are failures.
      if (!response.ok || !data.success) {
        throw new Error(data?.detail ?? data?.error ?? `HTTP ${response.status}`);
      }

      const assistantMessage: Message = {
        role: 'assistant',
        content: toDisplayText(data.response),
        timestamp: new Date().toISOString()
      };

      setMessages(prev => [...prev, assistantMessage]);
    } catch (error) {
      console.error('发送消息失败:', error);
      const errorMessage: Message = {
        role: 'assistant',
        content: '抱歉,我遇到了一些问题,请稍后再试。',
        timestamp: new Date().toISOString()
      };
      setMessages(prev => [...prev, errorMessage]);
    } finally {
      setLoading(false);
      setIsTyping(false);
    }
  };

  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  const handleKeyDown = (e: React.KeyboardEvent<HTMLTextAreaElement>) => {
    // While an IME (e.g. Chinese pinyin) is composing, Enter confirms the
    // candidate text and must not send the message.
    if (e.nativeEvent.isComposing) return;
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      sendMessage();
    }
  };

  return (
    <Card
      title={
        <Space>
          <RobotOutlined />
          <Title level={4} style={{ margin: 0 }}>
            个人助理
          </Title>
        </Space>
      }
      style={{ height: '100vh', display: 'flex', flexDirection: 'column' }}
      bodyStyle={{
        flex: 1,
        display: 'flex',
        flexDirection: 'column',
        overflow: 'hidden'
      }}
    >
      <div
        style={{
          flex: 1,
          overflow: 'auto',
          padding: '16px',
          backgroundColor: '#f5f5f5',
          borderRadius: '8px',
          marginBottom: '16px'
        }}
      >
        {messages.length === 0 ? (
          <div style={{ textAlign: 'center', marginTop: '100px' }}>
            <Text type="secondary">开始对话吧!</Text>
          </div>
        ) : (
          messages.map((message, index) => {
            const isUser = message.role === 'user';
            return (
              <div
                key={index}
                style={{
                  display: 'flex',
                  justifyContent: isUser ? 'flex-end' : 'flex-start',
                  marginBottom: '16px'
                }}
              >
                <Space direction="vertical" size={4}>
                  <div
                    style={{
                      display: 'flex',
                      alignItems: 'center',
                      gap: '8px'
                    }}
                  >
                    {isUser ? <UserOutlined /> : <RobotOutlined />}
                    <Text type="secondary" style={{ fontSize: '12px' }}>
                      {new Date(message.timestamp).toLocaleTimeString()}
                    </Text>
                  </div>
                  <div
                    style={{
                      maxWidth: '70%',
                      padding: '12px 16px',
                      borderRadius: '8px',
                      backgroundColor: isUser ? '#1890ff' : '#fff',
                      color: isUser ? '#fff' : '#333',
                      // Keep Shift+Enter line breaks visible.
                      whiteSpace: 'pre-wrap',
                      wordBreak: 'break-word',
                      boxShadow: '0 1px 2px rgba(0,0,0,0.1)'
                    }}
                  >
                    {message.content}
                  </div>
                </Space>
              </div>
            );
          })
        )}

        {isTyping && (
          <div style={{ display: 'flex', justifyContent: 'flex-start', marginBottom: '16px' }}>
            <Space size={4}>
              {TYPING_DOT_DELAYS.map(delay => (
                <span
                  key={delay}
                  style={{
                    // Spans are inline by default, which ignores width/height.
                    display: 'inline-block',
                    width: '8px',
                    height: '8px',
                    borderRadius: '50%',
                    backgroundColor: '#1890ff',
                    animation: `bounce 1.4s infinite ease-in-out both ${delay}`
                  }}
                />
              ))}
            </Space>
            <style>{`
              @keyframes bounce {
                0%, 80%, 100% {
                  transform: scale(0);
                }
                40% {
                  transform: scale(1);
                }
              }
            `}</style>
          </div>
        )}

        <div ref={messagesEndRef} />
      </div>

      <div style={{ display: 'flex', gap: '8px' }}>
        <TextArea
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={handleKeyDown}
          placeholder="输入消息,按Enter发送,Shift+Enter换行"
          autoSize={{ minRows: 1, maxRows: 4 }}
          disabled={loading}
          style={{ flex: 1 }}
        />
        <Button
          type="primary"
          icon={<SendOutlined />}
          onClick={sendMessage}
          disabled={loading || !input.trim() || !sessionId}
          loading={loading}
        >
          发送
        </Button>
      </div>
    </Card>
  );
};

export default AgentChat;

部署配置

Docker配置

dockerfile
FROM python:3.11-slim

# The app reports errors with print(). Without a TTY, stdout is
# block-buffered, so disable buffering to make logs appear promptly in
# `docker logs`. Also skip writing .pyc files into the image.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

WORKDIR /app

# Copy requirements alone first so this layer stays cached until they change.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose配置(docker-compose.yml)

yaml
version: '3.8'

services:
  agent-api:
    build: .
    ports:
      # Published directly as well as behind nginx; the React component's
      # default apiBaseUrl is http://localhost:8000/api.
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      # Start order only; this does not wait for Redis to accept connections.
      - redis
    volumes:
      - ./data:/app/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # nginx only reads its config and TLS material. Mount them read-only so
      # the container cannot modify the host copies.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - agent-api
    restart: unless-stopped

volumes:
  redis_data:

Kubernetes配置

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: personal-assistant
  namespace: ai-agents
spec:
  # AgentManager keeps sessions in process memory. With several replicas, a
  # request can land on a pod that never saw the session ("会话不存在").
  # Run one replica until sessions move to a shared store (e.g. the Redis at
  # REDIS_URL) or the Service is configured with sticky sessions.
  replicas: 1
  selector:
    matchLabels:
      app: personal-assistant
  template:
    metadata:
      labels:
        app: personal-assistant
    spec:
      containers:
      - name: agent
        image: your-registry/personal-assistant:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: agent-secrets
              key: openai-api-key
        - name: REDIS_URL
          value: "redis://redis:6379"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        # Both probes need the API to serve GET /health.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

监控告警

性能监控

python
import time
from typing import Dict, List
from collections import deque

class PerformanceMonitor:
    """Keep bounded, timestamped metric histories and derive a health status.

    Each metric is a ``deque(maxlen=max_history)``, so only the most recent
    ``max_history`` samples per metric are retained.
    """

    def __init__(self, max_history: int = 1000):
        self.max_history = max_history
        self.metrics = {
            "response_times": deque(maxlen=max_history),
            "error_rates": deque(maxlen=max_history),
            "active_sessions": deque(maxlen=max_history),
            "tool_calls": deque(maxlen=max_history)
        }

    def record_response_time(self, response_time: float):
        """Append one response-time sample (same unit as the health thresholds)."""
        self.metrics["response_times"].append({
            "value": response_time,
            "timestamp": time.time()
        })

    def record_error(self, error_type: str):
        """Append one error event; counted per time window by get_health_status."""
        self.metrics["error_rates"].append({
            "error_type": error_type,
            "timestamp": time.time()
        })

    def record_active_sessions(self, count: int):
        """Append a snapshot of the current active-session count."""
        self.metrics["active_sessions"].append({
            "value": count,
            "timestamp": time.time()
        })

    def record_tool_call(self, tool_name: str, success: bool):
        """Append one tool-call event with its success flag."""
        self.metrics["tool_calls"].append({
            "tool_name": tool_name,
            "success": success,
            "timestamp": time.time()
        })

    def get_statistics(self) -> Dict:
        """Summarize every metric history.

        Metrics with numeric ``value`` samples get count/avg/min/max. Event
        metrics (errors, tool calls) get ``count`` and ``total``. Empty
        histories get all-zero count/avg/min/max.
        """
        stats = {}

        for metric_name, metric_data in self.metrics.items():
            if not metric_data:
                stats[metric_name] = {
                    "count": 0,
                    "avg": 0,
                    "min": 0,
                    "max": 0
                }
                continue

            values = [
                item["value"]
                for item in metric_data
                if "value" in item
            ]

            if values:
                stats[metric_name] = {
                    "count": len(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values)
                }
            else:
                stats[metric_name] = {
                    "count": len(metric_data),
                    "total": len(metric_data)
                }

        return stats

    def get_health_status(
        self,
        slow_threshold: float = 2.0,
        unhealthy_latency: float = 5.0,
        error_threshold: int = 10,
        unhealthy_errors: int = 20,
        error_window_seconds: float = 300,
    ) -> Dict:
        """Classify health as "healthy", "degraded" or "unhealthy".

        Average latency is taken over the whole retained response-time
        history. Errors are counted within the last ``error_window_seconds``.
        The per-check statuses and the overall status use the same strict
        ``>`` comparisons, so they always agree at the boundaries.

        Args:
            slow_threshold: avg latency above this is "slow" / degraded.
            unhealthy_latency: avg latency above this is unhealthy.
            error_threshold: recent errors above this are "high" / degraded.
            unhealthy_errors: recent errors above this are unhealthy.
            error_window_seconds: look-back window for counting errors.
        """
        avg_latency = self.get_statistics().get("response_times", {}).get("avg", 0)

        # Compute the window start once instead of calling time.time() per item.
        window_start = time.time() - error_window_seconds
        recent_error_count = sum(
            1 for err in self.metrics["error_rates"] if err["timestamp"] > window_start
        )

        is_slow = avg_latency > slow_threshold
        too_many_errors = recent_error_count > error_threshold

        if avg_latency > unhealthy_latency or recent_error_count > unhealthy_errors:
            overall = "unhealthy"
        elif is_slow or too_many_errors:
            overall = "degraded"
        else:
            overall = "healthy"

        return {
            "status": overall,
            "checks": {
                "response_time": {
                    "status": "slow" if is_slow else "ok",
                    "value": avg_latency
                },
                "error_rate": {
                    "status": "high" if too_many_errors else "ok",
                    "value": recent_error_count
                }
            }
        }

告警系统

python
import asyncio
from typing import Dict, List, Callable

class AlertSystem:
    """Evaluate user-defined alert rules against a PerformanceMonitor.

    A rule that fires is stored in ``active_alerts`` under its name. It is
    not reported again until ``clear_alert`` removes it.
    """

    def __init__(self, performance_monitor: "PerformanceMonitor"):
        self.monitor = performance_monitor
        self.alert_rules = []      # dicts: name, condition, severity
        self.alert_handlers = []   # callables receiving the alert dict
        self.active_alerts = {}    # rule name -> alert dict

    def add_alert_rule(
        self,
        name: str,
        condition: Callable,
        severity: str = "warning"
    ):
        """Add a rule; ``condition(stats, health)`` returns truthy to fire.

        ``stats`` comes from ``monitor.get_statistics()`` and ``health`` from
        ``monitor.get_health_status()``. The rule name doubles as the alert id.
        """
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity
        })

    def add_alert_handler(self, handler: Callable):
        """Register a notification callback; plain functions and coroutines both work."""
        self.alert_handlers.append(handler)

    async def check_alerts(self):
        """Evaluate all rules once and notify handlers about newly fired alerts.

        A rule whose condition raises is logged and skipped, so one faulty
        rule cannot suppress the others.

        Returns:
            The list of alerts that became active during this check.
        """
        stats = self.monitor.get_statistics()
        health = self.monitor.get_health_status()

        new_alerts = []

        for rule in self.alert_rules:
            try:
                triggered = rule["condition"](stats, health)
            except Exception as e:
                print(f"告警规则执行失败 {rule['name']}: {e}")
                continue

            alert_id = rule["name"]
            if not triggered or alert_id in self.active_alerts:
                continue

            alert = {
                "id": alert_id,
                "name": rule["name"],
                "severity": rule["severity"],
                "timestamp": time.time(),
                "details": {
                    "stats": stats,
                    "health": health
                }
            }

            self.active_alerts[alert_id] = alert
            new_alerts.append(alert)

        for alert in new_alerts:
            await self._notify_handlers(alert)

        return new_alerts

    async def _notify_handlers(self, alert: Dict):
        """Call every handler with ``alert``; handler failures are logged, not raised."""
        for handler in self.alert_handlers:
            try:
                result = handler(alert)
                # Await only when the handler returned an awaitable. Awaiting a
                # sync handler's None would raise TypeError after it already ran.
                if hasattr(result, "__await__"):
                    await result
            except Exception as e:
                print(f"告警处理失败: {e}")

    def clear_alert(self, alert_id: str):
        """Deactivate an alert so its rule can fire again; unknown ids are ignored."""
        if alert_id in self.active_alerts:
            del self.active_alerts[alert_id]

    def get_active_alerts(self) -> List[Dict]:
        """Return the currently active alerts."""
        return list(self.active_alerts.values())

性能优化

缓存优化

python
import hashlib
import json
from typing import Optional, Any
import redis

class CacheManager:
    """JSON values cached in Redis under keys namespaced by a caller-chosen prefix.

    NOTE(review): the methods are ``async`` but call the synchronous redis
    client, so each call blocks the event loop for one round trip. Consider
    ``redis.asyncio`` if this sits on a hot path.
    """

    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = default_ttl  # seconds; used when set() gets no ttl

    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build the key ``"<prefix>:<md5 of args and kwargs>"``.

        The prefix stays in clear text so ``clear_pattern(f"{prefix}:*")`` can
        invalidate one namespace. kwargs are sorted by name so keyword order
        does not change the key.
        """
        payload = f"{args}:{sorted(kwargs.items())}"
        digest = hashlib.md5(payload.encode()).hexdigest()
        return f"{prefix}:{digest}"

    async def get(self, prefix: str, *args, **kwargs) -> Optional[Any]:
        """Return the cached value for (prefix, args, kwargs), or None on miss/error."""
        key = self._generate_key(prefix, *args, **kwargs)

        try:
            value = self.redis_client.get(key)
            if value is not None:
                return json.loads(value)
        except Exception as e:
            print(f"缓存获取失败: {e}")

        return None

    async def set(
        self,
        prefix: str,
        value: Any,
        ttl: Optional[int] = None,
        *args,
        **kwargs
    ):
        """Store ``value`` (must be JSON-serializable) for ``ttl`` seconds.

        Because ``ttl`` comes before ``*args``, pass it explicitly (``None``
        for the default) before any key arguments. For example,
        ``await cache.set("weather", data, None, "beijing")`` matches
        ``await cache.get("weather", "beijing")``. Calling
        ``set("weather", data, "beijing")`` would treat "beijing" as the TTL.
        Errors are logged, not raised.
        """
        key = self._generate_key(prefix, *args, **kwargs)
        ttl = ttl or self.default_ttl

        try:
            self.redis_client.setex(
                key,
                ttl,
                json.dumps(value)
            )
        except Exception as e:
            print(f"缓存设置失败: {e}")

    async def delete(self, prefix: str, *args, **kwargs):
        """Remove the entry for (prefix, args, kwargs); errors are logged."""
        key = self._generate_key(prefix, *args, **kwargs)

        try:
            self.redis_client.delete(key)
        except Exception as e:
            print(f"缓存删除失败: {e}")

    async def clear_pattern(self, pattern: str, batch_size: int = 500):
        """Delete all keys matching a Redis glob, e.g. ``"weather:*"``.

        Uses incremental SCAN instead of KEYS, which walks the whole keyspace
        in one blocking call. Deletes run in batches of ``batch_size``.
        """
        try:
            batch = []
            for key in self.redis_client.scan_iter(match=pattern, count=batch_size):
                batch.append(key)
                if len(batch) >= batch_size:
                    self.redis_client.delete(*batch)
                    batch = []
            if batch:
                self.redis_client.delete(*batch)
        except Exception as e:
            print(f"批量删除缓存失败: {e}")

异步优化

python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncOptimizer:
    """Run blocking callables off the event loop and bound async concurrency."""

    def __init__(self, max_workers: int = 10):
        # Pool used by run_in_thread; release it with shutdown().
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, func, *args, **kwargs):
        """Run blocking ``func(*args, **kwargs)`` in the thread pool and await it.

        ``loop.run_in_executor`` forwards positional arguments only. Passing
        ``**kwargs`` to it raises TypeError, so the call is bound in a closure.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: func(*args, **kwargs)
        )

    async def batch_execute(
        self,
        tasks: List[callable],
        max_concurrency: int = 10
    ) -> List[Any]:
        """Await zero-arg coroutine factories with at most ``max_concurrency`` in flight.

        Results are returned in the same order as ``tasks``.
        """
        semaphore = asyncio.Semaphore(max_concurrency)

        async def execute_with_semaphore(task):
            async with semaphore:
                return await task()

        results = await asyncio.gather(
            *[
                execute_with_semaphore(task)
                for task in tasks
            ]
        )

        return results

    async def parallel_process(
        self,
        items: list,
        process_func: callable,
        batch_size: int = 100
    ) -> List[Any]:
        """Apply async ``process_func`` to every item, ``batch_size`` items at a time.

        Output order matches ``items``.
        """
        results = []

        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]

            # item=item binds each value now; a plain closure would see only
            # the loop's last item.
            batch_results = await self.batch_execute([
                lambda item=item: process_func(item)
                for item in batch
            ])

            results.extend(batch_results)

        return results

    def shutdown(self, wait: bool = True) -> None:
        """Release the thread pool once the optimizer is no longer needed."""
        self.executor.shutdown(wait=wait)

实践练习

练习1:集成Agent

python
def integrate_agent():
    """Build and return a fresh AgentManager with no agents registered yet."""
    return AgentManager()

练习2:部署Agent

bash
# Compose v2 plugin syntax. With the legacy standalone binary, run `docker-compose up -d`.
docker compose up -d

练习3:监控Agent

python
def monitor_agent():
    """Create a PerformanceMonitor and an AlertSystem wired to it.

    Returns:
        A ``(monitor, alert_system)`` tuple.
    """
    performance_monitor = PerformanceMonitor()
    return performance_monitor, AlertSystem(performance_monitor)

总结

本节我们学习了个人助理Agent的集成与部署:

  1. Agent集成
  2. 前端集成
  3. 部署配置
  4. 监控告警
  5. 性能优化

集成和部署是Agent应用上线的关键步骤。

参考资源