Skip to content

第42天:Agent记忆系统

学习目标

  • 理解记忆系统架构
  • 掌握向量存储技术
  • 学习记忆检索方法
  • 理解记忆压缩技术
  • 掌握记忆更新策略

记忆系统架构

三层记忆模型

┌─────────────────────────────────────────┐
│      工作记忆(Working Memory)        │
│   - 容量:7±2个项目                  │
│   - 持续时间:秒到分钟                │
│   - 访问速度:极快                    │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│      情景记忆(Episodic Memory)      │
│   - 容量:无限                       │
│   - 持续时间:长期                     │
│   - 访问速度:快                       │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│      语义记忆(Semantic Memory)        │
│   - 容量:无限                       │
│   - 持续时间:永久                     │
│   - 访问速度:中                       │
└─────────────────────────────────────────┘

记忆管理器

python
from typing import List, Dict, Optional
from datetime import datetime
import uuid

class MemoryManager:
    """Facade over the three memory tiers (working / episodic / semantic).

    Routes each incoming memory to a tier via MemoryRouter and fans
    retrieval out across tiers, ranking merged results by lexical
    relevance.  MemoryItem annotations are quoted because the class is
    not defined in this module — unquoted they are evaluated at class
    definition time and raise NameError.
    """

    def __init__(self):
        # Working memory is capped at 7 items (the classic 7±2 capacity).
        self.working_memory = WorkingMemory(capacity=7)
        self.episodic_memory = EpisodicMemory()
        self.semantic_memory = SemanticMemory()
        self.memory_router = MemoryRouter()
    
    def store(self, memory: "MemoryItem", memory_type: str = "auto") -> str:
        """Store *memory* in the tier chosen by the router.

        Returns the tier name actually used.  NOTE(review): an
        unrecognized tier name is returned unchanged without storing
        anything — confirm callers expect this silent fall-through.
        """
        memory_type = self.memory_router.route(memory, memory_type)
        
        if memory_type == "working":
            self.working_memory.add(memory)
        elif memory_type == "episodic":
            self.episodic_memory.add(memory)
        elif memory_type == "semantic":
            self.semantic_memory.add(memory)
        
        return memory_type
    
    def retrieve(self, query: str, memory_type: str = "all") -> List["MemoryItem"]:
        """Search one tier ("working"/"episodic"/"semantic") or "all",
        returning results ranked by relevance to *query*."""
        results = []
        
        if memory_type in ["all", "working"]:
            results.extend(self.working_memory.search(query))
        
        if memory_type in ["all", "episodic"]:
            results.extend(self.episodic_memory.search(query))
        
        if memory_type in ["all", "semantic"]:
            results.extend(self.semantic_memory.search(query))
        
        return self._rank_results(results, query)
    
    def _rank_results(self, results: List["MemoryItem"], 
                     query: str) -> List["MemoryItem"]:
        """Order *results* by descending relevance to *query*."""
        scored_results = []
        
        for result in results:
            score = self._calculate_relevance(result, query)
            scored_results.append((result, score))
        
        scored_results.sort(key=lambda x: x[1], reverse=True)
        
        return [result for result, score in scored_results]
    
    def _calculate_relevance(self, memory: "MemoryItem", 
                           query: str) -> float:
        """Jaccard similarity between the query's and memory's word sets."""
        query_words = set(query.lower().split())
        content_words = set(memory.content.lower().split())
        
        intersection = query_words & content_words
        union = query_words | content_words
        
        # Guard: both strings empty would otherwise divide by zero.
        return len(intersection) / len(union) if union else 0

class MemoryRouter:
    """Decides which memory tier a new item belongs to.

    The MemoryItem annotation is quoted because the class is not defined
    in this module — unquoted it raised NameError at definition time.
    """

    def route(self, memory: "MemoryItem", 
              suggested_type: str) -> str:
        """Return the target tier for *memory*.

        An explicit *suggested_type* (anything other than "auto") wins.
        Otherwise the tier is chosen from the item's importance score:
        > 0.8 semantic, > 0.5 episodic, else working.
        """
        if suggested_type != "auto":
            return suggested_type
        
        if memory.importance > 0.8:
            return "semantic"
        elif memory.importance > 0.5:
            return "episodic"
        else:
            return "working"

