Agent Platforms

Part of Module 6: Current AI Market Trends

Agent platforms enable the creation of autonomous AI systems that can reason, plan, and execute complex tasks. This module covers leading agent frameworks and standards, including LangGraph, the Model Context Protocol (MCP), and DSPy, along with best practices for building reliable agent systems.

LangGraph Framework

Stateful Graph-Based Agents

Building complex agent workflows using cyclic graphs with state management and conditional routing.

LangGraph Agent Implementation

# LangGraph Agent Framework
from typing import TypedDict, Annotated, List, Dict, Optional
from langgraph.graph import Graph, END
from langgraph.prebuilt import ToolInvocation
import operator

# Define Agent State
class AgentState(TypedDict):
    """Shared state dictionary handed from one graph node to the next."""
    # Running log of node messages. Annotated with operator.add
    # (presumably a LangGraph reducer hint - confirm it is honoured by the
    # plain Graph this file builds).
    messages: Annotated[List[str], operator.add]
    # Task text; research_node passes it to the search tool.
    current_task: str
    # Finished tasks; synthesize_node appends current_task here.
    completed_tasks: List[str]
    # Extra context that analyze_node forwards to the analysis tool.
    context: Dict
    # Routing decision written by plan_node and read by should_continue.
    next_action: Optional[str]
    # Accumulated tool results (search results and analyses).
    tools_output: List[Dict]

# Define Agent Components
class ResearchAgent:
    """Toy research agent exposing simulated search, analysis and summary tools."""

    def __init__(self):
        self.tools = self._initialize_tools()

    def _initialize_tools(self):
        """Build the name -> coroutine-method registry of available tools."""
        registry = {}
        registry["search"] = self.search_tool
        registry["analyze"] = self.analyze_tool
        registry["summarize"] = self.summarize_tool
        return registry

    async def search_tool(self, query: str) -> Dict:
        """Return canned search results that echo back ``query``."""
        # Simulated hits; no real search backend is contacted.
        hits = [
            dict(title="Result 1", content="Relevant information..."),
            dict(title="Result 2", content="More information..."),
        ]
        return dict(query=query, results=hits)

    async def analyze_tool(self, data: Dict) -> Dict:
        """Return a fixed analysis payload; ``data`` is not inspected."""
        return dict(
            analysis="Key insights from data",
            confidence=0.85,
            recommendations=["Action 1", "Action 2"],
        )

    async def summarize_tool(self, content: List[str]) -> str:
        """Return a fixed summary string; ``content`` is not inspected."""
        summary = "Executive summary of findings..."
        return summary

# Define Graph Nodes
async def plan_node(state: AgentState) -> AgentState:
    """Pick the next action, record it on the state and log the decision.

    The state is updated in place and the same object is returned.
    """
    if not state.get("current_task"):
        # No task set yet: start with research.
        decision = "research"
    else:
        enough_output = not (len(state["tools_output"]) < 2)
        decision = "synthesize" if enough_output else "gather_more"

    state["next_action"] = decision
    state["messages"].append(f"Planning: Next action is {decision}")
    return state

async def research_node(state: AgentState) -> AgentState:
    """Search for the current task and store the result on the state.

    The state is updated in place and the same object is returned.
    """
    # Run the search tool against the task text.
    found = await ResearchAgent().search_tool(state["current_task"])

    state["tools_output"].append(found)
    state["messages"].append("Research completed")
    return state

async def analyze_node(state: AgentState) -> AgentState:
    """Analyze everything gathered so far and append the analysis to the state.

    The state is updated in place and the same object is returned.
    """
    analyst = ResearchAgent()

    # Hand the tool every collected output together with the task context.
    payload = {"data": state["tools_output"], "context": state["context"]}
    insights = await analyst.analyze_tool(payload)

    state["tools_output"].append(insights)
    state["messages"].append("Analysis completed")
    return state

async def synthesize_node(state: AgentState) -> AgentState:
    """Summarize the message log and mark the current task as completed.

    The state is updated in place and the same object is returned.
    """
    # The summary is produced from the messages logged before this node ran.
    digest = await ResearchAgent().summarize_tool(state["messages"])

    state["completed_tasks"].append(state["current_task"])
    state["messages"].append(f"Final synthesis: {digest}")
    return state

