Master the complete lifecycle from training to deployment with evaluation and monitoring
Well-developed models directly translate to business value through improved accuracy, efficiency, and user satisfaction.
Structured processes enable faster iteration, experimentation, and time-to-market for AI features.
Proper development processes prevent model failures, bias issues, and production incidents.
| Use Case | Model Type | Typical Accuracy | Training Time | Inference Latency |
|---|---|---|---|---|
| Fraud Detection | XGBoost/Neural Net | 95-99% | 2-4 hours | < 100ms |
| Recommendation | Collaborative Filtering | 80-90% (Precision@K) | 4-8 hours | < 50ms |
| NLP Sentiment | BERT/Transformer | 85-95% | 8-24 hours | < 200ms |
| Image Classification | CNN/ResNet | 90-98% | 12-48 hours | < 150ms |
Development pipeline: Data Cleaning & Feature Engineering → Model Fitting → Hyperparameter Tuning → Final Evaluation → Production Deployment
from sklearn.model_selection import train_test_split
# First split: separate test set
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Second split: separate validation set
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)
# Result: 60% train, 20% validation, 20% test
print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
| Algorithm | Use Case | Pros |
|---|---|---|
| Linear/Logistic | Baseline | Fast, interpretable |
| Random Forest | Tabular data | No scaling needed |
| XGBoost | Competitions | High accuracy |
| Neural Networks | Complex patterns | Flexible |
| Metric | Description | Formula |
|---|---|---|
| Accuracy | Overall correctness | (TP + TN) / Total |
| Precision | Positive prediction quality | TP / (TP + FP) |
| Recall | Coverage of positives | TP / (TP + FN) |
| F1-Score | Balance of precision and recall | 2 * (Prec * Rec) / (Prec + Rec) |
| MAE | Average absolute error | mean(\|y_true - y_pred\|) |
| MSE | Average squared error | mean((y_true - y_pred)²) |
| RMSE | Root mean squared error | sqrt(MSE) |
| R² | Variance explained | 1 - (SS_res / SS_tot) |
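For a quick sanity check, the snippet below computes these metrics with scikit-learn; the y_true/y_pred arrays are illustrative placeholders rather than real model output.
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, mean_absolute_error, mean_squared_error, r2_score)
import numpy as np

# Classification metrics on placeholder labels/predictions
y_true = np.array([1, 0, 1, 1, 0, 1])
y_pred = np.array([1, 0, 0, 1, 0, 1])
print(f"Accuracy: {accuracy_score(y_true, y_pred):.3f}")
print(f"Precision: {precision_score(y_true, y_pred):.3f}")
print(f"Recall: {recall_score(y_true, y_pred):.3f}")
print(f"F1: {f1_score(y_true, y_pred):.3f}")

# Regression metrics on placeholder continuous targets
y_true_r = np.array([3.2, 1.8, 2.5, 4.0])
y_pred_r = np.array([3.0, 2.0, 2.7, 3.6])
mse = mean_squared_error(y_true_r, y_pred_r)
print(f"MAE: {mean_absolute_error(y_true_r, y_pred_r):.3f}")
print(f"MSE: {mse:.3f}, RMSE: {np.sqrt(mse):.3f}")
print(f"R^2: {r2_score(y_true_r, y_pred_r):.3f}")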
Revenue Impact: $ generated
Cost Savings: $ saved
User Engagement: CTR, time spent
Churn Rate: Customer retention
NPS Impact: Satisfaction change
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# Define parameter grid
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [10, 20, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
# Setup grid search
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(
estimator=rf,
param_grid=param_grid,
cv=5, # 5-fold cross-validation
scoring='f1_weighted',
n_jobs=-1, # Use all cores
verbose=2
)
# Fit grid search
grid_search.fit(X_train, y_train)
# Best parameters
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.4f}")
Leverage pre-trained models for faster development
import torch
from transformers import AutoModel, AutoTokenizer
# Load pre-trained model
base_model = AutoModel.from_pretrained('bert-base-uncased')
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
# Freeze base layers
for param in base_model.parameters():
param.requires_grad = False
# Add custom head
class CustomClassifier(torch.nn.Module):
def __init__(self, base_model, num_classes):
super().__init__()
self.base = base_model
self.classifier = torch.nn.Linear(768, num_classes)
def forward(self, input_ids, attention_mask):
outputs = self.base(input_ids, attention_mask)
pooled = outputs.last_hidden_state[:, 0] # [CLS] token
return self.classifier(pooled)
model = CustomClassifier(base_model, num_classes=3)
Combine multiple models for better performance
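For example, here is a minimal soft-voting ensemble sketch using scikit-learn; the specific base estimators are an illustrative assumption, and X_train/X_val come from the split earlier in this guide.
from sklearn.ensemble import VotingClassifier, RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

# Soft voting averages predicted class probabilities across heterogeneous models
ensemble = VotingClassifier(
    estimators=[
        ('lr', LogisticRegression(max_iter=1000)),
        ('rf', RandomForestClassifier(n_estimators=200, random_state=42)),
        ('gb', GradientBoostingClassifier(random_state=42))
    ],
    voting='soft'
)
ensemble.fit(X_train, y_train)
print(f"Ensemble validation accuracy: {ensemble.score(X_val, y_val):.4f}")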
Robust evaluation through multiple splits
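A minimal k-fold sketch is shown below; the stratified 5-fold setup and f1_weighted scoring mirror the tuning examples in this guide rather than being requirements.
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.ensemble import RandomForestClassifier

# Stratified folds preserve the class balance in every split
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(
    RandomForestClassifier(random_state=42),
    X_train, y_train,
    cv=cv,
    scoring='f1_weighted'
)
print(f"CV f1_weighted: {scores.mean():.4f} +/- {scores.std():.4f}")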
| Pattern | Description | Use Case | Pros | Cons |
|---|---|---|---|---|
| Blue-Green | Two identical environments | Zero-downtime deployment | Quick rollback | Resource intensive |
| Canary | Gradual rollout | Risk mitigation | Early issue detection | Complex monitoring |
| Shadow | Parallel execution | Testing in production | No user impact | Double resources |
| A/B Testing | Split traffic | Performance comparison | Statistical validation | Longer deployment |
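To make the canary pattern from the table above concrete, here is a hypothetical traffic-splitting sketch; route_request, the model handles, and the 5% canary share are all assumptions for illustration, not a production router.
import random

CANARY_FRACTION = 0.05  # hypothetical: send 5% of traffic to the new model

def route_request(features, stable_model, canary_model):
    """Route a single scoring request to the stable or canary model and tag the variant."""
    if random.random() < CANARY_FRACTION:
        return canary_model.predict([features])[0], 'canary'
    return stable_model.predict([features])[0], 'stable'

# Example (assuming stable_model, canary_model, and feature_vector exist):
# prediction, variant = route_request(feature_vector, stable_model, canary_model)
# Log the variant with each prediction so per-variant metrics can be compared before full rollout.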
import numpy as np
from scipy import stats
from datetime import datetime  # used for alert timestamps below
import logging
class ModelMonitor:
def __init__(self, baseline_metrics):
self.baseline = baseline_metrics
self.alerts = []
self.metrics_history = []
def check_performance_degradation(self, current_metrics):
"""Monitor for performance drops"""
degradation = {}
for metric, baseline_value in self.baseline.items():
current_value = current_metrics.get(metric)
if current_value is not None:  # a metric value of 0.0 should still be checked
# Calculate percentage change
change = (current_value - baseline_value) / baseline_value * 100
# Alert if significant degradation
if metric in ['accuracy', 'precision', 'recall', 'f1']:
if change < -5: # 5% degradation threshold
self.alert(f"{metric} degraded by {abs(change):.2f}%")
degradation[metric] = change
return degradation
def check_data_drift(self, reference_data, current_data):
"""Detect distribution shifts using KS test"""
drift_detected = {}
for column in reference_data.columns:
# Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(
reference_data[column],
current_data[column]
)
if p_value < 0.05: # Significant drift
drift_detected[column] = {
'ks_statistic': ks_stat,
'p_value': p_value
}
self.alert(f"Data drift detected in {column}")
return drift_detected
def check_prediction_distribution(self, predictions):
"""Monitor prediction distribution changes"""
pred_stats = {
'mean': np.mean(predictions),
'std': np.std(predictions),
'min': np.min(predictions),
'max': np.max(predictions),
'skew': stats.skew(predictions),
'kurtosis': stats.kurtosis(predictions)
}
# Check for anomalies
if abs(pred_stats['skew']) > 2:
self.alert(f"High skewness in predictions: {pred_stats['skew']:.2f}")
return pred_stats
def alert(self, message):
"""Send alert for critical issues"""
alert = {
'timestamp': datetime.now(),
'message': message,
'severity': self.classify_severity(message)
}
self.alerts.append(alert)
logging.warning(f"MODEL ALERT: {message}")
# Trigger notifications (email, Slack, etc.); notify_stakeholders is assumed to be implemented elsewhere
self.notify_stakeholders(alert)
def classify_severity(self, message):
"""Classify alert severity"""
if 'degraded' in message and any(x in message for x in ['10%', '15%', '20%']):
return 'CRITICAL'
elif 'drift' in message:
return 'WARNING'
else:
return 'INFO'
def generate_report(self):
"""Generate monitoring report"""
return {
'total_alerts': len(self.alerts),
'critical_alerts': sum(1 for a in self.alerts if a['severity'] == 'CRITICAL'),
'recent_alerts': self.alerts[-10:],
'metrics_trend': self.analyze_trends()  # assumed helper that summarizes self.metrics_history
}
Scheduled retraining: fixed intervals (daily, weekly, monthly)
Trigger-based retraining: retrain when monitoring conditions are met (see the sketch below)
Continuous learning: online learning with streaming data
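A minimal sketch of a trigger-based retraining check, reusing the ModelMonitor defined above; the thresholds and the retrain() hook are hypothetical.
def should_retrain(monitor, current_metrics, reference_data, current_data,
                   max_degradation_pct=5.0, max_drifted_features=3):
    """Decide whether monitoring signals justify retraining (illustrative thresholds)."""
    degradation = monitor.check_performance_degradation(current_metrics)
    drifted = monitor.check_data_drift(reference_data, current_data)
    degraded = any(change < -max_degradation_pct for change in degradation.values())
    drifting = len(drifted) >= max_drifted_features
    return degraded or drifting

# Example (assuming monitor is a ModelMonitor and a retrain() job exists):
# if should_retrain(monitor, latest_metrics, reference_df, recent_df):
#     retrain()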
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
def objective(trial):
"""Optuna objective function for hyperparameter optimization"""
# Suggest hyperparameters
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 500),
'max_depth': trial.suggest_int('max_depth', 3, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),  # 'auto' has been removed in recent scikit-learn releases
'bootstrap': trial.suggest_categorical('bootstrap', [True, False])
}
# Create model with suggested parameters
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
# Evaluate using cross-validation
scores = cross_val_score(model, X_train, y_train, cv=5, scoring='f1_weighted')
return scores.mean()
# Create study and optimize
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100, show_progress_bar=True)
# Get best parameters
best_params = study.best_params
best_score = study.best_value
print(f"Best Score: {best_score:.4f}")
print(f"Best Parameters: {best_params}")
# Visualize optimization history
optuna.visualization.plot_optimization_history(study)
optuna.visualization.plot_param_importances(study)
| Version | Algorithm | Accuracy | Latency | Model Size | Status |
|---|---|---|---|---|---|
| v1.0 | Logistic Regression | 82.3% | 5ms | 2MB | Deprecated |
| v2.0 | Random Forest | 87.5% | 15ms | 50MB | Production |
| v3.0 | XGBoost | 89.2% | 12ms | 30MB | Staging |
| v4.0 | Neural Network | 91.1% | 25ms | 150MB | Development |
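One common way to track versions like these is a model registry; the sketch below uses MLflow as one option, with the run name, metric values, and registry name taken from the v2.0 row above as placeholders.
import mlflow
import mlflow.sklearn

# Hypothetical registry entry mirroring the v2.0 Random Forest row above
with mlflow.start_run(run_name="rf-v2.0"):
    mlflow.log_param("algorithm", "RandomForest")
    mlflow.log_metric("accuracy", 0.875)
    mlflow.log_metric("latency_ms", 15)
    mlflow.sklearn.log_model(
        model,  # assumed: a fitted scikit-learn model in scope
        artifact_path="model",
        registered_model_name="example-classifier"  # hypothetical registry name
    )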
import numpy as np
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import time
class AutoMLPipeline:
def __init__(self, time_budget=3600, metric='accuracy'):
self.time_budget = time_budget # seconds
self.metric = metric
self.models = self._get_model_zoo()
self.best_model = None
self.best_score = -np.inf
self.results = []
def _get_model_zoo(self):
"""Define candidate models"""
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier
return [
('LogisticRegression', LogisticRegression(max_iter=1000)),
('RandomForest', RandomForestClassifier(n_estimators=100)),
('GradientBoosting', GradientBoostingClassifier(n_estimators=100)),
('XGBoost', XGBClassifier(n_estimators=100)),  # use_label_encoder is deprecated and no longer needed
('SVM', SVC(probability=True))
]
def fit(self, X, y):
"""Automatically find best model and hyperparameters"""
start_time = time.time()
for name, model in self.models:
if time.time() - start_time > self.time_budget:
break
# Create pipeline with preprocessing
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', model)
])
# Evaluate model
try:
scores = cross_val_score(
pipeline, X, y,
cv=5,
scoring=self.metric,
n_jobs=-1
)
mean_score = scores.mean()
std_score = scores.std()
self.results.append({
'model': name,
'score': mean_score,
'std': std_score,
'time': time.time() - start_time
})
# Update best model
if mean_score > self.best_score:
self.best_score = mean_score
self.best_model = pipeline
print(f"{name}: {mean_score:.4f} (+/- {std_score:.4f})")
except Exception as e:
print(f"Failed to evaluate {name}: {e}")
# Perform hyperparameter tuning on best model
if self.best_model and time.time() - start_time < self.time_budget:
self._tune_hyperparameters(X, y, time.time() - start_time)
return self
def _tune_hyperparameters(self, X, y, time_used):
"""Fine-tune the best model"""
from sklearn.model_selection import RandomizedSearchCV
remaining_time = self.time_budget - time_used
n_iter = max(1, min(50, int(remaining_time / 10)))  # estimate iterations, at least one
# Get parameter distributions based on model type
model_name = type(self.best_model.named_steps['model']).__name__
param_dist = self._get_param_distributions(model_name)
if param_dist:
random_search = RandomizedSearchCV(
self.best_model,
param_dist,
n_iter=n_iter,
cv=5,
scoring=self.metric,
n_jobs=-1,
random_state=42
)
random_search.fit(X, y)
self.best_model = random_search.best_estimator_
self.best_score = random_search.best_score_
print(f"Tuned {model_name}: {self.best_score:.4f}")
def _get_param_distributions(self, model_name):
"""Get hyperparameter distributions for each model"""
from scipy.stats import uniform, randint
distributions = {
'RandomForestClassifier': {
'model__n_estimators': randint(50, 500),
'model__max_depth': randint(3, 20),
'model__min_samples_split': randint(2, 20),
'model__min_samples_leaf': randint(1, 10)
},
'XGBClassifier': {
'model__n_estimators': randint(50, 500),
'model__max_depth': randint(3, 10),
'model__learning_rate': uniform(0.01, 0.3),
'model__subsample': uniform(0.6, 0.4)
},
'LogisticRegression': {
'model__C': uniform(0.01, 10),
'model__penalty': ['l1', 'l2'],
'model__solver': ['liblinear']  # liblinear supports both l1 and l2 penalties
}
}
return distributions.get(model_name, {})
def predict(self, X):
"""Make predictions with best model"""
return self.best_model.predict(X)
def get_leaderboard(self):
"""Get sorted results"""
return sorted(self.results, key=lambda x: x['score'], reverse=True)
# Usage
automl = AutoMLPipeline(time_budget=300, metric='f1_weighted')
automl.fit(X_train, y_train)
print("\nLeaderboard:")
for result in automl.get_leaderboard():
print(f"{result['model']}: {result['score']:.4f}")
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
class DistributedTrainer:
def __init__(self, model, rank, world_size):
self.rank = rank
self.world_size = world_size
# Initialize distributed training
dist.init_process_group(
backend='nccl',
init_method='env://',
world_size=world_size,
rank=rank
)
# Move model to GPU
self.device = torch.device(f'cuda:{rank}')
self.model = model.to(self.device)
# Wrap model with DDP
self.model = DDP(
self.model,
device_ids=[rank],
output_device=rank,
find_unused_parameters=True
)
# Setup optimizer with scaled learning rate
base_lr = 0.001
self.lr = base_lr * world_size # Linear scaling rule
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
# Learning rate scheduler
self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
self.optimizer,
T_max=100
)
def train_epoch(self, dataloader):
"""Train one epoch with gradient accumulation"""
self.model.train()
total_loss = 0
accumulation_steps = 4 # Gradient accumulation
for batch_idx, (data, target) in enumerate(dataloader):
data, target = data.to(self.device), target.to(self.device)
# Forward pass
output = self.model(data)
loss = nn.functional.cross_entropy(output, target)
loss = loss / accumulation_steps
# Backward pass
loss.backward()
# Update weights
if (batch_idx + 1) % accumulation_steps == 0:
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
self.optimizer.step()
self.optimizer.zero_grad()
total_loss += loss.item() * accumulation_steps  # undo the gradient-accumulation scaling for reporting
# Log progress (only on rank 0)
if self.rank == 0 and batch_idx % 100 == 0:
print(f'Batch [{batch_idx}/{len(dataloader)}] Loss: {loss.item():.4f}')
# All-reduce to get average loss across all processes
avg_loss = total_loss / len(dataloader)
avg_loss_tensor = torch.tensor(avg_loss).to(self.device)
dist.all_reduce(avg_loss_tensor, op=dist.ReduceOp.SUM)
avg_loss = avg_loss_tensor.item() / self.world_size
self.current_loss = avg_loss  # stored so save_checkpoint() can record it
return avg_loss
def save_checkpoint(self, epoch, path):
"""Save model checkpoint (only on rank 0)"""
if self.rank == 0:
checkpoint = {
'epoch': epoch,
'model_state_dict': self.model.module.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
'loss': self.current_loss
}
torch.save(checkpoint, path)
print(f"Checkpoint saved: {path}")
def cleanup(self):
"""Clean up distributed training"""
dist.destroy_process_group()
# Usage
def main(rank, world_size):
# Setup
trainer = DistributedTrainer(model, rank, world_size)
# Create distributed sampler
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank
)
train_loader = DataLoader(
train_dataset,
batch_size=32,
sampler=train_sampler,
num_workers=4,
pin_memory=True
)
# Training loop
for epoch in range(num_epochs):
train_sampler.set_epoch(epoch) # Shuffle differently each epoch
loss = trainer.train_epoch(train_loader)
if rank == 0:
print(f"Epoch {epoch}: Loss = {loss:.4f}")
trainer.scheduler.step()
# Save checkpoint
if epoch % 10 == 0:
trainer.save_checkpoint(epoch, f"checkpoint_epoch_{epoch}.pt")
trainer.cleanup()
# Launch distributed training
if __name__ == "__main__":
world_size = torch.cuda.device_count()
torch.multiprocessing.spawn(
main,
args=(world_size,),
nprocs=world_size,
join=True
)
| Technique | Description | Speed Gain | Size Reduction | Accuracy Impact |
|---|---|---|---|---|
| Quantization | Reduce precision (FP32 → INT8) | 2-4x | 75% | 0-2% loss |
| Pruning | Remove unnecessary weights | 1.5-3x | 50-90% | 1-3% loss |
| Knowledge Distillation | Train smaller student model | 5-10x | 80-95% | 2-5% loss |
| Mixed Precision | FP16 training with FP32 master | 2-3x | 50% | No loss |
| ONNX Conversion | Optimize for inference | 1.5-2x | 10-20% | No loss |
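As one example from the table, dynamic quantization in PyTorch stores Linear-layer weights as INT8; a minimal sketch, assuming a trained torch.nn.Module named model is in scope.
import torch
import torch.nn as nn

# Dynamic quantization: weights stored as INT8, activations quantized on the fly at inference.
# Best suited to Linear/LSTM-heavy models served on CPU.
quantized_model = torch.quantization.quantize_dynamic(
    model,        # assumed: a trained torch.nn.Module
    {nn.Linear},  # layer types to quantize
    dtype=torch.qint8
)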
import shap
# Create explainer
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)
# Feature importance
shap.summary_plot(shap_values, X_test)
# Individual prediction
shap.force_plot(
explainer.expected_value[1],
shap_values[1][0],
X_test.iloc[0]
)
Systematic analysis of failure modes
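One way to do this is to slice misclassified examples by a feature segment and rank segments by error rate; a minimal sketch, where the 'segment' column is a placeholder for whatever grouping feature your data has.
import pandas as pd

# Collect misclassified test examples (assumes X_test is a DataFrame aligned with y_test and y_pred)
errors = X_test.copy()
errors['y_true'] = y_test
errors['y_pred'] = y_pred
errors = errors[errors['y_true'] != errors['y_pred']]

# Error rate per segment -- 'segment' is a placeholder column name
error_rate_by_segment = (
    errors.groupby('segment').size() / X_test.groupby('segment').size()
).sort_values(ascending=False)
print(error_rate_by_segment.head(10))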
| Framework | Best For | Language | Deployment | Community |
|---|---|---|---|---|
| TensorFlow | Production systems | Python/C++ | TF Serving, TF Lite | Very Large |
| PyTorch | Research & development | Python | TorchServe, ONNX | Large |
| Scikit-learn | Classical ML | Python | Pickle, ONNX | Very Large |
| XGBoost | Tabular data | Python/R/Java | Native, ONNX | Large |
| JAX | High performance | Python | JIT compilation | Growing |
# Model Training
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])  # Keras: compile before fit
model.fit(X_train, y_train, epochs=10, batch_size=32)
# Model Evaluation
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)
# Model Saving/Loading
model.save('model.h5') # Keras
torch.save(model.state_dict(), 'model.pt') # PyTorch
joblib.dump(model, 'model.pkl') # Scikit-learn
# Hyperparameter Tuning
GridSearchCV(estimator, param_grid, cv=5)
RandomizedSearchCV(estimator, param_distributions, n_iter=100)
# Cross-Validation
cross_val_score(model, X, y, cv=5, scoring='accuracy')
cross_validate(model, X, y, cv=5, return_train_score=True)
# Feature Importance
feature_importance = model.feature_importances_ # Tree-based
permutation_importance(model, X_test, y_test) # Model-agnostic
# Model Monitoring
mlflow.log_metric("accuracy", accuracy)
wandb.log({"loss": loss, "accuracy": accuracy})
Overfitting. Symptoms: high train accuracy, low validation accuracy.
Solutions: add regularization (L1/L2, dropout), gather more training data, use early stopping, simplify the model (the learning-curve sketch below helps confirm the diagnosis).
Underfitting. Symptoms: low train and validation accuracy.
Solutions: increase model capacity, add or engineer better features, train longer, reduce regularization.
Data leakage. Symptoms: too-good-to-be-true results.
Solutions: audit features for information derived from the target, fit preprocessing only on the training split, use time-based splits for temporal data.
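Learning curves are a quick way to tell overfitting from underfitting; a minimal sketch with scikit-learn, using a Random Forest as a stand-in estimator.
import numpy as np
from sklearn.model_selection import learning_curve
from sklearn.ensemble import RandomForestClassifier

train_sizes, train_scores, val_scores = learning_curve(
    RandomForestClassifier(random_state=42),
    X_train, y_train,
    cv=5,
    scoring='f1_weighted',
    train_sizes=np.linspace(0.1, 1.0, 5),
    n_jobs=-1
)
# A large persistent gap between train and validation scores suggests overfitting;
# low scores on both suggest underfitting.
for size, tr, va in zip(train_sizes, train_scores.mean(axis=1), val_scores.mean(axis=1)):
    print(f"n={size}: train={tr:.3f}, val={va:.3f}")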
Accuracy: Balanced classes
Precision: False positives costly
Recall: False negatives costly
F1: Balance precision/recall
AUC-ROC: Probability ranking
Log Loss: Probability calibration
Accuracy: > 90%
Precision: > 85%
Recall: > 80%
F1 Score: > 0.85
AUC-ROC: > 0.9
Latency: < 100ms
GPU Memory: Model size × 4
RAM: Dataset size × 3
Storage: Model + Data × 2
Training Time: O(n × epochs)
Inference: O(1) per sample
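A back-of-the-envelope sketch of these rules of thumb; the input sizes are placeholders, and reading the storage rule as twice the combined size is an interpretation.
def estimate_resources(model_size_gb, dataset_size_gb):
    """Rough resource estimates using the rules of thumb above."""
    return {
        'gpu_memory_gb': model_size_gb * 4,
        'ram_gb': dataset_size_gb * 3,
        'storage_gb': (model_size_gb + dataset_size_gb) * 2  # interpreting "Model + Data x 2"
    }

# Placeholder sizes: a 0.5 GB model trained on a 10 GB dataset
print(estimate_resources(model_size_gb=0.5, dataset_size_gb=10))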