向量存储

Embedding生成

python
from typing import List
import numpy as np

class EmbeddingGenerator:
    """Toy embedding generator for the tutorial.

    Produces a fixed 1536-dimensional vector per text by averaging
    per-word character-ordinal vectors.  A stand-in for a real
    embedding model; no network call is made.
    """

    def __init__(self, model_name: str = "text-embedding-ada-002"):
        # Kept for reference only; this demo never contacts the model.
        self.model_name = model_name

    def generate(self, text: str) -> np.ndarray:
        """Embed *text*: mean of its word vectors, or zeros if empty."""
        vectors = [self._word_to_vector(w) for w in text.lower().split()]
        if not vectors:
            # Empty / whitespace-only text maps to the zero vector.
            return np.zeros(1536)
        return np.mean(vectors, axis=0)

    def _word_to_vector(self, word: str) -> np.ndarray:
        """Encode a word as normalized character ordinals (demo only)."""
        vec = np.zeros(1536)
        clipped = word[:1536]  # never exceed the embedding dimension
        vec[:len(clipped)] = [ord(ch) / 256.0 for ch in clipped]
        return vec

    def generate_batch(self, texts: List[str]) -> np.ndarray:
        """Embed each text; result has shape (len(texts), 1536)."""
        return np.array([self.generate(t) for t in texts])

向量数据库

python
from typing import List, Tuple
import numpy as np
from collections import defaultdict

class VectorDatabase:
    """In-memory vector store with brute-force cosine search.

    Deleted entries are tombstoned (slot set to None) rather than
    removed, so the integer ids handed out by add() stay valid forever.
    """

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.vectors = []    # slot i holds vector i, or None once deleted
        self.metadata = []   # parallel to self.vectors
        self.index = defaultdict(list)  # metadata key -> [(vector_id, value)]
    
    def add(self, vector: np.ndarray, metadata: Dict) -> int:
        """Store *vector* with *metadata*; returns its stable integer id.

        Raises ValueError when the vector length does not match the
        configured dimension.
        """
        if len(vector) != self.dimension:
            raise ValueError(f"Vector dimension mismatch: expected {self.dimension}, got {len(vector)}")
        
        vector_id = len(self.vectors)
        self.vectors.append(vector)
        self.metadata.append(metadata)
        
        # Inverted index for exact metadata lookups in get_by_metadata().
        for key, value in metadata.items():
            self.index[key].append((vector_id, value))
        
        return vector_id
    
    def search(self, query_vector: np.ndarray, 
              top_k: int = 5) -> List[Tuple[int, float, Dict]]:
        """Return the top_k most similar live entries as (id, score, metadata)."""
        similarities = []
        
        for i, vector in enumerate(self.vectors):
            if vector is None:
                # Tombstoned (deleted) slot — not a candidate.
                continue
            similarity = self._cosine_similarity(query_vector, vector)
            similarities.append((i, similarity, self.metadata[i]))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return similarities[:top_k]
    
    def _cosine_similarity(self, vec1: np.ndarray, 
                         vec2: np.ndarray) -> float:
        """Cosine similarity; 0 when either vector has zero norm."""
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0
        
        return dot_product / (norm1 * norm2)
    
    def delete(self, vector_id: int):
        """Tombstone *vector_id* and purge it from the metadata index.

        BUG FIX: the original used `del` on the parallel lists, which
        shifted every later entry down by one and silently corrupted
        the ids stored in the index.  Tombstoning keeps ids stable.
        """
        if 0 <= vector_id < len(self.vectors) and self.vectors[vector_id] is not None:
            self.vectors[vector_id] = None
            self.metadata[vector_id] = None
            
            for key in self.index:
                self.index[key] = [
                    (vid, value) for vid, value in self.index[key]
                    if vid != vector_id
                ]
    
    def get_by_metadata(self, key: str, value: str) -> List[Tuple[int, Dict]]:
        """Exact-match lookup via the inverted metadata index."""
        return [
            (vector_id, self.metadata[vector_id])
            for vector_id, val in self.index.get(key, [])
            if val == value
        ]

