Skip to content

第37天:Agent行动与记忆

学习目标

  • 理解Agent的行动选择机制
  • 掌握执行监控方法
  • 学习记忆系统设计
  • 理解短期记忆与长期记忆
  • 掌握记忆检索与更新策略

行动选择机制

行动选择流程

决策流程

感知 → 状态评估 → 选项生成 → 行动评估 → 行动选择 → 执行

实现示例

python
class ActionSelector:
    """Pick the highest-scoring action according to a scoring callable."""

    def __init__(self, evaluation_function):
        # Invoked as evaluation_function(state, action); its results are
        # compared with max(), so they must be mutually orderable.
        self.evaluation_function = evaluation_function

    def select_action(self, state, possible_actions):
        """Score every candidate for ``state`` and return the best one."""
        scored = self.evaluate_actions(state, possible_actions)
        return self.choose_best_action(scored)

    def evaluate_actions(self, state, actions):
        """Return a dict mapping each action to its score.

        Actions become dict keys, so they have to be hashable.
        """
        return {
            candidate: self.evaluation_function(state, candidate)
            for candidate in actions
        }

    def choose_best_action(self, evaluations):
        """Return the key with the largest value; the first one wins ties.

        An empty mapping makes ``max`` raise ``ValueError``.
        """
        best_action, _ = max(evaluations.items(), key=lambda pair: pair[1])
        return best_action

行动评估方法

基于效用的评估

python
class UtilityBasedEvaluator:
    """Score an action by the utility of the state it is predicted to yield."""

    def __init__(self, utility_function):
        # Called with the predicted next state; its return value is the score.
        self.utility_function = utility_function

    def evaluate(self, state, action):
        """Return ``utility_function`` applied to the predicted next state."""
        return self.utility_function(self.predict_next_state(state, action))

    def predict_next_state(self, state, action):
        """Delegate the prediction to ``state.apply(action)``."""
        predicted = state.apply(action)
        return predicted

基于价值的评估

python
class ValueBasedEvaluator:
    """Score an action by the value of the state it is predicted to yield."""

    def __init__(self, value_function):
        # Called with the predicted next state; its return value is the score.
        self.value_function = value_function

    def evaluate(self, state, action):
        """Return ``value_function`` applied to the predicted next state."""
        return self.value_function(self.predict_next_state(state, action))

    def predict_next_state(self, state, action):
        """Delegate the prediction to ``state.apply(action)``."""
        predicted = state.apply(action)
        return predicted

基于策略的评估

python
class PolicyBasedEvaluator:
    """Score an action by what the policy reports for it in a state."""

    def __init__(self, policy):
        # Object exposing get_probability(state, action).
        self.policy = policy

    def evaluate(self, state, action):
        """Return ``policy.get_probability(state, action)`` unchanged."""
        probability = self.policy.get_probability(state, action)
        return probability

行动执行

执行器接口

python
class Actuator:
    """Base interface for components that carry out an agent's actions.

    Subclasses are expected to override ``can_perform`` and ``perform``;
    the versions defined here only raise ``NotImplementedError``.
    """

    def __init__(self, name):
        # Identifier given to this actuator by its creator.
        self.name = name

    def can_perform(self, action):
        """Report whether this actuator handles ``action`` (subclass hook).

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        # Name the concrete class so a missing override is easy to locate.
        raise NotImplementedError(
            f"{type(self).__name__} must implement can_perform()"
        )

    def perform(self, action):
        """Carry out ``action`` (subclass hook).

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement perform()"
        )

具体执行器实现

