ETL/ELT Pipelines in Databricks

Medium 30 min read

ETL vs ELT: Understanding the Difference

Why ETL/ELT Pipelines Matter

The Problem: Organizations have data scattered across dozens of systems -- databases, APIs, SaaS tools, files -- and need to consolidate it into their lakehouse for analytics and ML.

The Solution: ETL (Extract, Transform, Load) and ELT (Extract, Load, Transform) pipelines automate the movement and transformation of data from source systems into your Medallion Architecture layers.

Real Impact: Well-designed pipelines reduce manual data wrangling by 80%, enable near-real-time analytics, and ensure data consistency across the organization.

Real-World Analogy

Think of ETL vs ELT like shipping goods internationally:

  • ETL = Pack, label, and sort items at the origin warehouse before shipping. Only clean, organized goods arrive at the destination.
  • ELT = Ship everything as-is to the destination warehouse, then unpack, sort, and organize. Faster to ship, but requires a powerful destination warehouse.
ETL vs ELT Pipeline Flow
ETL: Transform Before Loading Extract Source systems Transform Clean, validate, join Load Data warehouse Clean data only arrives in warehouse ELT: Load First, Transform Inside Extract Source systems Load Raw to lakehouse Transform Inside the lakehouse Raw data preserved Transform with Spark ETL: Best when storage is expensive or data must be cleaned before landing ELT: Best for cloud lakehouses Databricks recommends ELT pattern
Aspect ETL ELT (Databricks Preferred)
Transform Location External processing engine Inside the lakehouse (Spark)
Raw Data Not preserved in target Preserved in Bronze layer
Reprocessing Must re-extract from source Replay from Bronze layer
Schema Evolution Requires pipeline changes Auto Loader handles naturally
Scalability Limited by ETL tool Scales with Spark clusters

Auto Loader: Incremental File Ingestion

Auto Loader is Databricks' recommended approach for ingesting data from cloud storage. It automatically detects new files as they arrive and processes only the new data -- no manual file tracking required.

Auto Loader Architecture

File Notification Mode

Uses cloud events (S3 SQS, Azure Event Grid, GCS Pub/Sub) to detect new files. Most efficient for high-volume, production workloads. Near-zero listing cost.

Directory Listing Mode

Periodically lists the input directory to find new files. Simple setup, no cloud events needed. Best for development and small-scale ingestion.

Schema Evolution

Automatically handles new columns in source data. Options: addNewColumns, failOnNewColumns, rescue (put unknown fields in _rescued_data column).

Exactly-Once Guarantee

Checkpointing ensures each file is processed exactly once, even if the pipeline restarts. No duplicate records from retries.

PySpark - Auto Loader with Schema Evolution
from pyspark.sql import functions as F

# ── Auto Loader: Incremental JSON Ingestion ──

# Configure Auto Loader with schema evolution
raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "dbfs:/checkpoints/orders/schema")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    # Rescue unknown columns instead of failing
    .option("cloudFiles.schemaHints",
            "order_id STRING, amount DOUBLE, ts TIMESTAMP")
    .load("s3://raw-data/orders/")
)

# Add ingestion metadata and write to Bronze
(
    raw_stream
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.input_file_name())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation",
            "dbfs:/checkpoints/orders/cp")
    .option("mergeSchema", True)
    .trigger(availableNow=True)
    .toTable("bronze.raw_orders")
)

Connecting to Data Sources

Databricks supports reading from virtually any data source. Here are the most common patterns for production pipelines.

PySpark - Common Data Source Connectors
# ── JDBC: Read from PostgreSQL / MySQL / SQL Server ──
jdbc_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/mydb")
    .option("dbtable", "public.customers")
    .option("user", dbutils.secrets.get("scope", "pg_user"))
    .option("password", dbutils.secrets.get("scope", "pg_pass"))
    .option("fetchsize", "10000")
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "10")
    .load()
)

# ── REST API: Ingest from external APIs ──
import requests, json

response = requests.get(
    "https://api.example.com/v1/events",
    headers={"Authorization": f"Bearer {token}"}
)
data = response.json()["results"]
api_df = spark.createDataFrame(data)

# ── Kafka: Real-time streaming ingestion ──
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders,events")
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(key AS STRING)",
                "CAST(value AS STRING)",
                "topic", "partition", "offset")
)

# ── CSV with schema enforcement ──
csv_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("dateFormat", "yyyy-MM-dd")
    .option("mode", "PERMISSIVE")
    .load("s3://data/reports/*.csv")
)

Error Handling and Recovery

Production pipelines must handle failures gracefully. Databricks provides several patterns for error handling, dead-letter queues, and automatic recovery.

Error Handling Patterns

