Skip to content

第56天:模型部署

学习目标

  • 掌握模型量化技术
  • 学习推理优化方法
  • 理解部署架构
  • 掌握监控和维护
  • 了解成本优化策略

模型量化

基础量化

python
import torch
import torch.nn as nn

class ModelQuantizer:
    """Post-training quantization helpers built on PyTorch eager-mode APIs.

    Every method returns a new quantized model and leaves ``self.model``
    untouched.
    """

    def __init__(self, model: nn.Module):
        self.model = model

    def quantize_dynamic(self, qconfig_spec: dict = None):
        """Dynamically quantize the wrapped model.

        Args:
            qconfig_spec: Mapping from module type to the dynamic qconfig to
                apply. Defaults to ``nn.Linear``, ``nn.LSTM`` and ``nn.GRU``
                with ``default_dynamic_qconfig``.

        Returns:
            A quantized copy of the model.
        """
        if qconfig_spec is None:
            default_qconfig = torch.quantization.default_dynamic_qconfig
            qconfig_spec = {
                nn.Linear: default_qconfig,
                nn.LSTM: default_qconfig,
                nn.GRU: default_qconfig,
            }

        # Eager-mode dynamic quantization is a single call; there is no
        # separate ``prepare_dynamic`` step in ``torch.quantization``.
        return torch.quantization.quantize_dynamic(self.model, qconfig_spec)

    def quantize_static(self, calibration_loader: list):
        """Statically quantize the model after calibrating on sample inputs.

        Args:
            calibration_loader: Iterable of inputs; each item is passed to the
                prepared model as its single positional argument.

        Returns:
            A quantized copy of the model.
        """
        import copy

        working_copy = copy.deepcopy(self.model)
        working_copy.eval()

        # ``prepare`` only inserts observers into modules that carry a
        # qconfig. If the caller configured none, fall back to the default
        # qconfig of the active quantized engine so calibration is not a no-op.
        if not any(getattr(m, "qconfig", None) for m in working_copy.modules()):
            working_copy.qconfig = torch.quantization.get_default_qconfig(
                torch.backends.quantized.engine
            )

        model_prepared = torch.quantization.prepare(working_copy, inplace=True)

        with torch.no_grad():
            for data in calibration_loader:
                model_prepared(data)

        return torch.quantization.convert(model_prepared, inplace=True)

    def quantize_to_int8(self):
        """Dynamically quantize Linear/Conv2d modules with int8 weights.

        Returns:
            A quantized copy of the model.
        """
        # NOTE(review): dynamic quantization is documented for Linear and
        # recurrent layers; Conv2d is kept from the original set but is
        # presumably left unconverted - confirm against the PyTorch docs.
        return torch.quantization.quantize_dynamic(
            self.model,
            {nn.Linear, nn.Conv2d},
            dtype=torch.qint8,
        )

4位量化

python
class FourBitQuantizer:
    """Load 4-bit (bitsandbytes) quantized versions of a Hugging Face model.

    The wrapped model is only used to find the checkpoint to reload:
    ``self.model.config.name_or_path``.
    """

    def __init__(self, model: nn.Module):
        self.model = model

    def quantize_to_nf4(self):
        """Return the model reloaded with NF4 4-bit weights."""
        return self._load_4bit("nf4")

    def quantize_to_fp4(self):
        """Return the model reloaded with FP4 4-bit weights."""
        return self._load_4bit("fp4")

    def _load_4bit(self, quant_type: str):
        """Reload the checkpoint with a 4-bit ``BitsAndBytesConfig``.

        Args:
            quant_type: Value for ``bnb_4bit_quant_type`` ("nf4" or "fp4").

        Returns:
            The model returned by ``AutoModelForCausalLM.from_pretrained``.

        Raises:
            ImportError: If ``transformers`` is not installed.
        """
        try:
            from transformers import AutoModelForCausalLM, BitsAndBytesConfig
        except ImportError as err:
            raise ImportError(
                "Install transformers: pip install transformers"
            ) from err

        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type=quant_type,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
        )

        # The config only takes effect at load time, so the checkpoint is
        # loaded again with it instead of returning the original model.
        return AutoModelForCausalLM.from_pretrained(
            self.model.config.name_or_path,
            quantization_config=bnb_config,
        )