# Define conditional edges
def should_continue(state: AgentState) -> str:
    """Translate the state's ``next_action`` into a routing key.

    Returns "research" for both "research" and "gather_more", "synthesize"
    for "synthesize"; otherwise "end" when the current task is already in
    ``completed_tasks`` and "plan" when it is not.
    """
    action = state.get("next_action")
    if action in ("research", "gather_more"):
        return "research"
    if action == "synthesize":
        return "synthesize"
    # No recognised action: finish only if the task is already done.
    task_done = state["current_task"] in state["completed_tasks"]
    return "end" if task_done else "plan"

# Build the Graph
def create_agent_graph():
    """Create and compile the LangGraph workflow.

    Wiring: "plan" routes through should_continue to "research" or
    "synthesize"; "research" -> "analyze" -> "plan"; "synthesize" -> END.

    Returns:
        The object returned by ``workflow.compile()``.
    """
    workflow = Graph()

    # Add nodes
    workflow.add_node("plan", plan_node)
    workflow.add_node("research", research_node)
    workflow.add_node("analyze", analyze_node)
    workflow.add_node("synthesize", synthesize_node)

    # Conditional routing out of "plan". A router function has to be attached
    # with add_conditional_edges (add_edge expects a destination node name),
    # and every key should_continue can return needs a destination.
    workflow.add_conditional_edges(
        "plan",
        should_continue,
        {
            "research": "research",
            "synthesize": "synthesize",
            "end": END,
            "plan": "plan",
        },
    )

    # Fixed edges
    workflow.add_edge("research", "analyze")
    workflow.add_edge("analyze", "plan")
    workflow.add_edge("synthesize", END)

    # Set entry point
    workflow.set_entry_point("plan")

    return workflow.compile()

# Usage Example
async def run_langgraph_agent():
    """Run the agent graph once on a sample task, print its messages and return the result."""
    # Starting state for the run.
    seed_fields = {
        "messages": [],
        "current_task": "Research AI agent architectures",
        "completed_tasks": [],
        "context": {"domain": "AI", "depth": "detailed"},
        "next_action": None,
        "tools_output": [],
    }
    seed = AgentState(**seed_fields)

    # Build the compiled graph and execute it.
    compiled = create_agent_graph()
    outcome = await compiled.ainvoke(seed)

    print("Agent execution completed:")
    for line in outcome["messages"]:
        print(f"  - {line}")

    return outcome

# Run the agent (requires `import asyncio`, which this snippet does not import)
# asyncio.run(run_langgraph_agent())

Model Context Protocol (MCP)

Standardized Tool Integration

Building agents with standardized tool interfaces for seamless integration across different AI systems.

MCP Agent Implementation

# Model Context Protocol (MCP) Implementation
from typing import Dict, List, Any, Optional, Protocol
from dataclasses import dataclass
from enum import Enum
import json
from abc import ABC, abstractmethod

class ToolType(Enum):
    """Categories a tool can declare through ``ToolSchema.tool_type``."""
    # Used by DatabaseTool's schema.
    FUNCTION = "function"
    RETRIEVAL = "retrieval"
    # Used by CodeExecutionTool's schema.
    CODE_EXECUTION = "code_execution"
    WEB_BROWSER = "web_browser"

@dataclass
class ToolSchema:
    """Description of a tool: its name, purpose, inputs, outputs and category."""
    # Identifier; MCPAgent.register_tool uses it as the registry key.
    name: str
    # Human-readable explanation of what the tool does.
    description: str
    # Parameter name -> spec dict (tools in this file use "type"/"description" keys).
    parameters: Dict[str, Any]
    # Result field name -> spec dict.
    returns: Dict[str, Any]
    # Category of the tool (see ToolType).
    tool_type: ToolType

