第37天:Agent行动与记忆
学习目标
- 理解Agent的行动选择机制
- 掌握执行监控方法
- 学习记忆系统设计
- 理解短期记忆与长期记忆
- 掌握记忆检索与更新策略
行动选择机制
行动选择流程
决策流程:
感知 → 状态评估 → 选项生成 → 行动评估 → 行动选择 → 执行
实现示例:
python
class ActionSelector:
    """Choose the highest-scoring action for a given state.

    Args:
        evaluation_function: Callable ``(state, action) -> score``. Scores
            must be comparable with each other because they are ranked
            with ``max``.
    """

    def __init__(self, evaluation_function):
        self.evaluation_function = evaluation_function

    def select_action(self, state, possible_actions):
        """Score every candidate and return the best one.

        Args:
            state: Passed through unchanged to the evaluation function.
            possible_actions: Iterable of candidate actions. Each action is
                used as a dict key, so it must be hashable.

        Returns:
            The action with the highest score (the first one on ties).

        Raises:
            ValueError: If ``possible_actions`` is empty.
        """
        evaluations = self.evaluate_actions(state, possible_actions)
        return self.choose_best_action(evaluations)

    def evaluate_actions(self, state, actions):
        """Return a dict mapping each action to its score."""
        return {
            action: self.evaluation_function(state, action)
            for action in actions
        }

    def choose_best_action(self, evaluations):
        """Return the key of ``evaluations`` with the largest value.

        Raises:
            ValueError: If ``evaluations`` is empty.
        """
        if not evaluations:
            # max() would also raise ValueError here, but with a message that
            # says nothing about actions.
            raise ValueError("no candidate actions to choose from")
        return max(evaluations, key=evaluations.get)

# 行动评估方法
基于效用的评估
python
class UtilityBasedEvaluator:
    """Score an action by the utility of the state it leads to.

    Args:
        utility_function: Callable ``(state) -> utility`` applied to the
            predicted successor state.
    """

    def __init__(self, utility_function):
        self.utility_function = utility_function

    def evaluate(self, state, action):
        """Return the utility of the state predicted after ``action``."""
        next_state = self.predict_next_state(state, action)
        return self.utility_function(next_state)

    def predict_next_state(self, state, action):
        """Return ``state.apply(action)``; the state object must provide ``apply``."""
        return state.apply(action)

# 基于价值的评估
python
class ValueBasedEvaluator:
    """Score an action by the value of the state it leads to.

    Args:
        value_function: Callable ``(state) -> value`` applied to the
            predicted successor state.
    """

    def __init__(self, value_function):
        self.value_function = value_function

    def evaluate(self, state, action):
        """Return the value of the state predicted after ``action``."""
        next_state = self.predict_next_state(state, action)
        return self.value_function(next_state)

    def predict_next_state(self, state, action):
        """Return ``state.apply(action)``; the state object must provide ``apply``."""
        return state.apply(action)

# 基于策略的评估
python
class PolicyBasedEvaluator:
    """Score an action by the probability a policy assigns to it.

    Args:
        policy: Object exposing ``get_probability(state, action)``.
    """

    def __init__(self, policy):
        self.policy = policy

    def evaluate(self, state, action):
        """Return ``policy.get_probability(state, action)`` unchanged."""
        return self.policy.get_probability(state, action)

# 行动执行
执行器接口
python
class Actuator:
    """Base interface for components that carry out actions.

    Subclasses override both methods; the base versions only raise.

    Args:
        name: Identifier stored on the instance as ``name``.
    """

    def __init__(self, name):
        self.name = name

    def can_perform(self, action):
        """Tell whether this actuator handles ``action``.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError

    def perform(self, action):
        """Carry out ``action``.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError

# 具体执行器实现
python
class WebBrowserActuator(Actuator):
    """Actuator that performs web actions through a Selenium Chrome driver.

    The browser is created lazily on the first ``navigate`` call, so
    constructing the actuator does not start Chrome or import Selenium.
    """

    # Same string as selenium's ``By.CSS_SELECTOR``. Spelled out so the class
    # can be defined without Selenium installed.
    _CSS_SELECTOR = "css selector"

    def __init__(self):
        super().__init__("web_browser")
        self.browser = None

    def can_perform(self, action):
        """Return True when ``action.type`` equals ``"web_action"``."""
        return action.type == "web_action"

    def perform(self, action):
        """Dispatch on ``action.name`` to navigate, click or type.

        Returns:
            The result dict of the method that handled the action.

        Raises:
            ValueError: If ``action.name`` is not ``"navigate"``,
                ``"click"`` or ``"type"``. Previously this fell through and
                returned None.
        """
        if action.name == "navigate":
            return self.navigate(action.url)
        if action.name == "click":
            return self.click(action.selector)
        if action.name == "type":
            return self.type_text(action.selector, action.text)
        raise ValueError(f"unsupported web action: {action.name!r}")

    def navigate(self, url):
        """Open ``url``, starting Chrome first if no browser exists yet."""
        if self.browser is None:
            from selenium import webdriver
            self.browser = webdriver.Chrome()
        self.browser.get(url)
        return {"success": True, "url": url}

    def click(self, selector):
        """Click the first element matching the CSS ``selector``."""
        self._find(selector).click()
        return {"success": True, "selector": selector}

    def type_text(self, selector, text):
        """Send ``text`` to the first element matching the CSS ``selector``."""
        self._find(selector).send_keys(text)
        return {"success": True, "selector": selector, "text": text}

    def _find(self, selector):
        """Locate an element by CSS selector in the current browser session.

        Raises:
            RuntimeError: If no browser has been started yet.
        """
        if self.browser is None:
            raise RuntimeError(
                "no browser session; perform a 'navigate' action first"
            )
        # find_element_by_css_selector was removed in Selenium 4.3;
        # find_element(by, value) works on both old and new releases.
        return self.browser.find_element(self._CSS_SELECTOR, selector)

# 执行监控
监控机制
python
class ExecutionMonitor:
    """Run a callable and report its outcome as a status dict."""

    def __init__(self):
        # Set at construction; no method in this class reads it.
        self.monitoring = True

    def monitor_execution(self, action, execution_func):
        """Call ``execution_func()`` and wrap the outcome.

        Args:
            action: Accepted for the caller's convenience; not used here.
            execution_func: Zero-argument callable to run.

        Returns:
            ``{"status": "success", "result": ...}`` when the call returns,
            or ``{"status": "failure", "error": ...}`` when it raises any
            ``Exception``.
        """
        try:
            result = execution_func()
        except Exception as e:
            return self.failure(e)
        return self.success(result)

    def success(self, result):
        """Build the success report for ``result``."""
        return {
            "status": "success",
            "result": result,
        }

    def failure(self, error):
        """Build the failure report; the error is stored as ``str(error)``."""
        return {
            "status": "failure",
            "error": str(error),
        }

# 异常处理
python
class ExecutionHandler:
    """Execute actions with retries governed by a retry policy.

    Args:
        retry_policy: Object exposing ``max_attempts`` (int) and
            ``get_delay(attempt)`` (seconds to wait before the next try).
    """

    def __init__(self, retry_policy):
        self.retry_policy = retry_policy

    def execute_with_retry(self, action, executor):
        """Call ``executor.execute(action)`` until it succeeds or attempts run out.

        Returns:
            Whatever ``executor.execute`` returns on the first success.

        Raises:
            ValueError: If the policy allows fewer than one attempt.
                Previously this returned None without executing anything.
            Exception: The error from the final failed attempt, re-raised
                with its original traceback.
        """
        max_attempts = self.retry_policy.max_attempts
        if max_attempts < 1:
            raise ValueError("retry_policy.max_attempts must be at least 1")

        attempts = 0
        while True:
            try:
                return executor.execute(action)
            except Exception as e:
                attempts += 1
                if attempts >= max_attempts:
                    # Bare raise keeps the original traceback intact.
                    raise
                self.handle_retry(e, attempts)

    def handle_retry(self, error, attempt):
        """Report the failed attempt and sleep for the policy's delay."""
        # Imported here because the example never imports ``time``.
        import time

        print(f"Attempt {attempt} failed: {error}")
        time.sleep(self.retry_policy.get_delay(attempt))

