🚀 LLM Deployment

Master the art of deploying Large Language Models in production. Learn infrastructure choices, optimization techniques, scaling strategies, and best practices for serving LLMs efficiently.

🎯 Deployment Fundamentals

📋 What is LLM Deployment?

The process of making trained language models available for production use, handling real-world traffic, and maintaining performance at scale.

# Basic LLM deployment flow
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Load model and tokenizer
model_name = "meta-llama/Llama-2-7b-hf"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map="auto"
)

# Inference function
def generate_response(prompt, max_length=100):
    # Move inputs to the same device as the model
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        inputs.input_ids,
        max_length=max_length,
        temperature=0.7,
        do_sample=True
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

⚡ Key Challenges

Understanding the unique challenges of deploying LLMs compared to traditional ML models.

  • 🔢 Model Size: Multi-GB to TB models requiring specialized hardware
  • 💰 Cost: High computational costs for inference
  • ⏱️ Latency: Real-time response requirements
  • 🔄 Throughput: Handling concurrent requests efficiently
  • 💾 Memory: GPU memory constraints and optimization (see the sizing sketch after this list)
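
To make the size and memory challenges concrete, it helps to estimate GPU memory before picking hardware. A minimal sizing sketch, using the common rule of thumb that weights take roughly parameters × bytes-per-parameter, plus some KV-cache and activation overhead; the overhead factor below is a rough assumption, not a measured value:

# Rough GPU memory estimate for serving (approximation, not a precise figure)
def estimate_serving_memory_gb(num_params_billion, bytes_per_param=2,
                               overhead_factor=0.3):
    # bytes_per_param: 2 for fp16/bf16, 1 for int8, 0.5 for 4-bit weights
    weights_gb = num_params_billion * bytes_per_param
    # overhead_factor loosely covers KV cache and activations; tune for your workload
    return weights_gb * (1 + overhead_factor)

# A 7B model in float16 needs ~14 GB for weights alone, so a 24 GB GPU
# is a common starting point for single-GPU serving.
print(f"{estimate_serving_memory_gb(7):.1f} GB")   # ~18 GB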

🏗️ Deployment Architectures

Common architectural patterns for serving LLMs in production.

# API Server with FastAPI
import time
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    max_tokens: int = 100
    temperature: float = 0.7

class CompletionResponse(BaseModel):
    text: str
    tokens_used: int
    latency_ms: float

@app.post("/v1/completions")
async def create_completion(request: CompletionRequest):
    try:
        start_time = time.time()
        response = generate_response(request.prompt, request.max_tokens)
        latency = (time.time() - start_time) * 1000
        return CompletionResponse(
            text=response,
            tokens_used=len(tokenizer.encode(response)),
            latency_ms=latency
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

📊 Performance Metrics

Essential metrics to monitor when deploying LLMs in production.

Key Performance Indicators
  • Time to First Token (TTFT) = queue time + prompt processing (prefill) time
  • Tokens Per Second (TPS) = total generated tokens / generation time
  • Cost Per Token = (compute cost + memory cost) / tokens served
  • P95 Latency = 95th-percentile response time
The sketch below shows one way to compute these from per-request timestamps.
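
A minimal sketch, assuming each request record carries queued_at, started_at, first_token_at, finished_at, and tokens fields; the field names and the per-GPU-hour price are illustrative assumptions, not values from a real deployment.

# Computing the KPIs above from raw per-request timings (illustrative sketch)
import statistics

def compute_kpis(requests, gpu_cost_per_hour=1.2):
    # requests: list of dicts with queued_at, started_at, first_token_at, finished_at, tokens
    ttft = [r["first_token_at"] - r["queued_at"] for r in requests]
    total_tokens = sum(r["tokens"] for r in requests)
    generation_time = sum(r["finished_at"] - r["started_at"] for r in requests)
    latencies = sorted(r["finished_at"] - r["queued_at"] for r in requests)
    p95 = latencies[int(0.95 * (len(latencies) - 1))]
    wall_clock_hours = (max(r["finished_at"] for r in requests) -
                        min(r["queued_at"] for r in requests)) / 3600
    return {
        "ttft_avg_s": statistics.mean(ttft),
        "tokens_per_second": total_tokens / generation_time,
        "cost_per_1k_tokens": gpu_cost_per_hour * wall_clock_hours / total_tokens * 1000,
        "p95_latency_s": p95,
    }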

💰 Cost Management

Strategies for optimizing deployment costs while maintaining performance.

# Cost optimization configuration
deployment_config = {
    "instance_type": "g5.2xlarge",   # Balance cost/performance
    "spot_instances": True,          # Use spot for 70% savings
    "auto_scaling": {
        "min_instances": 1,
        "max_instances": 10,
        "target_utilization": 0.7
    },
    "caching": {
        "enabled": True,
        "ttl_seconds": 3600
    },
    "batching": {
        "max_batch_size": 8,
        "timeout_ms": 50
    }
}

🚀 Quick Start Guide

Step-by-step guide to deploy your first LLM to production.

  1. Choose deployment platform (Cloud, On-premise, Edge)
  2. Select appropriate model size and quantization
  3. Set up an inference server (vLLM, TGI, etc.) and verify it with the client example after this list
  4. Configure load balancing and caching
  5. Implement monitoring and alerting
  6. Test performance and optimize
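
As a concrete example of step 3, vLLM exposes an OpenAI-compatible HTTP API that you can smoke-test with a few lines of Python. This is a minimal sketch; the model name, port, and response handling are assumptions based on vLLM's standard completions endpoint.

# Smoke-test an inference server launched with, for example:
#   python -m vllm.entrypoints.openai.api_server --model meta-llama/Llama-2-7b-hf --port 8000
import requests

resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "meta-llama/Llama-2-7b-hf",
        "prompt": "Hello, my name is",
        "max_tokens": 32,
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["text"])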

☁️ Infrastructure & Deployment Options

LLM Deployment Stack (top to bottom)
  • Application Layer: Web App, Mobile App, API Clients
  • API Gateway: Rate Limiting, Authentication, Load Balancing
  • Inference Server: vLLM, TGI, TensorRT-LLM
  • Infrastructure: GPU Nodes, Storage, Networking

☁️ Cloud Deployment

Deploy LLMs on major cloud platforms with managed services.

# AWS SageMaker deployment
import boto3
from sagemaker.huggingface import HuggingFaceModel

# Configure model
hub_config = {
    'HF_MODEL_ID': 'meta-llama/Llama-2-7b-hf',
    'SM_NUM_GPUS': '1',
    'MAX_INPUT_LENGTH': '2048',
    'MAX_TOTAL_TOKENS': '4096',
}

# Create model
model = HuggingFaceModel(
    model_data='s3://my-bucket/model.tar.gz',
    role='arn:aws:iam::account:role/SageMakerRole',
    transformers_version='4.28',
    pytorch_version='2.0',
    py_version='py310',
    env=hub_config
)

# Deploy to endpoint
predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.g5.2xlarge',
    endpoint_name='llm-endpoint'
)

🏢 On-Premise Deployment

Deploy LLMs on your own infrastructure for data privacy and control.

# Docker deployment with NVIDIA Container Toolkit

# Dockerfile
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3.10 python3-pip git

# Install ML libraries
RUN pip3 install torch transformers accelerate

# Copy model files
COPY model/ /app/model/
COPY server.py /app/
WORKDIR /app

# Run inference server
CMD ["python3", "server.py"]

# docker-compose.yml
version: '3.8'
services:
  llm-server:
    build: .
    runtime: nvidia
    environment:
      - NVIDIA_VISIBLE_DEVICES=all
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models

📱 Edge Deployment

Deploy smaller models on edge devices for offline and low-latency use cases.

# ONNX Runtime deployment for edge
import onnxruntime as ort
import numpy as np

# Load quantized model
session = ort.InferenceSession(
    "model_int8.onnx",
    providers=['CPUExecutionProvider']
)

def edge_inference(input_ids):
    # Prepare inputs
    inputs = {
        'input_ids': input_ids.astype(np.int64),
        'attention_mask': np.ones_like(input_ids)
    }
    # Run inference
    outputs = session.run(None, inputs)
    return outputs[0]

# Mobile deployment with TensorFlow Lite
import tensorflow as tf

# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_saved_model('model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model = converter.convert()

🔧 Inference Servers

Specialized servers optimized for LLM inference with advanced features.

# vLLM server setup
from vllm import LLM, SamplingParams

# Initialize vLLM engine
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    tensor_parallel_size=2,          # Use 2 GPUs
    max_num_batched_tokens=8192,
    max_num_seqs=256,
    trust_remote_code=True
)

# Sampling parameters
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=100
)

# Batch inference
prompts = ["Tell me about AI", "What is ML?"]
outputs = llm.generate(prompts, sampling_params)

# Text Generation Inference (TGI)
$ docker run --gpus all -p 8080:80 \
    -v $PWD/models:/data \
    ghcr.io/huggingface/text-generation-inference:latest \
    --model-id meta-llama/Llama-2-7b-hf \
    --max-batch-prefill-tokens 2048 \
    --max-batch-total-tokens 8192

🌐 Multi-Region Deployment

Deploy across multiple regions for global availability and redundancy.

# Multi-region configuration
regions = {
    "us-east-1": {
        "endpoint": "https://llm-us.example.com",
        "priority": 1,
        "capacity": 1000
    },
    "eu-west-1": {
        "endpoint": "https://llm-eu.example.com",
        "priority": 2,
        "capacity": 800
    },
    "ap-south-1": {
        "endpoint": "https://llm-ap.example.com",
        "priority": 3,
        "capacity": 600
    }
}

def route_request(user_location):
    # Route to nearest region
    nearest_region = find_nearest_region(user_location)
    if is_available(nearest_region):
        return regions[nearest_region]["endpoint"]
    # Fallback to next priority
    return get_fallback_endpoint()

🔒 Secure Deployment

Security best practices for LLM deployment in production.

  • 🔐 API Authentication: JWT tokens, API keys (auth and rate-limit sketch after this list)
  • 🛡️ Rate Limiting: Prevent abuse and DoS
  • 🔒 Data Encryption: TLS for transit, AES for storage
  • 📝 Audit Logging: Track all requests and responses
  • 🏥 PII Protection: Redact sensitive information
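
A minimal sketch of the first two items, API-key authentication plus per-key rate limiting, implemented as a FastAPI dependency. The header name, in-memory key store, and limits are illustrative assumptions; production deployments typically push this into an API gateway or a dedicated rate-limiting library.

# Minimal API-key auth + per-key rate limiting (illustrative sketch)
import time
from collections import defaultdict
from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()
API_KEYS = {"demo-key-123"}          # assumption: real keys come from a secrets store
REQUESTS_PER_MINUTE = 60
_request_log = defaultdict(list)     # api_key -> timestamps of recent requests

async def require_api_key(x_api_key: str = Header(...)):
    if x_api_key not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    now = time.time()
    recent = [t for t in _request_log[x_api_key] if now - t < 60]
    if len(recent) >= REQUESTS_PER_MINUTE:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    recent.append(now)
    _request_log[x_api_key] = recent
    return x_api_key

@app.post("/v1/completions")
async def completions(body: dict, api_key: str = Depends(require_api_key)):
    # ...generate with the model here...
    return {"text": "ok"}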

Deployment Type | Pros | Cons | Best For
Cloud | Scalable, managed, pay-as-you-go | Vendor lock-in, costs can escalate | Variable workloads, quick start
On-Premise | Full control, data privacy, fixed costs | High upfront cost, maintenance burden | Sensitive data, compliance requirements
Edge | Low latency, offline capable, privacy | Limited resources, model size constraints | IoT, mobile apps, real-time systems
Hybrid | Flexibility, best of both worlds | Complex management, higher overhead | Enterprise deployments, global reach

⚡ Optimization Techniques

🔢 Quantization

Reduce model size and increase inference speed with minimal accuracy loss.

# 8-bit quantization with bitsandbytes
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
import torch

# Configure 8-bit quantization
# (NF4 and double quantization are 4-bit options, shown commented out below)
quantization_config = BitsAndBytesConfig(load_in_8bit=True)
# quantization_config = BitsAndBytesConfig(
#     load_in_4bit=True,
#     bnb_4bit_compute_dtype=torch.float16,
#     bnb_4bit_use_double_quant=True,
#     bnb_4bit_quant_type="nf4"
# )

# Load quantized model
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    quantization_config=quantization_config,
    device_map="auto",
    trust_remote_code=True
)

# GPTQ quantization for 4-bit
from auto_gptq import AutoGPTQForCausalLM

model = AutoGPTQForCausalLM.from_quantized(
    "TheBloke/Llama-2-7B-GPTQ",
    use_safetensors=True,
    trust_remote_code=False,
    device="cuda:0",
    quantize_config=None
)

💾 KV Cache Optimization

Optimize memory usage and speed up generation with efficient caching.

# Application-level KV cache manager with LRU eviction
# (block-level PagedAttention is handled inside engines such as vLLM)
class KVCacheManager:
    def __init__(self, max_cache_size_gb=16):
        self.cache = {}
        self.max_size = max_cache_size_gb * 1024 ** 3
        self.current_size = 0

    def get_cached_kv(self, prompt_hash):
        if prompt_hash in self.cache:
            # Move to end (LRU)
            kv = self.cache.pop(prompt_hash)
            self.cache[prompt_hash] = kv
            return kv
        return None

    def store_kv(self, prompt_hash, kv_states):
        # _calculate_size() and _evict_oldest() are implemented elsewhere
        kv_size = self._calculate_size(kv_states)
        # Evict if necessary
        while self.current_size + kv_size > self.max_size:
            self._evict_oldest()
        self.cache[prompt_hash] = kv_states
        self.current_size += kv_size

🎯 Dynamic Batching

Improve throughput by intelligently batching requests together.

# Dynamic batching implementation
import asyncio
import time
from typing import List, Dict

class DynamicBatcher:
    def __init__(self, max_batch_size=8, timeout_ms=50):
        self.max_batch_size = max_batch_size
        self.timeout_ms = timeout_ms
        self.pending_requests = []
        self.lock = asyncio.Lock()

    async def add_request(self, request):
        async with self.lock:
            future = asyncio.Future()
            self.pending_requests.append((request, future))
            # Check if batch is ready
            if len(self.pending_requests) >= self.max_batch_size:
                await self._process_batch()
            else:
                # Schedule a timeout flush (_timeout_handler is implemented elsewhere)
                asyncio.create_task(self._timeout_handler())
        # Await the result outside the lock so other requests can join the batch
        return await future

    async def _process_batch(self):
        if not self.pending_requests:
            return
        batch = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]
        # Run batched inference (_batch_inference is implemented elsewhere)
        prompts = [req for req, _ in batch]
        results = await self._batch_inference(prompts)
        # Return results to the waiting callers
        for (_, future), result in zip(batch, results):
            future.set_result(result)

🔄 Model Parallelism

Distribute large models across multiple GPUs for efficient inference.

# Tensor parallelism with DeepSpeed
import deepspeed
import torch

# DeepSpeed configuration
ds_config = {
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        }
    },
    "tensor_parallel": {
        "tp_size": 4,   # Use 4 GPUs
        "mpu": None
    }
}

# Initialize with DeepSpeed
model_engine, _, _, _ = deepspeed.initialize(
    model=model,
    config=ds_config
)

📊 Flash Attention

Accelerate attention computation with memory-efficient algorithms.

# Flash Attention 2 implementation
from flash_attn import flash_attn_func
import torch

class FlashAttentionLayer(torch.nn.Module):
    def __init__(self, hidden_size, num_heads):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        self.q_proj = torch.nn.Linear(hidden_size, hidden_size)
        self.k_proj = torch.nn.Linear(hidden_size, hidden_size)
        self.v_proj = torch.nn.Linear(hidden_size, hidden_size)

    def forward(self, hidden_states, attention_mask=None):
        batch_size, seq_len = hidden_states.shape[:2]
        # Project to Q, K, V
        q = self.q_proj(hidden_states)
        k = self.k_proj(hidden_states)
        v = self.v_proj(hidden_states)
        # Reshape for multi-head attention
        q = q.view(batch_size, seq_len, self.num_heads, self.head_dim)
        k = k.view(batch_size, seq_len, self.num_heads, self.head_dim)
        v = v.view(batch_size, seq_len, self.num_heads, self.head_dim)
        # Apply Flash Attention
        output = flash_attn_func(q, k, v, causal=True)
        return output.view(batch_size, seq_len, self.hidden_size)

🎨 Prompt Caching

Cache common prompts and system messages to reduce computation.

# Redis-based prompt cache
import redis
import hashlib
import json

class PromptCache:
    def __init__(self, redis_host="localhost", ttl=3600):
        self.redis_client = redis.Redis(host=redis_host)
        self.ttl = ttl

    def _hash_prompt(self, prompt, params):
        # Create unique hash for prompt + params
        content = f"{prompt}_{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, prompt, params):
        key = self._hash_prompt(prompt, params)
        cached = self.redis_client.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, prompt, params, response):
        key = self._hash_prompt(prompt, params)
        self.redis_client.setex(key, self.ttl, json.dumps(response))

💡 Optimization Impact:
  • Quantization: 2-4x memory reduction, 1.5-2x speedup
  • Flash Attention: 2-3x faster, 50% memory reduction
  • Dynamic Batching: 3-5x throughput improvement
  • KV Cache: 30-50% latency reduction for long contexts
  • Tensor Parallelism: Linear scaling with GPU count

📈 Scaling Strategies

⚖️ Load Balancing

Distribute requests across multiple instances for optimal performance.

# HAProxy configuration for LLM load balancing
global
    maxconn 4096

defaults
    mode http
    timeout connect 5000ms
    timeout client 60000ms
    timeout server 60000ms

backend llm_servers
    balance leastconn                              # Use least connections
    option httpchk GET /health
    server llm1 10.0.1.10:8000 check weight 100
    server llm2 10.0.1.11:8000 check weight 100
    server llm3 10.0.1.12:8000 check weight 50     # Lower spec

# Python load balancer with health checks
class LoadBalancer:
    def __init__(self, servers):
        self.servers = servers
        self.current = 0

    async def get_server(self):
        # Round-robin with health check
        attempts = 0
        while attempts < len(self.servers):
            server = self.servers[self.current]
            self.current = (self.current + 1) % len(self.servers)
            if await self._health_check(server):
                return server
            attempts += 1
        raise Exception("No healthy servers available")

🔄 Auto-scaling

Automatically adjust resources based on demand and metrics.

# Kubernetes HPA for LLM deployment
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-deployment
  minReplicas: 2
  maxReplicas: 10
  metrics:
  # Note: native Resource metrics cover cpu/memory only; GPU utilization
  # requires a custom/external metrics pipeline (e.g. DCGM exporter + adapter)
  - type: Resource
    resource:
      name: gpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: pending_requests
      target:
        type: AverageValue
        averageValue: "30"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300

🌊 Request Queuing

Manage request queues to handle traffic spikes gracefully.

# Priority queue for request management
import heapq
import time
from dataclasses import dataclass

@dataclass
class Request:
    priority: int
    timestamp: float
    prompt: str
    max_tokens: int
    callback: callable

    def __lt__(self, other):
        return self.priority < other.priority

class RequestQueue:
    def __init__(self, max_size=1000):
        self.queue = []
        self.max_size = max_size

    def add_request(self, request, priority=5):
        if len(self.queue) >= self.max_size:
            raise Exception("Queue full")
        req = Request(
            priority=priority,
            timestamp=time.time(),
            prompt=request['prompt'],
            max_tokens=request['max_tokens'],
            callback=request['callback']
        )
        heapq.heappush(self.queue, req)

    def get_batch(self, batch_size):
        batch = []
        for _ in range(min(batch_size, len(self.queue))):
            if self.queue:
                batch.append(heapq.heappop(self.queue))
        return batch

💾 Caching Strategy

Implement multi-level caching for improved response times.

# Multi-level cache implementation
class MultiLevelCache:
    def __init__(self):
        # L1: In-memory cache (fast, small)
        self.l1_cache = LRUCache(max_size=100)
        # L2: Redis cache (medium speed, medium size)
        self.l2_cache = RedisCache(ttl=3600)
        # L3: Database cache (slow, large)
        self.l3_cache = DatabaseCache()

    async def get(self, key):
        # Check L1
        result = self.l1_cache.get(key)
        if result:
            return result, 'L1'
        # Check L2
        result = await self.l2_cache.get(key)
        if result:
            self.l1_cache.set(key, result)
            return result, 'L2'
        # Check L3
        result = await self.l3_cache.get(key)
        if result:
            await self.l2_cache.set(key, result)
            self.l1_cache.set(key, result)
            return result, 'L3'
        return None, None

🔀 A/B Testing

Test different models and configurations in production.

# A/B testing framework for LLMs
from collections import defaultdict

class ABTestManager:
    def __init__(self):
        self.experiments = {
            "model_version": {
                "control": {"model": "llama-7b", "weight": 0.8},
                "treatment": {"model": "llama-13b", "weight": 0.2}
            },
            "temperature": {
                "control": {"value": 0.7, "weight": 0.5},
                "treatment": {"value": 0.9, "weight": 0.5}
            }
        }
        self.metrics = defaultdict(list)

    def assign_variant(self, user_id, experiment):
        # Consistent assignment based on user ID
        hash_val = hash(f"{user_id}_{experiment}") % 100
        cumulative = 0
        for variant, config in self.experiments[experiment].items():
            cumulative += config["weight"] * 100
            if hash_val < cumulative:
                return variant, config
        return "control", self.experiments[experiment]["control"]

🌐 CDN Integration

Use CDN for caching and global distribution of responses.

// Cloudflare Workers for edge caching
addEventListener('fetch', event => {
  event.respondWith(handleRequest(event))
})

async function handleRequest(event) {
  const request = event.request

  // Check cache
  const cache = caches.default
  const cacheKey = new Request(request.url, request)
  const cachedResponse = await cache.match(cacheKey)
  if (cachedResponse) {
    return cachedResponse
  }

  // Forward to origin
  const response = await fetch(request)

  // Cache if successful
  if (response.status === 200) {
    const headers = new Headers(response.headers)
    headers.set('Cache-Control', 'max-age=3600')
    const cachedCopy = new Response(response.body, {
      status: response.status,
      statusText: response.statusText,
      headers
    })
    event.waitUntil(cache.put(cacheKey, cachedCopy.clone()))
    return cachedCopy
  }

  return response
}
Scaling Checklist:
  • Implement request queuing and prioritization
  • Set up auto-scaling based on metrics
  • Use load balancing across multiple instances
  • Implement multi-level caching
  • Monitor and optimize bottlenecks
  • Plan for graceful degradation (see the fallback sketch below)
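
For the last item, one common pattern is to shed load to a smaller, cheaper model (or a cached response) when the primary backend is saturated or failing. A minimal sketch, assuming primary and fallback objects expose an async generate() method and that queue depth is tracked elsewhere:

# Graceful degradation: fall back to a smaller model under load or failure
async def generate_with_fallback(prompt, queue_depth, primary, fallback,
                                 max_queue_depth=100):
    # Shed load to the cheaper model before the queue overflows
    if queue_depth > max_queue_depth:
        return await fallback.generate(prompt)
    try:
        return await primary.generate(prompt)
    except Exception:
        # Primary failed (OOM, timeout, node loss): degrade instead of erroring out
        return await fallback.generate(prompt)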

📊 Monitoring & Observability

Example dashboard snapshot: P50 latency 45 ms, P95 latency 120 ms, 250 tokens/sec, 99.9% uptime.

📈 Metrics Collection

Collect and track essential metrics for LLM deployments.

# Prometheus metrics for LLM monitoring
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
request_count = Counter(
    'llm_requests_total', 'Total LLM requests', ['model', 'status']
)
request_duration = Histogram(
    'llm_request_duration_seconds', 'Request duration', ['model', 'operation']
)
tokens_generated = Counter(
    'llm_tokens_generated_total', 'Total tokens generated', ['model']
)
gpu_utilization = Gauge(
    'llm_gpu_utilization_percent', 'GPU utilization', ['gpu_id']
)

# Collect metrics
def track_request(model_name):
    def decorator(func):
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                request_count.labels(model=model_name, status='success').inc()
                return result
            except Exception as e:
                request_count.labels(model=model_name, status='error').inc()
                raise e
            finally:
                duration = time.time() - start
                request_duration.labels(
                    model=model_name, operation='inference'
                ).observe(duration)
        return wrapper
    return decorator

📝 Logging

Structured logging for debugging and analysis.

# Structured logging with context
import logging
import json
from datetime import datetime

class LLMLogger:
    def __init__(self, service_name="llm-service"):
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)
        # JSON formatter
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)

    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_obj = {
                'timestamp': datetime.utcnow().isoformat(),
                'level': record.levelname,
                'message': record.getMessage(),
                'service': record.name,
                'trace_id': getattr(record, 'trace_id', None),
                'user_id': getattr(record, 'user_id', None),
                'model': getattr(record, 'model', None),
                'latency_ms': getattr(record, 'latency_ms', None),
                'tokens': getattr(record, 'tokens', None)
            }
            return json.dumps(log_obj)

    def log_inference(self, **kwargs):
        # Emit a single record carrying all context fields
        self.logger.info('Inference completed', extra=kwargs)

🔍 Distributed Tracing

Track requests across your entire LLM infrastructure.

# OpenTelemetry tracing
from opentelemetry import trace
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Trace LLM operations
async def process_request(prompt):
    with tracer.start_as_current_span("llm_request") as span:
        span.set_attribute("prompt.length", len(prompt))

        # Tokenization span
        with tracer.start_as_current_span("tokenization"):
            tokens = tokenizer.encode(prompt)
            span.set_attribute("token.count", len(tokens))

        # Inference span
        with tracer.start_as_current_span("inference"):
            output = await model.generate(tokens)
            span.set_attribute("output.tokens", len(output))

        return output

⚠️ Alerting

Set up alerts for critical issues and anomalies.

# Alert configuration
alerts = [
    {
        "name": "High Latency",
        "condition": "p95_latency > 1000",
        "metric": "p95_latency",
        "threshold": 1000,       # ms
        "severity": "warning",
        "action": "notify_slack"
    },
    {
        "name": "GPU Memory OOM",
        "condition": "gpu_memory_used > 95",
        "metric": "gpu_memory_used",
        "threshold": 95,         # percent
        "severity": "critical",
        "action": "page_oncall"
    },
    {
        "name": "Error Rate Spike",
        "condition": "error_rate > 5",
        "metric": "error_rate",
        "threshold": 5,          # percent
        "severity": "critical",
        "action": "auto_rollback"
    }
]

class AlertManager:
    def __init__(self, alerts_config):
        self.alerts = alerts_config
        self.alert_history = []

    def check_alerts(self, metrics):
        for alert in self.alerts:
            value = metrics.get(alert['metric'])
            if value is not None and value > alert['threshold']:
                # trigger_alert() dispatches per alert['action']; implemented elsewhere
                self.trigger_alert(alert, value)

📊 Dashboards

Visualize metrics and system health in real-time.

# Grafana dashboard configuration
{
  "dashboard": {
    "title": "LLM Deployment Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [{"expr": "rate(llm_requests_total[5m])"}]
      },
      {
        "title": "Latency Percentiles",
        "type": "graph",
        "targets": [
          {"expr": "histogram_quantile(0.5, rate(llm_request_duration_seconds_bucket[5m]))"},
          {"expr": "histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))"},
          {"expr": "histogram_quantile(0.99, rate(llm_request_duration_seconds_bucket[5m]))"}
        ]
      },
      {
        "title": "GPU Utilization",
        "type": "gauge",
        "targets": [{"expr": "avg(llm_gpu_utilization_percent)"}]
      },
      {
        "title": "Tokens Per Second",
        "type": "stat",
        "targets": [{"expr": "rate(llm_tokens_generated_total[1m])"}]
      }
    ]
  }
}

🔥 Error Tracking

Track and analyze errors in your LLM deployment.

# Sentry integration for error tracking
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration

sentry_sdk.init(
    dsn="your-sentry-dsn",
    integrations=[LoggingIntegration()],
    traces_sample_rate=0.1,
    environment="production"
)

def handle_inference_error(error, context):
    with sentry_sdk.push_scope() as scope:
        scope.set_context("llm_context", {
            "model": context['model'],
            "prompt_length": context['prompt_length'],
            "max_tokens": context['max_tokens'],
            "gpu_memory": context['gpu_memory']
        })
        scope.set_tag("error_type", type(error).__name__)
        sentry_sdk.capture_exception(error)

🎯 Practice & Exercises

📝 Exercise 1: Deploy Your First LLM

Set up a basic LLM deployment with FastAPI.

# Complete this deployment script
from fastapi import FastAPI
from transformers import AutoModelForCausalLM, AutoTokenizer

app = FastAPI()

# TODO: Load model and tokenizer
model = ???
tokenizer = ???

@app.post("/generate")
async def generate(prompt: str, max_tokens: int = 100):
    # TODO: Implement generation logic
    pass

@app.get("/health")
async def health():
    # TODO: Implement health check
    pass

🔧 Exercise 2: Implement Quantization

Reduce model size with quantization techniques.

# Implement 8-bit quantization
def quantize_model(model_path):
    # TODO: Load model with 8-bit quantization
    # TODO: Compare memory usage before and after
    # TODO: Measure inference speed improvement
    pass

# Bonus: Implement dynamic quantization
def dynamic_quantize(model):
    # TODO: Apply dynamic quantization
    pass

📊 Exercise 3: Add Monitoring

Implement comprehensive monitoring for your deployment.

# Add monitoring to your LLM service
class LLMMonitor:
    def __init__(self):
        # TODO: Initialize metrics collectors
        pass

    def track_request(self, prompt, response, latency):
        # TODO: Track request metrics
        pass

    def get_metrics(self):
        # TODO: Return current metrics
        pass

    def check_health(self):
        # TODO: Perform health checks
        pass

🎯 Key Takeaways

  1. Choose the right infrastructure for your use case
  2. Optimize aggressively: quantization, batching, caching
  3. Monitor everything: latency, throughput, errors, costs
  4. Plan for scale from day one
  5. Implement graceful degradation and fallbacks
  6. Security and compliance are non-negotiable