Understanding MCP Client Architecture

MCP clients enable AI applications to communicate with MCP servers, accessing tools, resources, and prompts. Build powerful integrations that extend your AI's capabilities with external data and services.

🎯 Direct Integration

Embed MCP client directly into your AI application for seamless communication

🔗 Protocol Support

Support for stdio, HTTP, and WebSocket transports

📊 Real-time Updates

Subscribe to server notifications and resource changes

🚀
Quick Start Integration

Follow these steps to integrate an MCP client into your application:

1
Install MCP SDK

Add the MCP client SDK to your project dependencies

2
Initialize Client

Create and configure your MCP client instance

3
Connect to Server

Establish connection using appropriate transport

4
Discover Capabilities

Query available tools, resources, and prompts

5
Execute Operations

Call tools and access resources as needed

// TypeScript MCP Client Integration
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';

class MCPClientIntegration {
  private client: Client;
  private transport: any;

  constructor() {
    this.client = new Client({
      name: 'my-ai-application',
      version: '1.0.0',
    });
  }

  async connectStdio(command: string, args: string[] = []): Promise {
    // Connect via stdio transport (subprocess)
    this.transport = new StdioClientTransport({
      command,
      args,
    });
    
    await this.client.connect(this.transport);
    console.log('Connected to MCP server via stdio');
  }

  async connectWebSocket(url: string): Promise {
    // Connect via WebSocket transport
    this.transport = new WebSocketClientTransport(new URL(url));
    
    await this.client.connect(this.transport);
    console.log('Connected to MCP server via WebSocket');
  }

  async discoverCapabilities(): Promise<{
    tools: any[],
    resources: any[],
    prompts: any[]
  }> {
    // List available tools
    const toolsResponse = await this.client.listTools();
    
    // List available resources
    const resourcesResponse = await this.client.listResources();
    
    // List available prompts
    const promptsResponse = await this.client.listPrompts();
    
    return {
      tools: toolsResponse.tools || [],
      resources: resourcesResponse.resources || [],
      prompts: promptsResponse.prompts || [],
    };
  }

  async callTool(name: string, args: any): Promise {
    try {
      const response = await this.client.callTool({
        name,
        arguments: args,
      });
      
      return response.content;
    } catch (error) {
      console.error(`Error calling tool ${name}:`, error);
      throw error;
    }
  }

  async readResource(uri: string): Promise {
    try {
      const response = await this.client.readResource({ uri });
      return response.contents;
    } catch (error) {
      console.error(`Error reading resource ${uri}:`, error);
      throw error;
    }
  }

  async getPrompt(name: string, args?: any): Promise {
    try {
      const response = await this.client.getPrompt({
        name,
        arguments: args,
      });
      
      return response.messages;
    } catch (error) {
      console.error(`Error getting prompt ${name}:`, error);
      throw error;
    }
  }

  async subscribeToUpdates(): Promise {
    // Subscribe to resource updates
    this.client.on('notification', (notification) => {
      if (notification.method === 'resources/updated') {
        console.log('Resources updated:', notification.params);
        // Handle resource updates
      }
    });

    // Subscribe to tool updates
    this.client.on('notification', (notification) => {
      if (notification.method === 'tools/updated') {
        console.log('Tools updated:', notification.params);
        // Handle tool updates
      }
    });
  }

  async disconnect(): Promise {
    await this.client.close();
    console.log('Disconnected from MCP server');
  }
}

// Usage example
async function main() {
  const mcpClient = new MCPClientIntegration();
  
  // Connect to server
  await mcpClient.connectStdio('npx', ['-y', '@your-org/mcp-server']);
  
  // Discover capabilities
  const capabilities = await mcpClient.discoverCapabilities();
  console.log('Available tools:', capabilities.tools);
  
  // Call a tool
  const result = await mcpClient.callTool('get_data', {
    query: 'SELECT * FROM users LIMIT 10',
  });
  console.log('Tool result:', result);
  
  // Read a resource
  const resource = await mcpClient.readResource('db://users/profile');
  console.log('Resource data:', resource);
  
  // Clean up
  await mcpClient.disconnect();
}

main().catch(console.error);
                
Security Note: Always validate server responses and implement proper error handling to prevent vulnerabilities in your AI application.
🐍
Python Client Implementation

Build MCP clients in Python for seamless integration with AI frameworks:

1
Setup Environment

Install mcp package and configure Python environment

2
Create Client Class

Implement client wrapper with async support

3
Handle Connections

Manage server connections and reconnection logic

4
Process Responses

Parse and handle server responses appropriately

# Python MCP Client Implementation
import asyncio
import json
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import Tool, Resource, TextContent, ImageContent

