Why Spark Matters
Why Learn Apache Spark?
The Problem: Traditional data processing tools cannot handle the volume, velocity, and variety of modern data at scale.
The Solution: Apache Spark provides a unified engine for large-scale data processing with in-memory computation, making it up to 100x faster than Hadoop MapReduce for in-memory workloads.
Real Impact: Spark powers data pipelines at Netflix, Uber, Airbnb, and thousands of companies processing petabytes of data daily.
Real-World Analogy
Think of Spark as a factory assembly line:
- Driver = Factory manager who plans the work and coordinates
- Executors = Workers on the assembly line doing the actual processing
- Partitions = Batches of raw materials split among workers
- DAG = The blueprint/plan of the assembly process
- Stages = Major phases of assembly that must complete before the next
Spark in the Databricks Ecosystem
Unified Analytics
Spark handles batch processing, streaming, machine learning, and SQL queries all in one engine.
In-Memory Speed
By caching intermediate results in memory, Spark avoids costly disk I/O and achieves orders-of-magnitude speedups.
Scalability
Spark scales horizontally from a single laptop to clusters of thousands of nodes processing petabytes.
Rich Ecosystem
Spark SQL, MLlib, GraphX, and Structured Streaming provide specialized libraries for every workload.
Spark Architecture (Driver/Executors)
Architecture Components Explained
| Component | Role | Key Responsibilities |
|---|---|---|
| Driver | Orchestrator process | Creates SparkContext, builds DAG, schedules tasks, collects results |
| SparkContext | Entry point to Spark | Connects to cluster manager, coordinates executors, manages configuration |
| Cluster Manager | Resource allocator | Allocates executors, manages cluster resources (YARN, Mesos, Kubernetes, Standalone) |
| Executor | Worker process | Runs tasks, stores data in memory/disk, reports status to driver |
| Task | Unit of work | Smallest unit of execution, runs on a single partition of data |
# In Databricks, SparkSession is pre-configured as 'spark'
# But here's how you'd create one manually:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("MySparkApp") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.cores", "4") \
.getOrCreate()
# In Databricks notebooks, just use the pre-configured 'spark'
print(spark.version) # e.g., '3.5.0'
print(spark.sparkContext.uiWebUrl) # Spark UI URL
# Check cluster configuration
spark.sparkContext.getConf().getAll()
RDDs vs DataFrames vs Datasets
| Feature | RDD | DataFrame | Dataset |
|---|---|---|---|
| Abstraction Level | Low-level | High-level (tabular) | High-level (typed) |
| Type Safety | Compile-time | Runtime only | Compile-time |
| Optimization | No Catalyst/Tungsten | Full Catalyst + Tungsten | Full Catalyst + Tungsten |
| API | Functional (map, filter) | Declarative (SQL-like) | Both functional + declarative |
| Language Support | Python, Scala, Java | Python, Scala, Java, R | Scala, Java only |
| When to Use | Fine-grained control, unstructured data | Most use cases (recommended) | Type-safe Scala/Java apps |
Key Takeaway
In Databricks, always prefer DataFrames over RDDs. DataFrames benefit from Catalyst query optimization and Tungsten binary processing, making them significantly faster. RDDs are only needed for very low-level operations on unstructured data.
# RDD approach (avoid in most cases)
rdd = spark.sparkContext.parallelize([
("Alice", 34), ("Bob", 45), ("Charlie", 29)
])
result_rdd = rdd.filter(lambda x: x[1] > 30).collect()
# DataFrame approach (recommended)
from pyspark.sql import Row
df = spark.createDataFrame([
Row(name="Alice", age=34),
Row(name="Bob", age=45),
Row(name="Charlie", age=29)
])
result_df = df.filter(df.age > 30)
result_df.show()
# SQL approach (also uses DataFrame engine)
df.createOrReplaceTempView("people")
result_sql = spark.sql("SELECT * FROM people WHERE age > 30")
result_sql.show()
Lazy Evaluation & DAG
What is Lazy Evaluation?
Spark does not execute transformations immediately. Instead, it builds a Directed Acyclic Graph (DAG) of all transformations. Execution only happens when an action (like collect(), show(), or write()) is called.
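Lazy evaluation has a close analogue in plain Python generators, which can make the idea concrete without a cluster. This is a hypothetical pure-Python sketch (not Spark code): building the pipeline does no work, and iterating it plays the role of an action.

```python
# Pure-Python analogy for lazy evaluation: generators defer work
# until something consumes them, just as Spark defers until an action.
log = []

def numbers():
    for i in range(5):
        log.append(f"produced {i}")  # side effect proves when work happens
        yield i

pipeline = (x * 2 for x in numbers() if x > 1)  # "transformations": nothing runs yet
assert log == []                                # no elements produced so far

result = list(pipeline)                         # the "action": triggers execution
assert result == [4, 6, 8]
assert len(log) == 5                            # all work happened at action time
```

The same principle lets Spark's Catalyst optimizer see the whole pipeline before choosing an execution strategy, rather than running each step eagerly.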
Transformations vs Actions
| Type | Examples | Behavior |
|---|---|---|
| Narrow Transformations | select(), filter(), map(), withColumn() | No data shuffle; each partition processed independently |
| Wide Transformations | groupBy(), join(), repartition(), distinct() | Requires data shuffle across partitions (expensive) |
| Actions | show(), collect(), count(), write(), save() | Triggers actual computation of the DAG |
# None of these lines execute immediately - they build a DAG
df = spark.read.csv("/data/sales.csv", header=True, inferSchema=True)
filtered = df.filter(df.amount > 100) # Narrow transformation
selected = filtered.select("customer", "amount") # Narrow transformation
grouped = selected.groupBy("customer").sum("amount") # Wide transformation
# THIS triggers execution of the entire DAG
grouped.show()
# You can verify the plan without executing
grouped.explain(True) # Shows parsed, analyzed, optimized, and physical plans
Spark Execution Plan
Understanding explain() Output
The explain(True) method shows four plan levels:
- Parsed Logical Plan: Raw plan from your code
- Analyzed Logical Plan: Resolved references (column names, tables)
- Optimized Logical Plan: After Catalyst optimizer (predicate pushdown, column pruning)
- Physical Plan: Actual execution plan with specific algorithms chosen
# Create sample data and build a query
df = spark.read.parquet("/data/orders")
result = df.filter(df.status == "completed") \
.select("order_id", "customer_id", "total") \
.groupBy("customer_id") \
.agg({"total": "sum", "order_id": "count"})
# Show the full execution plan
result.explain(True)
# Output (simplified):
# == Optimized Logical Plan ==
# Aggregate [customer_id], [customer_id, sum(total), count(order_id)]
# +- Project [order_id, customer_id, total]
# +- Filter (status = completed) <-- predicate pushdown
# +- Relation [parquet]
#
# == Physical Plan ==
# *(2) HashAggregate(keys=[customer_id], functions=[sum(total), count(order_id)])
# +- Exchange hashpartitioning(customer_id, 200) <-- shuffle!
# +- *(1) HashAggregate(keys=[customer_id], functions=[partial_sum(total), partial_count(order_id)])
# +- *(1) Project [order_id, customer_id, total]
# +- *(1) Filter (status = completed)
# +- *(1) FileScan parquet [order_id,customer_id,total,status]
Common Pitfall: Reading Plans
Problem: Physical plans read bottom-to-top, which confuses beginners.
Solution: Start at the bottom (FileScan) and read upward. Each Exchange node indicates a shuffle (a stage boundary). A prefix like *(1) marks whole-stage code generation: the number is the codegen stage ID, so operators sharing the same prefix are fused into a single generated function.
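One quick way to gauge stage count is to count Exchange nodes in the plan text. The sketch below works on a hardcoded plan string modeled on the simplified example above; in a live session you could capture the output of df.explain() instead.

```python
# Count shuffle boundaries in a physical-plan string.
# Each Exchange separates two stages, so stages = exchanges + 1
# (for a single linear plan with no branches).
plan = """\
*(2) HashAggregate(keys=[customer_id], functions=[sum(total), count(order_id)])
+- Exchange hashpartitioning(customer_id, 200)
   +- *(1) HashAggregate(keys=[customer_id], functions=[partial_sum(total)])
      +- *(1) Filter (status = completed)
         +- *(1) FileScan parquet [order_id,customer_id,total,status]
"""
shuffles = plan.count("Exchange")
stages = shuffles + 1
print(f"{shuffles} shuffle(s), {stages} stage(s)")  # 1 shuffle(s), 2 stage(s)
```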
Partitions & Parallelism
Why Partitions Matter
Partitions are the fundamental unit of parallelism in Spark. Each partition is processed by exactly one task on one executor core. The number of partitions directly affects performance:
- Too few partitions: Underutilization of cluster resources, large tasks that may OOM
- Too many partitions: Excessive scheduling overhead, small tasks with high coordination cost
- Rule of thumb: 2-4 partitions per CPU core in your cluster
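The rule of thumb above reduces to simple arithmetic. This is an illustrative helper (the function name and factor default are my own, not a Spark API):

```python
# Sketch of the sizing rule: aim for 2-4 partitions (tasks) per CPU core.
def suggested_partitions(total_cores: int, factor: int = 3) -> int:
    """Return a partition count of roughly `factor` tasks per core."""
    if not 2 <= factor <= 4:
        raise ValueError("factor should be between 2 and 4 per the rule of thumb")
    return total_cores * factor

# e.g. a cluster with 8 executors x 4 cores = 32 total cores
print(suggested_partitions(32))     # 96
print(suggested_partitions(32, 2))  # 64
```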
# Check current number of partitions
df = spark.read.parquet("/data/large_dataset")
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Repartition (wide transformation - causes shuffle)
df_repartitioned = df.repartition(200)
# Coalesce (narrow transformation - no shuffle, only reduces partitions)
df_coalesced = df.coalesce(50)
# Repartition by column (good for downstream joins/groupBys)
df_by_col = df.repartition(100, "customer_id")
# Configure default shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")
# In Databricks, Adaptive Query Execution (AQE) auto-tunes this
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
repartition(n)
Creates exactly n partitions via full shuffle. Use when you need to increase partitions or redistribute data evenly.
coalesce(n)
Reduces partitions without a full shuffle. More efficient than repartition when decreasing partitions.
repartition(n, col)
Redistributes by column values. Ensures same keys land on same partition - great before joins or groupBys.
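The guarantee that same keys land on the same partition follows from hash partitioning: Spark hashes the column value and takes it modulo the partition count. A minimal sketch of that idea (Spark uses Murmur3 internally; Python's built-in hash() stands in here):

```python
# Sketch of how repartition(n, col) assigns rows: hash the key, mod n.
def partition_for(key, num_partitions):
    return hash(key) % num_partitions

rows = [("alice", 10), ("bob", 20), ("alice", 30), ("bob", 40)]
n = 4
placement = {key: partition_for(key, n) for key, _ in rows}

# Every row with the same key maps to the same partition index,
# which is why a later groupBy on that key avoids another shuffle.
assert partition_for("alice", n) == placement["alice"]
assert partition_for("bob", n) == placement["bob"]
```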
Spark UI Overview
Navigating the Spark UI
The Spark UI is your primary debugging and performance tuning tool. In Databricks, access it via the cluster details page or from a notebook's Spark Jobs panel.
| Tab | What It Shows | When to Use |
|---|---|---|
| Jobs | All Spark jobs triggered by actions | Overview of what ran and how long |
| Stages | Stages within each job, task metrics | Identify slow stages, data skew |
| Storage | Cached/persisted RDDs and DataFrames | Verify caching is working |
| Environment | Spark configuration properties | Verify configuration settings |
| Executors | Executor memory, cores, task counts | Check resource utilization, GC time |
| SQL | SQL query plans and execution details | Optimize SQL queries, check plans |
Key Metrics to Watch
- Shuffle Read/Write: High values indicate expensive data movement
- GC Time: If > 10% of task time, increase executor memory or reduce data per task
- Data Skew: Huge variance in task durations within a stage signals skewed partitions
- Spill (Memory/Disk): Data exceeding memory; increase memory or reduce partition size
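The data-skew signal above can be turned into a quick heuristic: compare the slowest task in a stage against the median. This is an illustrative sketch with made-up numbers, not a Spark API; in practice you read these durations off the Stages tab.

```python
# Heuristic skew check over per-task durations (seconds) from one stage.
import statistics

def is_skewed(task_durations_s, ratio=5.0):
    """Flag skew when the slowest task is far beyond the median task."""
    median = statistics.median(task_durations_s)
    return max(task_durations_s) / median > ratio

balanced = [1.0, 1.2, 0.9, 1.1]
skewed = [1.0, 1.2, 0.9, 14.0]  # one straggler task dominates the stage

print(is_skewed(balanced))  # False
print(is_skewed(skewed))    # True
```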
Practice Problems
Easy Spark Session Exploration
Explore the Spark environment in Databricks:
- Print the Spark version and Scala version
- List all Spark configuration properties
- Create a simple DataFrame from a Python list and show it
- Check how many partitions your DataFrame has
Use spark.version, spark.sparkContext.getConf().getAll(), and spark.createDataFrame(). Check partitions with .rdd.getNumPartitions().
# 1. Spark and Scala version
print(f"Spark: {spark.version}")
# The Scala version isn't exposed directly in PySpark; reach into the JVM
print(f"Scala: {spark.sparkContext._jvm.scala.util.Properties.versionString()}")
# 2. All configuration
for k, v in spark.sparkContext.getConf().getAll():
print(f"{k} = {v}")
# 3. Create and show DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()
# 4. Check partitions
print(f"Partitions: {df.rdd.getNumPartitions()}")
Medium DAG and Lazy Evaluation
Understand lazy evaluation and execution plans:
- Create a DataFrame from a CSV file with at least 3 transformations chained
- Use explain(True) to view the full execution plan before calling an action
- Identify which transformations are narrow vs wide in your plan
- Count the number of stages your job will produce
Chain .filter(), .select(), .groupBy().agg(). In the physical plan, look for Exchange nodes -- each one marks a stage boundary.
# Build a chain of transformations
df = spark.read.csv("/databricks-datasets/airlines", header=True, inferSchema=True)
result = df \
.filter(df.Year >= 2000) \
.select("Year", "Origin", "ArrDelay") \
.groupBy("Year", "Origin") \
.avg("ArrDelay")
# View execution plan (no computation yet!)
result.explain(True)
# Narrow: filter, select (Stage 1)
# Wide: groupBy + avg causes Exchange/shuffle (Stage 2)
# Total: 2 stages
# Now trigger execution
result.show(10)
Hard Partition Tuning
Optimize partitioning for a real workload:
- Read a large dataset and check its default partition count
- Repartition by a high-cardinality column and measure the effect
- Compare execution time of a groupBy with default partitions vs tuned partitions
- Enable AQE and observe how Spark auto-coalesces shuffle partitions
Use %%timeit or manual timing with time.time(). Toggle spark.sql.adaptive.enabled between true and false to compare. Check partition counts after each operation.
import time
df = spark.read.parquet("/databricks-datasets/nyctaxi/tables/nyctaxi_yellow")
print(f"Default partitions: {df.rdd.getNumPartitions()}")
# Test with default shuffle partitions (200)
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")
start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"200 partitions: {time.time() - start:.2f}s")
# Test with tuned partitions
spark.conf.set("spark.sql.shuffle.partitions", "50")
start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"50 partitions: {time.time() - start:.2f}s")
# Enable AQE and let Spark decide
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"AQE auto-tuned: {time.time() - start:.2f}s")
Quick Reference
Essential Spark Concepts
| Concept | Description | Example |
|---|---|---|
| SparkSession | Entry point for all Spark functionality | spark = SparkSession.builder.getOrCreate() |
| Transformation | Lazy operation that defines computation | df.filter(col("age") > 30) |
| Action | Triggers DAG execution and returns result | df.show(), df.count() |
| Partition | Chunk of data processed by one task | df.rdd.getNumPartitions() |
| Stage | Set of tasks between shuffle boundaries | Visible in Spark UI Jobs tab |
| Shuffle | Data redistribution across executors | Triggered by groupBy(), join() |
| explain() | View execution plan without running | df.explain(True) |
Key Configuration Properties
Most Important Spark Configs
- spark.sql.shuffle.partitions: Number of partitions after shuffle (default 200)
- spark.sql.adaptive.enabled: Enable Adaptive Query Execution (default true in Databricks)
- spark.executor.memory: Memory per executor (e.g., "4g")
- spark.executor.cores: CPU cores per executor (e.g., 4)
- spark.default.parallelism: Default number of partitions for RDD operations
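Outside Databricks, these same properties are typically passed at launch time. A hypothetical spark-submit invocation (app name and script path are placeholders) setting the configs above:

```shell
# Illustrative spark-submit command; adjust values to your cluster
spark-submit \
  --name MySparkApp \
  --executor-memory 4g \
  --executor-cores 4 \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.sql.adaptive.enabled=true \
  my_job.py
```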