Structured Streaming & Real-Time Data

Hard 35 min read

What is Structured Streaming?

Why Structured Streaming Matters

The Problem: Modern businesses need real-time insights -- fraud detection, live dashboards, IoT monitoring -- but batch processing introduces hours of latency.

The Solution: Structured Streaming treats a live data stream as an unbounded table that is continuously appended to. You write the same DataFrame/SQL code as batch, and Spark handles the streaming execution.

Real Impact: Companies using Structured Streaming on Databricks achieve sub-second to minute-level latency for analytics that previously took hours with batch jobs.

Real-World Analogy

Think of Structured Streaming like a conveyor belt sushi restaurant:

  • Micro-batch = Plates arrive in small groups every few seconds. You process each group.
  • Continuous = Plates arrive one by one. You grab and process each immediately (lowest latency).
  • Watermarks = A sign saying "no plates older than 10 minutes will arrive" so you can finalize your bill.
  • Windowing = Grouping plates by the time they were made (e.g., all plates from 12:00-12:05).
Structured Streaming Architecture

  • Sources: Kafka, Event Hubs, Kinesis, Auto Loader, Delta Change Data Feed, Rate Source
  • Engine: Micro-Batch Scheduler, Watermark Manager, State Store (RocksDB), Checkpoint Manager
  • Sinks: Delta Lake, Kafka, Console, Memory, foreachBatch, Custom Sink
  • Output Modes: Append (new rows only), Complete (full result table), Update (changed rows only)

Micro-Batch Processing

Micro-batch is the default execution mode. Spark divides the stream into small batches (typically every 100ms-10s) and processes each batch as a regular Spark job. This provides exactly-once semantics with low latency.
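
The loop at the heart of this model can be sketched in plain Python (illustrative only, not Spark API): the engine repeatedly drains the records that have arrived, processes them as one batch, and advances a committed offset, which is what a checkpoint persists so a restart resumes exactly where it left off.

```python
# Plain-Python sketch of the micro-batch loop (illustrative, not Spark API).
# The "stream" is a list standing in for a Kafka partition; the offset is
# what a checkpoint would persist for exactly-once recovery.

def run_micro_batches(stream, batch_size, process):
    """Drain the stream in fixed-size batches, tracking the committed offset."""
    offset = 0
    results = []
    while offset < len(stream):
        batch = stream[offset:offset + batch_size]   # one micro-batch
        results.append(process(batch))               # runs as a regular Spark job would
        offset += len(batch)                         # "commit" the offset
    return offset, results

events = [{"amount": a} for a in (10.0, 20.0, 30.0, 40.0, 50.0)]
final_offset, sums = run_micro_batches(
    events, batch_size=2, process=lambda b: sum(e["amount"] for e in b)
)
# final_offset == 5; sums == [30.0, 70.0, 50.0]
```

The real pipeline that follows does the same thing at scale, with Kafka offsets in place of list indices and the checkpoint location in place of the local variable.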

PySpark - Basic Streaming Pipeline
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# Define the expected schema for Kafka messages
event_schema = (
    StructType()
    .add("event_id", StringType())
    .add("user_id", StringType())
    .add("event_type", StringType())
    .add("amount", DoubleType())
    .add("timestamp", TimestampType())
)

# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 10000)
    .load()
)

# Parse JSON messages
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) as json_str")
    .select(F.from_json("json_str", event_schema).alias("data"))
    .select("data.*")
    .withColumn("processed_at", F.current_timestamp())
)

# Write to Delta Lake
query = (
    parsed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/cp/user_events")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.user_events")
)

# Monitor the query (lastProgress is None until the first batch completes)
print(query.status)
print(query.lastProgress)

Watermarks and Windowing

Watermarks tell Spark how long to wait for late-arriving data before finalizing aggregations. Windowing groups events by time intervals for time-based analytics.

PySpark - Watermarks and Window Aggregations
# ── Windowed Aggregation with Watermarks ──

# Count events per 5-minute window, allowing 10-min late data
windowed_counts = (
    parsed_stream
    # Watermark: drop data arriving more than 10 min late
    .withWatermark("timestamp", "10 minutes")

    # Group by 5-minute tumbling windows
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "event_type"
    )
    .agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        F.approx_count_distinct("user_id").alias("unique_users")
    )
)

# Sliding window: 10-minute window, sliding every 2 minutes
sliding_counts = (
    parsed_stream
    .withWatermark("timestamp", "15 minutes")
    .groupBy(
        F.window("timestamp", "10 minutes", "2 minutes"),
        "event_type"
    )
    .count()
)

# Write windowed results to Delta
(
    windowed_counts.writeStream
    .format("delta")
    .outputMode("append")    # Append finalized windows
    .option("checkpointLocation", "dbfs:/cp/windowed_events")
    .trigger(processingTime="30 seconds")
    .toTable("silver.event_windows")
)
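
The watermark rule itself is simple enough to simulate in plain Python: the watermark trails the maximum event time seen so far by the allowed delay, and a tumbling window is finalized once the watermark passes the window's end. This is a conceptual sketch, not Spark internals.

```python
from datetime import datetime, timedelta

def window_start(ts, width):
    """Align ts down to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    return ts - ((ts - epoch) % width)

def finalized_windows(events, width, delay):
    """Return the window starts whose windows the watermark has closed."""
    watermark = max(events) - delay             # how late data may still arrive
    closed = set()
    for ts in events:
        start = window_start(ts, width)
        if start + width <= watermark:          # window end has passed the watermark
            closed.add(start)
    return closed

evts = [datetime(2024, 1, 1, 12, 2), datetime(2024, 1, 1, 12, 7),
        datetime(2024, 1, 1, 12, 31)]
done = finalized_windows(evts, width=timedelta(minutes=5),
                         delay=timedelta(minutes=10))
# watermark = 12:31 - 10 min = 12:21, so the 12:00 and 12:05 windows are closed,
# while the 12:30 window stays open for late data
```

In append mode, only these closed windows are emitted to the sink, which is why results for a window appear only after the watermark passes it.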

Stream-Static Joins

A stream-static join enriches streaming data with a static (batch) dimension table. The static side is re-read on each micro-batch, so updates to the dimension are picked up automatically.

PySpark - Stream-Static and Stream-Stream Joins
# ── Stream-Static Join: Enrich events with user data ──

# Static dimension table (re-read each micro-batch)
users_dim = spark.table("silver.users")

# Join streaming events with static users
enriched_stream = (
    parsed_stream
    .join(users_dim, "user_id", "left")
    .select(
        "event_id", "user_id", "event_type",
        "amount", "timestamp",
        users_dim["country"],
        users_dim["segment"],
        users_dim["signup_date"]
    )
)

# ── Stream-Stream Join: Match orders with payments ──
orders_stream = spark.readStream.table("bronze.orders")
payments_stream = spark.readStream.table("bronze.payments")

# Join two streams with watermarks (required)
matched = (
    orders_stream
    .withWatermark("order_ts", "30 minutes")
    .join(
        payments_stream.withWatermark("payment_ts", "30 minutes"),
        F.expr("""
            order_id = payment_order_id AND
            payment_ts >= order_ts AND
            payment_ts <= order_ts + interval 1 hour
        """),
        "inner"
    )
)
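
The join condition above is an interval join: a payment matches an order only if it carries the same key and lands within one hour after the order. The predicate can be checked in isolation with plain Python (field names mirror the snippet above and are illustrative):

```python
from datetime import datetime, timedelta

def matches(order, payment, max_gap=timedelta(hours=1)):
    """Interval-join predicate: same key, payment in [order_ts, order_ts + max_gap]."""
    return (order["order_id"] == payment["payment_order_id"]
            and order["order_ts"] <= payment["payment_ts"] <= order["order_ts"] + max_gap)

order = {"order_id": "o1", "order_ts": datetime(2024, 1, 1, 12, 0)}
on_time = {"payment_order_id": "o1", "payment_ts": datetime(2024, 1, 1, 12, 30)}
too_late = {"payment_order_id": "o1", "payment_ts": datetime(2024, 1, 1, 13, 30)}
# matches(order, on_time) is True; matches(order, too_late) is False
```

The time bounds in the predicate, together with the watermarks, are what let Spark evict old state: once the watermark passes an order's one-hour matching horizon, that order can be dropped from the state store.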

Output Modes and Triggers

Output modes:

  • Append: only new rows that will never change again. Use for raw event ingestion and finalized windows.
  • Complete: the entire result table is rewritten each batch. Use for small aggregation results and dashboards.
  • Update: only rows changed since the last batch. Use for running aggregations and stateful processing.
Triggers:

  • processingTime: fixed-interval micro-batches. Use for continuous streaming with controlled latency.
  • availableNow: process all available data, then stop. Use for scheduled batch-style jobs with streaming semantics.
  • once: a single micro-batch, then stop (deprecated in recent Spark releases; prefer availableNow). Use for one-time catch-up processing.
  • continuous: true continuous processing (experimental). Use for millisecond-level latency requirements.

Practice Problems

Problem 1: Real-Time Fraud Detection

Medium

Design a streaming pipeline that detects potentially fraudulent transactions in near-real-time. A transaction is suspicious if a user makes more than 5 purchases totaling over $1,000 within a 10-minute window.
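
In the pipeline this would be a 10-minute windowed aggregation (with a watermark) followed by a filter; the per-window rule itself reduces to a two-part predicate, sketched here in plain Python with illustrative names:

```python
def is_suspicious(purchases, count_threshold=5, amount_threshold=1000.0):
    """Flag one user's 10-minute window: MORE THAN 5 purchases AND total over $1,000."""
    return (len(purchases) > count_threshold
            and sum(purchases) > amount_threshold)

# Six purchases totaling $1,200 in one window -> suspicious
# Six small purchases -> count trips, but the amount threshold does not
# Five purchases totaling $1,200 -> amount trips, but the count threshold does not
```

Note the strict inequalities: five purchases totaling exactly $1,000 should not be flagged under the problem statement.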

Problem 2: Stream-Stream Join for Order Matching

Hard

You have two Kafka topics: "orders" and "shipments." Design a stream-stream join that matches orders to their shipments. Orders should be matched within 24 hours. Unmatched orders after 24 hours should be flagged as delayed.

Problem 3: Exactly-Once Semantics Design

Hard

Your streaming pipeline writes to both a Delta table and an external PostgreSQL database. How do you ensure exactly-once processing across both sinks, given that Delta writes are transactional but the PostgreSQL writes are not covered by Spark's checkpoint-based guarantees?
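
One common approach is foreachBatch with idempotent writes: Delta gets a transactional write, and the PostgreSQL write is keyed by the micro-batch ID so a replayed batch after failure is skipped. The dedup logic can be sketched without Spark; the class below is a plain-Python stand-in for a Postgres table plus a processed-batches ledger, and all names are illustrative.

```python
class IdempotentSink:
    """Sketch of an at-least-once sink made effectively exactly-once
    by recording which micro-batch IDs have already been applied."""

    def __init__(self):
        self.rows = []
        self.processed_batches = set()   # would be a ledger table in Postgres

    def write_batch(self, batch_id, rows):
        if batch_id in self.processed_batches:
            return False                 # replayed batch after a failure: skip
        # In Postgres, the row inserts and the ledger insert would share
        # one database transaction, so they succeed or fail together.
        self.rows.extend(rows)
        self.processed_batches.add(batch_id)
        return True

sink = IdempotentSink()
sink.write_batch(0, ["a", "b"])
sink.write_batch(0, ["a", "b"])   # replay of batch 0 is ignored
sink.write_batch(1, ["c"])
# sink.rows == ["a", "b", "c"]
```

In the real pipeline, the function passed to foreachBatch receives exactly this (batch DataFrame, batch_id) pair, and the Delta write is already idempotent per batch, so only the external sink needs the ledger.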