python
class WebBrowserActuator(Actuator):
    """Actuator that drives a Chrome browser through Selenium WebDriver.

    The browser is started lazily the first time any operation needs it,
    so ``click`` and ``type_text`` no longer fail on a ``None`` browser
    when they are called before ``navigate``.
    """

    def __init__(self):
        super().__init__("web_browser")
        # WebDriver instance; created on first use by _ensure_browser().
        self.browser = None

    def can_perform(self, action):
        """Return True when ``action.type`` is ``"web_action"``."""
        return action.type == "web_action"

    def perform(self, action):
        """Dispatch ``action`` by ``action.name`` and return a result dict.

        Supported names are ``"navigate"`` (reads ``action.url``),
        ``"click"`` (reads ``action.selector``) and ``"type"`` (reads
        ``action.selector`` and ``action.text``). Any other name yields
        ``{"success": False, "error": ...}`` instead of a silent ``None``.
        """
        if action.name == "navigate":
            return self.navigate(action.url)
        if action.name == "click":
            return self.click(action.selector)
        if action.name == "type":
            return self.type_text(action.selector, action.text)
        return {
            "success": False,
            "error": f"unsupported web action: {action.name}",
        }

    def navigate(self, url):
        """Load ``url`` in the browser, starting the browser if needed."""
        self._ensure_browser().get(url)
        return {"success": True, "url": url}

    def click(self, selector):
        """Click the first element matching the CSS ``selector``."""
        self._find(selector).click()
        return {"success": True, "selector": selector}

    def type_text(self, selector, text):
        """Send ``text`` as keystrokes to the element matching ``selector``."""
        self._find(selector).send_keys(text)
        return {"success": True, "selector": selector, "text": text}

    def _ensure_browser(self):
        """Return the WebDriver, creating a Chrome session on first use."""
        if self.browser is None:
            # Imported lazily so the class can be defined without Selenium.
            from selenium import webdriver
            self.browser = webdriver.Chrome()
        return self.browser

    def _find(self, selector):
        """Locate one element by CSS selector.

        Uses ``find_element(By.CSS_SELECTOR, ...)`` because the
        ``find_element_by_*`` helpers were removed in Selenium 4.3.
        """
        from selenium.webdriver.common.by import By
        return self._ensure_browser().find_element(By.CSS_SELECTOR, selector)

执行监控

监控机制

python
class ExecutionMonitor:
    """Run a callable and report its outcome as a status dict."""

    def __init__(self):
        # Set at construction; none of the methods below consult it.
        self.monitoring = True

    def monitor_execution(self, action, execution_func):
        """Call ``execution_func()`` and wrap what happens.

        ``action`` is accepted but not used by this method. Any
        ``Exception`` raised by the call is converted to a failure report
        rather than propagated.
        """
        try:
            report = self.success(execution_func())
        except Exception as exc:
            report = self.failure(exc)
        return report

    def success(self, result):
        """Build the report for a call that returned ``result``."""
        return dict(status="success", result=result)

    def failure(self, error):
        """Build the report for a call that raised ``error``."""
        return dict(status="failure", error=str(error))

异常处理

python
class ExecutionHandler:
    """Execute an action, retrying on failure according to a retry policy.

    The policy object must expose ``max_attempts`` and
    ``get_delay(attempt)``.
    """

    def __init__(self, retry_policy):
        self.retry_policy = retry_policy

    def execute_with_retry(self, action, executor):
        """Call ``executor.execute(action)`` until it succeeds.

        NOTE(review): the executor must provide ``execute``; the
        ``Actuator`` interface in this document exposes ``perform``
        instead, so confirm which object is meant to be passed here.

        Returns:
            Whatever ``executor.execute`` returns on the first success, or
            ``None`` when ``max_attempts`` is not positive (no attempt is
            made in that case).

        Raises:
            Exception: the error of the final attempt, once
                ``max_attempts`` attempts have failed.
        """
        failures = 0
        max_attempts = self.retry_policy.max_attempts

        while failures < max_attempts:
            try:
                return executor.execute(action)
            except Exception as error:
                failures += 1
                if failures >= max_attempts:
                    # Bare raise keeps the original traceback intact.
                    raise
                self.handle_retry(error, failures)
        return None

    def handle_retry(self, error, attempt):
        """Report the failed attempt, then wait for the policy's delay."""
        # Local import: no module-level `import time` is visible here, and
        # without one the first retry would fail with NameError.
        import time

        print(f"Attempt {attempt} failed: {error}")
        time.sleep(self.retry_policy.get_delay(attempt))