GPTQ量化

python
class GPTQQuantizer:
    """Quantize a Hugging Face causal LM with AutoGPTQ.

    The checkpoint is located through ``self.model.config.name_or_path``.
    """

    def __init__(self, model: nn.Module):
        self.model = model

    def quantize_gptq(self, bits: int = 4,
                       groupsize: int = 128,
                       examples=None):
        """Load the checkpoint with a GPTQ config and optionally quantize it.

        Args:
            bits: Quantization bit width.
            groupsize: GPTQ group size.
            examples: Optional calibration samples passed to
                ``model.quantize``. When omitted the model is only loaded
                with the quantize config, as before.

        Returns:
            The AutoGPTQ model object.

        Raises:
            ImportError: If ``auto-gptq`` is not installed.
        """
        try:
            from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
        except ImportError as err:
            raise ImportError(
                "Install auto-gptq: pip install auto-gptq"
            ) from err

        # BaseQuantizeConfig names this field ``group_size``.
        quantize_config = BaseQuantizeConfig(
            bits=bits,
            group_size=groupsize,
            damp_percent=0.01,
            desc_act=False,
        )

        model_quantized = AutoGPTQForCausalLM.from_pretrained(
            self.model.config.name_or_path,
            quantize_config=quantize_config,
        )

        # GPTQ needs calibration data; weights are quantized only here.
        if examples is not None:
            model_quantized.quantize(examples)

        return model_quantized

推理优化

批处理优化

python
from typing import List


class BatchInferenceOptimizer:
    """Run text generation over prompts in fixed-size batches."""

    def __init__(self, model, tokenizer,
                 max_batch_size: int = 8):
        """Store the model/tokenizer pair and validate the batch size.

        Raises:
            ValueError: If ``max_batch_size`` is not a positive integer.
        """
        # A zero step crashes ``range`` and a negative one silently yields
        # no batches, so reject both up front.
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be a positive integer")

        self.model = model
        self.tokenizer = tokenizer
        self.max_batch_size = max_batch_size

        # ``padding=True`` needs a pad token; some tokenizers (e.g. GPT-2)
        # ship without one, so reuse the EOS token in that case.
        eos_token = getattr(tokenizer, "eos_token", None)
        if getattr(tokenizer, "pad_token", None) is None and eos_token is not None:
            tokenizer.pad_token = eos_token

    def batch_infer(self, prompts: List[str]) -> List[str]:
        """Generate one output per prompt, preserving input order.

        Args:
            prompts: Input texts; an empty list returns an empty list.

        Returns:
            Decoded generations, one per prompt.
        """
        results = []

        for start in range(0, len(prompts), self.max_batch_size):
            batch = prompts[start:start + self.max_batch_size]
            results.extend(self._process_batch(batch))

        return results

    def _process_batch(self, batch: List[str]) -> List[str]:
        """Tokenize, generate and decode a single batch of prompts."""
        inputs = self.tokenizer(
            batch,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )

        with torch.no_grad():
            outputs = self.model.generate(**inputs)

        return self.tokenizer.batch_decode(
            outputs,
            skip_special_tokens=True,
        )

KV Cache优化