class MCPClient:
    """MCP Client for AI Application Integration"""
    
    def __init__(self, server_command: str, server_args: List[str] = None):
        self.server_params = StdioServerParameters(
            command=server_command,
            args=server_args or [],
            env=None
        )
        self.session: Optional[ClientSession] = None
        self.tools: Dict[str, Tool] = {}
        self.resources: Dict[str, Resource] = {}
        self.prompts: Dict[str, Any] = {}
    
    @asynccontextmanager
    async def connect(self):
        """Establish connection to MCP server"""
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                await self._initialize()
                
                try:
                    yield self
                finally:
                    self.session = None
    
    async def _initialize(self):
        """Initialize client and discover server capabilities"""
        if not self.session:
            raise RuntimeError("Client not connected")
        
        # Initialize connection
        await self.session.initialize()
        
        # Discover available tools
        tools_response = await self.session.list_tools()
        self.tools = {tool.name: tool for tool in tools_response.tools}
        
        # Discover available resources
        resources_response = await self.session.list_resources()
        self.resources = {res.uri: res for res in resources_response.resources}
        
        # Discover available prompts
        prompts_response = await self.session.list_prompts()
        self.prompts = {prompt.name: prompt for prompt in prompts_response.prompts}
        
        print(f"Discovered {len(self.tools)} tools, "
              f"{len(self.resources)} resources, "
              f"{len(self.prompts)} prompts")
    
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> List[Any]:
        """Call a tool on the MCP server"""
        if not self.session:
            raise RuntimeError("Client not connected")
        
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        
        result = await self.session.call_tool(tool_name, arguments)
        
        # Process content based on type
        processed_content = []
        for content in result.content:
            if isinstance(content, TextContent):
                processed_content.append(content.text)
            elif isinstance(content, ImageContent):
                processed_content.append({
                    'type': 'image',
                    'data': content.data,
                    'mimeType': content.mimeType
                })
            else:
                processed_content.append(content)
        
        return processed_content
    
    async def read_resource(self, uri: str) -> Any:
        """Read a resource from the MCP server"""
        if not self.session:
            raise RuntimeError("Client not connected")
        
        if uri not in self.resources:
            raise ValueError(f"Resource '{uri}' not found")
        
        result = await self.session.read_resource(uri)
        return result.contents
    
    async def get_prompt(self, prompt_name: str, arguments: Dict[str, Any] = None) -> List[Dict]:
        """Get a prompt template from the server"""
        if not self.session:
            raise RuntimeError("Client not connected")
        
        if prompt_name not in self.prompts:
            raise ValueError(f"Prompt '{prompt_name}' not found")
        
        result = await self.session.get_prompt(
            prompt_name,
            arguments=arguments or {}
        )
        
        return result.messages
    
    async def subscribe_to_resource(self, uri: str, callback):
        """Subscribe to resource updates"""
        if not self.session:
            raise RuntimeError("Client not connected")
        
        await self.session.subscribe_resource(uri)
        
        # Register callback for notifications
        self.session.on_notification(
            'resources/updated',
            lambda params: callback(params) if params.get('uri') == uri else None
        )
    
    def list_capabilities(self) -> Dict[str, List[str]]:
        """List all available capabilities"""
        return {
            'tools': list(self.tools.keys()),
            'resources': list(self.resources.keys()),
            'prompts': list(self.prompts.keys())
        }


# Integration with AI frameworks
class AIApplicationWithMCP:
    """Example AI application using MCP client"""
    
    def __init__(self, mcp_server_command: str):
        self.mcp_client = MCPClient(mcp_server_command)
        self.context = []
    
    async def process_request(self, user_input: str) -> str:
        """Process user request with MCP capabilities"""
        async with self.mcp_client.connect() as client:
            # Analyze user intent
            intent = self._analyze_intent(user_input)
            
            if intent['type'] == 'tool_call':
                # Execute tool
                result = await client.call_tool(
                    intent['tool_name'],
                    intent['arguments']
                )
                return self._format_tool_result(result)
            
            elif intent['type'] == 'resource_query':
                # Read resource
                data = await client.read_resource(intent['resource_uri'])
                return self._format_resource_data(data)
            
            elif intent['type'] == 'prompt_request':
                # Get and execute prompt
                messages = await client.get_prompt(
                    intent['prompt_name'],
                    intent.get('arguments', {})
                )
                return self._process_prompt_messages(messages)
            
            else:
                return "I couldn't understand your request."
    
    def _analyze_intent(self, user_input: str) -> Dict[str, Any]:
        """Analyze user input to determine intent"""
        # Implement intent analysis logic
        # This is a simplified example
        if "query" in user_input.lower():
            return {
                'type': 'tool_call',
                'tool_name': 'database_query',
                'arguments': {'query': user_input}
            }
        elif "file" in user_input.lower():
            return {
                'type': 'resource_query',
                'resource_uri': 'file://documents/data.json'
            }
        else:
            return {'type': 'unknown'}
    
    def _format_tool_result(self, result: List[Any]) -> str:
        """Format tool execution result"""
        formatted = []
        for item in result:
            if isinstance(item, str):
                formatted.append(item)
            elif isinstance(item, dict):
                formatted.append(json.dumps(item, indent=2))
        return '\n'.join(formatted)
    
    def _format_resource_data(self, data: Any) -> str:
        """Format resource data for display"""
        if isinstance(data, list):
            return '\n'.join(str(item) for item in data)
        elif isinstance(data, dict):
            return json.dumps(data, indent=2)
        return str(data)
    
    def _process_prompt_messages(self, messages: List[Dict]) -> str:
        """Process prompt messages"""
        response = []
        for msg in messages:
            role = msg.get('role', 'assistant')
            content = msg.get('content', '')
            response.append(f"{role}: {content}")
        return '\n'.join(response)


