Skip to content

第43天:Agent反思与评估

学习目标

  • 理解反思机制的设计
  • 掌握自我评估方法
  • 学习错误检测与纠正
  • 了解性能指标
  • 掌握持续改进策略

反思机制

反思循环

┌─────────────────────────────────────────┐
│         Action Execution              │
└───────────────────┬─────────────────────┘


┌─────────────────────────────────────────┐
│         Observation                  │
└───────────────────┬─────────────────────┘


┌─────────────────────────────────────────┐
│         Reflection                   │
│  - What happened?                  │
│  - Why did it happen?             │
│  - What could be improved?         │
└───────────────────┬─────────────────────┘


┌─────────────────────────────────────────┐
│         Evaluation                   │
│  - Success/Failure                 │
│  - Performance metrics             │
│  - Goal achievement              │
└───────────────────┬─────────────────────┘


┌─────────────────────────────────────────┐
│         Learning                     │
│  - Update knowledge               │
│  - Adjust strategies            │
│  - Improve policies           │
└───────────────────┬─────────────────────┘


┌─────────────────────────────────────────┐
│         Improved Action              │
└─────────────────────────────────────────┘

反思触发器

python
from datetime import datetime
from typing import Callable, Dict, List, Optional

class ReflectionTrigger:
    """Registry of (condition, action) pairs driving reflection.

    Each trigger pairs a predicate over a context dict with the action
    to fire when that predicate holds.
    """

    def __init__(self):
        # (condition, action) tuples, checked in insertion order.
        self.triggers = []

    def add_trigger(self, condition: Callable, action: Callable):
        """Register *action* to fire whenever *condition*(context) is truthy."""
        self.triggers.append((condition, action))

    def check(self, context: Dict) -> List[Callable]:
        """Return the actions whose conditions match *context*."""
        return [fire for test, fire in self.triggers if test(context)]

class ReflectionManager:
    """Coordinates reflection: registers triggers and produces reflection reports.

    Built-in triggers fire on an error in the context, on a success rate
    below 70%, and periodically every 10 iterations (note: a missing or
    zero "iteration" also satisfies `% 10 == 0`).
    """

    def __init__(self):
        self.trigger = ReflectionTrigger()
        self._setup_triggers()

    def _setup_triggers(self):
        """Register the built-in reflection triggers."""
        # Reflect whenever the context carries an error.
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("error") is not None,
            action=lambda ctx: self._handle_error(ctx)
        )

        # Reflect when the observed success rate drops below 70%.
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("success_rate", 1.0) < 0.7,
            action=lambda ctx: self._handle_low_success(ctx)
        )

        # Periodic review every 10 iterations.
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("iteration", 0) % 10 == 0,
            action=lambda ctx: self._periodic_reflection(ctx)
        )

    def reflect(self, context: Dict) -> List[Dict]:
        """Run every triggered reflection action and collect their reports."""
        return [action(context) for action in self.trigger.check(context)]

    def _handle_error(self, context: Dict) -> Dict:
        """Build a reflection report for an error in the context."""
        return {
            "type": "error_reflection",
            "error": context.get("error"),
            "timestamp": datetime.now(),
            "suggestion": self._analyze_error(context)
        }

    def _handle_low_success(self, context: Dict) -> Dict:
        """Build a reflection report for a below-threshold success rate."""
        return {
            "type": "performance_reflection",
            "success_rate": context.get("success_rate"),
            "timestamp": datetime.now(),
            "suggestion": self._improve_performance(context)
        }

    def _periodic_reflection(self, context: Dict) -> Dict:
        """Build the periodic progress-review report."""
        return {
            "type": "periodic_reflection",
            "iteration": context.get("iteration"),
            "timestamp": datetime.now(),
            "suggestion": self._review_progress(context)
        }

    # Bug fix: the three helpers below were referenced by the handlers but
    # never defined, so any fired trigger raised AttributeError.

    def _analyze_error(self, context: Dict) -> str:
        """Suggestion text for an error reflection."""
        return f"Investigate and handle error: {context.get('error')}"

    def _improve_performance(self, context: Dict) -> str:
        """Suggestion text for a low-success-rate reflection."""
        rate = context.get("success_rate", 0.0)
        return (f"Success rate {rate:.0%} is below the 70% threshold; "
                "review recent failures and adjust the strategy")

    def _review_progress(self, context: Dict) -> str:
        """Suggestion text for the periodic review reflection."""
        return (f"Periodic review at iteration {context.get('iteration', 0)}: "
                "assess progress toward the goal")