向量索引

python
from typing import List, Dict
import numpy as np

class VectorIndex:
    """Approximate nearest-neighbour index via random-projection hashing.

    Each of n_trees trees projects a vector through its own random
    matrix and hashes the (integer-quantized) projection into one of
    1000 buckets; search only scores candidates that share a bucket
    with the query in at least one tree.
    """

    def __init__(self, dimension: int, n_trees: int = 10):
        self.dimension = dimension
        self.n_trees = n_trees
        self.trees = []
        self._build_trees()
    
    def _build_trees(self):
        """Create the n_trees random-projection trees."""
        for _ in range(self.n_trees):
            tree = self._build_random_projection_tree()
            self.trees.append(tree)
    
    def _build_random_projection_tree(self) -> Dict:
        """One tree = a random projection matrix plus its hash buckets."""
        projection_matrix = np.random.randn(self.dimension, self.dimension)
        return {
            "projection": projection_matrix,
            "buckets": defaultdict(list)
        }
    
    def add(self, vector_id: int, vector: np.ndarray):
        """Insert (vector_id, vector) into the matching bucket of every tree."""
        for tree in self.trees:
            projected = np.dot(vector, tree["projection"])
            bucket_id = self._get_bucket_id(projected)
            tree["buckets"][bucket_id].append((vector_id, vector))
    
    def _get_bucket_id(self, vector: np.ndarray) -> int:
        # Quantize to ints so nearby projections can collide, then hash.
        # (Annotation fixed: the original claimed str but returns int.)
        return hash(tuple(vector.astype(int))) % 1000
    
    def search(self, query_vector: np.ndarray, 
              top_k: int = 5) -> List[Tuple[int, float]]:
        """Score every candidate sharing a bucket with the query.

        BUG FIX: the original looked candidates up in `self.vectors`,
        an attribute this class never defines (AttributeError at call
        time).  The vectors are already stored alongside their ids in
        the buckets, so collect them from there instead.
        """
        candidates = {}
        
        for tree in self.trees:
            projected = np.dot(query_vector, tree["projection"])
            bucket_id = self._get_bucket_id(projected)
            
            for vector_id, vector in tree["buckets"].get(bucket_id, []):
                candidates[vector_id] = vector
        
        results = [
            (vector_id, self._cosine_similarity(query_vector, vector))
            for vector_id, vector in candidates.items()
        ]
        
        results.sort(key=lambda x: x[1], reverse=True)
        
        return results[:top_k]
    
    def _cosine_similarity(self, vec1: np.ndarray, 
                         vec2: np.ndarray) -> float:
        """Cosine similarity; 0 when either vector has zero norm."""
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0
        
        return dot_product / (norm1 * norm2)

记忆检索

语义检索

