Skip to content

第62天:监控与优化

学习目标

  • 掌握性能监控
  • 学习日志管理
  • 理解告警系统
  • 掌握自动缩放
  • 了解成本优化

性能监控

指标收集

python
from typing import Dict, List
import time

class MetricsCollector:
    """Collects named numeric metrics and computes summary statistics."""

    def __init__(self):
        # metric name -> list of recorded observations
        self.metrics: Dict[str, List[float]] = {}
        # metric name -> wall-clock timestamp of the matching start_timer()
        self.start_times: Dict[str, float] = {}

    def start_timer(self, metric_name: str):
        """Start a wall-clock timer for *metric_name*."""
        self.start_times[metric_name] = time.time()

    def end_timer(self, metric_name: str) -> float:
        """Stop the timer for *metric_name* and record the elapsed seconds.

        Returns:
            Elapsed seconds, or 0.0 if no matching start_timer() was made.
        """
        if metric_name not in self.start_times:
            return 0.0

        # pop() both reads and clears the pending start time in one step.
        elapsed = time.time() - self.start_times.pop(metric_name)
        self.metrics.setdefault(metric_name, []).append(elapsed)
        return elapsed

    def record_metric(self, metric_name: str, value: float):
        """Record a single observation for *metric_name*."""
        self.metrics.setdefault(metric_name, []).append(value)

    def get_metrics(self, metric_name: str = None) -> Dict:
        """Return stats for one metric, or for every metric when name is None."""
        if metric_name:
            return self._calculate_stats(metric_name)
        return {name: self._calculate_stats(name) for name in self.metrics}

    def _calculate_stats(self, metric_name: str) -> Dict:
        """Compute count/sum/avg/min/max and p50/p95/p99 for one metric.

        Returns an empty dict when the metric has no recorded values.
        """
        values = self.metrics.get(metric_name, [])

        if not values:
            return {}

        return {
            "count": len(values),
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "min": min(values),
            "max": max(values),
            "p50": self._percentile(values, 50),
            "p95": self._percentile(values, 95),
            "p99": self._percentile(values, 99),
        }

    def _percentile(self, values: List[float],
                     percentile: int) -> float:
        """Nearest-rank percentile of *values* (assumed non-empty)."""
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        # BUG FIX: without clamping, percentile=100 computes index == len
        # and raises IndexError; clamp to the last element instead.
        return sorted_values[min(index, len(sorted_values) - 1)]

模型性能监控

python
class ModelPerformanceMonitor:
    """Instruments a model with inference timing, RAM, and GPU metrics."""

    def __init__(self, model):
        self.model = model
        self.metrics_collector = MetricsCollector()

    def monitor_inference(self, input_data,
                          metric_name: str = "inference_time"):
        """Run one forward pass with gradients disabled, timed under *metric_name*."""
        collector = self.metrics_collector
        collector.start_timer(metric_name)
        with torch.no_grad():
            result = self.model(input_data)
        collector.end_timer(metric_name)
        return result

    def monitor_batch_inference(self, batch_data,
                                  metric_name: str = "batch_inference_time"):
        """Run one batched forward pass, timed under *metric_name*."""
        collector = self.metrics_collector
        collector.start_timer(metric_name)
        with torch.no_grad():
            result = self.model(batch_data)
        collector.end_timer(metric_name)
        return result

    def monitor_memory_usage(self, metric_name: str = "memory_usage"):
        """Record and return the current process RSS in MiB (via psutil)."""
        import psutil

        rss_mb = psutil.Process().memory_info().rss / 1024 / 1024
        self.metrics_collector.record_metric(metric_name, rss_mb)
        return rss_mb

    def monitor_gpu_usage(self, metric_name: str = "gpu_usage"):
        """Record and return GPU 0's used memory in GiB (via pynvml)."""
        try:
            import pynvml
        except ImportError:
            raise ImportError("Install pynvml: pip install pynvml")

        pynvml.nvmlInit()
        device = pynvml.nvmlDeviceGetHandleByIndex(0)
        mem_info = pynvml.nvmlDeviceGetMemoryInfo(device)
        used_gb = mem_info.used / 1024 / 1024 / 1024
        self.metrics_collector.record_metric(metric_name, used_gb)
        return used_gb

    def get_performance_report(self) -> Dict:
        """Return summary stats for the four standard metric streams."""
        collector = self.metrics_collector
        sources = {
            "inference_metrics": "inference_time",
            "batch_metrics": "batch_inference_time",
            "memory_metrics": "memory_usage",
            "gpu_metrics": "gpu_usage",
        }
        return {label: collector.get_metrics(metric)
                for label, metric in sources.items()}

