AutoGen is Microsoft's open-source framework for building next-generation LLM applications using multiple agents that can converse with each other to solve tasks. It simplifies the orchestration, automation, and optimization of complex LLM workflows.
- Agents can engage in complex multi-turn conversations to solve problems collaboratively.
- Native support for executing and debugging code within conversations.
- AssistantAgent, UserProxyAgent, and custom agent types cover different roles.
- Seamless integration of human feedback and intervention.
- Built-in tools for optimizing the performance of multi-agent systems.
- Production features, including caching, logging, and error handling.
import autogen
# 1. AssistantAgent - AI-powered agent
assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config={
        "config_list": [{
            "model": "gpt-4",
            "api_key": "your-api-key",  # placeholder value; replace with a real key
        }],
        "temperature": 0.7,
        "cache_seed": 42,  # Enable caching
    },
    system_message="""You are a helpful AI assistant skilled in:
- Problem solving
- Code generation
- Data analysis
- Creative tasks
Always think step-by-step and provide clear explanations.""",
)
# 2. UserProxyAgent - Executes code and represents human
user_proxy = autogen.UserProxyAgent(
    name="user_proxy",
    human_input_mode="TERMINATE",  # or "ALWAYS", "NEVER"
    max_consecutive_auto_reply=10,
    # `x.get("content", "")` still yields None when the key is present with a
    # None value (e.g. a message that only carries a function call), and
    # None.rstrip() would raise. `or ""` covers both the missing and None cases.
    is_termination_msg=lambda x: (x.get("content") or "").rstrip().endswith("TERMINATE"),
    code_execution_config={
        "work_dir": "coding",
        "use_docker": False,  # Set to True for safer execution
    },
)
# 3. Custom Agent
class ResearchAgent(autogen.AssistantAgent):
    """Assistant agent that carries a registry of research helper coroutines.

    The registry is stored on ``self.research_tools`` as a dict mapping a tool
    name to the bound coroutine method that implements it.
    """

    def __init__(self, name, **kwargs):
        """Create the agent and build its tool registry.

        Args:
            name: Agent name, forwarded to ``autogen.AssistantAgent``.
            **kwargs: Forwarded unchanged to ``autogen.AssistantAgent``.
        """
        super().__init__(name, **kwargs)
        self.research_tools = self._setup_tools()

    def _setup_tools(self):
        """Return the mapping of tool name to bound coroutine method."""
        return {
            "web_search": self.search_web,
            "arxiv_search": self.search_arxiv,
            "analyze_paper": self.analyze_research_paper,
        }

    async def search_web(self, query):
        """Placeholder web search; currently returns ``None``."""
        # Implementation for web search
        pass

    # The two methods below are referenced by _setup_tools; without them the
    # attribute lookups there raise AttributeError during __init__.
    async def search_arxiv(self, query):
        """Placeholder arXiv search.

        Raises:
            NotImplementedError: Always, until an implementation is provided.
        """
        raise NotImplementedError("search_arxiv is not implemented yet")

    async def analyze_research_paper(self, paper):
        """Placeholder paper analysis.

        Raises:
            NotImplementedError: Always, until an implementation is provided.
        """
        raise NotImplementedError("analyze_research_paper is not implemented yet")
# 4. GroupChatManager for multi-agent coordination
# `config_list` has not been assigned yet at this point in the file, so load
# the endpoint list here before building anything that needs it.
config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    filter_dict={"model": ["gpt-4", "gpt-3.5-turbo"]},
)
# The group chat refers to a `researcher` agent, so one must exist first.
researcher = ResearchAgent(
    name="researcher",
    llm_config={"config_list": config_list},
)
groupchat = autogen.GroupChat(
    agents=[assistant, user_proxy, researcher],
    messages=[],
    max_round=20,
)
manager = autogen.GroupChatManager(
    groupchat=groupchat,
    llm_config={"config_list": config_list},
)
# Simple conversation between two agents
import autogen
# Configuration: read OAI_CONFIG_LIST and keep only the listed models
config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    filter_dict=dict(model=["gpt-4", "gpt-3.5-turbo"]),
)

