MCP clients enable AI applications to communicate with MCP servers, accessing tools, resources, and prompts. Build powerful integrations that extend your AI's capabilities with external data and services.
Embed the MCP client directly into your AI application for seamless communication
Support for stdio, HTTP, and WebSocket transports
Subscribe to server notifications and resource changes
Follow these steps to integrate an MCP client into your application:
Add the MCP client SDK to your project dependencies
Create and configure your MCP client instance
Establish connection using appropriate transport
Query available tools, resources, and prompts
Call tools and access resources as needed
// TypeScript MCP Client Integration
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';
class MCPClientIntegration {
private client: Client;
private transport: any;
constructor() {
this.client = new Client({
name: 'my-ai-application',
version: '1.0.0',
});
}
async connectStdio(command: string, args: string[] = []): Promise {
// Connect via stdio transport (subprocess)
this.transport = new StdioClientTransport({
command,
args,
});
await this.client.connect(this.transport);
console.log('Connected to MCP server via stdio');
}
async connectWebSocket(url: string): Promise {
// Connect via WebSocket transport
this.transport = new WebSocketClientTransport(new URL(url));
await this.client.connect(this.transport);
console.log('Connected to MCP server via WebSocket');
}
async discoverCapabilities(): Promise<{
tools: any[],
resources: any[],
prompts: any[]
}> {
// List available tools
const toolsResponse = await this.client.listTools();
// List available resources
const resourcesResponse = await this.client.listResources();
// List available prompts
const promptsResponse = await this.client.listPrompts();
return {
tools: toolsResponse.tools || [],
resources: resourcesResponse.resources || [],
prompts: promptsResponse.prompts || [],
};
}
async callTool(name: string, args: any): Promise {
try {
const response = await this.client.callTool({
name,
arguments: args,
});
return response.content;
} catch (error) {
console.error(`Error calling tool ${name}:`, error);
throw error;
}
}
async readResource(uri: string): Promise {
try {
const response = await this.client.readResource({ uri });
return response.contents;
} catch (error) {
console.error(`Error reading resource ${uri}:`, error);
throw error;
}
}
async getPrompt(name: string, args?: any): Promise {
try {
const response = await this.client.getPrompt({
name,
arguments: args,
});
return response.messages;
} catch (error) {
console.error(`Error getting prompt ${name}:`, error);
throw error;
}
}
async subscribeToUpdates(): Promise {
// Subscribe to resource updates
this.client.on('notification', (notification) => {
if (notification.method === 'resources/updated') {
console.log('Resources updated:', notification.params);
// Handle resource updates
}
});
// Subscribe to tool updates
this.client.on('notification', (notification) => {
if (notification.method === 'tools/updated') {
console.log('Tools updated:', notification.params);
// Handle tool updates
}
});
}
async disconnect(): Promise {
await this.client.close();
console.log('Disconnected from MCP server');
}
}
// Usage example: connect, discover, call a tool, read a resource, clean up.
async function main() {
  const integration = new MCPClientIntegration();

  // Launch the server as a subprocess and connect over stdio.
  await integration.connectStdio('npx', ['-y', '@your-org/mcp-server']);

  // Enumerate what the server offers.
  const caps = await integration.discoverCapabilities();
  console.log('Available tools:', caps.tools);

  // Invoke a tool with concrete arguments.
  const result = await integration.callTool('get_data', {
    query: 'SELECT * FROM users LIMIT 10',
  });
  console.log('Tool result:', result);

  // Fetch a resource by URI.
  const resource = await integration.readResource('db://users/profile');
  console.log('Resource data:', resource);

  // Tear down the connection.
  await integration.disconnect();
}
main().catch(console.error);
Build MCP clients in Python for seamless integration with AI frameworks:
Install mcp package and configure Python environment
Implement client wrapper with async support
Manage server connections and reconnection logic
Parse and handle server responses appropriately
# Python MCP Client Implementation
import asyncio
import json
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import Tool, Resource, TextContent, ImageContent
class MCPClient:
    """MCP client wrapper for AI application integration.

    Spawns an MCP server as a subprocess (stdio transport), discovers
    its tools, resources, and prompts, and exposes async helpers to
    call tools, read resources, and fetch prompt templates.
    """

    def __init__(self, server_command: str, server_args: Optional[List[str]] = None):
        """Configure (but do not start) the stdio server connection.

        Args:
            server_command: Executable that launches the MCP server.
            server_args: Optional argument list for the executable.
        """
        self.server_params = StdioServerParameters(
            command=server_command,
            args=server_args or [],
            env=None
        )
        # Live session while inside connect(); None when disconnected.
        self.session: Optional[ClientSession] = None
        # Capability registries, populated by _initialize().
        self.tools: Dict[str, Tool] = {}
        self.resources: Dict[str, Resource] = {}
        self.prompts: Dict[str, Any] = {}

    @asynccontextmanager
    async def connect(self):
        """Establish the server connection for the duration of the block."""
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                self.session = session
                await self._initialize()
                try:
                    yield self
                finally:
                    # Drop the session reference once the transport closes.
                    self.session = None

    async def _initialize(self):
        """Run the MCP handshake and cache the server's capabilities."""
        if not self.session:
            raise RuntimeError("Client not connected")
        # Initialize the protocol connection.
        await self.session.initialize()
        # Discover available tools.
        tools_response = await self.session.list_tools()
        self.tools = {tool.name: tool for tool in tools_response.tools}
        # Discover available resources.
        # NOTE(review): depending on SDK version, resource URIs may be
        # AnyUrl objects rather than plain strings; confirm the key
        # type matches the str URIs passed to read_resource().
        resources_response = await self.session.list_resources()
        self.resources = {res.uri: res for res in resources_response.resources}
        # Discover available prompts.
        prompts_response = await self.session.list_prompts()
        self.prompts = {prompt.name: prompt for prompt in prompts_response.prompts}
        print(f"Discovered {len(self.tools)} tools, "
              f"{len(self.resources)} resources, "
              f"{len(self.prompts)} prompts")

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> List[Any]:
        """Call a tool on the MCP server and normalize its content items.

        Text content is flattened to str; images become dicts with
        'type'/'data'/'mimeType'; anything else passes through as-is.

        Raises:
            RuntimeError: if not connected.
            ValueError: if the tool was not discovered at init time.
        """
        if not self.session:
            raise RuntimeError("Client not connected")
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        result = await self.session.call_tool(tool_name, arguments)
        processed_content = []
        for content in result.content:
            if isinstance(content, TextContent):
                processed_content.append(content.text)
            elif isinstance(content, ImageContent):
                processed_content.append({
                    'type': 'image',
                    'data': content.data,
                    'mimeType': content.mimeType
                })
            else:
                # Unknown content types are passed through untouched.
                processed_content.append(content)
        return processed_content

    async def read_resource(self, uri: str) -> Any:
        """Read a previously discovered resource from the MCP server."""
        if not self.session:
            raise RuntimeError("Client not connected")
        if uri not in self.resources:
            raise ValueError(f"Resource '{uri}' not found")
        result = await self.session.read_resource(uri)
        return result.contents

    async def get_prompt(self, prompt_name: str, arguments: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """Get a prompt template from the server, filling in arguments."""
        if not self.session:
            raise RuntimeError("Client not connected")
        if prompt_name not in self.prompts:
            raise ValueError(f"Prompt '{prompt_name}' not found")
        result = await self.session.get_prompt(
            prompt_name,
            arguments=arguments or {}
        )
        return result.messages

    async def subscribe_to_resource(self, uri: str, callback):
        """Subscribe to updates for one resource URI.

        The callback receives the notification params whenever the
        server reports a change to exactly this URI.
        """
        if not self.session:
            raise RuntimeError("Client not connected")
        await self.session.subscribe_resource(uri)
        # NOTE(review): on_notification is not part of every SDK
        # version's ClientSession API — verify before relying on it.
        self.session.on_notification(
            'resources/updated',
            lambda params: callback(params) if params.get('uri') == uri else None
        )

    def list_capabilities(self) -> Dict[str, List[str]]:
        """Return the names/URIs of all discovered capabilities."""
        return {
            'tools': list(self.tools.keys()),
            'resources': list(self.resources.keys()),
            'prompts': list(self.prompts.keys())
        }
# Integration with AI frameworks
class AIApplicationWithMCP:
    """Example AI application that routes user requests through MCP."""

    def __init__(self, mcp_server_command: str):
        """Build the MCP client from a shell-style command string.

        Args:
            mcp_server_command: Full launch command, e.g.
                'npx -y @your-org/mcp-server'. It is split into the
                executable and its arguments because MCPClient takes
                them separately; passing the whole string as the
                command would fail to resolve an executable.

        Raises:
            ValueError: if the command string is empty.
        """
        import shlex  # local import: only needed for command parsing
        parts = shlex.split(mcp_server_command)
        if not parts:
            raise ValueError("mcp_server_command must not be empty")
        self.mcp_client = MCPClient(parts[0], parts[1:])
        # Conversation context placeholder (unused in this example).
        self.context = []

    async def process_request(self, user_input: str) -> str:
        """Process one user request, dispatching on the detected intent."""
        async with self.mcp_client.connect() as client:
            intent = self._analyze_intent(user_input)
            if intent['type'] == 'tool_call':
                # Execute the matched tool with its arguments.
                result = await client.call_tool(
                    intent['tool_name'],
                    intent['arguments']
                )
                return self._format_tool_result(result)
            elif intent['type'] == 'resource_query':
                # Read the matched resource by URI.
                data = await client.read_resource(intent['resource_uri'])
                return self._format_resource_data(data)
            elif intent['type'] == 'prompt_request':
                # Fetch and render the matched prompt template.
                messages = await client.get_prompt(
                    intent['prompt_name'],
                    intent.get('arguments', {})
                )
                return self._process_prompt_messages(messages)
            else:
                return "I couldn't understand your request."

    def _analyze_intent(self, user_input: str) -> Dict[str, Any]:
        """Keyword-based intent detection (simplified placeholder).

        'query' wins over 'file' when both keywords are present.
        """
        if "query" in user_input.lower():
            return {
                'type': 'tool_call',
                'tool_name': 'database_query',
                'arguments': {'query': user_input}
            }
        elif "file" in user_input.lower():
            return {
                'type': 'resource_query',
                'resource_uri': 'file://documents/data.json'
            }
        else:
            return {'type': 'unknown'}

    def _format_tool_result(self, result: List[Any]) -> str:
        """Render tool output items as newline-joined display text."""
        formatted = []
        for item in result:
            if isinstance(item, str):
                formatted.append(item)
            elif isinstance(item, dict):
                formatted.append(json.dumps(item, indent=2))
            else:
                # Previously non-str/non-dict items were silently
                # dropped; stringify them instead so nothing is lost.
                formatted.append(str(item))
        return '\n'.join(formatted)

    def _format_resource_data(self, data: Any) -> str:
        """Render resource contents as display text."""
        if isinstance(data, list):
            return '\n'.join(str(item) for item in data)
        elif isinstance(data, dict):
            return json.dumps(data, indent=2)
        return str(data)

    def _process_prompt_messages(self, messages: List[Dict]) -> str:
        """Flatten prompt messages into 'role: content' lines."""
        response = []
        for msg in messages:
            role = msg.get('role', 'assistant')
            content = msg.get('content', '')
            response.append(f"{role}: {content}")
        return '\n'.join(response)
# Usage example: drive the app with a few representative requests.
async def main():
    # Build the AI application backed by an MCP server.
    app = AIApplicationWithMCP('npx -y @your-org/mcp-server')

    # One request per intent path exercised by _analyze_intent().
    sample_requests = [
        "Query the database for recent transactions",
        "Read the configuration file",
        "Generate a report summary",
    ]

    for user_request in sample_requests:
        print(f"\nUser: {user_request}")
        answer = await app.process_request(user_request)
        print(f"Assistant: {answer}")


if __name__ == "__main__":
    asyncio.run(main())
Implement advanced features for production-ready MCP clients:
Manage multiple server connections efficiently
Optimize performance with batched requests
Cache responses for improved performance
Implement automatic retry with exponential backoff
Monitor server health and connection status
// Advanced MCP Client with Production Features
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { LRUCache } from 'lru-cache';
import pRetry from 'p-retry';
/**
 * Production-grade MCP client: manages multiple named server
 * connections with health monitoring and failover, caches tool
 * results with a TTL, retries with exponential backoff, and batches
 * requests over a short time window.
 */
class ProductionMCPClient {
  private clients: Map<string, Client> = new Map();
  private cache: LRUCache<string, any>;
  private healthStatus: Map<string, boolean> = new Map();
  // Health-check interval handles, keyed by server id, so cleanup()
  // can stop them (previously these intervals were never cleared).
  private healthTimers: Map<string, NodeJS.Timeout> = new Map();
  private requestQueue: any[] = [];
  private batchTimer: NodeJS.Timeout | null = null;

  constructor(options: {
    maxConnections?: number;
    cacheSize?: number;
    batchDelay?: number;
  } = {}) {
    // Bounded response cache with a 5 minute TTL.
    this.cache = new LRUCache<string, any>({
      max: options.cacheSize || 500,
      ttl: 1000 * 60 * 5, // 5 minute TTL
    });
  }

  /** Connect to a server and start monitoring its health. */
  async addServer(id: string, transport: any): Promise<void> {
    const client = new Client({
      name: `production-client-${id}`,
      version: '1.0.0',
    });
    await client.connect(transport);
    this.clients.set(id, client);
    this.healthStatus.set(id, true);
    this.startHealthMonitoring(id);
  }

  /** Ping the server every 30s; mark unhealthy and reconnect on failure. */
  private startHealthMonitoring(serverId: string): void {
    const timer = setInterval(async () => {
      try {
        const client = this.clients.get(serverId);
        if (client) {
          // listTools doubles as a lightweight liveness probe.
          await client.listTools();
          this.healthStatus.set(serverId, true);
        }
      } catch (error) {
        console.error(`Server ${serverId} health check failed:`, error);
        this.healthStatus.set(serverId, false);
        await this.reconnectServer(serverId);
      }
    }, 30000); // Check every 30 seconds
    this.healthTimers.set(serverId, timer);
  }

  /** Retry reconnection with exponential backoff (1s-10s, 3 retries). */
  private async reconnectServer(serverId: string): Promise<void> {
    await pRetry(async () => {
      const client = this.clients.get(serverId);
      if (client) {
        // Reconnect logic here
        console.log(`Reconnecting to server ${serverId}...`);
        // await client.reconnect();
        this.healthStatus.set(serverId, true);
      }
    }, {
      retries: 3,
      factor: 2,
      minTimeout: 1000,
      maxTimeout: 10000,
    });
  }

  /**
   * Call a tool with retry and optional caching. Falls over to any
   * healthy server if the requested one is marked unhealthy.
   */
  async callToolWithRetry(
    serverId: string,
    toolName: string,
    args: any,
    options: { useCache?: boolean; retries?: number } = {}
  ): Promise<any> {
    // One cache key for both lookup and store.
    const cacheKey = `${serverId}:${toolName}:${JSON.stringify(args)}`;
    if (options.useCache) {
      const cached = this.cache.get(cacheKey);
      // Compare against undefined so cached falsy results still hit.
      if (cached !== undefined) {
        console.log('Cache hit for', cacheKey);
        return cached;
      }
    }
    // Execute with retry logic.
    const result = await pRetry(async () => {
      const client = this.getHealthyClient(serverId);
      return await client.callTool({ name: toolName, arguments: args });
    }, {
      retries: options.retries || 3,
      onFailedAttempt: (error) => {
        console.log(`Attempt ${error.attemptNumber} failed. Retrying...`);
      },
    });
    if (options.useCache) {
      this.cache.set(cacheKey, result);
    }
    return result;
  }

  /** Return the requested client, or any healthy one as a fallback. */
  private getHealthyClient(serverId: string): Client {
    if (!this.healthStatus.get(serverId)) {
      for (const [id, healthy] of this.healthStatus.entries()) {
        if (healthy && id !== serverId) {
          console.log(`Using fallback server ${id}`);
          return this.clients.get(id)!;
        }
      }
      throw new Error('No healthy servers available');
    }
    const client = this.clients.get(serverId);
    if (!client) {
      throw new Error(`Server ${serverId} not found`);
    }
    return client;
  }

  /** Execute a heterogeneous set of requests in parallel. */
  async batchRequests(requests: Array<{
    serverId: string;
    type: 'tool' | 'resource' | 'prompt';
    name: string;
    args?: any;
  }>): Promise<any[]> {
    return Promise.all(
      requests.map(async (req) => {
        const client = this.getHealthyClient(req.serverId);
        switch (req.type) {
          case 'tool':
            return client.callTool({ name: req.name, arguments: req.args });
          case 'resource':
            return client.readResource({ uri: req.name });
          case 'prompt':
            return client.getPrompt({ name: req.name, arguments: req.args });
          default:
            throw new Error(`Unknown request type: ${req.type}`);
        }
      })
    );
  }

  /**
   * Queue a request for batched processing (~100ms window). Results
   * are delivered via `request.callback`, not the returned promise,
   * which resolves as soon as the request is enqueued.
   */
  async queueRequest(request: any): Promise<void> {
    this.requestQueue.push(request);
    if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.processBatch();
      }, 100); // 100ms batch window
    }
  }

  /** Drain the queue, run it as one batch, and invoke callbacks. */
  private async processBatch(): Promise<void> {
    const batch = [...this.requestQueue];
    this.requestQueue = [];
    this.batchTimer = null;
    if (batch.length === 0) return;
    console.log(`Processing batch of ${batch.length} requests`);
    const results = await this.batchRequests(batch);
    batch.forEach((req, index) => {
      if (req.callback) {
        req.callback(results[index]);
      }
    });
  }

  /** Snapshot of the per-server health flags. */
  getHealthStatus(): Map<string, boolean> {
    return new Map(this.healthStatus);
  }

  /**
   * Basic cache statistics.
   * NOTE(review): lru-cache does not maintain hit/miss counters (the
   * original read nonexistent `hits`/`misses` properties, which were
   * always undefined); size and capacity are reported instead.
   */
  getCacheStats(): any {
    return {
      size: this.cache.size,
      max: this.cache.max,
    };
  }

  /** Stop health checks, clear the cache, and close every client. */
  async cleanup(): Promise<void> {
    // Stop periodic health checks so the process can exit cleanly.
    for (const timer of this.healthTimers.values()) {
      clearInterval(timer);
    }
    this.healthTimers.clear();
    // Cancel any pending batch window.
    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    this.cache.clear();
    // Disconnect all clients.
    for (const [id, client] of this.clients.entries()) {
      await client.close();
      console.log(`Disconnected from server ${id}`);
    }
    this.clients.clear();
    this.healthStatus.clear();
  }
}
// Usage with load balancing across redundant servers.
async function productionExample() {
  const mcp = new ProductionMCPClient({
    maxConnections: 5,
    cacheSize: 1000,
    batchDelay: 100,
  });

  // Register redundant servers (the transports are assumed to be
  // constructed elsewhere in the application).
  await mcp.addServer('primary', primaryTransport);
  await mcp.addServer('secondary', secondaryTransport);
  await mcp.addServer('tertiary', tertiaryTransport);

  // Single call with caching, retries, and automatic failover.
  const result = await mcp.callToolWithRetry(
    'primary',
    'complex_operation',
    { data: 'important' },
    { useCache: true, retries: 5 }
  );

  // Run several heterogeneous requests as one parallel batch.
  const batchResults = await mcp.batchRequests([
    { serverId: 'primary', type: 'tool', name: 'get_data', args: { id: 1 } },
    { serverId: 'secondary', type: 'resource', name: 'config://app' },
    { serverId: 'tertiary', type: 'prompt', name: 'summarize' },
  ]);

  // Inspect server health.
  const health = mcp.getHealthStatus();
  console.log('Server health:', health);

  // Inspect cache statistics.
  const cacheStats = mcp.getCacheStats();
  console.log('Cache stats:', cacheStats);

  // Tear everything down.
  await mcp.cleanup();
}
Compare transport options to choose the right one for your integration:
| Transport Type | Use Case | Performance | Complexity |
|---|---|---|---|
| stdio | Local subprocess communication | High | Low |
| HTTP/REST | Remote server communication | Medium | Low |
| WebSocket | Real-time bidirectional communication | High | Medium |
| gRPC | High-performance RPC | Very High | High |
| IPC | Inter-process communication | Very High | Medium |