日志管理

日志收集

python
import logging
from datetime import datetime
import json

class LogManager:
    """Logger that emits JSON-encoded records to both a file and the console."""

    def __init__(self, log_file: str = "app.log"):
        self.log_file = log_file
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure (once) and return the shared "AIAppLogger" logger."""
        logger = logging.getLogger("AIAppLogger")
        logger.setLevel(logging.INFO)

        # BUG FIX: logging.getLogger() returns a process-wide singleton, so
        # every LogManager() used to attach another file+console handler pair
        # and each message then appeared multiple times. Attach handlers only
        # once; the first LogManager's log_file wins for the file handler.
        if not logger.handlers:
            file_handler = logging.FileHandler(self.log_file)
            file_handler.setLevel(logging.INFO)

            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )

            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            logger.addHandler(file_handler)
            logger.addHandler(console_handler)

        return logger

    def log_info(self, message: str, **kwargs):
        """Log an INFO record; extra kwargs are merged into the JSON payload."""
        log_data = {
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "level": "INFO",
            **kwargs
        }

        self.logger.info(json.dumps(log_data))

    def log_warning(self, message: str, **kwargs):
        """Log a WARNING record; extra kwargs are merged into the JSON payload."""
        log_data = {
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "level": "WARNING",
            **kwargs
        }

        self.logger.warning(json.dumps(log_data))

    def log_error(self, message: str, **kwargs):
        """Log an ERROR record; extra kwargs are merged into the JSON payload."""
        log_data = {
            "message": message,
            "timestamp": datetime.now().isoformat(),
            "level": "ERROR",
            **kwargs
        }

        self.logger.error(json.dumps(log_data))

    def log_request(self, request_id: str, request_data: Dict):
        """Log an incoming request; request_data is flattened into the record."""
        self.log_info(
            "Request received",
            request_id=request_id,
            **request_data
        )

    def log_response(self, request_id: str, response_data: Dict):
        """Log an outgoing response; response_data is flattened into the record."""
        self.log_info(
            "Response sent",
            request_id=request_id,
            **response_data
        )

    def log_exception(self, exception: Exception,
                       context: Dict = None):
        """Log an exception's type, message, and optional context dict."""
        log_data = {
            "exception_type": type(exception).__name__,
            "exception_message": str(exception),
            "context": context or {}
        }

        self.log_error("Exception occurred", **log_data)

结构化日志