# LLM-backed assistant, temperature 0
assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config=dict(config_list=config_list, temperature=0),
)

# Proxy for the human user; code it runs goes into ./workspace
user_proxy = autogen.UserProxyAgent(
    name="user_proxy",
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=10,
    code_execution_config=dict(work_dir="workspace"),
)

# Kick off the two-agent conversation
user_proxy.initiate_chat(
    assistant,
    message=(
        "Create a Python function that calculates the Fibonacci sequence\n"
        "up to n terms and visualize it using matplotlib."
    ),
)
# The assistant will generate code, and user_proxy will execute it
import autogen
from typing import List, Dict
class SoftwareDevTeam:
    """A six-agent software team that collaborates through a single group chat.

    ``self.agents`` maps a short role key (``"pm"``, ``"architect"``,
    ``"developer"``, ``"qa"``, ``"reviewer"``, ``"executor"``) to its agent.
    """

    def __init__(self, config_list):
        """Store the LLM endpoint list and build the team.

        Args:
            config_list: Value placed under ``"config_list"`` in every agent's
                ``llm_config``.
        """
        self.config_list = config_list
        self.agents = self._create_team()

    def _create_team(self) -> Dict:
        """Build every team member and return them keyed by role."""

        def make_assistant(agent_name, prompt):
            # Each agent receives its own llm_config dict.
            return autogen.AssistantAgent(
                name=agent_name,
                system_message=prompt,
                llm_config={"config_list": self.config_list},
            )

        # (role key, agent name, system prompt), in creation order.
        assistant_specs = (
            (
                "pm",
                "product_manager",
                "You are a product manager. You:\n"
                "- Define requirements and acceptance criteria\n"
                "- Prioritize features\n"
                "- Ensure the solution meets user needs\n"
                "- Coordinate between team members",
            ),
            (
                "architect",
                "architect",
                "You are a software architect. You:\n"
                "- Design system architecture\n"
                "- Choose appropriate design patterns\n"
                "- Define APIs and interfaces\n"
                "- Ensure scalability and maintainability",
            ),
            (
                "developer",
                "developer",
                "You are a senior developer. You:\n"
                "- Write clean, efficient code\n"
                "- Implement features according to specifications\n"
                "- Follow best practices and design patterns\n"
                "- Write comprehensive docstrings",
            ),
            (
                "qa",
                "qa_engineer",
                "You are a QA engineer. You:\n"
                "- Write comprehensive test cases\n"
                "- Identify bugs and edge cases\n"
                "- Ensure code quality and coverage\n"
                "- Perform integration testing",
            ),
            (
                "reviewer",
                "code_reviewer",
                "You are a code reviewer. You:\n"
                "- Review code for best practices\n"
                "- Check for security vulnerabilities\n"
                "- Suggest optimizations\n"
                "- Ensure code readability",
            ),
        )

        team = {}
        for role_key, agent_name, prompt in assistant_specs:
            team[role_key] = make_assistant(agent_name, prompt)

        # The executor is the only non-LLM member: it runs code in Docker and
        # never asks for human input.
        team["executor"] = autogen.UserProxyAgent(
            name="executor",
            system_message="Execute code and report results",
            human_input_mode="NEVER",
            code_execution_config={
                "work_dir": "project",
                "use_docker": True,
            },
        )
        return team

    def develop_feature(self, requirements: str):
        """Run a group chat in which the team works on ``requirements``.

        Args:
            requirements: Free-text feature request interpolated into the
                opening message.
        """
        team_chat = autogen.GroupChat(
            agents=[*self.agents.values()],
            messages=[],
            max_round=50,
            speaker_selection_method="auto",  # AutoGen automatically selects next speaker
        )
        chat_manager = autogen.GroupChatManager(
            groupchat=team_chat,
            llm_config={"config_list": self.config_list},
        )

        kickoff = (
            f"New feature request: {requirements}\n"
            "Let's work together to implement this feature:\n"
            "1. PM: Define detailed requirements\n"
            "2. Architect: Design the solution\n"
            "3. Developer: Implement the code\n"
            "4. QA: Write and run tests\n"
            "5. Reviewer: Review the implementation\n"
            "6. Executor: Run the final code\n"
        )
        # The product manager opens the conversation.
        product_manager = self.agents["pm"]
        product_manager.initiate_chat(chat_manager, message=kickoff)
