Build and manage scalable AI systems from prototype to production at enterprise scale
Scale from serving hundreds to millions of users without rebuilding your infrastructure.
Optimize infrastructure costs while maintaining performance as you scale.
Maintain high availability and performance under increasing load.
| Company | AI System | Scale | Infrastructure | Key Challenge |
|---|---|---|---|---|
| OpenAI | ChatGPT | 100M+ users | 10,000+ GPUs | Inference optimization |
| Google | Search AI | 8.5B queries/day | Custom TPUs | Sub-second latency |
| Netflix | Recommendations | 230M subscribers | AWS + Edge | Personalization at scale |
| Uber | ETA Prediction | 5M+ trips/day | Hybrid cloud | Real-time inference |
| Meta | Feed Ranking | 3B+ users | Custom hardware | Global distribution |
Reference architecture layers:
• Traffic Distribution
• Request Management
• Edge Caching
• Inference Engines
• Feature Serving
• Redis/Memcached
• Raw Storage
• Async Processing
• Observability
Vertical scaling: increase the resources of a single machine.
```text
# AWS EC2 Instance Scaling
# From: t3.medium (2 vCPU, 4 GB RAM)
# To:   p3.8xlarge (32 vCPU, 244 GB RAM, 4 V100 GPUs)
#
# Pros:
# - Simple implementation
# - No code changes needed
# - Good for memory-intensive models
#
# Cons:
# - Hardware limits
# - Single point of failure
# - Expensive at scale
```
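As a concrete illustration, here is a minimal vertical-scaling sketch with boto3; the region, instance ID, and target instance type are placeholder assumptions, and the instance must be stopped before its type can change.

```python
# Hypothetical sketch: resize an EC2 instance in place with boto3.
# The region, instance ID, and target type are placeholders.
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")
instance_id = "i-0123456789abcdef0"  # placeholder instance

# Vertical scaling requires a stop/start cycle
ec2.stop_instances(InstanceIds=[instance_id])
ec2.get_waiter("instance_stopped").wait(InstanceIds=[instance_id])

ec2.modify_instance_attribute(
    InstanceId=instance_id,
    InstanceType={"Value": "p3.8xlarge"},  # move to a GPU instance class
)

ec2.start_instances(InstanceIds=[instance_id])
```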
Horizontal scaling: add more machines to the serving pool.
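For comparison, a minimal horizontal-scaling sketch using the official Kubernetes Python client; the Deployment name, namespace, and replica count are assumptions, not values from this page.

```python
# Hypothetical sketch: scale out a model-serving Deployment by raising its
# replica count. Deployment name, namespace, and count are assumptions.
from kubernetes import client, config

config.load_kube_config()  # use load_incluster_config() when running in-cluster
apps = client.AppsV1Api()

apps.patch_namespaced_deployment_scale(
    name="model-server",
    namespace="default",
    body={"spec": {"replicas": 8}},  # add more machines to the pool
)
```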
Edge deployment: serve models closer to users, for up to 80% faster response times.
| Metric | Description | Target | Measurement |
|---|---|---|---|
| Throughput | Requests per second | > 1,000 RPS | Load testing tools |
| Latency | Response time | < 100 ms P99 | APM tools |
| Availability | Uptime percentage | 99.9% | Monitoring systems |
| Utilization | Resource usage | 60-80% | Cloud metrics |
| Error Rate | Failed requests | < 0.1% | Log analysis |
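A small sketch of how these targets can be checked from raw measurements; the sample latencies, request counts, and 60-second window are assumptions.

```python
# Hypothetical sketch: evaluate measured traffic against the SLO targets above.
# The sample latencies, counts, and 60-second window are assumptions.
import numpy as np

latencies_ms = np.random.lognormal(mean=3.5, sigma=0.4, size=10_000)  # stand-in sample
errors, total, window_s = 84, 120_000, 60

slo_report = {
    "throughput_ok": total / window_s > 1000,             # > 1,000 RPS
    "latency_ok": np.percentile(latencies_ms, 99) < 100,  # P99 < 100 ms
    "error_rate_ok": errors / total < 0.001,              # < 0.1%
}
print(slo_report)
```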
• Hot storage (frequently accessed data): SSD, Redis, DynamoDB
• Warm storage (occasional access): HDD, RDS, MongoDB
• Cold storage (archive & backup): S3 Glacier, tape (see the lifecycle sketch below)
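Tier transitions are usually automated with lifecycle rules rather than moved by hand. A minimal boto3 sketch is below; the bucket name, prefix, and day thresholds are assumptions that mirror the Terraform lifecycle rule later in this page.

```python
# Hypothetical sketch: demote aging objects to cheaper tiers with an S3
# lifecycle rule. Bucket name, prefix, and thresholds are assumptions.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="ai-platform-artifacts-example",  # placeholder bucket
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-then-archive",
                "Status": "Enabled",
                "Filter": {"Prefix": "artifacts/"},
                "Transitions": [
                    {"Days": 30, "StorageClass": "INTELLIGENT_TIERING"},  # warm
                    {"Days": 90, "StorageClass": "GLACIER"},              # cold
                ],
            }
        ]
    },
)
```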
```python
import asyncio
import copy
import json

import numpy as np
import redis
import torch
from fastapi import FastAPI
from pydantic import BaseModel


class ScalableModelServer:
    def __init__(self, model_path, cache_ttl=3600):
        self.model = self.load_model(model_path)
        self.cache = redis.Redis(host='localhost', port=6379)
        self.cache_ttl = cache_ttl
        self.request_queue = asyncio.Queue()
        self.batch_size = 32
        self.batch_timeout = 0.1  # 100 ms

    def load_model(self, path):
        """Load model with optimization"""
        model = torch.jit.load(path)
        model.eval()

        # Optimization techniques
        if torch.cuda.is_available():
            model = model.cuda()
        model = torch.jit.optimize_for_inference(model)

        return model

    async def batch_inference_worker(self):
        """Process requests in batches for efficiency"""
        while True:
            batch = []
            futures = []

            # Collect a batch
            try:
                # Wait for the first request
                req, future = await self.request_queue.get()
                batch.append(req)
                futures.append(future)

                # Collect more requests up to batch_size
                deadline = asyncio.get_event_loop().time() + self.batch_timeout
                while len(batch) < self.batch_size:
                    remaining = deadline - asyncio.get_event_loop().time()
                    if remaining <= 0:
                        break
                    try:
                        req, future = await asyncio.wait_for(
                            self.request_queue.get(), timeout=remaining
                        )
                        batch.append(req)
                        futures.append(future)
                    except asyncio.TimeoutError:
                        break
            except Exception:
                continue

            # Process the batch
            if batch:
                try:
                    results = await self.process_batch(batch)
                    for future, result in zip(futures, results):
                        future.set_result(result)
                except Exception as e:
                    for future in futures:
                        future.set_exception(e)

    async def process_batch(self, batch):
        """Process a batch of requests"""
        # Check the cache for previously computed results
        cache_keys = [self.get_cache_key(req) for req in batch]
        cached_results = self.cache.mget(cache_keys)

        # Separate cached and uncached requests
        uncached_indices = []
        uncached_requests = []
        results = [None] * len(batch)

        for i, (cached, req) in enumerate(zip(cached_results, batch)):
            if cached:
                results[i] = self.deserialize(cached)
            else:
                uncached_indices.append(i)
                uncached_requests.append(req)

        # Run inference only for uncached requests
        if uncached_requests:
            # Stack inputs for batch processing
            batch_input = self.prepare_batch_input(uncached_requests)

            with torch.no_grad():
                if torch.cuda.is_available():
                    batch_input = batch_input.cuda()
                batch_output = self.model(batch_input)
                batch_results = self.postprocess(batch_output)

            # Store results and populate the cache
            for idx, req_idx in enumerate(uncached_indices):
                result = batch_results[idx]
                results[req_idx] = result
                self.cache.setex(
                    cache_keys[req_idx], self.cache_ttl, self.serialize(result)
                )

        return results

    async def predict(self, request):
        """Async prediction with batching"""
        future = asyncio.Future()
        await self.request_queue.put((request, future))
        return await future

    def get_cache_key(self, request):
        """Generate cache key for request"""
        return f"model:{hash(str(request))}"

    # --- Minimal helpers; adapt these to your model's input/output format ---

    def prepare_batch_input(self, requests):
        """Stack fixed-length float vectors into one batch tensor"""
        return torch.tensor(np.array(requests, dtype=np.float32))

    def postprocess(self, batch_output):
        """Convert model outputs to plain Python lists"""
        return batch_output.cpu().numpy().tolist()

    def serialize(self, result):
        return json.dumps(result)

    def deserialize(self, cached):
        return json.loads(cached)

    def horizontal_scale(self, num_replicas):
        """Scale out with multiple model replicas"""
        replicas = []
        for i in range(num_replicas):
            # Deep-copy so each replica owns its own weights
            replica = copy.deepcopy(self.model)
            if torch.cuda.is_available() and i < torch.cuda.device_count():
                replica = replica.to(f'cuda:{i}')
            replicas.append(replica)
        return replicas


# FastAPI application
app = FastAPI()
model_server = ScalableModelServer("model.pt")


class PredictionRequest(BaseModel):
    data: list


@app.post("/predict")
async def predict(request: PredictionRequest):
    result = await model_server.predict(request.data)
    return {"prediction": result}


# Start the batch processing worker
@app.on_event("startup")
async def startup():
    asyncio.create_task(model_server.batch_inference_worker())


# Health check for the load balancer
@app.get("/health")
async def health():
    return {"status": "healthy"}
```
| Strategy | Use Case | Implementation | Hit Rate |
|---|---|---|---|
| Result Caching | Repeated predictions | Redis/Memcached | 60-80% |
| Feature Caching | Expensive features | Feature Store | 40-60% |
| Model Caching | Multiple versions | Memory/Disk | 90%+ |
| Edge Caching | Geographic distribution | CDN | 70-90% |
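A minimal sketch of the result-caching row above: memoize predictions in Redis keyed by a hash of the input. The key prefix, TTL, and `model_fn` callable are assumptions.

```python
# Hypothetical sketch of result caching: skip inference when the same input
# was recently served. Key prefix, TTL, and model_fn are assumptions.
import hashlib
import json
import redis

cache = redis.Redis(host="localhost", port=6379)
TTL_SECONDS = 3600

def cached_predict(model_fn, features):
    key = "pred:" + hashlib.sha256(
        json.dumps(features, sort_keys=True).encode()
    ).hexdigest()
    hit = cache.get(key)
    if hit is not None:
        return json.loads(hit)        # cache hit: no model call
    result = model_fn(features)       # cache miss: run inference
    cache.setex(key, TTL_SECONDS, json.dumps(result))
    return result
```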
Reactive scaling: scale based on current metrics.
```yaml
# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-server
  minReplicas: 2
  maxReplicas: 100
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: request_latency
        target:
          type: AverageValue
          averageValue: "100m"
```
Predictive scaling: scale based on predicted demand.
Scheduled scaling: scale based on known traffic patterns.
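A minimal scheduled-scaling sketch using boto3 scheduled actions; the Auto Scaling group name, cron expression, and sizes are assumptions.

```python
# Hypothetical sketch of scheduled scaling: pre-warm GPU capacity before a
# known weekday peak. Group name, schedule, and sizes are assumptions.
import boto3

autoscaling = boto3.client("autoscaling", region_name="us-east-1")

autoscaling.put_scheduled_update_group_action(
    AutoScalingGroupName="gpu-inference-nodes",      # placeholder ASG
    ScheduledActionName="weekday-morning-scale-up",
    Recurrence="0 8 * * 1-5",                        # cron: 08:00 UTC, Mon-Fri
    MinSize=4,
    MaxSize=40,
    DesiredCapacity=10,
)
```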
Multi-region deployment: a primary region, a secondary region, and edge regions, kept in sync through cross-region replication.
```python
import asyncio
import aiohttp
import time
import numpy as np
from dataclasses import dataclass
from typing import List
import matplotlib.pyplot as plt


@dataclass
class LoadTestResult:
    total_requests: int
    successful_requests: int
    failed_requests: int
    average_latency: float
    p50_latency: float
    p95_latency: float
    p99_latency: float
    throughput: float
    error_rate: float


class LoadTester:
    def __init__(self, endpoint: str, timeout: int = 30):
        self.endpoint = endpoint
        self.timeout = timeout
        self.latencies = []
        self.errors = []

    async def send_request(self, session: aiohttp.ClientSession, data: dict):
        """Send single request and measure latency"""
        start = time.time()
        try:
            async with session.post(
                self.endpoint,
                json=data,
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            ) as response:
                await response.json()
                latency = (time.time() - start) * 1000  # ms
                self.latencies.append(latency)
                return True
        except Exception as e:
            self.errors.append(str(e))
            return False

    async def run_load_test(
        self,
        concurrent_users: int,
        requests_per_user: int,
        ramp_up_time: int = 0
    ):
        """Run load test with specified parameters"""
        print(f"Starting load test: {concurrent_users} users, "
              f"{requests_per_user} requests each")

        # Reset metrics
        self.latencies = []
        self.errors = []

        # Create sample data
        sample_data = {"data": [1.0] * 100}  # Example input

        # Ramp up users gradually
        users_per_second = (
            concurrent_users / ramp_up_time if ramp_up_time > 0 else concurrent_users
        )

        start_time = time.time()

        async with aiohttp.ClientSession() as session:
            tasks = []
            for user in range(concurrent_users):
                # Add delay for ramp-up
                if ramp_up_time > 0:
                    await asyncio.sleep(user / users_per_second)

                # Create tasks for this user
                for _ in range(requests_per_user):
                    task = self.send_request(session, sample_data)
                    tasks.append(task)

            # Execute all requests
            results = await asyncio.gather(*tasks)

        total_time = time.time() - start_time

        # Calculate metrics
        return self.calculate_metrics(results, total_time)

    def calculate_metrics(self, results: List[bool], total_time: float) -> LoadTestResult:
        """Calculate performance metrics"""
        total_requests = len(results)
        successful_requests = sum(results)
        failed_requests = total_requests - successful_requests

        if self.latencies:
            latencies_array = np.array(self.latencies)
            metrics = LoadTestResult(
                total_requests=total_requests,
                successful_requests=successful_requests,
                failed_requests=failed_requests,
                average_latency=np.mean(latencies_array),
                p50_latency=np.percentile(latencies_array, 50),
                p95_latency=np.percentile(latencies_array, 95),
                p99_latency=np.percentile(latencies_array, 99),
                throughput=total_requests / total_time,
                error_rate=failed_requests / total_requests * 100
            )
        else:
            metrics = LoadTestResult(
                total_requests=total_requests,
                successful_requests=0,
                failed_requests=total_requests,
                average_latency=0,
                p50_latency=0,
                p95_latency=0,
                p99_latency=0,
                throughput=0,
                error_rate=100
            )

        return metrics

    def plot_results(self):
        """Visualize latency distribution"""
        if self.latencies:
            plt.figure(figsize=(12, 4))

            # Latency histogram
            plt.subplot(1, 3, 1)
            plt.hist(self.latencies, bins=50, edgecolor='black')
            plt.xlabel('Latency (ms)')
            plt.ylabel('Frequency')
            plt.title('Latency Distribution')

            # Latency over time
            plt.subplot(1, 3, 2)
            plt.plot(self.latencies)
            plt.xlabel('Request Number')
            plt.ylabel('Latency (ms)')
            plt.title('Latency Over Time')

            # Percentiles
            plt.subplot(1, 3, 3)
            percentiles = [50, 75, 90, 95, 99]
            values = [np.percentile(self.latencies, p) for p in percentiles]
            plt.bar([f'P{p}' for p in percentiles], values)
            plt.ylabel('Latency (ms)')
            plt.title('Latency Percentiles')

            plt.tight_layout()
            plt.show()

    async def find_breaking_point(self, max_users: int = 1000, step: int = 50):
        """Find system breaking point"""
        print("Finding system breaking point...")

        for users in range(step, max_users + 1, step):
            result = await self.run_load_test(users, 10, ramp_up_time=5)

            print(f"Users: {users}, Throughput: {result.throughput:.2f} RPS, "
                  f"P99 Latency: {result.p99_latency:.2f}ms, "
                  f"Error Rate: {result.error_rate:.2f}%")

            # Stop if error rate exceeds threshold or latency is too high
            if result.error_rate > 5 or result.p99_latency > 1000:
                print(f"Breaking point found at {users} concurrent users")
                return users

        return max_users


# Usage
async def main():
    tester = LoadTester("http://localhost:8000/predict")

    # Run standard load test
    result = await tester.run_load_test(
        concurrent_users=100,
        requests_per_user=50,
        ramp_up_time=10
    )

    print("\nLoad Test Results:")
    print(f"  Total Requests: {result.total_requests}")
    print(f"  Successful: {result.successful_requests}")
    print(f"  Failed: {result.failed_requests}")
    print(f"  Average Latency: {result.average_latency:.2f}ms")
    print(f"  P50 Latency: {result.p50_latency:.2f}ms")
    print(f"  P95 Latency: {result.p95_latency:.2f}ms")
    print(f"  P99 Latency: {result.p99_latency:.2f}ms")
    print(f"  Throughput: {result.throughput:.2f} RPS")
    print(f"  Error Rate: {result.error_rate:.2f}%")

    # Visualize results
    tester.plot_results()

    # Find breaking point
    breaking_point = await tester.find_breaking_point()


if __name__ == "__main__":
    asyncio.run(main())
```
| Scenario | Symptoms | Solution | Implementation |
|---|---|---|---|
| High Latency | P99 > 500 ms | Add model replicas | Horizontal scaling |
| Memory Pressure | OOM errors | Larger instances | Vertical scaling |
| GPU Underutilization | < 50% GPU usage | Batch processing | Request batching |
| Network Bottleneck | High network latency | Edge deployment | CDN/Edge servers |
| Cost Overrun | Budget exceeded | Spot instances | Mixed instance types |
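The table can double as a first-pass triage rule. A sketch is below; the metric names, input dict shape, and the 100 ms network threshold are assumptions, while the other thresholds mirror the table.

```python
# Hypothetical sketch: map observed metrics to the remediations in the table
# above. Metric names and the 100 ms network threshold are assumptions.
def diagnose(metrics: dict) -> list:
    actions = []
    if metrics.get("p99_latency_ms", 0) > 500:
        actions.append("High latency: add model replicas (horizontal scaling)")
    if metrics.get("oom_errors", 0) > 0:
        actions.append("Memory pressure: move to larger instances (vertical scaling)")
    if metrics.get("gpu_utilization", 1.0) < 0.5:
        actions.append("GPU underutilization: enable request batching")
    if metrics.get("network_latency_ms", 0) > 100:
        actions.append("Network bottleneck: serve from CDN / edge locations")
    return actions or ["No obvious bottleneck; profile further"]

print(diagnose({"p99_latency_ms": 750, "gpu_utilization": 0.35}))
```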
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.quantization import quantize_dynamic
import tensorrt as trt
import onnx
import onnxruntime as ort


class ModelOptimizer:
    def __init__(self, model, optimization_level='O2'):
        self.model = model
        self.optimization_level = optimization_level

    def quantize_model(self, calibration_data=None):
        """Quantize model to INT8 for faster inference"""
        if calibration_data:
            # Post-training quantization with calibration
            model_int8 = torch.quantization.quantize_dynamic(
                self.model,
                {nn.Linear, nn.Conv2d},
                dtype=torch.qint8
            )
        else:
            # Dynamic quantization
            model_int8 = quantize_dynamic(
                self.model,
                {nn.Linear},
                dtype=torch.qint8
            )

        # Measure speedup
        self.benchmark_speedup(self.model, model_int8)

        return model_int8

    def export_to_onnx(self, dummy_input, output_path="model.onnx"):
        """Export to ONNX for cross-platform deployment"""
        torch.onnx.export(
            self.model,
            dummy_input,
            output_path,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes={
                'input': {0: 'batch_size'},
                'output': {0: 'batch_size'}
            }
        )

        # Optimize the exported ONNX graph (optimize_model takes a file path
        # and returns an OnnxModel wrapper)
        from onnxruntime.transformers import optimizer

        optimized_model = optimizer.optimize_model(
            output_path,
            model_type='bert',  # or your model type
            num_heads=12,
            hidden_size=768
        )
        optimized_model.save_model_to_file(
            output_path.replace('.onnx', '_optimized.onnx')
        )

        return output_path

    def tensorrt_optimization(self, onnx_path):
        """Optimize with TensorRT for NVIDIA GPUs"""
        logger = trt.Logger(trt.Logger.WARNING)
        builder = trt.Builder(logger)
        network = builder.create_network(
            1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
        )
        parser = trt.OnnxParser(network, logger)

        # Parse ONNX model
        with open(onnx_path, 'rb') as model:
            parser.parse(model.read())

        # Configure builder
        config = builder.create_builder_config()
        config.max_workspace_size = 1 << 30  # 1 GB

        # Enable FP16 mode
        config.set_flag(trt.BuilderFlag.FP16)

        # Enable INT8 mode with calibration
        # (create_calibrator() must supply an IInt8Calibrator; not shown here)
        config.set_flag(trt.BuilderFlag.INT8)
        config.int8_calibrator = self.create_calibrator()

        # Build engine
        engine = builder.build_engine(network, config)

        return engine

    def model_pruning(self, sparsity=0.5):
        """Prune model weights for smaller size"""
        import torch.nn.utils.prune as prune

        for name, module in self.model.named_modules():
            if isinstance(module, (nn.Linear, nn.Conv2d)):
                prune.l1_unstructured(module, name='weight', amount=sparsity)

        # Fine-tune after pruning to recover accuracy (not shown here)
        self.fine_tune_pruned_model()

        # Remove pruning reparameterization
        for name, module in self.model.named_modules():
            if isinstance(module, (nn.Linear, nn.Conv2d)):
                prune.remove(module, 'weight')

        return self.model

    def knowledge_distillation(self, student_model, train_loader, num_epochs=5,
                               temperature=3.0):
        """Train smaller student model from teacher"""
        criterion_kd = nn.KLDivLoss(reduction='batchmean')
        criterion_ce = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(student_model.parameters())

        for epoch in range(num_epochs):
            for data, target in train_loader:
                # Teacher predictions
                with torch.no_grad():
                    teacher_output = self.model(data)

                # Student predictions
                student_output = student_model(data)

                # Distillation loss
                loss_kd = criterion_kd(
                    F.log_softmax(student_output / temperature, dim=1),
                    F.softmax(teacher_output / temperature, dim=1)
                ) * (temperature ** 2)

                # Student loss
                loss_ce = criterion_ce(student_output, target)

                # Combined loss
                loss = 0.9 * loss_kd + 0.1 * loss_ce

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        return student_model

    def benchmark_speedup(self, original_model, optimized_model, num_runs=100):
        """Benchmark inference speedup"""
        import time

        dummy_input = torch.randn(1, 3, 224, 224)

        # Warmup
        for _ in range(10):
            _ = original_model(dummy_input)
            _ = optimized_model(dummy_input)

        # Original model timing
        start = time.time()
        for _ in range(num_runs):
            _ = original_model(dummy_input)
        original_time = time.time() - start

        # Optimized model timing
        start = time.time()
        for _ in range(num_runs):
            _ = optimized_model(dummy_input)
        optimized_time = time.time() - start

        speedup = original_time / optimized_time
        print(f"Speedup: {speedup:.2f}x")
        print(f"Original: {original_time:.3f}s, Optimized: {optimized_time:.3f}s")

        return speedup


# Multi-GPU Distributed Training
class DistributedTraining:
    def __init__(self, model, world_size):
        self.model = model
        self.world_size = world_size

    def setup(self, rank):
        """Setup distributed training"""
        import torch.distributed as dist

        dist.init_process_group(
            backend='nccl',
            init_method='env://',
            world_size=self.world_size,
            rank=rank
        )

        # Move model to GPU
        self.model = self.model.to(rank)

        # Wrap with DDP
        self.model = nn.parallel.DistributedDataParallel(
            self.model,
            device_ids=[rank],
            output_device=rank,
            find_unused_parameters=True
        )

    def train(self, rank, train_dataset, num_epochs=10):
        """Distributed training loop"""
        from torch.utils.data.distributed import DistributedSampler

        # Setup distributed sampler
        train_sampler = DistributedSampler(
            train_dataset,
            num_replicas=self.world_size,
            rank=rank
        )

        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=64,
            sampler=train_sampler,
            num_workers=4,
            pin_memory=True
        )

        optimizer = torch.optim.Adam(self.model.parameters())

        for epoch in range(num_epochs):
            train_sampler.set_epoch(epoch)

            for batch_idx, (data, target) in enumerate(train_loader):
                data, target = data.to(rank), target.to(rank)

                optimizer.zero_grad()
                output = self.model(data)
                loss = F.cross_entropy(output, target)
                loss.backward()
                optimizer.step()

                if batch_idx % 100 == 0 and rank == 0:
                    print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')
```
| Device | Performance | Power |
|---|---|---|
| NVIDIA Jetson | 472 GFLOPS | 10 W |
| Google Coral | 4 TOPS | 2 W |
| Intel NCS2 | 1 TOPS | 1 W |
```hcl
# Terraform configuration for scalable AI infrastructure
provider "aws" {
  region = var.aws_region
}

# Variables
variable "environment" {
  default = "production"
}

variable "min_instances" {
  default = 2
}

variable "max_instances" {
  default = 100
}

# VPC and Networking
module "vpc" {
  source = "terraform-aws-modules/vpc/aws"

  name = "ai-platform-vpc"
  cidr = "10.0.0.0/16"

  azs             = ["${var.aws_region}a", "${var.aws_region}b", "${var.aws_region}c"]
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  public_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]

  enable_nat_gateway   = true
  enable_vpn_gateway   = true
  enable_dns_hostnames = true
}

# EKS Cluster for Container Orchestration
module "eks" {
  source = "terraform-aws-modules/eks/aws"

  cluster_name    = "ai-platform-cluster"
  cluster_version = "1.24"

  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnets

  # Node groups for different workloads
  eks_managed_node_groups = {
    # CPU nodes for API servers
    cpu_nodes = {
      desired_capacity = 3
      max_capacity     = 10
      min_capacity     = 2

      instance_types = ["m5.2xlarge"]

      k8s_labels = {
        workload = "api"
      }
    }

    # GPU nodes for model inference
    gpu_nodes = {
      desired_capacity = 2
      max_capacity     = 20
      min_capacity     = 1

      instance_types = ["p3.2xlarge"]

      k8s_labels = {
        workload = "inference"
        gpu      = "true"
      }

      taints = [
        {
          key    = "gpu"
          value  = "true"
          effect = "NO_SCHEDULE"
        }
      ]
    }

    # Spot instances for batch processing
    spot_nodes = {
      desired_capacity = 5
      max_capacity     = 50
      min_capacity     = 0

      instance_types = ["m5.large", "m5.xlarge", "m5.2xlarge"]
      capacity_type  = "SPOT"

      k8s_labels = {
        workload = "batch"
        spot     = "true"
      }
    }
  }
}

# Redis for caching
resource "aws_elasticache_cluster" "model_cache" {
  cluster_id           = "ai-model-cache"
  engine               = "redis"
  node_type            = "cache.r6g.xlarge"
  num_cache_nodes      = 3
  parameter_group_name = "default.redis7"
  port                 = 6379

  subnet_group_name  = aws_elasticache_subnet_group.cache.name
  security_group_ids = [aws_security_group.cache.id]
}

# S3 for model storage
resource "aws_s3_bucket" "model_store" {
  bucket = "ai-platform-models-${var.environment}"

  versioning {
    enabled = true
  }

  lifecycle_rule {
    enabled = true

    transition {
      days          = 30
      storage_class = "INTELLIGENT_TIERING"
    }

    transition {
      days          = 90
      storage_class = "GLACIER"
    }
  }
}

# Application Load Balancer
resource "aws_lb" "api_lb" {
  name               = "ai-api-lb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.lb.id]
  subnets            = module.vpc.public_subnets

  enable_deletion_protection      = true
  enable_http2                    = true
  enable_cross_zone_load_balancing = true
}

# Auto Scaling for ECS/EKS
resource "aws_autoscaling_policy" "scale_up" {
  name                   = "ai-scale-up"
  scaling_adjustment     = 2
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = module.eks.node_groups.gpu_nodes.asg_name
}

resource "aws_autoscaling_policy" "scale_down" {
  name                   = "ai-scale-down"
  scaling_adjustment     = -1
  adjustment_type        = "ChangeInCapacity"
  cooldown               = 300
  autoscaling_group_name = module.eks.node_groups.gpu_nodes.asg_name
}

# CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "high_cpu" {
  alarm_name          = "ai-high-cpu"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "CPUUtilization"
  namespace           = "AWS/EKS"
  period              = "120"
  statistic           = "Average"
  threshold           = "75"
  alarm_description   = "This metric monitors cpu utilization"
  alarm_actions       = [aws_autoscaling_policy.scale_up.arn]
}

# Output important values
output "cluster_endpoint" {
  value = module.eks.cluster_endpoint
}

output "load_balancer_dns" {
  value = aws_lb.api_lb.dns_name
}

output "model_bucket" {
  value = aws_s3_bucket.model_store.id
}
```
```text
# Throughput
Throughput = Requests / Time

# Latency Percentiles
P99 = 99th percentile response time
P95 = 95th percentile response time
P50 = Median response time

# Capacity Planning
Required Instances = (Peak RPS × Latency) / Instance Capacity
With Redundancy    = Required Instances × Redundancy Factor

# Cost Optimization
Cost per Request = Infrastructure Cost / Total Requests
ROI = (Revenue - Costs) / Costs × 100

# Availability
Availability = Uptime / (Uptime + Downtime) × 100
Nines of availability:
99%     = 3.65 days downtime/year
99.9%   = 8.76 hours downtime/year
99.99%  = 52.56 minutes downtime/year
99.999% = 5.26 minutes downtime/year

# Amdahl's Law (Parallel Speedup)
Speedup = 1 / ((1 - P) + P/N)
Where P = parallel portion, N = number of processors

# Little's Law
L = λ × W
Where L = avg number in system, λ = arrival rate, W = avg time in system
```
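A short worked example of the capacity-planning and availability formulas above; the traffic numbers and redundancy factor are illustrative assumptions.

```python
# Worked example of the capacity-planning and availability formulas above
# (peak RPS, latency, per-instance capacity, and redundancy are assumptions).
peak_rps = 2000            # expected peak requests per second
latency_s = 0.08           # 80 ms average service time per request
instance_capacity = 16     # concurrent requests one instance can sustain
redundancy_factor = 1.5    # headroom for failures and deploys

concurrent_requests = peak_rps * latency_s             # Little's Law: L = λ × W
required = concurrent_requests / instance_capacity     # base instance count
with_redundancy = required * redundancy_factor

print(f"Concurrent requests in flight: {concurrent_requests:.0f}")
print(f"Instances needed: {required:.1f} -> {with_redundancy:.1f} with redundancy")

# Availability: allowed downtime per year at a given SLO
for slo in (0.99, 0.999, 0.9999, 0.99999):
    downtime_min = (1 - slo) * 365 * 24 * 60
    print(f"{slo:.5f} availability -> {downtime_min:,.1f} minutes downtime/year")
```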
| Category | Tool | Best For | Scale |
|---|---|---|---|
| Orchestration | Kubernetes | Container orchestration | Unlimited |
| | Docker Swarm | Simple deployment | Medium |
| | Apache Mesos | Large clusters | Very Large |
| Model Serving | TensorFlow Serving | TensorFlow models | Large |
| | TorchServe | PyTorch models | Large |
| | Triton Inference Server | Multi-framework | Very Large |
| Monitoring | Prometheus | Metrics | Large |
| | Grafana | Visualization | Any |
| | DataDog | Full observability | Enterprise |
• Problem: over-engineering before understanding real needs. Solution: start simple, measure, then optimize.
• Problem: moving large datasets is expensive. Solution: process data where it lives.
• Problem: a single component failure brings down the whole system. Solution: redundancy and failover mechanisms (see the sketch below).
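A minimal sketch of a failover mechanism: try replicas in order and fall back on failure. The endpoint URLs and timeout are assumptions.

```python
# Hypothetical sketch of failover: attempt each replica in turn and fall back
# on failure. Endpoint URLs and timeout are assumptions.
import requests

REPLICAS = [
    "http://model-a.internal:8000/predict",   # primary
    "http://model-b.internal:8000/predict",   # standby in another zone
]

def predict_with_failover(payload: dict, timeout: float = 2.0) -> dict:
    last_error = None
    for url in REPLICAS:
        try:
            resp = requests.post(url, json=payload, timeout=timeout)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException as exc:
            last_error = exc          # remember the failure, try the next replica
    raise RuntimeError(f"All replicas failed: {last_error}")
```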
Compute (a blended-cost sketch follows this list):
• Spot instances: up to 70% savings
• Reserved instances: up to 50% savings
• Right-sizing: up to 30% savings
• Auto-scaling: up to 40% savings
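A rough sketch of how these levers combine into a blended compute rate; the hourly price and fleet split are illustrative assumptions.

```python
# Hypothetical sketch: estimate a blended compute rate from an instance mix.
# The on-demand rate, fleet split, and discounts are assumptions.
ON_DEMAND_RATE = 3.06      # $/hour for one instance at on-demand pricing
fleet = {
    "on_demand": {"share": 0.2, "discount": 0.0},
    "reserved":  {"share": 0.5, "discount": 0.5},   # roughly 50% off on-demand
    "spot":      {"share": 0.3, "discount": 0.7},   # roughly 70% off on-demand
}

blended_rate = sum(
    part["share"] * ON_DEMAND_RATE * (1 - part["discount"])
    for part in fleet.values()
)
savings = 1 - blended_rate / ON_DEMAND_RATE

print(f"Blended rate: ${blended_rate:.2f}/hr per instance "
      f"({savings:.0%} below pure on-demand)")
```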
Storage:
• Lifecycle policies
• Compression: 50-80% size reduction
• Deduplication
• Tiered storage
Network:
• CDN usage: up to 60% transfer reduction
• Data transfer optimization
• Regional deployment
• Compression