AutoGen is Microsoft's open-source framework for building next-generation LLM applications using multiple agents that can converse with each other to solve tasks. It simplifies the orchestration, automation, and optimization of complex LLM workflows.
Agents can engage in complex multi-turn conversations to solve problems collaboratively
Native support for executing and debugging code within conversations
AssistantAgent, UserProxyAgent, and custom agent types for different roles
Seamless integration of human feedback and intervention
Built-in tools for optimizing multi-agent systems performance
Production features including caching, logging, and error handling
import autogen # 1. AssistantAgent - AI-powered agent assistant = autogen.AssistantAgent( name="assistant", llm_config={ "config_list": [{ "model": "gpt-4", "api_key": "your-api-key", }], "temperature": 0.7, "cache_seed": 42, # Enable caching }, system_message="""You are a helpful AI assistant skilled in: - Problem solving - Code generation - Data analysis - Creative tasks Always think step-by-step and provide clear explanations.""" ) # 2. UserProxyAgent - Executes code and represents human user_proxy = autogen.UserProxyAgent( name="user_proxy", human_input_mode="TERMINATE", # or "ALWAYS", "NEVER" max_consecutive_auto_reply=10, is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"), code_execution_config={ "work_dir": "coding", "use_docker": False, # Set to True for safer execution }, ) # 3. Custom Agent class ResearchAgent(autogen.AssistantAgent): def __init__(self, name, **kwargs): super().__init__(name, **kwargs) self.research_tools = self._setup_tools() def _setup_tools(self): return { "web_search": self.search_web, "arxiv_search": self.search_arxiv, "analyze_paper": self.analyze_research_paper } async def search_web(self, query): # Implementation for web search pass # 4. GroupChatManager for multi-agent coordination groupchat = autogen.GroupChat( agents=[assistant, user_proxy, researcher], messages=[], max_round=20 ) manager = autogen.GroupChatManager( groupchat=groupchat, llm_config={"config_list": config_list} )
# Simple conversation between two agents import autogen # Configuration config_list = autogen.config_list_from_json( "OAI_CONFIG_LIST", filter_dict={"model": ["gpt-4", "gpt-3.5-turbo"]} ) # Create assistant agent assistant = autogen.AssistantAgent( name="assistant", llm_config={ "config_list": config_list, "temperature": 0, } ) # Create user proxy agent user_proxy = autogen.UserProxyAgent( name="user_proxy", human_input_mode="TERMINATE", max_consecutive_auto_reply=10, code_execution_config={"work_dir": "workspace"}, ) # Start conversation user_proxy.initiate_chat( assistant, message="""Create a Python function that calculates the Fibonacci sequence up to n terms and visualize it using matplotlib.""" ) # The assistant will generate code, and user_proxy will execute it
import autogen from typing import List, Dict class SoftwareDevTeam: def __init__(self, config_list): self.config_list = config_list self.agents = self._create_team() def _create_team(self) -> Dict: """Create a software development team with specialized roles""" # Product Manager pm = autogen.AssistantAgent( name="product_manager", system_message="""You are a product manager. You: - Define requirements and acceptance criteria - Prioritize features - Ensure the solution meets user needs - Coordinate between team members""", llm_config={"config_list": self.config_list} ) # Software Architect architect = autogen.AssistantAgent( name="architect", system_message="""You are a software architect. You: - Design system architecture - Choose appropriate design patterns - Define APIs and interfaces - Ensure scalability and maintainability""", llm_config={"config_list": self.config_list} ) # Senior Developer developer = autogen.AssistantAgent( name="developer", system_message="""You are a senior developer. You: - Write clean, efficient code - Implement features according to specifications - Follow best practices and design patterns - Write comprehensive docstrings""", llm_config={"config_list": self.config_list} ) # QA Engineer qa_engineer = autogen.AssistantAgent( name="qa_engineer", system_message="""You are a QA engineer. You: - Write comprehensive test cases - Identify bugs and edge cases - Ensure code quality and coverage - Perform integration testing""", llm_config={"config_list": self.config_list} ) # Code Reviewer reviewer = autogen.AssistantAgent( name="code_reviewer", system_message="""You are a code reviewer. You: - Review code for best practices - Check for security vulnerabilities - Suggest optimizations - Ensure code readability""", llm_config={"config_list": self.config_list} ) # Code Executor executor = autogen.UserProxyAgent( name="executor", system_message="Execute code and report results", human_input_mode="NEVER", code_execution_config={ "work_dir": "project", "use_docker": True, } ) return { "pm": pm, "architect": architect, "developer": developer, "qa": qa_engineer, "reviewer": reviewer, "executor": executor } def develop_feature(self, requirements: str): """Develop a feature using the team""" # Create group chat groupchat = autogen.GroupChat( agents=list(self.agents.values()), messages=[], max_round=50, speaker_selection_method="auto" # AutoGen automatically selects next speaker ) manager = autogen.GroupChatManager( groupchat=groupchat, llm_config={"config_list": self.config_list} ) # Start development process self.agents["pm"].initiate_chat( manager, message=f"""New feature request: {requirements} Let's work together to implement this feature: 1. PM: Define detailed requirements 2. Architect: Design the solution 3. Developer: Implement the code 4. QA: Write and run tests 5. Reviewer: Review the implementation 6. Executor: Run the final code """ ) # Usage team = SoftwareDevTeam(config_list) team.develop_feature("Create a REST API for user authentication with JWT tokens")
class ResearchAnalysisTeam: def __init__(self, config_list): self.config_list = config_list def create_research_team(self): # Literature Reviewer lit_reviewer = autogen.AssistantAgent( name="literature_reviewer", system_message="""You are an expert at reviewing academic literature. - Summarize key findings - Identify research gaps - Evaluate methodology - Assess contribution to field""", llm_config={"config_list": self.config_list} ) # Statistician statistician = autogen.AssistantAgent( name="statistician", system_message="""You are a statistician who: - Evaluates statistical methods - Checks validity of results - Identifies potential biases - Suggests improvements""", llm_config={"config_list": self.config_list} ) # Domain Expert domain_expert = autogen.AssistantAgent( name="domain_expert", system_message="""You are a domain expert who: - Provides context and background - Evaluates practical applications - Identifies industry relevance - Suggests future directions""", llm_config={"config_list": self.config_list} ) # Synthesis Agent synthesizer = autogen.AssistantAgent( name="synthesizer", system_message="""You synthesize insights from all agents to: - Create comprehensive summary - Identify key takeaways - Generate actionable insights - Produce final report""", llm_config={"config_list": self.config_list} ) return [lit_reviewer, statistician, domain_expert, synthesizer] def analyze_paper(self, paper_content: str): agents = self.create_research_team() # Create group chat for collaborative analysis groupchat = autogen.GroupChat( agents=agents, messages=[], max_round=20, speaker_selection_method="round_robin" # Each agent speaks in turn ) manager = autogen.GroupChatManager( groupchat=groupchat, llm_config={"config_list": self.config_list} ) # Start analysis agents[0].initiate_chat( manager, message=f"Let's analyze this research paper:\n\n{paper_content}" )
# Nested conversations for complex workflows import autogen def create_nested_chat_system(): # Main coordinator coordinator = autogen.AssistantAgent( name="coordinator", system_message="Coordinate between different teams", llm_config={"config_list": config_list} ) # Team 1: Data Processing def data_processing_chat(message): data_analyst = autogen.AssistantAgent( name="data_analyst", system_message="Analyze and process data", llm_config={"config_list": config_list} ) data_engineer = autogen.AssistantAgent( name="data_engineer", system_message="Handle data pipelines and ETL", llm_config={"config_list": config_list} ) # Create sub-group chat data_chat = autogen.GroupChat( agents=[data_analyst, data_engineer], messages=[], max_round=10 ) data_manager = autogen.GroupChatManager( groupchat=data_chat, llm_config={"config_list": config_list} ) # Process within team data_analyst.initiate_chat( data_manager, message=message ) return data_chat.messages[-1]["content"] # Team 2: ML Development def ml_development_chat(message): ml_engineer = autogen.AssistantAgent( name="ml_engineer", system_message="Develop and train ML models", llm_config={"config_list": config_list} ) ml_researcher = autogen.AssistantAgent( name="ml_researcher", system_message="Research and optimize algorithms", llm_config={"config_list": config_list} ) ml_chat = autogen.GroupChat( agents=[ml_engineer, ml_researcher], messages=[], max_round=10 ) ml_manager = autogen.GroupChatManager( groupchat=ml_chat, llm_config={"config_list": config_list} ) ml_engineer.initiate_chat( ml_manager, message=message ) return ml_chat.messages[-1]["content"] # Register nested chats coordinator.register_nested_chats( [ { "sender": coordinator, "recipient": data_processing_chat, "summary_method": "last_msg" }, { "sender": coordinator, "recipient": ml_development_chat, "summary_method": "last_msg" } ] ) return coordinator # Usage coordinator = create_nested_chat_system() coordinator.initiate_chat( message="Build an end-to-end ML pipeline for customer churn prediction" )
# Safe code execution with Docker import autogen # Configure Docker executor executor = autogen.UserProxyAgent( name="code_executor", code_execution_config={ "work_dir": "workspace", "use_docker": True, # Enable Docker "docker_image": "python:3.11-slim", # Specify image "timeout": 60, # Execution timeout }, human_input_mode="NEVER", max_consecutive_auto_reply=10, ) # Assistant that generates code coder = autogen.AssistantAgent( name="coder", system_message="""You are an expert Python developer. Generate complete, runnable code with proper error handling. Include all necessary imports and handle edge cases.""", llm_config={"config_list": config_list} ) # Request code generation and execution executor.initiate_chat( coder, message="""Write a Python script that: 1. Scrapes data from a website 2. Processes it with pandas 3. Creates visualizations 4. Saves results to CSV Make sure the code is production-ready with error handling.""" )
class SafeCodeExecutor(autogen.UserProxyAgent): def __init__(self, name, **kwargs): super().__init__(name, **kwargs) self.execution_history = [] def execute_code_blocks(self, code_blocks): """Override to add custom safety checks""" for code_block in code_blocks: code = code_block[1] # Safety checks if self._check_dangerous_code(code): return "Code contains potentially dangerous operations. Execution blocked." # Resource limits if self._estimate_resources(code) > self.resource_limit: return "Code exceeds resource limits. Please optimize." # Log execution self.execution_history.append({ "timestamp": datetime.now(), "code": code, "result": None }) try: # Execute with timeout result = self._execute_with_timeout(code, timeout=30) self.execution_history[-1]["result"] = result return result except Exception as e: return f"Execution failed: {str(e)}" def _check_dangerous_code(self, code): """Check for dangerous patterns""" dangerous_patterns = [ "os.system", "subprocess", "eval(", "exec(", "__import__", "open(", "file(" ] return any(pattern in code for pattern in dangerous_patterns) def _estimate_resources(self, code): """Estimate resource usage""" # Simple heuristic based on loops and data structures loop_count = code.count("for ") + code.count("while ") return loop_count * 10 # Simplified estimation # Use custom executor safe_executor = SafeCodeExecutor( name="safe_executor", code_execution_config={"work_dir": "safe_workspace"}, resource_limit=100 )
# Define tools for agents from typing import Annotated def search_web(query: Annotated[str, "The search query"]) -> str: """Search the web for information""" # Implementation here return f"Search results for: {query}" def analyze_data( data: Annotated[str, "Data to analyze"], method: Annotated[str, "Analysis method"] = "statistical" ) -> str: """Analyze data using specified method""" # Implementation here return f"Analysis complete: {method} analysis of data" def send_email( to: Annotated[str, "Recipient email"], subject: Annotated[str, "Email subject"], body: Annotated[str, "Email body"] ) -> str: """Send an email""" # Implementation here return f"Email sent to {to}" # Register functions with agent assistant = autogen.AssistantAgent( name="assistant", llm_config={ "config_list": config_list, "functions": [ { "name": "search_web", "description": "Search the web for information", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query" } }, "required": ["query"] } }, { "name": "analyze_data", "description": "Analyze data using specified method", "parameters": { "type": "object", "properties": { "data": {"type": "string"}, "method": {"type": "string", "enum": ["statistical", "ml", "visual"]} }, "required": ["data"] } } ] } ) # Register function implementations assistant.register_function( function_map={ "search_web": search_web, "analyze_data": analyze_data, "send_email": send_email } )
# Enable caching to reduce API calls import autogen from autogen.cache import Cache # Setup cache with Cache.redis(redis_url="redis://localhost:6379") as cache: # Agents will use cache automatically assistant = autogen.AssistantAgent( name="assistant", llm_config={ "config_list": config_list, "cache_seed": 42, # Enable deterministic caching "temperature": 0, # For reproducible results } ) # First call - hits API response1 = assistant.generate_reply(messages=[{"content": "Hello"}]) # Second identical call - uses cache response2 = assistant.generate_reply(messages=[{"content": "Hello"}]) # Alternative: Disk-based cache with Cache.disk(cache_path_root=".cache") as cache: # Agents use disk cache pass # Monitor token usage def track_usage(agent): """Track token usage and costs""" usage = agent.client.usage_summary return { "total_tokens": usage.get("total_tokens", 0), "prompt_tokens": usage.get("prompt_tokens", 0), "completion_tokens": usage.get("completion_tokens", 0), "estimated_cost": usage.get("total_tokens", 0) * 0.00002 # GPT-4 pricing }
import asyncio from concurrent.futures import ThreadPoolExecutor class ParallelAgentSystem: def __init__(self, config_list): self.config_list = config_list self.executor = ThreadPoolExecutor(max_workers=5) async def parallel_analysis(self, topics): """Analyze multiple topics in parallel""" tasks = [] for topic in topics: # Create agent for each topic agent = autogen.AssistantAgent( name=f"analyst_{topic}", llm_config={"config_list": self.config_list} ) # Create async task task = asyncio.create_task( self.analyze_topic(agent, topic) ) tasks.append(task) # Wait for all analyses to complete results = await asyncio.gather(*tasks) # Synthesize results return self.synthesize_results(results) async def analyze_topic(self, agent, topic): """Async topic analysis""" response = await agent.a_generate_reply( messages=[{"content": f"Analyze: {topic}"}] ) return {"topic": topic, "analysis": response} # Usage system = ParallelAgentSystem(config_list) topics = ["AI Safety", "Climate Change", "Quantum Computing"] results = asyncio.run(system.parallel_analysis(topics))
# Production-ready AutoGen application import autogen import logging from typing import Optional import redis from datetime import datetime class ProductionAgentSystem: def __init__(self, config_path: str): self.config = self._load_config(config_path) self.agents = {} self.cache = redis.Redis() self.setup_logging() def setup_logging(self): """Configure comprehensive logging""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('autogen.log'), logging.StreamHandler() ] ) autogen.ChatCompletion.start_logging() def create_agent(self, role: str, **kwargs) -> autogen.AssistantAgent: """Factory method for creating agents""" base_config = { "config_list": self.config["llm_configs"], "cache_seed": 42, "temperature": 0.7, "request_timeout": 120, "max_retries": 3, } # Merge with custom config base_config.update(kwargs.get("llm_config", {})) agent = autogen.AssistantAgent( name=role, system_message=self.config["prompts"].get(role, ""), llm_config=base_config, **kwargs ) # Store agent self.agents[role] = agent # Add monitoring self._setup_agent_monitoring(agent) return agent def _setup_agent_monitoring(self, agent): """Add monitoring callbacks""" original_reply = agent.generate_reply def monitored_reply(*args, **kwargs): start_time = datetime.now() try: reply = original_reply(*args, **kwargs) # Log successful execution self._log_execution( agent_name=agent.name, duration=(datetime.now() - start_time).total_seconds(), status="success" ) return reply except Exception as e: # Log error self._log_execution( agent_name=agent.name, duration=(datetime.now() - start_time).total_seconds(), status="error", error=str(e) ) raise agent.generate_reply = monitored_reply def _log_execution(self, **kwargs): """Log execution metrics""" metrics = { "timestamp": datetime.now().isoformat(), **kwargs } # Store in Redis for monitoring self.cache.lpush("agent_metrics", json.dumps(metrics)) # Keep only last 1000 entries self.cache.ltrim("agent_metrics", 0, 999) def health_check(self) -> dict: """System health check""" return { "status": "healthy", "agents": list(self.agents.keys()), "cache_connected": self.cache.ping(), "timestamp": datetime.now().isoformat() } # Deployment with FastAPI from fastapi import FastAPI, BackgroundTasks app = FastAPI() agent_system = ProductionAgentSystem("config.yaml") @app.post("/chat") async def chat(message: str, background_tasks: BackgroundTasks): """API endpoint for agent interaction""" # Process in background for long-running tasks background_tasks.add_task( agent_system.process_message, message=message ) return {"status": "processing", "message_id": generate_id()} @app.get("/health") def health(): return agent_system.health_check()