# Usage example
async def main():
    # Create AI application with MCP integration
    app = AIApplicationWithMCP('npx -y @your-org/mcp-server')
    
    # Process user requests
    requests = [
        "Query the database for recent transactions",
        "Read the configuration file",
        "Generate a report summary"
    ]
    
    for request in requests:
        print(f"\nUser: {request}")
        response = await app.process_request(request)
        print(f"Assistant: {response}")


if __name__ == "__main__":
    asyncio.run(main())
                
⚙️
Advanced Client Features

Implement advanced features for production-ready MCP clients:

1
Connection Pooling

Manage multiple server connections efficiently

2
Request Batching

Optimize performance with batched requests

3
Caching Layer

Cache responses for improved performance

4
Retry Logic

Implement automatic retry with exponential backoff

5
Health Monitoring

Monitor server health and connection status

// Advanced MCP Client with Production Features
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { LRUCache } from 'lru-cache';
import pRetry from 'p-retry';

class ProductionMCPClient {
  private clients: Map = new Map();
  private cache: LRUCache;
  private healthStatus: Map = new Map();
  private requestQueue: any[] = [];
  private batchTimer: NodeJS.Timeout | null = null;

  constructor(options: {
    maxConnections?: number;
    cacheSize?: number;
    batchDelay?: number;
  } = {}) {
    // Initialize cache
    this.cache = new LRUCache({
      max: options.cacheSize || 500,
      ttl: 1000 * 60 * 5, // 5 minute TTL
    });
  }

  async addServer(id: string, transport: any): Promise {
    const client = new Client({
      name: `production-client-${id}`,
      version: '1.0.0',
    });

    await client.connect(transport);
    this.clients.set(id, client);
    this.healthStatus.set(id, true);
    
    // Start health monitoring
    this.startHealthMonitoring(id);
  }

  private startHealthMonitoring(serverId: string): void {
    setInterval(async () => {
      try {
        const client = this.clients.get(serverId);
        if (client) {
          // Ping server to check health
          await client.listTools();
          this.healthStatus.set(serverId, true);
        }
      } catch (error) {
        console.error(`Server ${serverId} health check failed:`, error);
        this.healthStatus.set(serverId, false);
        
        // Attempt reconnection
        await this.reconnectServer(serverId);
      }
    }, 30000); // Check every 30 seconds
  }

  private async reconnectServer(serverId: string): Promise {
    await pRetry(async () => {
      const client = this.clients.get(serverId);
      if (client) {
        // Reconnect logic here
        console.log(`Reconnecting to server ${serverId}...`);
        // await client.reconnect();
        this.healthStatus.set(serverId, true);
      }
    }, {
      retries: 3,
      factor: 2,
      minTimeout: 1000,
      maxTimeout: 10000,
    });
  }

  async callToolWithRetry(
    serverId: string,
    toolName: string,
    args: any,
    options: { useCache?: boolean; retries?: number } = {}
  ): Promise {
    // Check cache first
    if (options.useCache) {
      const cacheKey = `${serverId}:${toolName}:${JSON.stringify(args)}`;
      const cached = this.cache.get(cacheKey);
      if (cached) {
        console.log('Cache hit for', cacheKey);
        return cached;
      }
    }

    // Execute with retry logic
    const result = await pRetry(async () => {
      const client = this.getHealthyClient(serverId);
      return await client.callTool({ name: toolName, arguments: args });
    }, {
      retries: options.retries || 3,
      onFailedAttempt: (error) => {
        console.log(`Attempt ${error.attemptNumber} failed. Retrying...`);
      },
    });

    // Cache result
    if (options.useCache) {
      const cacheKey = `${serverId}:${toolName}:${JSON.stringify(args)}`;
      this.cache.set(cacheKey, result);
    }

    return result;
  }

