# Day 43: Agent Reflection and Evaluation
## Learning Objectives

- Understand the design of reflection mechanisms
- Master self-evaluation methods
- Learn error detection and correction
- Understand performance metrics
- Master continuous-improvement strategies
## Reflection Mechanism

### The Reflection Loop
```
┌─────────────────────────────────────────┐
│            Action Execution             │
└───────────────────┬─────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│               Observation               │
└───────────────────┬─────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│               Reflection                │
│  - What happened?                       │
│  - Why did it happen?                   │
│  - What could be improved?              │
└───────────────────┬─────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│               Evaluation                │
│  - Success/Failure                      │
│  - Performance metrics                  │
│  - Goal achievement                     │
└───────────────────┬─────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│                Learning                 │
│  - Update knowledge                     │
│  - Adjust strategies                    │
│  - Improve policies                     │
└───────────────────┬─────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────┐
│             Improved Action             │
└─────────────────────────────────────────┘
```
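The loop above maps onto a simple control skeleton. Below is a minimal sketch of that skeleton; the `plan`, `execute`, `observe`, `reflect_on`, `evaluate`, and `learn_from` methods are hypothetical placeholders for the components developed in the rest of this lesson.

```python
def reflection_loop(agent, task, max_iterations: int = 5):
    """Act -> Observe -> Reflect -> Evaluate -> Learn, then act again."""
    result = None
    for _ in range(max_iterations):
        action = agent.plan(task)                      # (Improved) Action
        result = agent.execute(action)                 # Action Execution
        observation = agent.observe(result)            # Observation
        reflection = agent.reflect_on(observation)     # Reflection
        evaluation = agent.evaluate(reflection, task)  # Evaluation
        if evaluation.get("achieved"):
            break
        agent.learn_from(evaluation)                   # Learning
    return result
```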
### Reflection Triggers

```python
from typing import Callable, Dict, List
from datetime import datetime


class ReflectionTrigger:
    """Registry of (condition, action) pairs deciding when to reflect."""

    def __init__(self):
        self.triggers = []

    def add_trigger(self, condition: Callable, action: Callable):
        self.triggers.append((condition, action))

    def check(self, context: Dict) -> List[Callable]:
        # Collect every action whose condition holds for this context
        actions = []
        for condition, action in self.triggers:
            if condition(context):
                actions.append(action)
        return actions


class ReflectionManager:
    def __init__(self):
        self.trigger = ReflectionTrigger()
        self._setup_triggers()

    def _setup_triggers(self):
        # Reflect immediately whenever an error is present
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("error") is not None,
            action=lambda ctx: self._handle_error(ctx)
        )
        # Reflect when the success rate drops below 70%
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("success_rate", 1.0) < 0.7,
            action=lambda ctx: self._handle_low_success(ctx)
        )
        # Reflect periodically, every 10 iterations
        self.trigger.add_trigger(
            condition=lambda ctx: ctx.get("iteration", 0) % 10 == 0,
            action=lambda ctx: self._periodic_reflection(ctx)
        )

    def reflect(self, context: Dict) -> List[Dict]:
        actions = self.trigger.check(context)
        results = []
        for action in actions:
            result = action(context)
            results.append(result)
        return results

    def _handle_error(self, context: Dict) -> Dict:
        return {
            "type": "error_reflection",
            "error": context.get("error"),
            "timestamp": datetime.now(),
            "suggestion": self._analyze_error(context)
        }

    def _handle_low_success(self, context: Dict) -> Dict:
        return {
            "type": "performance_reflection",
            "success_rate": context.get("success_rate"),
            "timestamp": datetime.now(),
            "suggestion": self._improve_performance(context)
        }

    def _periodic_reflection(self, context: Dict) -> Dict:
        return {
            "type": "periodic_reflection",
            "iteration": context.get("iteration"),
            "timestamp": datetime.now(),
            "suggestion": self._review_progress(context)
        }

    # Placeholder analysis helpers so the class runs as-is;
    # replace them with real analysis logic.
    def _analyze_error(self, context: Dict) -> str:
        return f"Investigate error: {context.get('error')}"

    def _improve_performance(self, context: Dict) -> str:
        return "Review recent failures and adjust the current strategy"

    def _review_progress(self, context: Dict) -> str:
        return "Summarize progress since the last periodic reflection"
```
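A short usage sketch for the manager above; the context values are made up for illustration:

```python
manager = ReflectionManager()

# Hypothetical context snapshot after a failed step at iteration 20
context = {"error": "timeout", "success_rate": 0.6, "iteration": 20}

for reflection in manager.reflect(context):
    print(reflection["type"], "->", reflection["suggestion"])
# All three triggers fire here: an error is present, the success rate
# is below 0.7, and the iteration count is a multiple of 10
```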
## Self-Evaluation

### Goal Achievement Evaluation
```python
class GoalEvaluator:
    def __init__(self):
        self.evaluation_criteria = {}

    def add_criteria(self, name: str, evaluator: Callable):
        self.evaluation_criteria[name] = evaluator

    def evaluate(self, result: Dict, goal: Dict) -> Dict:
        evaluations = {}
        for name, evaluator in self.evaluation_criteria.items():
            score = evaluator(result, goal)
            evaluations[name] = score
        overall_score = self._calculate_overall_score(evaluations)
        return {
            "criteria": evaluations,
            "overall_score": overall_score,
            "achieved": overall_score >= 0.8
        }

    def _calculate_overall_score(self, evaluations: Dict) -> float:
        if not evaluations:  # avoid division by zero with no criteria
            return 0.0
        return sum(evaluations.values()) / len(evaluations)

    def evaluate_completeness(self, result: Dict, goal: Dict) -> float:
        required_keys = set(goal.get("required_keys", []))
        result_keys = set(result.keys())
        if not required_keys:
            return 1.0
        intersection = required_keys & result_keys
        return len(intersection) / len(required_keys)

    def evaluate_accuracy(self, result: Dict, goal: Dict) -> float:
        expected = goal.get("expected_output")
        actual = result.get("output")
        if expected is None or actual is None:
            return 0.5
        return 1.0 if expected == actual else 0.0
```
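A minimal usage sketch with fabricated goal and result dicts:

```python
evaluator = GoalEvaluator()
evaluator.add_criteria("completeness", evaluator.evaluate_completeness)
evaluator.add_criteria("accuracy", evaluator.evaluate_accuracy)

goal = {"required_keys": ["output", "summary"], "expected_output": 42}
result = {"output": 42, "summary": "done"}

print(evaluator.evaluate(result, goal))
# Both criteria score 1.0, so overall_score is 1.0 and achieved is True
```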
### Performance Evaluation

```python
class PerformanceEvaluator:
    def __init__(self):
        self.metrics = {}

    def record_metric(self, name: str, value: float):
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def evaluate(self) -> Dict:
        evaluations = {}
        for name, values in self.metrics.items():
            if values:
                evaluations[name] = {
                    "mean": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                    "std": self._calculate_std(values),
                    "count": len(values)
                }
        return evaluations

    def _calculate_std(self, values: List[float]) -> float:
        if len(values) < 2:
            return 0.0
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return variance ** 0.5

    def get_trend(self, name: str, window: int = 10) -> str:
        if name not in self.metrics:
            return "unknown"
        values = self.metrics[name][-window:]
        # Need more than 5 values so both halves of the comparison are non-empty
        if len(values) <= 5:
            return "insufficient_data"
        # Compare the mean of the last 5 values against the mean of the rest
        recent = sum(values[-5:]) / 5
        older = sum(values[:-5]) / (len(values) - 5)
        if recent > older * 1.1:
            return "improving"
        elif recent < older * 0.9:
            return "declining"
        else:
            return "stable"
```
## Error Detection and Correction

### Error Detection
```python
class ErrorDetector:
    def __init__(self):
        self.error_patterns = []
        self._setup_error_patterns()

    def _setup_error_patterns(self):
        # `(result.get("error") or "")` keeps the patterns safe when the
        # error field is missing or None
        self.error_patterns.append({
            "name": "timeout_error",
            "pattern": lambda result: "timeout" in (result.get("error") or "").lower()
                                      or "timed out" in (result.get("error") or "").lower(),
            "severity": "high"
        })
        self.error_patterns.append({
            "name": "invalid_input",
            "pattern": lambda result: "invalid" in (result.get("error") or "").lower(),
            "severity": "medium"
        })
        self.error_patterns.append({
            "name": "resource_exhaustion",
            "pattern": lambda result: "memory" in (result.get("error") or "").lower()
                                      or "disk" in (result.get("error") or "").lower(),
            "severity": "high"
        })

    def detect(self, result: Dict) -> List[Dict]:
        detected_errors = []
        for pattern in self.error_patterns:
            if pattern["pattern"](result):
                detected_errors.append({
                    "name": pattern["name"],
                    "severity": pattern["severity"],
                    "error": result.get("error"),
                    # Classify so ErrorCorrector below can look up a strategy
                    "type": self.classify_error(result.get("error") or "")
                })
        return detected_errors

    def classify_error(self, error: str) -> str:
        error_lower = error.lower()
        if "timeout" in error_lower or "timed out" in error_lower:
            return "timeout"
        elif "memory" in error_lower:
            return "memory"
        elif "network" in error_lower or "connection" in error_lower:
            return "network"
        elif "permission" in error_lower or "access" in error_lower:
            return "permission"
        else:
            return "unknown"
```
### Error Correction

```python
class ErrorCorrector:
    def __init__(self):
        self.correction_strategies = {}
        self._setup_strategies()

    def _setup_strategies(self):
        self.correction_strategies["timeout"] = self._handle_timeout
        self.correction_strategies["memory"] = self._handle_memory
        self.correction_strategies["network"] = self._handle_network
        self.correction_strategies["permission"] = self._handle_permission

    def correct(self, error: Dict, context: Dict) -> Dict:
        error_type = error.get("type", "unknown")
        if error_type in self.correction_strategies:
            return self.correction_strategies[error_type](error, context)
        return {
            "status": "uncorrectable",
            "error": error
        }

    def _handle_timeout(self, error: Dict, context: Dict) -> Dict:
        return {
            "status": "corrected",
            "action": "increase_timeout",
            "suggestion": "Increase timeout value or optimize the operation"
        }

    def _handle_memory(self, error: Dict, context: Dict) -> Dict:
        return {
            "status": "corrected",
            "action": "reduce_memory_usage",
            "suggestion": "Reduce batch size or implement streaming"
        }

    def _handle_network(self, error: Dict, context: Dict) -> Dict:
        return {
            "status": "corrected",
            "action": "retry_with_backoff",
            "suggestion": "Implement exponential backoff for retries"
        }

    def _handle_permission(self, error: Dict, context: Dict) -> Dict:
        return {
            "status": "uncorrectable",
            "action": "check_permissions",
            "suggestion": "Verify and update access permissions"
        }
```
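The detector and corrector compose into a simple detect-then-correct pipeline; since `detect` attaches a `type` to each error, its output feeds straight into `correct`. A sketch with a fabricated failure:

```python
detector = ErrorDetector()
corrector = ErrorCorrector()

failed_result = {"error": "Request timed out after 30s"}  # made-up error

for error in detector.detect(failed_result):
    fix = corrector.correct(error, context={})
    print(error["name"], "->", fix["status"], "-", fix["suggestion"])
# timeout_error -> corrected - Increase timeout value or optimize the operation
```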
## Performance Metrics

### Basic Metrics
```python
class PerformanceMetrics:
    def __init__(self):
        self.metrics = {
            "success_rate": [],
            "execution_time": [],
            "resource_usage": [],
            "error_count": 0,
            "total_requests": 0
        }

    def record_success(self, execution_time: float, resource_usage: float):
        self.metrics["success_rate"].append(1.0)
        self.metrics["execution_time"].append(execution_time)
        self.metrics["resource_usage"].append(resource_usage)
        self.metrics["total_requests"] += 1

    def record_failure(self):
        self.metrics["success_rate"].append(0.0)
        self.metrics["error_count"] += 1
        self.metrics["total_requests"] += 1

    def get_summary(self) -> Dict:
        return {
            "success_rate": self._calculate_success_rate(),
            "avg_execution_time": self._calculate_avg_time(),
            "avg_resource_usage": self._calculate_avg_resource(),
            "error_rate": self._calculate_error_rate(),
            "total_requests": self.metrics["total_requests"]
        }

    def _calculate_success_rate(self) -> float:
        if not self.metrics["success_rate"]:
            return 0.0
        return sum(self.metrics["success_rate"]) / len(self.metrics["success_rate"])

    def _calculate_avg_time(self) -> float:
        if not self.metrics["execution_time"]:
            return 0.0
        return sum(self.metrics["execution_time"]) / len(self.metrics["execution_time"])

    def _calculate_avg_resource(self) -> float:
        if not self.metrics["resource_usage"]:
            return 0.0
        return sum(self.metrics["resource_usage"]) / len(self.metrics["resource_usage"])

    def _calculate_error_rate(self) -> float:
        if self.metrics["total_requests"] == 0:
            return 0.0
        return self.metrics["error_count"] / self.metrics["total_requests"]
```
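A quick usage sketch with fabricated numbers:

```python
metrics = PerformanceMetrics()

metrics.record_success(execution_time=0.8, resource_usage=0.3)
metrics.record_success(execution_time=1.2, resource_usage=0.4)
metrics.record_failure()

print(metrics.get_summary())
# success_rate ≈ 0.67, avg_execution_time = 1.0, error_rate ≈ 0.33
```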
### Advanced Metrics

```python
class AdvancedMetrics:
    def __init__(self):
        self.metrics = PerformanceMetrics()
        self.latency_distribution = []
        self.throughput = []

    def record_request(self, execution_time: float,
                       success: bool, resource_usage: float):
        if success:
            self.metrics.record_success(execution_time, resource_usage)
        else:
            self.metrics.record_failure()
        self.latency_distribution.append(execution_time)
        self._update_throughput()

    def _update_throughput(self):
        # Instantaneous throughput from the most recent latency;
        # the > 0 check guards against division by zero
        if self.latency_distribution and self.latency_distribution[-1] > 0:
            self.throughput.append(1.0 / self.latency_distribution[-1])

    def get_advanced_summary(self) -> Dict:
        basic_summary = self.metrics.get_summary()
        return {
            **basic_summary,
            "p50_latency": self._calculate_percentile(50),
            "p95_latency": self._calculate_percentile(95),
            "p99_latency": self._calculate_percentile(99),
            "avg_throughput": self._calculate_avg_throughput(),
            "max_throughput": max(self.throughput) if self.throughput else 0
        }

    def _calculate_percentile(self, percentile: int) -> float:
        if not self.latency_distribution:
            return 0.0
        sorted_latencies = sorted(self.latency_distribution)
        # Nearest-rank approximation, clamped to the last element
        index = int(len(sorted_latencies) * percentile / 100)
        return sorted_latencies[min(index, len(sorted_latencies) - 1)]

    def _calculate_avg_throughput(self) -> float:
        if not self.throughput:
            return 0.0
        return sum(self.throughput) / len(self.throughput)
```
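A usage sketch with fabricated timings, showing the percentile keys:

```python
adv = AdvancedMetrics()

# Fabricated request timings in seconds
for t in [0.10, 0.12, 0.11, 0.50, 0.13, 0.12, 0.11, 0.95]:
    adv.record_request(execution_time=t, success=True, resource_usage=0.2)

summary = adv.get_advanced_summary()
print(summary["p50_latency"], summary["p95_latency"], summary["p99_latency"])
# 0.12 0.95 0.95 under the nearest-rank approximation above
```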
## Continuous Improvement

### Learning Mechanism
```python
class LearningMechanism:
    def __init__(self, llm):
        self.llm = llm
        self.experience_buffer = []
        self.knowledge_base = {}

    def add_experience(self, experience: Dict):
        self.experience_buffer.append(experience)
        if len(self.experience_buffer) >= 10:
            self._learn_from_experiences()
            self.experience_buffer = []

    def _learn_from_experiences(self):
        prompt = f"""
Analyze the following experiences and extract learning insights:
{self._format_experiences(self.experience_buffer)}
For each insight, provide:
1. What was learned
2. When to apply it
3. Expected improvement
"""
        response = self.llm.invoke(prompt)
        self._update_knowledge_base(response)

    def _format_experiences(self, experiences: List[Dict]) -> str:
        return "\n".join([
            f"Experience {i+1}: {exp.get('description', 'Unknown')}"
            for i, exp in enumerate(experiences)
        ])

    def _update_knowledge_base(self, insights: str):
        self.knowledge_base["last_update"] = datetime.now()
        self.knowledge_base["insights"] = insights

    def get_applicable_insights(self, context: Dict) -> List[str]:
        prompt = f"""
Context: {context}
Knowledge Base:
{self.knowledge_base.get('insights', 'No insights yet')}
Return insights applicable to this context.
"""
        response = self.llm.invoke(prompt)
        return self._parse_insights(response)

    def _parse_insights(self, response: str) -> List[str]:
        return [line.strip() for line in response.split('\n') if line.strip()]
```
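`LearningMechanism` only requires an object with an `invoke(prompt)` method (the LangChain-style interface used throughout), so it can be exercised with a stub before wiring in a real model. A minimal sketch:

```python
class StubLLM:
    """Stand-in for a real LLM client; returns a canned insight."""
    def invoke(self, prompt: str) -> str:
        return "Insight: retry transient failures with exponential backoff"


learner = LearningMechanism(llm=StubLLM())

# The tenth experience triggers a learning pass and clears the buffer
for i in range(10):
    learner.add_experience({"description": f"task {i} completed"})

print(learner.knowledge_base["insights"])
```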
### Strategy Optimization

```python
class StrategyOptimizer:
    def __init__(self, llm):
        self.llm = llm
        self.strategies = {}
        self.performance_history = {}

    def register_strategy(self, name: str, strategy: Callable):
        self.strategies[name] = strategy
        self.performance_history[name] = []

    def evaluate_strategy(self, name: str, context: Dict) -> float:
        if name not in self.strategies:
            return 0.0
        strategy = self.strategies[name]
        result = strategy(context)
        score = self._calculate_performance_score(result)
        self.performance_history[name].append(score)
        return score

    def _calculate_performance_score(self, result: Dict) -> float:
        success = result.get("success", False)
        execution_time = result.get("execution_time", 1.0)
        quality = result.get("quality", 0.5)
        if not success:
            return 0.0
        # Higher quality and lower execution time both raise the score
        return quality / execution_time

    def get_best_strategy(self, context: Dict) -> str:
        # Note: this runs every registered strategy against the context,
        # and returns None if no strategy scores above zero
        best_strategy = None
        best_score = 0.0
        for name in self.strategies:
            score = self.evaluate_strategy(name, context)
            if score > best_score:
                best_score = score
                best_strategy = name
        return best_strategy

    def optimize_strategies(self, context: Dict):
        prompt = f"""
Context: {context}
Current strategies and their performance:
{self._format_strategy_performance()}
Suggest improvements to underperforming strategies.
"""
        response = self.llm.invoke(prompt)
        self._apply_improvements(response)

    def _format_strategy_performance(self) -> str:
        lines = []
        for name, scores in self.performance_history.items():
            if scores:
                avg_score = sum(scores) / len(scores)
                lines.append(f"{name}: {avg_score:.3f} (avg over {len(scores)} runs)")
        return "\n".join(lines) if lines else "No performance data yet"

    def _apply_improvements(self, improvements: str):
        # Parsing and applying the LLM's suggestions is left as an exercise
        pass
```
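A usage sketch with two toy strategies returning fabricated result dicts (reusing the `StubLLM` from the previous sketch):

```python
optimizer = StrategyOptimizer(llm=StubLLM())

optimizer.register_strategy(
    "fast", lambda ctx: {"success": True, "execution_time": 0.5, "quality": 0.6})
optimizer.register_strategy(
    "thorough", lambda ctx: {"success": True, "execution_time": 2.0, "quality": 0.9})

# "fast" wins: 0.6 / 0.5 = 1.2 beats 0.9 / 2.0 = 0.45
print(optimizer.get_best_strategy({}))
```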
## Hands-On Exercises

### Exercise 1: Implement a Simple Reflection System
```python
class SimpleReflectionSystem:
    def __init__(self):
        self.evaluator = GoalEvaluator()
        self.evaluator.add_criteria("completeness",
                                    self.evaluator.evaluate_completeness)

    def reflect(self, result: Dict, goal: Dict) -> Dict:
        evaluation = self.evaluator.evaluate(result, goal)
        if evaluation["achieved"]:
            return {"status": "success", "evaluation": evaluation}
        else:
            return {
                "status": "needs_improvement",
                "evaluation": evaluation,
                "suggestion": "Review and adjust approach"
            }
```
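To try it out (goal and results are made up):

```python
system = SimpleReflectionSystem()

goal = {"required_keys": ["answer"]}
print(system.reflect({"answer": 42}, goal))    # status: success
print(system.reflect({"draft": "..."}, goal))  # status: needs_improvement
```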
### Exercise 2: Implement a Performance Monitoring System

```python
class SimplePerformanceMonitor:
    def __init__(self):
        self.metrics = PerformanceMetrics()

    def record(self, success: bool, execution_time: float):
        if success:
            # Resource usage is fixed at 0.5 in this simplified monitor
            self.metrics.record_success(execution_time, 0.5)
        else:
            self.metrics.record_failure()

    def get_report(self) -> Dict:
        return self.metrics.get_summary()
```
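And a quick check of the monitor (fabricated timings):

```python
monitor = SimplePerformanceMonitor()

monitor.record(success=True, execution_time=0.4)
monitor.record(success=True, execution_time=0.6)
monitor.record(success=False, execution_time=0.0)

print(monitor.get_report())
# success_rate ≈ 0.67, avg_execution_time = 0.5
```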
## Summary

In this lesson we covered Agent reflection and evaluation:

- The design of reflection mechanisms and their triggers
- Self-evaluation methods (goal achievement and performance evaluation)
- Error detection and correction
- Performance metrics (basic and advanced)
- Continuous-improvement strategies (learning mechanisms and strategy optimization)

Reflection and evaluation let an Agent monitor and improve itself, steadily raising its performance over time.