python
class KVCacheOptimizer:
    """Greedy decoding that reuses the model's key/value cache between steps."""

    def __init__(self, model):
        self.model = model
        self.kv_cache = None

    def generate_with_cache(self, input_ids: torch.Tensor,
                           max_new_tokens: int = 100) -> torch.Tensor:
        """Greedily append up to ``max_new_tokens`` tokens to ``input_ids``.

        The first step feeds the whole prompt; every later step feeds only
        the newest token together with the cached ``past_key_values``.

        Args:
            input_ids: Prompt token ids; the last dimension is the sequence.
            max_new_tokens: Number of tokens to generate.

        Returns:
            ``input_ids`` with the generated tokens concatenated on the last
            dimension.
        """
        # A cache left over from an earlier prompt would not match this
        # prompt's prefix, so always start from a clean state.
        self.clear_cache()

        generated = input_ids
        step_input = input_ids

        with torch.no_grad():
            for _ in range(max_new_tokens):
                outputs = self.model(
                    step_input,
                    use_cache=True,
                    past_key_values=self.kv_cache,
                )
                self.kv_cache = outputs.past_key_values

                # Greedy pick from the logits of the last position.
                new_token = outputs.logits[:, -1:].argmax(dim=-1)
                generated = torch.cat([generated, new_token], dim=-1)
                step_input = new_token

        return generated

    def clear_cache(self):
        """Drop the stored key/value cache."""
        self.kv_cache = None

Flash Attention

python
class FlashAttentionOptimizer:
    """Wrap a model and switch on its ``use_flash_attention`` flags."""

    def __init__(self, model):
        self.model = model
        self._enable_flash_attention()

    def _enable_flash_attention(self):
        """Set ``use_flash_attention = True`` on matching submodules.

        A submodule matches when its name contains "attn" (case-insensitive)
        and it already exposes a ``use_flash_attention`` attribute.
        """
        for module_name, submodule in self.model.named_modules():
            if "attn" not in module_name.lower():
                continue
            if not hasattr(submodule, "use_flash_attention"):
                continue
            submodule.use_flash_attention = True

    def forward(self, *args, **kwargs):
        """Delegate the call to the wrapped model."""
        return self.model(*args, **kwargs)

部署架构

FastAPI部署

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import torch

# Application object; the route decorators further down register on it.
app = FastAPI()

class GenerationRequest(BaseModel):
    # Request body of the /generate endpoint. ModelServer.generate tokenizes
    # `prompt` and forwards the other fields to `model.generate` as keyword
    # arguments of the same name.
    prompt: str  # input text to continue
    max_length: int = 100  # forwarded as `max_length`
    temperature: float = 0.7  # forwarded as `temperature`
    top_p: float = 0.9  # forwarded as `top_p`
    num_return_sequences: int = 1  # forwarded as `num_return_sequences`

class GenerationResponse(BaseModel):
    # Response body of the /generate endpoint.
    generated_text: str  # decoded text of the first generated sequence
    generation_time: float  # elapsed time of the generate call, in seconds

class ModelServer:
    """Hold a causal LM and its tokenizer and serve generation requests."""

    def __init__(self, model_name: str):
        self.model, self.tokenizer = self._load_model(model_name)
        self.model.eval()

    def _load_model(self, model_name: str):
        """Load the model and tokenizer for ``model_name``.

        Returns:
            A ``(model, tokenizer)`` tuple.
        """
        from transformers import AutoModelForCausalLM, AutoTokenizer

        model = AutoModelForCausalLM.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # ``generate`` tokenizes with ``padding=True``, which raises for
        # tokenizers without a pad token (GPT-2 has none); reuse EOS instead.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        return model, tokenizer

    def generate(self, request: GenerationRequest) -> GenerationResponse:
        """Sample a continuation for ``request.prompt``.

        Args:
            request: Prompt and sampling parameters.

        Returns:
            The decoded first sequence and the elapsed time in seconds. Only
            the first sequence is returned, whatever ``num_return_sequences``
            is.
        """
        import time

        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()

        inputs = self.tokenizer(
            request.prompt,
            return_tensors="pt",
            padding=True,
            truncation=True,
        )

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=request.max_length,
                temperature=request.temperature,
                top_p=request.top_p,
                num_return_sequences=request.num_return_sequences,
                do_sample=True,
                pad_token_id=self.tokenizer.pad_token_id,
            )

        generated_text = self.tokenizer.decode(
            outputs[0],
            skip_special_tokens=True,
        )

        generation_time = time.perf_counter() - start_time

        return GenerationResponse(
            generated_text=generated_text,
            generation_time=generation_time,
        )

