Appearance
第77天:智能客服系统-部署与优化
学习目标
- 掌握Docker部署
- 学习Kubernetes部署
- 理解性能优化
- 掌握监控告警
- 学习安全加固
Docker部署
Dockerfile
dockerfile
# Slim Python base keeps the final image small.
FROM python:3.11-slim

WORKDIR /app

# Copy and install requirements first so Docker layer caching skips
# reinstalling dependencies when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
yaml
# Compose stack: API + PostgreSQL + Redis + Nginx reverse proxy.
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # NOTE(review): DB credentials are hard-coded for the tutorial;
      # move them into an .env file or a secrets store in production.
      - DATABASE_URL=postgresql://user:password@postgres:5432/chatbot
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - postgres
      - redis
    volumes:
      - ./data:/app/data
    restart: unless-stopped

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=chatbot
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      # Named volume so data survives container recreation.
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
Nginx配置
nginx
# Upstream pool pointing at the API container.
upstream api_backend {
    server api:8000;
}

# Redirect all plain-HTTP traffic to HTTPS.
server {
    listen 80;
    server_name yourdomain.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name yourdomain.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    # Cap upload size to protect the backend.
    client_max_body_size 10M;

    location / {
        proxy_pass http://api_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # WebSocket endpoint needs HTTP/1.1 plus Upgrade/Connection headers.
    location /ws {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}
Kubernetes部署
Deployment配置
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot-api
  namespace: chatbot
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chatbot-api
  template:
    metadata:
      labels:
        app: chatbot-api
    spec:
      containers:
        - name: api
          # NOTE(review): pin a version tag instead of :latest for reproducible rollouts.
          image: your-registry/chatbot-api:latest
          ports:
            - containerPort: 8000
          env:
            # Sensitive values come from the chatbot-secrets Secret.
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: chatbot-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: chatbot-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Restart the container if /health stops answering.
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          # Only route traffic once /health answers.
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
Service配置
yaml
apiVersion: v1
kind: Service
metadata:
  name: chatbot-api
  namespace: chatbot
spec:
  selector:
    app: chatbot-api
  ports:
    - protocol: TCP
      port: 80          # port exposed inside the cluster
      targetPort: 8000  # container port on the API pods
  # ClusterIP: reachable only inside the cluster; the Ingress fronts it.
  type: ClusterIP
Ingress配置
yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: chatbot-ingress
  namespace: chatbot
  annotations:
    # cert-manager provisions the TLS certificate automatically.
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - chatbot.yourdomain.com
      secretName: chatbot-tls
  rules:
    - host: chatbot.yourdomain.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: chatbot-api
                port:
                  number: 80
ConfigMap
yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: chatbot-config
  namespace: chatbot
data:
  # Mounted (or read) by the application as config.yaml.
  config.yaml: |
    database:
      host: postgres
      port: 5432
      name: chatbot
      pool_size: 20
      max_overflow: 10
    redis:
      host: redis
      port: 6379
      db: 0
    llm:
      model: gpt-4o
      temperature: 0.7
      max_tokens: 1000
    rag:
      chunk_size: 1000
      chunk_overlap: 200
      top_k: 5
Secret
yaml
apiVersion: v1
kind: Secret
metadata:
  name: chatbot-secrets
  namespace: chatbot
type: Opaque
# stringData accepts plaintext; Kubernetes base64-encodes it on save.
# NOTE(review): never commit real credentials — template these values
# via CI or sync them from an external secret manager.
stringData:
  database-url: postgresql://user:password@postgres:5432/chatbot
  openai-api-key: your-openai-api-key
  redis-password: your-redis-password
性能优化
缓存策略
python
import hashlib
import json
from functools import lru_cache, wraps
from typing import Any, Optional

import redis
class CacheManager:
    """Thin JSON cache on top of Redis with a default TTL.

    NOTE(review): the methods are declared ``async`` but use the
    synchronous ``redis`` client, so each call blocks the event loop;
    switch to ``redis.asyncio`` for a truly non-blocking cache.
    """

    def __init__(self, redis_url: str):
        # from_url does not connect eagerly; the first command does.
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # seconds

    async def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value for *key*, or None on miss/error."""
        # Fixed: annotation used the builtin function ``any`` instead of
        # ``typing.Any``, which is not a type.
        try:
            value = self.redis_client.get(key)
            if value:
                return json.loads(value)
        except Exception as e:
            # Cache failures are non-fatal: treat them as a miss.
            print(f"缓存获取失败: {e}")
        return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Store *value* as JSON under *key*; TTL defaults to one hour."""
        try:
            ttl = ttl or self.default_ttl
            self.redis_client.setex(
                key,
                ttl,
                json.dumps(value)
            )
        except Exception as e:
            print(f"缓存设置失败: {e}")

    async def delete(self, key: str):
        """Best-effort removal of *key* from the cache."""
        try:
            self.redis_client.delete(key)
        except Exception as e:
            print(f"缓存删除失败: {e}")

    def generate_cache_key(self, *args, **kwargs) -> str:
        """Derive a stable key from call arguments (MD5 here is for hashing, not security)."""
        key_data = f"{args}:{kwargs}"
        return hashlib.md5(key_data.encode()).hexdigest()
def cached(ttl: int = 3600):
    """Decorator that caches an async function's result in Redis for *ttl* seconds.

    Fixes over the original:
    - the cache key now covers keyword arguments too, so ``f(1, x=2)``
      and ``f(1, x=3)`` no longer collide (the original hashed ``args`` only);
    - ``functools.wraps`` preserves the wrapped coroutine's metadata.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            arg_hash = cache_manager.generate_cache_key(*args, **kwargs)
            cache_key = f"{func.__name__}:{arg_hash}"
            cached_value = await cache_manager.get(cache_key)
            if cached_value is not None:
                return cached_value
            result = await func(*args, **kwargs)
            await cache_manager.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
cache_manager = CacheManager("redis://localhost:6379")
连接池优化
python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import asyncio
from aiohttp import ClientSession, TCPConnector
class ConnectionPoolManager:
    """Owns the process-wide database and HTTP connection pools."""

    def __init__(self):
        self.db_engine = None     # SQLAlchemy Engine after init_db_pool()
        self.http_session = None  # aiohttp ClientSession after init_http_pool()

    def init_db_pool(
        self,
        database_url: str,
        pool_size: int = 20,
        max_overflow: int = 10
    ):
        """Create the SQLAlchemy engine backed by a bounded queue pool."""
        self.db_engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,  # drop dead connections before handing them out
            pool_recycle=3600    # recycle hourly to dodge server-side idle timeouts
        )

    async def init_http_pool(
        self,
        max_connections: int = 100,
        limit_per_host: int = 10
    ):
        """Create a shared aiohttp session with bounded connection limits."""
        # Bug fix: ClientTimeout lives in aiohttp, not asyncio — the
        # original `asyncio.ClientTimeout(total=30)` raised AttributeError.
        from aiohttp import ClientTimeout
        connector = TCPConnector(
            limit=max_connections,
            limit_per_host=limit_per_host,
            force_close=True,
            enable_cleanup_closed=True
        )
        self.http_session = ClientSession(
            connector=connector,
            timeout=ClientTimeout(total=30)
        )

    async def close_all(self):
        """Dispose the DB pool and close the HTTP session, if created."""
        if self.db_engine:
            self.db_engine.dispose()
        if self.http_session:
            await self.http_session.close()
pool_manager = ConnectionPoolManager()
异步优化
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncOptimizer:
    """Helpers for offloading blocking calls and bounding concurrency in asyncio code."""

    def __init__(self, max_workers: int = 10):
        # Shared pool for running blocking (sync) callables off the event loop.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, func, *args, **kwargs):
        """Run a blocking callable in the thread pool and await its result.

        Bug fix: ``loop.run_in_executor`` accepts positional args only, so
        keyword arguments are bound with ``functools.partial`` — the original
        passed ``**kwargs`` straight through and raised TypeError.
        """
        from functools import partial
        loop = asyncio.get_running_loop()  # modern replacement for get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            partial(func, *args, **kwargs)
        )

    async def gather_with_concurrency(
        self,
        coroutines,
        max_concurrency: int = 10
    ):
        """Await all *coroutines*, running at most *max_concurrency* at once."""
        semaphore = asyncio.Semaphore(max_concurrency)

        async def run_with_semaphore(coro):
            async with semaphore:
                return await coro

        return await asyncio.gather(
            *[run_with_semaphore(coro) for coro in coroutines]
        )

    async def batch_process(
        self,
        items: list,
        process_func,
        batch_size: int = 100
    ):
        """Apply async *process_func* to *items* in sequential batches.

        Results keep the input order; batches run one after another while
        items within a batch run concurrently.
        """
        results = []
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            batch_results = await asyncio.gather(
                *[process_func(item) for item in batch]
            )
            results.extend(batch_results)
        return results
async_optimizer = AsyncOptimizer()
监控告警
Prometheus配置
yaml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'chatbot-api'
static_configs:
- targets: ['chatbot-api:8000']
metrics_path: '/metrics'
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']Grafana Dashboard
json
{
"dashboard": {
"title": "智能客服系统监控",
"panels": [
{
"title": "请求速率",
"targets": [
{
"expr": "rate(http_requests_total[5m])"
}
],
"type": "graph"
},
{
"title": "响应时间",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
}
],
"type": "graph"
},
{
"title": "错误率",
"targets": [
{
"expr": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m])"
}
],
"type": "graph"
},
{
"title": "活跃连接数",
"targets": [
{
"expr": "active_connections"
}
],
"type": "graph"
},
{
"title": "数据库连接池",
"targets": [
{
"expr": "db_pool_size"
}
],
"type": "graph"
},
{
"title": "Redis命中率",
"targets": [
{
"expr": "redis_cache_hits / (redis_cache_hits + redis_cache_misses)"
}
],
"type": "graph"
}
]
}
}
告警规则
yaml
groups:
  - name: chatbot_alerts
    rules:
      # Page when more than 5% of requests return 5xx for 5 minutes.
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "高错误率告警"
          description: "错误率超过5%,当前值: {{ $value }}"

      # Warn when P95 latency exceeds 2 seconds.
      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "高响应时间告警"
          description: "P95响应时间超过2秒,当前值: {{ $value }}秒"

      # Warn when the cache hit rate stays under 70% for 10 minutes.
      - alert: LowCacheHitRate
        expr: redis_cache_hits / (redis_cache_hits + redis_cache_misses) < 0.7
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "低缓存命中率告警"
          description: "缓存命中率低于70%,当前值: {{ $value }}"

      # Page when the DB connection pool is over 90% utilized.
      - alert: DatabaseConnectionPoolFull
        expr: db_pool_size / db_pool_max > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "数据库连接池告警"
          description: "数据库连接池使用率超过90%,当前值: {{ $value }}"
安全加固
API安全
python
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
app = FastAPI()
security = HTTPBearer()

# Per-client-IP rate limiting via slowapi.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Lock CORS down to the production origin only.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Reject requests whose Host header is not one of ours.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["yourdomain.com", "*.yourdomain.com"]
)

@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Attach standard browser-hardening headers to every response."""
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token; respond 401 on failure.

    NOTE(review): ``validate_token`` is not defined in this snippet — it
    must be provided by the auth module before this code can run.
    """
    token = credentials.credentials
    if not validate_token(token):
        raise HTTPException(status_code=401, detail="无效的令牌")
    return token

@app.post("/api/chat")
@limiter.limit("100/minute")  # per-IP throttle on the chat endpoint
async def chat(
    request: Request,
    message: str,
    token: str = Depends(verify_token)
):
    pass
数据加密
python
from cryptography.fernet import Fernet
import os
class EncryptionManager:
    """Symmetric (Fernet) encryption helper backed by an on-disk key file."""

    def __init__(self):
        self.key = self._get_or_create_key()
        self.cipher = Fernet(self.key)

    def _get_or_create_key(self) -> bytes:
        """Load the key file, creating it with owner-only permissions on first run."""
        key_file = "encryption_key.key"
        if os.path.exists(key_file):
            with open(key_file, 'rb') as f:
                return f.read()
        key = Fernet.generate_key()
        # Security fix: restrict the key file to the owner (0o600) — the
        # original plain open() left it readable by other users on most umasks.
        fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(key)
        return key

    def encrypt(self, data: str) -> str:
        """Encrypt *data* and return the Fernet token as a string."""
        encrypted = self.cipher.encrypt(data.encode())
        return encrypted.decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Invert :meth:`encrypt`; raises InvalidToken for tampered or wrong-key input."""
        decrypted = self.cipher.decrypt(encrypted_data.encode())
        return decrypted.decode()
encryption_manager = EncryptionManager()
日志审计
python
import logging
from datetime import datetime
from typing import Dict, Any
import json
class AuditLogger:
    """Structured (JSON-lines) audit trail written through the logging module."""

    def __init__(self, log_file: str = "audit.log"):
        self.logger = logging.getLogger("audit")
        self.logger.setLevel(logging.INFO)
        # Bug fix: guard against duplicate handlers — logging.getLogger
        # returns the same cached logger, so the original attached a new
        # FileHandler on every instantiation and duplicated every line.
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)

    def log_event(
        self,
        event_type: str,
        user_id: str,
        action: str,
        resource: str,
        details: Dict[str, Any] = None
    ):
        """Emit one JSON line describing an auditable event."""
        log_entry = {
            # NOTE(review): naive local timestamp; switch to
            # datetime.now(timezone.utc) if consumers need UTC.
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "details": details or {}
        }
        self.logger.info(json.dumps(log_entry))

    def log_api_call(
        self,
        user_id: str,
        endpoint: str,
        method: str,
        status_code: int,
        response_time: float
    ):
        """Record one HTTP API call with its status and latency."""
        self.log_event(
            event_type="api_call",
            user_id=user_id,
            action=f"{method} {endpoint}",
            resource=endpoint,
            details={
                "status_code": status_code,
                "response_time": response_time
            }
        )

    def log_data_access(
        self,
        user_id: str,
        data_type: str,
        record_id: str,
        action: str
    ):
        """Record that *user_id* performed *action* on one data record."""
        self.log_event(
            event_type="data_access",
            user_id=user_id,
            action=action,
            resource=f"{data_type}:{record_id}"
        )
audit_logger = AuditLogger()
实践练习
练习1:Docker部署
bash
docker-compose up -d
练习2:Kubernetes部署
bash
kubectl apply -f k8s/
练习3:性能优化
python
async def optimize_performance():
    """Wire up the cache and connection-pool managers used by the app.

    Fixed: the section heading was fused onto the return statement,
    turning ``pool_manager`` into the undefined name ``pool_manager总结``.
    """
    cache_manager = CacheManager("redis://localhost:6379")
    pool_manager = ConnectionPoolManager()
    return cache_manager, pool_manager
总结
本节我们学习了智能客服系统的部署与优化:
- Docker部署
- Kubernetes部署
- 性能优化
- 监控告警
- 安全加固
部署和优化是系统上线的关键环节。