记忆系统设计

记忆架构

三层记忆架构

┌─────────────────────────────────────┐
│      工作记忆(Working Memory)      │
│   - 当前任务信息                     │
│   - 临时状态                         │
│   - 快速访问                         │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│      情景记忆(Episodic Memory)     │
│   - 过去经历                         │
│   - 事件序列                         │
│   - 时空信息                         │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│      语义记忆(Semantic Memory)     │
│   - 知识和概念                       │
│   - 规则和模式                       │
│   - 长期存储                         │
└─────────────────────────────────────┘

工作记忆

实现

python
class WorkingMemory:
    """Bounded list of items that drops the oldest entry when it overflows."""

    def __init__(self, capacity=7):
        self.capacity = capacity
        self.items = []

    def add(self, item):
        """Append ``item``; evict the first entry if capacity is exceeded."""
        self.items.append(item)
        overflow = len(self.items) > self.capacity
        if overflow:
            del self.items[0]

    def get(self, index):
        """Return the item at ``index``, or None when it is out of range.

        Negative indices count as out of range.
        """
        if not (0 <= index < len(self.items)):
            return None
        return self.items[index]

    def clear(self):
        """Replace the item list with a new empty list."""
        self.items = []

    def contains(self, item):
        """Return True if ``item`` is currently stored."""
        found = item in self.items
        return found

情景记忆

实现

python
class EpisodicMemory:
    """Store episodes (dicts) with an exact-match index over their fields.

    ``episodes`` keeps the episodes in insertion order. ``index`` maps
    ``field name -> field value -> [episode positions]``.
    """

    def __init__(self):
        self.episodes = []
        self.index = {}

    def add_episode(self, episode):
        """Append ``episode`` and index each of its hashable field values.

        Fields whose value is unhashable (e.g. a list) are left out of the
        index instead of raising after the episode was already stored;
        ``retrieve`` still finds them through a linear scan.
        """
        episode_id = len(self.episodes)
        self.episodes.append(episode)

        for key, value in episode.items():
            by_value = self.index.setdefault(key, {})
            try:
                by_value.setdefault(value, []).append(episode_id)
            except TypeError:
                # Unhashable value: cannot be used as an index key.
                continue

    def retrieve(self, query):
        """Return episodes matching ANY ``field: value`` pair in ``query``.

        Results are de-duplicated and returned in insertion order, so the
        output is deterministic.
        """
        matched = set()
        for key, value in query.items():
            matched.update(self._lookup(key, value))
        return [self.episodes[i] for i in sorted(matched)]

    def _lookup(self, key, value):
        """Return positions of episodes whose ``key`` field equals ``value``."""
        try:
            return self.index.get(key, {}).get(value, [])
        except TypeError:
            # Unhashable query value: it was never indexed, so compare
            # against the stored episodes directly.
            return [
                position
                for position, stored in enumerate(self.episodes)
                if key in stored and stored[key] == value
            ]

    def search_similar(self, episode, threshold=0.7):
        """Return ``(episode, similarity)`` pairs at or above ``threshold``.

        Pairs are sorted by similarity, highest first; equal scores keep
        insertion order.
        """
        scored = (
            (stored, self.calculate_similarity(episode, stored))
            for stored in self.episodes
        )
        hits = [pair for pair in scored if pair[1] >= threshold]
        hits.sort(key=lambda pair: pair[1], reverse=True)
        return hits

    def calculate_similarity(self, episode1, episode2):
        """Return the fraction of shared keys whose values are equal.

        Returns 0 when the two episodes share no keys.
        """
        common_keys = set(episode1) & set(episode2)
        if not common_keys:
            return 0

        matches = sum(1 for key in common_keys if episode1[key] == episode2[key])
        return matches / len(common_keys)

语义记忆

实现

