Appearance
第62天:监控与优化
学习目标
- 掌握性能监控
- 学习日志管理
- 理解告警系统
- 掌握自动缩放
- 了解成本优化
性能监控
指标收集
python
import time
from typing import Any, Dict, List, Optional

import torch
class MetricsCollector:
    """Collects named numeric metrics and wall-clock timings, and computes
    summary statistics (count/sum/avg/min/max and p50/p95/p99) per metric."""

    def __init__(self):
        # metric name -> list of recorded samples
        self.metrics: Dict[str, List[float]] = {}
        # metric name -> start timestamp of an in-flight timer
        self.start_times: Dict[str, float] = {}

    def start_timer(self, metric_name: str) -> None:
        """Mark the start of a timed section for *metric_name*."""
        self.start_times[metric_name] = time.time()

    def end_timer(self, metric_name: str) -> float:
        """Stop the timer and record the elapsed seconds.

        Returns the elapsed time, or 0.0 if no matching start_timer call
        is pending for this metric.
        """
        if metric_name not in self.start_times:
            return 0.0
        elapsed = time.time() - self.start_times.pop(metric_name)
        self.metrics.setdefault(metric_name, []).append(elapsed)
        return elapsed

    def record_metric(self, metric_name: str, value: float) -> None:
        """Append a raw sample to *metric_name*."""
        self.metrics.setdefault(metric_name, []).append(value)

    def get_metrics(self, metric_name: Optional[str] = None) -> Dict:
        """Stats for one metric, or for every recorded metric when omitted."""
        if metric_name:
            return self._calculate_stats(metric_name)
        return {name: self._calculate_stats(name) for name in self.metrics}

    def _calculate_stats(self, metric_name: str) -> Dict:
        """Summary statistics for one metric; {} when nothing was recorded."""
        values = self.metrics.get(metric_name, [])
        if not values:
            return {}
        return {
            "count": len(values),
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "min": min(values),
            "max": max(values),
            "p50": self._percentile(values, 50),
            "p95": self._percentile(values, 95),
            "p99": self._percentile(values, 99),
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Nearest-rank percentile of *values* (values must be non-empty).

        Bug fix: the original index `int(len * p / 100)` equals len(values)
        for percentile=100 (and for other exact multiples), raising
        IndexError. The index is now clamped to the last element.
        """
        sorted_values = sorted(values)
        index = min(int(len(sorted_values) * percentile / 100),
                    len(sorted_values) - 1)
        return sorted_values[index]

# 模型性能监控 (Model performance monitoring)
python
class ModelPerformanceMonitor:
    """Wraps a (PyTorch-style) callable model to collect inference latency,
    process memory, and GPU memory metrics via a MetricsCollector."""

    def __init__(self, model):
        # model: any callable invoked as model(input) — assumed to follow the
        # torch nn.Module convention (TODO confirm with callers).
        self.model = model
        self.metrics_collector = MetricsCollector()

    def monitor_inference(self, input_data,
                          metric_name: str = "inference_time"):
        """Run one forward pass under no_grad, timing it; return the output."""
        self.metrics_collector.start_timer(metric_name)
        try:
            with torch.no_grad():
                output = self.model(input_data)
        finally:
            # Bug fix: stop the timer even if inference raises, so the
            # collector never keeps a dangling start time for this metric.
            self.metrics_collector.end_timer(metric_name)
        return output

    def monitor_batch_inference(self, batch_data,
                                metric_name: str = "batch_inference_time"):
        """Same as monitor_inference, recorded under a batch metric name."""
        self.metrics_collector.start_timer(metric_name)
        try:
            with torch.no_grad():
                outputs = self.model(batch_data)
        finally:
            self.metrics_collector.end_timer(metric_name)
        return outputs

    def monitor_memory_usage(self, metric_name: str = "memory_usage"):
        """Record and return the current process RSS in MiB (needs psutil)."""
        import psutil
        rss_mib = psutil.Process().memory_info().rss / 1024 / 1024
        self.metrics_collector.record_metric(metric_name, rss_mib)
        return rss_mib

    def monitor_gpu_usage(self, metric_name: str = "gpu_usage"):
        """Record and return GPU 0 used memory in GiB (needs pynvml)."""
        try:
            import pynvml
        except ImportError:
            raise ImportError("Install pynvml: pip install pynvml")
        # NOTE(review): nvmlInit is called on every invocation and never
        # paired with nvmlShutdown — apparently harmless, but worth confirming.
        pynvml.nvmlInit()
        handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        usage = info.used / 1024 / 1024 / 1024
        self.metrics_collector.record_metric(metric_name, usage)
        return usage

    def get_performance_report(self) -> Dict:
        """Aggregate stats for every metric family this monitor records."""
        return {
            "inference_metrics": self.metrics_collector.get_metrics("inference_time"),
            "batch_metrics": self.metrics_collector.get_metrics("batch_inference_time"),
            "memory_metrics": self.metrics_collector.get_metrics("memory_usage"),
            "gpu_metrics": self.metrics_collector.get_metrics("gpu_usage")
        }

# 日志管理 (Log management)
日志收集
python
import logging
from datetime import datetime
import json
class LogManager:
    """Application logger that emits JSON payloads to both a file and the console."""

    def __init__(self, log_file: str = "app.log"):
        self.log_file = log_file
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure and return the shared 'AIAppLogger' logger.

        Bug fix: logging.getLogger returns a process-wide singleton, so every
        LogManager instance previously stacked two more handlers onto it,
        duplicating each record. Existing handlers are now closed and removed
        before the fresh pair is attached.
        """
        logger = logging.getLogger("AIAppLogger")
        logger.setLevel(logging.INFO)
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler = logging.FileHandler(self.log_file)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def _payload(self, message: str, level: str, extra: Dict) -> str:
        """Serialize one log record (message + metadata) as a JSON string."""
        return json.dumps({
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "level": level,
            **extra
        })

    def log_info(self, message: str, **kwargs):
        """Log *message* at INFO with arbitrary extra fields."""
        self.logger.info(self._payload(message, "INFO", kwargs))

    def log_warning(self, message: str, **kwargs):
        """Log *message* at WARNING with arbitrary extra fields."""
        self.logger.warning(self._payload(message, "WARNING", kwargs))

    def log_error(self, message: str, **kwargs):
        """Log *message* at ERROR with arbitrary extra fields."""
        self.logger.error(self._payload(message, "ERROR", kwargs))

    def log_request(self, request_id: str, request_data: Dict):
        """Log an incoming request, flattening request_data into the record."""
        self.log_info(
            "Request received",
            request_id=request_id,
            **request_data
        )

    def log_response(self, request_id: str, response_data: Dict):
        """Log an outgoing response, flattening response_data into the record."""
        self.log_info(
            "Response sent",
            request_id=request_id,
            **response_data
        )

    def log_exception(self, exception: Exception,
                      context: Optional[Dict] = None):
        """Log an exception's type/message plus optional caller context."""
        self.log_error(
            "Exception occurred",
            exception_type=type(exception).__name__,
            exception_message=str(exception),
            context=context or {}
        )

# 结构化日志 (Structured logging)
python
class StructuredLogManager:
    """Structured (JSON-lines) event logger kept both in memory and on disk."""

    def __init__(self, log_file: str = "structured.log"):
        self.log_file = log_file
        self.log_entries = []  # in-memory copy, consulted by search_logs

    def log(self, level: str, event: str, **kwargs):
        """Record one structured event with a timestamp and free-form fields."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "event": event,
        }
        entry.update(kwargs)
        self.log_entries.append(entry)
        self._write_log(entry)

    def log_request(self, request_id: str, method: str,
                    path: str, headers: Dict):
        """Log an incoming HTTP request."""
        self.log("INFO", "request_received", request_id=request_id,
                 method=method, path=path, headers=headers)

    def log_response(self, request_id: str, status_code: int,
                     response_time: float, response_size: int):
        """Log an outgoing HTTP response and its timing/size."""
        self.log("INFO", "response_sent", request_id=request_id,
                 status_code=status_code, response_time=response_time,
                 response_size=response_size)

    def log_model_inference(self, model_name: str,
                            inference_time: float,
                            input_size: int,
                            output_size: int):
        """Log one model inference with its latency and payload sizes."""
        self.log("INFO", "model_inference", model_name=model_name,
                 inference_time=inference_time, input_size=input_size,
                 output_size=output_size)

    def log_error(self, error_type: str, error_message: str,
                  stack_trace: str, context: Dict = None):
        """Log an error event with its stack trace and optional context."""
        self.log("ERROR", "error_occurred", error_type=error_type,
                 error_message=error_message, stack_trace=stack_trace,
                 context=context or {})

    def _write_log(self, log_entry: Dict):
        """Append the entry to the log file as a single JSON line."""
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

    def search_logs(self, query: Dict) -> List[Dict]:
        """Return stored entries matching every key/value pair in *query*."""
        return [
            entry for entry in self.log_entries
            if all(key in entry and entry[key] == value
                   for key, value in query.items())
        ]

# 告警系统 (Alerting)
告警规则
python
class AlertRule:
    """A threshold rule over a single metric.

    Violations are timestamped; the rule reports "firing" for as long as any
    violation falls inside the trailing *duration*-second window.
    """

    def __init__(self, name: str, metric: str,
                 condition: str, threshold: float,
                 duration: int = 60):
        self.name = name
        self.metric = metric
        self.condition = condition   # "greater_than" | "less_than" | "equals"
        self.threshold = threshold
        self.duration = duration     # sliding window (seconds)
        self.violations = []         # timestamps of observed violations

    def evaluate(self, metrics: Dict) -> bool:
        """Check *metrics* against this rule; return True while it is firing.

        Note: once a violation is recorded, the rule keeps returning True on
        subsequent evaluations until the window expires, even if the metric
        has recovered.
        """
        if self.metric not in metrics:
            return False
        comparators = {
            "greater_than": lambda v: v > self.threshold,
            "less_than": lambda v: v < self.threshold,
            "equals": lambda v: v == self.threshold,
        }
        comparator = comparators.get(self.condition)
        if comparator is None:
            # Unknown condition strings never trigger.
            return False
        if comparator(metrics[self.metric]):
            self.violations.append(time.time())
        return self._check_duration()

    def _check_duration(self) -> bool:
        """Prune stale violations; report whether any remain in the window."""
        if not self.violations:
            return False
        now = time.time()
        self.violations = [t for t in self.violations
                           if now - t <= self.duration]
        return bool(self.violations)

# 告警管理器 (Alert manager)
python
class AlertManager:
    """Evaluates registered AlertRules against metric snapshots and fans
    triggered alerts out to every registered notifier."""

    def __init__(self):
        self.rules = []       # registered AlertRule objects
        self.alerts = []      # history of triggered alerts (dicts)
        self.notifiers = []   # objects exposing send_alert(alert)

    def add_rule(self, rule: AlertRule):
        """Register a rule to be checked on every evaluate_rules call."""
        self.rules.append(rule)

    def add_notifier(self, notifier):
        """Register a notification channel (must expose send_alert(dict))."""
        self.notifiers.append(notifier)

    def evaluate_rules(self, metrics: Dict):
        """Check every rule against *metrics*; fire an alert per violation."""
        firing = [rule for rule in self.rules if rule.evaluate(metrics)]
        for rule in firing:
            self._trigger_alert(rule, metrics)

    def _trigger_alert(self, rule: AlertRule, metrics: Dict):
        """Record the alert and push it to all registered notifiers."""
        alert = {
            "rule_name": rule.name,
            "metric": rule.metric,
            "condition": rule.condition,
            "threshold": rule.threshold,
            "current_value": metrics.get(rule.metric),
            "timestamp": datetime.now().isoformat()
        }
        self.alerts.append(alert)
        for notifier in self.notifiers:
            notifier.send_alert(alert)

    def get_alerts(self, limit: int = 100) -> List[Dict]:
        """The most recent *limit* alerts, oldest first."""
        return self.alerts[-limit:]

    def clear_alerts(self):
        """Discard the alert history."""
        self.alerts = []

# 告警通知器 (Alert notifiers)
python
class EmailNotifier:
    """Delivers alert notifications by email over authenticated SMTP (STARTTLS)."""

    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str,
                 recipients: List[str]):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients

    def send_alert(self, alert: Dict):
        """Format *alert* as a plain-text email and send it to all recipients."""
        # Imported lazily so the class can be defined without touching smtplib.
        import smtplib
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText

        message = MIMEMultipart()
        message['From'] = self.username
        message['To'] = ', '.join(self.recipients)
        message['Subject'] = f"Alert: {alert['rule_name']}"
        body_lines = [
            "",
            "Alert Triggered:",
            f"Rule: {alert['rule_name']}",
            f"Metric: {alert['metric']}",
            f"Condition: {alert['condition']}",
            f"Threshold: {alert['threshold']}",
            f"Current Value: {alert['current_value']}",
            f"Timestamp: {alert['timestamp']}",
            "",
        ]
        message.attach(MIMEText("\n".join(body_lines), 'plain'))
        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(message)
class SlackNotifier:
    """Posts alert notifications to a Slack incoming-webhook URL."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_alert(self, alert: Dict):
        """Render *alert* as a Slack attachment and POST it to the webhook."""
        import requests  # third-party dependency, imported lazily
        fields = [
            {"title": title, "value": value}
            for title, value in (
                ("Metric", alert['metric']),
                ("Condition", alert['condition']),
                ("Threshold", str(alert['threshold'])),
                ("Current Value", str(alert['current_value'])),
                ("Timestamp", alert['timestamp']),
            )
        ]
        payload = {
            "text": f"Alert: {alert['rule_name']}",
            "attachments": [{"color": "danger", "fields": fields}],
        }
        requests.post(self.webhook_url, json=payload)
class WebhookNotifier:
    """POSTs the raw alert dict to an arbitrary webhook endpoint."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_alert(self, alert: Dict):
        """Deliver *alert* as the JSON body of an HTTP POST."""
        import requests  # third-party dependency, imported lazily
        requests.post(self.webhook_url, json=alert)

# 自动缩放 (Auto-scaling)
基于指标的缩放
python
class AutoScaler:
    """Reactive horizontal scaler driven by current CPU/memory utilization."""

    def __init__(self, min_instances: int = 1,
                 max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        # Start at the floor of the allowed range.
        self.current_instances = min_instances

    def scale(self, metrics: Dict,
              scale_up_threshold: float = 0.8,
              scale_down_threshold: float = 0.3) -> int:
        """Scale based on the mean of CPU and memory utilization.

        Returns the applied instance-count delta: positive when scaling up,
        negative when scaling down, 0 when within the thresholds or capped.
        """
        load = (metrics.get("cpu_usage", 0) + metrics.get("memory_usage", 0)) / 2
        if load > scale_up_threshold:
            return self._scale_up()
        if load < scale_down_threshold:
            return self._scale_down()
        return 0

    def _scale_up(self) -> int:
        """Add up to 2 instances without exceeding max_instances."""
        headroom = self.max_instances - self.current_instances
        if headroom <= 0:
            return 0
        added = min(headroom, 2)
        self.current_instances += added
        print(f"Scaling up: Adding {added} instances")
        return added

    def _scale_down(self) -> int:
        """Remove at most 1 instance without going below min_instances."""
        slack = self.current_instances - self.min_instances
        if slack <= 0:
            return 0
        removed = min(slack, 1)
        self.current_instances -= removed
        print(f"Scaling down: Removing {removed} instances")
        return -removed

    def get_current_instances(self) -> int:
        """Current number of running instances."""
        return self.current_instances

# 预测性缩放 (Predictive scaling)
python
class PredictiveAutoScaler:
    """Scaler that acts on utilization predicted from recent history rather
    than on the instantaneous reading."""

    def __init__(self, min_instances: int = 1,
                 max_instances: int = 10,
                 prediction_window: int = 300):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.prediction_window = prediction_window  # seconds (reserved)
        self.usage_history = []  # stored metric snapshots, oldest first

    def scale(self, current_metrics: Dict) -> int:
        """Record the snapshot, predict utilization, and scale accordingly.

        Returns the applied instance delta (positive/negative/zero).
        """
        self.usage_history.append(current_metrics)
        forecast = self._predict_usage()
        if forecast > 0.8:
            return self._scale_up(forecast)
        if forecast < 0.3:
            return self._scale_down(forecast)
        return 0

    def _predict_usage(self) -> float:
        """Mean CPU/memory utilization over the last 10 samples.

        Falls back to a neutral 0.5 until enough history accumulates.
        """
        if len(self.usage_history) < 10:
            return 0.5
        window = self.usage_history[-10:]
        cpu_mean = sum(s.get("cpu_usage", 0) for s in window) / len(window)
        mem_mean = sum(s.get("memory_usage", 0) for s in window) / len(window)
        return (cpu_mean + mem_mean) / 2

    def _scale_up(self, predicted_usage: float) -> int:
        """Add instances proportional to the forecast, capped at the maximum."""
        if self.current_instances >= self.max_instances:
            return 0
        grown = min(self.max_instances - self.current_instances,
                    int(predicted_usage * 10))
        self.current_instances += grown
        print(f"Predictive scaling up: Adding {grown} instances")
        return grown

    def _scale_down(self, predicted_usage: float) -> int:
        """Shed instances proportional to the idle margin, floored at the minimum."""
        if self.current_instances <= self.min_instances:
            return 0
        shed = min(self.current_instances - self.min_instances,
                   int((1 - predicted_usage) * 5))
        self.current_instances -= shed
        print(f"Predictive scaling down: Removing {shed} instances")
        return -shed

# 成本优化 (Cost optimization)
资源优化
python
class ResourceOptimizer:
    """Produces right-sizing recommendations from utilization samples and
    estimates the cost impact of a configuration change."""

    # Utilization bounds for recommending a resize.
    _LOW_USAGE = 0.3
    _HIGH_USAGE = 0.9

    def __init__(self):
        # reserved for accumulating usage samples over time
        self.resource_usage = {}

    def analyze_usage(self, usage_data: Dict) -> Dict:
        """Return downscale/upscale recommendations for CPU and memory.

        Utilization below 0.3 suggests shrinking the allocation; above 0.9
        suggests growing it. Missing keys default to 0 (treated as idle).
        """
        cpu_usage = usage_data.get("cpu_usage", 0)
        memory_usage = usage_data.get("memory_usage", 0)
        # (triggered?, type, resource, usage, text) — order matches the
        # original output: downscale cpu/memory, then upscale cpu/memory.
        checks = [
            (cpu_usage < self._LOW_USAGE, "downscale", "cpu", cpu_usage,
             "Reduce CPU allocation"),
            (memory_usage < self._LOW_USAGE, "downscale", "memory", memory_usage,
             "Reduce memory allocation"),
            (cpu_usage > self._HIGH_USAGE, "upscale", "cpu", cpu_usage,
             "Increase CPU allocation"),
            (memory_usage > self._HIGH_USAGE, "upscale", "memory", memory_usage,
             "Increase memory allocation"),
        ]
        recommendations = [
            {
                "type": kind,
                "resource": resource,
                "current_usage": usage,
                "recommendation": text
            }
            for triggered, kind, resource, usage, text in checks if triggered
        ]
        return {
            "recommendations": recommendations,
            "total_recommendations": len(recommendations)
        }

    def calculate_cost_savings(self, current_config: Dict,
                               optimized_config: Dict) -> Dict:
        """Compare the hourly cost of two configurations.

        Bug fix: the original divided by current_cost unconditionally and
        raised ZeroDivisionError for an all-zero configuration; the
        percentage is now reported as 0.0 in that case.
        """
        current_cost = self._calculate_cost(current_config)
        optimized_cost = self._calculate_cost(optimized_config)
        savings = current_cost - optimized_cost
        savings_percentage = (savings / current_cost) * 100 if current_cost else 0.0
        return {
            "current_cost": current_cost,
            "optimized_cost": optimized_cost,
            "savings": savings,
            "savings_percentage": savings_percentage
        }

    def _calculate_cost(self, config: Dict) -> float:
        """Toy hourly cost model: $0.05/CPU, $0.01/GB RAM, $0.50/GPU."""
        cpu_cost = config.get("cpu", 1) * 0.05
        memory_cost = config.get("memory", 4) * 0.01
        gpu_cost = config.get("gpu", 0) * 0.5
        return cpu_cost + memory_cost + gpu_cost

# 缓存优化 (Cache optimization)
python
class CacheOptimizer:
    """In-memory TTL cache with hit/miss/eviction statistics.

    Fixes vs. the original draft:
    - get() now returns the cached *value* (the original leaked the internal
      {"value", "expires_at"} wrapper dict) and honours the TTL, treating
      expired entries as misses and discarding them.
    - The capacity is a constructor parameter (default 1000, as before).
    """

    def __init__(self, max_entries: int = 1000):
        # key -> {"value": ..., "expires_at": epoch seconds}
        self.cache: Dict[str, Dict] = {}
        self.max_entries = max_entries
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0
        }

    def get(self, key: str) -> Any:
        """Return the live value for *key*, or None (counted as a miss)."""
        entry = self.cache.get(key)
        if entry is not None:
            if entry["expires_at"] > time.time():
                self.cache_stats["hits"] += 1
                return entry["value"]
            # Expired: drop it so it can never serve stale data.
            del self.cache[key]
        self.cache_stats["misses"] += 1
        return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store *value* under *key* for *ttl* seconds, evicting when full."""
        if len(self.cache) >= self.max_entries:
            self._evict()
        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + ttl
        }

    def _evict(self):
        """Drop the entry closest to expiry (cheap approximation of LRU)."""
        if not self.cache:
            return
        soonest = min(self.cache, key=lambda k: self.cache[k]["expires_at"])
        del self.cache[soonest]
        self.cache_stats["evictions"] += 1

    def get_stats(self) -> Dict:
        """Stats snapshot including the derived hit rate and current size."""
        total = self.cache_stats["hits"] + self.cache_stats["misses"]
        return {
            **self.cache_stats,
            "hit_rate": self.cache_stats["hits"] / total if total > 0 else 0,
            "cache_size": len(self.cache)
        }

    def optimize_cache_size(self) -> int:
        """Suggest a cache size: shrink when the hit rate is high, grow when low."""
        hit_rate = self.get_stats()["hit_rate"]
        if hit_rate > 0.9:
            return len(self.cache) - 100
        if hit_rate < 0.7:
            return len(self.cache) + 100
        return len(self.cache)

