Spark UI Deep Dive
Why Performance Tuning Matters
The Problem: Spark jobs that run slowly or fail due to out-of-memory errors waste compute resources and delay critical data pipelines.
The Solution: Systematic performance tuning using the Spark UI, shuffle optimization, partition management, and Databricks-specific features like Photon and AQE can reduce job runtime by 2-10x.
Real Impact: A well-tuned Spark job processing 1 TB of data can complete in minutes rather than hours, saving thousands of dollars in compute costs monthly.
The Spark UI is your primary diagnostic tool for understanding job performance. Every Spark application exposes a web UI that shows detailed information about stages, tasks, shuffles, and resource utilization.
Reading the Spark UI Effectively
| Tab | What It Shows | What to Look For |
|---|---|---|
| Jobs | All Spark actions triggered | Failed jobs, long-running jobs |
| Stages | DAG stages with task metrics | Skew (max vs median task time), shuffle size |
| Storage | Cached RDDs/DataFrames | Memory usage, fraction cached |
| SQL/DataFrame | Query plans with physical operators | Scan types, join strategies, filter pushdown |
| Executors | Resource usage per executor | Uneven task distribution, GC overhead |
Shuffle Optimization
Shuffles are the most expensive operations in Spark. A shuffle occurs whenever data must be redistributed across partitions -- during joins, aggregations, repartitioning, and distinct operations. Minimizing shuffle volume is the single most impactful optimization you can make.
What Causes Shuffles?
- Joins: Both sides must be partitioned by the join key
- groupBy / aggregations: Data must be grouped by key on the same executor
- repartition(): Explicitly redistributes data
- distinct(): Requires global deduplication
- Window functions with partitionBy: Data must be co-located by partition key
Broadcast Join Optimization
When one side of a join is small enough to fit in memory, Spark can broadcast it to every executor, eliminating the shuffle entirely. Spark auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10 MB by default); an explicit broadcast() hint works for tables up to Spark's 8 GB broadcast limit.
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
# BAD: Shuffle join (both tables are large or Spark doesn't auto-broadcast)
result_slow = sales_df.join(products_df, "product_id")
# GOOD: Force broadcast join when you know the table is small
result_fast = sales_df.join(broadcast(products_df), "product_id")
# Configure broadcast threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")
# Check the query plan to verify broadcast join is used
result_fast.explain(True)
# Look for "BroadcastHashJoin" instead of "SortMergeJoin"
Handling Data Skew in Joins
Data skew occurs when certain keys have disproportionately more rows, causing some tasks to process far more data than others. This leads to stragglers that delay the entire stage.
# Identify skewed keys
skew_analysis = sales_df.groupBy("customer_id") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(20)
skew_analysis.show()
# Technique 1: Salt the skewed key
salt_range = 10
salted_sales = sales_df.withColumn(
    "salt", (F.rand() * salt_range).cast("int")
).withColumn(
    "salted_key",
    F.concat(F.col("customer_id"), F.lit("_"), F.col("salt"))
)
# Note: salting the fact side alone is not enough -- the other side of the
# join must be replicated once per salt value so every salted key can match.
# Technique 2: Enable AQE skew join optimization (Databricks)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
Partition Tuning
Proper partitioning is fundamental to Spark performance. Too few partitions lead to tasks that process excessive data and risk OOM errors; too many create task-scheduling overhead and small-file problems.
Partition Size Guidelines
Target 128 MB - 256 MB per partition as a general rule:
- 10 GB dataset: approximately 40-80 partitions
- 100 GB dataset: approximately 400-800 partitions
- 1 TB dataset: approximately 4,000-8,000 partitions
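The arithmetic behind these numbers is just dataset size divided by the target partition size. A small helper (hypothetical, not a Spark API) that reproduces the guideline:

```python
def suggested_partitions(dataset_bytes: int,
                         target_mb_low: int = 128,
                         target_mb_high: int = 256) -> tuple:
    """Return (low, high) partition counts for a 128-256 MB target range."""
    mb = 1024 * 1024
    # Smaller target partitions mean more of them, so the low target
    # bound gives the high partition count and vice versa.
    high = max(1, round(dataset_bytes / (target_mb_low * mb)))
    low = max(1, round(dataset_bytes / (target_mb_high * mb)))
    return low, high

gb = 1024 ** 3
print(suggested_partitions(10 * gb))    # 10 GB dataset
print(suggested_partitions(1024 * gb))  # 1 TB dataset
```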
# Check current partition count
print(f"Current partitions: {df.rdd.getNumPartitions()}")
# REPARTITION: Full shuffle, creates equal-sized partitions
# Use when you need to INCREASE partitions or partition by a column
df_repartitioned = df.repartition(200)
df_by_date = df.repartition("date")
df_by_date_200 = df.repartition(200, "date")
# COALESCE: No shuffle, merges existing partitions
# Use when you need to DECREASE partitions (e.g., before writing)
df_coalesced = df.coalesce(10)
# Common pattern: process with many partitions, coalesce before write
df.repartition(500) \
    .filter(F.col("status") == "active") \
    .groupBy("region").agg(F.sum("revenue")) \
    .coalesce(1) \
    .write.mode("overwrite").parquet("/output/regional_revenue")
# Delta Lake auto-optimize handles file sizing automatically
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
Z-Ordering
Z-Ordering is a Delta Lake optimization technique that co-locates related data in the same set of files. This dramatically improves data skipping, allowing queries to read far fewer files when filtering on Z-ordered columns.
-- Run OPTIMIZE with Z-ORDER on frequently filtered columns
OPTIMIZE catalog.schema.sales
ZORDER BY (region, product_category);
-- Z-ORDER is most effective with:
-- 1. High-cardinality columns used in WHERE clauses
-- 2. Columns frequently used in JOIN conditions
-- 3. Up to 3-4 columns (diminishing returns after)
-- Check file-level stats to verify data skipping
DESCRIBE DETAIL catalog.schema.sales;
-- Schedule regular OPTIMIZE jobs
OPTIMIZE catalog.schema.sales
WHERE date >= current_date() - INTERVAL 7 DAYS
ZORDER BY (customer_id, product_id);
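The name comes from the Z-order (Morton) curve: interleaving the bits of several column values maps multidimensional points onto one dimension while keeping nearby points together. A toy illustration of the idea only (Delta's actual implementation works on file-level ranges, not per-row bit interleaving):

```python
def morton_interleave(x: int, y: int, bits: int = 8) -> int:
    """Interleave the bits of x and y into a single Z-order value."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # x bits go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # y bits go to odd positions
    return z

# Points close in (x, y) get close Z-values, so sorting by Z-value
# clusters them into the same files -- which is what makes min/max
# file skipping effective.
for x, y in [(0, 0), (1, 0), (0, 1), (7, 7)]:
    print((x, y), "->", morton_interleave(x, y))
```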
Caching Strategies
Caching stores intermediate DataFrames or tables in memory (or disk) to avoid recomputation. Use caching strategically -- caching everything wastes memory and can actually hurt performance.
When to Cache
Cache DataFrames that are reused multiple times in the same job, especially after expensive operations like joins or aggregations. Cache lookup tables that are used in multiple joins.
When NOT to Cache
Do not cache DataFrames used only once. Do not cache DataFrames larger than available memory. Do not cache streaming DataFrames. Skip manual caching when the Databricks disk cache already serves repeated reads of the same files.
from pyspark import StorageLevel
# Cache in memory (default)
df.cache() # equivalent to persist(StorageLevel.MEMORY_AND_DISK)
# Persist with specific storage level
df.persist(StorageLevel.MEMORY_ONLY) # fastest, may lose partitions
df.persist(StorageLevel.MEMORY_AND_DISK) # spills to disk if needed
df.persist(StorageLevel.DISK_ONLY) # large datasets, memory pressure
# IMPORTANT: cache is lazy -- trigger with an action
df.cache()
df.count() # materializes the cache
# Unpersist when done to free memory
df.unpersist()
# Disk cache (formerly "Delta cache"; Databricks-specific, automatic)
# Caches frequently-read Parquet/Delta files on the workers' local SSDs
spark.conf.set("spark.databricks.io.cache.enabled", "true")
spark.conf.set("spark.databricks.io.cache.maxDiskUsage", "50g")
spark.conf.set("spark.databricks.io.cache.maxMetaDataCache", "1g")
Photon Engine
Photon is Databricks' native vectorized query engine written in C++ that runs on top of Apache Spark. It accelerates SQL and DataFrame workloads by using columnar processing, SIMD instructions, and optimized memory management. Photon can provide 2-8x speedup for typical workloads without any code changes.
Photon vs Standard Spark
Photon replaces the Spark SQL execution engine at the physical plan level. It excels at:
- Scans and filters: Optimized Parquet/Delta reading with vectorized decoding
- Joins: Hash joins with optimized memory layout
- Aggregations: Vectorized group-by with adaptive hash tables
- String operations: SIMD-accelerated string processing
Photon is available on Photon-enabled cluster types and is automatically used when supported. Check the Spark UI for "Photon" annotations on query plan nodes.
-- Check whether Photon is enabled on the current cluster
SET spark.databricks.photon.enabled;
-- Photon is selected when the cluster is created (choose a Photon-enabled
-- runtime); this config reflects that choice rather than a per-query switch
-- Run a query and check the physical plan for Photon operators
EXPLAIN SELECT region, SUM(revenue)
FROM catalog.schema.sales
WHERE date >= '2025-01-01'
GROUP BY region;
-- Look for "PhotonGroupingAgg" and "PhotonScan" in the plan
-- Photon works best with Delta Lake tables
CREATE TABLE catalog.schema.sales_delta
USING DELTA
AS SELECT * FROM catalog.schema.sales_parquet;
Adaptive Query Execution (AQE)
AQE dynamically optimizes query plans at runtime based on actual data statistics collected during query execution. Unlike traditional Spark which creates a fixed plan before execution, AQE can adapt the plan mid-flight to handle data skew, optimize join strategies, and coalesce partitions.
Dynamic Partition Coalescing
Automatically merges small post-shuffle partitions into larger ones, reducing task overhead. Eliminates the need to manually tune spark.sql.shuffle.partitions.
Dynamic Join Strategy
Switches from sort-merge join to broadcast join at runtime if one side of the join is smaller than expected after filtering.
Skew Join Optimization
Detects skewed partitions at runtime and splits them into smaller sub-partitions, distributing work more evenly across executors.
Dynamic File and Partition Pruning
Pushes runtime-derived filters from one side of a join down to the scan of the other side, drastically reducing the data read. Strictly speaking this is configured separately from AQE (spark.sql.optimizer.dynamicPartitionPruning.enabled), but the two work together.
# Enable AQE (enabled by default in Databricks Runtime 7.3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Coalesce partitions: merge small partitions after shuffle
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# Skew join: automatically handle data skew
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# With AQE, you can set shuffle.partitions high and let AQE coalesce
spark.conf.set("spark.sql.shuffle.partitions", "auto")
# Verify AQE is active in the query plan
df = spark.sql("""
    SELECT region, SUM(revenue) AS total
    FROM catalog.schema.sales
    GROUP BY region
""")
df.explain("formatted")
# Look for "AdaptiveSparkPlan isFinalPlan=true"
Practice Problems
Problem 1: Diagnosing a Slow Join
Hard: Your daily ETL job joins a 500 GB sales table with a 200 MB products table. The join stage takes 45 minutes, and the Spark UI shows a median task time of 10 seconds but a max task time of 25 minutes. What is happening, and how would you fix it?
Problem 2: Partition Strategy Design
Hard: You have a Delta Lake table that receives 50 GB of new data daily. Analysts query it primarily by date range and region. The table currently has 500,000 small files averaging 2 MB each. Design a complete optimization strategy.
Problem 3: Photon vs Standard Spark
Medium: Your team runs a mix of SQL analytics queries and PySpark ML training jobs. Should you enable Photon clusters for all workloads? Explain your reasoning.
Quick Reference
Performance Tuning Cheat Sheet
| Technique | When to Use | Expected Impact |
|---|---|---|
| Broadcast Join | One side under 8 GB | Eliminates shuffle, 5-20x faster |
| Z-Ordering | Filtered queries on specific columns | 2-10x fewer files read |
| AQE | Always enable (default in DBR 7.3+) | Auto-optimizes joins, partitions |
| Photon | SQL/DataFrame heavy workloads | 2-8x for scans, joins, aggs |
| Caching | Reused DataFrames in same job | Eliminates recomputation |
| Partition Tuning | OOM errors or small file problems | Target 128-256 MB per partition |
| Optimize Write | Delta Lake tables with frequent writes | Reduces small files automatically |
Key Spark Configurations
# Shuffle and Partitions
spark.sql.shuffle.partitions = auto
spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 100MB
# Delta Lake Optimization
spark.databricks.delta.optimizeWrite.enabled = true
spark.databricks.delta.autoCompact.enabled = true
# Caching
spark.databricks.io.cache.enabled = true
# Photon
spark.databricks.photon.enabled = true