Project: ML Feature Store & Model Serving

Difficulty: Hard -- 35 min read

Project Overview

What You Will Build

In this project, you will build an end-to-end ML system for a fictional e-commerce company that predicts customer churn. The pipeline covers every stage from raw data to a live serving endpoint with traffic splitting for A/B testing.

Skills practiced: Feature engineering, Databricks Feature Store, MLflow experiment tracking, model registry, model serving endpoints, and A/B test configuration.

ML Pipeline Flow: Feature Store to Model Serving
[Pipeline diagram: Raw Data (orders, users, clickstream) -> Feature Engineering (PySpark transforms) -> Feature Store (Unity Catalog) -> MLflow Training (experiment tracking) -> Model Registry (version control) -> Serving Endpoint (REST API) -> A/B Testing (traffic splitting). Features are looked up from the Feature Store at training time and from the online store at serving time.]

Feature Engineering

The first step is transforming raw data into meaningful features. For our churn prediction model, we compute behavioral features from user activity data.

PySpark - Customer Churn Feature Engineering
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Load raw tables from the lakehouse
orders = spark.table("catalog.bronze.orders")
users = spark.table("catalog.bronze.users")
clickstream = spark.table("catalog.bronze.clickstream")

# Feature 1: Order recency and frequency (RFM features)
order_features = (
    orders
    .groupBy("customer_id")
    .agg(
        F.datediff(F.current_date(), F.max("order_date")).alias("days_since_last_order"),
        F.count("order_id").alias("total_orders"),
        F.sum("order_total").alias("lifetime_revenue"),
        F.avg("order_total").alias("avg_order_value"),
        F.stddev("order_total").alias("order_value_stddev"),
        F.countDistinct("product_category").alias("unique_categories")
    )
)

# Feature 2: Engagement metrics from clickstream
engagement_features = (
    clickstream
    .where(F.col("event_date") >= F.date_sub(F.current_date(), 30))
    .groupBy("customer_id")
    .agg(
        F.countDistinct("session_id").alias("sessions_last_30d"),  # distinct sessions, not raw events
        F.sum(F.when(F.col("event_type") == "page_view", 1).otherwise(0)).alias("page_views_30d"),
        F.sum(F.when(F.col("event_type") == "add_to_cart", 1).otherwise(0)).alias("cart_adds_30d"),
        F.avg("session_duration_sec").alias("avg_session_duration")
    )
)

# Feature 3: User demographics
user_features = (
    users
    .select(
        "customer_id",
        F.datediff(F.current_date(), F.col("signup_date")).alias("account_age_days"),
        "subscription_tier",
        "region"
    )
)

# Join all features together
customer_features = (
    user_features
    .join(order_features, "customer_id", "left")
    .join(engagement_features, "customer_id", "left")
    .fillna(0)
    .withColumn("computed_timestamp", F.current_timestamp())
)

display(customer_features)
print(f"Total customers with features: {customer_features.count()}")

Databricks Feature Store

The Feature Store centralizes feature definitions so they can be reused across training and serving. Features are stored as Delta tables in Unity Catalog and automatically looked up during both batch scoring and online serving.

Why Use a Feature Store?

  • Consistency: Same features used in training and serving -- eliminates training/serving skew
  • Reusability: Features defined once, used by multiple models across teams
  • Discovery: Search and browse available features in Unity Catalog
  • Lineage: Track which models use which features
  • Online serving: Automatic feature lookup at inference time
PySpark - Creating and Writing to Feature Store
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

fe = FeatureEngineeringClient()

# Create a feature table in Unity Catalog
fe.create_table(
    name="catalog.ml_features.customer_churn_features",
    primary_keys=["customer_id"],
    timestamp_keys=["computed_timestamp"],
    df=customer_features,
    description="Customer behavioral and demographic features for churn prediction",
    tags={"team": "data-science", "project": "churn-v2"}
)

# To update features later (e.g., daily refresh):
fe.write_table(
    name="catalog.ml_features.customer_churn_features",
    df=customer_features,
    mode="merge"  # Upserts based on primary keys
)

# Browse features
display(fe.read_table("catalog.ml_features.customer_churn_features"))

Online Feature Store

For real-time model serving, features need to be available with low latency. Databricks publishes feature tables to an online store -- backed by a key-value store such as DynamoDB or Cosmos DB, or by Databricks Online Tables:

SQL - Create Online Table for Low-Latency Serving
-- Create an online table that syncs from the feature table
CREATE ONLINE TABLE catalog.ml_features.customer_churn_features_online
AS SELECT * FROM catalog.ml_features.customer_churn_features;

-- The online table auto-syncs from the source Delta table
-- Typical latency: single-digit milliseconds for lookups

MLflow Model Training

With features in the store, we train a churn prediction model using MLflow for experiment tracking. The key is to use feature lookups rather than passing raw DataFrames -- this ensures the model knows which features it needs at serving time.

PySpark - Training with Feature Store and MLflow
import mlflow
from mlflow.models import infer_signature
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score

fe = FeatureEngineeringClient()

# Define feature lookups -- this links the model to the Feature Store
feature_lookups = [
    FeatureLookup(
        table_name="catalog.ml_features.customer_churn_features",
        lookup_key=["customer_id"],
        feature_names=[
            "days_since_last_order", "total_orders",
            "lifetime_revenue", "avg_order_value",
            "sessions_last_30d", "page_views_30d",
            "cart_adds_30d", "avg_session_duration",
            "account_age_days", "unique_categories"
        ]
    )
]

# Training labels (customer_id + churned label)
labels_df = spark.table("catalog.gold.customer_churn_labels")

# Create training set with feature lookups
training_set = fe.create_training_set(
    df=labels_df,
    feature_lookups=feature_lookups,
    label="churned",
    exclude_columns=["customer_id"]
)

# Convert to pandas for sklearn
training_df = training_set.load_df().toPandas()
X = training_df.drop("churned", axis=1)
y = training_df["churned"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Train with MLflow tracking
mlflow.set_experiment("/Experiments/churn-prediction-v2")

with mlflow.start_run(run_name="gbm_churn_v2") as run:
    params = {
        "n_estimators": 200,
        "max_depth": 6,
        "learning_rate": 0.1,
        "min_samples_leaf": 20,
        "subsample": 0.8
    }
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)[:, 1]

    metrics = {
        "auc_roc": roc_auc_score(y_test, y_proba),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred),
        "test_size": len(X_test)
    }
    mlflow.log_metrics(metrics)

    # Log model WITH feature store metadata
    fe.log_model(
        model=model,
        artifact_path="churn_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="catalog.ml_models.churn_predictor"
    )

    print(f"AUC-ROC: {metrics['auc_roc']:.4f}")
    print(f"Precision: {metrics['precision']:.4f}")
    print(f"Recall: {metrics['recall']:.4f}")

Model Registry & Lifecycle

The Unity Catalog Model Registry provides versioned model management, using aliases in place of the legacy stage transitions. Models are tagged with aliases such as "Champion" and "Challenger" rather than being deployed directly.

Python - Model Registry Operations
import mlflow
from mlflow import MlflowClient

client = MlflowClient()
model_name = "catalog.ml_models.churn_predictor"

# List all versions
versions = client.search_model_versions(f"name='{model_name}'")
for v in versions:
    print(f"Version {v.version}: {v.aliases} (run_id: {v.run_id})")

# Set alias for production (Champion)
client.set_registered_model_alias(
    name=model_name,
    alias="Champion",
    version=3
)

# Set alias for new candidate (Challenger)
client.set_registered_model_alias(
    name=model_name,
    alias="Challenger",
    version=4
)

# Load a model by alias
champion_model = mlflow.pyfunc.load_model(
    f"models:/{model_name}@Champion"
)

# Add model description
client.update_registered_model(
    name=model_name,
    description="Customer churn prediction model. Uses behavioral and demographic features. Target: 30-day churn."
)

# Add version description
client.update_model_version(
    name=model_name,
    version=4,
    description="GBM v2 with engagement features. AUC=0.89. Candidate for A/B test."
)

Model Serving Endpoint

Databricks Model Serving provides a fully managed REST API endpoint for your registered models. It handles scaling, load balancing, and integrates with the Feature Store for automatic feature lookup.

Python - Create a Serving Endpoint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput,
    ServedEntityInput,
    AutoCaptureConfigInput
)

w = WorkspaceClient()

# Create serving endpoint with the Champion model
endpoint_config = EndpointCoreConfigInput(
    served_entities=[
        ServedEntityInput(
            entity_name="catalog.ml_models.churn_predictor",
            entity_version="3",
            name="champion",
            workload_size="Small",
            scale_to_zero_enabled=True,
            environment_vars={
                "FEATURE_STORE_URI": "databricks-uc"
            }
        )
    ],
    auto_capture_config=AutoCaptureConfigInput(
        catalog_name="catalog",
        schema_name="ml_monitoring",
        table_name_prefix="churn_endpoint",
        enabled=True
    )
)

endpoint = w.serving_endpoints.create(
    name="churn-prediction-endpoint",
    config=endpoint_config
)

print("Endpoint created. Waiting for deployment...")
endpoint.result()  # create() returns a Wait object; result() blocks until the endpoint is ready

Querying the Endpoint

Python - Making Predictions via REST API
import requests
import json

DATABRICKS_HOST = "https://<workspace-url>"
TOKEN = dbutils.secrets.get(scope="serving", key="api-token")

# With Feature Store integration, you only need the lookup key
payload = {
    "dataframe_records": [
        {"customer_id": "C-10042"},
        {"customer_id": "C-20087"},
        {"customer_id": "C-30155"}
    ]
}

response = requests.post(
    f"{DATABRICKS_HOST}/serving-endpoints/churn-prediction-endpoint/invocations",
    headers={
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json"
    },
    json=payload
)
response.raise_for_status()  # surface HTTP errors (bad token, endpoint not ready) early

predictions = response.json()
for pred in predictions["predictions"]:
    print(f"Customer: {pred['customer_id']}, Churn Prob: {pred['churn_probability']:.2%}")

A/B Testing with Traffic Splitting

When deploying a new model version, you want to gradually shift traffic and compare performance against the current champion. Databricks supports traffic splitting at the endpoint level.

Python - Configure A/B Test with Traffic Splitting
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput,
    ServedEntityInput,
    TrafficConfig,
    Route
)

# Update endpoint to serve both Champion and Challenger
ab_config = EndpointCoreConfigInput(
    served_entities=[
        ServedEntityInput(
            entity_name="catalog.ml_models.churn_predictor",
            entity_version="3",
            name="champion",
            workload_size="Small",
            scale_to_zero_enabled=False
        ),
        ServedEntityInput(
            entity_name="catalog.ml_models.churn_predictor",
            entity_version="4",
            name="challenger",
            workload_size="Small",
            scale_to_zero_enabled=False
        )
    ],
    traffic_config=TrafficConfig(
        routes=[
            Route(served_model_name="champion", traffic_percentage=90),
            Route(served_model_name="challenger", traffic_percentage=10)
        ]
    )
)

w.serving_endpoints.update_config(
    name="churn-prediction-endpoint",
    served_entities=ab_config.served_entities,
    traffic_config=ab_config.traffic_config
)

print("A/B test configured: 90% Champion, 10% Challenger")

Monitoring A/B Test Results

SQL - Analyze A/B Test Results from Inference Logs
-- Query the auto-captured inference logs
SELECT
    served_model_name,
    COUNT(*) AS total_requests,
    AVG(response_time_ms) AS avg_latency_ms,
    PERCENTILE(response_time_ms, 0.95) AS p95_latency_ms,
    AVG(
        CASE WHEN prediction = actual_outcome
        THEN 1.0 ELSE 0.0 END
    ) AS accuracy
FROM catalog.ml_monitoring.churn_endpoint_payload
WHERE timestamp_ms >= UNIX_TIMESTAMP(CURRENT_DATE() - INTERVAL 7 DAYS) * 1000
GROUP BY served_model_name
ORDER BY served_model_name;
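Accuracy differences in the query above can arise by chance, especially on the Challenger's smaller 10% slice. A minimal sketch of a two-proportion z-test (pure-Python normal approximation; the request counts below are illustrative) for judging whether the gap is statistically significant:

```python
from math import sqrt, erf

def two_proportion_z_test(correct_a, n_a, correct_b, n_b):
    """Two-sided z-test for a difference between two proportions (e.g. accuracies)."""
    p_a, p_b = correct_a / n_a, correct_b / n_b
    p_pool = (correct_a + correct_b) / (n_a + n_b)          # pooled proportion under H0
    se = sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))  # standard error of the difference
    z = (p_b - p_a) / se
    p_value = 2 * (1 - 0.5 * (1 + erf(abs(z) / sqrt(2))))   # normal CDF via erf
    return z, p_value

# Illustrative counts: champion 8500/10000 correct, challenger 880/1000 correct
z, p = two_proportion_z_test(8500, 10000, 880, 1000)
print(f"z = {z:.2f}, p = {p:.4f}")  # reject H0 at alpha = 0.05 if p < 0.05
```

The counts plug in directly from the inference-log query (requests and the accuracy numerator per served model).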

A/B Test Decision Framework

  • Week 1: 90/10 split -- validate Challenger stability (no errors, acceptable latency)
  • Week 2: 70/30 split -- gather enough data for statistical significance
  • Week 3: Evaluate metrics. If Challenger wins, promote to 100%. If not, roll back.
  • Key metrics: AUC-ROC on holdout set, prediction latency p95, error rate
  • Safety: Set up alerts for latency spikes or error rate increases on the Challenger
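The ramp above can be encoded as a small helper that decides the next traffic split, falling back to 100% Champion whenever the safety checks fail. Names and thresholds here are illustrative, and the commented update call mirrors the `update_config` pattern shown earlier:

```python
# Week-by-week ramp from the decision framework: (champion %, challenger %)
RAMP_SCHEDULE = {1: (90, 10), 2: (70, 30), 3: (0, 100)}

def next_traffic_split(week, challenger_healthy):
    """Return (champion %, challenger %) for the given week of the A/B test.

    challenger_healthy should reflect the safety checks: error rate,
    p95 latency, and (by week 3) the offline metric comparison.
    """
    if not challenger_healthy:
        return (100, 0)  # roll back: route all traffic to Champion
    return RAMP_SCHEDULE.get(min(week, 3), (0, 100))

# Applying a split (requires a workspace connection), e.g.:
# champ_pct, chall_pct = next_traffic_split(week=2, challenger_healthy=True)
# w.serving_endpoints.update_config(
#     name="churn-prediction-endpoint",
#     traffic_config=TrafficConfig(routes=[
#         Route(served_model_name="champion", traffic_percentage=champ_pct),
#         Route(served_model_name="challenger", traffic_percentage=chall_pct),
#     ]),
# )
```

Keeping the schedule in data rather than code makes it easy to review and to pause the ramp without redeploying anything.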

Practice Problems

Problem 1: Feature Store Design

Medium

You are building a fraud detection model that needs: (1) real-time transaction features (amount, merchant category), (2) customer historical features (avg transaction amount, frequency), and (3) device features (device type, location). Design the Feature Store tables and explain which features should be online vs batch-only.

Problem 2: Model Rollback Strategy

Hard

Your Challenger model passed A/B testing and was promoted to 100% traffic. Two days later, you discover it performs poorly for enterprise accounts. Describe your rollback plan and how to prevent this in the future.

Problem 3: End-to-End Pipeline Design

Hard

Design a complete ML pipeline that retrains weekly, validates against the current Champion, and automatically promotes if it passes all quality gates. Include the Databricks Jobs workflow structure, Feature Store refresh, and serving endpoint update steps.