# 实践练习 (Hands-on exercises)
练习1:实现性能监控
python
def monitor_model_performance(model, test_data):
    """Exercise *model* over every batch in *test_data*, recording latency
    and process memory, and return the aggregated performance report."""
    perf_monitor = ModelPerformanceMonitor(model)
    for batch in test_data:
        perf_monitor.monitor_inference(batch)
        perf_monitor.monitor_memory_usage()
    return perf_monitor.get_performance_report()

# 练习2:实现告警系统 (Exercise 2: build the alert system)
python
def setup_alert_system():
    """Build an AlertManager preloaded with CPU and memory saturation rules
    (both firing when utilization exceeds 90%)."""
    manager = AlertManager()
    saturation_rules = (
        ("high_cpu_usage", "cpu_usage"),
        ("high_memory_usage", "memory_usage"),
    )
    for rule_name, metric_name in saturation_rules:
        manager.add_rule(AlertRule(
            name=rule_name,
            metric=metric_name,
            condition="greater_than",
            threshold=0.9
        ))
    return manager

# 总结 (Summary)
本节我们学习了监控与优化:
- 性能监控(指标收集、模型性能监控)
- 日志管理(日志收集、结构化日志)
- 告警系统(告警规则、告警管理器、告警通知器)
- 自动缩放(基于指标的缩放、预测性缩放)
- 成本优化(资源优化、缓存优化)
监控与优化是确保AI应用稳定运行的关键。