python
class StructuredLogManager:
    """Appends structured (JSON-lines) log entries to a file while keeping
    an in-memory copy that can be queried with search_logs()."""

    def __init__(self, log_file: str = "structured.log"):
        self.log_file = log_file
        self.log_entries = []

    def log(self, level: str, event: str, **kwargs):
        """Record one timestamped entry, buffering it and persisting it."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "level": level,
            "event": event,
            **kwargs
        }
        self.log_entries.append(entry)
        self._write_log(entry)

    def log_request(self, request_id: str, method: str,
                     path: str, headers: Dict):
        """Convenience wrapper: log an incoming HTTP request."""
        self.log("INFO", "request_received",
                 request_id=request_id, method=method,
                 path=path, headers=headers)

    def log_response(self, request_id: str, status_code: int,
                      response_time: float, response_size: int):
        """Convenience wrapper: log an outgoing HTTP response."""
        self.log("INFO", "response_sent",
                 request_id=request_id, status_code=status_code,
                 response_time=response_time, response_size=response_size)

    def log_model_inference(self, model_name: str,
                             inference_time: float,
                             input_size: int,
                             output_size: int):
        """Convenience wrapper: log one model inference."""
        self.log("INFO", "model_inference",
                 model_name=model_name, inference_time=inference_time,
                 input_size=input_size, output_size=output_size)

    def log_error(self, error_type: str, error_message: str,
                   stack_trace: str, context: Dict = None):
        """Convenience wrapper: log an error with its stack trace."""
        self.log("ERROR", "error_occurred",
                 error_type=error_type, error_message=error_message,
                 stack_trace=stack_trace, context=context or {})

    def _write_log(self, log_entry: Dict):
        """Append one entry to the log file as a single JSON line."""
        with open(self.log_file, 'a') as sink:
            sink.write(json.dumps(log_entry) + '\n')

    def search_logs(self, query: Dict) -> List[Dict]:
        """Return buffered entries whose fields equal every key in *query*."""
        def matches(entry):
            return all(key in entry and entry[key] == wanted
                       for key, wanted in query.items())

        return [entry for entry in self.log_entries if matches(entry)]

告警系统

告警规则

python
class AlertRule:
    """A threshold rule over one metric with a sliding violation window.

    The rule keeps timestamps of recent violations; evaluate() reports True
    while at least one violation falls within the last *duration* seconds.
    """

    def __init__(self, name: str, metric: str,
                 condition: str, threshold: float,
                 duration: int = 60):
        self.name = name
        self.metric = metric
        self.condition = condition          # "greater_than" | "less_than" | "equals"
        self.threshold = threshold
        self.duration = duration            # sliding-window length in seconds
        self.violations = []                # timestamps of recorded violations

    def evaluate(self, metrics: Dict) -> bool:
        """Record a violation if the metric breaches the rule, then report
        whether any violation is still inside the duration window.

        Returns False when the metric is absent or the condition is unknown.
        """
        if self.metric not in metrics:
            return False

        comparators = {
            "greater_than": lambda v: v > self.threshold,
            "less_than": lambda v: v < self.threshold,
            "equals": lambda v: v == self.threshold,
        }
        comparator = comparators.get(self.condition)
        if comparator is None:
            return False

        if comparator(metrics[self.metric]):
            self.violations.append(time.time())

        return self._check_duration()

    def _check_duration(self) -> bool:
        """Prune violations older than *duration* seconds; True if any remain."""
        if not self.violations:
            return False

        cutoff = time.time() - self.duration
        self.violations = [stamp for stamp in self.violations if stamp >= cutoff]
        return bool(self.violations)

告警管理器

python
class AlertManager:
    """Evaluates alert rules against metrics and fans out notifications."""

    def __init__(self):
        self.rules = []        # registered AlertRule objects
        self.alerts = []       # fired alerts, oldest first
        self.notifiers = []    # objects exposing send_alert(alert: Dict)

    def add_rule(self, rule: AlertRule):
        """Register a rule for evaluation."""
        self.rules.append(rule)

    def add_notifier(self, notifier):
        """Register a notifier; it must expose send_alert(alert: Dict)."""
        self.notifiers.append(notifier)

    def evaluate_rules(self, metrics: Dict):
        """Evaluate every registered rule; fire an alert for each that trips."""
        for candidate in self.rules:
            if candidate.evaluate(metrics):
                self._trigger_alert(candidate, metrics)

    def _trigger_alert(self, rule: AlertRule, metrics: Dict):
        """Record one fired alert and forward it to every notifier."""
        fired = {
            "rule_name": rule.name,
            "metric": rule.metric,
            "condition": rule.condition,
            "threshold": rule.threshold,
            "current_value": metrics.get(rule.metric),
            "timestamp": datetime.now().isoformat()
        }
        self.alerts.append(fired)
        for channel in self.notifiers:
            channel.send_alert(fired)

    def get_alerts(self, limit: int = 100) -> List[Dict]:
        """Return the most recent *limit* alerts, oldest first."""
        return self.alerts[-limit:]

    def clear_alerts(self):
        """Discard all stored alerts."""
        self.alerts = []

告警通知器

python
class EmailNotifier:
    """Sends alert notifications as plain-text email over SMTP (STARTTLS)."""

    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str,
                 recipients: List[str]):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipients = recipients

    def send_alert(self, alert: Dict):
        """Compose the alert email and send it to every recipient."""
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        body = f"""
        Alert Triggered:
        
        Rule: {alert['rule_name']}
        Metric: {alert['metric']}
        Condition: {alert['condition']}
        Threshold: {alert['threshold']}
        Current Value: {alert['current_value']}
        Timestamp: {alert['timestamp']}
        """

        message = MIMEMultipart()
        message['From'] = self.username
        message['To'] = ', '.join(self.recipients)
        message['Subject'] = f"Alert: {alert['rule_name']}"
        message.attach(MIMEText(body, 'plain'))

        # STARTTLS + login, then send; the context manager closes the session.
        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(message)

class SlackNotifier:
    """Posts alert notifications to a Slack incoming-webhook URL."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_alert(self, alert: Dict):
        """POST the alert as a Slack message with one red attachment."""
        import requests

        field_specs = [
            ("Metric", alert['metric']),
            ("Condition", alert['condition']),
            ("Threshold", str(alert['threshold'])),
            ("Current Value", str(alert['current_value'])),
            ("Timestamp", alert['timestamp']),
        ]
        payload = {
            "text": f"Alert: {alert['rule_name']}",
            "attachments": [
                {
                    "color": "danger",
                    "fields": [{"title": title, "value": value}
                               for title, value in field_specs]
                }
            ]
        }

        requests.post(self.webhook_url, json=payload)

