第81天:个人助理Agent-集成与部署
学习目标
- 掌握Agent集成
- 学习前端集成
- 理解部署配置
- 掌握监控告警
- 学习性能优化
Agent集成
Agent管理器
python
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Optional
class AgentManager:
    """Register agents and route user messages to them through sessions.

    All state lives in two plain dicts on the instance, so it is lost on
    restart and is not shared between processes. Closed sessions stay in
    ``active_sessions`` with ``status == "closed"`` so that
    ``get_session_info`` can still report them.
    """

    def __init__(self):
        # agent_id -> {"agent", "config", "status", "registered_at"}
        self.agents = {}
        # session_id -> {"agent_id", "user_id", "context", "created_at",
        #                "last_activity", "status"} (+ "closed_at" once closed)
        self.active_sessions = {}

    def register_agent(
        self,
        agent_id: str,
        agent: "BaseAgent",
        config: Optional[Dict] = None
    ):
        """Register ``agent`` under ``agent_id``, replacing any earlier entry.

        Args:
            agent_id: Key used by ``create_session`` to select the agent.
            agent: Object whose ``process_input(message, context)`` coroutine
                is awaited by ``process_message``.
            config: Optional per-agent settings, stored as-is (``{}`` if omitted).
        """
        # "BaseAgent" is quoted: it is not defined in this file, and an
        # unquoted annotation is evaluated when the method is defined, which
        # would raise NameError before the class even exists.
        self.agents[agent_id] = {
            "agent": agent,
            "config": config or {},
            "status": "registered",
            "registered_at": datetime.now().isoformat()
        }

    def _require_session(self, session_id: str) -> Dict:
        """Return the stored session dict, or raise ``ValueError`` if unknown."""
        if session_id not in self.active_sessions:
            raise ValueError(f"会话不存在: {session_id}")
        return self.active_sessions[session_id]

    async def create_session(
        self,
        agent_id: str,
        user_id: str,
        initial_context: Optional[Dict] = None
    ) -> Dict:
        """Open a new active session between ``user_id`` and ``agent_id``.

        Returns:
            ``{"session_id", "agent_id", "user_id", "status": "active"}``.

        Raises:
            ValueError: if ``agent_id`` has not been registered.
        """
        if agent_id not in self.agents:
            raise ValueError(f"Agent不存在: {agent_id}")
        # A timestamp-based id collides when two sessions are created within
        # the same clock tick (and is guessable); a random UUID avoids both.
        session_id = f"session_{uuid.uuid4().hex}"
        now = datetime.now().isoformat()
        self.active_sessions[session_id] = {
            "agent_id": agent_id,
            "user_id": user_id,
            "context": initial_context or {},
            "created_at": now,
            "last_activity": now,
            "status": "active"
        }
        return {
            "session_id": session_id,
            "agent_id": agent_id,
            "user_id": user_id,
            "status": "active"
        }

    async def process_message(
        self,
        session_id: str,
        message: str,
        additional_context: Optional[Dict] = None
    ) -> Dict:
        """Send ``message`` to the session's agent and wrap the outcome.

        The session's stored context is merged with ``additional_context``
        (the latter wins on key clashes) for this call only; the stored
        context itself is not modified.

        Returns:
            ``{"session_id", "response", "success": True}`` on success, or
            ``{"session_id", "error", "success": False}`` if the agent raised.

        Raises:
            ValueError: if the session does not exist or has been closed.
        """
        session = self._require_session(session_id)
        # A closed session must not keep talking to its agent.
        if session["status"] != "active":
            raise ValueError(f"会话已关闭: {session_id}")
        agent = self.agents[session["agent_id"]]["agent"]
        context = {
            **session["context"],
            **(additional_context or {})
        }
        try:
            response = await agent.process_input(message, context)
        except Exception as e:
            # Agent failures are reported in-band instead of being raised.
            return {
                "session_id": session_id,
                "error": str(e),
                "success": False
            }
        session["last_activity"] = datetime.now().isoformat()
        return {
            "session_id": session_id,
            "response": response,
            "success": True
        }

    async def close_session(self, session_id: str) -> Dict:
        """Mark a session closed and stamp ``closed_at``; the record is kept.

        Raises:
            ValueError: if the session does not exist.
        """
        session = self._require_session(session_id)
        session["status"] = "closed"
        session["closed_at"] = datetime.now().isoformat()
        return {
            "session_id": session_id,
            "status": "closed"
        }

    def get_session_info(self, session_id: str) -> Optional[Dict]:
        """Return the stored session record (active or closed), or ``None``."""
        return self.active_sessions.get(session_id)

    def get_active_sessions(self, user_id: Optional[str] = None) -> List[Dict]:
        """List active sessions, restricted to ``user_id`` when it is given."""
        # `is None` (not truthiness) so that an empty-string user_id filters
        # instead of silently listing every user's sessions.
        return [
            session
            for session in self.active_sessions.values()
            if session["status"] == "active"
            and (user_id is None or session["user_id"] == user_id)
        ]

    async def cleanup_inactive_sessions(self, timeout_minutes: int = 30):
        """Close active sessions idle for longer than ``timeout_minutes``.

        Returns:
            ``{"cleaned_sessions": <count>, "session_ids": [<closed ids>]}``.
        """
        timeout_seconds = timeout_minutes * 60
        current_time = datetime.now().timestamp()
        # Collect first, close afterwards, so the dict is not mutated while
        # it is being iterated.
        inactive_sessions = [
            session_id
            for session_id, session in self.active_sessions.items()
            if session["status"] == "active"
            and current_time
            - datetime.fromisoformat(session["last_activity"]).timestamp()
            > timeout_seconds
        ]
        for session_id in inactive_sessions:
            await self.close_session(session_id)
        return {
            "cleaned_sessions": len(inactive_sessions),
            "session_ids": inactive_sessions
        }


