MLOps & AIOps

Part of Module 4: AI in Production

🔄 MLOps Architecture Overview

The architecture spans three stages: development, the ML pipeline, and production.

Development

  • Data Scientists: Jupyter Notebooks, Experimentation
  • Version Control: Code (Git), Data (DVC, LakeFS)
  • Experiment Tracking: MLflow, Weights & Biases
  • Feature Store: Feast, Tecton
  • CI/CD Pipeline: GitHub Actions / Jenkins, Automated Testing

ML Pipeline

  • Data Pipeline: Data Validation, Preprocessing
  • Training Pipeline: Hyperparameter Tuning, Distributed Training
  • Model Validation: Performance Tests, Fairness Checks
  • Model Registry: Model Versioning, Metadata Storage
  • Orchestration: Airflow / Kubeflow, Pipeline DAGs

Production

  • Model Serving: REST/gRPC APIs, Batch Inference
  • A/B Testing: Traffic Splitting, Gradual Rollout
  • Monitoring: Performance Metrics, Data Drift Detection
  • Observability: Logs & Traces, Alerts
  • Auto-Retraining: Drift Triggers, Schedule-based
  • AIOps: Anomaly Detection, Auto-remediation
  • Feedback Loop: Model Performance, Business Metrics

🎯 MLOps Core Principles

The Three Pillars of MLOps:

  • Reproducibility: Any result can be reproduced with the same code, data, and environment
  • Automation: Minimize manual intervention through CI/CD and orchestration
  • Monitoring: Continuous observation of model performance and data quality
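
A minimal sketch of what reproducibility looks like in practice: record the exact code revision, a hash of the training data, and the package versions next to every run, so any result can be traced back to its inputs (file names and paths below are illustrative):

import hashlib
import json
import subprocess
import sys
from importlib import metadata

def run_manifest(data_path: str) -> dict:
    """Capture code, data, and environment identifiers for one training run."""
    with open(data_path, "rb") as f:
        data_hash = hashlib.sha256(f.read()).hexdigest()
    return {
        "git_commit": subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip(),
        "data_sha256": data_hash,
        "python": sys.version.split()[0],
        "packages": {d.metadata["Name"]: d.version for d in metadata.distributions()},
    }

# Store the manifest next to the model artifact (or log it to MLflow as an artifact)
with open("run_manifest.json", "w") as f:
    json.dump(run_manifest("data/raw/dataset.csv"), f, indent=2)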

Key Differences from Traditional DevOps:

  • Data Dependency: Models depend on data quality, not just code
  • Model Decay: Performance degrades over time due to data drift
  • Experimentation: Multiple experiments before production
  • Versioning Complexity: Track code + data + model versions
  • Testing Complexity: Statistical validation beyond unit tests

AIOps vs MLOps:

  • MLOps: Operationalizing machine learning models (ML → Production)
  • AIOps: Using AI to improve IT operations (AI → Operations)
  • Convergence: MLOps systems using AIOps for self-management

🔄 Understanding the Ops Landscape

DevOps

Meaning: Practices combining software development and IT operations to shorten development lifecycle.
Example: Team uses Jenkins CI/CD → automated testing → deploy to Kubernetes → monitor with Datadog.

Core Practices:

  • Continuous Integration (CI)
  • Continuous Deployment (CD)
  • Infrastructure as Code (IaC)
  • Monitoring and Logging
  • Collaboration and Communication

MLOps

Meaning: DevOps principles applied to machine learning systems, adding data and model lifecycle management.
Example: Data pipeline triggers retraining → model validation → A/B testing → gradual rollout → performance monitoring.

Additional Concerns:

  • Data Versioning: Track dataset changes
  • Experiment Tracking: Log all training runs
  • Model Registry: Version and store models
  • Feature Store: Consistent feature computation
  • Drift Detection: Monitor data/concept drift
# Complete MLOps Pipeline Example
# 1. Data Versioning with DVC
dvc init
dvc remote add -d storage s3://my-bucket/dvc-store
dvc add data/raw/dataset.csv
git add data/raw/dataset.csv.dvc data/raw/.gitignore
git commit -m "Add raw dataset"

# 2. Pipeline Definition (dvc.yaml)
# dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/processed
  
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed
    params:
      - train.epochs
      - train.learning_rate
    metrics:
      - metrics.json
    outs:
      - models/model.pkl

# 3. Experiment tracking inside src/train.py (MLflow)
import mlflow

with mlflow.start_run():
    mlflow.log_params(params)
    mlflow.log_metrics(metrics)
    mlflow.sklearn.log_model(model, "model")
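
The Model Registry concern above can be handled with MLflow's registry API once a run has logged a model. A hedged sketch (the model name is illustrative; newer MLflow releases prefer model aliases over stages):

# Register the model logged in the run above and promote it for staging
import mlflow
from mlflow.tracking import MlflowClient

run_id = "<run id from the tracking UI or mlflow.active_run()>"
result = mlflow.register_model(f"runs:/{run_id}/model", "churn-model")

client = MlflowClient()
client.transition_model_version_stage(
    name="churn-model",
    version=result.version,
    stage="Staging",  # promote to "Production" after validation
)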

AIOps

Meaning: Using AI/ML to enhance IT operations - automated anomaly detection, root cause analysis, and self-healing systems.
Example: AI system detects unusual API latency → predicts server failure → auto-scales resources → prevents outage.

Key Capabilities:

  • Anomaly Detection: Identify unusual patterns
  • Predictive Analytics: Forecast failures
  • Root Cause Analysis: Automated debugging
  • Auto-remediation: Self-healing systems
  • Intelligent Alerting: Reduce alert fatigue

🚀 CI/CD for Machine Learning

ML Pipeline Architecture

Meaning: Automated workflow from data ingestion through model deployment and monitoring.
Example: GitHub push → triggers data validation → model training → automated testing → staging deployment → production release.
# Complete GitHub Actions ML Pipeline
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 0 * * 0'  # Weekly retraining

jobs:
  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Validate Data Quality
        run: |
          python scripts/validate_data.py
          great_expectations checkpoint run data_quality

  train-model:
    needs: validate-data
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Train Model
        run: |
          python scripts/train.py
          # validate_model.py is a project script running performance/schema checks
          python scripts/validate_model.py

      - name: Register Model
        run: |
          # register_model.py is a project script wrapping mlflow.register_model()
          python scripts/register_model.py \
            --model-uri "runs:/${{ env.RUN_ID }}/model" \
            --name production-model

  deploy:
    needs: train-model
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Deploy to Staging
        run: |
          kubectl apply -f k8s/staging/
          python scripts/smoke_test.py --env staging
      
      - name: Run A/B Test
        run: |
          python scripts/ab_test.py --traffic 0.1
          sleep 3600  # Monitor for 1 hour
          python scripts/evaluate_ab.py
      
      - name: Promote to Production
        if: success()
        run: |
          kubectl set image deployment/model-server \
            model=${{ env.MODEL_IMAGE }}:${{ env.VERSION }}
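
The ab_test.py and evaluate_ab.py scripts referenced above are project placeholders. As a hedged sketch, the evaluation step could gate promotion on a simple two-proportion z-test comparing the candidate's success rate on the 10% traffic slice against the control (names and counts below are illustrative):

# Hypothetical sketch of scripts/evaluate_ab.py
import math
from scipy.stats import norm

def candidate_not_worse(success_a: int, total_a: int,
                        success_b: int, total_b: int,
                        alpha: float = 0.05) -> bool:
    """One-sided two-proportion z-test: is candidate B significantly worse than control A?"""
    p_a, p_b = success_a / total_a, success_b / total_b
    p_pool = (success_a + success_b) / (total_a + total_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / total_a + 1 / total_b))
    z = (p_b - p_a) / se
    p_value = norm.cdf(z)  # small p-value means B is significantly worse
    return p_value > alpha

if __name__ == "__main__":
    # In practice these counts come from the monitoring backend for the test window
    ok = candidate_not_worse(success_a=940, total_a=1000, success_b=948, total_b=1000)
    raise SystemExit(0 if ok else 1)  # non-zero exit fails the pipeline step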

Pipeline Components:

  • Data Pipeline: Ingestion → Validation → Transformation
  • Training Pipeline: Feature Engineering → Training → Evaluation
  • Model Pipeline: Validation → Registry → Deployment
  • Monitoring Pipeline: Metrics → Alerts → Retraining Triggers
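
The retraining triggers in the monitoring pipeline are usually wired into the orchestrator. Below is a minimal sketch of an Airflow DAG that the drift monitor shown later could trigger over the REST API; the DAG id matches that example, but the task callables and module paths are assumptions:

# Hypothetical retrain_model DAG (Airflow 2.x)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

from src.prepare import prepare_data      # assumed project modules
from src.train import train_and_register

with DAG(
    dag_id="retrain_model",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,   # triggered on demand (drift) or replaced by a cron schedule
    catchup=False,
) as dag:
    prepare = PythonOperator(task_id="prepare_data", python_callable=prepare_data)
    train = PythonOperator(task_id="train_and_register", python_callable=train_and_register)
    prepare >> train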

Testing Strategies

Types of ML Tests:

  • Data Tests: Schema validation, distribution checks
  • Model Tests: Performance thresholds, fairness metrics
  • Integration Tests: API contracts, latency requirements
  • Drift Tests: Feature/prediction drift detection
# Comprehensive ML Testing Suite
import time

import numpy as np
import pytest
from sklearn.metrics import accuracy_score

from model import load_model  # project model wrapper
from data import load_test_data, load_group_data  # project data helpers (assumed)

class TestModel:
    def test_model_performance(self):
        """Test model meets performance threshold"""
        model = load_model("latest")
        X_test, y_test = load_test_data()
        
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        
        assert accuracy > 0.95, f"Accuracy {accuracy} below threshold"
    
    def test_inference_latency(self):
        """Test prediction latency requirement"""
        model = load_model("latest")
        sample = np.random.randn(1, 100)
        
        start = time.time()
        _ = model.predict(sample)
        latency = time.time() - start
        
        assert latency < 0.1, f"Latency {latency}s exceeds 100ms"
    
    def test_model_fairness(self):
        """Test model fairness across groups"""
        model = load_model("latest")
        
        for group in ['group_a', 'group_b']:
            X, y = load_group_data(group)
            accuracy = model.score(X, y)
            assert accuracy > 0.93, f"Bias detected for {group}"
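
The suite above covers the model tests; the data and drift tests from the list can be sketched with plain pandas and scipy checks (column names, file paths, and thresholds below are assumptions):

# Data quality and drift tests (illustrative schema and paths)
import pandas as pd
from scipy.stats import ks_2samp

EXPECTED_SCHEMA = {"age": "int64", "income": "float64", "label": "int64"}

def test_schema():
    """Schema validation: required columns with expected dtypes."""
    df = pd.read_csv("data/processed/train.csv")
    for col, dtype in EXPECTED_SCHEMA.items():
        assert col in df.columns, f"Missing column: {col}"
        assert str(df[col].dtype) == dtype, f"{col} is {df[col].dtype}, expected {dtype}"

def test_feature_drift():
    """Distribution check: current data should not drift far from the reference split."""
    reference = pd.read_csv("data/reference/train.csv")
    current = pd.read_csv("data/processed/train.csv")
    _, p_value = ks_2samp(reference["income"], current["income"])
    assert p_value > 0.01, f"Income distribution drifted (KS p={p_value:.4f})"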

🛠️ MLOps Tool Ecosystem

End-to-End Platforms

Major Platforms:

  • Kubeflow: Kubernetes-native ML workflows
  • MLflow: Open-source lifecycle management
  • Metaflow: Netflix's human-centric framework
  • SageMaker: AWS managed ML platform
  • Vertex AI: Google Cloud ML platform
  • Azure ML: Microsoft's ML platform

Tool Categories:

Category            | Tools                      | Purpose
Experiment Tracking | MLflow, W&B, Neptune       | Log experiments
Data Versioning     | DVC, LakeFS, Pachyderm     | Version datasets
Feature Store       | Feast, Tecton, Hopsworks   | Feature management
Model Registry      | MLflow, Seldon, BentoML    | Model versioning
Orchestration       | Airflow, Prefect, Dagster  | Pipeline automation
Monitoring          | Evidently, WhyLabs, Arize  | Production monitoring
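
As a concrete taste of the feature-store category, a Feast feature view definition looks roughly like this (recent Feast API; entity, column, and path names are illustrative):

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    source=driver_stats_source,
)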

Platform Comparison

Decision Factors:

  • Cloud vs On-Premise: Managed services vs control
  • Scale: Team size and model volume
  • Complexity: Simple models vs complex pipelines
  • Budget: Open-source vs enterprise
  • Ecosystem: Existing tools and infrastructure

Typical Stack Examples:

  • Startup: MLflow + DVC + GitHub Actions + Heroku
  • Scale-up: Kubeflow + Feast + Seldon + K8s
  • Enterprise: SageMaker + Step Functions + CloudWatch
  • Research: W&B + Papermill + Colab + GCS

💻 Production MLOps Implementation

Complete MLOps Setup with Kubeflow

# Kubeflow Pipeline Definition
import kfp
from kfp import dsl
from kfp.components import create_component_from_func

# Component definitions
@create_component_from_func
def preprocess_data(data_path: str, output_path: str):
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    
    df = pd.read_csv(data_path)
    scaler = StandardScaler()
    df_scaled = scaler.fit_transform(df)
    # Preserve column names and skip the index so downstream steps read a clean CSV
    pd.DataFrame(df_scaled, columns=df.columns).to_csv(output_path, index=False)
    
@create_component_from_func
def train_model(data_path: str, model_path: str, 
                metrics_path: str) -> dict:
    import mlflow
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    
    # Start MLflow run
    with mlflow.start_run():
        # Load data and train (load_data is a project helper, assumed available to the component)
        X_train, y_train = load_data(data_path)
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)
        
        # Placeholder metric values for illustration; in practice, compute them on a held-out split
        metrics = {
            'accuracy': 0.95,
            'precision': 0.93,
            'recall': 0.94
        }
        
        # Log to MLflow
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model")
        
        # Save model
        joblib.dump(model, model_path)
        
    return metrics

@create_component_from_func
def validate_model(model_path: str, test_data: str) -> bool:
    import joblib
    
    # Model validation logic (load_data and check_fairness are project helpers, assumed available)
    model = joblib.load(model_path)
    X_test, y_test = load_data(test_data)
    
    # Performance checks
    accuracy = model.score(X_test, y_test)
    if accuracy < 0.9:
        raise ValueError(f"Model accuracy {accuracy} below threshold")
    
    # Fairness checks
    fairness_score = check_fairness(model, X_test, y_test)
    if fairness_score < 0.95:
        raise ValueError("Model fails fairness criteria")
    
    return True

# Pipeline definition
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_pipeline(data_path: str = 'gs://data/raw'):
    # Data preprocessing
    preprocess_op = preprocess_data(
        data_path=data_path,
        output_path='processed_data.csv'
    )
    
    # Model training
    train_op = train_model(
        data_path=preprocess_op.outputs['output_path'],
        model_path='model.pkl',
        metrics_path='metrics.json'
    )
    
    # Model validation
    validate_op = validate_model(
        model_path=train_op.outputs['model_path'],
        test_data='test_data.csv'
    )
    
    # Conditional deployment (deploy_model is another component, defined elsewhere in the project)
    with dsl.Condition(validate_op.output == True):
        deploy_op = deploy_model(
            model_path=train_op.outputs['model_path'],
            endpoint='production'
        )

# Compile and run pipeline
kfp.compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path='ml_pipeline.yaml'
)

client = kfp.Client()
client.create_run_from_pipeline_package(
    'ml_pipeline.yaml',
    arguments={'data_path': 'gs://my-bucket/data'}
)

Monitoring & Drift Detection

# Production Monitoring with Evidently (legacy Profile/Dashboard API)
import json
from datetime import datetime

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection
from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab

class ModelMonitor:
    def __init__(self, reference_data, model):
        self.reference_data = reference_data
        self.model = model
        self.drift_threshold = 0.5
        
    def check_data_drift(self, current_data):
        """Check for data drift"""
        drift_profile = Profile(
            sections=[DataDriftProfileSection()]
        )
        drift_profile.calculate(
            reference_data=self.reference_data,
            current_data=current_data
        )
        
        # Profile.json() returns a JSON string; the key layout varies across Evidently versions
        report = json.loads(drift_profile.json())
        drift_score = report['data_drift']['data']['metrics']['share_drifted_features']
        
        if drift_score > self.drift_threshold:
            self.trigger_retraining(drift_score)
        
        return drift_score
    
    def check_prediction_drift(self, current_predictions):
        """Monitor prediction distribution"""
        from scipy import stats
        
        reference_preds = self.model.predict(self.reference_data)
        ks_statistic, p_value = stats.ks_2samp(
            reference_preds, 
            current_predictions
        )
        
        if p_value < 0.05:
            print(f"Prediction drift detected: p={p_value}")
            self.trigger_retraining(p_value)
        
        return p_value
    
    def monitor_performance(self, y_true, y_pred):
        """Track model performance metrics"""
        from sklearn.metrics import accuracy_score, precision_score
        
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average='weighted')
        }
        
        # Send to monitoring dashboard
        self.send_to_prometheus(metrics)
        
        # Check for performance degradation
        if metrics['accuracy'] < 0.85:
            self.alert_team(
                f"Model performance degraded: {metrics['accuracy']}"
            )
        
        return metrics
    
    def trigger_retraining(self, reason):
        """Trigger automatic retraining"""
        import requests
        
        payload = {
            'reason': str(reason),
            'timestamp': datetime.now().isoformat(),
            # Assumes the model object carries a version attribute
            'current_model_version': getattr(self.model, 'version', 'unknown')
        }
        
        # Airflow 2 REST API expects the run config under "conf" (authentication omitted here)
        response = requests.post(
            'http://airflow:8080/api/v1/dags/retrain_model/dagRuns',
            json={'conf': payload}
        )
        
        print(f"Retraining triggered: {response.status_code}")
    
    def generate_report(self, current_data):
        """Generate an HTML monitoring dashboard for the given batch of data"""
        dashboard = Dashboard(tabs=[DataDriftTab()])
        dashboard.calculate(
            reference_data=self.reference_data,
            current_data=current_data
        )
        dashboard.save("monitoring_report.html")
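
The send_to_prometheus helper used by monitor_performance is left undefined above; one hedged way to implement it is to push gauges to a Prometheus Pushgateway (the gateway address and job name are assumptions):

# Possible implementation of ModelMonitor.send_to_prometheus
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

def send_to_prometheus(self, metrics: dict):
    """Push the latest model metrics to a Prometheus Pushgateway."""
    registry = CollectorRegistry()
    for name, value in metrics.items():
        gauge = Gauge(f"model_{name}", f"Model {name} on recent traffic", registry=registry)
        gauge.set(value)
    push_to_gateway("pushgateway:9091", job="model_monitor", registry=registry)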

AIOps Implementation

# AIOps for Automated Incident Response
import numpy as np
from sklearn.ensemble import IsolationForest
import prometheus_client

class AIOpsEngine:
    def __init__(self):
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )
        self.remediation_actions = {
            'high_cpu': self.scale_horizontally,
            'memory_leak': self.restart_service,
            'disk_full': self.cleanup_logs,
            'network_latency': self.optimize_routing
        }
    
    def detect_anomalies(self, metrics_df):
        """Detect anomalies in system metrics"""
        # Train anomaly detector on historical data
        self.anomaly_detector.fit(metrics_df)
        
        # Detect anomalies in current metrics (get_current_metrics is a project helper,
        # e.g. a Prometheus query over the last few minutes)
        current_metrics = self.get_current_metrics()
        anomaly_scores = self.anomaly_detector.decision_function(
            current_metrics
        )
        
        anomalies = []
        for idx, score in enumerate(anomaly_scores):
            if score < -0.5:  # Anomaly threshold
                anomaly = self.identify_anomaly_type(
                    current_metrics.iloc[idx]
                )
                anomalies.append(anomaly)
        
        return anomalies
    
    def root_cause_analysis(self, anomaly):
        """Perform root cause analysis"""
        # Correlation analysis
        correlations = self.calculate_correlations(anomaly)
        
        # Dependency graph analysis
        dependencies = self.trace_dependencies(anomaly['service'])
        
        # Log analysis
        log_patterns = self.analyze_logs(
            service=anomaly['service'],
            timeframe=anomaly['timestamp']
        )
        
        root_cause = self.infer_root_cause(
            correlations, dependencies, log_patterns
        )
        
        return root_cause
    
    def auto_remediate(self, anomaly, root_cause):
        """Automatically remediate issues"""
        action = self.remediation_actions.get(
            root_cause['type'], 
            self.default_action
        )
        
        # Execute remediation
        result = action(
            service=anomaly['service'],
            params=root_cause['params']
        )
        
        # Verify remediation
        if self.verify_remediation(anomaly):
            self.log_success(anomaly, action, result)
        else:
            self.escalate_to_human(anomaly, root_cause)
        
        return result
    
    def scale_horizontally(self, service, params):
        """Auto-scale service"""
        from kubernetes import client, config
        
        # Load cluster credentials (use config.load_kube_config() when running outside the cluster)
        config.load_incluster_config()
        k8s = client.AppsV1Api()
        
        # Get current replicas
        deployment = k8s.read_namespaced_deployment(
            name=service,
            namespace='production'
        )
        
        # Scale up by 50% (and by at least one replica so small deployments still grow)
        new_replicas = max(deployment.spec.replicas + 1, int(deployment.spec.replicas * 1.5))
        deployment.spec.replicas = new_replicas
        
        k8s.patch_namespaced_deployment(
            name=service,
            namespace='production',
            body=deployment
        )
        
        return {'scaled_to': new_replicas}

✅ MLOps Best Practices

Maturity Model

Level 0: Manual Process

  • Manual, script-driven process
  • Interactive notebooks
  • No CI/CD for ML
  • Manual deployment

Level 1: ML Pipeline Automation

  • Automated training pipeline
  • Experiment tracking
  • Model registry
  • Metadata logging

Level 2: CI/CD Pipeline Automation

  • Source control for code AND data
  • Automated testing
  • CI/CD for training pipeline
  • Automated deployment
  • Production monitoring

Implementation Guidelines:

  • Start with experiment tracking
  • Version everything (code, data, models)
  • Automate gradually
  • Monitor from day one
  • Document model decisions
  • Plan for model updates
  • Build for reproducibility

Common Anti-patterns:

  • No separation between dev/staging/prod
  • Missing rollback strategy
  • Ignoring data quality issues
  • Manual model deployment
  • No monitoring after deployment
  • Treating models as static artifacts
