Delta Live Tables & Declarative Pipelines

Why Delta Live Tables?

Why DLT Matters

The Problem: Building and maintaining reliable data pipelines requires managing dependencies, handling data quality, dealing with failures, and orchestrating complex DAGs -- all manually.

The Solution: Delta Live Tables (DLT) lets you declare your pipeline as a series of transformations, and Databricks handles orchestration, error handling, monitoring, and data quality for you.

Real Impact: Teams report 40-60% less pipeline code and 80% fewer pipeline failures after adopting DLT.

Real-World Analogy

Think of DLT like a recipe book vs. cooking instructions:

  • Imperative (traditional) = Step-by-step: "First chop the onions, then heat oil to 350°F, then sauté for 3 minutes..."
  • Declarative (DLT) = "I want caramelized onions" -- the system figures out the steps
  • Expectations = Quality checks: "Onions must be golden, not burnt"
  • Pipeline DAG = The full meal plan showing dish dependencies

Key Benefits of DLT

Declarative Definitions

Define what your data should look like, not how to build it. DLT manages execution order, retries, and incremental processing.

Built-in Data Quality

Expectations let you define quality rules inline. Rows that fail can be dropped, flagged, or used to halt the pipeline.

Automatic DAG Resolution

DLT reads your table definitions and automatically determines execution order based on references between datasets.

Simplified Operations

Built-in monitoring, lineage tracking, automatic error recovery, and pipeline event logs -- no external tools required.

Declarative vs Imperative Pipelines

Aspect by aspect, imperative (traditional) vs. declarative (DLT):

  • Approach: imperative code defines each step manually; DLT declares the desired output and the engine derives the steps.
  • Execution order: you manage dependencies yourself; DLT auto-resolves them from dataset references.
  • Error handling: imperative pipelines need custom try/catch and retry logic; DLT has built-in recovery and retries.
  • Data quality: manual assertions or tests vs. inline expectations with configurable actions.
  • Incremental processing: manual watermarking and state management vs. automatic handling with streaming tables.
  • Monitoring: external tools (Airflow, etc.) vs. DLT's built-in pipeline event log.

Side-by-Side Comparison

imperative_pipeline.py (Traditional Approach)
# Traditional imperative pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, current_timestamp

spark = SparkSession.builder.getOrCreate()

# Step 1: Read raw data
raw_df = spark.read.format("json").load("/data/raw/events/")

# Step 2: Clean and validate
cleaned_df = raw_df.filter(col("event_id").isNotNull())
cleaned_df = cleaned_df.dropDuplicates(["event_id"])

# Step 3: Transform
enriched_df = cleaned_df.withColumn(
    "processed_at", current_timestamp()
)

# Step 4: Write (handle merge manually)
enriched_df.write.format("delta") \
    .mode("append") \
    .saveAsTable("catalog.schema.enriched_events")

dlt_pipeline.py (DLT Declarative Approach)
import dlt
from pyspark.sql.functions import col, current_timestamp

# DLT handles execution order, retries, and incremental loading

@dlt.table(
    comment="Raw events ingested from JSON source"
)
def raw_events():
    return spark.read.format("json").load("/data/raw/events/")

@dlt.table(
    comment="Cleaned events with quality rules"
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
def cleaned_events():
    return dlt.read("raw_events").dropDuplicates(["event_id"])

@dlt.table(
    comment="Enriched events ready for analytics"
)
def enriched_events():
    return dlt.read("cleaned_events").withColumn(
        "processed_at", current_timestamp()
    )

Creating DLT Tables & Views

DLT Object Types

  • @dlt.table: Creates a materialized Delta table that is persisted and queryable
  • @dlt.view: Creates a temporary, intermediate view (not persisted) -- used for intermediate transformations
  • Streaming tables: Tables that process data incrementally using Structured Streaming

dlt_table_types.py
import dlt
from pyspark.sql.functions import col, sum, avg, count

# --- Materialized Table (persisted) ---
@dlt.table(
    name="bronze_orders",
    comment="Raw orders ingested from source system",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/data/raw/orders/")
    )

# --- View (not persisted, intermediate step) ---
@dlt.view(
    name="validated_orders",
    comment="Orders that pass validation (not persisted)"
)
def validated_orders():
    return (
        dlt.read("bronze_orders")
        .filter(col("order_total") > 0)
        .filter(col("customer_id").isNotNull())
    )

# --- Silver table using the view ---
@dlt.table(
    name="silver_orders",
    comment="Cleaned, validated orders"
)
def silver_orders():
    return dlt.read("validated_orders")

# --- Gold aggregation table ---
@dlt.table(
    name="gold_daily_revenue",
    comment="Daily revenue aggregation"
)
def gold_daily_revenue():
    return (
        dlt.read("silver_orders")
        .groupBy("order_date")
        .agg(
            sum("order_total").alias("total_revenue"),
            count("order_id").alias("order_count"),
            avg("order_total").alias("avg_order_value")
        )
    )

SQL Syntax for DLT

dlt_sql.sql
-- Create a DLT table using SQL
CREATE OR REFRESH STREAMING LIVE TABLE bronze_customers
COMMENT "Raw customers from CSV"
AS SELECT *
FROM cloud_files("/data/raw/customers/", "csv",
    map("header", "true", "inferSchema", "true"));

-- Create a live view
CREATE LIVE VIEW valid_customers
AS SELECT *
FROM LIVE.bronze_customers
WHERE customer_id IS NOT NULL
  AND email IS NOT NULL;

-- Create a silver table from the view
CREATE OR REFRESH LIVE TABLE silver_customers
COMMENT "Cleaned customer records"
AS SELECT * FROM LIVE.valid_customers;

Expectations (Data Quality Rules)

What Are Expectations?

Expectations are DLT's built-in data quality framework. They let you define rules that each row must satisfy, and specify what happens when rows violate those rules.

  • @dlt.expect: on failure, log a warning and keep the row. Use for soft validation and monitoring only.
  • @dlt.expect_or_drop: drop the failing row. Use to silently filter out bad data.
  • @dlt.expect_or_fail: halt the entire pipeline. Use for critical rules that must never be violated.
  • @dlt.expect_all: log warnings for multiple rules. Use to apply several soft checks at once.
  • @dlt.expect_all_or_drop: drop the row if any rule fails. Use to combine multiple filters.
  • @dlt.expect_all_or_fail: halt the pipeline if any rule fails. Use for strict multi-rule enforcement.

expectations_examples.py
import dlt
from pyspark.sql.functions import col

# Single expectation: warn but keep the row
@dlt.table()
@dlt.expect("valid_amount", "amount > 0")
def orders_warn():
    return dlt.read("raw_orders")

# Single expectation: drop failing rows
@dlt.table()
@dlt.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
def customers_filtered():
    return dlt.read("raw_customers")

# Single expectation: halt pipeline on failure
@dlt.table()
@dlt.expect_or_fail("unique_transaction", "txn_id IS NOT NULL")
def transactions_strict():
    return dlt.read("raw_transactions")

# Multiple expectations at once
rules = {
    "valid_id": "order_id IS NOT NULL",
    "valid_amount": "amount > 0",
    "valid_date": "order_date >= '2020-01-01'",
    "valid_status": "status IN ('pending', 'completed', 'cancelled')"
}

@dlt.table()
@dlt.expect_all_or_drop(rules)
def validated_orders():
    return dlt.read("raw_orders")

Expectations Quality Enforcement Flow

[Diagram: each incoming row is checked against the table's expectations. Passing rows are written to the table; failing rows are warned and kept (@expect), dropped (@expect_or_drop), or halt the pipeline (@expect_or_fail). Pipeline metrics summarize the results, e.g. 9,847 rows passed, 153 dropped, 42 warned, for 98.5% quality.]

Pipeline DAG Visualization

How DLT Builds the DAG

When you define DLT tables and views, each call to dlt.read("table_name") creates a dependency edge. DLT analyzes all your definitions and constructs a directed acyclic graph (DAG), then executes tables in the correct topological order.
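
As a toy sketch of the idea (this is not DLT's internal implementation), you can think of each dlt.read reference as an edge in a dependency graph, with execution order falling out of a topological sort:

```python
from graphlib import TopologicalSorter

# Each dataset maps to the datasets it reads from, i.e. its
# dlt.read(...) references -- a toy model of a pipeline definition.
dependencies = {
    "raw_events": set(),
    "cleaned_events": {"raw_events"},
    "enriched_events": {"cleaned_events"},
    "daily_summary": {"enriched_events", "raw_events"},
}

# A topological order is a valid execution order for the pipeline:
# every dataset runs only after everything it reads from.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
# -> ['raw_events', 'cleaned_events', 'enriched_events', 'daily_summary']
```

Because the graph must be acyclic, a table that (directly or indirectly) reads from itself is rejected -- DLT raises a similar error for circular references.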

DLT Pipeline DAG -- Medallion Architecture

[Diagram: Bronze streaming tables raw_orders, raw_customers, and raw_products feed the Silver layer -- cleaned_orders (materialized, with expectations), enriched_orders (a join of orders and customers), and the non-persisted view valid_products -- which feeds the Gold aggregations daily_revenue, top_customers, and product_metrics, consumed by BI dashboards.]

Materialized Views vs Streaming Tables

Feature by feature, materialized view (LIVE TABLE) vs. streaming table (STREAMING LIVE TABLE):

  • Processing: a materialized view is fully recomputed each run; a streaming table processes data incrementally, append-only.
  • Data source: materialized views accept any source (batch or streaming); streaming tables require streaming sources.
  • Use case: materialized views suit aggregations and slowly changing data; streaming tables suit high-volume, append-heavy data.
  • Reads: read a materialized view with dlt.read("table"); read a streaming table with dlt.read_stream("table").
  • Performance: materialized views get slower on large datasets; streaming tables stay efficient as data grows.
  • State: a materialized view's state is fully replaced each run; a streaming table is checkpoint-based and resumes from its last position.

streaming_vs_materialized.py
import dlt

# --- Streaming Table: for continuously arriving data ---
@dlt.table(comment="Streaming ingestion from cloud storage")
def streaming_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/events/")
    )

# --- Materialized View: aggregation that recomputes ---
@dlt.table(comment="Hourly event counts, recomputed each run")
def hourly_event_counts():
    return (
        dlt.read("streaming_events")
        .groupBy("event_type", "hour")
        .count()
    )

# --- Streaming table reading from another streaming table ---
@dlt.table(comment="Filtered stream of error events")
def error_events():
    return (
        dlt.read_stream("streaming_events")
        .filter("event_type = 'error'")
    )

Deployment & Monitoring

Pipeline Configuration

DLT pipelines are configured through the Databricks UI, CLI, or REST API. Key settings include the target schema, cluster size, pipeline mode (triggered vs. continuous), and notebook paths.

pipeline_config.json
{
  "name": "ecommerce_pipeline",
  "target": "ecommerce_analytics",
  "catalog": "production",
  "libraries": [
    { "notebook": { "path": "/Repos/team/pipelines/bronze" } },
    { "notebook": { "path": "/Repos/team/pipelines/silver" } },
    { "notebook": { "path": "/Repos/team/pipelines/gold" } }
  ],
  "configuration": {
    "pipelines.trigger.interval": "1 hour",
    "source_env": "production"
  },
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "continuous": false,
  "development": false,
  "photon": true,
  "channel": "CURRENT"
}

Monitoring with the Event Log

monitoring.sql
-- Query the DLT event log for pipeline metrics
SELECT
    timestamp,
    details:flow_definition.output_dataset AS dataset,
    details:flow_progress.metrics.num_output_rows AS rows_written,
    details:flow_progress.data_quality.expectations
FROM event_log(TABLE(ecommerce_analytics.bronze_orders))
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 20;

-- Check data quality metrics
SELECT
    details:flow_definition.output_dataset AS dataset,
    details:flow_progress.data_quality.dropped_records AS dropped,
    details:flow_progress.data_quality.expectations
FROM event_log(TABLE(ecommerce_analytics.cleaned_orders))
WHERE event_type = 'flow_progress';
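
The expectations column above is nested JSON; to get per-rule pass/fail counts you can explode it. This sketch assumes the event log schema documented by Databricks (an array of structs with name, dataset, passed_records, failed_records) -- verify the field names against your runtime version:

```sql
-- Per-expectation pass/fail counts (schema assumed from Databricks docs)
SELECT
    expectation.name,
    expectation.dataset,
    SUM(expectation.passed_records) AS passed,
    SUM(expectation.failed_records) AS failed
FROM (
    SELECT explode(
        from_json(
            details:flow_progress.data_quality.expectations,
            "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
    ) AS expectation
    FROM event_log(TABLE(ecommerce_analytics.cleaned_orders))
    WHERE event_type = 'flow_progress'
)
GROUP BY expectation.name, expectation.dataset;
```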

Common Pitfall: Development vs Production Mode

Development mode reuses the cluster between runs and does not automatically retry on failure -- great for iterating. Production mode spins up a fresh cluster each run, enables retries, and is optimized for reliability. Always test in development mode before deploying to production.
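
In the pipeline settings this is a single boolean -- the same development field shown in pipeline_config.json above:

```json
{
  "development": true
}
```

Set it to true while iterating and flip it to false (as in the full config above) when promoting the pipeline.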

Practice Problems

Easy: Basic DLT Pipeline

Create a DLT pipeline with three tables following the medallion pattern:

  1. A bronze streaming table that ingests CSV files from /data/raw/sales/
  2. A silver table that filters out rows where sale_amount is null or negative
  3. A gold table that computes total sales per product category

Use @dlt.table for each layer. For bronze, use spark.readStream.format("cloudFiles"). For filtering, use @dlt.expect_or_drop. For aggregation, use groupBy and agg.

import dlt
from pyspark.sql.functions import col, sum

@dlt.table(comment="Raw sales from CSV")
def bronze_sales():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/data/raw/sales/")
    )

@dlt.table(comment="Validated sales")
@dlt.expect_or_drop("valid_amount", "sale_amount IS NOT NULL AND sale_amount > 0")
def silver_sales():
    return dlt.read_stream("bronze_sales")

@dlt.table(comment="Sales by category")
def gold_sales_by_category():
    return (
        dlt.read("silver_sales")
        .groupBy("product_category")
        .agg(sum("sale_amount").alias("total_sales"))
    )

Medium: Multi-Rule Expectations

Build a DLT table for user signups with these quality rules:

  1. Email must not be null and must contain '@'
  2. Age must be between 13 and 120
  3. Signup date must be after 2020-01-01
  4. Country must be a non-empty string

Rows failing email or age rules should be dropped. Rows failing date or country rules should be warned but kept.

Combine @dlt.expect_all_or_drop for strict rules and @dlt.expect_all for soft rules. You can stack multiple expectation decorators on the same table.

import dlt

strict_rules = {
    "valid_email": "email IS NOT NULL AND email LIKE '%@%'",
    "valid_age": "age BETWEEN 13 AND 120"
}

soft_rules = {
    "valid_signup_date": "signup_date >= '2020-01-01'",
    "valid_country": "country IS NOT NULL AND LENGTH(country) > 0"
}

@dlt.table(comment="Validated user signups")
@dlt.expect_all_or_drop(strict_rules)
@dlt.expect_all(soft_rules)
def validated_signups():
    return dlt.read("raw_signups")

Hard: Full Pipeline with CDC

Design a DLT pipeline that handles Change Data Capture (CDC):

  1. Ingest CDC events (inserts, updates, deletes) from a streaming source
  2. Apply changes to maintain a current-state silver table using dlt.apply_changes
  3. Create a gold table with customer lifetime value calculations

Use dlt.apply_changes() with keys, sequence_by, and apply_as_deletes parameters. The target must be declared with dlt.create_streaming_table().

import dlt
from pyspark.sql.functions import col, sum, expr

# Bronze: ingest CDC events
@dlt.table(comment="Raw CDC events")
def bronze_cdc_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/cdc/customers/")
    )

# Silver: apply changes to get current state
dlt.create_streaming_table("silver_customers")

dlt.apply_changes(
    target="silver_customers",
    source="bronze_cdc_events",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    apply_as_truncates=expr("operation = 'TRUNCATE'"),
    column_list=["customer_id", "name", "email", "total_purchases"]
)

# Gold: customer lifetime value
@dlt.table(comment="Customer lifetime value")
def gold_customer_ltv():
    return (
        dlt.read("silver_customers")
        .filter(col("total_purchases").isNotNull())
        .withColumn("ltv_tier",
            expr("""CASE
                WHEN total_purchases >= 10000 THEN 'platinum'
                WHEN total_purchases >= 5000 THEN 'gold'
                WHEN total_purchases >= 1000 THEN 'silver'
                ELSE 'bronze'
            END"""))
    )

Quick Reference

DLT Python API Cheat Sheet

  • @dlt.table: define a materialized table. Example: @dlt.table(name="my_table", comment="...")
  • @dlt.view: define a non-persisted view. Example: @dlt.view(name="temp_view")
  • dlt.read(): read a DLT dataset as a batch. Example: dlt.read("silver_orders")
  • dlt.read_stream(): read a DLT dataset as a stream. Example: dlt.read_stream("bronze_events")
  • @dlt.expect: warn on a quality violation. Example: @dlt.expect("rule_name", "condition")
  • @dlt.expect_or_drop: drop rows violating a rule. Example: @dlt.expect_or_drop("valid_id", "id IS NOT NULL")
  • @dlt.expect_or_fail: halt the pipeline on a violation. Example: @dlt.expect_or_fail("critical", "amount > 0")
  • dlt.apply_changes(): process CDC data. Example: dlt.apply_changes(target="t", source="s", keys=["id"], ...)

Pipeline Modes

Key Configuration Options

  • Triggered: Pipeline runs once and stops. Ideal for scheduled batch processing.
  • Continuous: Pipeline runs continuously, processing new data as it arrives. Ideal for low-latency streaming.
  • Development: Reuses clusters, no automatic retries. Use for testing.
  • Production: Fresh cluster each run, automatic retries. Use for reliable deployments.
  • Photon: Enable the Photon engine for faster query execution on DLT tables.