class MCPTool(Protocol):
    """Protocol for MCP-compliant tools.

    Declares the two members MCPAgent calls on a tool: ``get_schema`` and the
    ``execute`` coroutine. Both bodies here are placeholders (``...``).
    """
    
    def get_schema(self) -> ToolSchema:
        """Return the ToolSchema describing this tool."""
        ...
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the tool with ``params`` and return a result dictionary."""
        ...

class DatabaseTool(MCPTool):
    """MCP-style database tool that returns simulated query results."""

    def __init__(self, connection_string: str):
        # Stored only; the simulated execute() never opens a connection.
        self.connection_string = connection_string

    def get_schema(self) -> ToolSchema:
        """Describe the tool as "database_query" with its inputs and outputs."""
        inputs = {
            "query": {"type": "string", "description": "SQL query to execute"},
            "params": {"type": "array", "description": "Query parameters"},
        }
        outputs = {
            "results": {"type": "array"},
            "row_count": {"type": "integer"},
        }
        return ToolSchema(
            name="database_query",
            description="Execute SQL queries on the database",
            parameters=inputs,
            returns=outputs,
            tool_type=ToolType.FUNCTION,
        )

    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return two fixed rows plus their count.

        The query text and parameters are read from ``params`` but are not
        used by the simulation.
        """
        sql = params.get("query")
        bind_values = params.get("params", [])

        # Simulated result set.
        rows = [
            dict(id=1, name="Item 1", value=100),
            dict(id=2, name="Item 2", value=200),
        ]
        return {"results": rows, "row_count": len(rows)}