python
class SemanticRetriever:
    """Embedding-based retriever: embed the query, search the vector DB."""

    def __init__(self, vector_db: VectorDatabase, 
                 embedding_generator: EmbeddingGenerator):
        self.vector_db = vector_db
        self.embedding_generator = embedding_generator

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the top_k nearest memories as id/similarity/metadata dicts."""
        embedded_query = self.embedding_generator.generate(query)

        hits = []
        for vector_id, similarity, metadata in self.vector_db.search(embedded_query, top_k):
            hits.append({
                "id": vector_id,
                "similarity": similarity,
                "metadata": metadata
            })
        return hits

混合检索

python
class HybridRetriever:
    """Blends semantic and keyword retrieval via rank-based score fusion.

    *alpha* weights the keyword ranking; (1 - alpha) weights the
    semantic ranking.  Annotations are quoted because KeywordRetriever
    is not defined anywhere in this module — unquoted, the annotation
    is evaluated at class-definition time and raises NameError.
    """

    def __init__(self, semantic_retriever: "SemanticRetriever",
                 keyword_retriever: "KeywordRetriever",
                 alpha: float = 0.5):
        self.semantic_retriever = semantic_retriever
        self.keyword_retriever = keyword_retriever
        self.alpha = alpha
    
    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Fetch 2*top_k candidates from each retriever, fuse the
        rankings, and return the best top_k as {"id", "score"} dicts."""
        semantic_results = self.semantic_retriever.retrieve(query, top_k * 2)
        keyword_results = self.keyword_retriever.retrieve(query, top_k * 2)
        
        combined_scores = self._combine_results(
            semantic_results, 
            keyword_results
        )
        
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [
            {"id": item_id, "score": score}
            for item_id, score in sorted_results[:top_k]
        ]
    
    def _combine_results(self, semantic_results: List[Dict],
                       keyword_results: List[Dict]) -> Dict:
        """Rank-based fusion: earlier positions earn linearly higher
        scores; an id appearing in both lists accumulates both scores."""
        combined = {}
        
        for i, result in enumerate(semantic_results):
            item_id = result["id"]
            semantic_score = (1 - self.alpha) * (1 - i / len(semantic_results))
            combined[item_id] = combined.get(item_id, 0) + semantic_score
        
        for i, result in enumerate(keyword_results):
            item_id = result["id"]
            keyword_score = self.alpha * (1 - i / len(keyword_results))
            combined[item_id] = combined.get(item_id, 0) + keyword_score
        
        return combined

上下文检索

