Project Overview
What We Are Building
Goal: Build an end-to-end real-time analytics pipeline that ingests clickstream events from Kafka, processes them through a medallion architecture using Structured Streaming, computes real-time aggregations with windowed operations, and feeds results to a live dashboard.
Technologies: Apache Kafka, Databricks Structured Streaming, Delta Lake, PySpark, SQL Warehouses for dashboards.
Business Value: Enable product teams to see user engagement metrics with less than 60 seconds of latency, replacing a batch pipeline that had 4-hour delays.
Event Source (Kafka)
Our clickstream events are produced by web and mobile applications to a Kafka cluster. Each event contains a user ID, event type, page URL, timestamp, and session metadata. The Kafka topic is partitioned by user ID for ordering guarantees within a user session.
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
# Define the expected event schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), True),
    StructField("event_type", StringType(), False),
    StructField("page_url", StringType(), True),
    StructField("referrer", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("country", StringType(), True),
    StructField("event_timestamp", LongType(), False),
    StructField("properties", StringType(), True),
])
Stream Ingestion (Bronze Layer)
The bronze layer ingests raw events from Kafka with minimal transformation. We parse the JSON payload, add ingestion metadata, and write to a Delta table with checkpointing for exactly-once guarantees.
from pyspark.sql import functions as F
# Read from Kafka
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "clickstream-events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "100000")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{dbutils.secrets.get("kafka", "username")}" '
            f'password="{dbutils.secrets.get("kafka", "password")}";')
    .load()
)
# Parse Kafka value (JSON) and add metadata
bronze_stream = (
    kafka_stream
    .select(
        F.col("key").cast("string").alias("kafka_key"),
        F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
        F.col("topic"),
        F.col("partition").alias("kafka_partition"),
        F.col("offset").alias("kafka_offset"),
        F.col("timestamp").alias("kafka_timestamp"),
        F.current_timestamp().alias("ingestion_timestamp"),
    )
    .select("kafka_key", "data.*", "kafka_partition", "kafka_offset",
            "kafka_timestamp", "ingestion_timestamp")
)
# Write to Bronze Delta table
bronze_query = (
    bronze_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_clickstream")
    .trigger(processingTime="10 seconds")
    .toTable("catalog.streaming.bronze_clickstream")
)
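For the initial load, or for catching up after downtime, the same bronze write can run as a one-shot batch over the Kafka backlog using the available-now trigger. A sketch under the same table and checkpoint names as above (`run_bronze_catchup` is a hypothetical helper, not part of the pipeline code):

```python
def run_bronze_catchup(bronze_stream):
    """One-shot catch-up: drain everything currently in Kafka, then stop.

    Reuses the bronze checkpoint, so offsets already committed by the
    continuous query are skipped rather than reprocessed.
    """
    return (
        bronze_stream.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", "/checkpoints/bronze_clickstream")
        .trigger(availableNow=True)  # process the backlog in bounded batches, then exit
        .toTable("catalog.streaming.bronze_clickstream")
    )
```

Pairing this with a temporarily raised maxOffsetsPerTrigger bounds the size of each catch-up batch, as the cheat sheet below suggests.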
Stream Processing (Silver Layer)
The silver layer cleans and validates the raw events: deduplicating by event_id, filtering malformed records, converting timestamps, and enriching with dimension data.
# Read Bronze as a stream
bronze_df = spark.readStream.table("catalog.streaming.bronze_clickstream")
# Clean, validate, and deduplicate
silver_stream = (
    bronze_df
    # Convert epoch ms to timestamp
    .withColumn("event_time",
                (F.col("event_timestamp") / 1000).cast("timestamp"))
    # Filter invalid records
    .filter(F.col("event_id").isNotNull())
    .filter(F.col("user_id").isNotNull())
    .filter(F.col("event_type").isin(
        "page_view", "click", "scroll", "purchase", "add_to_cart"))
    # Deduplicate within watermark window
    .withWatermark("event_time", "10 minutes")
    .dropDuplicatesWithinWatermark(["event_id"])
    # Add derived columns
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn("event_hour", F.hour("event_time"))
)
# Write to Silver
silver_query = (
    silver_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_clickstream")
    .trigger(processingTime="10 seconds")
    .toTable("catalog.streaming.silver_clickstream")
)
Real-Time Aggregations (Gold Layer)
The gold layer computes windowed aggregations for the dashboard: active users per minute, event counts by type, and page view trends. The example below uses a tumbling window to produce near-real-time metrics; sliding windows follow the same pattern.
# Read Silver as a stream
silver_df = spark.readStream.table("catalog.streaming.silver_clickstream")
# Tumbling window: events per minute by type
events_per_minute = (
    silver_df
    .withWatermark("event_time", "5 minutes")
    .groupBy(
        F.window("event_time", "1 minute"),
        "event_type",
    )
    .agg(
        F.count("*").alias("event_count"),
        F.approx_count_distinct("user_id").alias("unique_users"),
    )
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "event_type",
        "event_count",
        "unique_users",
    )
)
# Write Gold aggregations (append mode works here because the windowed
# aggregation has a watermark: each window is emitted once it closes)
gold_query = (
    events_per_minute.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/gold_events_per_minute")
    .trigger(processingTime="30 seconds")
    .toTable("catalog.streaming.gold_events_per_minute")
)
Dashboard Integration
Databricks SQL dashboards can query the gold Delta tables directly, and scheduled refresh keeps them current as new data arrives. For sub-minute freshness, configure the dashboard to refresh roughly every 30 seconds, subject to the minimum refresh interval your workspace allows.
-- Active users in the last 5 minutes
-- Note: summing per-type distinct counts double-counts users active in
-- several event types, so treat active_users as an upper bound
SELECT
    window_start,
    SUM(unique_users) AS active_users,
    SUM(event_count) AS total_events
FROM catalog.streaming.gold_events_per_minute
WHERE window_start >= current_timestamp() - INTERVAL 5 MINUTES
GROUP BY window_start
ORDER BY window_start DESC;
-- Event breakdown by type (last hour)
SELECT
    event_type,
    SUM(event_count) AS total,
    ROUND(SUM(event_count) * 100.0 /
          SUM(SUM(event_count)) OVER (), 1) AS pct
FROM catalog.streaming.gold_events_per_minute
WHERE window_start >= current_timestamp() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY total DESC;
Late Data Handling
In streaming systems, events can arrive out of order or late due to network delays, mobile offline queuing, or batch uploads. Watermarks define how long the system waits for late data before finalizing a window.
Watermark Strategy
- Bronze: No watermark needed (append-only raw storage)
- Silver: 10-minute watermark for deduplication
- Gold: 5-minute watermark for windowed aggregations
- Events arriving after the watermark expires are dropped from aggregations but remain in bronze/silver for batch reprocessing
Monitoring
Monitor streaming query health using the StreamingQueryListener API, Spark UI streaming tab, and Delta table metrics. Set up alerts for processing lag, failed batches, and data quality issues.
# Check streaming query status
for query in spark.streams.active:
    print(f"Query: {query.name}")
    print(f"  Status: {query.status}")
    progress = query.lastProgress
    if progress:
        print(f"  Input rows/sec: {progress.get('inputRowsPerSecond')}")
        print(f"  Process rows/sec: {progress.get('processedRowsPerSecond')}")
        print(f"  Batch duration: {progress.get('batchDuration')}ms")

# Alert if processing falls behind
def check_stream_health():
    for query in spark.streams.active:
        progress = query.lastProgress
        if not progress:
            continue
        input_rate = progress.get("inputRowsPerSecond") or 0
        process_rate = progress.get("processedRowsPerSecond") or 0
        if process_rate > 0 and input_rate / process_rate > 1.5:
            # Wire this up to an alerting webhook or pager in production
            print(f"WARNING: Stream {query.name} falling behind!")
Practice Problems
Problem 1: Exactly-Once Semantics
Hard: Your streaming pipeline occasionally produces duplicate records in the gold layer after a cluster restart. The bronze and silver layers appear correct. What could cause this, and how would you fix it?
Problem 2: Scaling for Peak Traffic
Hard: During a flash sale, your event rate spikes from 10K/sec to 200K/sec. The streaming pipeline cannot keep up -- processing lag grows to 30 minutes. How do you design the pipeline to handle traffic spikes?
Problem 3: Data Quality Monitoring
Medium: How would you detect and alert on data quality issues in a streaming pipeline, such as a sudden drop in event volume, unexpected null rates, or schema changes from the producer?
Quick Reference
Streaming Pipeline Cheat Sheet
| Component | Setting | Recommendation |
|---|---|---|
| Kafka Source | maxOffsetsPerTrigger | 100K for steady state, increase for catch-up |
| Bronze Trigger | processingTime | 10 seconds for low latency |
| Watermark | Silver dedup | 10 minutes (covers network delays) |
| Gold Windows | Tumbling window | 1 minute with 5 min watermark |
| Checkpoints | Location | DBFS or cloud storage, never local |
| Dashboard | Refresh interval | 30 seconds for real-time feel |