class CodeExecutionTool(MCPTool):
    """MCP-style code execution tool that returns a simulated run result."""

    def get_schema(self) -> ToolSchema:
        """Describe the tool as "execute_code" with its inputs and outputs."""
        inputs = {
            "code": {"type": "string", "description": "Python code to execute"},
            "timeout": {"type": "integer", "description": "Execution timeout in seconds"},
        }
        outputs = {
            "output": {"type": "string"},
            "error": {"type": "string", "nullable": True},
            "execution_time": {"type": "number"},
        }
        return ToolSchema(
            name="execute_code",
            description="Execute Python code in sandboxed environment",
            parameters=inputs,
            returns=outputs,
            tool_type=ToolType.CODE_EXECUTION,
        )

    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Return a canned success payload; nothing is actually executed.

        The code and timeout (default 30) are read from ``params`` but are
        not used by the simulation.
        """
        source = params.get("code")
        limit = params.get("timeout", 30)

        try:
            # In real implementation, use subprocess or docker
            output = "Code executed successfully"
        except Exception as e:
            return {"output": "", "error": str(e), "execution_time": 0}

        return {"output": output, "error": None, "execution_time": 0.125}

class MCPAgent:
    """Agent that plans and runs tool calls against registered MCP-style tools.

    Attributes:
        tools: Registered tools keyed by their schema name.
        context: Free-form dictionary; not read by any method of this class.
        conversation_history: Alternating user tasks and assistant responses.
    """

    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}
        self.context: Dict[str, Any] = {}
        self.conversation_history: List[Dict] = []

    def register_tool(self, tool: MCPTool):
        """Register ``tool`` under its schema name and print a confirmation.

        A tool registered later under the same name replaces the earlier one.
        """
        schema = tool.get_schema()
        self.tools[schema.name] = tool
        print(f"Registered tool: {schema.name}")

    def get_available_tools(self) -> List[ToolSchema]:
        """Return the schema of every registered tool."""
        return [tool.get_schema() for tool in self.tools.values()]

    async def plan_execution(self, task: str) -> List[Dict]:
        """Build the list of tool calls for ``task`` from keyword matches.

        Returns:
            Steps shaped ``{"tool": name, "params": {...}}``; empty when the
            task mentions neither "database" nor "analyze".
        """
        # In real implementation, use LLM for planning
        plan = []
        lowered = task.lower()

        if "database" in lowered:
            plan.append({
                "tool": "database_query",
                "params": {
                    "query": "SELECT * FROM relevant_table",
                    "params": []
                }
            })

        if "analyze" in lowered:
            plan.append({
                "tool": "execute_code",
                "params": {
                    "code": "# Analysis code here\nresult = analyze_data()",
                    "timeout": 10
                }
            })

        return plan

    async def execute_plan(self, plan: List[Dict]) -> List[Dict]:
        """Run each planned step in order and collect one entry per step.

        Returns:
            ``{"tool", "params", "result"}`` for steps whose tool is
            registered, ``{"tool", "error"}`` for steps naming an unknown tool.
        """
        results = []

        for step in plan:
            tool_name = step["tool"]
            params = step["params"]

            tool = self.tools.get(tool_name)
            if tool is None:
                results.append({
                    "tool": tool_name,
                    "error": f"Tool {tool_name} not found"
                })
                continue

            result = await tool.execute(params)
            results.append({
                "tool": tool_name,
                "params": params,
                "result": result
            })

        return results

    async def process_task(self, task: str) -> Dict:
        """Plan and execute ``task``, recording both sides in the history.

        Returns:
            Dict with the keys "task", "plan", "results" and "summary".
        """
        self.conversation_history.append({
            "role": "user",
            "content": task
        })

        plan = await self.plan_execution(task)
        results = await self.execute_plan(plan)

        response = {
            "task": task,
            "plan": plan,
            "results": results,
            "summary": self._generate_summary(results)
        }

        self.conversation_history.append({
            "role": "assistant",
            "content": response
        })

        return response

    def _generate_summary(self, results: List[Dict]) -> str:
        """Summarize the execution results as one " | "-separated line.

        A step counts as failed when its entry has a top-level "error" key,
        or when the tool's own result dictionary carries a non-None "error"
        (the shape CodeExecutionTool returns on failure). Returns an empty
        string for an empty result list.
        """
        summary_parts = []
        for entry in results:
            tool_name = entry["tool"]
            if "error" in entry:
                summary_parts.append(f"Error in {tool_name}: {entry['error']}")
                continue

            # Surface failures the tool reported inside its own payload
            # instead of labelling them as successful.
            payload = entry.get("result")
            tool_error = payload.get("error") if isinstance(payload, dict) else None
            if tool_error is not None:
                summary_parts.append(f"Error in {tool_name}: {tool_error}")
            else:
                summary_parts.append(f"Successfully executed {tool_name}")

        return " | ".join(summary_parts)

# Usage Example
async def mcp_agent_demo():
    """Register the two sample tools, run two sample tasks and print the outcomes."""
    agent = MCPAgent()

    # Make both sample tools available to the agent.
    agent.register_tool(DatabaseTool("postgresql://localhost/db"))
    agent.register_tool(CodeExecutionTool())

    sample_tasks = (
        "Query the database for recent transactions",
        "Analyze the transaction data and generate insights",
    )

    for task in sample_tasks:
        print(f"\nProcessing: {task}")
        outcome = await agent.process_task(task)
        print(f"Summary: {outcome['summary']}")

        for step in outcome["results"]:
            print(f"  Tool: {step.get('tool')}")
            if "result" not in step:
                continue
            print(f"  Result: {step['result']}")

# Run the demo (requires `import asyncio`, which this snippet does not import)
# asyncio.run(mcp_agent_demo())

DSPy Framework

Declarative Self-Improving Programs

Building agents that automatically optimize their prompts and behaviors through compilation and bootstrapping.

DSPy Agent Implementation

# DSPy Framework Implementation
import dspy
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass

# Configure DSPy with LLM
# These statements run at import time: they build an OpenAI-backed client for
# gpt-3.5-turbo with max_tokens=1000 and pass it to DSPy's settings as `lm`.
# NOTE(review): presumably needs OpenAI credentials in the environment, and
# `dspy.OpenAI` may not exist in newer DSPy releases - confirm the pinned version.
turbo = dspy.OpenAI(model='gpt-3.5-turbo', max_tokens=1000)
dspy.settings.configure(lm=turbo)

# Define Signatures (Input/Output Specifications)
# Signature with one input (question) and one output (answer).
# NOTE(review): DSPy appears to use a Signature's docstring and field `desc`
# strings as prompt text - treat them as runtime strings, not documentation.
class QuestionAnswer(dspy.Signature):
    """Answer questions with reasoning."""
    question = dspy.InputField(desc="question to answer")
    answer = dspy.OutputField(desc="detailed answer with reasoning")

# Signature with one input (statement) and two outputs (facts, accuracy).
# NOTE(review): DSPy appears to use a Signature's docstring and field `desc`
# strings as prompt text - treat them as runtime strings, not documentation.
class FactCheck(dspy.Signature):
    """Verify facts in a statement."""
    statement = dspy.InputField(desc="statement to verify")
    facts = dspy.OutputField(desc="list of facts")
    accuracy = dspy.OutputField(desc="accuracy assessment")

# Signature with one input (document) and one output (summary).
# NOTE(review): DSPy appears to use a Signature's docstring and field `desc`
# strings as prompt text - treat them as runtime strings, not documentation.
class Summarize(dspy.Signature):
    """Summarize long text concisely."""
    document = dspy.InputField(desc="document to summarize")
    summary = dspy.OutputField(desc="concise summary")

# Define DSPy Modules
class ChainOfThought(dspy.Module):
    """Chain of Thought reasoning module.

    Thin wrapper that runs ``dspy.ChainOfThought`` over the QuestionAnswer
    signature. The class shares its name with ``dspy.ChainOfThought``; the
    bare name ``ChainOfThought`` in this file refers to this wrapper.
    """
    
    def __init__(self):
        super().__init__()
        # Underlying predictor, built once and reused by forward().
        self.generate_reasoning = dspy.ChainOfThought(QuestionAnswer)
    
    def forward(self, question):
        """Run the predictor on ``question`` and return its result unchanged."""
        return self.generate_reasoning(question=question)

class MultiHopQA(dspy.Module):
    """Multi-hop question answering module.

    Each hop generates a search query from the question and the context
    gathered so far, retrieves passages for it and adds them to the context.
    The final answer is generated from the question plus that context.

    Args:
        num_hops: Number of query/retrieve rounds run before answering.
    """

    def __init__(self, num_hops=3):
        super().__init__()
        self.num_hops = num_hops
        self.retrieve = dspy.Retrieve(k=5)
        self.generate_query = dspy.ChainOfThought("context, question -> query")
        # forward() passes `context` to the answer step, so its signature must
        # declare a context input; QuestionAnswer only declares `question`.
        self.generate_answer = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        """Answer ``question`` after ``num_hops`` rounds of retrieval."""
        context = []

        for _ in range(self.num_hops):
            # Generate search query
            query = self.generate_query(
                context=context,
                question=question
            ).query

            # Retrieve relevant passages, skipping any already collected so
            # repeated hits do not inflate the prompt.
            passages = self.retrieve(query).passages
            context.extend(p for p in passages if p not in context)

        # Generate final answer
        return self.generate_answer(
            question=question,
            context=context
        )

class ReAct(dspy.Module):
    """ReAct (Reasoning + Acting) agent module.

    Alternates between a "think" step, an "act" step that names a tool and
    its input, and running that tool, for at most ``max_iterations`` rounds
    or until the action is "Finish". A final "reflect" step produces the answer.

    Args:
        tools: Mapping of action name to a callable taking the action input.
    """

    def __init__(self, tools: Dict[str, callable]):
        super().__init__()
        self.tools = tools
        self.max_iterations = 5

        # Define signatures for ReAct
        self.think = dspy.ChainOfThought("question, observations -> thought")
        self.act = dspy.ChainOfThought("thought -> action, action_input")
        # The final step also receives the question, so the answer is not
        # produced from the observations alone.
        self.reflect = dspy.ChainOfThought("question, observations -> answer")

    def forward(self, question):
        """Run the think/act loop for ``question`` and return the reflect result."""
        observations = []

        for _ in range(self.max_iterations):
            # Think
            thought = self.think(
                question=question,
                observations=observations
            ).thought

            # Act
            action_output = self.act(thought=thought)
            # Generated text may carry stray whitespace around the tool name.
            action = str(action_output.action).strip()
            action_input = action_output.action_input

            # Execute action
            if action in self.tools:
                try:
                    result = self.tools[action](action_input)
                except Exception as exc:
                    # Record the failure as an observation so one bad tool
                    # call does not abort the whole run.
                    result = f"Error: {exc}"
                observations.append(f"{action}({action_input}) = {result}")
            elif action == "Finish":
                break
            else:
                # Without feedback the next iteration would see identical
                # inputs and likely repeat the same invalid action.
                known = ", ".join(self.tools)
                observations.append(
                    f"Unknown action {action!r}; available actions: {known}, Finish"
                )

        # Reflect and generate final answer
        return self.reflect(question=question, observations=observations)

# Define Optimizers
class BootstrapFewShot(dspy.Module):
    """Bootstrap few-shot examples for better performance.

    Wraps ``base_module`` and collects question/answer pairs that the module
    itself answers correctly on training data.

    Args:
        base_module: Module called with a question; its result must expose ``.answer``.
        num_examples: How many leading training items ``compile`` tries.
    """

    def __init__(self, base_module, num_examples=3):
        super().__init__()
        self.base_module = base_module
        self.num_examples = num_examples
        self.examples = []

    def compile(self, training_data):
        """Collect examples from the first ``num_examples`` training items.

        Each item must expose ``.question`` and ``.answer``. Items whose
        prediction passes ``_is_correct`` are appended to ``self.examples``.

        Returns:
            This instance.
        """
        for item in training_data[:self.num_examples]:
            prediction = self.base_module(item.question)
            if self._is_correct(prediction, item.answer):
                self.examples.append({
                    "question": item.question,
                    "answer": prediction.answer
                })

        return self

    def _is_correct(self, prediction, ground_truth):
        """Return True when the predicted answer contains the expected answer.

        The comparison is case-insensitive. The predicted answer is a
        detailed answer with reasoning, so the shorter ground truth is
        searched for inside it. An empty prediction or empty ground truth
        never counts as correct.
        """
        predicted = (prediction.answer or "").strip().lower()
        expected = (ground_truth or "").strip().lower()
        if not predicted or not expected:
            return False
        return expected in predicted

    def forward(self, question):
        """Call the base module on ``question`` inside a ``dspy.context`` block."""
        # NOTE(review): `examples` is passed as a dspy.context override;
        # confirm the base module actually consumes it as demonstrations.
        with dspy.context(examples=self.examples):
            return self.base_module(question)

# DSPy Program Compilation
class CompiledProgram(dspy.Module):
    """DSPy program that dispatches to a QA-plus-fact-check or a summarize path."""

    def __init__(self):
        super().__init__()
        self.qa = ChainOfThought()
        self.fact_checker = dspy.Predict(FactCheck)
        self.summarizer = dspy.Predict(Summarize)

    def forward(self, task_type, input_data):
        """Handle ``input_data`` according to ``task_type``.

        "qa" returns a dict with "answer", "facts" and "accuracy";
        "summarize" returns the summarizer's result object; any other value
        returns ``{"error": "Unknown task type"}``.
        """
        if task_type == "qa":
            qa_result = self.qa(input_data)
            # Fact-check the generated answer text.
            check = self.fact_checker(statement=qa_result.answer)
            return dict(
                answer=qa_result.answer,
                facts=check.facts,
                accuracy=check.accuracy,
            )

        if task_type == "summarize":
            return self.summarizer(document=input_data)

        return {"error": "Unknown task type"}

# Teleprompter for Automatic Prompt Optimization
class PromptOptimizer:
    """Optimize prompts automatically using DSPy teleprompters.

    Args:
        module: The DSPy module to optimize.
        metric: Metric callable handed to the teleprompter.
    """

    def __init__(self, module, metric):
        self.module = module
        self.metric = metric

    def compile(self, training_data, validation_data):
        """Compile and optimize the module.

        Args:
            training_data: Passed to the teleprompter as ``trainset``.
            validation_data: Passed to the teleprompter as ``valset``.

        Returns:
            The module returned by the teleprompter's ``compile``.
        """
        # Import from dspy.teleprompt rather than the top-level package: not
        # every DSPy release re-exports the optimizer classes as `dspy.<name>`.
        from dspy.teleprompt import BootstrapFewShotWithRandomSearch

        teleprompter = BootstrapFewShotWithRandomSearch(
            metric=self.metric,
            max_bootstrapped_demos=4,
            max_labeled_demos=16,
            num_candidate_programs=10,
            num_threads=4
        )

        optimized_module = teleprompter.compile(
            self.module,
            trainset=training_data,
            valset=validation_data
        )

        return optimized_module

# Usage Example
def _safe_arithmetic(expr):
    """Evaluate a plain arithmetic expression without using eval().

    Supports numbers, parentheses, unary +/- and the binary operators
    + - * / // % **. Anything else raises ValueError; malformed text raises
    SyntaxError from ast.parse.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def evaluate(node):
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        elif isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            # Cap exponents so a generated "9**9**9" cannot stall the process.
            if isinstance(node.op, ast.Pow) and abs(right) > 100:
                raise ValueError("exponent too large")
            return binary_ops[type(node.op)](left, right)
        elif isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        raise ValueError(f"Unsupported expression: {expr!r}")

    return evaluate(ast.parse(str(expr).strip(), mode="eval"))