Pattern When to Use Implementation
Permissive Mode Ingestion -- keep bad records aside _corrupt_record column or _rescued_data
Quarantine Table Silver layer -- separate bad from good Filter + write to quarantine Delta table
Dead-Letter Queue Streaming -- records that repeatedly fail foreachBatch with try/except per batch
Retry with Backoff API calls and transient errors tenacity or custom retry decorator
Idempotent Writes Prevent duplicates on retry MERGE with match condition on primary key
PySpark - Production Error Handling Pipeline
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def process_batch_with_errors(batch_df, batch_id):
    """Process a micro-batch with error handling."""
    try:
        # Separate good and bad records
        good = batch_df.filter(
            F.col("order_id").isNotNull() &
            (F.col("amount") > 0)
        )
        bad = batch_df.filter(
            F.col("order_id").isNull() |
            (F.col("amount") <= 0)
        )

        # MERGE good records (idempotent upsert)
        target = DeltaTable.forName(spark, "silver.orders")
        (
            target.alias("t")
            .merge(good.alias("s"),
                   "t.order_id = s.order_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

        # Send bad records to dead-letter table
        if bad.count() > 0:
            (bad
             .withColumn("_error_batch_id", F.lit(batch_id))
             .withColumn("_error_timestamp", F.current_timestamp())
             .write.format("delta")
             .mode("append")
             .saveAsTable("silver.orders_dead_letter"))

    except Exception as e:
        # Log the entire failed batch for investigation
        (batch_df
         .withColumn("_error_message", F.lit(str(e)))
         .withColumn("_error_batch_id", F.lit(batch_id))
         .write.format("delta")
         .mode("append")
         .saveAsTable("silver.orders_failed_batches"))
        raise  # Re-raise so Spark retries

# Use foreachBatch for error-handling streaming pipeline
(
    spark.readStream
    .table("bronze.raw_orders")
    .writeStream
    .foreachBatch(process_batch_with_errors)
    .option("checkpointLocation", "dbfs:/cp/silver_orders")
    .trigger(availableNow=True)
    .start()
)

Complete Production Pipeline

Here is a full end-to-end ELT pipeline pattern that combines Auto Loader, error handling, and Medallion Architecture layers.

PySpark - End-to-End ELT Pipeline
from pyspark.sql import functions as F
from delta.tables import DeltaTable
import logging

logger = logging.getLogger("etl_pipeline")

class ELTPipeline:
    """Production ELT pipeline for Databricks."""

    def __init__(self, spark, config):
        self.spark = spark
        self.config = config

    def extract_and_load_bronze(self):
        """Extract from source and load raw to Bronze."""
        logger.info("Starting Bronze ingestion")
        return (
            self.spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format",
                    self.config["source_format"])
            .option("cloudFiles.schemaLocation",
                    self.config["schema_path"])
            .option("cloudFiles.inferColumnTypes", True)
            .load(self.config["source_path"])
            .withColumn("_ingested_at", F.current_timestamp())
            .withColumn("_source_file", F.input_file_name())
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation",
                    self.config["bronze_checkpoint"])
            .trigger(availableNow=True)
            .toTable(self.config["bronze_table"])
        )

    def transform_to_silver(self):
        """Transform Bronze to Silver with quality checks."""
        logger.info("Starting Silver transformation")
        bronze = self.spark.table(self.config["bronze_table"])

        silver = (
            bronze
            .dropDuplicates(self.config["dedup_keys"])
            .filter(F.col(self.config["primary_key"]).isNotNull())
            .withColumn("_processed_at", F.current_timestamp())
        )

        # Apply custom transformations
        for col_name, transform in self.config.get(
                "transforms", {}).items():
            silver = silver.withColumn(col_name, transform)

        silver.write.format("delta").mode("overwrite")             .saveAsTable(self.config["silver_table"])

        count = silver.count()
        logger.info(f"Silver: {count} records written")
        return count

    def aggregate_to_gold(self):
        """Aggregate Silver to Gold for consumption."""
        logger.info("Starting Gold aggregation")
        gold = self.spark.sql(self.config["gold_query"])
        gold.write.format("delta").mode("overwrite")             .saveAsTable(self.config["gold_table"])
        count = gold.count()
        logger.info(f"Gold: {count} records written")
        return count

    def run(self):
        """Execute the full ELT pipeline."""
        self.extract_and_load_bronze()
        self.transform_to_silver()
        self.aggregate_to_gold()
        logger.info("Pipeline complete")

# ── Run the pipeline ──
config = {
    "source_path": "s3://raw-data/orders/",
    "source_format": "json",
    "schema_path": "dbfs:/checkpoints/orders/schema",
    "bronze_checkpoint": "dbfs:/checkpoints/orders/bronze",
    "bronze_table": "bronze.raw_orders",
    "silver_table": "silver.orders",
    "gold_table": "gold.daily_orders",
    "dedup_keys": ["order_id"],
    "primary_key": "order_id",
    "gold_query": """
        SELECT order_date, region,
               COUNT(*) as orders,
               SUM(amount) as revenue
        FROM silver.orders
        GROUP BY order_date, region
    """
}

pipeline = ELTPipeline(spark, config)
pipeline.run()

Practice Problems

Problem 1: Choose ETL or ELT

Easy

Your company is migrating from an on-premises SQL Server data warehouse to Databricks. The existing ETL pipelines use SSIS (SQL Server Integration Services). Should you replicate the ETL pattern or switch to ELT? Justify your choice.

Problem 2: Design Idempotent Ingestion

Medium

Your pipeline sometimes processes the same source file twice due to cloud storage eventual consistency. Design an idempotent ingestion pattern that prevents duplicate records in your Silver layer, even if the same file is processed multiple times.

Problem 3: Multi-Source Pipeline Architecture

Hard

Design a pipeline that ingests from three sources simultaneously: (1) real-time Kafka events, (2) hourly CSV file drops, and (3) daily JDBC extracts from a legacy database. All three must be unified into a single Silver table with exactly-once semantics.