自我评估

目标达成评估

python
class GoalEvaluator:
    """Scores an execution result against a goal via pluggable criteria."""

    def __init__(self):
        # Maps criterion name -> callable(result, goal) -> float score.
        self.evaluation_criteria = {}

    def add_criteria(self, name: str, evaluator: Callable):
        """Register a scoring function under *name*."""
        self.evaluation_criteria[name] = evaluator

    def evaluate(self, result: Dict, goal: Dict) -> Dict:
        """Score *result* against *goal* on every registered criterion.

        Returns per-criterion scores, the mean overall score, and whether
        the goal counts as achieved (overall >= 0.8).
        """
        evaluations = {
            name: evaluator(result, goal)
            for name, evaluator in self.evaluation_criteria.items()
        }

        overall_score = self._calculate_overall_score(evaluations)

        return {
            "criteria": evaluations,
            "overall_score": overall_score,
            "achieved": overall_score >= 0.8
        }

    def _calculate_overall_score(self, evaluations: Dict) -> float:
        """Mean of all criterion scores; 0.0 when none are registered.

        Bug fix: the original divided by len(evaluations) unconditionally
        and raised ZeroDivisionError for an empty criteria set.
        """
        if not evaluations:
            return 0.0
        return sum(evaluations.values()) / len(evaluations)

    def evaluate_completeness(self, result: Dict, goal: Dict) -> float:
        """Fraction of goal['required_keys'] present in *result*.

        Returns 1.0 when the goal declares no required keys.
        """
        required_keys = set(goal.get("required_keys", []))
        if not required_keys:
            return 1.0

        return len(required_keys & set(result.keys())) / len(required_keys)

    def evaluate_accuracy(self, result: Dict, goal: Dict) -> float:
        """1.0 on exact output match, 0.0 on mismatch, 0.5 when either side is missing."""
        expected = goal.get("expected_output")
        actual = result.get("output")

        if expected is None or actual is None:
            return 0.5  # cannot judge: neutral score

        return 1.0 if expected == actual else 0.0

性能评估

python
class PerformanceEvaluator:
    """Collects named numeric metric samples and summarizes / trends them."""

    def __init__(self):
        # Maps metric name -> list of samples in arrival order.
        self.metrics = {}

    def record_metric(self, name: str, value: float):
        """Append one sample for *name*, creating the series if needed."""
        self.metrics.setdefault(name, []).append(value)

    def evaluate(self) -> Dict:
        """Return mean/min/max/std/count for every metric with samples."""
        evaluations = {}

        for name, values in self.metrics.items():
            if values:
                evaluations[name] = {
                    "mean": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                    "std": self._calculate_std(values),
                    "count": len(values)
                }

        return evaluations

    def _calculate_std(self, values: List[float]) -> float:
        """Population standard deviation; 0.0 with fewer than 2 samples."""
        if len(values) < 2:
            return 0.0

        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)

        return variance ** 0.5

    def get_trend(self, name: str, window: int = 10) -> str:
        """Classify the recent trend of *name* over its last *window* samples.

        Compares the mean of the last 5 samples to the mean of the samples
        before them within the window: >10% up -> "improving", >10% down
        -> "declining", otherwise "stable".

        Bug fix: the original reported "improving" for any positive metric
        with 2-5 samples, because the "older" slice was empty and its mean
        silently became 0. Both halves must now be populated.
        """
        if name not in self.metrics:
            return "unknown"

        values = self.metrics[name][-window:]

        # Need at least one sample on each side of the 5-sample split.
        if len(values) <= 5:
            return "insufficient_data"

        recent = sum(values[-5:]) / 5
        older = sum(values[:-5]) / (len(values) - 5)

        if recent > older * 1.1:
            return "improving"
        elif recent < older * 0.9:
            return "declining"
        else:
            return "stable"