def dspy_agent_demo():
    """Run the Chain of Thought, ReAct and CompiledProgram demos and print their output."""

    def calculate(expr):
        # SECURITY: this tool used to call eval() on text produced by the
        # language model, which allows arbitrary code execution. It is
        # deliberately restricted to plain arithmetic; failures are returned
        # as text so the agent can see them.
        try:
            return _safe_arithmetic(expr)
        except (ValueError, SyntaxError, TypeError, ZeroDivisionError, OverflowError) as exc:
            return f"Error: {exc}"

    # Create modules
    cot = ChainOfThought()
    react_agent = ReAct(tools={
        "search": lambda q: f"Search results for: {q}",
        "calculate": calculate,
        "lookup": lambda term: f"Definition of {term}"
    })

    # Example questions (only the first one is used below)
    questions = [
        "What is the capital of France and what is its population?",
        "Calculate the compound interest on $1000 at 5% for 3 years",
        "Explain quantum computing in simple terms"
    ]

    print("DSPy Agent Demonstrations:\n")

    # Chain of Thought
    print("1. Chain of Thought:")
    for q in questions[:1]:
        result = cot(q)
        print(f"Q: {q}")
        print(f"A: {result.answer}\n")

    # ReAct Agent
    print("2. ReAct Agent:")
    result = react_agent("What is 15% of 200?")
    print(f"Result: {result.answer}\n")

    # Compiled Program
    print("3. Compiled Program:")
    program = CompiledProgram()
    result = program("qa", "What are the benefits of renewable energy?")
    print(f"Answer: {result['answer']}")
    print(f"Accuracy: {result['accuracy']}\n")

