🔄 MLOps Architecture Overview
🎯 MLOps Core Principles
The Three Pillars of MLOps:
- Reproducibility: Any result can be reproduced with the same code, data, and environment
- Automation: Minimize manual intervention through CI/CD and orchestration
- Monitoring: Continuous observation of model performance and data quality
Key Differences from Traditional DevOps:
- Data Dependency: Models depend on data quality, not just code
- Model Decay: Performance degrades over time due to data drift
- Experimentation: Multiple experiments before production
- Versioning Complexity: Track code + data + model versions
- Testing Complexity: Statistical validation beyond unit tests
AIOps vs MLOps:
- MLOps: Operationalizing machine learning models (ML → Production)
- AIOps: Using AI to improve IT operations (AI → Operations)
- Convergence: MLOps systems using AIOps for self-management
🔄 Understanding the Ops Landscape
DevOps
Meaning: Practices that combine software development and IT operations to shorten the development lifecycle.
Example: Team uses Jenkins CI/CD → automated testing → deploy to Kubernetes → monitor with Datadog.
Core Practices:
- Continuous Integration (CI)
- Continuous Deployment (CD)
- Infrastructure as Code (IaC)
- Monitoring and Logging
- Collaboration and Communication
MLOps
Meaning: DevOps principles applied to machine learning systems, adding data and model lifecycle management.
Example: Data pipeline triggers retraining → model validation → A/B testing → gradual rollout → performance monitoring.
Additional Concerns:
- Data Versioning: Track dataset changes
- Experiment Tracking: Log all training runs
- Model Registry: Version and store models
- Feature Store: Consistent feature computation
- Drift Detection: Monitor data/concept drift
```bash
# Complete MLOps Pipeline Example
# 1. Data versioning with DVC
dvc init
dvc remote add -d storage s3://my-bucket/dvc-store
dvc add data/raw/dataset.csv
git add data/raw/dataset.csv.dvc .gitignore
git commit -m "Add raw dataset"
```

```yaml
# 2. Pipeline definition (dvc.yaml)
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/processed
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed
    params:
      - train.epochs
      - train.learning_rate
    metrics:
      - metrics.json
    outs:
      - models/model.pkl
```

```python
# 3. Experiment tracking with MLflow inside src/train.py
mlflow.log_params(params)
mlflow.log_metrics(metrics)
mlflow.sklearn.log_model(model, "model")
```
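The blocks above cover data versioning, pipeline definition, and experiment tracking. For the feature-store concern, a dedicated tool such as Feast keeps feature computation consistent between training and serving; below is a minimal sketch of a Feast feature definition (assuming a recent Feast release; the entity, field, and file names are illustrative, not taken from this example):

```python
# Illustrative Feast feature definition (entity, field, and file names are hypothetical)
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Entity the features are keyed on
customer = Entity(name="customer", join_keys=["customer_id"])

# Offline source backing the features
orders_source = FileSource(
    path="data/customer_orders.parquet",
    timestamp_field="event_timestamp",
)

# Feature view: the same definition serves both training and online inference
customer_order_stats = FeatureView(
    name="customer_order_stats",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[
        Field(name="avg_order_value", dtype=Float32),
        Field(name="orders_last_30d", dtype=Int64),
    ],
    source=orders_source,
)
```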
AIOps
Meaning: Using AI/ML to enhance IT operations - automated anomaly detection, root cause analysis, and self-healing systems.
Example: AI system detects unusual API latency → predicts server failure → auto-scales resources → prevents outage.
Key Capabilities:
- Anomaly Detection: Identify unusual patterns
- Predictive Analytics: Forecast failures
- Root Cause Analysis: Automated debugging
- Auto-remediation: Self-healing systems
- Intelligent Alerting: Reduce alert fatigue
🚀 CI/CD for Machine Learning
ML Pipeline Architecture
Meaning: Automated workflow from data ingestion through model deployment and monitoring.
Example: GitHub push → triggers data validation → model training → automated testing → staging deployment → production release.
```yaml
# Complete GitHub Actions ML Pipeline
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining

jobs:
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Validate Data Quality
        run: |
          python scripts/validate_data.py
          great_expectations checkpoint run data_quality

  train-model:
    needs: validate-data
    runs-on: ubuntu-latest
    steps:
      - name: Train Model
        run: |
          python scripts/train.py
          mlflow models validate model/
      - name: Register Model
        run: |
          mlflow models register \
            --model-uri runs:/${{ env.RUN_ID }}/model \
            --name production-model

  deploy:
    needs: train-model
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          python scripts/smoke_test.py --env staging
      - name: Run A/B Test
        run: |
          python scripts/ab_test.py --traffic 0.1
          sleep 3600  # Monitor for 1 hour
          python scripts/evaluate_ab.py
      - name: Promote to Production
        if: success()
        run: |
          kubectl set image deployment/model-server \
            model=${{ env.MODEL_IMAGE }}:${{ env.VERSION }}
```
Pipeline Components:
- Data Pipeline: Ingestion → Validation → Transformation
- Training Pipeline: Feature Engineering → Training → Evaluation
- Model Pipeline: Validation → Registry → Deployment
- Monitoring Pipeline: Metrics → Alerts → Retraining Triggers
Testing Strategies
Types of ML Tests:
- Data Tests: Schema validation, distribution checks
- Model Tests: Performance thresholds, fairness metrics
- Integration Tests: API contracts, latency requirements
- Drift Tests: Feature/prediction drift detection
```python
# Comprehensive ML Testing Suite
import time

import numpy as np
import pytest
from sklearn.metrics import accuracy_score

from model import load_model, predict
# load_test_data / load_group_data are assumed to be project-level test helpers


class TestModel:
    def test_model_performance(self):
        """Test model meets performance threshold"""
        model = load_model("latest")
        X_test, y_test = load_test_data()
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        assert accuracy > 0.95, f"Accuracy {accuracy} below threshold"

    def test_inference_latency(self):
        """Test prediction latency requirement"""
        model = load_model("latest")
        sample = np.random.randn(1, 100)
        start = time.time()
        _ = model.predict(sample)
        latency = time.time() - start
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"

    def test_model_fairness(self):
        """Test model fairness across groups"""
        model = load_model("latest")
        for group in ['group_a', 'group_b']:
            X, y = load_group_data(group)
            accuracy = model.score(X, y)
            assert accuracy > 0.93, f"Bias detected for {group}"
```
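The suite above covers model-level checks; the data tests and drift tests from the list fit the same pytest style. A minimal sketch, with illustrative column names and file paths:

```python
# Data quality tests: schema, nulls, and a simple distribution check
# (column names and file paths are illustrative; adapt to your pipeline layout)
import pandas as pd
import pytest
from scipy import stats

EXPECTED_SCHEMA = {"age": "int64", "income": "float64", "country": "object"}


@pytest.fixture
def current_batch():
    return pd.read_csv("data/processed/current_batch.csv")


@pytest.fixture
def reference_sample():
    return pd.read_csv("data/processed/reference_sample.csv")


def test_schema_matches_expectation(current_batch):
    """Columns and dtypes should match the agreed schema."""
    assert set(current_batch.columns) == set(EXPECTED_SCHEMA)
    for col, dtype in EXPECTED_SCHEMA.items():
        assert str(current_batch[col].dtype) == dtype, f"Unexpected dtype for {col}"


def test_no_unexpected_nulls(current_batch):
    """Required fields should not contain missing values."""
    assert current_batch[["age", "income"]].notnull().all().all()


def test_feature_distribution_stable(current_batch, reference_sample):
    """Flag drift when a numeric feature diverges from the reference sample."""
    _, p_value = stats.ks_2samp(reference_sample["income"], current_batch["income"])
    assert p_value > 0.05, f"Distribution shift detected (p={p_value:.4f})"
```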
🛠️ MLOps Tool Ecosystem
End-to-End Platforms
Major Platforms:
- Kubeflow: Kubernetes-native ML workflows
- MLflow: Open-source lifecycle management
- Metaflow: Netflix's human-centric framework
- SageMaker: AWS managed ML platform
- Vertex AI: Google Cloud ML platform
- Azure ML: Microsoft's ML platform
Tool Categories:
| Category | Tools | Purpose |
|---|---|---|
| Experiment Tracking | MLflow, W&B, Neptune | Log experiments |
| Data Versioning | DVC, LakeFS, Pachyderm | Version datasets |
| Feature Store | Feast, Tecton, Hopsworks | Feature management |
| Model Registry | MLflow, Seldon, BentoML | Model versioning |
| Orchestration | Airflow, Prefect, Dagster | Pipeline automation |
| Monitoring | Evidently, WhyLabs, Arize | Production monitoring |
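To illustrate the orchestration category, a minimal Airflow DAG that chains the pipeline stages on a weekly schedule could look like the following sketch (the script paths are illustrative; the DAG id matches the retrain_model DAG triggered by the monitoring code later in this section):

```python
# Weekly retraining DAG (sketch; script paths are illustrative)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="retrain_model",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 0 * * 0",  # weekly, matching the CI cron above
    catchup=False,
) as dag:
    validate = BashOperator(task_id="validate_data", bash_command="python scripts/validate_data.py")
    train = BashOperator(task_id="train", bash_command="python scripts/train.py")
    evaluate = BashOperator(task_id="evaluate", bash_command="python scripts/evaluate.py")

    # Linear dependency: validate, then train, then evaluate
    validate >> train >> evaluate
```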
Platform Comparison
Decision Factors:
- Cloud vs On-Premise: Managed services vs control
- Scale: Team size and model volume
- Complexity: Simple models vs complex pipelines
- Budget: Open-source vs enterprise
- Ecosystem: Existing tools and infrastructure
Typical Stack Examples:
- Startup: MLflow + DVC + GitHub Actions + Heroku
- Scale-up: Kubeflow + Feast + Seldon + K8s
- Enterprise: SageMaker + Step Functions + CloudWatch
- Research: W&B + Papermill + Colab + GCS
💻 Production MLOps Implementation
Complete MLOps Setup with Kubeflow
```python
# Kubeflow Pipeline Definition
import kfp
from kfp import dsl
from kfp.components import create_component_from_func


# Component definitions
@create_component_from_func
def preprocess_data(data_path: str, output_path: str):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler

    df = pd.read_csv(data_path)
    scaler = StandardScaler()
    df_scaled = scaler.fit_transform(df)
    pd.DataFrame(df_scaled).to_csv(output_path)


@create_component_from_func
def train_model(data_path: str, model_path: str, metrics_path: str) -> dict:
    import mlflow
    import joblib
    from sklearn.ensemble import RandomForestClassifier

    # Start MLflow run
    with mlflow.start_run():
        # Load data and train (load_data is a project helper)
        X_train, y_train = load_data(data_path)
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)

        # Calculate metrics
        metrics = {
            'accuracy': 0.95,
            'precision': 0.93,
            'recall': 0.94
        }

        # Log to MLflow
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model")

        # Save model
        joblib.dump(model, model_path)

    return metrics


@create_component_from_func
def validate_model(model_path: str, test_data: str) -> bool:
    import joblib

    # Model validation logic (load_data / check_fairness are project helpers)
    model = joblib.load(model_path)
    X_test, y_test = load_data(test_data)

    # Performance checks
    accuracy = model.score(X_test, y_test)
    if accuracy < 0.9:
        raise ValueError(f"Model accuracy {accuracy} below threshold")

    # Fairness checks
    fairness_score = check_fairness(model, X_test, y_test)
    if fairness_score < 0.95:
        raise ValueError("Model fails fairness criteria")

    return True


# Pipeline definition
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline(data_path: str = 'gs://data/raw'):
    # Data preprocessing
    preprocess_op = preprocess_data(
        data_path=data_path,
        output_path='processed_data.csv'
    )

    # Model training
    train_op = train_model(
        data_path=preprocess_op.outputs['output_path'],
        model_path='model.pkl',
        metrics_path='metrics.json'
    )

    # Model validation
    validate_op = validate_model(
        model_path=train_op.outputs['model_path'],
        test_data='test_data.csv'
    )

    # Conditional deployment (deploy_model is a separately defined component)
    with dsl.Condition(validate_op.output == True):
        deploy_op = deploy_model(
            model_path=train_op.outputs['model_path'],
            endpoint='production'
        )


# Compile and run pipeline
kfp.compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path='ml_pipeline.yaml'
)

client = kfp.Client()
client.create_run_from_pipeline_package(
    'ml_pipeline.yaml',
    arguments={'data_path': 'gs://my-bucket/data'}
)
```
Monitoring & Drift Detection
```python
# Production Monitoring with Evidently
import json
from datetime import datetime

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection
from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab


class ModelMonitor:
    def __init__(self, reference_data, model):
        self.reference_data = reference_data
        self.model = model
        self.drift_threshold = 0.5

    def check_data_drift(self, current_data):
        """Check for data drift"""
        drift_profile = Profile(
            sections=[DataDriftProfileSection()]
        )
        drift_profile.calculate(
            reference_data=self.reference_data,
            current_data=current_data
        )

        # Profile.json() returns a JSON string
        report = json.loads(drift_profile.json())
        drift_score = report['data_drift']['score']

        if drift_score > self.drift_threshold:
            self.trigger_retraining(drift_score)

        return drift_score

    def check_prediction_drift(self, current_predictions):
        """Monitor prediction distribution"""
        from scipy import stats

        reference_preds = self.model.predict(self.reference_data)
        ks_statistic, p_value = stats.ks_2samp(
            reference_preds, current_predictions
        )

        if p_value < 0.05:
            print(f"Prediction drift detected: p={p_value}")
            self.trigger_retraining(p_value)

        return p_value

    def monitor_performance(self, y_true, y_pred):
        """Track model performance metrics"""
        from sklearn.metrics import accuracy_score, precision_score

        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average='weighted')
        }

        # Send to monitoring dashboard (send_to_prometheus / alert_team are placeholders)
        self.send_to_prometheus(metrics)

        # Check for performance degradation
        if metrics['accuracy'] < 0.85:
            self.alert_team(
                f"Model performance degraded: {metrics['accuracy']}"
            )

        return metrics

    def trigger_retraining(self, reason):
        """Trigger automatic retraining"""
        import requests

        payload = {
            'reason': reason,
            'timestamp': datetime.now().isoformat(),
            'current_model_version': self.model.version
        }
        response = requests.post(
            'http://airflow:8080/api/v1/dags/retrain_model/dagRuns',
            json=payload
        )
        print(f"Retraining triggered: {response.status_code}")

    def generate_report(self, current_data):
        """Generate monitoring dashboard"""
        dashboard = Dashboard(tabs=[DataDriftTab()])
        dashboard.calculate(
            reference_data=self.reference_data,
            current_data=current_data
        )
        dashboard.save("monitoring_report.html")
```
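In the monitor above, send_to_prometheus is left as a placeholder. One common way to implement it is the prometheus_client library; a minimal sketch (metric names and port are illustrative):

```python
# Exposing model metrics to Prometheus (sketch; metric names and port are illustrative)
from prometheus_client import Gauge, start_http_server

# Gauges are created once and updated on every monitoring cycle
MODEL_ACCURACY = Gauge("model_accuracy", "Rolling accuracy of the production model")
MODEL_PRECISION = Gauge("model_precision", "Rolling weighted precision of the production model")

# Expose a /metrics endpoint that Prometheus can scrape
start_http_server(8000)


def send_to_prometheus(metrics: dict) -> None:
    """Push the latest computed metrics into the exported gauges."""
    MODEL_ACCURACY.set(metrics["accuracy"])
    MODEL_PRECISION.set(metrics["precision"])
```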
AIOps Implementation
```python
# AIOps for Automated Incident Response
import numpy as np
from sklearn.ensemble import IsolationForest


class AIOpsEngine:
    # Helper methods referenced below (get_current_metrics, restart_service, cleanup_logs,
    # optimize_routing, identify_anomaly_type, etc.) are assumed to be implemented elsewhere.
    def __init__(self):
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )
        self.remediation_actions = {
            'high_cpu': self.scale_horizontally,
            'memory_leak': self.restart_service,
            'disk_full': self.cleanup_logs,
            'network_latency': self.optimize_routing
        }

    def detect_anomalies(self, metrics_df):
        """Detect anomalies in system metrics"""
        # Train anomaly detector on historical data
        self.anomaly_detector.fit(metrics_df)

        # Detect anomalies in current metrics
        current_metrics = self.get_current_metrics()
        anomaly_scores = self.anomaly_detector.decision_function(
            current_metrics
        )

        anomalies = []
        for idx, score in enumerate(anomaly_scores):
            if score < -0.5:  # Anomaly threshold
                anomaly = self.identify_anomaly_type(
                    current_metrics.iloc[idx]
                )
                anomalies.append(anomaly)

        return anomalies

    def root_cause_analysis(self, anomaly):
        """Perform root cause analysis"""
        # Correlation analysis
        correlations = self.calculate_correlations(anomaly)

        # Dependency graph analysis
        dependencies = self.trace_dependencies(anomaly['service'])

        # Log analysis
        log_patterns = self.analyze_logs(
            service=anomaly['service'],
            timeframe=anomaly['timestamp']
        )

        root_cause = self.infer_root_cause(
            correlations, dependencies, log_patterns
        )
        return root_cause

    def auto_remediate(self, anomaly, root_cause):
        """Automatically remediate issues"""
        action = self.remediation_actions.get(
            root_cause['type'],
            self.default_action
        )

        # Execute remediation
        result = action(
            service=anomaly['service'],
            params=root_cause['params']
        )

        # Verify remediation
        if self.verify_remediation(anomaly):
            self.log_success(anomaly, action, result)
        else:
            self.escalate_to_human(anomaly, root_cause)

        return result

    def scale_horizontally(self, service, params):
        """Auto-scale service"""
        import kubernetes

        # Assumes the engine runs inside the cluster; use load_kube_config() otherwise
        kubernetes.config.load_incluster_config()
        k8s = kubernetes.client.AppsV1Api()

        # Get current replicas
        deployment = k8s.read_namespaced_deployment(
            name=service,
            namespace='production'
        )

        # Scale up by 50%
        new_replicas = int(deployment.spec.replicas * 1.5)
        deployment.spec.replicas = new_replicas

        k8s.patch_namespaced_deployment(
            name=service,
            namespace='production',
            body=deployment
        )

        return {'scaled_to': new_replicas}
```
✅ MLOps Best Practices
Maturity Model
Level 0: Manual Process
- Manual, script-driven process
- Interactive notebooks
- No CI/CD for ML
- Manual deployment
Level 1: ML Pipeline Automation
- Automated training pipeline
- Experiment tracking
- Model registry
- Metadata logging
Level 2: CI/CD Pipeline Automation
- Source control for code AND data
- Automated testing
- CI/CD for training pipeline
- Automated deployment
- Production monitoring
Implementation Guidelines:
- Start with experiment tracking
- Version everything (code, data, models)
- Automate gradually
- Monitor from day one
- Document model decisions
- Plan for model updates
- Build for reproducibility
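Two of these guidelines, "version everything" and "plan for model updates", map directly onto a model registry workflow. A minimal sketch with the MLflow registry (the model name and run id are illustrative; newer MLflow releases favor aliases over the stage API shown here):

```python
# Promoting and rolling back registered model versions with MLflow (sketch)
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model logged by a training run (run id placeholder is illustrative)
result = mlflow.register_model("runs:/<run_id>/model", "production-model")

# Promote the new version to Production once validation passes
client.transition_model_version_stage(
    name="production-model",
    version=result.version,
    stage="Production",
)

# Rolling back is the same call pointed at the previous version (assuming one exists)
client.transition_model_version_stage(
    name="production-model",
    version=int(result.version) - 1,
    stage="Production",
)
```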
Common Anti-patterns:
- No separation between dev/staging/prod
- Missing rollback strategy
- Ignoring data quality issues
- Manual model deployment
- No monitoring after deployment
- Treating models as static artifacts