# 记忆系统设计
记忆架构
三层记忆架构:
┌─────────────────────────────────────┐
│ 工作记忆(Working Memory) │
│ - 当前任务信息 │
│ - 临时状态 │
│ - 快速访问 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 情景记忆(Episodic Memory) │
│ - 过去经历 │
│ - 事件序列 │
│ - 时空信息 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 语义记忆(Semantic Memory) │
│ - 知识和概念 │
│ - 规则和模式 │
│ - 长期存储 │
└─────────────────────────────────────┘
工作记忆
实现:
python
class WorkingMemory:
    """Bounded list of items that drops the oldest entry when full.

    Args:
        capacity: Maximum number of items kept after each ``add``.
    """

    def __init__(self, capacity=7):
        self.capacity = capacity
        self.items = []

    def add(self, item):
        """Append ``item``; evict the oldest entry if capacity is exceeded."""
        self.items.append(item)
        if len(self.items) > self.capacity:
            self.items.pop(0)

    def get(self, index):
        """Return the item at ``index``, or None when out of range.

        Negative indices count as out of range.
        """
        if 0 <= index < len(self.items):
            return self.items[index]
        return None

    def clear(self):
        """Remove all items by rebinding ``items`` to a fresh list."""
        self.items = []

    def contains(self, item):
        """Return True when ``item`` equals one of the stored items."""
        return item in self.items

# 情景记忆
实现:
python
class EpisodicMemory:
    """Store of dict episodes with an exact-match index over their fields.

    Attributes are plain containers:
    ``episodes`` is the list of episodes in insertion order, and
    ``index`` maps ``key -> value -> [episode positions]``.
    """

    def __init__(self):
        self.episodes = []
        self.index = {}

    @staticmethod
    def _is_hashable(value):
        """Return True when ``value`` can be used as a dict key."""
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def add_episode(self, episode):
        """Append ``episode`` and index each of its hashable field values.

        Unhashable values (lists, dicts, ...) are stored with the episode
        but left out of the index; previously they raised ``TypeError``.
        """
        episode_id = len(self.episodes)
        self.episodes.append(episode)
        for key, value in episode.items():
            if not self._is_hashable(value):
                continue
            self.index.setdefault(key, {}).setdefault(value, []).append(episode_id)

    def retrieve(self, query):
        """Return episodes matching ANY ``key: value`` pair of ``query``.

        Results come back in insertion order, each episode at most once.
        Query values that are unhashable can never be in the index and are
        skipped.
        """
        matched = set()
        for key, value in query.items():
            if not self._is_hashable(value):
                continue
            matched.update(self.index.get(key, {}).get(value, ()))
        # Sort so the result order does not depend on set iteration order.
        return [self.episodes[i] for i in sorted(matched)]

    def search_similar(self, episode, threshold=0.7):
        """Return ``(episode, similarity)`` pairs at or above ``threshold``.

        Pairs are sorted by similarity, highest first.
        """
        similar_episodes = []
        for ep in self.episodes:
            similarity = self.calculate_similarity(episode, ep)
            if similarity >= threshold:
                similar_episodes.append((ep, similarity))
        return sorted(similar_episodes, key=lambda x: x[1], reverse=True)

    def calculate_similarity(self, episode1, episode2):
        """Return the fraction of shared keys whose values are equal.

        Returns 0 when the two episodes share no keys.
        """
        common_keys = set(episode1.keys()) & set(episode2.keys())
        if not common_keys:
            return 0
        matches = sum(1 for key in common_keys if episode1[key] == episode2[key])
        return matches / len(common_keys)