python
class SemanticMemory:
    """Store concepts with attributes and labelled relations between them.

    ``concepts`` maps a concept name to its attributes. ``relations`` maps
    ``concept -> relation name -> [target concepts]``.
    """

    def __init__(self):
        self.concepts = {}
        self.relations = {}

    def add_concept(self, name, attributes):
        """Register ``name``, replacing any attributes stored before."""
        self.concepts[name] = attributes

    def add_relation(self, concept1, relation, concept2):
        """Record that ``concept1`` is linked to ``concept2`` by ``relation``."""
        targets = self.relations.setdefault(concept1, {}).setdefault(relation, [])
        targets.append(concept2)

    def get_concept(self, name):
        """Return the attributes stored for ``name``, or None if unknown."""
        return self.concepts.get(name)

    def get_relations(self, concept, relation=None):
        """Return the relations recorded for ``concept``.

        With ``relation=None`` the result is always a dict of
        ``relation -> targets`` (empty for an unknown concept; it used to
        be a list in that case, giving two different return types).
        With a relation name the result is always a list of targets.
        """
        by_relation = self.relations.get(concept)
        if relation is None:
            return {} if by_relation is None else by_relation
        if by_relation is None:
            return []
        return by_relation.get(relation, [])

    def infer(self, concept, relation):
        """Return the first target of ``relation`` for ``concept``, or None.

        Returns None when nothing is recorded for that pair, including
        when ``relation`` is None.
        """
        targets = self.relations.get(concept, {}).get(relation, [])
        return targets[0] if targets else None

记忆检索

基于关键词的检索

python
class KeywordRetriever:
    """Retrieve episodes by whitespace-separated keywords in string fields.

    The index is built once, at construction time, from
    ``memory.episodes``; episodes added afterwards are not visible until
    ``build_index`` is run again and assigned to ``index``.
    """

    def __init__(self, memory):
        self.memory = memory
        self.index = self.build_index()

    def build_index(self):
        """Return a dict mapping each lower-cased word to episode positions.

        Only ``str`` field values are indexed. Each episode appears at
        most once per word.
        """
        index = {}
        for episode_id, episode in enumerate(self.memory.episodes):
            for value in episode.values():
                if not isinstance(value, str):
                    continue
                for word in value.lower().split():
                    postings = index.setdefault(word, [])
                    # Positions are appended in increasing order, so a
                    # repeat for this episode can only be the last entry.
                    if not postings or postings[-1] != episode_id:
                        postings.append(episode_id)
        return index

    def retrieve(self, query):
        """Return episodes containing any query word, best match first.

        Episodes are ranked by how many distinct query words they contain;
        ties keep insertion order. Rank-based consumers therefore get a
        meaningful, deterministic ordering instead of set-iteration order.
        """
        match_counts = {}
        for word in set(query.lower().split()):
            for episode_id in set(self.index.get(word, ())):
                match_counts[episode_id] = match_counts.get(episode_id, 0) + 1

        ranked = sorted(match_counts, key=lambda i: (-match_counts[i], i))
        return [self.memory.episodes[i] for i in ranked]

基于向量的检索

python
class VectorRetriever:
    """Retrieve episodes by dot-product similarity of their embeddings.

    Embeddings are computed once, at construction time, by calling
    ``embedding_model.encode`` on a text rendering of every episode.
    Scores are raw dot products; no normalisation is applied here.
    """

    def __init__(self, memory, embedding_model):
        self.memory = memory
        self.embedding_model = embedding_model
        self.embeddings = self.build_embeddings()

    def build_embeddings(self):
        """Return an array with one embedding row per stored episode."""
        # Local import: no module-level numpy import is visible here.
        import numpy as np

        return np.array([
            self.embedding_model.encode(self.episode_to_text(episode))
            for episode in self.memory.episodes
        ])

    def episode_to_text(self, episode):
        """Join the ``str()`` of every field value with single spaces."""
        return " ".join(str(value) for value in episode.values())

    def retrieve(self, query, top_k=5):
        """Return up to ``top_k`` episodes most similar to ``query``.

        Returns an empty list when ``top_k`` is not positive or nothing
        was embedded. Previously ``top_k=0`` returned every episode
        (``[-0:]`` slices the whole array) and an empty memory failed
        inside ``np.dot``.
        """
        import numpy as np

        if top_k <= 0 or len(self.embeddings) == 0:
            return []

        query_embedding = self.embedding_model.encode(query)
        similarities = np.dot(self.embeddings, query_embedding)
        # Descending order, then cut; same as [-top_k:][::-1] for top_k > 0.
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [self.memory.episodes[i] for i in top_indices]