import os

# The container manifests set MODEL_NAME; honour it and fall back to "gpt2"
# when the variable is absent.
model_server = ModelServer(os.environ.get("MODEL_NAME", "gpt2"))

@app.post("/generate", response_model=GenerationResponse)
async def generate(request: GenerationRequest):
    # Delegate to the shared ModelServer; any failure is reported to the
    # client as HTTP 500 with the exception text as the detail.
    try:
        response = model_server.generate(request)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return response

@app.get("/health")
async def health_check():
    # Static health endpoint: it does not inspect the model.
    status_payload = {"status": "healthy"}
    return status_payload

Docker部署

dockerfile
# Image for the FastAPI model server.
FROM python:3.10-slim

WORKDIR /app

# Install dependencies before copying the sources so this layer can be
# reused when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

# Serve the `app` object from main.py on all interfaces, port 8000.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
yaml
# Compose definition for running the model server with one GPU.
version: '3.8'

services:
  # Built from the Dockerfile in the current directory.
  model-server:
    build: .
    ports:
      # host port 8000 -> container port 8000
      - "8000:8000"
    environment:
      - MODEL_NAME=gpt2
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            # Reserve a single NVIDIA GPU for the container.
            - driver: nvidia
              count: 1
              capabilities: [gpu]

Kubernetes部署

yaml
# Deployment running two replicas of the model server, one GPU each.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
spec:
  replicas: 2
  selector:
    matchLabels:
      app: model-server
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above.
        app: model-server
    spec:
      containers:
      - name: model-server
        image: model-server:latest
        ports:
        - containerPort: 8000
        resources:
          # One GPU per pod; memory may grow from 8Gi requested to 16Gi.
          requests:
            nvidia.com/gpu: 1
            memory: "8Gi"
          limits:
            nvidia.com/gpu: 1
            memory: "16Gi"
        env:
        - name: MODEL_NAME
          value: "gpt2"
---
# LoadBalancer Service exposing the model-server pods.
apiVersion: v1
kind: Service
metadata:
  name: model-server
spec:
  # Routes to pods labelled app=model-server.
  selector:
    app: model-server
  ports:
  # Service port 80 forwards to container port 8000.
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

监控和维护

性能监控

python
import time
from typing import Dict, List
from collections import deque

class PerformanceMonitor:
    """Sliding-window statistics for request latency and resource usage.

    Each metric keeps at most ``window_size`` recent samples.
    """

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.metrics = {
            "latency": deque(maxlen=window_size),
            "throughput": deque(maxlen=window_size),
            "error_rate": deque(maxlen=window_size),
            "memory_usage": deque(maxlen=window_size),
            "gpu_usage": deque(maxlen=window_size),
        }

    def record_request(self, latency: float, success: bool):
        """Record one request.

        Args:
            latency: Request duration; throughput is stored as ``1 / latency``
                (0 when the latency is not positive).
            success: Whether the request succeeded; failures count as 1 in
                the error-rate window.
        """
        self.metrics["latency"].append(latency)

        throughput = 1.0 / latency if latency > 0 else 0
        self.metrics["throughput"].append(throughput)

        self.metrics["error_rate"].append(0 if success else 1)

    def record_resources(self, memory: float, gpu: float):
        """Record one memory-usage sample and one GPU-usage sample."""
        self.metrics["memory_usage"].append(memory)
        self.metrics["gpu_usage"].append(gpu)

    def get_metrics(self) -> Dict:
        """Return averages and latency percentiles over the current window."""
        latency = self.metrics["latency"]
        return {
            "avg_latency": self._avg(latency),
            "p95_latency": self._p95(latency),
            "p99_latency": self._p99(latency),
            "avg_throughput": self._avg(self.metrics["throughput"]),
            "error_rate": self._avg(self.metrics["error_rate"]),
            "avg_memory": self._avg(self.metrics["memory_usage"]),
            "avg_gpu": self._avg(self.metrics["gpu_usage"]),
        }

    def _avg(self, values: deque) -> float:
        """Return the arithmetic mean, or 0.0 for an empty window."""
        if not values:
            return 0.0
        return sum(values) / len(values)

    def _percentile(self, values: deque, percent: int) -> float:
        """Return the nearest-rank percentile, or 0.0 for an empty window.

        The rank is ``ceil(n * percent / 100)`` in integer arithmetic, so
        p99 of 100 samples is the 99th smallest value rather than the
        maximum.
        """
        if not values:
            return 0.0
        ordered = sorted(values)
        rank = (len(ordered) * percent + 99) // 100
        return ordered[max(rank - 1, 0)]

    def _p95(self, values: deque) -> float:
        """Return the 95th percentile of ``values``."""
        return self._percentile(values, 95)

    def _p99(self, values: deque) -> float:
        """Return the 99th percentile of ``values``."""
        return self._percentile(values, 99)