python
class ContextualRetriever:
    """Retriever that prefixes recent conversation turns onto the query."""

    def __init__(self, vector_db: VectorDatabase,
                 embedding_generator: EmbeddingGenerator):
        self.vector_db = vector_db
        self.embedding_generator = embedding_generator
        self.conversation_history = []

    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Embed the context-augmented query and search the vector DB."""
        enriched = self._build_contextual_query(query)
        embedded = self.embedding_generator.generate(enriched)

        formatted = []
        for vector_id, similarity, metadata in self.vector_db.search(embedded, top_k):
            formatted.append({
                "id": vector_id,
                "similarity": similarity,
                "metadata": metadata
            })
        return formatted

    def _build_contextual_query(self, query: str) -> str:
        """Prepend the last three conversation turns (if any) to *query*."""
        if not self.conversation_history:
            return query

        snippets = [
            turn["user"] + " " + turn["assistant"]
            for turn in self.conversation_history[-3:]
        ]
        context_text = " ".join(snippets)

        return f"Context: {context_text}\nQuery: {query}"

    def add_to_history(self, user_input: str, assistant_response: str):
        """Record one exchange, keeping only the 10 most recent turns."""
        self.conversation_history.append({
            "user": user_input,
            "assistant": assistant_response
        })

        if len(self.conversation_history) > 10:
            self.conversation_history = self.conversation_history[-10:]

记忆压缩

重要性评估

python
class ImportanceEvaluator:
    """Scores a memory's importance as a weighted mix of three signals.

    MemoryItem annotations are quoted: the class is not defined in this
    module, and unquoted annotations are evaluated at class-definition
    time (NameError).
    """

    def __init__(self):
        # Weights sum to 1.0 so the blended score stays in [0, 1].
        self.recency_weight = 0.3
        self.frequency_weight = 0.3
        self.uniqueness_weight = 0.4
    
    def evaluate(self, memory: "MemoryItem", 
               all_memories: List["MemoryItem"]) -> float:
        """Return the blended importance score for *memory*."""
        recency_score = self._recency_score(memory)
        frequency_score = self._frequency_score(memory, all_memories)
        uniqueness_score = self._uniqueness_score(memory, all_memories)
        
        importance = (
            self.recency_weight * recency_score +
            self.frequency_weight * frequency_score +
            self.uniqueness_weight * uniqueness_score
        )
        
        return importance
    
    def _recency_score(self, memory: "MemoryItem") -> float:
        """Linear decay over one year; 0.5 when the timestamp is unknown."""
        if not memory.timestamp:
            return 0.5
        
        age = (datetime.now() - memory.timestamp).days
        return max(0, 1 - age / 365)
    
    def _frequency_score(self, memory: "MemoryItem",
                       all_memories: List["MemoryItem"]) -> float:
        """More similar memories -> higher frequency score, capped at 1.0.

        NOTE(review): if *memory* itself is in *all_memories* it counts
        as its own match, so the count is always at least 1.
        """
        similar_memories = [
            m for m in all_memories
            if self._are_similar(memory, m)
        ]
        
        return min(1.0, len(similar_memories) / 10)
    
    def _uniqueness_score(self, memory: "MemoryItem",
                         all_memories: List["MemoryItem"]) -> float:
        """Inverse of (similar count + 1): rarer content scores higher."""
        similar_memories = [
            m for m in all_memories
            if self._are_similar(memory, m)
        ]
        
        return 1.0 / (len(similar_memories) + 1)
    
    def _are_similar(self, memory1: "MemoryItem", 
                    memory2: "MemoryItem") -> bool:
        """Similar = one content string contains the other (case-insensitive)."""
        return memory1.content.lower() in memory2.content.lower() or \
               memory2.content.lower() in memory1.content.lower()

记忆摘要

python
class MemorySummarizer:
    """Collapses many memories into one LLM-written summary memory.

    MemoryItem annotations are quoted because the class is not defined
    in this module — unquoted they raised NameError at definition time.
    """

    def __init__(self, llm):
        self.llm = llm
    
    def summarize(self, memories: List["MemoryItem"]) -> "MemoryItem":
        """Return a single memory summarizing *memories*.

        Returns None for an empty list and the item itself for a
        singleton — no LLM call in either case.
        """
        if not memories:
            return None
        
        if len(memories) == 1:
            return memories[0]
        
        prompt = f"""
        Summarize the following memories into a single memory:
        
        {self._format_memories(memories)}
        
        The summary should:
        1. Capture the key information
        2. Be concise
        3. Preserve important details
        """
        
        response = self.llm.invoke(prompt)
        
        # NOTE(review): MemoryItem is not defined in this module; the
        # multi-memory path requires it to be importable at call time.
        return MemoryItem(
            content=response,
            timestamp=datetime.now(),
            importance=self._calculate_summary_importance(memories)
        )
    
    def _format_memories(self, memories: List["MemoryItem"]) -> str:
        """Render each memory as a '- content' bullet line."""
        return "\n".join([
            f"- {memory.content}"
            for memory in memories
        ])
    
    def _calculate_summary_importance(self, 
                                    memories: List["MemoryItem"]) -> float:
        """The summary inherits the mean importance of its inputs."""
        return sum(m.importance for m in memories) / len(memories)

记忆更新策略

增量更新

python
class IncrementalUpdater:
    """Updates a stored memory in place by blending old and new embeddings."""

    def __init__(self, vector_db: VectorDatabase,
                 embedding_generator: EmbeddingGenerator):
        self.vector_db = vector_db
        self.embedding_generator = embedding_generator

    def update(self, memory_id: int, new_content: str):
        """Re-embed *new_content* and merge it into entry *memory_id*,
        refreshing the entry's content and updated_at metadata."""
        previous = self.vector_db.vectors[memory_id]
        fresh = self.embedding_generator.generate(new_content)

        self.vector_db.vectors[memory_id] = self._merge_vectors(previous, fresh)

        entry = self.vector_db.metadata[memory_id]
        entry["content"] = new_content
        entry["updated_at"] = datetime.now()

    def _merge_vectors(self, old_vector: np.ndarray,
                     new_vector: np.ndarray) -> np.ndarray:
        """Exponential blend: keep 70% of the old vector, 30% of the new."""
        alpha = 0.7
        return alpha * old_vector + (1 - alpha) * new_vector