混合检索

python
class HybridRetriever:
    """Blend keyword and vector retrieval by rank-based score fusion.

    ``alpha`` weights the vector ranking and ``1 - alpha`` the keyword
    ranking.
    """

    def __init__(self, keyword_retriever, vector_retriever, alpha=0.5):
        self.keyword_retriever = keyword_retriever
        self.vector_retriever = vector_retriever
        self.alpha = alpha

    def retrieve(self, query, top_k=5):
        """Return the ``top_k`` best episodes across both retrievers.

        The vector retriever is asked for ``top_k * 2`` candidates. The
        returned items are the episode objects themselves; earlier this
        method returned their ``str()`` keys, unlike the other retrievers.
        """
        keyword_results = self.keyword_retriever.retrieve(query)
        vector_results = self.vector_retriever.retrieve(query, top_k * 2)

        combined_scores = self.combine_results(keyword_results, vector_results)

        # Episodes are dicts (unhashable), so scores are keyed by str();
        # keep a way back from each key to the episode it stands for.
        episode_by_key = {}
        for episode in list(keyword_results) + list(vector_results):
            episode_by_key.setdefault(str(episode), episode)

        ranked_keys = sorted(
            combined_scores,
            key=combined_scores.get,
            reverse=True,
        )[:top_k]

        return [episode_by_key.get(key, key) for key in ranked_keys]

    def combine_results(self, keyword_results, vector_results):
        """Return ``{str(result): fused score}`` for both result lists.

        A result at position ``i`` of a list with ``n`` entries earns
        ``weight * (1 - i / n)``; scores of a result present in both lists
        are summed.
        """
        combined = {}
        weighted_lists = (
            (keyword_results, 1 - self.alpha),
            (vector_results, self.alpha),
        )
        for results, weight in weighted_lists:
            total = len(results)
            for rank, result in enumerate(results):
                key = str(result)
                combined[key] = combined.get(key, 0) + weight * (1 - rank / total)
        return combined

记忆更新

记忆压缩

python
class MemoryCompressor:
    """Shrink an episodic memory by keeping its highest-importance episodes.

    ``compression_ratio`` is the approximate fraction of episodes to keep.
    """

    def __init__(self, compression_ratio=0.5):
        self.compression_ratio = compression_ratio

    def compress(self, memory):
        """Return a new ``EpisodicMemory`` holding the important episodes.

        Memories with at most ``compression_ratio * 100`` episodes are
        returned unchanged (the same object). Otherwise episodes whose
        importance reaches the ``(1 - compression_ratio) * 100``-th
        percentile are re-added, in their original order, to a fresh
        memory.
        """
        if len(memory.episodes) <= self.compression_ratio * 100:
            return memory

        # Local import: no module-level numpy import is visible here.
        import numpy as np

        importance_scores = self.calculate_importance(memory)
        threshold = np.percentile(
            importance_scores, (1 - self.compression_ratio) * 100
        )

        compressed_memory = EpisodicMemory()
        for episode, score in zip(memory.episodes, importance_scores):
            if score >= threshold:
                compressed_memory.add_episode(episode)

        return compressed_memory

    def calculate_importance(self, memory):
        """Return one importance score per episode, in memory order."""
        return [
            self.evaluate_importance(episode, memory)
            for episode in memory.episodes
        ]

    def evaluate_importance(self, episode, memory):
        """Return the mean of the recency, frequency and uniqueness scores."""
        parts = (
            self.recency_score(episode, memory),
            self.frequency_score(episode, memory),
            self.uniqueness_score(episode, memory),
        )
        return sum(parts) / 3

    def recency_score(self, episode, memory):
        """Return a score in (0, 1] that grows with the episode's position.

        Episodes are appended in order, so the last one is the newest and
        scores 1.0. The earlier formula ``1 - index / n`` was inverted and
        rated the oldest episode highest.

        Raises:
            ValueError: if ``episode`` is not in ``memory.episodes``.
        """
        episodes = memory.episodes
        # Match by identity first so equal-looking duplicates each get
        # their own position; fall back to equality otherwise.
        position = next(
            (i for i, stored in enumerate(episodes) if stored is episode),
            None,
        )
        if position is None:
            position = episodes.index(episode)
        return (position + 1) / len(episodes)

    def frequency_score(self, episode, memory):
        """Return the share of episodes at least 0.8-similar to ``episode``."""
        similar = memory.search_similar(episode, threshold=0.8)
        return len(similar) / len(memory.episodes)

    def uniqueness_score(self, episode, memory):
        """Return ``1 / (k + 1)`` for ``k`` episodes at least 0.8-similar."""
        similar = memory.search_similar(episode, threshold=0.8)
        return 1 / (len(similar) + 1)