错误检测与纠正

错误检测

python
class ErrorDetector:
    """Matches the error message in a result dict against known patterns."""

    def __init__(self):
        # Each entry: {"name", "pattern": callable(result) -> bool, "severity"}.
        self.error_patterns = []
        self._setup_error_patterns()

    @staticmethod
    def _error_text(result: Dict) -> str:
        """Lower-cased error message from *result*; '' when absent or None.

        Bug fix: the original used result.get("error", "") which returns
        None (then crashed on .lower()) when the key is present but
        explicitly None — a common "no error" convention.
        """
        return (result.get("error") or "").lower()

    def _setup_error_patterns(self):
        """Register the built-in error patterns."""
        text = ErrorDetector._error_text

        self.error_patterns.append(
            {
                "name": "timeout_error",
                "pattern": lambda result: text(result) in ["timeout", "timed out"],
                "severity": "high"
            }
        )

        self.error_patterns.append(
            {
                "name": "invalid_input",
                "pattern": lambda result: "invalid" in text(result),
                "severity": "medium"
            }
        )

        self.error_patterns.append(
            {
                "name": "resource_exhaustion",
                "pattern": lambda result: "memory" in text(result) or
                                          "disk" in text(result),
                "severity": "high"
            }
        )

    def detect(self, result: Dict) -> List[Dict]:
        """Return one {name, severity, error} entry per matching pattern."""
        return [
            {
                "name": pattern["name"],
                "severity": pattern["severity"],
                "error": result.get("error")
            }
            for pattern in self.error_patterns
            if pattern["pattern"](result)
        ]

    def classify_error(self, error: str) -> str:
        """Coarse keyword-based classification of an error message."""
        error_lower = (error or "").lower()  # tolerate None input too

        if "timeout" in error_lower or "timed out" in error_lower:
            return "timeout"
        elif "memory" in error_lower:
            return "memory"
        elif "network" in error_lower or "connection" in error_lower:
            return "network"
        elif "permission" in error_lower or "access" in error_lower:
            return "permission"
        else:
            return "unknown"

错误纠正

python
class ErrorCorrector:
    """Maps classified errors to concrete correction suggestions."""

    def __init__(self):
        # Maps error category -> handler(error, context) -> correction dict.
        self.correction_strategies = {}
        self._setup_strategies()

    def _setup_strategies(self):
        """Register the built-in correction handlers per category."""
        self.correction_strategies["timeout"] = self._handle_timeout
        self.correction_strategies["memory"] = self._handle_memory
        self.correction_strategies["network"] = self._handle_network
        self.correction_strategies["permission"] = self._handle_permission

    def correct(self, error: Dict, context: Dict) -> Dict:
        """Return a correction for *error*, or an "uncorrectable" report.

        Accepts the category under "type" (as produced by
        ErrorDetector.classify_error) or "name" (as produced by
        ErrorDetector.detect, e.g. "timeout_error" -> "timeout").
        Consistency fix: the original only read "type", so detections
        from ErrorDetector.detect never matched any strategy.
        """
        raw = error.get("type") or error.get("name") or "unknown"
        # Normalize detector names like "timeout_error" to the bare category.
        error_type = raw[:-len("_error")] if raw.endswith("_error") else raw

        if error_type in self.correction_strategies:
            return self.correction_strategies[error_type](error, context)

        return {
            "status": "uncorrectable",
            "error": error
        }

    def _handle_timeout(self, error: Dict, context: Dict) -> Dict:
        """Correction for timeout errors."""
        return {
            "status": "corrected",
            "action": "increase_timeout",
            "suggestion": "Increase timeout value or optimize the operation"
        }

    def _handle_memory(self, error: Dict, context: Dict) -> Dict:
        """Correction for memory-exhaustion errors."""
        return {
            "status": "corrected",
            "action": "reduce_memory_usage",
            "suggestion": "Reduce batch size or implement streaming"
        }

    def _handle_network(self, error: Dict, context: Dict) -> Dict:
        """Correction for network/connection errors."""
        return {
            "status": "corrected",
            "action": "retry_with_backoff",
            "suggestion": "Implement exponential backoff for retries"
        }

    def _handle_permission(self, error: Dict, context: Dict) -> Dict:
        """Permission errors cannot be auto-corrected; requires operator action."""
        return {
            "status": "uncorrectable",
            "action": "check_permissions",
            "suggestion": "Verify and update access permissions"
        }