# Usage: build the team, then hand it one feature request
team = SoftwareDevTeam(config_list=config_list)
team.develop_feature(
    requirements="Create a REST API for user authentication with JWT tokens",
)
class ResearchAnalysisTeam:
    """Four assistant agents that take turns analyzing a research paper."""

    def __init__(self, config_list):
        """Remember the LLM endpoint list used for every agent and manager."""
        self.config_list = config_list

    def create_research_team(self):
        """Return new agents in order: reviewer, statistician, expert, synthesizer."""
        # (agent name, system prompt), in speaking order.
        roster = (
            (
                "literature_reviewer",
                "You are an expert at reviewing academic literature.\n"
                "- Summarize key findings\n"
                "- Identify research gaps\n"
                "- Evaluate methodology\n"
                "- Assess contribution to field",
            ),
            (
                "statistician",
                "You are a statistician who:\n"
                "- Evaluates statistical methods\n"
                "- Checks validity of results\n"
                "- Identifies potential biases\n"
                "- Suggests improvements",
            ),
            (
                "domain_expert",
                "You are a domain expert who:\n"
                "- Provides context and background\n"
                "- Evaluates practical applications\n"
                "- Identifies industry relevance\n"
                "- Suggests future directions",
            ),
            (
                "synthesizer",
                "You synthesize insights from all agents to:\n"
                "- Create comprehensive summary\n"
                "- Identify key takeaways\n"
                "- Generate actionable insights\n"
                "- Produce final report",
            ),
        )
        return [
            autogen.AssistantAgent(
                name=agent_name,
                system_message=prompt,
                llm_config={"config_list": self.config_list},
            )
            for agent_name, prompt in roster
        ]

    def analyze_paper(self, paper_content: str):
        """Start a round-robin group chat about ``paper_content``.

        Args:
            paper_content: Text of the paper, embedded in the opening message.
        """
        members = self.create_research_team()
        panel = autogen.GroupChat(
            agents=members,
            messages=[],
            max_round=20,
            speaker_selection_method="round_robin",  # Each agent speaks in turn
        )
        moderator = autogen.GroupChatManager(
            groupchat=panel,
            llm_config={"config_list": self.config_list},
        )
        # The first agent in the list opens the discussion.
        opener = members[0]
        opening_message = f"Let's analyze this research paper:\n\n{paper_content}"
        opener.initiate_chat(moderator, message=opening_message)
# Nested conversations for complex workflows
import autogen
def create_nested_chat_system():
    """Build a coordinator that hands each incoming request to two team chats.

    Two group chats are created (data processing and ML development), each
    with its own ``GroupChatManager``. Both managers are registered on the
    coordinator as nested chats, summarized with ``"last_msg"``.

    Reads the module-level ``config_list`` for every agent's ``llm_config``.

    Returns:
        The coordinator ``autogen.AssistantAgent`` with nested chats registered.
    """

    def llm_config():
        # A fresh dict for every agent so that none of them share one object.
        return {"config_list": config_list}

    def build_team(manager_name, members):
        """Create a group chat from (name, system message) pairs; return its manager."""
        agents = [
            autogen.AssistantAgent(
                name=member_name,
                system_message=prompt,
                llm_config=llm_config(),
            )
            for member_name, prompt in members
        ]
        team_chat = autogen.GroupChat(agents=agents, messages=[], max_round=10)
        return autogen.GroupChatManager(
            groupchat=team_chat,
            name=manager_name,  # distinct names keep the two managers apart
            llm_config=llm_config(),
        )

    def forward_request(recipient, messages, sender, config):
        """Use the latest message of the outer conversation as the nested prompt."""
        return messages[-1].get("content")

    # Main coordinator
    coordinator = autogen.AssistantAgent(
        name="coordinator",
        system_message="Coordinate between different teams",
        llm_config=llm_config(),
    )

    # Team 1: Data Processing
    data_manager = build_team(
        "data_team_manager",
        [
            ("data_analyst", "Analyze and process data"),
            ("data_engineer", "Handle data pipelines and ETL"),
        ],
    )

    # Team 2: ML Development
    ml_manager = build_team(
        "ml_team_manager",
        [
            ("ml_engineer", "Develop and train ML models"),
            ("ml_researcher", "Research and optimize algorithms"),
        ],
    )

    # Register nested chats.
    # - `recipient` must be an agent, so each team is addressed via its manager.
    # - `trigger` is a required argument; this one fires for any sender except
    #   the two team managers.
    # - NOTE(review): every entry carries an explicit `message`, because only
    #   the first entry appears to default to the last outer message in
    #   AutoGen 0.2 — confirm against the installed version.
    coordinator.register_nested_chats(
        [
            {
                "sender": coordinator,
                "recipient": data_manager,
                "message": forward_request,
                "summary_method": "last_msg",
            },
            {
                "sender": coordinator,
                "recipient": ml_manager,
                "message": forward_request,
                "summary_method": "last_msg",
            },
        ],
        trigger=lambda sender: sender not in (data_manager, ml_manager),
    )
    return coordinator