记忆遗忘

python
class MemoryForgetting:
    """Decay episode importance and drop episodes that fall too low.

    Args:
        decay_rate: fraction of importance removed on every ``forget`` call.
        threshold: episodes survive only while their importance stays
            strictly above this value (default 0.1, the cut-off that used
            to be hard-coded).
    """

    def __init__(self, decay_rate=0.1, threshold=0.1):
        self.decay_rate = decay_rate
        self.threshold = threshold

    def forget(self, memory):
        """Apply one decay step to ``memory`` in place and return it.

        Every episode dict gets an ``"importance"`` entry (starting at 1.0
        when absent) multiplied by ``1 - decay_rate``. Episodes at or below
        the threshold are removed. If the memory keeps a positional
        ``index`` and offers ``add_episode``, the index is rebuilt from the
        survivors; otherwise it would keep pointing at old positions.
        """
        keep_factor = 1 - self.decay_rate
        for episode in memory.episodes:
            episode["importance"] = episode.get("importance", 1.0) * keep_factor

        survivors = [
            episode for episode in memory.episodes
            if episode["importance"] > self.threshold
        ]

        can_reindex = hasattr(memory, "index") and callable(
            getattr(memory, "add_episode", None)
        )
        if can_reindex:
            memory.episodes = []
            memory.index = {}
            for episode in survivors:
                memory.add_episode(episode)
        else:
            memory.episodes = survivors

        return memory

实践练习

练习1:实现简单的行动选择器

python
class SimpleActionSelector:
    """Exercise 1: choose the action whose resulting state has top utility."""

    def __init__(self):
        # Scores a predicted state by reading its ``utility`` attribute.
        self.evaluator = UtilityBasedEvaluator(lambda s: s.utility)

    def select_action(self, state, actions):
        """Return the best of ``actions`` for ``state``.

        A new ``ActionSelector`` bound to the current evaluator is created
        on every call.
        """
        score_action = self.evaluator.evaluate
        return ActionSelector(score_action).select_action(state, actions)

练习2:实现情景记忆系统

python
class SimpleEpisodicMemory:
    """Exercise 2: thin remember/recall wrapper around ``EpisodicMemory``."""

    def __init__(self):
        # Backing store; every call is forwarded to it.
        self.memory = EpisodicMemory()

    def remember(self, experience):
        """Store ``experience`` as a new episode."""
        store = self.memory
        store.add_episode(experience)

    def recall(self, query):
        """Return what the backing memory's ``retrieve`` yields for ``query``."""
        store = self.memory
        return store.retrieve(query)

总结

本节我们学习了Agent的行动与记忆:

  1. 行动选择机制和评估方法
  2. 行动执行和监控
  3. 记忆系统设计(工作记忆、情景记忆、语义记忆)
  4. 记忆检索方法(关键词、向量、混合)
  5. 记忆更新策略(压缩、遗忘)

这些技术使Agent能够做出明智的行动选择,并通过记忆系统积累和利用经验。

参考资源