Project: Real-Time Analytics Pipeline

Difficulty: Hard · 35 min read

Project Overview

What We Are Building

Goal: Build an end-to-end real-time analytics pipeline that ingests clickstream events from Kafka, processes them through a medallion architecture using Structured Streaming, computes real-time aggregations with windowed operations, and feeds results to a live dashboard.

Technologies: Apache Kafka, Databricks Structured Streaming, Delta Lake, PySpark, SQL Warehouses for dashboards.

Business Value: Enable product teams to see user engagement metrics with less than 60 seconds of latency, replacing a batch pipeline that had 4-hour delays.

Real-Time Pipeline Architecture
Kafka (clickstream events, ~10K events/sec)
  → Bronze: raw events Delta table, streaming ingest with checkpoint (trigger: 10s)
  → Silver: cleaned events with schema validation, deduplication, enrichment (trigger: 10s)
  → Gold: windowed counts, session metrics, funnel stats (trigger: 30s)
  → Dashboard: SQL Warehouse real-time views (refresh: 30s)

End-to-end latency: event to dashboard in under 60 seconds.

Event Source (Kafka)

Our clickstream events are produced by web and mobile applications to a Kafka cluster. Each event contains a user ID, event type, page URL, timestamp, and session metadata. The Kafka topic is partitioned by user ID for ordering guarantees within a user session.

PySpark - Kafka Event Schema
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Define the expected event schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), True),
    StructField("event_type", StringType(), False),
    StructField("page_url", StringType(), True),
    StructField("referrer", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("country", StringType(), True),
    StructField("event_timestamp", LongType(), False),
    StructField("properties", StringType(), True),
])

Stream Ingestion (Bronze Layer)

The bronze layer ingests raw events from Kafka with minimal transformation. We parse the JSON payload, add ingestion metadata, and write to a Delta table with checkpointing for exactly-once guarantees.

PySpark - Kafka to Bronze Delta Table
from pyspark.sql import functions as F

# Read from Kafka
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "clickstream-events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "100000")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
        f'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{dbutils.secrets.get("kafka", "username")}" '
        f'password="{dbutils.secrets.get("kafka", "password")}";')
    .load()
)

# Parse Kafka value (JSON) and add metadata
bronze_stream = (
    kafka_stream
    .select(
        F.col("key").cast("string").alias("kafka_key"),
        F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
        F.col("topic"),
        F.col("partition").alias("kafka_partition"),
        F.col("offset").alias("kafka_offset"),
        F.col("timestamp").alias("kafka_timestamp"),
        F.current_timestamp().alias("ingestion_timestamp"),
    )
    .select("kafka_key", "data.*", "kafka_partition", "kafka_offset",
            "kafka_timestamp", "ingestion_timestamp")
)

# Write to Bronze Delta table
bronze_query = (
    bronze_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_clickstream")
    .trigger(processingTime="10 seconds")
    .toTable("catalog.streaming.bronze_clickstream")
)

Stream Processing (Silver Layer)

The silver layer cleans and validates the raw events: deduplicating by event_id, filtering malformed records, converting timestamps, and enriching with dimension data.

PySpark - Bronze to Silver Streaming Transform
# Read Bronze as a stream
bronze_df = spark.readStream.table("catalog.streaming.bronze_clickstream")

# Clean, validate, and deduplicate
silver_stream = (
    bronze_df
    # Convert epoch ms to timestamp
    .withColumn("event_time",
        (F.col("event_timestamp") / 1000).cast("timestamp"))
    # Filter invalid records
    .filter(F.col("event_id").isNotNull())
    .filter(F.col("user_id").isNotNull())
    .filter(F.col("event_type").isin(
        "page_view", "click", "scroll", "purchase", "add_to_cart"))
    # Deduplicate within watermark window
    .withWatermark("event_time", "10 minutes")
    .dropDuplicatesWithinWatermark(["event_id"])
    # Add derived columns
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn("event_hour", F.hour("event_time"))
)

# Write to Silver
silver_query = (
    silver_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_clickstream")
    .trigger(processingTime="10 seconds")
    .toTable("catalog.streaming.silver_clickstream")
)

Real-Time Aggregations (Gold Layer)

The gold layer computes windowed aggregations for the dashboard: active users per minute, event counts by type, and page view trends. We use tumbling and sliding windows to produce near-real-time metrics.

PySpark - Windowed Aggregations
# Read Silver as a stream
silver_df = spark.readStream.table("catalog.streaming.silver_clickstream")

# Tumbling window: events per minute by type
events_per_minute = (
    silver_df
    .withWatermark("event_time", "5 minutes")
    .groupBy(
        F.window("event_time", "1 minute"),
        "event_type"
    )
    .agg(
        F.count("*").alias("event_count"),
        F.approx_count_distinct("user_id").alias("unique_users"),
    )
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "event_type",
        "event_count",
        "unique_users",
    )
)

# Write Gold aggregations (append mode: each window is emitted once,
# after the watermark finalizes it)
gold_query = (
    events_per_minute.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/gold_events_per_minute")
    .trigger(processingTime="30 seconds")
    .toTable("catalog.streaming.gold_events_per_minute")
)

Dashboard Integration

Databricks SQL dashboards can query the gold Delta tables directly. SQL Warehouses support auto-refresh so dashboards update as new data arrives. For sub-minute refresh, configure the dashboard to poll the gold table every 30 seconds.

SQL - Dashboard Queries
-- Active users in the last 5 minutes
-- (note: summing per-type unique_users double-counts users active in multiple event types)
SELECT
    window_start,
    SUM(unique_users) AS active_users,
    SUM(event_count) AS total_events
FROM catalog.streaming.gold_events_per_minute
WHERE window_start >= current_timestamp() - INTERVAL 5 MINUTES
GROUP BY window_start
ORDER BY window_start DESC;

-- Event breakdown by type (last hour)
SELECT
    event_type,
    SUM(event_count) AS total,
    ROUND(SUM(event_count) * 100.0 /
        SUM(SUM(event_count)) OVER(), 1) AS pct
FROM catalog.streaming.gold_events_per_minute
WHERE window_start >= current_timestamp() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY total DESC;

Late Data Handling

In streaming systems, events can arrive out of order or late due to network delays, mobile offline queuing, or batch uploads. Watermarks define how long the system waits for late data before finalizing a window.

Watermark Strategy

  • Bronze: No watermark needed (append-only raw storage)
  • Silver: 10-minute watermark for deduplication
  • Gold: 5-minute watermark for windowed aggregations
  • Events arriving after the watermark expires are dropped from aggregations but remain in bronze/silver for batch reprocessing
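
The watermark mechanics above can be modeled in a few lines of plain Python. This is a deliberately simplified single-stream model for intuition only: Spark actually advances the watermark once per micro-batch from the maximum event time observed across executors, rather than per event.

```python
# Simplified model of watermark advancement (intuition only).
# watermark = max event time seen so far, minus the allowed delay;
# events older than the watermark are dropped from stateful operations.
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)  # matches the silver dedup watermark

max_event_time = datetime(1970, 1, 1)

def process(event_time):
    """Return True if the event is accepted, False if it is too late."""
    global max_event_time
    watermark = max_event_time - DELAY
    if event_time < watermark:
        return False  # older than the watermark: dropped from aggregations
    max_event_time = max(max_event_time, event_time)
    return True

# An on-time event, then one 15 minutes older than the max seen so far
assert process(datetime(2024, 1, 1, 10, 30)) is True
assert process(datetime(2024, 1, 1, 10, 15)) is False  # 15 min late > 10 min delay
assert process(datetime(2024, 1, 1, 10, 25)) is True   # 5 min late: still accepted
```

Note the asymmetry this implies: dropped events never reach the gold aggregations, but (as the bullet above says) they still exist in bronze/silver for batch backfill.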

Monitoring

Monitor streaming query health using the StreamingQueryListener API, Spark UI streaming tab, and Delta table metrics. Set up alerts for processing lag, failed batches, and data quality issues.

PySpark - Stream Monitoring
# Check streaming query status
for query in spark.streams.active:
    print(f"Query: {query.name}")
    print(f"  Status: {query.status}")
    progress = query.lastProgress
    if progress:
        print(f"  Input rows/sec: {progress.get('inputRowsPerSecond', 0)}")
        print(f"  Process rows/sec: {progress.get('processedRowsPerSecond', 0)}")
        print(f"  Trigger duration: {progress['durationMs']['triggerExecution']}ms")

# Alert if processing falls behind
def check_stream_health():
    for query in spark.streams.active:
        progress = query.lastProgress
        if progress:
            input_rate = progress.get("inputRowsPerSecond", 0)
            process_rate = progress.get("processedRowsPerSecond", 0)
            if process_rate > 0 and input_rate / process_rate > 1.5:
                print(f"WARNING: Stream {query.name} falling behind!")

Practice Problems

Problem 1: Exactly-Once Semantics

Hard

Your streaming pipeline occasionally produces duplicate records in the gold layer after a cluster restart. The bronze and silver layers appear correct. What could cause this and how would you fix it?

Problem 2: Scaling for Peak Traffic

Hard

During a flash sale, your event rate spikes from 10K/sec to 200K/sec. The streaming pipeline cannot keep up -- processing lag grows to 30 minutes. How do you design the pipeline to handle traffic spikes?

Problem 3: Data Quality Monitoring

Medium

How would you detect and alert on data quality issues in a streaming pipeline, such as a sudden drop in event volume, unexpected null rates, or schema changes from the producer?

Quick Reference

Streaming Pipeline Cheat Sheet

| Component      | Setting             | Recommendation                               |
|----------------|---------------------|----------------------------------------------|
| Kafka Source   | maxOffsetsPerTrigger | 100K for steady state, increase for catch-up |
| Bronze Trigger | processingTime      | 10 seconds for low latency                   |
| Watermark      | Silver dedup        | 10 minutes (covers network delays)           |
| Gold Windows   | Tumbling window     | 1 minute with 5 min watermark                |
| Checkpoints    | Location            | DBFS or cloud storage, never local           |
| Dashboard      | Refresh interval    | 30 seconds for real-time feel                |