性能指标

基础指标

python
class PerformanceMetrics:
    """Accumulates per-request outcomes and summarizes basic statistics."""

    def __init__(self):
        # success_rate holds 1.0/0.0 per request; error_count and
        # total_requests are running counters.
        self.metrics = {
            "success_rate": [],
            "execution_time": [],
            "resource_usage": [],
            "error_count": 0,
            "total_requests": 0
        }

    def record_success(self, execution_time: float,
                     resource_usage: float):
        """Record one successful request with its timing and resource cost."""
        store = self.metrics
        store["success_rate"].append(1.0)
        store["execution_time"].append(execution_time)
        store["resource_usage"].append(resource_usage)
        store["total_requests"] += 1

    def record_failure(self):
        """Record one failed request (no timing/resource data captured)."""
        store = self.metrics
        store["success_rate"].append(0.0)
        store["error_count"] += 1
        store["total_requests"] += 1

    @staticmethod
    def _mean(samples) -> float:
        """Arithmetic mean of *samples*; 0.0 for an empty sequence."""
        return sum(samples) / len(samples) if samples else 0.0

    def get_summary(self) -> Dict:
        """Aggregate view of everything recorded so far."""
        return {
            "success_rate": self._calculate_success_rate(),
            "avg_execution_time": self._calculate_avg_time(),
            "avg_resource_usage": self._calculate_avg_resource(),
            "error_rate": self._calculate_error_rate(),
            "total_requests": self.metrics["total_requests"]
        }

    def _calculate_success_rate(self) -> float:
        """Fraction of requests that succeeded; 0.0 when none recorded."""
        return self._mean(self.metrics["success_rate"])

    def _calculate_avg_time(self) -> float:
        """Mean execution time of successful requests; 0.0 when none."""
        return self._mean(self.metrics["execution_time"])

    def _calculate_avg_resource(self) -> float:
        """Mean resource usage of successful requests; 0.0 when none."""
        return self._mean(self.metrics["resource_usage"])

    def _calculate_error_rate(self) -> float:
        """Fraction of requests that failed; 0.0 when none recorded."""
        total = self.metrics["total_requests"]
        return self.metrics["error_count"] / total if total else 0.0

高级指标