# Usage
coordinator = create_nested_chat_system()
# initiate_chat() requires a recipient. Nested chats are reply functions, so
# they run when the coordinator *receives* a message: the existing
# `user_proxy` agent therefore opens the conversation with the coordinator.
user_proxy.initiate_chat(
    coordinator,
    message="Build an end-to-end ML pipeline for customer churn prediction",
)
# Safe code execution with Docker
import autogen
# Configure Docker executor
executor = autogen.UserProxyAgent(
    name="code_executor",
    code_execution_config={
        "work_dir": "workspace",
        # `use_docker` accepts an image name as well as a bool; the image is
        # given here because "docker_image" is not a documented
        # code_execution_config key.
        "use_docker": "python:3.11-slim",
        "timeout": 60,  # Execution timeout
    },
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
)
# Assistant that generates code
coder = autogen.AssistantAgent(
    name="coder",
    system_message="""You are an expert Python developer.
Generate complete, runnable code with proper error handling.
Include all necessary imports and handle edge cases.""",
    llm_config={"config_list": config_list},
)
# Request code generation and execution
executor.initiate_chat(
    coder,
    message="""Write a Python script that:
1. Scrapes data from a website
2. Processes it with pandas
3. Creates visualizations
4. Saves results to CSV
Make sure the code is production-ready with error handling.""",
)
class SafeCodeExecutor(autogen.UserProxyAgent):
    """User proxy that vets code blocks before letting the base class run them.

    Every block is checked against a substring deny-list and a rough resource
    estimate. If any block fails, nothing is executed. Each vetted block is
    recorded in ``self.execution_history`` as a dict with ``timestamp``,
    ``code`` and ``result`` keys.

    The checks are plain substring heuristics, not a sandbox.
    """

    # Substrings whose presence causes a block to be rejected.
    DANGEROUS_PATTERNS = (
        "os.system",
        "subprocess",
        "eval(",
        "exec(",
        "__import__",
        "open(",
        "file(",
    )
    # Execution timeout in seconds, used when the caller's
    # code_execution_config dict does not set "timeout".
    DEFAULT_TIMEOUT = 30

    def __init__(self, name, resource_limit=None, **kwargs):
        """Create the executor.

        Args:
            name: Agent name, forwarded to ``autogen.UserProxyAgent``.
            resource_limit: Highest value of ``_estimate_resources`` that is
                still allowed to run; ``None`` disables the check. It is taken
                here rather than forwarded, because the base class does not
                accept this keyword.
            **kwargs: Forwarded to ``autogen.UserProxyAgent``.
        """
        exec_config = kwargs.get("code_execution_config")
        if isinstance(exec_config, dict):
            # Copy instead of mutating the caller's dict; an explicit
            # "timeout" supplied by the caller wins over the default.
            kwargs["code_execution_config"] = {
                "timeout": self.DEFAULT_TIMEOUT,
                **exec_config,
            }
        super().__init__(name, **kwargs)
        self.resource_limit = resource_limit
        self.execution_history = []

    def execute_code_blocks(self, code_blocks):
        """Vet all blocks, then delegate execution to the base class.

        Args:
            code_blocks: Sequence of blocks whose item at index 1 is the
                source code.

        Returns:
            A ``(exitcode, logs)`` tuple. Rejected or failed runs use exit
            code 1 with an explanatory message.
            NOTE(review): the tuple shape follows what AutoGen's legacy
            code-execution reply appears to unpack — confirm against the
            installed version.
        """
        # Local import: `datetime` is not imported before this class in the file.
        from datetime import datetime

        # Vet every block before running any of them, so a rejected later
        # block cannot follow an already-executed earlier one.
        for code_block in code_blocks:
            code = code_block[1]
            # Safety checks
            if self._check_dangerous_code(code):
                return 1, "Code contains potentially dangerous operations. Execution blocked."
            # Resource limits
            if (
                self.resource_limit is not None
                and self._estimate_resources(code) > self.resource_limit
            ):
                return 1, "Code exceeds resource limits. Please optimize."

        # Log execution: one entry per block, completed after the run.
        entries = [
            {"timestamp": datetime.now(), "code": code_block[1], "result": None}
            for code_block in code_blocks
        ]
        self.execution_history.extend(entries)

        try:
            # The timeout is applied by the base class via code_execution_config.
            result = super().execute_code_blocks(code_blocks)
        except Exception as e:
            # Report the failure to the conversation instead of aborting it.
            result = (1, f"Execution failed: {str(e)}")

        for entry in entries:
            entry["result"] = result
        return result

    def _check_dangerous_code(self, code):
        """Return True if ``code`` contains any deny-listed substring."""
        return any(pattern in code for pattern in self.DANGEROUS_PATTERNS)

    def _estimate_resources(self, code):
        """Return a rough cost score: 10 per ``for `` / ``while `` occurrence."""
        # Simple heuristic based on loop keywords only
        loop_count = code.count("for ") + code.count("while ")
        return loop_count * 10  # Simplified estimation
