What is Structured Streaming?
Why Structured Streaming Matters
The Problem: Modern businesses need real-time insights -- fraud detection, live dashboards, IoT monitoring -- but batch processing introduces hours of latency.
The Solution: Structured Streaming treats a live data stream as an unbounded table that is continuously appended to. You write the same DataFrame/SQL code as batch, and Spark handles the streaming execution.
Real Impact: Companies using Structured Streaming on Databricks achieve sub-second to minute-level latency for analytics that previously took hours with batch jobs.
Real-World Analogy
Think of Structured Streaming like a conveyor belt sushi restaurant:
- Micro-batch = Plates arrive in small groups every few seconds. You process each group.
- Continuous = Plates arrive one by one. You grab and process each immediately (lowest latency).
- Watermarks = A sign saying "no plates older than 10 minutes will arrive" so you can finalize your bill.
- Windowing = Grouping plates by the time they were made (e.g., all plates from 12:00-12:05).
Micro-Batch Processing
Micro-batch is the default execution mode. Spark divides the stream into small batches (typically triggered every 100ms-10s) and processes each batch as a regular Spark job. This provides exactly-once guarantees with low latency, provided the source is replayable and the sink is idempotent or transactional (Delta Lake is).
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
# Define the expected schema for Kafka messages
event_schema = (
    StructType()
    .add("event_id", StringType())
    .add("user_id", StringType())
    .add("event_type", StringType())
    .add("amount", DoubleType())
    .add("timestamp", TimestampType())
)
# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 10000)
    .load()
)
# Parse JSON messages
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(F.from_json("json_str", event_schema).alias("data"))
    .select("data.*")
    .withColumn("processed_at", F.current_timestamp())
)
# Write to Delta Lake
query = (
    parsed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/cp/user_events")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.user_events")
)
# Monitor the query
print(query.status)
print(query.lastProgress)
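In PySpark, `query.lastProgress` returns the most recent progress report as a dictionary. The field names below follow Spark's StreamingQueryProgress schema, but the values are fabricated for illustration; a simple health check is to compare the input rate against the processing rate:

```python
# Sample progress snapshot. Field names follow Spark's StreamingQueryProgress
# JSON schema; the values here are made up for illustration.
progress = {
    "batchId": 42,
    "numInputRows": 9500,
    "inputRowsPerSecond": 950.0,
    "processedRowsPerSecond": 800.0,
    "durationMs": {"triggerExecution": 11875},
}

def is_falling_behind(p, slack=1.1):
    """Flag a query whose input rate exceeds its processing rate by more
    than the given slack factor -- a sign the backlog is growing."""
    return p["inputRowsPerSecond"] > p["processedRowsPerSecond"] * slack

# 950 > 800 * 1.1 = 880, so this query is falling behind
backlogged = is_falling_behind(progress)
```

In production you would poll `query.lastProgress` (or use the StreamingQueryListener API) and alert on sustained backlog rather than a single snapshot.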
Watermarks and Windowing
Watermarks tell Spark how long to wait for late-arriving data before finalizing aggregations. Windowing groups events by time intervals for time-based analytics.
# ── Windowed Aggregation with Watermarks ──
# Count events per 5-minute window, allowing 10-min late data
windowed_counts = (
    parsed_stream
    # Watermark: drop data arriving more than 10 min late
    .withWatermark("timestamp", "10 minutes")
    # Group by 5-minute tumbling windows
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "event_type"
    )
    .agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
        F.approx_count_distinct("user_id").alias("unique_users")
    )
)
# Sliding window: 10-minute window, sliding every 2 minutes
sliding_counts = (
    parsed_stream
    .withWatermark("timestamp", "15 minutes")
    .groupBy(
        F.window("timestamp", "10 minutes", "2 minutes"),
        "event_type"
    )
    .count()
)
# Write windowed results to Delta
(
    windowed_counts.writeStream
    .format("delta")
    .outputMode("append")  # Append finalized windows
    .option("checkpointLocation", "dbfs:/cp/windowed_events")
    .trigger(processingTime="30 seconds")
    .toTable("silver.event_windows")
)
Stream-Static Joins
A stream-static join enriches streaming data with a static (batch) dimension table. The static side is re-read on each micro-batch, so updates to the dimension are picked up automatically.
# ── Stream-Static Join: Enrich events with user data ──
# Static dimension table (re-read each micro-batch)
users_dim = spark.table("silver.users")
# Join streaming events with static users
enriched_stream = (
    parsed_stream
    .join(users_dim, "user_id", "left")
    .select(
        "event_id", "user_id", "event_type",
        "amount", "timestamp",
        users_dim["country"],
        users_dim["segment"],
        users_dim["signup_date"]
    )
)
# ── Stream-Stream Join: Match orders with payments ──
orders_stream = spark.readStream.table("bronze.orders")
payments_stream = spark.readStream.table("bronze.payments")
# Join two streams with watermarks (required)
matched = (
    orders_stream
    .withWatermark("order_ts", "30 minutes")
    .join(
        payments_stream.withWatermark("payment_ts", "30 minutes"),
        F.expr("""
            order_id = payment_order_id AND
            payment_ts >= order_ts AND
            payment_ts <= order_ts + interval 1 hour
        """),
        "inner"
    )
)
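The time bounds in the join condition are what let Spark keep its state bounded: once the watermark passes `order_ts + 1 hour`, an unmatched order can be dropped from state. The matching predicate itself can be sketched in plain Python (hypothetical data; this illustrates the rule, not Spark's stateful execution):

```python
from datetime import datetime, timedelta

def interval_join(orders, payments, max_gap=timedelta(hours=1)):
    """Match (order_id, order_ts) to (order_id, payment_ts) pairs where
    the payment lands within [order_ts, order_ts + max_gap]."""
    return [(oid, ots, pts)
            for oid, ots in orders
            for pid, pts in payments
            if oid == pid and ots <= pts <= ots + max_gap]

orders = [("o1", datetime(2024, 1, 1, 10, 0)),
          ("o2", datetime(2024, 1, 1, 10, 30))]
payments = [("o1", datetime(2024, 1, 1, 10, 45)),  # 45 min later -> match
            ("o2", datetime(2024, 1, 1, 12, 0))]   # 90 min later -> too late
matched_pairs = interval_join(orders, payments)
# Only o1 matches; o2's payment falls outside the one-hour bound.
```

In the real Spark job, o2 would eventually be evicted from state once both watermarks advance past its matching window.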
Output Modes and Triggers
| Output Mode | Behavior | Use Case |
|---|---|---|
| Append | Only new rows that will never change again | Raw event ingestion, finalized windows |
| Complete | Entire result table rewritten each batch | Small aggregation results, dashboards |
| Update | Only changed rows since last batch | Running aggregations, stateful processing |

| Trigger Type | Behavior | Use Case |
|---|---|---|
| processingTime | Fixed interval micro-batches | Continuous streaming with controlled latency |
| availableNow | Process all available, then stop | Scheduled batch-style with streaming semantics |
| once | Single micro-batch, then stop (deprecated; prefer availableNow) | Legacy one-time catch-up processing |
| continuous | Continuous processing (experimental) | Millisecond-level latency, with at-least-once guarantees |
Practice Problems
Problem 1: Real-Time Fraud Detection (Medium)
Design a streaming pipeline that detects potentially fraudulent transactions in near-real-time. A user's activity is suspicious if they make more than 5 purchases totaling over $1,000 within a 10-minute window.
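As a starting hint for Problem 1: the detection rule itself is just a per-user count and sum inside a tumbling window, with thresholds from the problem statement. A pure-Python sketch of that rule (the data is made up; in Spark this would be `withWatermark` + `groupBy(window(...), "user_id")` + a filter):

```python
from collections import defaultdict

def suspicious_users(purchases, window_s=600, min_count=5, min_total=1000.0):
    """purchases: list of (user_id, epoch_seconds, amount).
    Flag users with more than min_count purchases totaling over
    min_total inside any 10-minute tumbling window."""
    buckets = defaultdict(lambda: [0, 0.0])  # (user, window_start) -> [count, total]
    for user, ts, amount in purchases:
        start = ts - ts % window_s  # align to tumbling-window start
        buckets[(user, start)][0] += 1
        buckets[(user, start)][1] += amount
    return {u for (u, _), (c, t) in buckets.items()
            if c > min_count and t > min_total}

txns = [("a", 60 * i, 250.0) for i in range(6)]   # 6 purchases, $1500, first 10 min
txns += [("b", 30, 400.0), ("b", 90, 400.0)]      # only 2 purchases, $800
flagged = suspicious_users(txns)
# flagged == {"a"}
```

A fuller solution would also consider sliding windows (so a burst straddling a window boundary is not missed) and writing alerts with update output mode.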
Problem 2: Stream-Stream Join for Order Matching (Hard)
You have two Kafka topics, "orders" and "shipments." Design a stream-stream join that matches orders to their shipments within 24 hours. Orders still unmatched after 24 hours should be flagged as delayed.
Problem 3: Exactly-Once Semantics Design (Hard)
Your streaming pipeline writes to both a Delta table and an external PostgreSQL database. How do you ensure exactly-once processing across both sinks, given that Delta writes are transactional and checkpoint-aware while PostgreSQL is not integrated with Spark's checkpointing?
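One common pattern for Problem 3 (an approach, not the only answer) is `foreachBatch`: Delta gets exactly-once from the checkpoint and its transaction log, while the PostgreSQL write is made idempotent by recording the last committed `batchId` in the database, in the same transaction as the data, and skipping replays. A toy simulation of the idempotence half:

```python
class IdempotentSink:
    """Simulates a sink that commits each micro-batch at most once by
    tracking the highest batch id already applied. In a real pipeline
    this would be a row in PostgreSQL, updated in the same database
    transaction as the batch's data."""
    def __init__(self):
        self.rows = []
        self.last_batch_id = -1

    def write_batch(self, batch_id, rows):
        if batch_id <= self.last_batch_id:
            return False  # replay after a failure: already committed, skip
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True

sink = IdempotentSink()
sink.write_batch(0, ["r1", "r2"])
sink.write_batch(0, ["r1", "r2"])  # Spark retries batch 0 -> no duplicates
sink.write_batch(1, ["r3"])
# sink.rows == ["r1", "r2", "r3"]
```

Because Spark replays a failed micro-batch with the same `batchId` (exposed as the second argument to the `foreachBatch` function), at-least-once delivery plus this idempotent write yields effectively exactly-once results in the external store.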