# Day 56: Model Deployment
## Learning Objectives
- Master model quantization techniques
- Learn inference optimization methods
- Understand deployment architectures
- Master monitoring and maintenance
- Understand cost optimization strategies
## Model Quantization
### Basic Quantization
```python
import torch
import torch.nn as nn


class ModelQuantizer:
    def __init__(self, model: nn.Module):
        self.model = model

    def quantize_dynamic(self, qconfig_spec: dict = None):
        """Dynamic quantization: weights are quantized ahead of time,
        activations are quantized on the fly at inference."""
        if qconfig_spec is None:
            qconfig_spec = {
                nn.Linear: torch.quantization.default_dynamic_qconfig,
                nn.LSTM: torch.quantization.default_dynamic_qconfig,
                nn.GRU: torch.quantization.default_dynamic_qconfig,
            }
        return torch.quantization.quantize_dynamic(self.model, qconfig_spec)

    def quantize_static(self, calibration_loader: list):
        """Static quantization: needs a calibration pass over sample data
        to collect activation statistics before converting."""
        self.model.eval()
        self.model.qconfig = torch.quantization.get_default_qconfig("fbgemm")
        model_prepared = torch.quantization.prepare(self.model, inplace=False)
        with torch.no_grad():
            for data in calibration_loader:
                model_prepared(data)
        return torch.quantization.convert(model_prepared)

    def quantize_to_int8(self):
        """Shortcut: dynamically quantize Linear layers to int8."""
        model_int8 = torch.quantization.quantize_dynamic(
            self.model,
            {nn.Linear},  # dynamic quantization supports Linear/RNN layers, not Conv2d
            dtype=torch.qint8
        )
        return model_int8
```
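A minimal usage sketch, assuming a small `nn.Sequential` stand-in for a real model (the size-comparison helper is only illustrative):

```python
# Hypothetical usage sketch: quantize a toy model and compare on-disk sizes.
import os
import tempfile

toy_model = nn.Sequential(nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10))
int8_model = ModelQuantizer(toy_model).quantize_to_int8()

def size_mb(m: nn.Module) -> float:
    # Serialize the state dict to a temp file and report its size in MB.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        torch.save(m.state_dict(), f.name)
        size = os.path.getsize(f.name) / 1024**2
    os.remove(f.name)
    return size

print(f"fp32: {size_mb(toy_model):.2f} MB, int8: {size_mb(int8_model):.2f} MB")
```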
### 4-bit Quantization

```python
class FourBitQuantizer:
    """4-bit quantization with bitsandbytes. The quantization is applied
    when the model is loaded, so this class works from a model name
    rather than an already-loaded nn.Module."""

    def __init__(self, model_name: str):
        self.model_name = model_name

    def _load_4bit(self, quant_type: str):
        try:
            from transformers import AutoModelForCausalLM, BitsAndBytesConfig
        except ImportError:
            raise ImportError("Install transformers: pip install transformers bitsandbytes")
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type=quant_type,        # "nf4" or "fp4"
            bnb_4bit_use_double_quant=True,        # quantize the quantization constants too
            bnb_4bit_compute_dtype=torch.bfloat16  # compute in bf16 for stability
        )
        return AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=bnb_config,
            device_map="auto"  # requires accelerate
        )

    def quantize_to_nf4(self):
        return self._load_4bit("nf4")

    def quantize_to_fp4(self):
        return self._load_4bit("fp4")
```
### GPTQ Quantization

```python
class GPTQQuantizer:
    """GPTQ post-training quantization via the auto-gptq library."""

    def __init__(self, model: nn.Module):
        self.model = model

    def quantize_gptq(self, bits: int = 4, group_size: int = 128):
        try:
            from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
        except ImportError:
            raise ImportError("Install auto-gptq: pip install auto-gptq")
        quantize_config = BaseQuantizeConfig(
            bits=bits,              # quantization bit width
            group_size=group_size,  # weights are quantized in groups of this size
            damp_percent=0.01,
            desc_act=False
        )
        # Reload the model through auto-gptq with the quantize config attached.
        # Calling .quantize(calibration_examples) on the result runs GPTQ itself.
        model_quantized = AutoGPTQForCausalLM.from_pretrained(
            self.model.config.name_or_path,
            quantize_config=quantize_config
        )
        return model_quantized
```
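A hedged end-to-end sketch of the GPTQ flow, assuming auto-gptq's `quantize` / `save_quantized` API and a hypothetical two-sentence calibration set (a real run needs far more calibration data):

```python
# Hypothetical GPTQ flow sketch: load, calibrate, quantize, save.
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "facebook/opt-125m"  # small model used only for illustration
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForCausalLM.from_pretrained(model_name)

gptq_model = GPTQQuantizer(base_model).quantize_gptq(bits=4, group_size=128)

# auto-gptq expects calibration examples as tokenized dicts (input_ids, attention_mask).
calibration_texts = ["Quantization reduces memory usage.", "GPTQ works layer by layer."]
examples = [dict(tokenizer(t, return_tensors="pt")) for t in calibration_texts]
gptq_model.quantize(examples)
gptq_model.save_quantized("opt-125m-gptq-4bit")
```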
## Inference Optimization

### Batch Processing Optimization
```python
from typing import List


class BatchInferenceOptimizer:
    def __init__(self, model, tokenizer, max_batch_size: int = 8):
        self.model = model
        self.tokenizer = tokenizer
        self.max_batch_size = max_batch_size
        # Padding needs a pad token; fall back to EOS for GPT-style tokenizers.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def batch_infer(self, prompts: List[str]) -> List[str]:
        results = []
        # Split the prompts into chunks no larger than max_batch_size.
        for i in range(0, len(prompts), self.max_batch_size):
            batch = prompts[i:i + self.max_batch_size]
            results.extend(self._process_batch(batch))
        return results

    def _process_batch(self, batch: List[str]) -> List[str]:
        inputs = self.tokenizer(
            batch,
            padding=True,
            truncation=True,
            return_tensors="pt"
        )
        with torch.no_grad():
            outputs = self.model.generate(**inputs)
        return self.tokenizer.batch_decode(outputs, skip_special_tokens=True)
```
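A short usage sketch (the model and prompts are only illustrative):

```python
# Illustrative usage: batch a handful of prompts through a small model.
from transformers import AutoModelForCausalLM, AutoTokenizer

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

optimizer = BatchInferenceOptimizer(model, tokenizer, max_batch_size=4)
prompts = ["The weather today is", "Machine learning is", "Deployment means"]
for prompt, completion in zip(prompts, optimizer.batch_infer(prompts)):
    print(f"{prompt!r} -> {completion!r}")
```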
### KV Cache Optimization

```python
class KVCacheOptimizer:
    def __init__(self, model):
        self.model = model
        self.kv_cache = None

    def generate_with_cache(self, input_ids: torch.Tensor,
                            max_new_tokens: int = 100) -> torch.Tensor:
        """Greedy decoding that reuses the KV cache: after the first forward
        pass over the prompt, each step only feeds the most recent token."""
        generated = input_ids
        with torch.no_grad():
            for _ in range(max_new_tokens):
                if self.kv_cache is None:
                    # Prefill: run the full prompt once and populate the cache.
                    outputs = self.model(generated, use_cache=True, past_key_values=None)
                else:
                    # Decode: only the last token is needed, the rest is cached.
                    outputs = self.model(generated[:, -1:], use_cache=True,
                                         past_key_values=self.kv_cache)
                self.kv_cache = outputs.past_key_values
                new_token = outputs.logits[:, -1:].argmax(dim=-1)
                generated = torch.cat([generated, new_token], dim=-1)
        return generated

    def clear_cache(self):
        self.kv_cache = None
```
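A usage sketch with "gpt2" as a stand-in model:

```python
# Illustrative usage: decode a few tokens while reusing the KV cache.
from transformers import AutoModelForCausalLM, AutoTokenizer

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

kv = KVCacheOptimizer(model)
input_ids = tokenizer("KV caching speeds up", return_tensors="pt").input_ids
output_ids = kv.generate_with_cache(input_ids, max_new_tokens=20)
kv.clear_cache()  # reset before the next, unrelated prompt
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))
```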
### Flash Attention

```python
class FlashAttentionOptimizer:
    def __init__(self, model):
        self.model = model
        self._enable_flash_attention()

    def _enable_flash_attention(self):
        # Best-effort toggle: only works if the model's attention modules
        # expose a use_flash_attention flag (implementation dependent).
        for name, module in self.model.named_modules():
            if "attn" in name.lower() and hasattr(module, "use_flash_attention"):
                module.use_flash_attention = True

    def forward(self, *args, **kwargs):
        return self.model(*args, **kwargs)
```
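With recent transformers versions it is usually simpler to request Flash Attention when the model is loaded. A hedged sketch (assumes a CUDA GPU, the flash-attn package, and a model that supports it; the model name is only illustrative):

```python
# Hypothetical sketch: request FlashAttention-2 at load time via transformers.
import torch
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",             # illustrative model name
    torch_dtype=torch.bfloat16,             # FlashAttention requires fp16/bf16
    attn_implementation="flash_attention_2"
)
```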
## Deployment Architecture

### FastAPI Deployment
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch

app = FastAPI()


class GenerationRequest(BaseModel):
    prompt: str
    max_length: int = 100
    temperature: float = 0.7
    top_p: float = 0.9
    num_return_sequences: int = 1


class GenerationResponse(BaseModel):
    generated_text: str
    generation_time: float


class ModelServer:
    def __init__(self, model_name: str):
        self.model, self.tokenizer = self._load_model(model_name)
        self.model.eval()

    def _load_model(self, model_name: str):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        model = AutoModelForCausalLM.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        # GPT-style tokenizers have no pad token; reuse EOS so padding works.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        return model, tokenizer

    def generate(self, request: GenerationRequest) -> GenerationResponse:
        import time
        start_time = time.time()
        inputs = self.tokenizer(
            request.prompt,
            return_tensors="pt",
            padding=True,
            truncation=True
        )
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=request.max_length,
                temperature=request.temperature,
                top_p=request.top_p,
                num_return_sequences=request.num_return_sequences,
                do_sample=True
            )
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        generation_time = time.time() - start_time
        return GenerationResponse(
            generated_text=generated_text,
            generation_time=generation_time
        )


model_server = ModelServer("gpt2")


@app.post("/generate", response_model=GenerationResponse)
async def generate(request: GenerationRequest):
    try:
        return model_server.generate(request)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
    return {"status": "healthy"}
```
### Docker Deployment

```dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
version: '3.8'
services:
  model-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_NAME=gpt2
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
```
### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-server
spec:
  replicas: 2
  selector:
    matchLabels:
      app: model-server
  template:
    metadata:
      labels:
        app: model-server
    spec:
      containers:
        - name: model-server
          image: model-server:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              nvidia.com/gpu: 1
              memory: "8Gi"
            limits:
              nvidia.com/gpu: 1
              memory: "16Gi"
          env:
            - name: MODEL_NAME
              value: "gpt2"
---
apiVersion: v1
kind: Service
metadata:
  name: model-server
spec:
  selector:
    app: model-server
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
```
## Monitoring and Maintenance

### Performance Monitoring
```python
import time
from typing import Dict
from collections import deque


class PerformanceMonitor:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        # Sliding windows of the most recent measurements.
        self.metrics = {
            "latency": deque(maxlen=window_size),
            "throughput": deque(maxlen=window_size),
            "error_rate": deque(maxlen=window_size),
            "memory_usage": deque(maxlen=window_size),
            "gpu_usage": deque(maxlen=window_size)
        }

    def record_request(self, latency: float, success: bool):
        self.metrics["latency"].append(latency)
        throughput = 1.0 / latency if latency > 0 else 0
        self.metrics["throughput"].append(throughput)
        self.metrics["error_rate"].append(0 if success else 1)

    def record_resources(self, memory: float, gpu: float):
        self.metrics["memory_usage"].append(memory)
        self.metrics["gpu_usage"].append(gpu)

    def get_metrics(self) -> Dict:
        return {
            "avg_latency": self._avg(self.metrics["latency"]),
            "p95_latency": self._percentile(self.metrics["latency"], 0.95),
            "p99_latency": self._percentile(self.metrics["latency"], 0.99),
            "avg_throughput": self._avg(self.metrics["throughput"]),
            "error_rate": self._avg(self.metrics["error_rate"]),
            "avg_memory": self._avg(self.metrics["memory_usage"]),
            "avg_gpu": self._avg(self.metrics["gpu_usage"])
        }

    def _avg(self, values: deque) -> float:
        if not values:
            return 0.0
        return sum(values) / len(values)

    def _percentile(self, values: deque, q: float) -> float:
        # Shared helper for p95/p99; clamps the index for small windows.
        if not values:
            return 0.0
        sorted_values = sorted(values)
        index = min(int(len(sorted_values) * q), len(sorted_values) - 1)
        return sorted_values[index]
```
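A quick usage sketch with simulated request latencies:

```python
# Illustrative usage: record a few simulated requests and read back metrics.
import random

monitor = PerformanceMonitor(window_size=50)
for _ in range(30):
    simulated_latency = random.uniform(0.05, 0.4)  # seconds
    monitor.record_request(simulated_latency, success=random.random() > 0.02)
monitor.record_resources(memory=6.2, gpu=0.71)      # GB and utilization fraction
print(monitor.get_metrics())
```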
### Health Checks

```python
class HealthChecker:
    def __init__(self, model, threshold: float = 0.95):
        # `model` is expected to be a ModelServer-style wrapper that
        # exposes .model and .tokenizer attributes.
        self.model = model
        self.threshold = threshold
        self.test_prompts = [
            "Hello, how are you?",
            "What is the capital of France?",
            "Explain machine learning."
        ]

    def check_health(self) -> Dict:
        results = []
        for prompt in self.test_prompts:
            try:
                response = self._generate_response(prompt)
                results.append(self._assess_quality(response))
            except Exception:
                results.append(0.0)
        avg_quality = sum(results) / len(results)
        return {
            "healthy": avg_quality >= self.threshold,
            "quality_score": avg_quality,
            "threshold": self.threshold
        }

    def _generate_response(self, prompt: str) -> str:
        inputs = self.model.tokenizer(prompt, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model.model.generate(**inputs, max_length=50)
        return self.model.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _assess_quality(self, response: str) -> float:
        # Crude heuristic score: non-empty, reasonably long, ends like a sentence.
        if not response:
            return 0.0
        score = 0.5
        if len(response.split()) >= 5:
            score += 0.3
        if any(char in response for char in '.!?'):
            score += 0.2
        return score
```
### Logging

```python
import logging
from typing import Dict


class ModelLogger:
    def __init__(self, log_file: str = "model.log"):
        self.logger = logging.getLogger("ModelServer")
        self.logger.setLevel(logging.INFO)
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

    def log_request(self, request: Dict):
        self.logger.info(f"Request: {request}")

    def log_response(self, response: Dict):
        self.logger.info(f"Response: {response}")

    def log_error(self, error: Exception):
        self.logger.error(f"Error: {error}")

    def log_metrics(self, metrics: Dict):
        self.logger.info(f"Metrics: {metrics}")
```
## Cost Optimization

### Resource Optimization
```python
class ResourceOptimizer:
    def __init__(self, model):
        self.model = model

    def optimize_memory(self):
        # Inference only: freeze parameters so no gradient buffers are kept.
        self.model.eval()
        for param in self.model.parameters():
            param.requires_grad = False
        # Gradient checkpointing trades compute for memory; it mainly matters
        # during training/fine-tuning, but enabling it here is harmless.
        if hasattr(self.model, "gradient_checkpointing_enable"):
            self.model.gradient_checkpointing_enable()

    def optimize_inference(self):
        # Warm up the model with a dummy forward pass so the first real
        # request does not pay one-time initialization costs.
        self.model.eval()
        with torch.no_grad():
            dummy_input = torch.randint(0, 1000, (1, 10))
            _ = self.model(dummy_input)

    def get_memory_usage(self) -> Dict:
        if torch.cuda.is_available():
            return {
                "allocated": torch.cuda.memory_allocated() / 1024**3,  # GB
                "reserved": torch.cuda.memory_reserved() / 1024**3,    # GB
                "max_allocated": torch.cuda.max_memory_allocated() / 1024**3
            }
        return {"message": "CUDA not available"}
```
### Caching Strategy

```python
import hashlib
from typing import Optional


class ResponseCache:
    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.max_size = max_size
        self.access_count = {}

    def get(self, prompt: str) -> Optional[str]:
        cache_key = self._generate_key(prompt)
        if cache_key in self.cache:
            self.access_count[cache_key] = self.access_count.get(cache_key, 0) + 1
            return self.cache[cache_key]
        return None

    def set(self, prompt: str, response: str):
        cache_key = self._generate_key(prompt)
        if len(self.cache) >= self.max_size:
            self._evict()
        self.cache[cache_key] = response
        self.access_count[cache_key] = 1

    def _generate_key(self, prompt: str) -> str:
        return hashlib.md5(prompt.encode()).hexdigest()

    def _evict(self):
        # Evict the least-frequently-used entry (LFU policy).
        if not self.cache:
            return
        min_access_key = min(self.access_count, key=self.access_count.get)
        del self.cache[min_access_key]
        del self.access_count[min_access_key]
```
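A usage sketch (the cached answer is a placeholder standing in for a real model call):

```python
# Illustrative usage: check the cache before hitting the model.
cache = ResponseCache(max_size=500)
prompt = "What is model quantization?"

answer = cache.get(prompt)
if answer is None:
    answer = "Quantization stores weights at lower precision."  # placeholder for a model call
    cache.set(prompt, answer)
print(answer)
```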
### Auto-scaling

```python
class AutoScaler:
    def __init__(self, min_instances: int = 1, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances

    def should_scale_up(self, metrics: Dict) -> bool:
        avg_latency = metrics.get("avg_latency", 0)
        cpu_usage = metrics.get("cpu_usage", 0)
        # Scale up when latency or CPU pressure crosses the upper thresholds.
        if avg_latency > 1.0 or cpu_usage > 0.8:
            return self.current_instances < self.max_instances
        return False

    def should_scale_down(self, metrics: Dict) -> bool:
        avg_latency = metrics.get("avg_latency", 0)
        cpu_usage = metrics.get("cpu_usage", 0)
        # Scale down only when both latency and CPU usage are comfortably low.
        if avg_latency < 0.2 and cpu_usage < 0.3:
            return self.current_instances > self.min_instances
        return False

    def scale(self, metrics: Dict) -> int:
        if self.should_scale_up(metrics):
            self.current_instances += 1
        elif self.should_scale_down(metrics):
            self.current_instances -= 1
        return self.current_instances
```
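A usage sketch: `avg_latency` could come from the PerformanceMonitor above, while `cpu_usage` is assumed to come from an external resource monitor.

```python
# Illustrative usage: drive scaling decisions from collected metrics.
scaler = AutoScaler(min_instances=1, max_instances=5)

metrics = {"avg_latency": 1.4, "cpu_usage": 0.85}  # overloaded
print(scaler.scale(metrics))  # -> 2 (scaled up)

metrics = {"avg_latency": 0.1, "cpu_usage": 0.15}  # mostly idle
print(scaler.scale(metrics))  # -> 1 (scaled back down)
```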
## Hands-on Exercise

### Exercise 1: Implement a Complete Deployment Pipeline
```python
import time


class ModelDeployment:
    def __init__(self, model_name: str):
        self.model_server = ModelServer(model_name)
        self.monitor = PerformanceMonitor()
        self.health_checker = HealthChecker(self.model_server)
        self.cache = ResponseCache()
        self.logger = ModelLogger()

    def generate(self, request: GenerationRequest) -> GenerationResponse:
        # Serve from the cache when possible to skip model inference entirely.
        cached_response = self.cache.get(request.prompt)
        if cached_response:
            return GenerationResponse(
                generated_text=cached_response,
                generation_time=0.0
            )
        start_time = time.time()
        try:
            response = self.model_server.generate(request)
            self.cache.set(request.prompt, response.generated_text)
        except Exception as e:
            # Record the failure before re-raising so errors show up in metrics.
            self.logger.log_error(e)
            self.monitor.record_request(time.time() - start_time, success=False)
            raise
        latency = time.time() - start_time
        self.monitor.record_request(latency, success=True)
        self.logger.log_response({
            "prompt": request.prompt,
            "response": response.generated_text,
            "latency": latency
        })
        return response

    def health_check(self) -> Dict:
        return self.health_checker.check_health()

    def get_metrics(self) -> Dict:
        return self.monitor.get_metrics()
```
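Putting it together, a hedged local test that reuses the request/response models defined in the FastAPI section:

```python
# Illustrative end-to-end test of the deployment wrapper.
deployment = ModelDeployment("gpt2")

request = GenerationRequest(prompt="Deploying a model means", max_length=40)
first = deployment.generate(request)    # runs the model
second = deployment.generate(request)   # served from the cache

print(first.generation_time, second.generation_time)  # second call should be ~0.0
print(deployment.health_check())
print(deployment.get_metrics())
```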
## Summary

In this section we covered model deployment:
- Model quantization techniques (basic quantization, 4-bit quantization, GPTQ)
- Inference optimization methods (batching, KV cache, Flash Attention)
- Deployment architectures (FastAPI, Docker, Kubernetes)
- Monitoring and maintenance (performance monitoring, health checks, logging)
- Cost optimization strategies (resource optimization, caching, auto-scaling)
Mastering these techniques lets you deploy and run fine-tuned models efficiently.