# 语义记忆
实现:
python
class SemanticMemory:
    """Store of named concepts and labelled relations between them.

    ``concepts`` maps a name to its attributes; ``relations`` maps
    ``concept -> relation -> [target concepts]``.
    """

    def __init__(self):
        self.concepts = {}
        self.relations = {}

    def add_concept(self, name, attributes):
        """Store ``attributes`` under ``name``, replacing any earlier entry."""
        self.concepts[name] = attributes

    def add_relation(self, concept1, relation, concept2):
        """Record that ``concept1`` is linked to ``concept2`` by ``relation``.

        Repeated calls with the same arguments append duplicate targets.
        """
        self.relations.setdefault(concept1, {}).setdefault(relation, []).append(concept2)

    def get_concept(self, name):
        """Return the attributes stored for ``name``, or None if unknown."""
        return self.concepts.get(name)

    def get_relations(self, concept, relation=None):
        """Look up relations of ``concept``.

        Returns:
            With ``relation=None``: the dict ``relation -> [targets]``
            (empty dict for an unknown concept; previously an empty list,
            which had the wrong type for this call form).
            With a relation name: the list of targets, empty when the
            concept or the relation is unknown.
        """
        if concept not in self.relations:
            return {} if relation is None else []
        if relation is None:
            return self.relations[concept]
        return self.relations[concept].get(relation, [])

    def infer(self, concept, relation):
        """Return the first target of ``relation`` for ``concept``, or None.

        Without a relation name there is no single target to pick, so None
        is returned; previously this path raised ``KeyError``.
        """
        if relation is None:
            return None
        targets = self.get_relations(concept, relation)
        if targets:
            return targets[0]
        return None

# 记忆检索
基于关键词的检索
python
class KeywordRetriever:
    """Whitespace-token keyword search over the string fields of episodes.

    The index is built once in the constructor from ``memory.episodes``;
    episodes added to the memory afterwards are not searchable until a new
    retriever is created or ``self.index = self.build_index()`` is rerun.

    Args:
        memory: Object exposing ``episodes``, a list of dict episodes.
    """

    def __init__(self, memory):
        self.memory = memory
        self.index = self.build_index()

    def build_index(self):
        """Return a dict mapping each lower-cased word to episode positions.

        Only ``str`` field values are tokenised. Each position appears at
        most once per word, even when the word repeats inside an episode.
        """
        index = {}
        for episode_id, episode in enumerate(self.memory.episodes):
            seen = set()
            for value in episode.values():
                if not isinstance(value, str):
                    continue
                for word in value.lower().split():
                    if word in seen:
                        continue
                    seen.add(word)
                    index.setdefault(word, []).append(episode_id)
        return index

    def retrieve(self, query):
        """Return episodes containing ANY word of ``query``.

        Matching is case-insensitive; results are in insertion order.
        """
        episode_ids = set()
        for word in query.lower().split():
            episode_ids.update(self.index.get(word, ()))
        # Sort so the result order does not depend on set iteration order.
        return [self.memory.episodes[i] for i in sorted(episode_ids)]

# 基于向量的检索
python
class VectorRetriever:
    """Embedding-based search over episodes, ranked by dot product.

    Embeddings are computed once in the constructor; episodes added to the
    memory afterwards are not searchable until they are rebuilt.

    Args:
        memory: Object exposing ``episodes``, a list of dict episodes.
        embedding_model: Object exposing ``encode(text)`` that returns a
            fixed-length numeric vector.
    """

    def __init__(self, memory, embedding_model):
        self.memory = memory
        self.embedding_model = embedding_model
        self.embeddings = self.build_embeddings()

    def build_embeddings(self):
        """Return an array with one embedding row per stored episode."""
        # Imported here because the example never imports numpy.
        import numpy as np

        return np.array([
            self.embedding_model.encode(self.episode_to_text(episode))
            for episode in self.memory.episodes
        ])

    def episode_to_text(self, episode):
        """Join the ``str()`` of every field value with single spaces."""
        return " ".join(str(v) for v in episode.values())

    def retrieve(self, query, top_k=5):
        """Return up to ``top_k`` episodes most similar to ``query``.

        Similarity is the raw dot product of embeddings. It is not
        normalised, so it equals cosine similarity only if the model
        returns unit-length vectors.

        Returns:
            Episodes ordered from most to least similar. An empty list when
            ``top_k`` is not positive or nothing is stored.
        """
        import numpy as np

        # top_k == 0 would slice [-0:], which is the whole array, and an
        # empty embeddings array cannot be multiplied with the query vector.
        if top_k <= 0 or len(self.embeddings) == 0:
            return []
        query_embedding = self.embedding_model.encode(query)
        similarities = np.dot(self.embeddings, query_embedding)
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [self.memory.episodes[i] for i in top_indices]

