Apache Spark Fundamentals

Medium · 35 min read

Why Spark Matters

Why Learn Apache Spark?

The Problem: Traditional data processing tools cannot handle the volume, velocity, and variety of modern data at scale.

The Solution: Apache Spark provides a unified engine for large-scale data processing with in-memory computation, making it 10-100x faster than Hadoop MapReduce.

Real Impact: Spark powers data pipelines at Netflix, Uber, Airbnb, and thousands of companies processing petabytes of data daily.

Real-World Analogy

Think of Spark as a factory assembly line:

  • Driver = Factory manager who plans the work and coordinates
  • Executors = Workers on the assembly line doing the actual processing
  • Partitions = Batches of raw materials split among workers
  • DAG = The blueprint/plan of the assembly process
  • Stages = Major phases of assembly that must complete before the next

Spark in the Databricks Ecosystem

Unified Analytics

Spark handles batch processing, streaming, machine learning, and SQL queries all in one engine.

In-Memory Speed

By caching intermediate results in memory, Spark avoids costly disk I/O and achieves orders-of-magnitude speedups.

Scalability

Spark scales horizontally from a single laptop to clusters of thousands of nodes processing petabytes.

Rich Ecosystem

Spark SQL, MLlib, GraphX, and Structured Streaming provide specialized libraries for every workload.

Spark Architecture (Driver/Executors)

Spark Driver/Executor Architecture
[Diagram: The Spark Driver (SparkContext, DAG Scheduler, Task Scheduler, UI/Monitoring) communicates with the Cluster Manager, which allocates Executors 1-3; each executor runs its tasks (e.g., Task 1, Task 2) and holds an in-memory cache managed by a Block Manager.]

Architecture Components Explained

| Component | Role | Key Responsibilities |
|---|---|---|
| Driver | Orchestrator process | Creates SparkContext, builds DAG, schedules tasks, collects results |
| SparkContext | Entry point to Spark | Connects to cluster manager, coordinates executors, manages configuration |
| Cluster Manager | Resource allocator | Allocates executors, manages cluster resources (YARN, Mesos, Kubernetes, Standalone) |
| Executor | Worker process | Runs tasks, stores data in memory/disk, reports status to driver |
| Task | Unit of work | Smallest unit of execution, runs on a single partition of data |
create_spark_session.py
# In Databricks, SparkSession is pre-configured as 'spark'
# But here's how you'd create one manually:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# In Databricks notebooks, just use the pre-configured 'spark'
print(spark.version)           # e.g., '3.5.0'
print(spark.sparkContext.uiWebUrl)  # Spark UI URL

# Check cluster configuration
spark.sparkContext.getConf().getAll()

RDDs vs DataFrames vs Datasets

| Feature | RDD | DataFrame | Dataset |
|---|---|---|---|
| Abstraction Level | Low-level | High-level (tabular) | High-level (typed) |
| Type Safety | Compile-time | Runtime only | Compile-time |
| Optimization | No Catalyst/Tungsten | Full Catalyst + Tungsten | Full Catalyst + Tungsten |
| API | Functional (map, filter) | Declarative (SQL-like) | Both functional + declarative |
| Language Support | Python, Scala, Java | Python, Scala, Java, R | Scala, Java only |
| When to Use | Fine-grained control, unstructured data | Most use cases (recommended) | Type-safe Scala/Java apps |

Key Takeaway

In Databricks, always prefer DataFrames over RDDs. DataFrames benefit from Catalyst query optimization and Tungsten binary processing, making them significantly faster. RDDs are only needed for very low-level operations on unstructured data.

rdd_vs_dataframe.py
# RDD approach (avoid in most cases)
rdd = spark.sparkContext.parallelize([
    ("Alice", 34), ("Bob", 45), ("Charlie", 29)
])
result_rdd = rdd.filter(lambda x: x[1] > 30).collect()

# DataFrame approach (recommended)
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(name="Alice", age=34),
    Row(name="Bob", age=45),
    Row(name="Charlie", age=29)
])
result_df = df.filter(df.age > 30)
result_df.show()

# SQL approach (also uses DataFrame engine)
df.createOrReplaceTempView("people")
result_sql = spark.sql("SELECT * FROM people WHERE age > 30")
result_sql.show()

Lazy Evaluation & DAG

What is Lazy Evaluation?

Spark does not execute transformations immediately. Instead, it builds a Directed Acyclic Graph (DAG) of all transformations. Execution only happens when an action (like collect(), show(), or write()) is called.
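The same idea can be modeled in a few lines of plain Python: a pipeline object that records transformations without running them, and only executes the recorded plan when an action is called. This is a conceptual sketch — the `LazyPipeline` class and its method names are invented for illustration, not Spark APIs:

```python
# A toy model of lazy evaluation: transformations are recorded, not run.
# LazyPipeline is a made-up class illustrating the concept, not a Spark API.

class LazyPipeline:
    def __init__(self, data):
        self.data = data
        self.steps = []          # the "DAG": an ordered list of pending operations

    def filter(self, predicate):
        self.steps.append(lambda rows: [r for r in rows if predicate(r)])
        return self              # nothing executes yet

    def map(self, fn):
        self.steps.append(lambda rows: [fn(r) for r in rows])
        return self              # still nothing

    def collect(self):           # the "action": runs the whole recorded plan
        rows = self.data
        for step in self.steps:
            rows = step(rows)
        return rows

pipeline = LazyPipeline([1, 2, 3, 4, 5]).filter(lambda x: x > 2).map(lambda x: x * 10)
# No work has happened yet; only .collect() triggers execution of all steps.
print(pipeline.collect())  # [30, 40, 50]
```

Spark's version of this is far richer (the recorded plan is optimized by Catalyst before running), but the deferred-execution mechanism is the same.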

DAG Execution Plan Visualization
[Diagram: Stage 1 (narrow transformations: read CSV, .filter(), .select(), .withColumn()) flows to a shuffle boundary, then Stage 2 (wide transformation: .groupBy(), .agg(sum())), ending at .show(), the action that triggers execution. Legend: narrow transformations need no shuffle; wide transformations require a shuffle; actions trigger execution.]

Transformations vs Actions

| Type | Examples | Behavior |
|---|---|---|
| Narrow Transformations | select(), filter(), map(), withColumn() | No data shuffle; each partition processed independently |
| Wide Transformations | groupBy(), join(), repartition(), distinct() | Requires data shuffle across partitions (expensive) |
| Actions | show(), collect(), count(), write(), save() | Triggers actual computation of the DAG |
lazy_evaluation_demo.py
# None of these lines execute immediately - they build a DAG
df = spark.read.csv("/data/sales.csv", header=True, inferSchema=True)
filtered = df.filter(df.amount > 100)          # Narrow transformation
selected = filtered.select("customer", "amount")  # Narrow transformation
grouped = selected.groupBy("customer").sum("amount")  # Wide transformation

# THIS triggers execution of the entire DAG
grouped.show()

# You can verify the plan without executing
grouped.explain(True)  # Shows parsed, analyzed, optimized, and physical plans

Spark Execution Plan

Understanding explain() Output

The explain(True) method shows four plan levels:

  • Parsed Logical Plan: Raw plan from your code
  • Analyzed Logical Plan: Resolved references (column names, tables)
  • Optimized Logical Plan: After Catalyst optimizer (predicate pushdown, column pruning)
  • Physical Plan: Actual execution plan with specific algorithms chosen
explain_output.py
# Create sample data and build a query
df = spark.read.parquet("/data/orders")
result = df.filter(df.status == "completed") \
           .select("order_id", "customer_id", "total") \
           .groupBy("customer_id") \
           .agg({"total": "sum", "order_id": "count"})

# Show the full execution plan
result.explain(True)

# Output (simplified):
# == Optimized Logical Plan ==
# Aggregate [customer_id], [customer_id, sum(total), count(order_id)]
# +- Project [order_id, customer_id, total]
#    +- Filter (status = completed)    <-- predicate pushdown
#       +- Relation [parquet]
#
# == Physical Plan ==
# *(2) HashAggregate(keys=[customer_id], functions=[sum(total), count(order_id)])
# +- Exchange hashpartitioning(customer_id, 200)    <-- shuffle!
#    +- *(1) HashAggregate(keys=[customer_id], functions=[partial_sum(total), partial_count(order_id)])
#       +- *(1) Project [order_id, customer_id, total]
#          +- *(1) Filter (status = completed)
#             +- *(1) FileScan parquet [order_id,customer_id,total,status]

Common Pitfall: Reading Plans

Problem: Physical plans read bottom-to-top, which confuses beginners.

Solution: Start at the bottom (FileScan) and read upward. Each Exchange node indicates a shuffle (a stage boundary). A prefix like *(1) marks operators fused into the same whole-stage code-generation block, so operators sharing a number execute together.

Partitions & Parallelism

Why Partitions Matter

Partitions are the fundamental unit of parallelism in Spark. Each partition is processed by exactly one task on one executor core. The number of partitions directly affects performance:

  • Too few partitions: Underutilization of cluster resources, large tasks that may OOM
  • Too many partitions: Excessive scheduling overhead, small tasks with high coordination cost
  • Rule of thumb: 2-4 partitions per CPU core in your cluster
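The rule of thumb above is simple enough to encode. A hypothetical helper (the function name is invented; the 2x and 4x factors come straight from the guideline, nothing Spark-specific):

```python
# Hypothetical helper applying the "2-4 partitions per core" guideline.

def suggested_partition_range(total_cores):
    """Return a (low, high) partition count range for a cluster's total CPU cores."""
    return (2 * total_cores, 4 * total_cores)

# e.g., 10 executors x 4 cores each = 40 total cores
low, high = suggested_partition_range(40)
print(f"Aim for {low}-{high} partitions")  # Aim for 80-160 partitions
```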
partitions_demo.py
# Check current number of partitions
df = spark.read.parquet("/data/large_dataset")
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Repartition (wide transformation - causes shuffle)
df_repartitioned = df.repartition(200)

# Coalesce (narrow transformation - no shuffle, only reduces partitions)
df_coalesced = df.coalesce(50)

# Repartition by column (good for downstream joins/groupBys)
df_by_col = df.repartition(100, "customer_id")

# Configure default shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# In Databricks, Adaptive Query Execution (AQE) auto-tunes this
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

repartition(n)

Creates exactly n partitions via full shuffle. Use when you need to increase partitions or redistribute data evenly.

coalesce(n)

Reduces partitions without a full shuffle. More efficient than repartition when decreasing partitions.

repartition(n, col)

Redistributes by column values. Ensures same keys land on same partition - great before joins or groupBys.
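The guarantee behind repartition(n, col) comes from hash partitioning: the partition index is derived from the key's hash, so identical keys always land in the same partition. A plain-Python sketch of the mechanism (Spark actually uses Murmur3 internally; Python's built-in hash() stands in here):

```python
# Sketch of hash partitioning: the same key always maps to the same partition.
# Spark hashes with Murmur3; Python's hash() is only a stand-in for the idea.

def partition_for(key, num_partitions):
    return hash(key) % num_partitions

rows = [("alice", 10), ("bob", 7), ("alice", 3), ("carol", 5), ("bob", 1)]
num_partitions = 4

partitions = {i: [] for i in range(num_partitions)}
for key, value in rows:
    partitions[partition_for(key, num_partitions)].append((key, value))

# Every occurrence of a key lands in exactly one partition, so a downstream
# groupBy on that key can aggregate each partition independently.
for i, contents in partitions.items():
    print(i, contents)
```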

Spark UI Overview

Navigating the Spark UI

The Spark UI is your primary debugging and performance tuning tool. In Databricks, access it via the cluster details page or from a notebook's Spark Jobs panel.

| Tab | What It Shows | When to Use |
|---|---|---|
| Jobs | All Spark jobs triggered by actions | Overview of what ran and how long |
| Stages | Stages within each job, task metrics | Identify slow stages, data skew |
| Storage | Cached/persisted RDDs and DataFrames | Verify caching is working |
| Environment | Spark configuration properties | Verify configuration settings |
| Executors | Executor memory, cores, task counts | Check resource utilization, GC time |
| SQL | SQL query plans and execution details | Optimize SQL queries, check plans |

Key Metrics to Watch

  • Shuffle Read/Write: High values indicate expensive data movement
  • GC Time: If > 10% of task time, increase executor memory or reduce data per task
  • Data Skew: Huge variance in task durations within a stage signals skewed partitions
  • Spill (Memory/Disk): Data exceeding memory; increase memory or reduce partition size
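Data skew in particular is easy to screen for: if the slowest task in a stage takes far longer than the median, one partition is doing most of the work. A rough check of the kind you might run on task durations copied out of the Stages tab (the 5x threshold is an arbitrary illustration, not an official Spark heuristic):

```python
# Flag a stage as skewed when its slowest task dwarfs the median task.
# The 5x ratio is an illustrative choice, not an official rule.
from statistics import median

def looks_skewed(task_durations_sec, ratio_threshold=5.0):
    mid = median(task_durations_sec)
    return mid > 0 and max(task_durations_sec) / mid > ratio_threshold

balanced = [12, 11, 13, 12, 14, 11]   # tasks finish in similar time
skewed   = [12, 11, 13, 12, 14, 95]   # one partition holds the hot keys

print(looks_skewed(balanced))  # False
print(looks_skewed(skewed))    # True
```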

Practice Problems

Easy Spark Session Exploration

Explore the Spark environment in Databricks:

  1. Print the Spark version and Scala version
  2. List all Spark configuration properties
  3. Create a simple DataFrame from a Python list and show it
  4. Check how many partitions your DataFrame has

Use spark.version, spark.sparkContext.getConf().getAll(), and spark.createDataFrame(). Check partitions with .rdd.getNumPartitions().

# 1. Spark and Scala version
print(f"Spark: {spark.version}")
# sparkContext.version returns the Spark version again; the Scala version
# must be read from the JVM:
print(f"Scala: {spark.sparkContext._jvm.scala.util.Properties.versionString()}")

# 2. All configuration
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k} = {v}")

# 3. Create and show DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.show()

# 4. Check partitions
print(f"Partitions: {df.rdd.getNumPartitions()}")

Medium DAG and Lazy Evaluation

Understand lazy evaluation and execution plans:

  1. Create a DataFrame from a CSV file with at least 3 transformations chained
  2. Use explain(True) to view the full execution plan before calling an action
  3. Identify which transformations are narrow vs wide in your plan
  4. Count the number of stages your job will produce

Chain .filter(), .select(), .groupBy().agg(). In the physical plan, look for Exchange nodes -- each one marks a stage boundary.

# Build a chain of transformations
df = spark.read.csv("/databricks-datasets/airlines", header=True, inferSchema=True)

result = df \
    .filter(df.Year >= 2000) \
    .select("Year", "Origin", "ArrDelay") \
    .groupBy("Year", "Origin") \
    .avg("ArrDelay")

# View execution plan (no computation yet!)
result.explain(True)

# Narrow: filter, select (Stage 1)
# Wide: groupBy + avg causes Exchange/shuffle (Stage 2)
# Total: 2 stages

# Now trigger execution
result.show(10)

Hard Partition Tuning

Optimize partitioning for a real workload:

  1. Read a large dataset and check its default partition count
  2. Repartition by a high-cardinality column and measure the effect
  3. Compare execution time of a groupBy with default partitions vs tuned partitions
  4. Enable AQE and observe how Spark auto-coalesces shuffle partitions

Use %%timeit or manual timing with time.time(). Toggle spark.sql.adaptive.enabled between true and false to compare. Check partition counts after each operation.

import time

df = spark.read.parquet("/databricks-datasets/nyctaxi/tables/nyctaxi_yellow")
print(f"Default partitions: {df.rdd.getNumPartitions()}")

# Test with default shuffle partitions (200)
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")

start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"200 partitions: {time.time() - start:.2f}s")

# Test with tuned partitions
spark.conf.set("spark.sql.shuffle.partitions", "50")
start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"50 partitions: {time.time() - start:.2f}s")

# Enable AQE and let Spark decide
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
start = time.time()
df.groupBy("pickup_zip").count().collect()
print(f"AQE auto-tuned: {time.time() - start:.2f}s")

Quick Reference

Essential Spark Concepts

| Concept | Description | Example |
|---|---|---|
| SparkSession | Entry point for all Spark functionality | spark = SparkSession.builder.getOrCreate() |
| Transformation | Lazy operation that defines computation | df.filter(col("age") > 30) |
| Action | Triggers DAG execution and returns result | df.show(), df.count() |
| Partition | Chunk of data processed by one task | df.rdd.getNumPartitions() |
| Stage | Set of tasks between shuffle boundaries | Visible in Spark UI Jobs tab |
| Shuffle | Data redistribution across executors | Triggered by groupBy(), join() |
| explain() | View execution plan without running | df.explain(True) |

Key Configuration Properties

Most Important Spark Configs

  • spark.sql.shuffle.partitions: Number of partitions after shuffle (default 200)
  • spark.sql.adaptive.enabled: Enable Adaptive Query Execution (default true in Databricks)
  • spark.executor.memory: Memory per executor (e.g., "4g")
  • spark.executor.cores: CPU cores per executor (e.g., 4)
  • spark.default.parallelism: Default number of partitions for RDD operations