Memory systems are essential for building agents that maintain context, learn from interactions, and accumulate knowledge over time. This guide walks through the main layers of an agent memory system, from working memory to persistent vector stores, with implementation examples for each.
Agent memory is typically organized in layers, from fastest and most ephemeral to slowest and most durable:

- **Working memory:** current conversation context (last 5-10 messages)
- **Short-term memory:** recent sessions and interactions (hours to days)
- **Long-term memory:** persistent knowledge and experiences (permanent)
- **External memory:** vector databases, knowledge bases, documents
Working memory holds the current conversation context and immediate task information. It's the agent's "RAM": fast to access but limited in capacity.
```python
# Working Memory Implementation
class WorkingMemory:
    def __init__(self, max_tokens=4000, max_messages=10):
        self.max_tokens = max_tokens
        self.max_messages = max_messages
        self.messages = []
        self.token_count = 0

    def add_message(self, role, content):
        """Add a message to working memory with token management"""
        message = {"role": role, "content": content}
        tokens = self.count_tokens(content)

        # Add message
        self.messages.append(message)
        self.token_count += tokens

        # Manage capacity: evict oldest messages until within limits
        while self.token_count > self.max_tokens or len(self.messages) > self.max_messages:
            if not self.messages:
                break
            removed = self.messages.pop(0)
            self.token_count -= self.count_tokens(removed["content"])

    def get_context(self):
        """Get current context for the LLM"""
        return self.messages

    def summarize_if_needed(self):
        """Create a summary when approaching the token limit"""
        if self.token_count > self.max_tokens * 0.8:
            summary = self.create_summary(self.messages[:-5])
            # Replace old messages with the summary
            self.messages = [
                {"role": "system", "content": f"Previous context: {summary}"}
            ] + self.messages[-5:]
            self.recalculate_tokens()

    def create_summary(self, messages):
        """Summarize older messages (call an LLM here in production)"""
        # Placeholder: truncate each message rather than calling a model
        return " | ".join(m["content"][:80] for m in messages)

    def recalculate_tokens(self):
        """Recompute the running token count after compaction"""
        self.token_count = sum(self.count_tokens(m["content"]) for m in self.messages)

    def count_tokens(self, text):
        """Estimate token count (use tiktoken in production)"""
        return len(text) // 4  # Rough approximation


# Usage Example
memory = WorkingMemory()
memory.add_message("user", "Tell me about quantum computing")
memory.add_message("assistant", "Quantum computing uses quantum mechanics...")
context = memory.get_context()
```
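The `count_tokens` heuristic above divides character length by four. For accurate budgets, the comment suggests tiktoken; here is a minimal sketch, assuming the `tiktoken` package is installed and using the `cl100k_base` encoding used by recent OpenAI chat models:

```python
# Drop-in replacement for WorkingMemory.count_tokens (assumes `pip install tiktoken`)
import tiktoken

_ENCODING = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    """Exact token count under the cl100k_base encoding."""
    return len(_ENCODING.encode(text))
```

Swapping this in makes the eviction loop respect real token budgets instead of the four-characters-per-token approximation.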
Short-term memory bridges working memory and long-term storage, maintaining recent interactions and temporary task state.
```python
# Short-term Memory with Redis
import redis
import json
from datetime import datetime


class ShortTermMemory:
    def __init__(self, redis_client=None, ttl_hours=24):
        self.redis = redis_client or redis.Redis(
            host='localhost', port=6379, decode_responses=True
        )
        self.ttl = ttl_hours * 3600  # Convert to seconds

    def store_interaction(self, session_id, interaction):
        """Store an interaction with a TTL"""
        key = f"session:{session_id}:interactions"
        interaction_data = {
            "timestamp": datetime.now().isoformat(),
            "user_input": interaction["user"],
            "agent_response": interaction["agent"],
            "metadata": interaction.get("metadata", {})
        }

        # Store in a Redis list with expiration
        self.redis.lpush(key, json.dumps(interaction_data))
        self.redis.expire(key, self.ttl)

        # Update session metadata
        self.update_session_metadata(session_id)

    def update_session_metadata(self, session_id):
        """Track last-activity time per session"""
        key = f"session:{session_id}:meta"
        self.redis.hset(key, "last_active", datetime.now().isoformat())
        self.redis.expire(key, self.ttl)

    def get_recent_interactions(self, session_id, limit=10):
        """Retrieve recent interactions for a session"""
        key = f"session:{session_id}:interactions"
        interactions = self.redis.lrange(key, 0, limit - 1)
        return [json.loads(i) for i in interactions]

    def search_interactions(self, query, session_id=None):
        """Search through recent interactions"""
        pattern = f"session:{session_id or '*'}:interactions"
        results = []

        for key in self.redis.scan_iter(match=pattern):
            interactions = self.redis.lrange(key, 0, -1)
            for interaction in interactions:
                data = json.loads(interaction)
                if query.lower() in data["user_input"].lower() or \
                   query.lower() in data["agent_response"].lower():
                    results.append(data)

        return results

    def get_session_summary(self, session_id):
        """Generate a summary of session interactions"""
        interactions = self.get_recent_interactions(session_id, limit=50)

        if not interactions:
            return None

        # lpush stores newest first, so the last element is the oldest
        summary = {
            "session_id": session_id,
            "interaction_count": len(interactions),
            "start_time": interactions[-1]["timestamp"],
            "last_interaction": interactions[0]["timestamp"],
            "topics": self.extract_topics(interactions),
            "key_points": self.extract_key_points(interactions)
        }
        return summary

    def extract_topics(self, interactions):
        """Extract main topics from interactions"""
        # In production, use NLP/LLM for topic extraction;
        # this simplified version just concatenates the text.
        all_text = " ".join(
            i["user_input"] + " " + i["agent_response"]
            for i in interactions
        )
        # Implement topic extraction logic over all_text
        return ["topic1", "topic2"]  # Placeholder

    def extract_key_points(self, interactions):
        """Pull out key points (use an LLM for this in production)"""
        # Placeholder: take the first few user inputs as key points
        return [i["user_input"] for i in interactions[:3]]


# Buffer Memory Pattern
class BufferMemory:
    """Fixed-size buffer for recent memories"""

    def __init__(self, buffer_size=100):
        self.buffer = []
        self.buffer_size = buffer_size

    def add(self, memory):
        self.buffer.append(memory)
        if len(self.buffer) > self.buffer_size:
            self.buffer.pop(0)

    def get_all(self):
        return list(self.buffer)

    def clear(self):
        self.buffer.clear()
```
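A quick usage sketch, assuming a Redis server is running on localhost:6379; the session id is arbitrary:

```python
# Usage sketch: requires a running Redis instance on the default port
stm = ShortTermMemory(ttl_hours=24)
stm.store_interaction("session-42", {
    "user": "What's the capital of France?",
    "agent": "The capital of France is Paris.",
})
print(stm.get_recent_interactions("session-42", limit=5))
print(stm.search_interactions("paris", session_id="session-42"))
```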
Long-term memory provides persistent storage for important information, learned patterns, and accumulated knowledge that should survive across sessions.
```python
# Long-term Memory with SQLite
import sqlite3
import json
from datetime import datetime, timedelta


class LongTermMemory:
    def __init__(self, db_path="agent_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self.setup_database()

    def setup_database(self):
        """Create memory tables"""
        cursor = self.conn.cursor()

        # Episodic memory - specific events
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS episodic_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                event_type TEXT,
                content TEXT,
                importance REAL,
                embeddings BLOB,
                metadata TEXT
            )
        """)

        # Semantic memory - facts and concepts
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS semantic_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                concept TEXT UNIQUE,
                definition TEXT,
                relationships TEXT,
                confidence REAL,
                last_accessed TEXT,
                access_count INTEGER DEFAULT 0
            )
        """)

        # Procedural memory - how to do things
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS procedural_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task TEXT,
                steps TEXT,
                success_rate REAL,
                last_used TEXT,
                use_count INTEGER DEFAULT 0
            )
        """)

        self.conn.commit()

    def store_episode(self, event_type, content, importance=0.5, metadata=None):
        """Store an episodic memory"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT INTO episodic_memory
            (timestamp, event_type, content, importance, metadata)
            VALUES (?, ?, ?, ?, ?)
        """, (
            datetime.now().isoformat(),
            event_type,
            content,
            importance,
            json.dumps(metadata or {})
        ))
        self.conn.commit()
        return cursor.lastrowid

    def learn_concept(self, concept, definition, relationships=None):
        """Store or update semantic knowledge"""
        cursor = self.conn.cursor()
        cursor.execute("""
            INSERT OR REPLACE INTO semantic_memory
            (concept, definition, relationships, confidence, last_accessed)
            VALUES (?, ?, ?, ?, ?)
        """, (
            concept,
            definition,
            json.dumps(relationships or {}),
            0.5,  # Initial confidence
            datetime.now().isoformat()
        ))
        self.conn.commit()

    def recall_episodes(self, query=None, event_type=None, limit=10):
        """Retrieve episodic memories"""
        cursor = self.conn.cursor()

        if event_type:
            cursor.execute("""
                SELECT * FROM episodic_memory
                WHERE event_type = ?
                ORDER BY importance DESC, timestamp DESC
                LIMIT ?
            """, (event_type, limit))
        elif query:
            cursor.execute("""
                SELECT * FROM episodic_memory
                WHERE content LIKE ?
                ORDER BY importance DESC, timestamp DESC
                LIMIT ?
            """, (f"%{query}%", limit))
        else:
            cursor.execute("""
                SELECT * FROM episodic_memory
                ORDER BY timestamp DESC
                LIMIT ?
            """, (limit,))

        return cursor.fetchall()

    def consolidate_memories(self, threshold_days=7):
        """Consolidate and compress old memories"""
        cursor = self.conn.cursor()

        # Find old, low-importance memories
        cutoff_date = (datetime.now() - timedelta(days=threshold_days)).isoformat()

        cursor.execute("""
            SELECT event_type, COUNT(*) as count, AVG(importance) as avg_importance
            FROM episodic_memory
            WHERE timestamp < ? AND importance < 0.5
            GROUP BY event_type
        """, (cutoff_date,))

        consolidation_targets = cursor.fetchall()

        for event_type, count, avg_importance in consolidation_targets:
            if count > 10:  # Consolidate if many similar memories
                # Create a summary memory
                summary = f"Consolidated {count} {event_type} events"
                self.store_episode(
                    f"{event_type}_consolidated",
                    summary,
                    importance=avg_importance
                )

                # Delete the individual memories
                cursor.execute("""
                    DELETE FROM episodic_memory
                    WHERE event_type = ? AND timestamp < ? AND importance < 0.5
                """, (event_type, cutoff_date))

        self.conn.commit()


# Memory with Importance Scoring
class ImportanceWeightedMemory:
    def __init__(self):
        self.memories = []

    def add(self, content, importance=None):
        """Add memory with an importance score"""
        if importance is None:
            importance = self.calculate_importance(content)

        self.memories.append({
            "content": content,
            "importance": importance,
            "timestamp": datetime.now(),
            "access_count": 0
        })

        # Keep only the most important memories if exceeding the limit
        if len(self.memories) > 1000:
            self.memories.sort(key=lambda x: x["importance"], reverse=True)
            self.memories = self.memories[:800]

    def calculate_importance(self, content):
        """Calculate importance based on simple heuristics"""
        importance = 0.5  # Base importance

        # Adjust based on content characteristics
        if any(word in content.lower() for word in ["important", "remember", "critical"]):
            importance += 0.2
        if len(content) > 500:  # Longer content might be more important
            importance += 0.1

        return min(importance, 1.0)
```
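A brief usage sketch of the SQLite-backed store (it creates `agent_memory.db` in the working directory; the example content is illustrative):

```python
# Usage sketch for LongTermMemory
ltm = LongTermMemory()
ltm.store_episode("user_preference", "User prefers concise answers", importance=0.8)
ltm.learn_concept(
    "RAG",
    "Retrieval-augmented generation grounds LLM output in retrieved documents"
)
print(ltm.recall_episodes(query="concise", limit=5))
ltm.consolidate_memories(threshold_days=7)
```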
Vector databases enable semantic search through memories using embeddings, allowing agents to find relevant information based on meaning rather than exact matches.
```python
# Vector Memory with ChromaDB
import chromadb
from chromadb.utils import embedding_functions
import uuid


class VectorMemory:
    def __init__(self, collection_name="agent_memory"):
        self.client = chromadb.PersistentClient(path="./chroma_db")

        # Use OpenAI embeddings
        self.embedding_function = embedding_functions.OpenAIEmbeddingFunction(
            api_key="your-api-key",
            model_name="text-embedding-ada-002"
        )

        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embedding_function
        )

    def store(self, content, metadata=None):
        """Store content with embeddings"""
        doc_id = str(uuid.uuid4())
        self.collection.add(
            documents=[content],
            # Chroma rejects empty metadata dicts, so pass None when absent
            metadatas=[metadata] if metadata else None,
            ids=[doc_id]
        )
        return doc_id

    def search(self, query, n_results=5, filter_metadata=None):
        """Semantic search through memories"""
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results,
            where=filter_metadata  # Optional metadata filtering
        )
        return {
            "documents": results["documents"][0],
            "metadatas": results["metadatas"][0],
            "distances": results["distances"][0],
            "ids": results["ids"][0]
        }

    def update(self, doc_id, new_content=None, new_metadata=None):
        """Update an existing memory"""
        if new_content:
            self.collection.update(ids=[doc_id], documents=[new_content])
        if new_metadata:
            self.collection.update(ids=[doc_id], metadatas=[new_metadata])

    def delete(self, doc_ids):
        """Remove memories"""
        self.collection.delete(ids=doc_ids)

    def get_similar_memories(self, doc_id, n_results=5):
        """Find memories similar to a given memory"""
        # Fetch the document text, search with it, then drop the query doc itself
        result = self.collection.get(ids=[doc_id], include=["documents"])
        if result["documents"]:
            document = result["documents"][0]
            similar = self.search(document, n_results=n_results + 1)
            return [
                (i, doc)
                for i, doc in zip(similar["ids"], similar["documents"])
                if i != doc_id
            ][:n_results]
        return None


# Pinecone Implementation
# Classic pinecone-client (v2) API; v3+ uses the Pinecone class instead
import pinecone


class PineconeMemory:
    def __init__(self, index_name="agent-memory"):
        pinecone.init(
            api_key="your-api-key",
            environment="your-environment"
        )

        # Create the index if it doesn't exist
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric="cosine"
            )

        self.index = pinecone.Index(index_name)

    def upsert_memory(self, content, embeddings, metadata=None):
        """Store memory with vector embeddings"""
        vector_id = str(uuid.uuid4())
        self.index.upsert([
            (vector_id, embeddings, {
                "content": content,
                **(metadata or {})
            })
        ])
        return vector_id

    def search_memories(self, query_embedding, top_k=5, filter_dict=None):
        """Search for similar memories"""
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            filter=filter_dict
        )
        return results.matches
```
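A short usage sketch for the ChromaDB-backed store; it assumes a valid OpenAI key has replaced the placeholder above:

```python
# Usage sketch: retrieval is by meaning, not keyword overlap
vm = VectorMemory(collection_name="demo_memory")
doc_id = vm.store(
    "The user is planning a trip to Japan in April",
    metadata={"type": "user_fact"},
)
hits = vm.search("travel plans", n_results=3)
for doc, dist in zip(hits["documents"], hits["distances"]):
    print(f"{dist:.3f}  {doc}")
```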
| Vector Database | Best For | Key Features | Pricing |
|---|---|---|---|
| Pinecone | Production, scale | Managed, fast, reliable | Free tier + usage-based |
| Weaviate | Hybrid search | GraphQL, multiple vectors | Open source + cloud |
| ChromaDB | Development | Simple, local-first | Open source |
| Qdrant | On-premise | Rich filtering, Rust-based | Open source + cloud |
| Milvus | Large scale | Distributed, GPU support | Open source |
```python
class HierarchicalMemory:
    """Multi-level memory system with different retention policies"""

    def __init__(self, session_id="default"):
        self.current_session = session_id
        self.working = WorkingMemory(max_messages=10)
        self.short_term = ShortTermMemory(ttl_hours=24)
        self.long_term = LongTermMemory()
        self.vector_store = VectorMemory()

    def process_interaction(self, user_input, agent_response):
        # Add to working memory
        self.working.add_message("user", user_input)
        self.working.add_message("assistant", agent_response)

        # Store in short-term memory with session context
        self.short_term.store_interaction(
            session_id=self.current_session,
            interaction={"user": user_input, "agent": agent_response}
        )

        # Evaluate for long-term storage
        importance = self.evaluate_importance(user_input, agent_response)
        if importance > 0.7:
            self.long_term.store_episode(
                "interaction",
                f"User: {user_input}\nAgent: {agent_response}",
                importance=importance
            )

            # Also store in vector memory for semantic search
            self.vector_store.store(
                f"{user_input} {agent_response}",
                metadata={"type": "interaction", "importance": importance}
            )

    def evaluate_importance(self, user_input, agent_response):
        """Heuristic importance score (swap in an LLM-based scorer in production)"""
        text = f"{user_input} {agent_response}".lower()
        score = 0.5
        if any(w in text for w in ["important", "remember", "always", "never"]):
            score += 0.3
        return min(score, 1.0)

    def retrieve_context(self, query):
        """Multi-level context retrieval"""
        context = {
            "immediate": self.working.get_context(),
            "recent": self.short_term.get_recent_interactions(self.current_session),
            "relevant": self.vector_store.search(query, n_results=3),
            "historical": self.long_term.recall_episodes(query, limit=2)
        }
        return context
```
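Tying the layers together, assuming the Redis, SQLite, and ChromaDB backends above are all available:

```python
# Usage sketch: one call fans the interaction out to every memory layer
memory = HierarchicalMemory(session_id="session-42")
memory.process_interaction(
    "Remember that I'm allergic to peanuts",
    "Noted. I'll keep your peanut allergy in mind for future recommendations."
)
context = memory.retrieve_context("dietary restrictions")
```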
```python
class MemoryConsolidator:
    """Consolidate and compress memories over time.

    Assumes helpers (`get_yesterday_sessions`, `store_summary`,
    `extract_learnings`, `remove_duplicates`, `compress_similar_memories`)
    plus `self.llm` and `self.long_term` are provided by the host agent.
    """

    def consolidate_daily(self):
        """Run daily consolidation"""
        # Summarize conversations
        sessions = self.get_yesterday_sessions()
        for session in sessions:
            summary = self.summarize_session(session)
            self.store_summary(summary)

        # Extract and store key learnings
        learnings = self.extract_learnings(sessions)
        for learning in learnings:
            self.long_term.learn_concept(
                learning["concept"],
                learning["definition"]
            )

        # Clean up redundant memories
        self.remove_duplicates()
        self.compress_similar_memories()

    def summarize_session(self, session):
        """Create a concise summary of a session"""
        prompt = f"""
        Summarize this conversation:
        {session['interactions']}

        Include:
        1. Main topics discussed
        2. Key decisions or outcomes
        3. Important facts learned
        """
        summary = self.llm.generate(prompt)
        return summary
```
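The cleanup step is left abstract above. One hypothetical `remove_duplicates` implementation hashes normalized content and keeps the first occurrence; exact-match hashing is a coarse stand-in for the embedding-similarity deduplication you would likely use in production:

```python
import hashlib

def remove_duplicates(memories: list[dict]) -> list[dict]:
    """Drop memories whose normalized content has been seen before."""
    seen = set()
    unique = []
    for memory in memories:
        normalized = " ".join(memory["content"].lower().split())
        digest = hashlib.sha256(normalized.encode()).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(memory)
    return unique
```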
Long-term memory is commonly divided into three types, mirroring the tables created above:

- **Episodic memory** stores specific events and experiences with temporal context.
- **Semantic memory** stores facts, concepts, and general knowledge.
- **Procedural memory** stores how to perform tasks and procedures.
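The `LongTermMemory` schema above creates a `procedural_memory` table but defines no accessors for it. Here is a minimal sketch of two hypothetical methods to add to that class, following the same conventions as `store_episode`:

```python
# Hypothetical additions to LongTermMemory for procedural knowledge
def store_procedure(self, task, steps, success_rate=0.5):
    """Record the steps for performing a task."""
    cursor = self.conn.cursor()
    cursor.execute("""
        INSERT INTO procedural_memory (task, steps, success_rate, last_used)
        VALUES (?, ?, ?, ?)
    """, (task, json.dumps(steps), success_rate, datetime.now().isoformat()))
    self.conn.commit()

def recall_procedure(self, task):
    """Fetch the highest-success procedure for a task."""
    cursor = self.conn.cursor()
    cursor.execute("""
        SELECT steps, success_rate FROM procedural_memory
        WHERE task = ? ORDER BY success_rate DESC LIMIT 1
    """, (task,))
    row = cursor.fetchone()
    return {"steps": json.loads(row[0]), "success_rate": row[1]} if row else None
```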
```python
class SmartRetrieval:
    """Intelligent memory retrieval with multiple strategies.

    Assumes the host agent provides the underlying lookups
    (`get_recent_memories`, `vector_search`, `get_important_memories`,
    `merge_results`).
    """

    def retrieve(self, query, strategy="hybrid"):
        if strategy == "recency":
            return self.get_recent_memories(limit=10)
        elif strategy == "relevance":
            return self.vector_search(query, top_k=10)
        elif strategy == "importance":
            return self.get_important_memories(threshold=0.7)
        elif strategy == "hybrid":
            # Combine multiple strategies
            recent = self.get_recent_memories(limit=5)
            relevant = self.vector_search(query, top_k=5)
            important = self.get_important_memories(threshold=0.8, limit=3)

            # Merge and deduplicate
            combined = self.merge_results(recent, relevant, important)

            # Re-rank based on a combined score
            return self.rerank(combined, query)

    def rerank(self, memories, query):
        """Re-rank memories on a weighted blend of signals"""
        for memory in memories:
            score = 0
            score += memory.get("relevance_score", 0) * 0.4
            score += memory.get("recency_score", 0) * 0.2
            score += memory.get("importance", 0) * 0.3
            score += memory.get("access_frequency", 0) * 0.1
            memory["combined_score"] = score

        return sorted(memories, key=lambda x: x["combined_score"], reverse=True)
```
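`merge_results` is assumed above; a hypothetical version that deduplicates by memory id (falling back to content) while preserving first-seen order:

```python
def merge_results(self, *result_lists):
    """Merge memory lists, keeping the first occurrence of each id."""
    seen = set()
    merged = []
    for results in result_lists:
        for memory in results:
            key = memory.get("id") or memory["content"]
            if key not in seen:
                seen.add(key)
                merged.append(memory)
    return merged
```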