  private getHealthyClient(serverId: string): Client {
    if (!this.healthStatus.get(serverId)) {
      // Try to find alternative healthy server
      for (const [id, healthy] of this.healthStatus.entries()) {
        if (healthy && id !== serverId) {
          console.log(`Using fallback server ${id}`);
          return this.clients.get(id)!;
        }
      }
      throw new Error('No healthy servers available');
    }

    const client = this.clients.get(serverId);
    if (!client) {
      throw new Error(`Server ${serverId} not found`);
    }
    return client;
  }

  async batchRequests(requests: Array<{
    serverId: string;
    type: 'tool' | 'resource' | 'prompt';
    name: string;
    args?: any;
  }>): Promise {
    return Promise.all(
      requests.map(async (req) => {
        const client = this.getHealthyClient(req.serverId);
        
        switch (req.type) {
          case 'tool':
            return client.callTool({ name: req.name, arguments: req.args });
          case 'resource':
            return client.readResource({ uri: req.name });
          case 'prompt':
            return client.getPrompt({ name: req.name, arguments: req.args });
          default:
            throw new Error(`Unknown request type: ${req.type}`);
        }
      })
    );
  }

  async queueRequest(request: any): Promise {
    this.requestQueue.push(request);
    
    // Start batch timer if not already running
    if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.processBatch();
      }, 100); // 100ms batch window
    }
  }

  private async processBatch(): Promise {
    const batch = [...this.requestQueue];
    this.requestQueue = [];
    this.batchTimer = null;

    if (batch.length === 0) return;

    console.log(`Processing batch of ${batch.length} requests`);
    const results = await this.batchRequests(batch);
    
    // Return results to callers
    batch.forEach((req, index) => {
      if (req.callback) {
        req.callback(results[index]);
      }
    });
  }

  getHealthStatus(): Map {
    return new Map(this.healthStatus);
  }

  getCacheStats(): any {
    return {
      size: this.cache.size,
      hits: this.cache.hits,
      misses: this.cache.misses,
    };
  }

  async cleanup(): Promise {
    // Clear cache
    this.cache.clear();
    
    // Disconnect all clients
    for (const [id, client] of this.clients.entries()) {
      await client.close();
      console.log(`Disconnected from server ${id}`);
    }
    
    this.clients.clear();
    this.healthStatus.clear();
  }
}

// Usage with load balancing
async function productionExample() {
  const client = new ProductionMCPClient({
    maxConnections: 5,
    cacheSize: 1000,
    batchDelay: 100,
  });

  // Add multiple servers for redundancy
  await client.addServer('primary', primaryTransport);
  await client.addServer('secondary', secondaryTransport);
  await client.addServer('tertiary', tertiaryTransport);

  // Make requests with automatic failover
  const result = await client.callToolWithRetry(
    'primary',
    'complex_operation',
    { data: 'important' },
    { useCache: true, retries: 5 }
  );

  // Batch multiple requests
  const batchResults = await client.batchRequests([
    { serverId: 'primary', type: 'tool', name: 'get_data', args: { id: 1 } },
    { serverId: 'secondary', type: 'resource', name: 'config://app' },
    { serverId: 'tertiary', type: 'prompt', name: 'summarize' },
  ]);

  // Check health status
  const health = client.getHealthStatus();
  console.log('Server health:', health);

  // Get cache statistics
  const cacheStats = client.getCacheStats();
  console.log('Cache stats:', cacheStats);

  // Cleanup
  await client.cleanup();
}
                
Production Checklist:
  • Implement connection pooling for multiple servers
  • Add request caching with TTL
  • Set up automatic retry with exponential backoff
  • Monitor server health and connection status
  • Implement request batching for performance
  • Add comprehensive error handling
  • Set up logging and monitoring
  • Configure timeouts for all operations
📋 Client Integration Templates

Ready-to-use templates for various AI frameworks:

Transport Options

Transport Type Use Case Performance Complexity
stdio Local subprocess communication High Low
HTTP/REST Remote server communication Medium Low
WebSocket Real-time bidirectional communication High Medium
gRPC High-performance RPC Very High High
IPC Inter-process communication Very High Medium
LangChain
LlamaIndex
AutoGPT
CrewAI
Semantic Kernel
Haystack
Rasa
Botpress