Master the fundamental design patterns that power production AI agent systems. These battle-tested patterns solve common challenges in building scalable, maintainable, and reliable AI agents.
Each agent focuses on one specific task or domain
Break complex reasoning into sequential steps
Decompose goals into manageable sub-tasks
Build agent pipelines where each agent handles specific aspects of a request:
Validate and sanitize user inputs before processing
Determine user intent and route to appropriate handlers
Process the request based on classified intent
Format and personalize the response
Review output for accuracy and compliance
# Chain of Responsibility Pattern Implementation
from abc import ABC, abstractmethod
from typing import Optional, Any, Dict
import asyncio
class AgentHandler(ABC):
"""Abstract base class for agent handlers in the chain"""
def __init__(self):
self.next_handler: Optional[AgentHandler] = None
def set_next(self, handler: 'AgentHandler') -> 'AgentHandler':
self.next_handler = handler
return handler
@abstractmethod
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Process the request or pass to next handler"""
pass
async def handle_next(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Pass request to next handler if available"""
if self.next_handler:
return await self.next_handler.handle(request)
return request
class ValidationAgent(AgentHandler):
"""Validates and sanitizes input data"""
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("🔍 ValidationAgent: Validating input...")
# Validate required fields
required_fields = ['user_id', 'message', 'context']
for field in required_fields:
if field not in request:
raise ValueError(f"Missing required field: {field}")
# Sanitize input
request['message'] = self.sanitize_input(request['message'])
request['validated'] = True
return await self.handle_next(request)
def sanitize_input(self, text: str) -> str:
# Remove potentially harmful content
return text.strip()[:1000] # Limit length
class IntentClassificationAgent(AgentHandler):
"""Classifies user intent using NLP"""
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("🎯 IntentClassificationAgent: Classifying intent...")
message = request['message'].lower()
# Simple intent classification (replace with ML model)
if 'help' in message or 'support' in message:
request['intent'] = 'support'
elif 'buy' in message or 'purchase' in message:
request['intent'] = 'purchase'
elif 'status' in message or 'track' in message:
request['intent'] = 'tracking'
else:
request['intent'] = 'general'
request['confidence'] = 0.95 # Confidence score
return await self.handle_next(request)
class TaskExecutionAgent(AgentHandler):
"""Executes task based on classified intent"""
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
print(f"⚡ TaskExecutionAgent: Executing {request.get('intent')} task...")
intent = request.get('intent', 'general')
# Route to appropriate task handler
if intent == 'support':
request['result'] = await self.handle_support(request)
elif intent == 'purchase':
request['result'] = await self.handle_purchase(request)
elif intent == 'tracking':
request['result'] = await self.handle_tracking(request)
else:
request['result'] = await self.handle_general(request)
return await self.handle_next(request)
async def handle_support(self, request: Dict) -> str:
return "I'll connect you with our support team right away."
async def handle_purchase(self, request: Dict) -> str:
return "Let me help you with your purchase."
async def handle_tracking(self, request: Dict) -> str:
return "I'll check your order status for you."
async def handle_general(self, request: Dict) -> str:
return "How can I assist you today?"
class ResponseGenerationAgent(AgentHandler):
"""Generates formatted response"""
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("✨ ResponseGenerationAgent: Generating response...")
# Format response with context
response = {
'user_id': request['user_id'],
'message': request.get('result', 'Processing your request...'),
'intent': request.get('intent'),
'confidence': request.get('confidence'),
'timestamp': asyncio.get_event_loop().time()
}
request['response'] = response
return await self.handle_next(request)
class QualityAssuranceAgent(AgentHandler):
"""Reviews output for quality and compliance"""
async def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
print("✅ QualityAssuranceAgent: Reviewing output...")
response = request.get('response', {})
# Check for prohibited content
if self.contains_prohibited_content(response.get('message', '')):
response['message'] = "I apologize, but I cannot process this request."
response['flagged'] = True
# Add quality metrics
response['quality_score'] = self.calculate_quality_score(response)
return request
def contains_prohibited_content(self, text: str) -> bool:
# Check for prohibited patterns
prohibited = ['password', 'credit card', 'ssn']
return any(word in text.lower() for word in prohibited)
def calculate_quality_score(self, response: Dict) -> float:
# Simple quality scoring
score = 1.0
if response.get('flagged'):
score *= 0.5
if response.get('confidence', 0) < 0.8:
score *= 0.8
return score
# Usage Example
async def process_request():
# Create the chain
validation = ValidationAgent()
classification = IntentClassificationAgent()
execution = TaskExecutionAgent()
generation = ResponseGenerationAgent()
quality = QualityAssuranceAgent()
# Link the chain
validation.set_next(classification).set_next(execution).set_next(generation).set_next(quality)
# Process request
request = {
'user_id': 'user123',
'message': 'I need help tracking my order',
'context': {'session_id': 'abc123'}
}
result = await validation.handle(request)
print(f"\n📤 Final Response: {result['response']}")
return result
# Run the chain
if __name__ == "__main__":
asyncio.run(process_request())
Coordinate multiple specialized agents working together on complex tasks:
Assign specific capabilities to each agent
Build central coordinator to manage agents
Set up message passing between agents
Manage task dependencies and sequencing
Combine outputs from multiple agents
# Multi-Agent Orchestration Pattern
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AgentRole(Enum):
RESEARCHER = "researcher"
ANALYZER = "analyzer"
WRITER = "writer"
REVIEWER = "reviewer"
COORDINATOR = "coordinator"
@dataclass
class AgentMessage:
sender: str
recipient: str
content: Any
message_type: str
priority: int = 0
class Agent:
"""Base agent class with communication capabilities"""
def __init__(self, name: str, role: AgentRole):
self.name = name
self.role = role
self.inbox: asyncio.Queue = asyncio.Queue()
self.knowledge_base: Dict[str, Any] = {}
self.running = False
async def send_message(self, recipient: 'Agent', content: Any,
message_type: str = "task") -> None:
"""Send message to another agent"""
message = AgentMessage(
sender=self.name,
recipient=recipient.name,
content=content,
message_type=message_type
)
await recipient.inbox.put(message)
async def receive_message(self) -> Optional[AgentMessage]:
"""Receive message from inbox"""
try:
return await asyncio.wait_for(self.inbox.get(), timeout=1.0)
except asyncio.TimeoutError:
return None
async def process(self) -> Any:
"""Process agent tasks (override in subclasses)"""
raise NotImplementedError
async def run(self) -> None:
"""Main agent loop"""
self.running = True
while self.running:
message = await self.receive_message()
if message:
await self.handle_message(message)
await asyncio.sleep(0.1)
async def handle_message(self, message: AgentMessage) -> None:
"""Handle incoming messages"""
print(f"📨 {self.name} received: {message.message_type} from {message.sender}")
if message.message_type == "task":
result = await self.process_task(message.content)
await self.send_result(message.sender, result)
elif message.message_type == "stop":
self.running = False
async def process_task(self, task: Dict[str, Any]) -> Any:
"""Process specific task (override in subclasses)"""
return await self.process()
async def send_result(self, recipient_name: str, result: Any) -> None:
"""Send result back to sender"""
# This would normally look up the agent by name
print(f"📤 {self.name} sending result to {recipient_name}")
class ResearchAgent(Agent):
"""Agent specialized in research tasks"""
def __init__(self, name: str):
super().__init__(name, AgentRole.RESEARCHER)
async def process(self) -> Dict[str, Any]:
"""Perform research on given topic"""
await asyncio.sleep(1) # Simulate research time
return {
'findings': [
'Key insight 1: Market trends indicate growth',
'Key insight 2: Customer satisfaction is high',
'Key insight 3: Competition is increasing'
],
'sources': ['report1.pdf', 'survey2.xlsx', 'analysis3.doc']
}
class AnalyzerAgent(Agent):
"""Agent specialized in data analysis"""
def __init__(self, name: str):
super().__init__(name, AgentRole.ANALYZER)
async def process(self) -> Dict[str, Any]:
"""Analyze provided data"""
await asyncio.sleep(1.5) # Simulate analysis time
return {
'metrics': {
'growth_rate': 0.15,
'market_share': 0.23,
'customer_retention': 0.87
},
'trends': ['upward', 'stable', 'improving'],
'recommendations': [
'Invest in product development',
'Expand marketing efforts',
'Improve customer support'
]
}
class WriterAgent(Agent):
"""Agent specialized in content generation"""
def __init__(self, name: str):
super().__init__(name, AgentRole.WRITER)
async def process(self) -> str:
"""Generate written content"""
await asyncio.sleep(2) # Simulate writing time
return """
Executive Summary Report
Based on comprehensive research and analysis, our findings indicate:
1. Market Position: Strong growth trajectory with 15% YoY increase
2. Customer Metrics: 87% retention rate exceeds industry average
3. Strategic Recommendations: Focus on product innovation and market expansion
Detailed analysis supports immediate action on identified opportunities.
"""
class Orchestrator:
"""Coordinates multiple agents to complete complex tasks"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.results: Dict[str, Any] = {}
def register_agent(self, agent: Agent) -> None:
"""Register an agent with the orchestrator"""
self.agents[agent.name] = agent
print(f"✅ Registered agent: {agent.name} ({agent.role.value})")
async def execute_workflow(self, workflow: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a workflow across multiple agents"""
print("\n🚀 Starting workflow execution...")
for step in workflow:
agent_name = step['agent']
task = step['task']
dependencies = step.get('dependencies', [])
# Wait for dependencies
for dep in dependencies:
while dep not in self.results:
await asyncio.sleep(0.1)
# Execute task
agent = self.agents.get(agent_name)
if agent:
print(f"\n⚡ Executing: {task['name']} with {agent_name}")
# Add dependency results to task
task['inputs'] = {dep: self.results[dep] for dep in dependencies}
# Process task
result = await agent.process()
self.results[task['name']] = result
print(f"✅ Completed: {task['name']}")
return self.results
async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute multiple tasks in parallel"""
print("\n🔄 Starting parallel execution...")
async def execute_task(task: Dict[str, Any]) -> tuple:
agent = self.agents.get(task['agent'])
if agent:
result = await agent.process()
return (task['name'], result)
return (task['name'], None)
# Execute all tasks in parallel
results = await asyncio.gather(*[execute_task(task) for task in tasks])
# Convert to dictionary
return dict(results)
# Usage Example
async def main():
# Create orchestrator
orchestrator = Orchestrator()
# Create and register agents
research_agent = ResearchAgent("research_bot")
analyzer_agent = AnalyzerAgent("analyzer_bot")
writer_agent = WriterAgent("writer_bot")
orchestrator.register_agent(research_agent)
orchestrator.register_agent(analyzer_agent)
orchestrator.register_agent(writer_agent)
# Define workflow
workflow = [
{
'agent': 'research_bot',
'task': {'name': 'market_research', 'type': 'research'},
'dependencies': []
},
{
'agent': 'analyzer_bot',
'task': {'name': 'data_analysis', 'type': 'analysis'},
'dependencies': ['market_research']
},
{
'agent': 'writer_bot',
'task': {'name': 'report_generation', 'type': 'writing'},
'dependencies': ['market_research', 'data_analysis']
}
]
# Execute workflow
results = await orchestrator.execute_workflow(workflow)
print("\n📊 Workflow Results:")
for task_name, result in results.items():
print(f"\n{task_name}:")
if isinstance(result, dict):
for key, value in result.items():
print(f" {key}: {value}")
else:
print(f" {result[:100]}...") # First 100 chars
# Example of parallel execution
parallel_tasks = [
{'agent': 'research_bot', 'name': 'research_task_1'},
{'agent': 'analyzer_bot', 'name': 'analysis_task_1'},
{'agent': 'writer_bot', 'name': 'writing_task_1'}
]
parallel_results = await orchestrator.parallel_execution(parallel_tasks)
print("\n⚡ Parallel Execution Results:")
for task_name in parallel_results:
print(f" ✅ {task_name} completed")
if __name__ == "__main__":
asyncio.run(main())
Combine reasoning and action in an iterative loop for complex problem-solving:
Gather current state and available information
Analyze situation and plan next action
Execute chosen action in environment
Monitor outcome of action
Continue loop or finish if goal achieved
Essential patterns for production AI systems:
| Pattern | Use Case | Complexity | Scalability |
|---|---|---|---|
| Chain of Responsibility | Sequential processing pipeline | Low | High |
| Multi-Agent Orchestration | Complex collaborative tasks | High | Very High |
| ReAct Pattern | Interactive problem solving | Medium | Medium |
| Hierarchical Planning | Goal decomposition | Medium | High |
| Blackboard Pattern | Shared knowledge systems | High | Medium |