# Instantiate the vetting executor with a resource ceiling of 100
safe_executor = SafeCodeExecutor(
    resource_limit=100,
    name="safe_executor",
    code_execution_config=dict(work_dir="safe_workspace"),
)
# Define tools for agents
from typing import Annotated
def search_web(query: Annotated[str, "The search query"]) -> str:
    """Return a placeholder search-result string for ``query``."""
    # Implementation here
    outcome = f"Search results for: {query}"
    return outcome
def analyze_data(
    data: Annotated[str, "Data to analyze"],
    method: Annotated[str, "Analysis method"] = "statistical",
) -> str:
    """Return a placeholder completion message naming ``method``.

    ``data`` is accepted but not inspected by this stub.
    """
    # Implementation here
    summary = f"Analysis complete: {method} analysis of data"
    return summary
def send_email(
    to: Annotated[str, "Recipient email"],
    subject: Annotated[str, "Email subject"],
    body: Annotated[str, "Email body"],
) -> str:
    """Return a placeholder confirmation naming the recipient ``to``.

    ``subject`` and ``body`` are accepted but not used by this stub.
    """
    # Implementation here
    confirmation = f"Email sent to {to}"
    return confirmation
# Register functions with agent
assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config={
        "config_list": config_list,
        "functions": [
            {
                "name": "search_web",
                "description": "Search the web for information",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query",
                        },
                    },
                    "required": ["query"],
                },
            },
            {
                "name": "analyze_data",
                "description": "Analyze data using specified method",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "data": {"type": "string"},
                        "method": {"type": "string", "enum": ["statistical", "ml", "visual"]},
                    },
                    "required": ["data"],
                },
            },
            # send_email has an implementation registered below, so it needs
            # a schema too; without one the model is never offered it.
            {
                "name": "send_email",
                "description": "Send an email",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "to": {"type": "string", "description": "Recipient email"},
                        "subject": {"type": "string", "description": "Email subject"},
                        "body": {"type": "string", "description": "Email body"},
                    },
                    "required": ["to", "subject", "body"],
                },
            },
        ],
    },
)
# Register function implementations on the agent that EXECUTES the calls.
# The assistant only proposes a function call; the agent that receives that
# message looks the name up in its own function map. The implementations are
# therefore registered on `user_proxy`, not on the assistant.
user_proxy.register_function(
    function_map={
        "search_web": search_web,
        "analyze_data": analyze_data,
        "send_email": send_email,
    }
)
# Enable caching to reduce API calls
import autogen
from autogen.cache import Cache
# Setup cache
with Cache.redis(redis_url="redis://localhost:6379") as cache:
    assistant = autogen.AssistantAgent(
        name="assistant",
        llm_config={
            "config_list": config_list,
            "cache_seed": 42,  # Enable deterministic caching
            "temperature": 0,  # For reproducible results
        },
    )
    # Entering the context manager does not attach the cache to any agent.
    # It is normally passed per conversation: initiate_chat(..., cache=cache).
    # NOTE(review): for direct generate_reply() calls the agent appears to
    # read `client_cache` — confirm against the installed AutoGen version.
    assistant.client_cache = cache
    # Chat messages need a "role"; a bare {"content": ...} is rejected by the
    # chat completions endpoint.
    # First call - hits API
    response1 = assistant.generate_reply(messages=[{"role": "user", "content": "Hello"}])
    # Second identical call - uses cache
    response2 = assistant.generate_reply(messages=[{"role": "user", "content": "Hello"}])
