Essential Agent Patterns

Master the fundamental design patterns that power production AI agent systems. These battle-tested patterns solve common challenges in building scalable, maintainable, and reliable AI agents.

🎯 Single Responsibility

Each agent focuses on one specific task or domain

🔄 Chain of Thought

Break complex reasoning into sequential steps

🌳 Hierarchical Planning

Decompose goals into manageable sub-tasks

🔗
Chain of Responsibility Pattern

Build agent pipelines where each agent handles specific aspects of a request:

1
Input Validation Agent

Validate and sanitize user inputs before processing

2
Intent Classification Agent

Determine user intent and route to appropriate handlers

3
Task Execution Agent

Process the request based on classified intent

4
Response Generation Agent

Format and personalize the response

5
Quality Assurance Agent

Review output for accuracy and compliance

# Chain of Responsibility Pattern Implementation
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict
import asyncio

class AgentHandler(ABC):
    """Abstract base class for agent handlers in the chain"""
    
    def __init__(self):
        self.next_handler: Optional[AgentHandler] = None
    
    def set_next(self, handler: 'AgentHandler') -> 'AgentHandler':
        self.next_handler = handler
        return handler
    
    @abstractmethod
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Process the request or pass to next handler"""
        pass
    
    async def handle_next(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Pass request to next handler if available"""
        if self.next_handler:
            return await self.next_handler.handle(request)
        return request

class ValidationAgent(AgentHandler):
    """Validates and sanitizes input data"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("🔍 ValidationAgent: Validating input...")
        
        # Validate required fields
        required_fields = ['user_id', 'message', 'context']
        for field in required_fields:
            if field not in request:
                raise ValueError(f"Missing required field: {field}")
        
        # Sanitize input
        request['message'] = self.sanitize_input(request['message'])
        request['validated'] = True
        
        return await self.handle_next(request)
    
    def sanitize_input(self, text: str) -> str:
        # Remove potentially harmful content
        return text.strip()[:1000]  # Limit length

class IntentClassificationAgent(AgentHandler):
    """Classifies user intent using NLP"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("🎯 IntentClassificationAgent: Classifying intent...")
        
        message = request['message'].lower()
        
        # Simple intent classification (replace with ML model)
        if 'help' in message or 'support' in message:
            request['intent'] = 'support'
        elif 'buy' in message or 'purchase' in message:
            request['intent'] = 'purchase'
        elif 'status' in message or 'track' in message:
            request['intent'] = 'tracking'
        else:
            request['intent'] = 'general'
        
        request['confidence'] = 0.95  # Confidence score
        
        return await self.handle_next(request)

class TaskExecutionAgent(AgentHandler):
    """Executes task based on classified intent"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print(f"⚡ TaskExecutionAgent: Executing {request.get('intent')} task...")
        
        intent = request.get('intent', 'general')
        
        # Route to appropriate task handler
        if intent == 'support':
            request['result'] = await self.handle_support(request)
        elif intent == 'purchase':
            request['result'] = await self.handle_purchase(request)
        elif intent == 'tracking':
            request['result'] = await self.handle_tracking(request)
        else:
            request['result'] = await self.handle_general(request)
        
        return await self.handle_next(request)
    
    async def handle_support(self, request: Dict) -> str:
        return "I'll connect you with our support team right away."
    
    async def handle_purchase(self, request: Dict) -> str:
        return "Let me help you with your purchase."
    
    async def handle_tracking(self, request: Dict) -> str:
        return "I'll check your order status for you."
    
    async def handle_general(self, request: Dict) -> str:
        return "How can I assist you today?"

class ResponseGenerationAgent(AgentHandler):
    """Generates formatted response"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("✨ ResponseGenerationAgent: Generating response...")
        
        # Format response with context
        response = {
            'user_id': request['user_id'],
            'message': request.get('result', 'Processing your request...'),
            'intent': request.get('intent'),
            'confidence': request.get('confidence'),
            'timestamp': asyncio.get_event_loop().time()
        }
        
        request['response'] = response
        
        return await self.handle_next(request)

class QualityAssuranceAgent(AgentHandler):
    """Reviews output for quality and compliance"""
    
    async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        print("✅ QualityAssuranceAgent: Reviewing output...")
        
        response = request.get('response', {})
        
        # Check for prohibited content
        if self.contains_prohibited_content(response.get('message', '')):
            response['message'] = "I apologize, but I cannot process this request."
            response['flagged'] = True
        
        # Add quality metrics
        response['quality_score'] = self.calculate_quality_score(response)
        
        return request
    
    def contains_prohibited_content(self, text: str) -> bool:
        # Check for prohibited patterns
        prohibited = ['password', 'credit card', 'ssn']
        return any(word in text.lower() for word in prohibited)
    
    def calculate_quality_score(self, response: Dict) -> float:
        # Simple quality scoring
        score = 1.0
        if response.get('flagged'):
            score *= 0.5
        if response.get('confidence', 0) < 0.8:
            score *= 0.8
        return score