# 混合检索
python
class HybridRetriever:
    """Merge keyword and vector search results with rank-based scores.

    Args:
        keyword_retriever: Object exposing ``retrieve(query)``.
        vector_retriever: Object exposing ``retrieve(query, top_k)``.
        alpha: Weight of the vector ranking; the keyword ranking gets
            ``1 - alpha``.
    """

    def __init__(self, keyword_retriever, vector_retriever, alpha=0.5):
        self.keyword_retriever = keyword_retriever
        self.vector_retriever = vector_retriever
        self.alpha = alpha

    def retrieve(self, query, top_k=5):
        """Return the ``top_k`` episodes with the highest combined score.

        The vector retriever is asked for ``top_k * 2`` candidates so the
        merge has more to choose from.

        Returns:
            The episode objects themselves, best first. Previously the
            ``str()`` keys used for scoring were returned instead.
        """
        keyword_results = self.keyword_retriever.retrieve(query)
        vector_results = self.vector_retriever.retrieve(query, top_k * 2)
        combined_scores = self.combine_results(keyword_results, vector_results)

        # Scores are keyed by str(episode) because dict episodes are not
        # hashable; map each key back to the first episode that produced it.
        by_key = {}
        for episode in (*keyword_results, *vector_results):
            by_key.setdefault(str(episode), episode)

        ranked = sorted(
            combined_scores.items(),
            key=lambda item: item[1],
            reverse=True,
        )[:top_k]
        return [by_key.get(key, key) for key, _ in ranked]

    def combine_results(self, keyword_results, vector_results):
        """Return a dict mapping ``str(episode)`` to its combined score.

        Each list contributes ``weight * (1 - rank / len(list))``, so the
        first entry of a list earns the full weight. Scores from both lists
        are added for episodes that appear in both.
        """
        combined = {}
        for i, result in enumerate(keyword_results):
            score = (1 - self.alpha) * (1 - i / len(keyword_results))
            combined[str(result)] = combined.get(str(result), 0) + score
        for i, result in enumerate(vector_results):
            score = self.alpha * (1 - i / len(vector_results))
            combined[str(result)] = combined.get(str(result), 0) + score
        return combined

# 记忆更新
记忆压缩
python
class MemoryCompressor:
    """Shrink an episodic memory by keeping only its most important episodes.

    Args:
        compression_ratio: Fraction in ``[0, 1]`` that sets both the size
            below which nothing is compressed (``ratio * 100`` episodes)
            and the percentile cut-off used to keep episodes.
    """

    def __init__(self, compression_ratio=0.5):
        self.compression_ratio = compression_ratio

    def compress(self, memory):
        """Return a memory holding only episodes at or above the cut-off.

        Args:
            memory: Object exposing ``episodes`` and
                ``search_similar(episode, threshold=...)``.

        Returns:
            ``memory`` itself when it has at most ``compression_ratio * 100``
            episodes. Otherwise a new ``EpisodicMemory`` containing the
            episodes whose importance is at least the
            ``(1 - compression_ratio) * 100`` percentile of all scores.
        """
        if len(memory.episodes) <= self.compression_ratio * 100:
            return memory

        # Imported here because the example never imports numpy.
        import numpy as np

        importance_scores = self.calculate_importance(memory)
        threshold = np.percentile(
            importance_scores, (1 - self.compression_ratio) * 100
        )
        compressed_memory = EpisodicMemory()
        for episode, score in zip(memory.episodes, importance_scores):
            if score >= threshold:
                compressed_memory.add_episode(episode)
        return compressed_memory

    def calculate_importance(self, memory):
        """Return one importance score per episode, in storage order."""
        return [
            self.evaluate_importance(episode, memory)
            for episode in memory.episodes
        ]

    def evaluate_importance(self, episode, memory):
        """Return the mean of the recency, frequency and uniqueness scores."""
        recency_score = self.recency_score(episode, memory)
        frequency_score = self.frequency_score(episode, memory)
        uniqueness_score = self.uniqueness_score(episode, memory)
        return (recency_score + frequency_score + uniqueness_score) / 3

    def recency_score(self, episode, memory):
        """Return a score in ``(0, 1]`` that grows with how recent the episode is.

        Episodes are appended in arrival order, so the last one is the most
        recent and scores 1.0. The previous formula was inverted and gave
        the oldest episode the highest score.
        """
        episodes = memory.episodes
        # Match by identity first: list.index() compares by equality and
        # would report the first of several equal episodes.
        position = next(
            (i for i, stored in enumerate(episodes) if stored is episode),
            None,
        )
        if position is None:
            position = episodes.index(episode)
        return (position + 1) / len(episodes)

    def frequency_score(self, episode, memory):
        """Return the share of stored episodes similar to ``episode``.

        "Similar" means ``memory.search_similar`` returned them at
        threshold 0.8.
        """
        similar = memory.search_similar(episode, threshold=0.8)
        return len(similar) / len(memory.episodes)

    def uniqueness_score(self, episode, memory):
        """Return ``1 / (similar_count + 1)``; fewer look-alikes score higher."""
        similar = memory.search_similar(episode, threshold=0.8)
        return 1 / (len(similar) + 1)