python
class AdvancedMetrics:
    """Extends PerformanceMetrics with latency percentiles and throughput."""

    def __init__(self):
        self.metrics = PerformanceMetrics()
        self.latency_distribution = []  # every request's execution time
        self.throughput = []            # 1/latency samples (positive latencies only)

    def record_request(self, execution_time: float,
                     success: bool, resource_usage: float):
        """Record one request outcome plus its latency/throughput sample."""
        if success:
            self.metrics.record_success(execution_time, resource_usage)
        else:
            self.metrics.record_failure()

        self.latency_distribution.append(execution_time)
        self._update_throughput()

    def _update_throughput(self):
        """Derive a throughput sample from the most recent latency.

        Bug fix: the original divided by the latency unconditionally and
        raised ZeroDivisionError for a zero execution time; non-positive
        latencies are now skipped.
        """
        if self.latency_distribution and self.latency_distribution[-1] > 0:
            self.throughput.append(1.0 / self.latency_distribution[-1])

    def get_advanced_summary(self) -> Dict:
        """Basic summary plus latency percentiles and throughput stats."""
        basic_summary = self.metrics.get_summary()

        return {
            **basic_summary,
            "p50_latency": self._calculate_percentile(50),
            "p95_latency": self._calculate_percentile(95),
            "p99_latency": self._calculate_percentile(99),
            "avg_throughput": self._calculate_avg_throughput(),
            "max_throughput": max(self.throughput) if self.throughput else 0
        }

    def _calculate_percentile(self, percentile: int) -> float:
        """Nearest-rank-style percentile of recorded latencies (0.0 if none)."""
        if not self.latency_distribution:
            return 0.0

        sorted_latencies = sorted(self.latency_distribution)
        index = int(len(sorted_latencies) * percentile / 100)

        # Clamp: percentile=100 would index one past the end.
        return sorted_latencies[min(index, len(sorted_latencies) - 1)]

    def _calculate_avg_throughput(self) -> float:
        """Mean of all throughput samples; 0.0 when none recorded."""
        if not self.throughput:
            return 0.0

        return sum(self.throughput) / len(self.throughput)

持续改进

学习机制

python
class LearningMechanism:
    """Distills buffered experiences into LLM-generated insights.

    Experiences accumulate in a buffer; once 10 are queued they are
    summarized by the LLM and folded into a small knowledge base.
    """

    def __init__(self, llm):
        self.llm = llm
        self.experience_buffer = []  # pending experiences awaiting distillation
        self.knowledge_base = {}     # holds "last_update" and "insights"

    def add_experience(self, experience: Dict):
        """Buffer one experience; distill and flush once 10 are queued."""
        self.experience_buffer.append(experience)

        if len(self.experience_buffer) < 10:
            return
        self._learn_from_experiences()
        self.experience_buffer = []

    def _learn_from_experiences(self):
        """Ask the LLM to extract insights from the buffered experiences."""
        prompt = f"""
        Analyze the following experiences and extract learning insights:
        
        {self._format_experiences(self.experience_buffer)}
        
        For each insight, provide:
        1. What was learned
        2. When to apply it
        3. Expected improvement
        """
        
        self._update_knowledge_base(self.llm.invoke(prompt))

    def _format_experiences(self, experiences: List[Dict]) -> str:
        """Render one numbered line per experience description."""
        rendered = []
        for number, exp in enumerate(experiences, start=1):
            rendered.append(f"Experience {number}: {exp.get('description', 'Unknown')}")
        return "\n".join(rendered)

    def _update_knowledge_base(self, insights: str):
        """Store the freshly distilled insights with a timestamp."""
        self.knowledge_base["last_update"] = datetime.now()
        self.knowledge_base["insights"] = insights

    def get_applicable_insights(self, context: Dict) -> List[str]:
        """Ask the LLM which stored insights apply to *context*."""
        prompt = f"""
        Context: {context}
        
        Knowledge Base:
        {self.knowledge_base.get('insights', 'No insights yet')}
        
        Return insights applicable to this context.
        """
        
        return self._parse_insights(self.llm.invoke(prompt))

    def _parse_insights(self, response: str) -> List[str]:
        """Split the LLM response into non-empty, stripped lines."""
        parsed = []
        for raw_line in response.split('\n'):
            cleaned = raw_line.strip()
            if cleaned:
                parsed.append(cleaned)
        return parsed

策略优化