健康检查

python
class HealthChecker:
    """Probe a model server with fixed prompts and score the answers.

    ``model`` must expose ``tokenizer`` and ``model`` attributes.
    """

    def __init__(self, model, threshold: float = 0.95):
        self.model = model
        self.threshold = threshold
        self.test_prompts = [
            "Hello, how are you?",
            "What is the capital of France?",
            "Explain machine learning."
        ]

    def check_health(self) -> Dict:
        """Run every test prompt and compare the mean score to the threshold.

        A prompt whose generation raises scores 0.0; the failure is logged
        and does not propagate. With no test prompts the score is 0.0.

        Returns:
            Dict with ``healthy``, ``quality_score`` and ``threshold``.
        """
        import logging

        log = logging.getLogger(__name__)
        results = []

        for prompt in self.test_prompts:
            try:
                response = self._generate_response(prompt)
                results.append(self._assess_quality(response))
            except Exception:
                # Best-effort probe: count it as a failed sample, but keep a
                # trace so the cause can be diagnosed.
                log.warning(
                    "Health probe failed for prompt %r", prompt, exc_info=True
                )
                results.append(0.0)

        avg_quality = sum(results) / len(results) if results else 0.0

        return {
            "healthy": avg_quality >= self.threshold,
            "quality_score": avg_quality,
            "threshold": self.threshold
        }

    def _generate_response(self, prompt: str) -> str:
        """Generate up to ``max_length=50`` tokens for ``prompt`` and decode."""
        inputs = self.model.tokenizer(prompt, return_tensors="pt")

        with torch.no_grad():
            outputs = self.model.model.generate(**inputs, max_length=50)

        return self.model.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _assess_quality(self, response: str) -> float:
        """Heuristic score for ``response``.

        Empty text scores 0.0. Otherwise the score is 0.5, plus 0.3 for at
        least five whitespace-separated words, plus 0.2 if the text contains
        '.', '!' or '?'.
        """
        if not response:
            return 0.0

        score = 0.5

        if len(response.split()) >= 5:
            score += 0.3

        if any(char in response for char in '.!?'):
            score += 0.2

        return score

日志记录

python
import logging
from datetime import datetime