# Run the demo
# dspy_agent_demo()

Agent Platform Comparison

| Platform | Core Concept | Best For | Key Features |
| --- | --- | --- | --- |
| LangGraph | Graph-based workflows | Complex multi-step agents | State management, cycles, conditional routing |
| MCP | Standardized tools | Tool interoperability | Protocol-based, cross-platform, typed interfaces |
| DSPy | Declarative optimization | Self-improving agents | Automatic prompt optimization, compilation |
| AutoGPT | Autonomous execution | Long-running tasks | Memory, self-prompting, goal decomposition |
| CrewAI | Multi-agent teams | Collaborative workflows | Role-based agents, delegation, coordination |

Agent Development Best Practices

  • Start simple with single-purpose agents before building complex systems
  • Implement robust error handling and fallback mechanisms
  • Use structured outputs and type validation for reliability
  • Monitor agent behavior with comprehensive logging and metrics
  • Implement safety checks and output validation
  • Design with human-in-the-loop capabilities for critical decisions
  • Test agents thoroughly with edge cases and adversarial inputs
  • Version control prompts and agent configurations

Common Challenges

  • Hallucination: Agents generating false or misleading information
  • Infinite Loops: Agents getting stuck in repetitive behaviors
  • Context Limits: Managing state within token constraints
  • Tool Reliability: Handling tool failures and timeouts gracefully
  • Cost Management: Controlling API calls and resource usage