
Day 77: Intelligent Customer Service System - Deployment and Optimization

Learning Objectives

  • Master Docker deployment
  • Learn Kubernetes deployment
  • Understand performance optimization
  • Master monitoring and alerting
  • Learn security hardening

Docker Deployment

Dockerfile

dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

yaml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@postgres:5432/chatbot
      - REDIS_URL=redis://redis:6379
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - postgres
      - redis
    volumes:
      - ./data:/app/data
    restart: unless-stopped

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=chatbot
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password  # use a strong value via .env in real deployments
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf  # server-level config belongs under conf.d
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - api
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

Nginx Configuration

nginx
upstream api_backend {
    server api:8000;
}

server {
    listen 80;
    server_name yourdomain.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name yourdomain.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;

    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    client_max_body_size 10M;

    location / {
        proxy_pass http://api_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    location /ws {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # keep idle WebSocket connections open (nginx's default read timeout is 60s)
        proxy_read_timeout 3600s;
}

Kubernetes Deployment

Deployment Configuration

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chatbot-api
  namespace: chatbot
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chatbot-api
  template:
    metadata:
      labels:
        app: chatbot-api
    spec:
      containers:
      - name: api
        image: your-registry/chatbot-api:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: chatbot-secrets
              key: database-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: chatbot-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
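The liveness and readiness probes above assume the app exposes a `/health` endpoint. A minimal sketch of the aggregation logic such an endpoint might return; the dependency checks here are hypothetical placeholders (real ones would ping Postgres and Redis):

```python
from typing import Callable, Dict

def health_status(checks: Dict[str, Callable[[], bool]]) -> dict:
    """Run each dependency check and report overall status.

    The readiness probe should gate traffic on "ok", which requires
    every registered check to pass.
    """
    results = {name: check() for name, check in checks.items()}
    return {
        "status": "ok" if all(results.values()) else "degraded",
        "checks": results,
    }

# Hypothetical checks; wire in real connectivity tests in the actual app
status = health_status({
    "database": lambda: True,
    "redis": lambda: True,
})
```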

Service Configuration

yaml
apiVersion: v1
kind: Service
metadata:
  name: chatbot-api
  namespace: chatbot
spec:
  selector:
    app: chatbot-api
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

Ingress Configuration

yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: chatbot-ingress
  namespace: chatbot
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - chatbot.yourdomain.com
    secretName: chatbot-tls
  rules:
  - host: chatbot.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: chatbot-api
            port:
              number: 80

ConfigMap

yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: chatbot-config
  namespace: chatbot
data:
  config.yaml: |
    database:
      host: postgres
      port: 5432
      name: chatbot
      pool_size: 20
      max_overflow: 10
    
    redis:
      host: redis
      port: 6379
      db: 0
    
    llm:
      model: gpt-4o
      temperature: 0.7
      max_tokens: 1000
    
    rag:
      chunk_size: 1000
      chunk_overlap: 200
      top_k: 5
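The `chunk_size`/`chunk_overlap` settings above drive the RAG text splitter. A minimal character-based sliding-window sketch of how those two parameters interact (real splitters usually also respect sentence or paragraph boundaries):

```python
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    # Each chunk starts chunk_size - chunk_overlap characters after the
    # previous one, so consecutive chunks share chunk_overlap characters
    if chunk_overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk size")
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

# With the ConfigMap defaults, 2500 characters yield chunks starting
# at offsets 0, 800, 1600, and 2400
chunks = split_text("a" * 2500, chunk_size=1000, chunk_overlap=200)
```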

Secret

yaml
apiVersion: v1
kind: Secret
metadata:
  name: chatbot-secrets
  namespace: chatbot
type: Opaque
stringData:
  database-url: postgresql://user:password@postgres:5432/chatbot
  openai-api-key: your-openai-api-key
  redis-password: your-redis-password
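The Secret above uses `stringData`, which accepts plain values and lets Kubernetes base64-encode them on admission; the alternative `data` field expects values you have encoded yourself. A quick sketch of that encoding round-trip:

```python
import base64

def to_secret_data(value: str) -> str:
    # Encode a plain value the way a Secret's `data` field expects
    return base64.b64encode(value.encode()).decode()

def from_secret_data(encoded: str) -> str:
    # Decode a value read back from a Secret's `data` field
    return base64.b64decode(encoded).decode()

encoded = to_secret_data("postgresql://user:password@postgres:5432/chatbot")
```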

Performance Optimization

Caching Strategy

python
from functools import wraps
from typing import Any, Optional
import redis.asyncio as redis
import json
import hashlib

class CacheManager:
    def __init__(self, redis_url: str):
        # redis.asyncio gives a truly async client, so the async methods
        # below do not block the event loop
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # seconds

    async def get(self, key: str) -> Optional[Any]:
        try:
            value = await self.redis_client.get(key)
            if value:
                return json.loads(value)
        except Exception as e:
            print(f"Cache get failed: {e}")
        return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        try:
            ttl = ttl or self.default_ttl
            await self.redis_client.setex(
                key,
                ttl,
                json.dumps(value)
            )
        except Exception as e:
            print(f"Cache set failed: {e}")

    async def delete(self, key: str):
        try:
            await self.redis_client.delete(key)
        except Exception as e:
            print(f"Cache delete failed: {e}")

    def generate_cache_key(self, *args, **kwargs) -> str:
        # Deterministic key from positional and keyword arguments
        key_data = f"{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

def cached(ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Include kwargs in the key so calls with different keyword
            # arguments do not collide
            key_data = f"{args}:{sorted(kwargs.items())}"
            cache_key = f"{func.__name__}:{hashlib.md5(key_data.encode()).hexdigest()}"

            cached_value = await cache_manager.get(cache_key)
            if cached_value is not None:
                return cached_value

            result = await func(*args, **kwargs)
            await cache_manager.set(cache_key, result, ttl)

            return result
        return wrapper
    return decorator

cache_manager = CacheManager("redis://localhost:6379")
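The decorator pattern above can be exercised without Redis by swapping in an in-memory dict backend. This self-contained sketch demonstrates the key property: a second call with the same arguments never reaches the underlying function:

```python
import asyncio
import hashlib
from functools import wraps

_memory_cache: dict = {}

def cached_in_memory(func):
    # Same shape as the Redis-backed decorator, but backed by a plain dict
    @wraps(func)
    async def wrapper(*args, **kwargs):
        key_data = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
        key = hashlib.md5(key_data.encode()).hexdigest()
        if key in _memory_cache:
            return _memory_cache[key]
        result = await func(*args, **kwargs)
        _memory_cache[key] = result
        return result
    return wrapper

call_count = 0

@cached_in_memory
async def expensive_lookup(query: str) -> str:
    global call_count
    call_count += 1  # counts how often the real work actually runs
    return f"answer for {query}"

async def main():
    a = await expensive_lookup("shipping policy")
    b = await expensive_lookup("shipping policy")  # served from cache
    return a, b

asyncio.run(main())
```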

Connection Pool Optimization

python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
from aiohttp import ClientSession, ClientTimeout, TCPConnector

class ConnectionPoolManager:
    def __init__(self):
        self.db_engine = None
        self.http_session = None

    def init_db_pool(
        self,
        database_url: str,
        pool_size: int = 20,
        max_overflow: int = 10
    ):
        self.db_engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,   # validate connections before handing them out
            pool_recycle=3600     # recycle connections older than one hour
        )

    async def init_http_pool(
        self,
        max_connections: int = 100,
        limit_per_host: int = 10
    ):
        # force_close=True would disable keep-alive and defeat pooling,
        # so connections are reused and stale ones cleaned up instead
        connector = TCPConnector(
            limit=max_connections,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=True
        )

        self.http_session = ClientSession(
            connector=connector,
            timeout=ClientTimeout(total=30)  # aiohttp's timeout type, not asyncio's
        )

    async def close_all(self):
        if self.db_engine:
            self.db_engine.dispose()

        if self.http_session:
            await self.http_session.close()

pool_manager = ConnectionPoolManager()

Async Optimization

python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

class AsyncOptimizer:
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, func, *args, **kwargs):
        # run_in_executor does not forward keyword arguments,
        # so bind them with functools.partial first
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            partial(func, *args, **kwargs)
        )
    
    async def gather_with_concurrency(
        self,
        coroutines,
        max_concurrency: int = 10
    ):
        semaphore = asyncio.Semaphore(max_concurrency)
        
        async def run_with_semaphore(coro):
            async with semaphore:
                return await coro
        
        return await asyncio.gather(
            *[
                run_with_semaphore(coro)
                for coro in coroutines
            ]
        )
    
    async def batch_process(
        self,
        items: list,
        process_func,
        batch_size: int = 100
    ):
        results = []
        
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            
            batch_results = await asyncio.gather(
                *[
                    process_func(item)
                    for item in batch
                ]
            )
            
            results.extend(batch_results)
        
        return results

async_optimizer = AsyncOptimizer()
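`gather_with_concurrency` caps how many coroutines run at once via a semaphore. A self-contained check of that cap, tracking the number of in-flight tasks as a standalone function with the same shape:

```python
import asyncio

async def gather_with_concurrency(coroutines, max_concurrency: int = 10):
    # Same pattern as AsyncOptimizer: a semaphore gates entry
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_with_semaphore(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run_with_semaphore(c) for c in coroutines))

in_flight = 0
peak = 0

async def tracked_task(i: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0)  # yield so other tasks get a chance to start
    in_flight -= 1
    return i * 2

results = asyncio.run(
    gather_with_concurrency([tracked_task(i) for i in range(20)], max_concurrency=5)
)
```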

Monitoring and Alerting

Prometheus Configuration

yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'chatbot-api'
    static_configs:
      - targets: ['chatbot-api:8000']
    metrics_path: '/metrics'

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

Grafana Dashboard

json
{
  "dashboard": {
    "title": "Customer Service System Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(http_requests_total{status=~\"5..\"}[5m]) / rate(http_requests_total[5m])"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Connections",
        "targets": [
          {
            "expr": "active_connections"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Database Connection Pool",
        "targets": [
          {
            "expr": "db_pool_size"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Redis Hit Rate",
        "targets": [
          {
            "expr": "redis_cache_hits / (redis_cache_hits + redis_cache_misses)"
          }
        ],
        "type": "graph"
      }
    ]
  }
}
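The `histogram_quantile(0.95, ...)` panel reports P95 latency. The same statistic can be sanity-checked offline from raw latency samples; Prometheus estimates it from histogram buckets, but the idea is identical:

```python
import statistics

def p95(samples: list[float]) -> float:
    # quantiles(n=20) returns 19 cut points; index 18 is the 0.95 quantile
    return statistics.quantiles(samples, n=20, method="inclusive")[18]

# 90% fast requests at 100ms, 10% slow requests at 1.5s
latencies = [0.1] * 90 + [1.5] * 10
```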

Alerting Rules

yaml
groups:
  - name: chatbot_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate"
          description: "Error rate above 5%; current value: {{ $value }}"

      - alert: HighResponseTime
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High response time"
          description: "P95 response time above 2 seconds; current value: {{ $value }}s"

      - alert: LowCacheHitRate
        expr: redis_cache_hits / (redis_cache_hits + redis_cache_misses) < 0.7
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Low cache hit rate"
          description: "Cache hit rate below 70%; current value: {{ $value }}"

      - alert: DatabaseConnectionPoolFull
        expr: db_pool_size / db_pool_max > 0.9
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Database connection pool nearly full"
          description: "Connection pool utilization above 90%; current value: {{ $value }}"
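The HighErrorRate expression is simply a ratio of rates compared against a threshold. A tiny sketch of the same check, useful for unit-testing alert logic before it reaches Prometheus:

```python
def error_rate(error_count: int, total_count: int) -> float:
    # Mirrors rate(http_requests_total{status=~"5.."}) / rate(http_requests_total)
    if total_count == 0:
        return 0.0
    return error_count / total_count

def should_alert(error_count: int, total_count: int, threshold: float = 0.05) -> bool:
    return error_rate(error_count, total_count) > threshold

# 60 errors out of 1000 requests in the window: 6% > 5%, so the alert fires
```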

Security Hardening

API Security

python
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

security = HTTPBearer()
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["yourdomain.com", "*.yourdomain.com"]
)

@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    
    return response

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials

    # validate_token is application-specific (e.g., JWT signature and
    # expiry verification) and is assumed to be implemented elsewhere
    if not validate_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")

    return token

@app.post("/api/chat")
@limiter.limit("100/minute")
async def chat(
    request: Request,
    message: str,
    token: str = Depends(verify_token)
):
    pass  # chat handling logic goes here
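`verify_token` delegates to a `validate_token` helper that the snippet leaves undefined. One possible shape is an HMAC-signed token checked with a constant-time compare; the `SECRET_KEY` here is a placeholder, and a real deployment would more likely use JWTs with expiry claims:

```python
import hmac
import hashlib

SECRET_KEY = b"change-me"  # placeholder; load from configuration in practice

def sign_token(payload: str) -> str:
    # Token format: "<payload>.<hex HMAC-SHA256 of payload>"
    signature = hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()
    return f"{payload}.{signature}"

def validate_token(token: str) -> bool:
    payload, _, signature = token.rpartition(".")
    if not payload:
        return False
    expected = hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(signature, expected)
```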

Data Encryption

python
from cryptography.fernet import Fernet
import os

class EncryptionManager:
    def __init__(self):
        self.key = self._get_or_create_key()
        self.cipher = Fernet(self.key)
    
    def _get_or_create_key(self) -> bytes:
        # Storing the key on local disk is fine for development; production
        # systems should load it from a secret manager or KMS instead
        key_file = "encryption_key.key"
        
        if os.path.exists(key_file):
            with open(key_file, 'rb') as f:
                return f.read()
        else:
            key = Fernet.generate_key()
            with open(key_file, 'wb') as f:
                f.write(key)
            return key
    
    def encrypt(self, data: str) -> str:
        encrypted = self.cipher.encrypt(data.encode())
        return encrypted.decode()
    
    def decrypt(self, encrypted_data: str) -> str:
        decrypted = self.cipher.decrypt(encrypted_data.encode())
        return decrypted.decode()

encryption_manager = EncryptionManager()

Audit Logging

python
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional
import json

class AuditLogger:
    def __init__(self, log_file: str = "audit.log"):
        self.logger = logging.getLogger("audit")
        self.logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def log_event(
        self,
        event_type: str,
        user_id: str,
        action: str,
        resource: str,
        details: Optional[Dict[str, Any]] = None
    ):
        log_entry = {
            # timezone-aware UTC timestamps keep records comparable across hosts
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "details": details or {}
        }

        self.logger.info(json.dumps(log_entry))
    
    def log_api_call(
        self,
        user_id: str,
        endpoint: str,
        method: str,
        status_code: int,
        response_time: float
    ):
        self.log_event(
            event_type="api_call",
            user_id=user_id,
            action=f"{method} {endpoint}",
            resource=endpoint,
            details={
                "status_code": status_code,
                "response_time": response_time
            }
        )
    
    def log_data_access(
        self,
        user_id: str,
        data_type: str,
        record_id: str,
        action: str
    ):
        self.log_event(
            event_type="data_access",
            user_id=user_id,
            action=action,
            resource=f"{data_type}:{record_id}"
        )

audit_logger = AuditLogger()
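Because each audit record is a standalone JSON object on its own line, the log is easy to query offline. A small sketch that filters entries by event type:

```python
import json
from typing import Iterable

def filter_events(lines: Iterable[str], event_type: str) -> list[dict]:
    # Each non-empty audit log line is one JSON document
    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        entry = json.loads(line)
        if entry.get("event_type") == event_type:
            events.append(entry)
    return events

sample_log = """
{"timestamp": "2024-01-01T00:00:00+00:00", "event_type": "api_call", "user_id": "u1", "action": "POST /api/chat", "resource": "/api/chat", "details": {"status_code": 200}}
{"timestamp": "2024-01-01T00:00:01+00:00", "event_type": "data_access", "user_id": "u1", "action": "read", "resource": "conversation:42", "details": {}}
""".splitlines()

api_calls = filter_events(sample_log, "api_call")
```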

Practice Exercises

Exercise 1: Docker Deployment

bash
docker-compose up -d

Exercise 2: Kubernetes Deployment

bash
kubectl apply -f k8s/

Exercise 3: Performance Optimization

python
async def optimize_performance():
    # Wire together the caching and pooling helpers from this lesson
    cache_manager = CacheManager("redis://localhost:6379")
    pool_manager = ConnectionPoolManager()
    pool_manager.init_db_pool("postgresql://user:password@localhost:5432/chatbot")
    await pool_manager.init_http_pool()

    return cache_manager, pool_manager

Summary

In this lesson we covered deployment and optimization of the intelligent customer service system:

  1. Docker deployment
  2. Kubernetes deployment
  3. Performance optimization
  4. Monitoring and alerting
  5. Security hardening

Deployment and optimization are the critical final steps in taking a system to production.

References