class ModelLogger:
    """File logging on the shared "ModelServer" logger."""

    def __init__(self, log_file: str = "model.log"):
        """Attach a file handler for ``log_file`` unless one already exists.

        ``logging.getLogger`` returns the same logger object for every
        instance, so adding a handler unconditionally would write each
        message once per ``ModelLogger`` ever created.
        """
        import os

        self.logger = logging.getLogger("ModelServer")
        self.logger.setLevel(logging.INFO)

        # FileHandler stores its target as an absolute path.
        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in self.logger.handlers
        )
        if already_attached:
            return

        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(file_handler)

    def log_request(self, request: Dict):
        """Log an incoming request at INFO level."""
        self.logger.info("Request: %s", request)

    def log_response(self, response: Dict):
        """Log an outgoing response at INFO level."""
        self.logger.info("Response: %s", response)

    def log_error(self, error: Exception):
        """Log the message of ``error`` at ERROR level."""
        self.logger.error("Error: %s", error)

    def log_metrics(self, metrics: Dict):
        """Log a metrics snapshot at INFO level."""
        self.logger.info("Metrics: %s", metrics)

成本优化

资源优化

python
class ResourceOptimizer:
    """Inference-time memory and warm-up helpers for a PyTorch model."""

    def __init__(self, model):
        self.model = model

    def optimize_memory(self):
        """Put the model in inference mode and freeze its parameters."""
        self.model.eval()

        for param in self.model.parameters():
            param.requires_grad = False

        # Gradient checkpointing only saves memory during the backward pass,
        # so it is switched off here rather than on.
        if hasattr(self.model, "gradient_checkpointing_disable"):
            self.model.gradient_checkpointing_disable()

    def optimize_inference(self, vocab_size: int = 1000, seq_len: int = 10):
        """Warm the model up with one forward pass on random token ids.

        Args:
            vocab_size: Exclusive upper bound for the random token ids.
            seq_len: Length of the dummy sequence.
        """
        self.model.eval()

        # Build the dummy batch on the model's device; a CPU tensor fed to a
        # GPU model fails with a device mismatch.
        first_param = next(self.model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

        with torch.no_grad():
            dummy_input = torch.randint(0, vocab_size, (1, seq_len), device=device)
            _ = self.model(dummy_input)

    def get_memory_usage(self) -> Dict:
        """Return CUDA memory statistics in GiB.

        Returns:
            ``allocated``, ``reserved`` and ``max_allocated`` when CUDA is
            available, otherwise ``{"message": "CUDA not available"}``.
        """
        if not torch.cuda.is_available():
            return {"message": "CUDA not available"}

        gib = 1024**3
        return {
            "allocated": torch.cuda.memory_allocated() / gib,
            "reserved": torch.cuda.memory_reserved() / gib,
            "max_allocated": torch.cuda.max_memory_allocated() / gib
        }

缓存策略

python
from functools import lru_cache
import hashlib

class ResponseCache:
    """Bounded prompt-to-response cache with least-frequently-used eviction."""

    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.max_size = max_size
        self.access_count = {}

    def get(self, prompt: str) -> Optional[str]:
        """Return the cached response for ``prompt``, or None on a miss.

        A hit increments the entry's access count.
        """
        cache_key = self._generate_key(prompt)

        if cache_key not in self.cache:
            return None

        self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
        return self.cache[cache_key]

    def set(self, prompt: str, response: str):
        """Store ``response`` for ``prompt``.

        An entry is evicted only when a new key is inserted into a full
        cache; overwriting an existing key must not push out another entry.
        The access count of the stored key is reset to 1.
        """
        cache_key = self._generate_key(prompt)

        is_new_key = cache_key not in self.cache
        if is_new_key and len(self.cache) >= self.max_size:
            self._evict()

        self.cache[cache_key] = response
        self.access_count[cache_key] = 1

    def _generate_key(self, prompt: str) -> str:
        """Return the MD5 hex digest of ``prompt`` (a key, not a security hash)."""
        return hashlib.md5(prompt.encode()).hexdigest()

    def _evict(self):
        """Remove the entry with the lowest access count, if any."""
        if not self.cache:
            return

        min_access_key = min(self.access_count, key=self.access_count.get)

        del self.cache[min_access_key]
        del self.access_count[min_access_key]

自动缩放

python
class AutoScaler:
    """Step the instance count up or down by one based on load metrics."""

    def __init__(self, min_instances: int = 1,
                   max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances

    def should_scale_up(self, metrics: Dict) -> bool:
        """Return True when load is high and there is room to grow.

        Load is high when ``avg_latency`` exceeds 1.0 or ``cpu_usage``
        exceeds 0.8; missing metrics default to 0.
        """
        latency = metrics.get("avg_latency", 0)
        cpu = metrics.get("cpu_usage", 0)

        overloaded = latency > 1.0 or cpu > 0.8
        if not overloaded:
            return False

        return self.current_instances < self.max_instances

    def should_scale_down(self, metrics: Dict) -> bool:
        """Return True when load is low and the count is above the minimum.

        Load is low when ``avg_latency`` is below 0.2 and ``cpu_usage`` is
        below 0.3; missing metrics default to 0.
        """
        latency = metrics.get("avg_latency", 0)
        cpu = metrics.get("cpu_usage", 0)

        idle = latency < 0.2 and cpu < 0.3
        if not idle:
            return False

        return self.current_instances > self.min_instances

    def scale(self, metrics: Dict) -> int:
        """Apply at most one scaling step and return the new instance count."""
        if self.should_scale_up(metrics):
            step = 1
        elif self.should_scale_down(metrics):
            step = -1
        else:
            step = 0

        self.current_instances += step
        return self.current_instances

实践练习

练习1:实现完整的部署流程

python
class ModelDeployment:
    """Model server combined with caching, monitoring and logging."""

    def __init__(self, model_name: str):
        self.model_server = ModelServer(model_name)
        self.monitor = PerformanceMonitor()
        self.health_checker = HealthChecker(self.model_server)
        self.cache = ResponseCache()
        self.logger = ModelLogger()

    def generate(self, request: GenerationRequest) -> GenerationResponse:
        """Serve ``request`` from the cache or from the model.

        Cache hits return with ``generation_time=0.0``. Misses call the
        model server, then record latency and outcome in the monitor.
        Failures are recorded and logged before the exception is re-raised.
        """
        cache_key = self._cache_key(request)
        cached_response = self.cache.get(cache_key)

        # ``is not None`` so that a cached empty string still counts as a hit.
        if cached_response is not None:
            return GenerationResponse(
                generated_text=cached_response,
                generation_time=0.0
            )

        start_time = time.perf_counter()

        try:
            response = self.model_server.generate(request)
        except Exception as e:
            # Record the failure before re-raising; otherwise the monitor's
            # error rate can never rise above zero.
            self.monitor.record_request(time.perf_counter() - start_time, False)
            self.logger.log_error(e)
            raise

        latency = time.perf_counter() - start_time

        self.cache.set(cache_key, response.generated_text)
        self.monitor.record_request(latency, True)
        self.logger.log_response({
            "prompt": request.prompt,
            "response": response.generated_text,
            "latency": latency
        })

        return response

    @staticmethod
    def _cache_key(request: GenerationRequest) -> str:
        """Build a cache key from the prompt and every generation parameter.

        Keying on the prompt alone would return text produced under
        different ``max_length`` or sampling settings.
        """
        return repr((
            request.prompt,
            request.max_length,
            request.temperature,
            request.top_p,
            request.num_return_sequences,
        ))

    def health_check(self) -> Dict:
        """Return the health checker's report."""
        return self.health_checker.check_health()

    def get_metrics(self) -> Dict:
        """Return the monitor's current metrics."""
        return self.monitor.get_metrics()

总结

本节我们学习了模型部署:

  1. 模型量化技术(基础量化、4位量化、GPTQ)
  2. 推理优化方法(批处理、KV Cache、Flash Attention)
  3. 部署架构(FastAPI、Docker、Kubernetes)
  4. 监控和维护(性能监控、健康检查、日志记录)
  5. 成本优化策略(资源优化、缓存、自动缩放)

掌握这些技术可以高效地部署和运行微调后的模型。

参考资源