# 记忆遗忘
python
class MemoryForgetting:
    """Decay episode importance and drop episodes that fall too low.

    Args:
        decay_rate: Fraction of importance removed on every ``forget`` call.
    """

    def __init__(self, decay_rate=0.1):
        self.decay_rate = decay_rate

    def forget(self, memory):
        """Apply one decay step to ``memory`` in place and return it.

        Every episode's ``"importance"`` (1.0 when absent) is multiplied by
        ``1 - decay_rate``; episodes whose importance is then not above 0.1
        are removed. Episode dicts are mutated in place.

        When the memory has both an ``index`` attribute and an
        ``add_episode`` method, the survivors are re-added into a cleared
        memory so the index is rebuilt. Replacing ``episodes`` alone would
        leave the index pointing at old positions.
        """
        for episode in memory.episodes:
            episode["importance"] = (
                episode.get("importance", 1.0) * (1 - self.decay_rate)
            )
        survivors = [
            episode for episode in memory.episodes
            if episode["importance"] > 0.1
        ]

        can_reindex = hasattr(memory, "index") and callable(
            getattr(memory, "add_episode", None)
        )
        if can_reindex:
            # Assumes ``index`` is a dict that add_episode repopulates.
            memory.episodes = []
            memory.index = {}
            for episode in survivors:
                memory.add_episode(episode)
        else:
            memory.episodes = survivors
        return memory

# 实践练习
练习1:实现简单的行动选择器
python
class SimpleActionSelector:
    """Exercise 1: pick the action whose successor state has the highest utility.

    Wires a ``UtilityBasedEvaluator`` that reads ``state.utility`` into an
    ``ActionSelector``.
    """

    def __init__(self):
        self.evaluator = UtilityBasedEvaluator(lambda s: s.utility)

    def select_action(self, state, actions):
        """Return the best of ``actions`` for ``state``.

        A fresh ``ActionSelector`` is built on each call, so replacing
        ``self.evaluator`` takes effect on the next selection.
        """
        selector = ActionSelector(self.evaluator.evaluate)
        return selector.select_action(state, actions)

# 练习2:实现情景记忆系统
python
class SimpleEpisodicMemory:
    """Exercise 2: thin remember/recall wrapper around ``EpisodicMemory``."""

    def __init__(self):
        self.memory = EpisodicMemory()

    def remember(self, experience):
        """Store ``experience`` by passing it to ``memory.add_episode``."""
        self.memory.add_episode(experience)

    def recall(self, query):
        """Return whatever ``memory.retrieve(query)`` returns."""
        return self.memory.retrieve(query)

# 总结
本节我们学习了Agent的行动与记忆:
- 行动选择机制和评估方法
- 行动执行和监控
- 记忆系统设计(工作记忆、情景记忆、语义记忆)
- 记忆检索方法(关键词、向量、混合)
- 记忆更新策略(压缩、遗忘)
这些技术使Agent能够做出明智的行动选择,并通过记忆系统积累和利用经验。
