# ETL vs ELT: Understanding the Difference

## Why ETL/ELT Pipelines Matter

**The Problem:** Organizations have data scattered across dozens of systems -- databases, APIs, SaaS tools, files -- and need to consolidate it into their lakehouse for analytics and ML.

**The Solution:** ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) pipelines automate the movement and transformation of data from source systems into your Medallion Architecture layers.

**Real Impact:** Well-designed pipelines reduce manual data wrangling by 80%, enable near-real-time analytics, and ensure data consistency across the organization.

## Real-World Analogy

Think of ETL vs ELT like shipping goods internationally:

- **ETL** = Pack, label, and sort items at the origin warehouse before shipping. Only clean, organized goods arrive at the destination.
- **ELT** = Ship everything as-is to the destination warehouse, then unpack, sort, and organize. Faster to ship, but requires a powerful destination warehouse.
| Aspect | ETL | ELT (Databricks Preferred) |
|---|---|---|
| Transform Location | External processing engine | Inside the lakehouse (Spark) |
| Raw Data | Not preserved in target | Preserved in Bronze layer |
| Reprocessing | Must re-extract from source | Replay from Bronze layer |
| Schema Evolution | Requires pipeline changes | Auto Loader handles naturally |
| Scalability | Limited by ETL tool | Scales with Spark clusters |
## Auto Loader: Incremental File Ingestion

Auto Loader is Databricks' recommended approach for ingesting data from cloud storage. It automatically detects new files as they arrive and processes only the new data -- no manual file tracking required.
### Auto Loader Architecture
#### File Notification Mode
Uses cloud events (S3 SQS, Azure Event Grid, GCS Pub/Sub) to detect new files. Most efficient for high-volume, production workloads. Near-zero listing cost.
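Enabling file notification mode is a matter of setting extra `cloudFiles` options on the reader. The option names below come from the Auto Loader option set; the region value and paths are placeholders for your own deployment, and the commented-out reader line assumes a Databricks `spark` session:

```python
# Option set enabling file notification mode on AWS (illustrative values)
notification_options = {
    "cloudFiles.format": "json",
    "cloudFiles.useNotifications": "true",  # switch from directory listing
    "cloudFiles.region": "us-east-1",       # region where the SQS queue lives
    "cloudFiles.schemaLocation": "dbfs:/checkpoints/orders/schema",
}

# reader = spark.readStream.format("cloudFiles").options(**notification_options)
```

With `cloudFiles.useNotifications` set to `"true"`, Auto Loader consumes cloud events instead of listing the directory, which is what makes it cheap at high file volumes.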
#### Directory Listing Mode
Periodically lists the input directory to find new files. Simple setup, no cloud events needed. Best for development and small-scale ingestion.
#### Schema Evolution

Automatically handles new columns in source data. Modes: `addNewColumns`, `failOnNewColumns`, and `rescue` (put unknown fields in the `_rescued_data` column).
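Auto Loader implements rescue inside the engine, but the semantics can be sketched in plain Python (the function name and records are illustrative, not part of any API):

```python
import json

def rescue_record(record: dict, known_columns: set) -> dict:
    """Conceptual sketch of rescue-mode semantics: fields matching the
    declared schema pass through; everything else is captured as JSON
    in _rescued_data instead of failing the pipeline."""
    known = {k: v for k, v in record.items() if k in known_columns}
    unknown = {k: v for k, v in record.items() if k not in known_columns}
    known["_rescued_data"] = json.dumps(unknown) if unknown else None
    return known

row = rescue_record(
    {"order_id": "A1", "amount": 9.99, "coupon": "WELCOME10"},
    known_columns={"order_id", "amount"},
)
# row["_rescued_data"] now holds the unexpected "coupon" field as JSON
```

The payoff is that an upstream team adding a field never breaks ingestion; the new data waits in `_rescued_data` until you promote it to a real column.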
#### Exactly-Once Guarantee
Checkpointing ensures each file is processed exactly once, even if the pipeline restarts. No duplicate records from retries.
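Auto Loader tracks processed files in a checkpoint store; a toy ledger version of the idea (real checkpoints commit output and ledger atomically, which this sketch glosses over) looks like:

```python
import json
import pathlib

def process_new_files(all_files, checkpoint_path, handler):
    """Toy illustration of checkpoint-based exactly-once file processing:
    a persisted ledger of processed files means a restart re-reads the
    ledger and skips anything already handled."""
    cp = pathlib.Path(checkpoint_path)
    seen = set(json.loads(cp.read_text())) if cp.exists() else set()
    for f in sorted(all_files):
        if f in seen:
            continue  # already processed before a restart
        handler(f)
        seen.add(f)
        cp.write_text(json.dumps(sorted(seen)))  # record progress per file
    return seen
```

Running this twice over the same directory listing processes each file once: the second run finds every file already in the ledger and does nothing.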
```python
from pyspark.sql import functions as F

# ── Auto Loader: Incremental JSON Ingestion ──
# Configure Auto Loader with schema evolution
raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "dbfs:/checkpoints/orders/schema")
    .option("cloudFiles.inferColumnTypes", True)
    # Evolve the schema when new columns appear in the source
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    # Pin types for known columns so inference can't drift
    .option("cloudFiles.schemaHints",
            "order_id STRING, amount DOUBLE, ts TIMESTAMP")
    .load("s3://raw-data/orders/")
)

# Add ingestion metadata and write to Bronze
(
    raw_stream
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.input_file_name())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/checkpoints/orders/cp")
    .option("mergeSchema", True)
    .trigger(availableNow=True)
    .toTable("bronze.raw_orders")
)
```
## Connecting to Data Sources

Databricks supports reading from virtually any data source. Here are the most common patterns for production pipelines.
```python
# ── JDBC: Read from PostgreSQL / MySQL / SQL Server ──
jdbc_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/mydb")
    .option("dbtable", "public.customers")
    .option("user", dbutils.secrets.get("scope", "pg_user"))
    .option("password", dbutils.secrets.get("scope", "pg_pass"))
    .option("fetchsize", "10000")
    # Parallel read: split the table into 10 partitions on id
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "10")
    .load()
)
```
```python
# ── REST API: Ingest from external APIs ──
import requests

response = requests.get(
    "https://api.example.com/v1/events",
    headers={"Authorization": f"Bearer {token}"},
    timeout=30,
)
response.raise_for_status()  # fail fast on HTTP errors
data = response.json()["results"]
api_df = spark.createDataFrame(data)
```
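Real APIs rarely return everything in one response; they page. A minimal cursor-pagination helper can drain all pages before handing the records to Spark. The `fetch_page` callable and the `results`/`next_cursor` keys here are assumptions about a hypothetical API, not a real endpoint:

```python
def iter_paginated(fetch_page, page_size=100):
    """Yield records across pages until the API stops returning a cursor.

    fetch_page(cursor, limit) -> {"results": [...], "next_cursor": str | None}
    """
    cursor = None
    while True:
        page = fetch_page(cursor, page_size)
        yield from page["results"]
        cursor = page.get("next_cursor")
        if cursor is None:
            break  # last page reached

# records = list(iter_paginated(my_fetch))
# api_df = spark.createDataFrame(records)  # then land in Bronze as usual
```

Keeping pagination in a generator means the ingestion code stays the same whether the API returns one page or a thousand.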
```python
# ── Kafka: Real-time streaming ingestion ──
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders,events")
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(key AS STRING)",
                "CAST(value AS STRING)",
                "topic", "partition", "offset")
)
```
```python
# ── CSV with schema enforcement ──
from pyspark.sql.types import (StructType, StructField,
                               StringType, DoubleType, DateType)

# Declare the expected schema up front instead of inferring it;
# the column names here are illustrative.
report_schema = StructType([
    StructField("report_date", DateType()),
    StructField("region", StringType()),
    StructField("amount", DoubleType()),
    StructField("_corrupt_record", StringType()),  # captures malformed rows
])

csv_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(report_schema)            # enforce, don't infer
    .option("dateFormat", "yyyy-MM-dd")
    .option("mode", "PERMISSIVE")     # bad rows land in _corrupt_record
    .load("s3://data/reports/*.csv")
)
```
## Error Handling and Recovery

Production pipelines must handle failures gracefully. Databricks provides several patterns for error handling, dead-letter queues, and automatic recovery.

### Error Handling Patterns
| Pattern | When to Use | Implementation |
|---|---|---|
| Permissive Mode | Ingestion -- keep bad records aside | _corrupt_record column or _rescued_data |
| Quarantine Table | Silver layer -- separate bad from good | Filter + write to quarantine Delta table |
| Dead-Letter Queue | Streaming -- records that repeatedly fail | foreachBatch with try/except per batch |
| Retry with Backoff | API calls and transient errors | tenacity or custom retry decorator |
| Idempotent Writes | Prevent duplicates on retry | MERGE with match condition on primary key |
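The retry-with-backoff row can be sketched as a small decorator. This is a stand-in for a library like tenacity, written with the standard library only; the names and delay values are illustrative:

```python
import functools
import random
import time

def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Retry a flaky call with exponential backoff and jitter."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of retries; surface the error
                    # Double the wait each attempt, capped, plus jitter
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    time.sleep(delay + random.uniform(0, delay / 10))
        return wrapper
    return decorator

@retry_with_backoff(max_attempts=3, base_delay=0.5)
def fetch_events(url):
    ...  # e.g. requests.get(url, timeout=10)
```

Jitter matters when many tasks retry at once: without it, every retry lands on the source at the same instant and the transient failure repeats.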
```python
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def process_batch_with_errors(batch_df, batch_id):
    """Process a micro-batch with error handling."""
    spark = batch_df.sparkSession  # don't rely on a global session
    try:
        # Separate good and bad records
        good = batch_df.filter(
            F.col("order_id").isNotNull() &
            (F.col("amount") > 0)
        ).dropDuplicates(["order_id"])  # MERGE requires unique source keys
        bad = batch_df.filter(
            F.col("order_id").isNull() |
            (F.col("amount") <= 0)
        )

        # MERGE good records (idempotent upsert)
        target = DeltaTable.forName(spark, "silver.orders")
        (
            target.alias("t")
            .merge(good.alias("s"), "t.order_id = s.order_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

        # Send bad records to dead-letter table
        if not bad.isEmpty():
            (bad
             .withColumn("_error_batch_id", F.lit(batch_id))
             .withColumn("_error_timestamp", F.current_timestamp())
             .write.format("delta")
             .mode("append")
             .saveAsTable("silver.orders_dead_letter"))
    except Exception as e:
        # Log the entire failed batch for investigation
        (batch_df
         .withColumn("_error_message", F.lit(str(e)))
         .withColumn("_error_batch_id", F.lit(batch_id))
         .write.format("delta")
         .mode("append")
         .saveAsTable("silver.orders_failed_batches"))
        raise  # Re-raise so Spark retries

# Use foreachBatch for an error-handling streaming pipeline
(
    spark.readStream
    .table("bronze.raw_orders")
    .writeStream
    .foreachBatch(process_batch_with_errors)
    .option("checkpointLocation", "dbfs:/cp/silver_orders")
    .trigger(availableNow=True)
    .start()
)
```
## Complete Production Pipeline

Here is a full end-to-end ELT pipeline pattern that combines Auto Loader, error handling, and Medallion Architecture layers.
```python
from pyspark.sql import functions as F
import logging

logger = logging.getLogger("etl_pipeline")

class ELTPipeline:
    """Production ELT pipeline for Databricks."""

    def __init__(self, spark, config):
        self.spark = spark
        self.config = config

    def extract_and_load_bronze(self):
        """Extract from source and load raw to Bronze."""
        logger.info("Starting Bronze ingestion")
        return (
            self.spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", self.config["source_format"])
            .option("cloudFiles.schemaLocation", self.config["schema_path"])
            .option("cloudFiles.inferColumnTypes", True)
            .load(self.config["source_path"])
            .withColumn("_ingested_at", F.current_timestamp())
            .withColumn("_source_file", F.input_file_name())
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", self.config["bronze_checkpoint"])
            .trigger(availableNow=True)
            .toTable(self.config["bronze_table"])
        )

    def transform_to_silver(self):
        """Transform Bronze to Silver with quality checks."""
        logger.info("Starting Silver transformation")
        bronze = self.spark.table(self.config["bronze_table"])
        silver = (
            bronze
            .dropDuplicates(self.config["dedup_keys"])
            .filter(F.col(self.config["primary_key"]).isNotNull())
            .withColumn("_processed_at", F.current_timestamp())
        )
        # Apply custom transformations (column name -> Column expression)
        for col_name, transform in self.config.get("transforms", {}).items():
            silver = silver.withColumn(col_name, transform)
        silver.write.format("delta").mode("overwrite") \
            .saveAsTable(self.config["silver_table"])
        count = self.spark.table(self.config["silver_table"]).count()
        logger.info(f"Silver: {count} records written")
        return count

    def aggregate_to_gold(self):
        """Aggregate Silver to Gold for consumption."""
        logger.info("Starting Gold aggregation")
        gold = self.spark.sql(self.config["gold_query"])
        gold.write.format("delta").mode("overwrite") \
            .saveAsTable(self.config["gold_table"])
        count = self.spark.table(self.config["gold_table"]).count()
        logger.info(f"Gold: {count} records written")
        return count

    def run(self):
        """Execute the full ELT pipeline."""
        # availableNow processes all pending files, then stops; block
        # until Bronze is fully loaded before transforming downstream
        self.extract_and_load_bronze().awaitTermination()
        self.transform_to_silver()
        self.aggregate_to_gold()
        logger.info("Pipeline complete")

# ── Run the pipeline ──
config = {
    "source_path": "s3://raw-data/orders/",
    "source_format": "json",
    "schema_path": "dbfs:/checkpoints/orders/schema",
    "bronze_checkpoint": "dbfs:/checkpoints/orders/bronze",
    "bronze_table": "bronze.raw_orders",
    "silver_table": "silver.orders",
    "gold_table": "gold.daily_orders",
    "dedup_keys": ["order_id"],
    "primary_key": "order_id",
    "gold_query": """
        SELECT order_date, region,
               COUNT(*) AS orders,
               SUM(amount) AS revenue
        FROM silver.orders
        GROUP BY order_date, region
    """,
}

pipeline = ELTPipeline(spark, config)
pipeline.run()
```
## Practice Problems

### Problem 1: Choose ETL or ELT

**Easy** -- Your company is migrating from an on-premises SQL Server data warehouse to Databricks. The existing ETL pipelines use SSIS (SQL Server Integration Services). Should you replicate the ETL pattern or switch to ELT? Justify your choice.

### Problem 2: Design Idempotent Ingestion

**Medium** -- Your pipeline sometimes processes the same source file twice due to cloud storage eventual consistency. Design an idempotent ingestion pattern that prevents duplicate records in your Silver layer, even if the same file is processed multiple times.

### Problem 3: Multi-Source Pipeline Architecture

**Hard** -- Design a pipeline that ingests from three sources simultaneously: (1) real-time Kafka events, (2) hourly CSV file drops, and (3) daily JDBC extracts from a legacy database. All three must be unified into a single Silver table with exactly-once semantics.