Master the fundamental design patterns that power production AI agent systems. These battle-tested patterns solve common challenges in building scalable, maintainable, and reliable AI agents.
Each agent focuses on one specific task or domain
Break complex reasoning into sequential steps
Decompose goals into manageable sub-tasks
Build agent pipelines where each agent handles specific aspects of a request:
Validate and sanitize user inputs before processing
Determine user intent and route to appropriate handlers
Process the request based on classified intent
Format and personalize the response
Review output for accuracy and compliance
# Chain of Responsibility Pattern Implementation from abc import ABC, abstractmethod from typing import Optional, Any, Dict import asyncio class AgentHandler(ABC): """Abstract base class for agent handlers in the chain""" def __init__(self): self.next_handler: Optional[AgentHandler] = None def set_next(self, handler: 'AgentHandler') -> 'AgentHandler': self.next_handler = handler return handler @abstractmethod async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: """Process the request or pass to next handler""" pass async def handle_next(self, request: Dict[str, Any]) -> Dict[str, Any]: """Pass request to next handler if available""" if self.next_handler: return await self.next_handler.handle(request) return request class ValidationAgent(AgentHandler): """Validates and sanitizes input data""" async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: print("🔍 ValidationAgent: Validating input...") # Validate required fields required_fields = ['user_id', 'message', 'context'] for field in required_fields: if field not in request: raise ValueError(f"Missing required field: {field}") # Sanitize input request['message'] = self.sanitize_input(request['message']) request['validated'] = True return await self.handle_next(request) def sanitize_input(self, text: str) -> str: # Remove potentially harmful content return text.strip()[:1000] # Limit length class IntentClassificationAgent(AgentHandler): """Classifies user intent using NLP""" async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: print("🎯 IntentClassificationAgent: Classifying intent...") message = request['message'].lower() # Simple intent classification (replace with ML model) if 'help' in message or 'support' in message: request['intent'] = 'support' elif 'buy' in message or 'purchase' in message: request['intent'] = 'purchase' elif 'status' in message or 'track' in message: request['intent'] = 'tracking' else: request['intent'] = 'general' request['confidence'] = 0.95 # Confidence score return await self.handle_next(request) class TaskExecutionAgent(AgentHandler): """Executes task based on classified intent""" async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: print(f"⚡ TaskExecutionAgent: Executing {request.get('intent')} task...") intent = request.get('intent', 'general') # Route to appropriate task handler if intent == 'support': request['result'] = await self.handle_support(request) elif intent == 'purchase': request['result'] = await self.handle_purchase(request) elif intent == 'tracking': request['result'] = await self.handle_tracking(request) else: request['result'] = await self.handle_general(request) return await self.handle_next(request) async def handle_support(self, request: Dict) -> str: return "I'll connect you with our support team right away." async def handle_purchase(self, request: Dict) -> str: return "Let me help you with your purchase." async def handle_tracking(self, request: Dict) -> str: return "I'll check your order status for you." async def handle_general(self, request: Dict) -> str: return "How can I assist you today?" class ResponseGenerationAgent(AgentHandler): """Generates formatted response""" async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: print("✨ ResponseGenerationAgent: Generating response...") # Format response with context response = { 'user_id': request['user_id'], 'message': request.get('result', 'Processing your request...'), 'intent': request.get('intent'), 'confidence': request.get('confidence'), 'timestamp': asyncio.get_event_loop().time() } request['response'] = response return await self.handle_next(request) class QualityAssuranceAgent(AgentHandler): """Reviews output for quality and compliance""" async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]: print("✅ QualityAssuranceAgent: Reviewing output...") response = request.get('response', {}) # Check for prohibited content if self.contains_prohibited_content(response.get('message', '')): response['message'] = "I apologize, but I cannot process this request." response['flagged'] = True # Add quality metrics response['quality_score'] = self.calculate_quality_score(response) return request def contains_prohibited_content(self, text: str) -> bool: # Check for prohibited patterns prohibited = ['password', 'credit card', 'ssn'] return any(word in text.lower() for word in prohibited) def calculate_quality_score(self, response: Dict) -> float: # Simple quality scoring score = 1.0 if response.get('flagged'): score *= 0.5 if response.get('confidence', 0) < 0.8: score *= 0.8 return score # Usage Example async def process_request(): # Create the chain validation = ValidationAgent() classification = IntentClassificationAgent() execution = TaskExecutionAgent() generation = ResponseGenerationAgent() quality = QualityAssuranceAgent() # Link the chain validation.set_next(classification).set_next(execution).set_next(generation).set_next(quality) # Process request request = { 'user_id': 'user123', 'message': 'I need help tracking my order', 'context': {'session_id': 'abc123'} } result = await validation.handle(request) print(f"\n📤 Final Response: {result['response']}") return result # Run the chain if __name__ == "__main__": asyncio.run(process_request())
Coordinate multiple specialized agents working together on complex tasks:
Assign specific capabilities to each agent
Build central coordinator to manage agents
Set up message passing between agents
Manage task dependencies and sequencing
Combine outputs from multiple agents
# Multi-Agent Orchestration Pattern from typing import List, Dict, Any, Optional from dataclasses import dataclass from enum import Enum import asyncio from concurrent.futures import ThreadPoolExecutor class AgentRole(Enum): RESEARCHER = "researcher" ANALYZER = "analyzer" WRITER = "writer" REVIEWER = "reviewer" COORDINATOR = "coordinator" @dataclass class AgentMessage: sender: str recipient: str content: Any message_type: str priority: int = 0 class Agent: """Base agent class with communication capabilities""" def __init__(self, name: str, role: AgentRole): self.name = name self.role = role self.inbox: asyncio.Queue = asyncio.Queue() self.knowledge_base: Dict[str, Any] = {} self.running = False async def send_message(self, recipient: 'Agent', content: Any, message_type: str = "task") -> None: """Send message to another agent""" message = AgentMessage( sender=self.name, recipient=recipient.name, content=content, message_type=message_type ) await recipient.inbox.put(message) async def receive_message(self) -> Optional[AgentMessage]: """Receive message from inbox""" try: return await asyncio.wait_for(self.inbox.get(), timeout=1.0) except asyncio.TimeoutError: return None async def process(self) -> Any: """Process agent tasks (override in subclasses)""" raise NotImplementedError async def run(self) -> None: """Main agent loop""" self.running = True while self.running: message = await self.receive_message() if message: await self.handle_message(message) await asyncio.sleep(0.1) async def handle_message(self, message: AgentMessage) -> None: """Handle incoming messages""" print(f"📨 {self.name} received: {message.message_type} from {message.sender}") if message.message_type == "task": result = await self.process_task(message.content) await self.send_result(message.sender, result) elif message.message_type == "stop": self.running = False async def process_task(self, task: Dict[str, Any]) -> Any: """Process specific task (override in subclasses)""" return await self.process() async def send_result(self, recipient_name: str, result: Any) -> None: """Send result back to sender""" # This would normally look up the agent by name print(f"📤 {self.name} sending result to {recipient_name}") class ResearchAgent(Agent): """Agent specialized in research tasks""" def __init__(self, name: str): super().__init__(name, AgentRole.RESEARCHER) async def process(self) -> Dict[str, Any]: """Perform research on given topic""" await asyncio.sleep(1) # Simulate research time return { 'findings': [ 'Key insight 1: Market trends indicate growth', 'Key insight 2: Customer satisfaction is high', 'Key insight 3: Competition is increasing' ], 'sources': ['report1.pdf', 'survey2.xlsx', 'analysis3.doc'] } class AnalyzerAgent(Agent): """Agent specialized in data analysis""" def __init__(self, name: str): super().__init__(name, AgentRole.ANALYZER) async def process(self) -> Dict[str, Any]: """Analyze provided data""" await asyncio.sleep(1.5) # Simulate analysis time return { 'metrics': { 'growth_rate': 0.15, 'market_share': 0.23, 'customer_retention': 0.87 }, 'trends': ['upward', 'stable', 'improving'], 'recommendations': [ 'Invest in product development', 'Expand marketing efforts', 'Improve customer support' ] } class WriterAgent(Agent): """Agent specialized in content generation""" def __init__(self, name: str): super().__init__(name, AgentRole.WRITER) async def process(self) -> str: """Generate written content""" await asyncio.sleep(2) # Simulate writing time return """ Executive Summary Report Based on comprehensive research and analysis, our findings indicate: 1. Market Position: Strong growth trajectory with 15% YoY increase 2. Customer Metrics: 87% retention rate exceeds industry average 3. Strategic Recommendations: Focus on product innovation and market expansion Detailed analysis supports immediate action on identified opportunities. """ class Orchestrator: """Coordinates multiple agents to complete complex tasks""" def __init__(self): self.agents: Dict[str, Agent] = {} self.task_queue: asyncio.Queue = asyncio.Queue() self.results: Dict[str, Any] = {} def register_agent(self, agent: Agent) -> None: """Register an agent with the orchestrator""" self.agents[agent.name] = agent print(f"✅ Registered agent: {agent.name} ({agent.role.value})") async def execute_workflow(self, workflow: List[Dict[str, Any]]) -> Dict[str, Any]: """Execute a workflow across multiple agents""" print("\n🚀 Starting workflow execution...") for step in workflow: agent_name = step['agent'] task = step['task'] dependencies = step.get('dependencies', []) # Wait for dependencies for dep in dependencies: while dep not in self.results: await asyncio.sleep(0.1) # Execute task agent = self.agents.get(agent_name) if agent: print(f"\n⚡ Executing: {task['name']} with {agent_name}") # Add dependency results to task task['inputs'] = {dep: self.results[dep] for dep in dependencies} # Process task result = await agent.process() self.results[task['name']] = result print(f"✅ Completed: {task['name']}") return self.results async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]: """Execute multiple tasks in parallel""" print("\n🔄 Starting parallel execution...") async def execute_task(task: Dict[str, Any]) -> tuple: agent = self.agents.get(task['agent']) if agent: result = await agent.process() return (task['name'], result) return (task['name'], None) # Execute all tasks in parallel results = await asyncio.gather(*[execute_task(task) for task in tasks]) # Convert to dictionary return dict(results) # Usage Example async def main(): # Create orchestrator orchestrator = Orchestrator() # Create and register agents research_agent = ResearchAgent("research_bot") analyzer_agent = AnalyzerAgent("analyzer_bot") writer_agent = WriterAgent("writer_bot") orchestrator.register_agent(research_agent) orchestrator.register_agent(analyzer_agent) orchestrator.register_agent(writer_agent) # Define workflow workflow = [ { 'agent': 'research_bot', 'task': {'name': 'market_research', 'type': 'research'}, 'dependencies': [] }, { 'agent': 'analyzer_bot', 'task': {'name': 'data_analysis', 'type': 'analysis'}, 'dependencies': ['market_research'] }, { 'agent': 'writer_bot', 'task': {'name': 'report_generation', 'type': 'writing'}, 'dependencies': ['market_research', 'data_analysis'] } ] # Execute workflow results = await orchestrator.execute_workflow(workflow) print("\n📊 Workflow Results:") for task_name, result in results.items(): print(f"\n{task_name}:") if isinstance(result, dict): for key, value in result.items(): print(f" {key}: {value}") else: print(f" {result[:100]}...") # First 100 chars # Example of parallel execution parallel_tasks = [ {'agent': 'research_bot', 'name': 'research_task_1'}, {'agent': 'analyzer_bot', 'name': 'analysis_task_1'}, {'agent': 'writer_bot', 'name': 'writing_task_1'} ] parallel_results = await orchestrator.parallel_execution(parallel_tasks) print("\n⚡ Parallel Execution Results:") for task_name in parallel_results: print(f" ✅ {task_name} completed") if __name__ == "__main__": asyncio.run(main())
Combine reasoning and action in an iterative loop for complex problem-solving:
Gather current state and available information
Analyze situation and plan next action
Execute chosen action in environment
Monitor outcome of action
Continue loop or finish if goal achieved
Essential patterns for production AI systems:
Pattern | Use Case | Complexity | Scalability |
---|---|---|---|
Chain of Responsibility | Sequential processing pipeline | Low | High |
Multi-Agent Orchestration | Complex collaborative tasks | High | Very High |
ReAct Pattern | Interactive problem solving | Medium | Medium |
Hierarchical Planning | Goal decomposition | Medium | High |
Blackboard Pattern | Shared knowledge systems | High | Medium |