# Alternative: Disk-based cache
with Cache.disk(cache_path_root=".cache") as cache:
    # Hand `cache` to a conversation (e.g. initiate_chat(..., cache=cache))
    # for it to be used.
    pass
# Monitor token usage
def track_usage(agent, cost_per_token=0.00002):
    """Summarize an agent's token usage and a rough cost estimate.

    Reads ``agent.client.usage_summary`` when it is present and non-empty
    (a flat dict with ``total_tokens`` / ``prompt_tokens`` /
    ``completion_tokens``). Otherwise falls back to
    ``agent.client.total_usage_summary`` and sums those same keys over every
    dict-valued entry.
    NOTE(review): the fallback assumes the per-model layout used by AutoGen
    0.2's client wrapper — confirm against the installed version.

    Args:
        agent: Object with a ``client`` attribute. A missing or ``None``
            client is treated as "no usage recorded".
        cost_per_token: Price applied to ``total_tokens``. The default is a
            flat approximation, not a real price sheet.

    Returns:
        Dict with ``total_tokens``, ``prompt_tokens``, ``completion_tokens``
        and ``estimated_cost``; counts are 0 when nothing was recorded.
    """
    totals = {"total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0}
    client = getattr(agent, "client", None)

    flat_usage = getattr(client, "usage_summary", None)
    if flat_usage:
        for key in totals:
            totals[key] = flat_usage.get(key, 0)
    else:
        per_model = getattr(client, "total_usage_summary", None) or {}
        for entry in per_model.values():
            # Scalar entries (e.g. an aggregate cost) carry no token counts.
            if isinstance(entry, dict):
                for key in totals:
                    totals[key] += entry.get(key, 0)

    return {
        **totals,
        "estimated_cost": totals["total_tokens"] * cost_per_token,
    }
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelAgentSystem:
    """Run one analyst agent per topic concurrently and merge their answers."""

    def __init__(self, config_list):
        """Store the LLM endpoint list and create a five-worker thread pool.

        Args:
            config_list: Value placed under ``"config_list"`` in each
                analyst's ``llm_config``.
        """
        self.config_list = config_list
        # Not used by the methods of this class; kept for callers that rely
        # on the attribute.
        self.executor = ThreadPoolExecutor(max_workers=5)

    @staticmethod
    def _agent_name(topic):
        """Return ``analyst_<topic>`` with unsafe characters replaced by ``_``.

        Only ASCII letters, digits, ``_`` and ``-`` are kept, so topics such
        as "AI Safety" do not produce agent names containing whitespace.
        """
        cleaned = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "_-" else "_"
            for ch in str(topic)
        )
        return f"analyst_{cleaned}"

    async def parallel_analysis(self, topics):
        """Analyze multiple topics in parallel.

        Args:
            topics: Iterable of topic strings; one agent is created per topic.

        Returns:
            The value of ``synthesize_results`` applied to the per-topic
            results, which ``asyncio.gather`` returns in input order.
        """
        tasks = []
        for topic in topics:
            # Create agent for each topic
            agent = autogen.AssistantAgent(
                name=self._agent_name(topic),
                llm_config={"config_list": self.config_list},
            )
            # Create async task
            tasks.append(asyncio.create_task(self.analyze_topic(agent, topic)))
        # Wait for all analyses to complete
        results = await asyncio.gather(*tasks)
        # Synthesize results
        return self.synthesize_results(results)

    async def analyze_topic(self, agent, topic):
        """Ask ``agent`` to analyze ``topic``.

        Returns:
            ``{"topic": topic, "analysis": <agent reply>}``.
        """
        response = await agent.a_generate_reply(
            # A chat message needs a role as well as content.
            messages=[{"role": "user", "content": f"Analyze: {topic}"}]
        )
        return {"topic": topic, "analysis": response}

    def synthesize_results(self, results):
        """Merge per-topic results into a single ``{topic: analysis}`` dict.

        Args:
            results: Iterable of dicts shaped like ``analyze_topic``'s return.

        Returns:
            Dict keyed by topic, in the order the results were given. If a
            topic repeats, the later result replaces the earlier one.
        """
        return {item["topic"]: item["analysis"] for item in results}