class WebhookNotifier:
    """Forwards the raw alert dict to an arbitrary HTTP webhook."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_alert(self, alert: Dict):
        """POST the alert payload as JSON to the configured URL."""
        import requests
        requests.post(self.webhook_url, json=alert)

自动缩放

基于指标的缩放

python
class AutoScaler:
    """Adjusts the instance count from average CPU/memory utilization."""

    def __init__(self, min_instances: int = 1,
                 max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances

    def scale(self, metrics: Dict,
              scale_up_threshold: float = 0.8,
              scale_down_threshold: float = 0.3) -> int:
        """Return the instance delta applied: positive added, negative
        removed, 0 when utilization is inside the thresholds."""
        load = (metrics.get("cpu_usage", 0) + metrics.get("memory_usage", 0)) / 2

        if load > scale_up_threshold:
            return self._scale_up()
        if load < scale_down_threshold:
            return self._scale_down()
        return 0

    def _scale_up(self) -> int:
        """Add up to 2 instances, never exceeding max; return the count added."""
        headroom = self.max_instances - self.current_instances
        if headroom <= 0:
            return 0

        added = min(headroom, 2)
        self.current_instances += added
        print(f"Scaling up: Adding {added} instances")
        return added

    def _scale_down(self) -> int:
        """Remove at most 1 instance, never below min; return -removed."""
        slack = self.current_instances - self.min_instances
        if slack <= 0:
            return 0

        removed = min(slack, 1)
        self.current_instances -= removed
        print(f"Scaling down: Removing {removed} instances")
        return -removed

    def get_current_instances(self) -> int:
        """Current number of instances."""
        return self.current_instances

预测性缩放

python
class PredictiveAutoScaler:
    """Scales the instance count on a moving-average usage forecast."""

    # Upper bound on buffered samples. _predict_usage only reads the last
    # 10 entries, so trimming to this window preserves behavior exactly.
    _HISTORY_LIMIT = 1000

    def __init__(self, min_instances: int = 1,
                 max_instances: int = 10,
                 prediction_window: int = 300):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.prediction_window = prediction_window
        self.usage_history = []

    def scale(self, current_metrics: Dict) -> int:
        """Record metrics, forecast usage, and return the instance delta
        applied (positive added, negative removed, 0 for no change)."""
        self.usage_history.append(current_metrics)
        # BUG FIX: the history previously grew without bound — a memory leak
        # on any long-running service. Keep only a bounded recent window.
        if len(self.usage_history) > self._HISTORY_LIMIT:
            del self.usage_history[:-self._HISTORY_LIMIT]

        predicted_usage = self._predict_usage()

        if predicted_usage > 0.8:
            return self._scale_up(predicted_usage)
        elif predicted_usage < 0.3:
            return self._scale_down(predicted_usage)
        else:
            return 0

    def _predict_usage(self) -> float:
        """Average CPU/memory usage over the last 10 samples.

        Returns a neutral 0.5 until at least 10 samples have accumulated.
        """
        if len(self.usage_history) < 10:
            return 0.5

        recent_usage = self.usage_history[-10:]

        avg_cpu = sum(m.get("cpu_usage", 0) for m in recent_usage) / len(recent_usage)
        avg_memory = sum(m.get("memory_usage", 0) for m in recent_usage) / len(recent_usage)

        return (avg_cpu + avg_memory) / 2

    def _scale_up(self, predicted_usage: float) -> int:
        """Add instances proportional to predicted load; return count added."""
        if self.current_instances >= self.max_instances:
            return 0

        n_new = min(
            self.max_instances - self.current_instances,
            int(predicted_usage * 10)
        )

        self.current_instances += n_new

        print(f"Predictive scaling up: Adding {n_new} instances")

        return n_new

    def _scale_down(self, predicted_usage: float) -> int:
        """Remove instances proportional to predicted idleness; return -removed."""
        if self.current_instances <= self.min_instances:
            return 0

        n_remove = min(
            self.current_instances - self.min_instances,
            int((1 - predicted_usage) * 5)
        )

        self.current_instances -= n_remove

        print(f"Predictive scaling down: Removing {n_remove} instances")

        return -n_remove

成本优化

资源优化

python
class ResourceOptimizer:
    """Produces resource-sizing recommendations and cost comparisons."""

    def __init__(self):
        # reserved for future per-resource usage tracking (currently unused)
        self.resource_usage = {}

    def analyze_usage(self, usage_data: Dict) -> Dict:
        """Suggest up/down-scaling from CPU and memory utilization ratios.

        Utilization below 0.3 suggests shrinking the allocation; above 0.9
        suggests growing it. Missing metrics default to 0.
        """
        recommendations = []

        cpu_usage = usage_data.get("cpu_usage", 0)
        memory_usage = usage_data.get("memory_usage", 0)

        if cpu_usage < 0.3:
            recommendations.append({
                "type": "downscale",
                "resource": "cpu",
                "current_usage": cpu_usage,
                "recommendation": "Reduce CPU allocation"
            })

        if memory_usage < 0.3:
            recommendations.append({
                "type": "downscale",
                "resource": "memory",
                "current_usage": memory_usage,
                "recommendation": "Reduce memory allocation"
            })

        if cpu_usage > 0.9:
            recommendations.append({
                "type": "upscale",
                "resource": "cpu",
                "current_usage": cpu_usage,
                "recommendation": "Increase CPU allocation"
            })

        if memory_usage > 0.9:
            recommendations.append({
                "type": "upscale",
                "resource": "memory",
                "current_usage": memory_usage,
                "recommendation": "Increase memory allocation"
            })

        return {
            "recommendations": recommendations,
            "total_recommendations": len(recommendations)
        }

    def calculate_cost_savings(self, current_config: Dict,
                                 optimized_config: Dict) -> Dict:
        """Compare hourly cost of two configs and report absolute/% savings."""
        current_cost = self._calculate_cost(current_config)
        optimized_cost = self._calculate_cost(optimized_config)

        savings = current_cost - optimized_cost
        # BUG FIX: a zero-cost baseline previously raised ZeroDivisionError;
        # report 0% savings in that degenerate case instead.
        savings_percentage = (savings / current_cost) * 100 if current_cost else 0.0

        return {
            "current_cost": current_cost,
            "optimized_cost": optimized_cost,
            "savings": savings,
            "savings_percentage": savings_percentage
        }

    def _calculate_cost(self, config: Dict) -> float:
        """Hourly cost: $0.05/CPU, $0.01/GB memory, $0.50/GPU (defaults 1/4/0)."""
        cpu_cost = config.get("cpu", 1) * 0.05
        memory_cost = config.get("memory", 4) * 0.01
        gpu_cost = config.get("gpu", 0) * 0.5

        return cpu_cost + memory_cost + gpu_cost

缓存优化

python
class CacheOptimizer:
    """A small TTL cache with hit/miss/eviction statistics."""

    def __init__(self):
        self.cache = {}
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "evictions": 0
        }

    def get(self, key: str) -> any:
        """Return the cached value for *key*, or None on a miss.

        BUG FIX: previously this returned the internal wrapper dict
        ({"value": ..., "expires_at": ...}) instead of the stored value,
        and never honored the TTL recorded by set(). Expired entries now
        count as misses and are dropped.
        """
        entry = self.cache.get(key)
        if entry is not None:
            if entry["expires_at"] > time.time():
                self.cache_stats["hits"] += 1
                return entry["value"]
            # Expired: discard the entry and fall through to a miss.
            del self.cache[key]
        self.cache_stats["misses"] += 1
        return None

    def set(self, key: str, value: any, ttl: int = 3600):
        """Store *value* under *key* for *ttl* seconds.

        When the cache already holds 1000 entries, one entry is evicted
        before inserting.
        """
        if len(self.cache) >= 1000:
            self._evict()

        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + ttl
        }

    def _evict(self):
        """Remove the entry closest to expiry and count the eviction."""
        if not self.cache:
            return

        oldest_key = min(
            self.cache.keys(),
            key=lambda k: self.cache[k]["expires_at"]
        )

        del self.cache[oldest_key]
        self.cache_stats["evictions"] += 1

    def get_stats(self) -> Dict:
        """Stats plus hit rate (0 when no requests yet) and current size."""
        total_requests = self.cache_stats["hits"] + self.cache_stats["misses"]
        hit_rate = self.cache_stats["hits"] / total_requests if total_requests > 0 else 0

        return {
            **self.cache_stats,
            "hit_rate": hit_rate,
            "cache_size": len(self.cache)
        }

    def optimize_cache_size(self) -> int:
        """Suggest a new cache size from the observed hit rate: shrink when
        hits are plentiful (>0.9), grow when they are scarce (<0.7)."""
        stats = self.get_stats()

        if stats["hit_rate"] > 0.9:
            return len(self.cache) - 100
        elif stats["hit_rate"] < 0.7:
            return len(self.cache) + 100
        else:
            return len(self.cache)

实践练习

练习1:实现性能监控

python
def monitor_model_performance(model, test_data):
    """Run *model* over every batch in *test_data*, collecting per-batch
    inference latency and process memory, and return the summary report."""
    perf = ModelPerformanceMonitor(model)

    for sample in test_data:
        perf.monitor_inference(sample)
        perf.monitor_memory_usage()

    return perf.get_performance_report()

练习2:实现告警系统

python
def setup_alert_system():
    """Create an AlertManager preloaded with high-CPU and high-memory rules
    (both firing when usage exceeds 0.9)."""
    manager = AlertManager()

    for rule_name, metric in (
        ("high_cpu_usage", "cpu_usage"),
        ("high_memory_usage", "memory_usage"),
    ):
        manager.add_rule(AlertRule(
            name=rule_name,
            metric=metric,
            condition="greater_than",
            threshold=0.9,
        ))

    return manager

总结

本节我们学习了监控与优化:

  1. 性能监控(指标收集、模型性能监控)
  2. 日志管理(日志收集、结构化日志)
  3. 告警系统(告警规则、告警管理器、告警通知器)
  4. 自动缩放(基于指标的缩放、预测性缩放)
  5. 成本优化(资源优化、缓存优化)

监控与优化是确保AI应用稳定运行的关键。

参考资源