# Usage Example
async def process_request():
    # Create the chain
    validation = ValidationAgent()
    classification = IntentClassificationAgent()
    execution = TaskExecutionAgent()
    generation = ResponseGenerationAgent()
    quality = QualityAssuranceAgent()
    
    # Link the chain
    validation.set_next(classification).set_next(execution).set_next(generation).set_next(quality)
    
    # Process request
    request = {
        'user_id': 'user123',
        'message': 'I need help tracking my order',
        'context': {'session_id': 'abc123'}
    }
    
    result = await validation.handle(request)
    print(f"\n📤 Final Response: {result['response']}")
    
    return result

# Run the chain
if __name__ == "__main__":
    asyncio.run(process_request())
                
Important: Each agent in the chain should have a single, well-defined responsibility. This makes the system easier to test, maintain, and scale.
🌐
Multi-Agent Orchestration Pattern

Coordinate multiple specialized agents working together on complex tasks:

1
Define Agent Roles

Assign specific capabilities to each agent

2
Create Orchestrator

Build central coordinator to manage agents

3
Implement Communication

Set up message passing between agents

4
Handle Dependencies

Manage task dependencies and sequencing

5
Aggregate Results

Combine outputs from multiple agents

# Multi-Agent Orchestration Pattern
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AgentRole(Enum):
    RESEARCHER = "researcher"
    ANALYZER = "analyzer"
    WRITER = "writer"
    REVIEWER = "reviewer"
    COORDINATOR = "coordinator"

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    content: Any
    message_type: str
    priority: int = 0

class Agent:
    """Base agent class with communication capabilities"""
    
    def __init__(self, name: str, role: AgentRole):
        self.name = name
        self.role = role
        self.inbox: asyncio.Queue = asyncio.Queue()
        self.knowledge_base: Dict[str, Any] = {}
        self.running = False
    
    async def send_message(self, recipient: 'Agent', content: Any, 
                          message_type: str = "task") -> None:
        """Send message to another agent"""
        message = AgentMessage(
            sender=self.name,
            recipient=recipient.name,
            content=content,
            message_type=message_type
        )
        await recipient.inbox.put(message)
    
    async def receive_message(self) -> Optional[AgentMessage]:
        """Receive message from inbox"""
        try:
            return await asyncio.wait_for(self.inbox.get(), timeout=1.0)
        except asyncio.TimeoutError:
            return None
    
    async def process(self) -> Any:
        """Process agent tasks (override in subclasses)"""
        raise NotImplementedError
    
    async def run(self) -> None:
        """Main agent loop"""
        self.running = True
        while self.running:
            message = await self.receive_message()
            if message:
                await self.handle_message(message)
            await asyncio.sleep(0.1)
    
    async def handle_message(self, message: AgentMessage) -> None:
        """Handle incoming messages"""
        print(f"📨 {self.name} received: {message.message_type} from {message.sender}")
        if message.message_type == "task":
            result = await self.process_task(message.content)
            await self.send_result(message.sender, result)
        elif message.message_type == "stop":
            self.running = False
    
    async def process_task(self, task: Dict[str, Any]) -> Any:
        """Process specific task (override in subclasses)"""
        return await self.process()
    
    async def send_result(self, recipient_name: str, result: Any) -> None:
        """Send result back to sender"""
        # This would normally look up the agent by name
        print(f"📤 {self.name} sending result to {recipient_name}")

class ResearchAgent(Agent):
    """Agent specialized in research tasks"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.RESEARCHER)
    
    async def process(self) -> Dict[str, Any]:
        """Perform research on given topic"""
        await asyncio.sleep(1)  # Simulate research time
        return {
            'findings': [
                'Key insight 1: Market trends indicate growth',
                'Key insight 2: Customer satisfaction is high',
                'Key insight 3: Competition is increasing'
            ],
            'sources': ['report1.pdf', 'survey2.xlsx', 'analysis3.doc']
        }

class AnalyzerAgent(Agent):
    """Agent specialized in data analysis"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.ANALYZER)
    
    async def process(self) -> Dict[str, Any]:
        """Analyze provided data"""
        await asyncio.sleep(1.5)  # Simulate analysis time
        return {
            'metrics': {
                'growth_rate': 0.15,
                'market_share': 0.23,
                'customer_retention': 0.87
            },
            'trends': ['upward', 'stable', 'improving'],
            'recommendations': [
                'Invest in product development',
                'Expand marketing efforts',
                'Improve customer support'
            ]
        }

class WriterAgent(Agent):
    """Agent specialized in content generation"""
    
    def __init__(self, name: str):
        super().__init__(name, AgentRole.WRITER)
    
    async def process(self) -> str:
        """Generate written content"""
        await asyncio.sleep(2)  # Simulate writing time
        return """
        Executive Summary Report
        
        Based on comprehensive research and analysis, our findings indicate:
        
        1. Market Position: Strong growth trajectory with 15% YoY increase
        2. Customer Metrics: 87% retention rate exceeds industry average
        3. Strategic Recommendations: Focus on product innovation and market expansion
        
        Detailed analysis supports immediate action on identified opportunities.
        """