周期性压缩

python
class PeriodicCompressor:
    """Stores memories and compresses low-importance ones every N updates."""

    def __init__(self, memory_manager: MemoryManager,
                 compression_interval: int = 100, llm=None):
        # BUG FIX: the original passed a global `llm` that is never
        # defined anywhere, so every construction raised NameError.
        # The model is now an optional keyword argument (backward
        # compatible for existing two-argument callers).
        self.memory_manager = memory_manager
        self.compression_interval = compression_interval
        self.update_count = 0
        self.evaluator = ImportanceEvaluator()
        self.summarizer = MemorySummarizer(llm)
    
    def update(self, memory: "MemoryItem"):
        """Store *memory*; run compression every compression_interval calls."""
        self.memory_manager.store(memory)
        self.update_count += 1
        
        if self.update_count >= self.compression_interval:
            self._compress_memory()
            self.update_count = 0
    
    def _compress_memory(self):
        """Summarize and replace batches of low-importance memories.

        NOTE(review): `_get_all_memories` is not defined in this class
        or anywhere in this module — it must be provided (e.g. by a
        subclass) before compression can run.
        """
        all_memories = self._get_all_memories()
        
        low_importance = [
            m for m in all_memories
            if m.importance < 0.3
        ]
        
        # Only compress when there is a worthwhile batch to collapse.
        if len(low_importance) > 10:
            summary = self.summarizer.summarize(low_importance)
            
            for memory in low_importance:
                self.memory_manager.episodic_memory.delete(memory.id)
            
            self.memory_manager.store(summary, "episodic")

实践练习

练习1:实现简单的记忆系统

python
class SimpleMemorySystem:
    """Minimal store-and-search memory built from the demo components."""

    def __init__(self):
        self.vector_db = VectorDatabase()
        self.embedding_generator = EmbeddingGenerator()
        self.retriever = SemanticRetriever(
            self.vector_db,
            self.embedding_generator
        )

    def store(self, content: str, metadata: Dict):
        """Embed *content* and persist it with *metadata*."""
        embedded = self.embedding_generator.generate(content)
        self.vector_db.add(embedded, metadata)

    def retrieve(self, query: str, top_k: int = 5):
        """Delegate the search to the semantic retriever."""
        return self.retriever.retrieve(query, top_k)

练习2:实现带上下文的记忆检索

python
class ContextualMemorySystem:
    """Memory system whose retrieval is conditioned on recent conversation."""

    def __init__(self):
        self.vector_db = VectorDatabase()
        self.embedding_generator = EmbeddingGenerator()
        self.retriever = ContextualRetriever(
            self.vector_db,
            self.embedding_generator
        )

    def store(self, content: str, metadata: Dict):
        """Embed *content* and persist it with *metadata*."""
        embedded = self.embedding_generator.generate(content)
        self.vector_db.add(embedded, metadata)

    def retrieve(self, query: str, top_k: int = 5):
        """Delegate the search to the context-aware retriever."""
        return self.retriever.retrieve(query, top_k)

    def add_conversation(self, user_input: str, assistant_response: str):
        """Append one user/assistant exchange to the retriever's history."""
        self.retriever.add_to_history(user_input, assistant_response)

总结

本节我们学习了Agent记忆系统:

  1. 记忆系统架构(三层记忆模型)
  2. 向量存储技术(Embedding、向量数据库、向量索引)
  3. 记忆检索方法(语义检索、混合检索、上下文检索)
  4. 记忆压缩技术(重要性评估、记忆摘要)
  5. 记忆更新策略(增量更新、周期性压缩)

记忆系统是Agent能够积累经验、持续学习的关键组件。

参考资源