python
class StrategyOptimizer:
    """Tracks per-strategy performance and selects / refines the best one."""

    def __init__(self, llm):
        self.llm = llm
        self.strategies = {}           # name -> strategy callable(context) -> result dict
        self.performance_history = {}  # name -> list of observed scores

    def register_strategy(self, name: str, strategy: Callable):
        """Register *strategy* under *name* with an empty score history."""
        self.strategies[name] = strategy
        self.performance_history[name] = []

    def evaluate_strategy(self, name: str, context: Dict) -> float:
        """Run the named strategy on *context*; score and record the result.

        Returns 0.0 for an unknown strategy name.
        """
        if name not in self.strategies:
            return 0.0

        result = self.strategies[name](context)

        score = self._calculate_performance_score(result)
        self.performance_history[name].append(score)

        return score

    def _calculate_performance_score(self, result: Dict) -> float:
        """Score = quality / execution_time for successes, else 0.0.

        Bug fix: the original divided by execution_time unconditionally
        and raised ZeroDivisionError for an instantaneous (0-time)
        result; the divisor is now clamped to a small positive epsilon.
        """
        if not result.get("success", False):
            return 0.0

        execution_time = max(result.get("execution_time", 1.0), 1e-9)
        quality = result.get("quality", 0.5)

        return quality / execution_time

    def get_best_strategy(self, context: Dict) -> Optional[str]:
        """Evaluate every registered strategy on *context*; return the top scorer.

        Returns None when no strategy scores above 0.0 (or none exist) —
        the original's `-> str` annotation hid this case.
        """
        best_strategy = None
        best_score = 0.0

        for name in self.strategies:
            score = self.evaluate_strategy(name, context)

            if score > best_score:
                best_score = score
                best_strategy = name

        return best_strategy

    def optimize_strategies(self, context: Dict):
        """Ask the LLM for improvement suggestions for weak strategies."""
        prompt = f"""
        Context: {context}
        
        Current strategies and their performance:
        {self._format_strategy_performance()}
        
        Suggest improvements to underperforming strategies.
        """
        
        response = self.llm.invoke(prompt)
        self._apply_improvements(response)

    def _format_strategy_performance(self) -> str:
        """Human-readable average score per strategy, or a placeholder."""
        lines = []

        for name, scores in self.performance_history.items():
            if scores:
                avg_score = sum(scores) / len(scores)
                lines.append(f"{name}: {avg_score:.3f} (avg over {len(scores)} runs)")

        return "\n".join(lines) if lines else "No performance data yet"

    def _apply_improvements(self, improvements: str):
        """Placeholder: applying LLM-suggested improvements is not implemented."""
        pass

实践练习

练习1:实现简单的反思系统

python
class SimpleReflectionSystem:
    """Minimal reflection loop: evaluate a result and report its status."""

    def __init__(self):
        # Single-criterion evaluator: completeness of required goal keys.
        self.evaluator = GoalEvaluator()
        self.evaluator.add_criteria("completeness",
                                  self.evaluator.evaluate_completeness)

    def reflect(self, result: Dict, goal: Dict) -> Dict:
        """Evaluate *result* against *goal*; suggest improvement if not achieved."""
        evaluation = self.evaluator.evaluate(result, goal)

        if not evaluation["achieved"]:
            return {
                "status": "needs_improvement",
                "evaluation": evaluation,
                "suggestion": "Review and adjust approach"
            }

        return {"status": "success", "evaluation": evaluation}

练习2:实现性能监控系统

python
class SimplePerformanceMonitor:
    """Thin wrapper over PerformanceMetrics for success/failure tracking."""

    def __init__(self):
        self.metrics = PerformanceMetrics()

    def record(self, success: bool, execution_time: float):
        """Record one request; failures discard the execution time."""
        if not success:
            self.metrics.record_failure()
            return
        # Resource usage is not measured here; 0.5 is a fixed placeholder.
        self.metrics.record_success(execution_time, 0.5)

    def get_report(self) -> Dict:
        """Summary as produced by PerformanceMetrics.get_summary()."""
        return self.metrics.get_summary()

总结

本节我们学习了Agent反思与评估:

  1. 反思机制的设计和触发器
  2. 自我评估方法(目标达成、性能评估)
  3. 错误检测与纠正
  4. 性能指标(基础指标、高级指标)
  5. 持续改进策略(学习机制、策略优化)

反思与评估使Agent能够自我监控、自我改进,不断提升性能。

参考资源