class Orchestrator:
    """Coordinates multiple agents to complete complex tasks"""
    
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: Dict[str, Any] = {}
    
    def register_agent(self, agent: Agent) -> None:
        """Register an agent with the orchestrator"""
        self.agents[agent.name] = agent
        print(f"✅ Registered agent: {agent.name} ({agent.role.value})")
    
    async def execute_workflow(self, workflow: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute a workflow across multiple agents"""
        print("\n🚀 Starting workflow execution...")
        
        for step in workflow:
            agent_name = step['agent']
            task = step['task']
            dependencies = step.get('dependencies', [])
            
            # Wait for dependencies
            for dep in dependencies:
                while dep not in self.results:
                    await asyncio.sleep(0.1)
            
            # Execute task
            agent = self.agents.get(agent_name)
            if agent:
                print(f"\n⚡ Executing: {task['name']} with {agent_name}")
                
                # Add dependency results to task
                task['inputs'] = {dep: self.results[dep] for dep in dependencies}
                
                # Process task
                result = await agent.process()
                self.results[task['name']] = result
                
                print(f"✅ Completed: {task['name']}")
        
        return self.results
    
    async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute multiple tasks in parallel"""
        print("\n🔄 Starting parallel execution...")
        
        async def execute_task(task: Dict[str, Any]) -> tuple:
            agent = self.agents.get(task['agent'])
            if agent:
                result = await agent.process()
                return (task['name'], result)
            return (task['name'], None)
        
        # Execute all tasks in parallel
        results = await asyncio.gather(*[execute_task(task) for task in tasks])
        
        # Convert to dictionary
        return dict(results)

# Usage Example
async def main():
    # Create orchestrator
    orchestrator = Orchestrator()
    
    # Create and register agents
    research_agent = ResearchAgent("research_bot")
    analyzer_agent = AnalyzerAgent("analyzer_bot")
    writer_agent = WriterAgent("writer_bot")
    
    orchestrator.register_agent(research_agent)
    orchestrator.register_agent(analyzer_agent)
    orchestrator.register_agent(writer_agent)
    
    # Define workflow
    workflow = [
        {
            'agent': 'research_bot',
            'task': {'name': 'market_research', 'type': 'research'},
            'dependencies': []
        },
        {
            'agent': 'analyzer_bot',
            'task': {'name': 'data_analysis', 'type': 'analysis'},
            'dependencies': ['market_research']
        },
        {
            'agent': 'writer_bot',
            'task': {'name': 'report_generation', 'type': 'writing'},
            'dependencies': ['market_research', 'data_analysis']
        }
    ]
    
    # Execute workflow
    results = await orchestrator.execute_workflow(workflow)
    
    print("\n📊 Workflow Results:")
    for task_name, result in results.items():
        print(f"\n{task_name}:")
        if isinstance(result, dict):
            for key, value in result.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {result[:100]}...")  # First 100 chars
    
    # Example of parallel execution
    parallel_tasks = [
        {'agent': 'research_bot', 'name': 'research_task_1'},
        {'agent': 'analyzer_bot', 'name': 'analysis_task_1'},
        {'agent': 'writer_bot', 'name': 'writing_task_1'}
    ]
    
    parallel_results = await orchestrator.parallel_execution(parallel_tasks)
    print("\n⚡ Parallel Execution Results:")
    for task_name in parallel_results:
        print(f"  ✅ {task_name} completed")

if __name__ == "__main__":
    asyncio.run(main())
                
🧠
ReAct (Reasoning + Acting) Pattern

Combine reasoning and action in an iterative loop for complex problem-solving:

1
Observe Environment

Gather current state and available information

2
Think & Reason

Analyze situation and plan next action

3
Act on Decision

Execute chosen action in environment

4
Observe Results

Monitor outcome of action

5
Iterate or Complete

Continue loop or finish if goal achieved

📋 Common Agent Patterns

Essential patterns for production AI systems:

Pattern Comparison

Pattern Use Case Complexity Scalability
Chain of Responsibility Sequential processing pipeline Low High
Multi-Agent Orchestration Complex collaborative tasks High Very High
ReAct Pattern Interactive problem solving Medium Medium
Hierarchical Planning Goal decomposition Medium High
Blackboard Pattern Shared knowledge systems High Medium
Best Practices:
  • Keep agents focused on single responsibilities
  • Implement robust error handling and recovery
  • Use async/await for concurrent operations
  • Monitor agent performance and resource usage
  • Implement circuit breakers for failing agents
  • Version your agent interfaces for compatibility
  • Test agents individually and in combination
  • Document agent capabilities and dependencies