MCP clients enable AI applications to communicate with MCP servers, accessing tools, resources, and prompts. Build powerful integrations that extend your AI's capabilities with external data and services.
Embed MCP client directly into your AI application for seamless communication
Support for stdio, HTTP, and WebSocket transports
Subscribe to server notifications and resource changes
Follow these steps to integrate an MCP client into your application:
Add the MCP client SDK to your project dependencies
Create and configure your MCP client instance
Establish connection using appropriate transport
Query available tools, resources, and prompts
Call tools and access resources as needed
// TypeScript MCP Client Integration import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'; import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js'; class MCPClientIntegration { private client: Client; private transport: any; constructor() { this.client = new Client({ name: 'my-ai-application', version: '1.0.0', }); } async connectStdio(command: string, args: string[] = []): Promise{ // Connect via stdio transport (subprocess) this.transport = new StdioClientTransport({ command, args, }); await this.client.connect(this.transport); console.log('Connected to MCP server via stdio'); } async connectWebSocket(url: string): Promise { // Connect via WebSocket transport this.transport = new WebSocketClientTransport(new URL(url)); await this.client.connect(this.transport); console.log('Connected to MCP server via WebSocket'); } async discoverCapabilities(): Promise<{ tools: any[], resources: any[], prompts: any[] }> { // List available tools const toolsResponse = await this.client.listTools(); // List available resources const resourcesResponse = await this.client.listResources(); // List available prompts const promptsResponse = await this.client.listPrompts(); return { tools: toolsResponse.tools || [], resources: resourcesResponse.resources || [], prompts: promptsResponse.prompts || [], }; } async callTool(name: string, args: any): Promise { try { const response = await this.client.callTool({ name, arguments: args, }); return response.content; } catch (error) { console.error(`Error calling tool ${name}:`, error); throw error; } } async readResource(uri: string): Promise { try { const response = await this.client.readResource({ uri }); return response.contents; } catch (error) { console.error(`Error reading resource ${uri}:`, error); throw error; } } async getPrompt(name: string, args?: any): Promise { try { const response = await this.client.getPrompt({ name, arguments: args, }); return response.messages; } catch (error) { console.error(`Error getting prompt ${name}:`, error); throw error; } } async subscribeToUpdates(): Promise { // Subscribe to resource updates this.client.on('notification', (notification) => { if (notification.method === 'resources/updated') { console.log('Resources updated:', notification.params); // Handle resource updates } }); // Subscribe to tool updates this.client.on('notification', (notification) => { if (notification.method === 'tools/updated') { console.log('Tools updated:', notification.params); // Handle tool updates } }); } async disconnect(): Promise { await this.client.close(); console.log('Disconnected from MCP server'); } } // Usage example async function main() { const mcpClient = new MCPClientIntegration(); // Connect to server await mcpClient.connectStdio('npx', ['-y', '@your-org/mcp-server']); // Discover capabilities const capabilities = await mcpClient.discoverCapabilities(); console.log('Available tools:', capabilities.tools); // Call a tool const result = await mcpClient.callTool('get_data', { query: 'SELECT * FROM users LIMIT 10', }); console.log('Tool result:', result); // Read a resource const resource = await mcpClient.readResource('db://users/profile'); console.log('Resource data:', resource); // Clean up await mcpClient.disconnect(); } main().catch(console.error);
Build MCP clients in Python for seamless integration with AI frameworks:
Install mcp package and configure Python environment
Implement client wrapper with async support
Manage server connections and reconnection logic
Parse and handle server responses appropriately
# Python MCP Client Implementation import asyncio import json from typing import Any, Dict, List, Optional from contextlib import asynccontextmanager from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from mcp.types import Tool, Resource, TextContent, ImageContent class MCPClient: """MCP Client for AI Application Integration""" def __init__(self, server_command: str, server_args: List[str] = None): self.server_params = StdioServerParameters( command=server_command, args=server_args or [], env=None ) self.session: Optional[ClientSession] = None self.tools: Dict[str, Tool] = {} self.resources: Dict[str, Resource] = {} self.prompts: Dict[str, Any] = {} @asynccontextmanager async def connect(self): """Establish connection to MCP server""" async with stdio_client(self.server_params) as (read, write): async with ClientSession(read, write) as session: self.session = session await self._initialize() try: yield self finally: self.session = None async def _initialize(self): """Initialize client and discover server capabilities""" if not self.session: raise RuntimeError("Client not connected") # Initialize connection await self.session.initialize() # Discover available tools tools_response = await self.session.list_tools() self.tools = {tool.name: tool for tool in tools_response.tools} # Discover available resources resources_response = await self.session.list_resources() self.resources = {res.uri: res for res in resources_response.resources} # Discover available prompts prompts_response = await self.session.list_prompts() self.prompts = {prompt.name: prompt for prompt in prompts_response.prompts} print(f"Discovered {len(self.tools)} tools, " f"{len(self.resources)} resources, " f"{len(self.prompts)} prompts") async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> List[Any]: """Call a tool on the MCP server""" if not self.session: raise RuntimeError("Client not connected") if tool_name not in self.tools: raise ValueError(f"Tool '{tool_name}' not found") result = await self.session.call_tool(tool_name, arguments) # Process content based on type processed_content = [] for content in result.content: if isinstance(content, TextContent): processed_content.append(content.text) elif isinstance(content, ImageContent): processed_content.append({ 'type': 'image', 'data': content.data, 'mimeType': content.mimeType }) else: processed_content.append(content) return processed_content async def read_resource(self, uri: str) -> Any: """Read a resource from the MCP server""" if not self.session: raise RuntimeError("Client not connected") if uri not in self.resources: raise ValueError(f"Resource '{uri}' not found") result = await self.session.read_resource(uri) return result.contents async def get_prompt(self, prompt_name: str, arguments: Dict[str, Any] = None) -> List[Dict]: """Get a prompt template from the server""" if not self.session: raise RuntimeError("Client not connected") if prompt_name not in self.prompts: raise ValueError(f"Prompt '{prompt_name}' not found") result = await self.session.get_prompt( prompt_name, arguments=arguments or {} ) return result.messages async def subscribe_to_resource(self, uri: str, callback): """Subscribe to resource updates""" if not self.session: raise RuntimeError("Client not connected") await self.session.subscribe_resource(uri) # Register callback for notifications self.session.on_notification( 'resources/updated', lambda params: callback(params) if params.get('uri') == uri else None ) def list_capabilities(self) -> Dict[str, List[str]]: """List all available capabilities""" return { 'tools': list(self.tools.keys()), 'resources': list(self.resources.keys()), 'prompts': list(self.prompts.keys()) } # Integration with AI frameworks class AIApplicationWithMCP: """Example AI application using MCP client""" def __init__(self, mcp_server_command: str): self.mcp_client = MCPClient(mcp_server_command) self.context = [] async def process_request(self, user_input: str) -> str: """Process user request with MCP capabilities""" async with self.mcp_client.connect() as client: # Analyze user intent intent = self._analyze_intent(user_input) if intent['type'] == 'tool_call': # Execute tool result = await client.call_tool( intent['tool_name'], intent['arguments'] ) return self._format_tool_result(result) elif intent['type'] == 'resource_query': # Read resource data = await client.read_resource(intent['resource_uri']) return self._format_resource_data(data) elif intent['type'] == 'prompt_request': # Get and execute prompt messages = await client.get_prompt( intent['prompt_name'], intent.get('arguments', {}) ) return self._process_prompt_messages(messages) else: return "I couldn't understand your request." def _analyze_intent(self, user_input: str) -> Dict[str, Any]: """Analyze user input to determine intent""" # Implement intent analysis logic # This is a simplified example if "query" in user_input.lower(): return { 'type': 'tool_call', 'tool_name': 'database_query', 'arguments': {'query': user_input} } elif "file" in user_input.lower(): return { 'type': 'resource_query', 'resource_uri': 'file://documents/data.json' } else: return {'type': 'unknown'} def _format_tool_result(self, result: List[Any]) -> str: """Format tool execution result""" formatted = [] for item in result: if isinstance(item, str): formatted.append(item) elif isinstance(item, dict): formatted.append(json.dumps(item, indent=2)) return '\n'.join(formatted) def _format_resource_data(self, data: Any) -> str: """Format resource data for display""" if isinstance(data, list): return '\n'.join(str(item) for item in data) elif isinstance(data, dict): return json.dumps(data, indent=2) return str(data) def _process_prompt_messages(self, messages: List[Dict]) -> str: """Process prompt messages""" response = [] for msg in messages: role = msg.get('role', 'assistant') content = msg.get('content', '') response.append(f"{role}: {content}") return '\n'.join(response) # Usage example async def main(): # Create AI application with MCP integration app = AIApplicationWithMCP('npx -y @your-org/mcp-server') # Process user requests requests = [ "Query the database for recent transactions", "Read the configuration file", "Generate a report summary" ] for request in requests: print(f"\nUser: {request}") response = await app.process_request(request) print(f"Assistant: {response}") if __name__ == "__main__": asyncio.run(main())
Implement advanced features for production-ready MCP clients:
Manage multiple server connections efficiently
Optimize performance with batched requests
Cache responses for improved performance
Implement automatic retry with exponential backoff
Monitor server health and connection status
// Advanced MCP Client with Production Features import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { LRUCache } from 'lru-cache'; import pRetry from 'p-retry'; class ProductionMCPClient { private clients: Map= new Map(); private cache: LRUCache ; private healthStatus: Map = new Map(); private requestQueue: any[] = []; private batchTimer: NodeJS.Timeout | null = null; constructor(options: { maxConnections?: number; cacheSize?: number; batchDelay?: number; } = {}) { // Initialize cache this.cache = new LRUCache({ max: options.cacheSize || 500, ttl: 1000 * 60 * 5, // 5 minute TTL }); } async addServer(id: string, transport: any): Promise { const client = new Client({ name: `production-client-${id}`, version: '1.0.0', }); await client.connect(transport); this.clients.set(id, client); this.healthStatus.set(id, true); // Start health monitoring this.startHealthMonitoring(id); } private startHealthMonitoring(serverId: string): void { setInterval(async () => { try { const client = this.clients.get(serverId); if (client) { // Ping server to check health await client.listTools(); this.healthStatus.set(serverId, true); } } catch (error) { console.error(`Server ${serverId} health check failed:`, error); this.healthStatus.set(serverId, false); // Attempt reconnection await this.reconnectServer(serverId); } }, 30000); // Check every 30 seconds } private async reconnectServer(serverId: string): Promise { await pRetry(async () => { const client = this.clients.get(serverId); if (client) { // Reconnect logic here console.log(`Reconnecting to server ${serverId}...`); // await client.reconnect(); this.healthStatus.set(serverId, true); } }, { retries: 3, factor: 2, minTimeout: 1000, maxTimeout: 10000, }); } async callToolWithRetry( serverId: string, toolName: string, args: any, options: { useCache?: boolean; retries?: number } = {} ): Promise { // Check cache first if (options.useCache) { const cacheKey = `${serverId}:${toolName}:${JSON.stringify(args)}`; const cached = this.cache.get(cacheKey); if (cached) { console.log('Cache hit for', cacheKey); return cached; } } // Execute with retry logic const result = await pRetry(async () => { const client = this.getHealthyClient(serverId); return await client.callTool({ name: toolName, arguments: args }); }, { retries: options.retries || 3, onFailedAttempt: (error) => { console.log(`Attempt ${error.attemptNumber} failed. Retrying...`); }, }); // Cache result if (options.useCache) { const cacheKey = `${serverId}:${toolName}:${JSON.stringify(args)}`; this.cache.set(cacheKey, result); } return result; } private getHealthyClient(serverId: string): Client { if (!this.healthStatus.get(serverId)) { // Try to find alternative healthy server for (const [id, healthy] of this.healthStatus.entries()) { if (healthy && id !== serverId) { console.log(`Using fallback server ${id}`); return this.clients.get(id)!; } } throw new Error('No healthy servers available'); } const client = this.clients.get(serverId); if (!client) { throw new Error(`Server ${serverId} not found`); } return client; } async batchRequests(requests: Array<{ serverId: string; type: 'tool' | 'resource' | 'prompt'; name: string; args?: any; }>): Promise { return Promise.all( requests.map(async (req) => { const client = this.getHealthyClient(req.serverId); switch (req.type) { case 'tool': return client.callTool({ name: req.name, arguments: req.args }); case 'resource': return client.readResource({ uri: req.name }); case 'prompt': return client.getPrompt({ name: req.name, arguments: req.args }); default: throw new Error(`Unknown request type: ${req.type}`); } }) ); } async queueRequest(request: any): Promise { this.requestQueue.push(request); // Start batch timer if not already running if (!this.batchTimer) { this.batchTimer = setTimeout(() => { this.processBatch(); }, 100); // 100ms batch window } } private async processBatch(): Promise { const batch = [...this.requestQueue]; this.requestQueue = []; this.batchTimer = null; if (batch.length === 0) return; console.log(`Processing batch of ${batch.length} requests`); const results = await this.batchRequests(batch); // Return results to callers batch.forEach((req, index) => { if (req.callback) { req.callback(results[index]); } }); } getHealthStatus(): Map { return new Map(this.healthStatus); } getCacheStats(): any { return { size: this.cache.size, hits: this.cache.hits, misses: this.cache.misses, }; } async cleanup(): Promise { // Clear cache this.cache.clear(); // Disconnect all clients for (const [id, client] of this.clients.entries()) { await client.close(); console.log(`Disconnected from server ${id}`); } this.clients.clear(); this.healthStatus.clear(); } } // Usage with load balancing async function productionExample() { const client = new ProductionMCPClient({ maxConnections: 5, cacheSize: 1000, batchDelay: 100, }); // Add multiple servers for redundancy await client.addServer('primary', primaryTransport); await client.addServer('secondary', secondaryTransport); await client.addServer('tertiary', tertiaryTransport); // Make requests with automatic failover const result = await client.callToolWithRetry( 'primary', 'complex_operation', { data: 'important' }, { useCache: true, retries: 5 } ); // Batch multiple requests const batchResults = await client.batchRequests([ { serverId: 'primary', type: 'tool', name: 'get_data', args: { id: 1 } }, { serverId: 'secondary', type: 'resource', name: 'config://app' }, { serverId: 'tertiary', type: 'prompt', name: 'summarize' }, ]); // Check health status const health = client.getHealthStatus(); console.log('Server health:', health); // Get cache statistics const cacheStats = client.getCacheStats(); console.log('Cache stats:', cacheStats); // Cleanup await client.cleanup(); }
Ready-to-use templates for various AI frameworks:
Transport Type | Use Case | Performance | Complexity |
---|---|---|---|
stdio | Local subprocess communication | High | Low |
HTTP/REST | Remote server communication | Medium | Low |
WebSocket | Real-time bidirectional communication | High | Medium |
gRPC | High-performance RPC | Very High | High |
IPC | Inter-process communication | Very High | Medium |