Why Delta Live Tables?
Why DLT Matters
The Problem: Building and maintaining reliable data pipelines requires managing dependencies, handling data quality, dealing with failures, and orchestrating complex DAGs -- all manually.
The Solution: Delta Live Tables (DLT) lets you declare your pipeline as a series of transformations, and Databricks handles orchestration, error handling, monitoring, and data quality for you.
Real Impact: Teams report 40-60% less pipeline code and 80% fewer pipeline failures after adopting DLT.
Real-World Analogy
Think of DLT like a recipe book vs. cooking instructions:
- Imperative (traditional) = Step-by-step: "First chop onions, then heat oil to 350F, then saute for 3 minutes..."
- Declarative (DLT) = "I want caramelized onions" -- the system figures out the steps
- Expectations = Quality checks: "Onions must be golden, not burnt"
- Pipeline DAG = The full meal plan showing dish dependencies
Key Benefits of DLT
Declarative Definitions
Define what your data should look like, not how to build it. DLT manages execution order, retries, and incremental processing.
Built-in Data Quality
Expectations let you define quality rules inline. Rows that fail can be dropped, flagged, or used to halt the pipeline.
Automatic DAG Resolution
DLT reads your table definitions and automatically determines execution order based on references between datasets.
Simplified Operations
Built-in monitoring, lineage tracking, automatic error recovery, and pipeline event logs -- no external tools required.
Declarative vs Imperative Pipelines
| Aspect | Imperative (Traditional) | Declarative (DLT) |
|---|---|---|
| Approach | Define each step manually | Define desired output, engine handles steps |
| Execution Order | You manage dependencies | Auto-resolved from references |
| Error Handling | Custom try/catch, retries | Built-in recovery and retry |
| Data Quality | Manual assertions or tests | Inline expectations with actions |
| Incremental Processing | Manual watermarking and state | Automatic with streaming tables |
| Monitoring | External tools (Airflow, etc.) | Built-in pipeline event log |
Side-by-Side Comparison
```python
# Traditional imperative pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder.getOrCreate()

# Step 1: Read raw data
raw_df = spark.read.format("json").load("/data/raw/events/")

# Step 2: Clean and validate
cleaned_df = raw_df.filter(col("event_id").isNotNull())
cleaned_df = cleaned_df.dropDuplicates(["event_id"])

# Step 3: Transform
enriched_df = cleaned_df.withColumn(
    "processed_at", current_timestamp()
)

# Step 4: Write (handle merge manually)
enriched_df.write.format("delta") \
    .mode("append") \
    .saveAsTable("catalog.schema.enriched_events")
```
```python
# Declarative DLT pipeline
import dlt
from pyspark.sql.functions import current_timestamp

# DLT handles execution order, retries, and incremental loading
@dlt.table(
    comment="Raw events ingested from JSON source"
)
def raw_events():
    return spark.read.format("json").load("/data/raw/events/")

@dlt.table(
    comment="Cleaned events with quality rules"
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
def cleaned_events():
    return dlt.read("raw_events").dropDuplicates(["event_id"])

@dlt.table(
    comment="Enriched events ready for analytics"
)
def enriched_events():
    return dlt.read("cleaned_events").withColumn(
        "processed_at", current_timestamp()
    )
```
Creating DLT Tables & Views
DLT Object Types
- @dlt.table: Creates a materialized Delta table that is persisted and queryable
- @dlt.view: Creates a temporary view that is not persisted -- useful for intermediate transformations shared by downstream tables
- Streaming tables: Tables that process data incrementally using Structured Streaming
```python
import dlt
from pyspark.sql.functions import col, sum, avg, count

# --- Materialized Table (persisted) ---
@dlt.table(
    name="bronze_orders",
    comment="Raw orders ingested from source system",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/data/raw/orders/")
    )

# --- View (not persisted, intermediate step) ---
@dlt.view(
    name="validated_orders",
    comment="Orders that pass validation (not persisted)"
)
def validated_orders():
    return (
        dlt.read("bronze_orders")
        .filter(col("order_total") > 0)
        .filter(col("customer_id").isNotNull())
    )

# --- Silver table using the view ---
@dlt.table(
    name="silver_orders",
    comment="Cleaned, validated orders"
)
def silver_orders():
    return dlt.read("validated_orders")

# --- Gold aggregation table ---
@dlt.table(
    name="gold_daily_revenue",
    comment="Daily revenue aggregation"
)
def gold_daily_revenue():
    return (
        dlt.read("silver_orders")
        .groupBy("order_date")
        .agg(
            sum("order_total").alias("total_revenue"),
            count("order_id").alias("order_count"),
            avg("order_total").alias("avg_order_value")
        )
    )
```
SQL Syntax for DLT
```sql
-- Create a DLT table using SQL
CREATE OR REFRESH STREAMING LIVE TABLE bronze_customers
COMMENT "Raw customers from CSV"
AS SELECT *
FROM cloud_files("/data/raw/customers/", "csv",
                 map("header", "true", "inferSchema", "true"));

-- Create a live view
CREATE LIVE VIEW valid_customers
AS SELECT *
FROM LIVE.bronze_customers
WHERE customer_id IS NOT NULL
  AND email IS NOT NULL;

-- Create a silver table from the view
CREATE OR REFRESH LIVE TABLE silver_customers
COMMENT "Cleaned customer records"
AS SELECT * FROM LIVE.valid_customers;
```
Expectations (Data Quality Rules)
What Are Expectations?
Expectations are DLT's built-in data quality framework. They let you define rules that each row must satisfy, and specify what happens when rows violate those rules.
| Decorator | On Failure | Use Case |
|---|---|---|
| `@dlt.expect` | Log warning, keep row | Soft validation, monitoring only |
| `@dlt.expect_or_drop` | Drop the failing row | Filter out bad data silently |
| `@dlt.expect_or_fail` | Halt the entire pipeline | Critical rules that must never be violated |
| `@dlt.expect_all` | Log warnings for multiple rules | Apply several soft checks at once |
| `@dlt.expect_all_or_drop` | Drop if any rule fails | Multiple filters combined |
| `@dlt.expect_all_or_fail` | Halt if any rule fails | Strict multi-rule enforcement |
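The three failure actions reduce to simple row-level behavior: count violations, filter violating rows, or stop the run. A toy sketch in plain Python of what each action means (illustration only -- the rows and rule are made up, and this is not the DLT runtime):

```python
# Toy model of DLT expectation actions (illustration only, not the DLT runtime).
rows = [
    {"order_id": 1, "amount": 50},
    {"order_id": 2, "amount": -5},   # violates the rule
    {"order_id": 3, "amount": 120},
]
rule = lambda r: r["amount"] > 0  # "valid_amount": "amount > 0"

# expect: keep every row, just count violations (surfaced as pipeline metrics)
violations = sum(1 for r in rows if not rule(r))

# expect_or_drop: keep only the passing rows
kept = [r for r in rows if rule(r)]

# expect_or_fail: any violation stops the update
if violations > 0:
    pass  # DLT would fail the update here instead of writing partial results

print(violations, [r["order_id"] for r in kept])  # 1 [1, 3]
```

The real decorators, shown in the examples that follow, attach exactly these behaviors to a table definition.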
```python
import dlt

# Single expectation: warn but keep the row
@dlt.table()
@dlt.expect("valid_amount", "amount > 0")
def orders_warn():
    return dlt.read("raw_orders")

# Single expectation: drop failing rows
@dlt.table()
@dlt.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
def customers_filtered():
    return dlt.read("raw_customers")

# Single expectation: halt pipeline on failure
@dlt.table()
@dlt.expect_or_fail("txn_id_present", "txn_id IS NOT NULL")
def transactions_strict():
    return dlt.read("raw_transactions")

# Multiple expectations at once
rules = {
    "valid_id": "order_id IS NOT NULL",
    "valid_amount": "amount > 0",
    "valid_date": "order_date >= '2020-01-01'",
    "valid_status": "status IN ('pending', 'completed', 'cancelled')"
}

@dlt.table()
@dlt.expect_all_or_drop(rules)
def validated_orders():
    return dlt.read("raw_orders")
```
Pipeline DAG Visualization
How DLT Builds the DAG
When you define DLT tables and views, each call to dlt.read("table_name") creates a dependency edge. DLT analyzes all your definitions and constructs a directed acyclic graph (DAG), then executes tables in the correct topological order.
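A rough sketch of what that resolution amounts to: gather one edge per dlt.read() call, then sort topologically. The table names below are from the medallion example earlier; the real engine does this internally.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Edges mirror dlt.read() calls: dataset -> the datasets it reads from
deps = {
    "bronze_orders": set(),
    "validated_orders": {"bronze_orders"},
    "silver_orders": {"validated_orders"},
    "gold_daily_revenue": {"silver_orders"},
}

# Topological order = valid execution order; a cycle would raise CycleError,
# which is why DLT pipelines must form a DAG.
order = list(TopologicalSorter(deps).static_order())
print(order)
# ['bronze_orders', 'validated_orders', 'silver_orders', 'gold_daily_revenue']
```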
Materialized Views vs Streaming Tables
| Feature | Materialized View (LIVE TABLE) | Streaming Table (STREAMING LIVE TABLE) |
|---|---|---|
| Processing | Full recomputation each run | Incremental, append-only |
| Data Source | Any (batch or streaming) | Streaming sources only |
| Use Case | Aggregations, slowly changing data | High-volume, append-heavy data |
| Reads With | `dlt.read("table")` | `dlt.read_stream("table")` |
| Performance | Slower for large datasets | Efficient for large, growing datasets |
| State | Fully replaced each run | Checkpoint-based, resumes from last position |
```python
import dlt

# --- Streaming Table: for continuously arriving data ---
@dlt.table(comment="Streaming ingestion from cloud storage")
def streaming_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/events/")
    )

# --- Materialized View: aggregation that recomputes ---
@dlt.table(comment="Hourly event counts, recomputed each run")
def hourly_event_counts():
    return (
        dlt.read("streaming_events")
        .groupBy("event_type", "hour")
        .count()
    )

# --- Streaming table reading from another streaming table ---
@dlt.table(comment="Filtered stream of error events")
def error_events():
    return (
        dlt.read_stream("streaming_events")
        .filter("event_type = 'error'")
    )
```
Deployment & Monitoring
Pipeline Configuration
DLT pipelines are configured through the Databricks UI, CLI, or REST API. Key settings include the target schema, cluster size, pipeline mode (triggered vs. continuous), and notebook paths.
```json
{
  "name": "ecommerce_pipeline",
  "target": "ecommerce_analytics",
  "catalog": "production",
  "libraries": [
    { "notebook": { "path": "/Repos/team/pipelines/bronze" } },
    { "notebook": { "path": "/Repos/team/pipelines/silver" } },
    { "notebook": { "path": "/Repos/team/pipelines/gold" } }
  ],
  "configuration": {
    "pipelines.trigger.interval": "1 hour",
    "source_env": "production"
  },
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "continuous": false,
  "development": false,
  "photon": true,
  "channel": "CURRENT"
}
```
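For the REST route, settings like those above are POSTed to the Pipelines endpoint (`/api/2.0/pipelines`). A minimal sketch using only the standard library -- the workspace URL and token are placeholders, and the settings dict is trimmed to a few fields from the example:

```python
import json
from urllib import request

WORKSPACE = "https://example.cloud.databricks.com"  # placeholder workspace URL
TOKEN = "dapi-REDACTED"                             # placeholder personal access token

# Trimmed-down pipeline settings; see the full JSON example above
settings = {
    "name": "ecommerce_pipeline",
    "target": "ecommerce_analytics",
    "catalog": "production",
    "continuous": False,
    "development": False,
    "photon": True,
}

req = request.Request(
    f"{WORKSPACE}/api/2.0/pipelines",
    data=json.dumps(settings).encode(),
    headers={
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json",
    },
    method="POST",
)
# response = request.urlopen(req)  # on success, the API returns the new pipeline's ID
```

The same payload works with the Databricks CLI or an infrastructure-as-code tool; the UI is just a form over these settings.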
Monitoring with the Event Log
```sql
-- Query the DLT event log for pipeline metrics
SELECT
  timestamp,
  details:flow_definition.output_dataset AS dataset,
  details:flow_progress.metrics.num_output_rows AS rows_written,
  details:flow_progress.data_quality.expectations
FROM event_log(TABLE(ecommerce_analytics.bronze_orders))
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 20;

-- Check data quality metrics
SELECT
  details:flow_definition.output_dataset AS dataset,
  details:flow_progress.data_quality.dropped_records AS dropped,
  details:flow_progress.data_quality.expectations
FROM event_log(TABLE(ecommerce_analytics.cleaned_orders))
WHERE event_type = 'flow_progress';
```
Common Pitfall: Development vs Production Mode
Development mode reuses the cluster between runs and does not automatically retry on failure -- great for iterating. Production mode spins up a fresh cluster each run, enables retries, and is optimized for reliability. Always test in development mode before deploying to production.
Practice Problems
Easy Basic DLT Pipeline
Create a DLT pipeline with three tables following the medallion pattern:
- A bronze streaming table that ingests CSV files from /data/raw/sales/
- A silver table that filters out rows where sale_amount is null or negative
- A gold table that computes total sales per product category
Use @dlt.table for each layer. For bronze, use spark.readStream.format("cloudFiles"). For filtering, use @dlt.expect_or_drop. For aggregation, use groupBy and agg.
```python
import dlt
from pyspark.sql.functions import sum

@dlt.table(comment="Raw sales from CSV")
def bronze_sales():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/data/raw/sales/")
    )

@dlt.table(comment="Validated sales")
@dlt.expect_or_drop("valid_amount", "sale_amount IS NOT NULL AND sale_amount > 0")
def silver_sales():
    return dlt.read_stream("bronze_sales")

@dlt.table(comment="Sales by category")
def gold_sales_by_category():
    return (
        dlt.read("silver_sales")
        .groupBy("product_category")
        .agg(sum("sale_amount").alias("total_sales"))
    )
```
Medium Multi-Rule Expectations
Build a DLT table for user signups with these quality rules:
- Email must not be null and must contain '@'
- Age must be between 13 and 120
- Signup date must be after 2020-01-01
- Country must be a non-empty string
Rows failing email or age rules should be dropped. Rows failing date or country rules should be warned but kept.
Combine @dlt.expect_all_or_drop for strict rules and @dlt.expect_all for soft rules. You can stack multiple expectation decorators on the same table.
```python
import dlt

strict_rules = {
    "valid_email": "email IS NOT NULL AND email LIKE '%@%'",
    "valid_age": "age BETWEEN 13 AND 120"
}

soft_rules = {
    "valid_signup_date": "signup_date >= '2020-01-01'",
    "valid_country": "country IS NOT NULL AND LENGTH(country) > 0"
}

@dlt.table(comment="Validated user signups")
@dlt.expect_all_or_drop(strict_rules)
@dlt.expect_all(soft_rules)
def validated_signups():
    return dlt.read("raw_signups")
```
Hard Full Pipeline with CDC
Design a DLT pipeline that handles Change Data Capture (CDC):
- Ingest CDC events (inserts, updates, deletes) from a streaming source
- Apply changes to maintain a current-state silver table using dlt.apply_changes
- Create a gold table with customer lifetime value calculations
Use dlt.apply_changes() with keys, sequence_by, and apply_as_deletes parameters. The target must be declared with dlt.create_streaming_table().
```python
import dlt
from pyspark.sql.functions import col, expr

# Bronze: ingest CDC events
@dlt.table(comment="Raw CDC events")
def bronze_cdc_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/cdc/customers/")
    )

# Silver: apply changes to get current state
dlt.create_streaming_table("silver_customers")

dlt.apply_changes(
    target="silver_customers",
    source="bronze_cdc_events",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    apply_as_truncates=expr("operation = 'TRUNCATE'"),
    column_list=["customer_id", "name", "email", "total_purchases"]
)

# Gold: customer lifetime value
@dlt.table(comment="Customer lifetime value")
def gold_customer_ltv():
    return (
        dlt.read("silver_customers")
        .filter(col("total_purchases").isNotNull())
        .withColumn(
            "ltv_tier",
            expr("""CASE
                      WHEN total_purchases >= 10000 THEN 'platinum'
                      WHEN total_purchases >= 5000 THEN 'gold'
                      WHEN total_purchases >= 1000 THEN 'silver'
                      ELSE 'bronze'
                    END""")
        )
    )
```
Quick Reference
DLT Python API Cheat Sheet
| API | Purpose | Example |
|---|---|---|
| `@dlt.table` | Define a materialized table | `@dlt.table(name="my_table", comment="...")` |
| `@dlt.view` | Define a non-persisted view | `@dlt.view(name="temp_view")` |
| `dlt.read()` | Read from a DLT dataset (batch) | `dlt.read("silver_orders")` |
| `dlt.read_stream()` | Read from a streaming DLT dataset | `dlt.read_stream("bronze_events")` |
| `@dlt.expect` | Warn on quality violation | `@dlt.expect("rule_name", "condition")` |
| `@dlt.expect_or_drop` | Drop rows violating rule | `@dlt.expect_or_drop("valid_id", "id IS NOT NULL")` |
| `@dlt.expect_or_fail` | Halt pipeline on violation | `@dlt.expect_or_fail("critical", "amount > 0")` |
| `dlt.apply_changes()` | Process CDC data | `dlt.apply_changes(target="t", source="s", keys=["id"], ...)` |
Pipeline Modes & Key Configuration Options
- Triggered: Pipeline runs once and stops. Ideal for scheduled batch processing.
- Continuous: Pipeline runs continuously, processing new data as it arrives. Ideal for low-latency streaming.
- Development: Reuses clusters, no automatic retries. Use for testing.
- Production: Fresh cluster each run, automatic retries. Use for reliable deployments.
- Photon: Enable the Photon engine for faster query execution on DLT tables.