Performance Tuning & Optimization

Difficulty: Hard | 30 min read

Spark UI Deep Dive

Why Performance Tuning Matters

The Problem: Spark jobs that run slowly or fail due to out-of-memory errors waste compute resources and delay critical data pipelines.

The Solution: Systematic performance tuning using the Spark UI, shuffle optimization, partition management, and Databricks-specific features like Photon and AQE can reduce job runtime by 2-10x.

Real Impact: A well-tuned Spark job processing 1 TB of data can complete in minutes rather than hours, saving thousands of dollars in compute costs monthly.

The Spark UI is your primary diagnostic tool for understanding job performance. Every Spark application exposes a web UI that shows detailed information about stages, tasks, shuffles, and resource utilization.

Spark UI: Job Stages Breakdown
Job 0: SELECT * FROM sales JOIN products USING (product_id)
Duration: 4.2 min | 3 Stages | 450 Tasks

  • Stage 0 (Scan): Read sales table -- 200 tasks | 0.8 min | Input: 50 GB | Shuffle Write: 12 GB
  • Stage 1 (Scan): Read products table -- 50 tasks | 0.3 min | Input: 2 GB | Shuffle Write: 1.5 GB
  • Stage 2 (Join): Sort-Merge Join -- 200 tasks | 3.1 min | Shuffle Read: 13.5 GB (BOTTLENECK)

Key metrics to watch:

  • Task duration: median 1.2s, max 45s
  • Data skew: max/median = 37x
  • Spill: disk 8.2 GB, memory 0
  • GC time: 12s total (5% of task time)

Reading the Spark UI Effectively

Tab           | What It Shows                        | What to Look For
Jobs          | All Spark actions triggered          | Failed jobs, long-running jobs
Stages        | DAG stages with task metrics         | Skew (max vs median task time), shuffle size
Storage       | Cached RDDs/DataFrames               | Memory usage, fraction cached
SQL/DataFrame | Query plans with physical operators  | Scan types, join strategies, filter pushdown
Executors     | Resource usage per executor          | Uneven task distribution, GC overhead

Shuffle Optimization

Shuffles are the most expensive operations in Spark. A shuffle occurs whenever data must be redistributed across partitions -- during joins, aggregations, repartitioning, and distinct operations. Minimizing shuffle volume is the single most impactful optimization you can make.

What Causes Shuffles?

  • Joins: Both sides must be partitioned by the join key
  • groupBy / aggregations: Data must be grouped by key on the same executor
  • repartition(): Explicitly redistributes data
  • distinct(): Requires global deduplication
  • Window functions with partitionBy: Data must be co-located by partition key
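All of these operations force a shuffle for the same underlying reason: rows with the same key must end up in the same partition, and Spark routes rows by hashing the key. A minimal pure-Python sketch of that routing (illustrative only; Spark's DataFrame shuffle uses a Murmur3-based hash, not Python's `hash`):

```python
# Toy model of shuffle routing: a row is assigned to a target
# partition by hashing its key, so equal keys always co-locate.
def partition_for(key, num_partitions: int) -> int:
    # Stand-in for Spark's Murmur3-based hash partitioner.
    return hash(key) % num_partitions

rows = [("us", 10), ("eu", 20), ("us", 5), ("apac", 7), ("eu", 3)]
num_partitions = 4

partitions = {i: [] for i in range(num_partitions)}
for key, value in rows:
    partitions[partition_for(key, num_partitions)].append((key, value))

# Every row for a given key lands in exactly one partition, which is
# what makes per-key aggregation possible after the shuffle completes.
for pid, part in partitions.items():
    print(pid, part)
```

Because moving every row across the network is what this routing costs at scale, avoiding the redistribution entirely (as a broadcast join does) is usually the biggest win.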

Broadcast Join Optimization

When one side of a join is small enough to fit in executor memory, Spark can broadcast it to all executors, eliminating the shuffle entirely. Automatic broadcasting kicks in below spark.sql.autoBroadcastJoinThreshold (10 MB by default), and Spark caps any broadcast table at 8 GB.

PySpark - Broadcast Join vs Shuffle Join
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# BAD: Shuffle join (both tables are large or Spark doesn't auto-broadcast)
result_slow = sales_df.join(products_df, "product_id")

# GOOD: Force broadcast join when you know the table is small
result_fast = sales_df.join(broadcast(products_df), "product_id")

# Configure broadcast threshold (default 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")

# Check the query plan to verify broadcast join is used
result_fast.explain(True)
# Look for "BroadcastHashJoin" instead of "SortMergeJoin"

Handling Data Skew in Joins

Data skew occurs when certain keys have disproportionately more rows, causing some tasks to process far more data than others. This leads to stragglers that delay the entire stage.

PySpark - Skew Join Mitigation
from pyspark.sql import functions as F

# Identify skewed keys
skew_analysis = sales_df.groupBy("customer_id") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(20)
skew_analysis.show()

# Technique 1: Salt the skewed key
# (the other join side must be replicated with one row per salt value,
# 0..salt_range-1, so every salted key still finds its match)
salt_range = 10
salted_sales = sales_df.withColumn(
    "salt", (F.rand() * salt_range).cast("int")
).withColumn(
    "salted_key",
    F.concat(F.col("customer_id"), F.lit("_"), F.col("salt"))
)

# Technique 2: Enable AQE skew join optimization (Databricks)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
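To see why salting rebalances work, here is a toy pure-Python model of the full pattern with hypothetical data: the skewed side gets a random salt, and the small side is replicated across every salt value so each salted key still joins exactly once.

```python
import random

SALT_RANGE = 10

# Large, skewed side: one hot key (cust_1) dominates.
sales = [("cust_1", amt) for amt in range(100)] + [("cust_2", 1), ("cust_3", 2)]

# Salt the large side: append a random salt to each key.
salted_sales = [
    (f"{cust}_{random.randrange(SALT_RANGE)}", amt) for cust, amt in sales
]

# Replicate the small side across ALL salt values so every
# salted key on the large side finds its match.
customers = {"cust_1": "US", "cust_2": "EU", "cust_3": "APAC"}
salted_customers = {
    f"{cust}_{salt}": region
    for cust, region in customers.items()
    for salt in range(SALT_RANGE)
}

# Join on the salted key: the hot key's 100 rows are now spread
# over up to SALT_RANGE salted keys instead of a single one.
joined = [(key, amt, salted_customers[key]) for key, amt in salted_sales]
print(len(joined))  # every sales row still joins exactly once
```

The same shape applies in PySpark: salt the large DataFrame with a rand()-derived column, cross the small DataFrame with a 0..salt_range-1 range to build the replicated side, then join on the salted key.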

Partition Tuning

Proper partitioning is fundamental to Spark performance. Too few partitions lead to tasks processing excessive data and possible OOM errors, while too many create task-scheduling overhead and small-file problems.

Partition Size Guidelines

Target 128 MB - 256 MB per partition as a general rule:

  • 10 GB dataset: approximately 40-80 partitions
  • 100 GB dataset: approximately 400-800 partitions
  • 1 TB dataset: approximately 4,000-8,000 partitions
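The guideline above is simple arithmetic; a small helper (hypothetical, pure Python) that suggests a partition count from a dataset size and a target partition size:

```python
def suggest_partitions(dataset_bytes: int,
                       target_partition_bytes: int = 128 * 1024**2) -> int:
    """Suggest a partition count targeting ~128 MB per partition."""
    # Ceiling division so even a tiny dataset gets at least 1 partition.
    return max(1, -(-dataset_bytes // target_partition_bytes))

GB = 1024**3
print(suggest_partitions(10 * GB))    # 80 partitions at 128 MB each
print(suggest_partitions(100 * GB))   # 800
print(suggest_partitions(1024 * GB))  # 8192
```

These figures match the upper end of the ranges above (the 128 MB target); passing 256 MB as the target halves them.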
PySpark - Repartition vs Coalesce
# Check current partition count
print(f"Current partitions: {df.rdd.getNumPartitions()}")

# REPARTITION: Full shuffle, creates equal-sized partitions
# Use when you need to INCREASE partitions or partition by a column
df_repartitioned = df.repartition(200)
df_by_date = df.repartition("date")
df_by_date_200 = df.repartition(200, "date")

# COALESCE: No shuffle, merges existing partitions
# Use when you need to DECREASE partitions (e.g., before writing)
df_coalesced = df.coalesce(10)

# Common pattern: process with many partitions, coalesce before write
# (coalesce(1) is safe here only because the aggregated result is tiny)
df.repartition(500) \
    .filter(F.col("status") == "active") \
    .groupBy("region").agg(F.sum("revenue")) \
    .coalesce(1) \
    .write.mode("overwrite").parquet("/output/regional_revenue")

# Delta Lake auto-optimize handles file sizing automatically
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

Z-Ordering

Z-Ordering is a Delta Lake optimization technique that co-locates related data in the same set of files. This dramatically improves data skipping, allowing queries to read far fewer files when filtering on Z-ordered columns.

Z-Ordering: Data Locality Illustration
Without Z-Ordering: 'A' values are scattered across every file, so WHERE region = 'A' scans all 8 of 8 files (100% read).
With Z-Ordering on region: 'A' values are clustered into adjacent files, so the same query reads only 2 of 8 files (25% read).
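Z-Ordering takes its name from the Z-order (Morton) space-filling curve: the sort key interleaves the bits of the chosen columns, so rows that are close in several dimensions at once land close together in the file layout. A toy pure-Python interleaver for two small integer columns (illustrative only; Delta Lake's implementation also handles range partitioning and arbitrary column types):

```python
def z_value(x: int, y: int, bits: int = 8) -> int:
    """Interleave the bits of x and y into a single Morton/Z-order key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x bits at even positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # y bits at odd positions
    return z

# Points near each other in BOTH dimensions get nearby Z-values,
# so sorting rows by z_value clusters them into the same files.
points = [(7, 7), (0, 1), (1, 1), (0, 0), (1, 0)]
print(sorted(points, key=lambda p: z_value(*p)))
# → [(0, 0), (1, 0), (0, 1), (1, 1), (7, 7)]
```

Sorting file contents by this interleaved key is what lets a filter on either Z-ordered column skip whole files via their min/max statistics.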
SQL - OPTIMIZE with Z-ORDER
-- Run OPTIMIZE with Z-ORDER on frequently filtered columns
OPTIMIZE catalog.schema.sales
ZORDER BY (region, product_category);

-- Z-ORDER is most effective with:
-- 1. High-cardinality columns used in WHERE clauses
-- 2. Columns frequently used in JOIN conditions
-- 3. Up to 3-4 columns (diminishing returns after)

-- Check file-level stats to verify data skipping
DESCRIBE DETAIL catalog.schema.sales;

-- Schedule regular OPTIMIZE jobs
OPTIMIZE catalog.schema.sales
WHERE date >= current_date() - INTERVAL 7 DAYS
ZORDER BY (customer_id, product_id);

Caching Strategies

Caching stores intermediate DataFrames or tables in memory (or disk) to avoid recomputation. Use caching strategically -- caching everything wastes memory and can actually hurt performance.

When to Cache

Cache DataFrames that are reused multiple times in the same job, especially after expensive operations like joins or aggregations. Cache lookup tables that are used in multiple joins.

When NOT to Cache

Do not cache DataFrames used only once. Do not cache DataFrames far larger than available memory. Do not cache streaming DataFrames. Skip manual caching when the Databricks disk cache (covered below) already serves the same file reads.

PySpark - Caching Strategies
from pyspark import StorageLevel

# Cache in memory (default)
df.cache()  # equivalent to persist(StorageLevel.MEMORY_AND_DISK)

# Persist with specific storage level
df.persist(StorageLevel.MEMORY_ONLY)         # fastest, may lose partitions
df.persist(StorageLevel.MEMORY_AND_DISK)     # spills to disk if needed
df.persist(StorageLevel.DISK_ONLY)           # large datasets, memory pressure

# IMPORTANT: cache is lazy -- trigger with an action
df.cache()
df.count()  # materializes the cache

# Unpersist when done to free memory
df.unpersist()

# Delta Cache (Databricks-specific, automatic)
# Automatically caches frequently-read Parquet files on local SSD
spark.conf.set("spark.databricks.io.cache.enabled", "true")
spark.conf.set("spark.databricks.io.cache.maxDiskUsage", "50g")
spark.conf.set("spark.databricks.io.cache.maxMetaDataCache", "1g")

Photon Engine

Photon is Databricks' native vectorized query engine written in C++ that runs on top of Apache Spark. It accelerates SQL and DataFrame workloads by using columnar processing, SIMD instructions, and optimized memory management. Photon can provide 2-8x speedup for typical workloads without any code changes.

Photon vs Standard Spark

Photon replaces the Spark SQL execution engine at the physical plan level. It excels at:

  • Scans and filters: Optimized Parquet/Delta reading with vectorized decoding
  • Joins: Hash joins with optimized memory layout
  • Aggregations: Vectorized group-by with adaptive hash tables
  • String operations: SIMD-accelerated string processing

Photon is available on Photon-enabled cluster types and is automatically used when supported. Check the Spark UI for "Photon" annotations on query plan nodes.

SQL - Verify Photon Usage
-- Check if Photon is enabled on the current cluster
SET spark.databricks.photon.enabled;

-- Enable Photon (normally selected when the cluster is created;
-- this conf may not be changeable at runtime on all cluster types)
SET spark.databricks.photon.enabled = true;

-- Run a query and check the physical plan for Photon operators
EXPLAIN SELECT region, SUM(revenue)
FROM catalog.schema.sales
WHERE date >= '2025-01-01'
GROUP BY region;
-- Look for "PhotonGroupingAgg" and "PhotonScan" in the plan

-- Photon works best with Delta Lake tables
CREATE TABLE catalog.schema.sales_delta
USING DELTA
AS SELECT * FROM catalog.schema.sales_parquet;

Adaptive Query Execution (AQE)

AQE dynamically optimizes query plans at runtime based on actual data statistics collected during query execution. Unlike traditional Spark which creates a fixed plan before execution, AQE can adapt the plan mid-flight to handle data skew, optimize join strategies, and coalesce partitions.

Dynamic Partition Coalescing

Automatically merges small post-shuffle partitions into larger ones, reducing task overhead. Eliminates the need to manually tune spark.sql.shuffle.partitions.
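A toy pure-Python model of what this coalescing does (illustrative, not Spark's actual algorithm): greedily merge adjacent post-shuffle partitions until each group reaches the advisory target size.

```python
def coalesce_partitions(sizes_bytes, target_bytes=128 * 1024**2):
    """Greedily merge adjacent small partitions up to a target size,
    mimicking the idea behind AQE's post-shuffle coalescing."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes_bytes):
        current.append(i)
        current_size += size
        if current_size >= target_bytes:
            groups.append(current)
            current, current_size = [], 0
    if current:
        groups.append(current)  # leftover small tail becomes its own task
    return groups

MB = 1024**2
# 8 small shuffle partitions become 2 coalesced tasks at a 128 MB target.
sizes = [30 * MB, 40 * MB, 60 * MB, 20 * MB, 50 * MB, 10 * MB, 45 * MB, 15 * MB]
print(coalesce_partitions(sizes))  # → [[0, 1, 2], [3, 4, 5, 6, 7]]
```

The payoff is fewer, better-sized tasks: instead of 8 tasks each handling a few tens of MB, the scheduler runs 2 tasks near the advisory size.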

Dynamic Join Strategy

Switches from sort-merge join to broadcast join at runtime if one side of the join is smaller than expected after filtering.

Skew Join Optimization

Detects skewed partitions at runtime and splits them into smaller sub-partitions, distributing work more evenly across executors.

Dynamic Filter Pruning

Pushes runtime-derived filters from one side of a join to the scan of the other side, drastically reducing data read.
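Dynamic partition pruning has its own switch, separate from the spark.sql.adaptive.* flags, and is enabled by default in Spark 3.x. A minimal config sketch (the table names here are hypothetical):

```python
# Dynamic partition pruning: a runtime filter derived from the dimension
# side of a join prunes the partitions scanned on the fact side.
# Enabled by default in Spark 3.x.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# Verify in the plan: look for "dynamicpruningexpression" in the
# PartitionFilters of the fact-table scan node.
spark.sql("""
    SELECT s.region, SUM(s.revenue)
    FROM sales s JOIN dim_date d ON s.date = d.date
    WHERE d.is_holiday = true
    GROUP BY s.region
""").explain(True)
```

This matters most for star-schema queries over date-partitioned fact tables, where the dimension filter can eliminate most partitions before they are read.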

PySpark - AQE Configuration
# Enable AQE (enabled by default in Databricks Runtime 7.3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Coalesce partitions: merge small partitions after shuffle
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# Skew join: automatically handle data skew
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Let AQE pick the partition count: on Databricks, "auto" works;
# on open-source Spark, set a high fixed value and let AQE coalesce
spark.conf.set("spark.sql.shuffle.partitions", "auto")

# Verify AQE is active in the query plan
df = spark.sql("""
    SELECT region, SUM(revenue) as total
    FROM catalog.schema.sales
    GROUP BY region
""")
df.explain("formatted")
# Look for "AdaptiveSparkPlan isFinalPlan=true"

Practice Problems

Problem 1: Diagnosing a Slow Join

Hard

Your daily ETL job joins a 500 GB sales table with a 200 MB products table. The join stage takes 45 minutes and the Spark UI shows the median task time is 10 seconds but the max task time is 25 minutes. What is happening and how would you fix it?

Problem 2: Partition Strategy Design

Hard

You have a Delta Lake table that receives 50 GB of new data daily. Analysts query it primarily by date range and region. The table currently has 500,000 small files averaging 2 MB each. Design a complete optimization strategy.

Problem 3: Photon vs Standard Spark

Medium

Your team runs a mix of SQL analytics queries and PySpark ML training jobs. Should you enable Photon clusters for all workloads? Explain your reasoning.

Quick Reference

Performance Tuning Cheat Sheet

Technique        | When to Use                             | Expected Impact
Broadcast Join   | One side under 8 GB (ideally far less)  | Eliminates shuffle, 5-20x faster
Z-Ordering       | Filtered queries on specific columns    | 2-10x fewer files read
AQE              | Always enable (default in DBR 7.3+)     | Auto-optimizes joins, partitions
Photon           | SQL/DataFrame-heavy workloads           | 2-8x for scans, joins, aggs
Caching          | Reused DataFrames in same job           | Eliminates recomputation
Partition Tuning | OOM errors or small-file problems       | Target 128-256 MB per partition
Optimize Write   | Delta Lake tables with frequent writes  | Reduces small files automatically

Key Spark Configurations

Essential Performance Configs
# Shuffle and Partitions
spark.sql.shuffle.partitions = auto
spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 100MB

# Delta Lake Optimization
spark.databricks.delta.optimizeWrite.enabled = true
spark.databricks.delta.autoCompact.enabled = true

# Caching
spark.databricks.io.cache.enabled = true

# Photon
spark.databricks.photon.enabled = true