# Agent服务
python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional
app = FastAPI(title="个人助理Agent API")


@app.get("/health")
async def health():
    # Lightweight liveness/readiness endpoint. Orchestrators such as the
    # Kubernetes Deployment in this document probe GET /health on port 8000;
    # without this route FastAPI answers 404 and the probes fail.
    return {"status": "ok"}
class CreateSessionRequest(BaseModel):
    # Request body of POST /api/sessions.
    agent_id: str = Field(..., description="Agent ID")
    user_id: str = Field(..., description="用户ID")
    # Optional seed context stored on the new session. The builtin `dict` is
    # used because this section imports only `Optional` from typing, so the
    # `Dict` alias would be an undefined name here.
    initial_context: Optional[dict] = None
class ProcessMessageRequest(BaseModel):
    # Request body of POST /api/messages.
    session_id: str = Field(..., description="会话ID")
    message: str = Field(..., description="用户消息")
    # Extra context merged over the session's stored context for this message
    # only. Builtin `dict` because `Dict` is not imported in this section.
    additional_context: Optional[dict] = None
# Module-level manager shared by every request handler that follows. Its agents
# and sessions live in in-memory dicts, so each worker process / replica holds
# its own independent copy.
# NOTE(review): nothing in this snippet calls register_agent, so
# create_session will reject every agent_id until agents are registered.
agent_manager = AgentManager()
@app.post("/api/sessions")
async def create_session(request: CreateSessionRequest):
    # Open a new session for (agent_id, user_id).
    # ValueError from the manager (unknown agent) -> 400; anything else -> 500.
    try:
        created = await agent_manager.create_session(
            request.agent_id,
            request.user_id,
            request.initial_context,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return created
@app.post("/api/messages")
async def process_message(request: ProcessMessageRequest):
    # Forward one user message to the session's agent.
    # ValueError from the manager (e.g. unknown session) -> 400; other
    # failures -> 500. Agent errors come back inside the normal payload
    # with success=False.
    try:
        result = await agent_manager.process_message(
            request.session_id,
            request.message,
            request.additional_context,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return result
@app.delete("/api/sessions/{session_id}")
async def close_session(session_id: str):
    # Close a session. The manager raises ValueError only when the session id
    # is unknown, i.e. the resource is missing: answer 404, matching
    # GET /api/sessions/{session_id}, rather than 400.
    try:
        return await agent_manager.close_session(session_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/sessions/{session_id}")
async def get_session(session_id: str):
    # Return the stored session record (active or closed); 404 when unknown.
    found = agent_manager.get_session_info(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="会话不存在")
@app.get("/api/sessions")
async def list_sessions(user_id: Optional[str] = None):
    # List active sessions (optionally for one user) together with their count.
    active = agent_manager.get_active_sessions(user_id)
    payload = {"sessions": active}
    payload["count"] = len(active)
    return payload
@app.post("/api/admin/cleanup")
async def cleanup_sessions(timeout_minutes: int = 30):
    # Close sessions idle for longer than `timeout_minutes`.
    # NOTE(review): no authentication is visible on this admin endpoint;
    # protect it (auth dependency or network policy) before exposing it.
    if timeout_minutes < 1:
        # A zero or negative timeout would treat every active session as idle
        # and close all of them.
        raise HTTPException(status_code=400, detail="timeout_minutes必须大于等于1")
    return await agent_manager.cleanup_inactive_sessions(timeout_minutes)


# 前端集成
React组件
typescript
import React, { useState, useEffect, useRef } from 'react';
import { Input, Button, Card, Space, Typography, Spin } from 'antd';
import { SendOutlined, RobotOutlined, UserOutlined } from '@ant-design/icons';
const { TextArea } = Input;
const { Text, Title } = Typography;

/** One chat turn shown in the transcript. */
interface Message {
  role: 'user' | 'assistant';
  content: string;
  timestamp: string; // ISO-8601, produced by Date#toISOString
}

/** Props for the AgentChat component. */
interface AgentChatProps {
  agentId: string;
  userId: string;
  /** Base URL of the agent API, without a trailing slash. */
  apiBaseUrl?: string;
}

/**
 * Turn the `response` field of POST /messages into display text.
 *
 * The agent may return a plain string or an object carrying a string
 * `response`; anything else is JSON-encoded so React never tries to render a
 * raw object (which throws).
 */
const toDisplayText = (payload: unknown): string => {
  if (typeof payload === 'string') {
    return payload;
  }
  if (payload !== null && typeof payload === 'object') {
    const nested = (payload as { response?: unknown }).response;
    if (typeof nested === 'string') {
      return nested;
    }
  }
  return JSON.stringify(payload) ?? '';
};

/** Chat panel that opens a server-side session and exchanges messages with an agent. */
const AgentChat: React.FC<AgentChatProps> = ({
  agentId,
  userId,
  apiBaseUrl = 'http://localhost:8000/api'
}) => {
  const [sessionId, setSessionId] = useState<string | null>(null);
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);
  const [isTyping, setIsTyping] = useState(false);
  const messagesEndRef = useRef<HTMLDivElement>(null);

  // (Re)create the session whenever the agent, user or endpoint changes.
  // `cancelled` drops a response that arrives after the props changed or the
  // component unmounted, so a stale session id never overwrites the current one.
  useEffect(() => {
    let cancelled = false;

    const initializeSession = async () => {
      try {
        const response = await fetch(`${apiBaseUrl}/sessions`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            agent_id: agentId,
            user_id: userId
          })
        });
        // fetch only rejects on network errors; HTTP 4xx/5xx must be checked.
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}`);
        }
        const data = await response.json();
        if (!cancelled) {
          setSessionId(data.session_id);
        }
      } catch (error) {
        console.error('初始化会话失败:', error);
      }
    };

    setSessionId(null);
    setMessages([]);
    initializeSession();

    return () => {
      cancelled = true;
    };
  }, [agentId, userId, apiBaseUrl]);

  useEffect(() => {
    scrollToBottom();
  }, [messages, isTyping]);

  const sendMessage = async () => {
    // Capture the text before the input box is cleared.
    const text = input;
    if (!text.trim() || !sessionId || loading) return;

    const userMessage: Message = {
      role: 'user',
      content: text,
      timestamp: new Date().toISOString()
    };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setLoading(true);
    setIsTyping(true);

    try {
      const response = await fetch(`${apiBaseUrl}/messages`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({
          session_id: sessionId,
          message: text
        })
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const data = await response.json();
      // Agent failures arrive as HTTP 200 with success=false and an `error` field.
      if (!data.success) {
        throw new Error(data.error || 'agent error');
      }
      const assistantMessage: Message = {
        role: 'assistant',
        content: toDisplayText(data.response),
        timestamp: new Date().toISOString()
      };
      setMessages(prev => [...prev, assistantMessage]);
    } catch (error) {
      console.error('发送消息失败:', error);
      const errorMessage: Message = {
        role: 'assistant',
        content: '抱歉,我遇到了一些问题,请稍后再试。',
        timestamp: new Date().toISOString()
      };
      setMessages(prev => [...prev, errorMessage]);
    } finally {
      setLoading(false);
      setIsTyping(false);
    }
  };

  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  // Enter sends, Shift+Enter inserts a newline. Enter pressed while an IME
  // (e.g. Chinese pinyin) is composing only confirms the candidate and must
  // not send. keydown replaces the deprecated keypress event.
  const handleKeyDown = (e: React.KeyboardEvent<HTMLTextAreaElement>) => {
    if (e.key === 'Enter' && !e.shiftKey && !e.nativeEvent.isComposing) {
      e.preventDefault();
      sendMessage();
    }
  };

  return (
    <Card
      title={
        <Space>
          <RobotOutlined />
          <Title level={4} style={{ margin: 0 }}>
            个人助理
          </Title>
        </Space>
      }
      style={{ height: '100vh', display: 'flex', flexDirection: 'column' }}
      bodyStyle={{
        flex: 1,
        display: 'flex',
        flexDirection: 'column',
        overflow: 'hidden'
      }}
    >
      <div
        style={{
          flex: 1,
          overflow: 'auto',
          padding: '16px',
          backgroundColor: '#f5f5f5',
          borderRadius: '8px',
          marginBottom: '16px'
        }}
      >
        {messages.length === 0 ? (
          <div style={{ textAlign: 'center', marginTop: '100px' }}>
            <Text type="secondary">开始对话吧!</Text>
          </div>
        ) : (
          messages.map((message, index) => {
            const isUser = message.role === 'user';
            return (
              <div
                key={index}
                style={{
                  display: 'flex',
                  justifyContent: isUser ? 'flex-end' : 'flex-start',
                  marginBottom: '16px'
                }}
              >
                <Space direction="vertical" size={4}>
                  <div
                    style={{
                      display: 'flex',
                      alignItems: 'center',
                      gap: '8px'
                    }}
                  >
                    {isUser ? <UserOutlined /> : <RobotOutlined />}
                    <Text type="secondary" style={{ fontSize: '12px' }}>
                      {new Date(message.timestamp).toLocaleTimeString()}
                    </Text>
                  </div>
                  <div
                    style={{
                      maxWidth: '70%',
                      padding: '12px 16px',
                      borderRadius: '8px',
                      backgroundColor: isUser ? '#1890ff' : '#fff',
                      color: isUser ? '#fff' : '#333',
                      wordBreak: 'break-word',
                      boxShadow: '0 1px 2px rgba(0,0,0,0.1)'
                    }}
                  >
                    {message.content}
                  </div>
                </Space>
              </div>
            );
          })
        )}
        {isTyping && (
          <div style={{ display: 'flex', justifyContent: 'flex-start', marginBottom: '16px' }}>
            <Space size={4}>
              {['0s', '0.16s', '0.32s'].map(delay => (
                <span
                  key={delay}
                  style={{
                    // Spans are inline by default, which ignores width/height
                    // and leaves the dots invisible.
                    display: 'inline-block',
                    width: '8px',
                    height: '8px',
                    borderRadius: '50%',
                    backgroundColor: '#1890ff',
                    animation: `bounce 1.4s infinite ease-in-out both ${delay}`
                  }}
                />
              ))}
            </Space>
            <style>{`
              @keyframes bounce {
                0%, 80%, 100% {
                  transform: scale(0);
                }
                40% {
                  transform: scale(1);
                }
              }
            `}</style>
          </div>
        )}
        <div ref={messagesEndRef} />
      </div>
      <div style={{ display: 'flex', gap: '8px' }}>
        <TextArea
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={handleKeyDown}
          placeholder="输入消息,按Enter发送,Shift+Enter换行"
          autoSize={{ minRows: 1, maxRows: 4 }}
          disabled={loading}
          style={{ flex: 1 }}
        />
        <Button
          type="primary"
          icon={<SendOutlined />}
          onClick={sendMessage}
          disabled={loading || !input.trim()}
          loading={loading}
        >
          发送
        </Button>
      </div>
    </Card>
  );
};

export default AgentChat;

// 部署配置
Docker配置
dockerfile
FROM python:3.11-slim

# Send stdout/stderr straight to the container log (no Python buffering) and
# skip writing .pyc files into the image.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Install dependencies before copying the source so this layer stays cached
# when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

# Exec (JSON-array) form: uvicorn runs as PID 1 and receives SIGTERM directly.
# The array must be valid JSON, or Docker silently falls back to shell form.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# yaml
version: '3.8'
services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      # NOTE(review): the AgentManager shown in this document keeps sessions in
      # process memory; REDIS_URL only matters to code that actually uses Redis.
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    volumes:
      - ./data:/app/data
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # nginx only reads its config and certificates, so mount them read-only.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - agent-api
    restart: unless-stopped
volumes:
  redis_data:
# Kubernetes配置
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: personal-assistant
  namespace: ai-agents
spec:
  # NOTE(review): the AgentManager in this document stores sessions in process
  # memory. With several replicas behind a load-balanced Service, a session
  # created on one pod is unknown to the others ("会话不存在"). Use a single
  # replica, sticky sessions (Service sessionAffinity: ClientIP), or move
  # session state into shared storage before scaling out.
  replicas: 3
  selector:
    matchLabels:
      app: personal-assistant
  template:
    metadata:
      labels:
        app: personal-assistant
    spec:
      containers:
        - name: agent
          # NOTE(review): pin an immutable tag instead of :latest for reproducible rollouts.
          image: your-registry/personal-assistant:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: REDIS_URL
              value: "redis://redis:6379"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Both probes require the API to serve GET /health with a 2xx status;
          # otherwise pods never become ready and are restarted repeatedly.
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
# 监控告警
性能监控
python
import time
from typing import Dict, List
from collections import deque
class PerformanceMonitor:
    """Keep bounded in-memory metric histories and derive a health verdict.

    Each metric is a ``deque(maxlen=max_history)``, so once the cap is
    reached the oldest sample is dropped automatically on every append.
    """

    def __init__(self, max_history: int = 1000):
        self.max_history = max_history
        self.metrics = {
            "response_times": deque(maxlen=max_history),
            # Despite the name this stores individual error events; the
            # "rate" is derived in get_health_status by counting recent ones.
            "error_rates": deque(maxlen=max_history),
            "active_sessions": deque(maxlen=max_history),
            "tool_calls": deque(maxlen=max_history)
        }

    def record_response_time(self, response_time: float):
        """Record one response duration.

        Units are up to the caller; the default health thresholds (2 / 5)
        presumably mean seconds — confirm against the call sites.
        """
        self.metrics["response_times"].append({
            "value": response_time,
            "timestamp": time.time()
        })

    def record_error(self, error_type: str):
        """Record one error event of the given type, timestamped now."""
        self.metrics["error_rates"].append({
            "error_type": error_type,
            "timestamp": time.time()
        })

    def record_active_sessions(self, count: int):
        """Record a sample of the current active-session count."""
        self.metrics["active_sessions"].append({
            "value": count,
            "timestamp": time.time()
        })

    def record_tool_call(self, tool_name: str, success: bool):
        """Record one tool invocation and whether it succeeded."""
        self.metrics["tool_calls"].append({
            "tool_name": tool_name,
            "success": success,
            "timestamp": time.time()
        })

    def get_statistics(self) -> Dict:
        """Summarise every metric.

        Numeric metrics (entries with a ``"value"``) get
        count/avg/min/max; event metrics (errors, tool calls) get
        count/total. An empty metric reports zeros for count/avg/min/max.
        """
        stats = {}
        for metric_name, metric_data in self.metrics.items():
            if not metric_data:
                stats[metric_name] = {"count": 0, "avg": 0, "min": 0, "max": 0}
                continue
            values = [item["value"] for item in metric_data if "value" in item]
            if values:
                stats[metric_name] = {
                    "count": len(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values)
                }
            else:
                stats[metric_name] = {
                    "count": len(metric_data),
                    "total": len(metric_data)
                }
        return stats

    def get_health_status(
        self,
        error_window_seconds: float = 300,
        slow_response_threshold: float = 2,
        critical_response_threshold: float = 5,
        max_recent_errors: int = 10,
        critical_recent_errors: int = 20
    ) -> Dict:
        """Classify health as ``healthy`` / ``degraded`` / ``unhealthy``.

        A limit counts as breached when the value is strictly greater than it.
        The same comparisons drive both the per-check status and the overall
        status, so they cannot disagree (previously exactly 10 recent errors
        gave check "high" but overall "healthy").

        Args:
            error_window_seconds: only errors younger than this are counted.
            slow_response_threshold: average response time above this is "slow"
                and makes the overall status at least "degraded".
            critical_response_threshold: average above this is "unhealthy".
            max_recent_errors: recent errors above this are "high" and make
                the overall status at least "degraded".
            critical_recent_errors: recent errors above this are "unhealthy".

        Returns:
            ``{"status": ..., "checks": {"response_time": {...},
            "error_rate": {...}}}``; each check has ``status`` and ``value``.
        """
        avg_response = self.get_statistics().get("response_times", {}).get("avg", 0)

        now = time.time()  # hoisted: one clock read for the whole window scan
        recent_errors = sum(
            1
            for err in self.metrics["error_rates"]
            if now - err["timestamp"] < error_window_seconds
        )

        response_slow = avg_response > slow_response_threshold
        errors_high = recent_errors > max_recent_errors

        if (avg_response > critical_response_threshold
                or recent_errors > critical_recent_errors):
            overall = "unhealthy"
        elif response_slow or errors_high:
            overall = "degraded"
        else:
            overall = "healthy"

        return {
            "status": overall,
            "checks": {
                "response_time": {
                    "status": "slow" if response_slow else "ok",
                    "value": avg_response
                },
                "error_rate": {
                    "status": "high" if errors_high else "ok",
                    "value": recent_errors
                }
            }
        }


# 告警系统
python
import asyncio
from typing import Dict, List, Callable
class AlertSystem:
    """Evaluate alert rules against a performance monitor and notify handlers.

    An alert is identified by its rule name. While an alert is active the same
    rule does not notify again; it is removed by ``clear_alert`` or, when
    ``check_alerts(auto_resolve=True)`` is used, once its rule stops firing.
    """

    def __init__(self, performance_monitor: "PerformanceMonitor"):
        # Quoted annotation: evaluated lazily, so this class can be defined
        # independently of where PerformanceMonitor is declared.
        self.monitor = performance_monitor
        self.alert_rules = []     # [{"name", "condition", "severity"}]
        self.alert_handlers = []  # callables invoked with each new alert
        self.active_alerts = {}   # rule name -> alert dict

    def add_alert_rule(
        self,
        name: str,
        condition: Callable,
        severity: str = "warning"
    ):
        """Add a rule; ``condition(stats, health)`` returns truthy to fire."""
        self.alert_rules.append({
            "name": name,
            "condition": condition,
            "severity": severity
        })

    def add_alert_handler(self, handler: Callable):
        """Add a handler called with each new alert; it may be sync or async."""
        self.alert_handlers.append(handler)

    async def check_alerts(self, auto_resolve: bool = False):
        """Evaluate every rule once and notify handlers about new alerts.

        Args:
            auto_resolve: when True, an active alert whose rule no longer
                fires is removed, so the rule can notify again the next time
                it fires. Defaults to False: alerts stay active until
                ``clear_alert`` is called.

        Returns:
            The alerts that became active during this call.
        """
        stats = self.monitor.get_statistics()
        health = self.monitor.get_health_status()
        new_alerts = []
        for rule in self.alert_rules:
            alert_id = rule["name"]
            try:
                triggered = rule["condition"](stats, health)
            except Exception as e:
                # One broken rule must not stop the remaining rules from running.
                print(f"告警规则执行失败 {alert_id}: {e}")
                continue
            if not triggered:
                if auto_resolve:
                    self.active_alerts.pop(alert_id, None)
                continue
            if alert_id in self.active_alerts:
                continue  # already active: do not notify again
            alert = {
                "id": alert_id,
                "name": rule["name"],
                "severity": rule["severity"],
                "timestamp": time.time(),
                "details": {
                    "stats": stats,
                    "health": health
                }
            }
            self.active_alerts[alert_id] = alert
            new_alerts.append(alert)
        for alert in new_alerts:
            await self._notify_handlers(alert)
        return new_alerts

    async def _notify_handlers(self, alert: Dict):
        """Deliver ``alert`` to every handler; handler errors are printed, not raised."""
        import inspect

        for handler in self.alert_handlers:
            try:
                result = handler(alert)
                # Await only if the handler returned an awaitable; awaiting a
                # plain sync handler's None would raise TypeError.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                print(f"告警处理失败: {e}")

    def clear_alert(self, alert_id: str):
        """Remove an active alert if present; unknown ids are ignored."""
        self.active_alerts.pop(alert_id, None)

    def get_active_alerts(self) -> List[Dict]:
        """Return a list snapshot of the currently active alerts."""
        return list(self.active_alerts.values())


# 性能优化
缓存优化
python
import hashlib
import json
from typing import Optional, Any
import redis
class CacheManager:
    """Cache JSON-serialisable values in Redis under argument-derived keys.

    Every public method is best-effort: Redis and (de)serialisation errors are
    printed and swallowed, so a cache outage degrades to cache misses instead
    of failing the caller.
    """

    def __init__(self, redis_url: str, default_ttl: int = 3600):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = default_ttl  # seconds, used when no ttl is given

    def _generate_key(self, prefix: str, *args, **kwargs) -> str:
        """Build ``"<prefix>:<md5 of args and kwargs>"``.

        The prefix stays readable so ``clear_pattern(f"{prefix}:*")`` can find
        the keys (a bare digest made that impossible). kwargs are sorted so
        ``a=1, b=2`` and ``b=2, a=1`` map to the same entry.
        """
        key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
        # md5 only shortens the key; it is not used for security.
        digest = hashlib.md5(key_data.encode()).hexdigest()
        return f"{prefix}:{digest}"

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a synchronous redis-py call in a worker thread.

        ``redis.from_url`` returns a blocking client; calling it directly from
        these coroutines would stall the event loop for every round-trip.
        """
        import asyncio

        return await asyncio.to_thread(func, *args)

    async def get(self, prefix: str, *args, **kwargs) -> Optional[Any]:
        """Return the cached value for (prefix, args, kwargs), or ``None`` on miss/error."""
        key = self._generate_key(prefix, *args, **kwargs)
        try:
            value = await self._run_blocking(self.redis_client.get, key)
            if value is not None:
                return json.loads(value)
        except Exception as e:
            print(f"缓存获取失败: {e}")
        return None

    async def set(
        self,
        prefix: str,
        value: Any,
        ttl: Optional[int] = None,
        *args,
        **kwargs
    ):
        """Store ``value`` as JSON with an expiry of ``ttl`` seconds.

        A falsy ``ttl`` (``None`` or ``0``) falls back to ``default_ttl``.
        NOTE: ``ttl`` is the third positional parameter, so key arguments must
        follow it — ``set("user", data, None, user_id)`` — otherwise the first
        key argument is taken as the TTL and the key will not match ``get``.
        """
        key = self._generate_key(prefix, *args, **kwargs)
        ttl = ttl or self.default_ttl
        try:
            await self._run_blocking(
                self.redis_client.setex,
                key,
                ttl,
                json.dumps(value)
            )
        except Exception as e:
            print(f"缓存设置失败: {e}")

    async def delete(self, prefix: str, *args, **kwargs):
        """Delete the entry for (prefix, args, kwargs) if it exists."""
        key = self._generate_key(prefix, *args, **kwargs)
        try:
            await self._run_blocking(self.redis_client.delete, key)
        except Exception as e:
            print(f"缓存删除失败: {e}")

    async def clear_pattern(self, pattern: str):
        """Delete every key matching the glob ``pattern`` (e.g. ``"user:*"``).

        Uses incremental SCAN instead of KEYS, which blocks the Redis server
        while it walks the entire keyspace.
        """
        try:
            keys = await self._run_blocking(
                lambda: list(self.redis_client.scan_iter(match=pattern))
            )
            # Delete in chunks so a single DEL command stays bounded.
            chunk = 500
            for start in range(0, len(keys), chunk):
                await self._run_blocking(
                    self.redis_client.delete, *keys[start:start + chunk]
                )
        except Exception as e:
            print(f"批量删除缓存失败: {e}")


# 异步优化
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncOptimizer:
    """Run blocking callables off the event loop and bound async concurrency."""

    def __init__(self, max_workers: int = 10):
        # Dedicated pool used by run_in_thread; release it with shutdown().
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, func, *args, **kwargs):
        """Run blocking ``func(*args, **kwargs)`` in the pool and await its result."""
        import functools

        # get_running_loop is the documented way to get the loop from inside
        # a coroutine.
        loop = asyncio.get_running_loop()
        # run_in_executor forwards positional arguments only; passing **kwargs
        # to it raises TypeError, so bind everything with partial first.
        return await loop.run_in_executor(
            self.executor,
            functools.partial(func, *args, **kwargs)
        )

    async def batch_execute(
        self,
        tasks: List[callable],
        max_concurrency: int = 10
    ) -> List[Any]:
        """Await ``task()`` for every zero-argument coroutine factory.

        At most ``max_concurrency`` run at once. Results come back in input
        order, and the first exception propagates, as with ``asyncio.gather``.
        """
        semaphore = asyncio.Semaphore(max_concurrency)

        async def execute_with_semaphore(task):
            async with semaphore:
                return await task()

        return await asyncio.gather(
            *(execute_with_semaphore(task) for task in tasks)
        )

    async def parallel_process(
        self,
        items: list,
        process_func: callable,
        batch_size: int = 100,
        max_concurrency: int = 10
    ) -> List[Any]:
        """Apply async ``process_func`` to every item, batch by batch.

        Batches run one after another; within a batch up to
        ``max_concurrency`` calls run concurrently. Results keep input order.

        Raises:
            ValueError: if ``batch_size`` < 1. A negative step would silently
                skip every item, and zero makes ``range`` fail.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        results = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            results.extend(await self.batch_execute(
                # Default argument binds each item now (avoids late binding).
                [lambda item=item: process_func(item) for item in batch],
                max_concurrency
            ))
        return results

    def shutdown(self, wait: bool = True):
        """Shut down the thread pool used by ``run_in_thread``."""
        self.executor.shutdown(wait=wait)


# 实践练习
练习1:集成Agent
python
def integrate_agent():
    """Exercise 1: build and return a fresh, empty AgentManager."""
    return AgentManager()


# 练习2:部署Agent
bash
# Build (if needed) and start every service from docker-compose.yml in the background.
docker-compose up -d
# 练习3:监控Agent
python
def monitor_agent():
    """Exercise 3: create a PerformanceMonitor plus an AlertSystem watching it.

    Returns:
        ``(monitor, alert_system)``.
    """
    perf_monitor = PerformanceMonitor()
    alerts = AlertSystem(perf_monitor)
    return perf_monitor, alerts


# 总结
本节我们学习了个人助理Agent的集成与部署:
- Agent集成
- 前端集成
- 部署配置
- 监控告警
- 性能优化
集成和部署是Agent应用上线的关键步骤。