# Usage: analyze three topics concurrently and collect the merged result
system = ParallelAgentSystem(config_list=config_list)
topics = [
    "AI Safety",
    "Climate Change",
    "Quantum Computing",
]
results = asyncio.run(system.parallel_analysis(topics=topics))
# Production-ready AutoGen application
import json
import logging
import uuid
from datetime import datetime
from typing import Optional

import autogen
import redis
class ProductionAgentSystem:
    """Agent factory with logging, Redis-backed metrics and a health check.

    Attributes are plain instance fields:
    ``config`` (loaded settings dict), ``agents`` (role -> agent) and
    ``cache`` (a ``redis.Redis`` client with default connection settings).
    """

    def __init__(self, config_path: str):
        """Load configuration, create the Redis client and set up logging.

        Args:
            config_path: Path of the configuration file.
        """
        self.config = self._load_config(config_path)
        self.agents = {}
        self.cache = redis.Redis()
        self.setup_logging()

    def _load_config(self, config_path: str) -> dict:
        """Read the configuration file and return it as a dict.

        NOTE(review): only JSON content is parsed, because no YAML parser is
        imported in this file. JSON-formatted content is accepted whatever
        the file extension is. The rest of the class reads the
        ``"llm_configs"`` and ``"prompts"`` keys.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the content is not valid JSON.
        """
        with open(config_path, encoding="utf-8") as handle:
            try:
                return json.load(handle)
            except json.JSONDecodeError as err:
                raise ValueError(
                    f"Could not parse configuration file {config_path!r} as JSON"
                ) from err

    def setup_logging(self):
        """Configure root logging to ``autogen.log`` and the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('autogen.log'),
                logging.StreamHandler()
            ]
        )
        # NOTE(review): ChatCompletion is the legacy logging entry point —
        # confirm it still exists in the installed AutoGen version.
        autogen.ChatCompletion.start_logging()

    def create_agent(self, role: str, **kwargs) -> autogen.AssistantAgent:
        """Create, register and instrument an assistant agent for ``role``.

        Args:
            role: Used as the agent name, as the key in ``self.agents`` and
                to look up the system prompt in ``self.config["prompts"]``.
            **kwargs: Extra ``AssistantAgent`` arguments. ``llm_config`` is
                merged over the defaults below; ``system_message`` overrides
                the configured prompt.

        Returns:
            The new agent.
        """
        # NOTE(review): "request_timeout" / "max_retries" are passed through
        # as-is; their accepted spelling depends on the AutoGen version.
        base_config = {
            "config_list": self.config["llm_configs"],
            "cache_seed": 42,
            "temperature": 0.7,
            "request_timeout": 120,
            "max_retries": 3,
        }
        # Pop rather than get: the merged dict is passed as `llm_config`
        # below, so leaving the key in kwargs would supply it twice and
        # raise TypeError.
        base_config.update(kwargs.pop("llm_config", None) or {})
        # Same reasoning for system_message, which is also passed explicitly.
        system_message = kwargs.pop(
            "system_message", self.config["prompts"].get(role, "")
        )
        agent = autogen.AssistantAgent(
            name=role,
            system_message=system_message,
            llm_config=base_config,
            **kwargs
        )
        # Store agent
        self.agents[role] = agent
        # Add monitoring
        self._setup_agent_monitoring(agent)
        return agent

    def _setup_agent_monitoring(self, agent):
        """Wrap ``agent.generate_reply`` so each call's duration and outcome are logged."""
        original_reply = agent.generate_reply

        def monitored_reply(*args, **kwargs):
            start_time = datetime.now()
            try:
                reply = original_reply(*args, **kwargs)
            except Exception as e:
                # Log error, then let the original exception propagate.
                self._log_execution(
                    agent_name=agent.name,
                    duration=(datetime.now() - start_time).total_seconds(),
                    status="error",
                    error=str(e)
                )
                raise
            # Log successful execution
            self._log_execution(
                agent_name=agent.name,
                duration=(datetime.now() - start_time).total_seconds(),
                status="success"
            )
            return reply

        agent.generate_reply = monitored_reply

    def _log_execution(self, **kwargs):
        """Push one metrics record to the ``agent_metrics`` Redis list.

        The record is the keyword arguments plus an ISO timestamp. Metrics
        are best-effort: a Redis failure is logged and swallowed so that it
        cannot turn a successful agent reply into an error.
        """
        metrics = {
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        try:
            # Store in Redis for monitoring
            self.cache.lpush("agent_metrics", json.dumps(metrics))
            # Keep only last 1000 entries
            self.cache.ltrim("agent_metrics", 0, 999)
        except redis.RedisError:
            logging.getLogger(__name__).warning(
                "Could not record agent metrics in Redis", exc_info=True
            )

    def process_message(self, message: str, role: str = "assistant"):
        """Answer ``message`` with the agent registered for ``role``.

        The agent is created through ``create_agent`` on first use.

        Args:
            message: User text, sent as a single user message.
            role: Role key of the agent that should answer.

        Returns:
            Whatever the agent's ``generate_reply`` returns.
        """
        agent = self.agents.get(role)
        if agent is None:
            agent = self.create_agent(role)
        return agent.generate_reply(
            messages=[{"role": "user", "content": message}]
        )

    def health_check(self) -> dict:
        """Report registered agents and whether Redis answers a ping.

        Returns:
            Dict with ``status`` (``"healthy"``, or ``"degraded"`` when Redis
            is unreachable), ``agents``, ``cache_connected`` and ``timestamp``.
        """
        try:
            cache_connected = bool(self.cache.ping())
        except redis.RedisError:
            # Report the outage instead of failing the health endpoint itself.
            cache_connected = False
        return {
            "status": "healthy" if cache_connected else "degraded",
            "agents": list(self.agents.keys()),
            "cache_connected": cache_connected,
            "timestamp": datetime.now().isoformat()
        }
# Deployment with FastAPI
from fastapi import FastAPI, BackgroundTasks
# Module-level FastAPI application; the route decorators below attach to it.
app = FastAPI()
# Single agent system shared by all requests, built from "config.yaml" when
# this module is imported.
agent_system = ProductionAgentSystem("config.yaml")
@app.post("/chat")
async def chat(message: str, background_tasks: BackgroundTasks):
    """API endpoint for agent interaction.

    Schedules ``agent_system.process_message`` as a background task and
    returns immediately.

    Args:
        message: Text to hand to the agent system.
        background_tasks: FastAPI task queue that runs after the response.

    Returns:
        ``{"status": "processing", "message_id": <32-char hex id>}``.
    """
    # `generate_id` is not defined anywhere in this file; a random UUID
    # gives each request a unique identifier.
    message_id = uuid.uuid4().hex
    # Process in background for long-running tasks
    background_tasks.add_task(
        agent_system.process_message,
        message=message,
    )
    return {"status": "processing", "message_id": message_id}
@app.get("/health")
def health():
    """Return the agent system's health report as the response body."""
    report = agent_system.health_check()
    return report