Project Overview & Requirements
What You Will Build
Goal: Build a production-grade data lakehouse on Databricks using the medallion architecture pattern.
Data: E-commerce transaction data from multiple sources (CSV files in S3/ADLS, JSON event streams, relational database exports).
Outcome: A fully orchestrated pipeline that ingests raw data, cleans and enriches it, creates business-level aggregates, and serves dashboards via SQL Warehouses.
Prerequisites
- Databricks workspace with Unity Catalog enabled
- Access to a cloud storage account (S3 or ADLS)
- Familiarity with PySpark and Delta Lake basics
- Understanding of the medallion architecture pattern
Architecture Overview
Data Source Setup (S3/ADLS)
Supported Data Sources
This project uses cloud object storage as the primary landing zone. The same patterns apply whether you use AWS S3 or Azure Data Lake Storage Gen2 (ADLS).
Configure External Storage Access
# Create a Unity Catalog external location
# This allows Databricks to read from your cloud storage
# Step 1: Create a storage credential (Admin UI or SQL)
# The credential must be backed by a cloud identity (an AWS IAM role
# or an Azure managed identity). On AWS this is often done once in
# Catalog Explorer; the SQL form below is a sketch with a placeholder
# ARN -- check the CREATE STORAGE CREDENTIAL reference for your
# cloud's exact WITH clause.
spark.sql("""
CREATE STORAGE CREDENTIAL IF NOT EXISTS lakehouse_cred
WITH (AWS IAM ROLE 'arn:aws:iam::<account-id>:role/<lakehouse-role>')
COMMENT 'Storage credential for lakehouse project'
""")
# Step 2: Create external location pointing to your landing zone
spark.sql("""
CREATE EXTERNAL LOCATION IF NOT EXISTS landing_zone
URL 's3://my-lakehouse-landing/'
WITH (STORAGE CREDENTIAL lakehouse_cred)
COMMENT 'Raw data landing zone for lakehouse project'
""")
# Step 3: Create the catalog and schemas for medallion layers
spark.sql("CREATE CATALOG IF NOT EXISTS lakehouse")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.gold")
Generate Sample Data
from pyspark.sql import functions as F
from pyspark.sql.types import *
import random, uuid
from datetime import datetime, timedelta
# Generate transactions data
num_records = 100000
products = ["laptop", "phone", "tablet", "headphones", "monitor"]
statuses = ["completed", "pending", "refunded", "cancelled"]
data = []
for _ in range(num_records):
    data.append((
        str(uuid.uuid4()),
        f"CUST-{random.randint(1, 5000):05d}",
        random.choice(products),
        random.randint(1, 5),
        round(random.uniform(29.99, 2499.99), 2),
        random.choice(statuses),
        # Truncate to whole seconds so the timestamp parses cleanly downstream
        (datetime.now() - timedelta(days=random.randint(0, 365))).isoformat(timespec="seconds")
    ))
schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product", StringType()),
    StructField("quantity", IntegerType()),
    StructField("price", DoubleType()),
    StructField("status", StringType()),
    StructField("event_time", StringType()),
])
df = spark.createDataFrame(data, schema)
df.write.format("json").mode("overwrite").save(
"s3://my-lakehouse-landing/transactions/"
)
Bronze Layer (Auto Loader Ingestion)
Bronze Layer Purpose
The Bronze layer is your raw data landing zone. Data is ingested as-is with minimal transformation, preserving the original schema and adding ingestion metadata (source file, timestamp). Auto Loader handles incremental ingestion efficiently.
Auto Loader Configuration
from pyspark.sql import functions as F
# Auto Loader: Incrementally ingest new files from landing zone
# cloudFiles source automatically tracks which files have been processed
bronze_transactions = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "/mnt/checkpoints/bronze/transactions/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://my-lakehouse-landing/transactions/")
    # Add ingestion metadata
    .withColumn("_ingested_at", F.current_timestamp())
    # _metadata.file_path is the supported replacement for
    # input_file_name(), which fails on Unity Catalog shared clusters
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("_ingestion_date", F.current_date())
)

# Write to Bronze Delta table with partitioning
(bronze_transactions.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/bronze/transactions")
    .option("mergeSchema", "true")
    .partitionBy("_ingestion_date")
    .trigger(availableNow=True)
    .toTable("lakehouse.bronze.raw_transactions")
)
Bronze Layer Best Practices
Schema Evolution
Use schemaEvolutionMode = "addNewColumns" so new fields in source data are automatically captured without breaking the pipeline.
Ingestion Metadata
Always add _ingested_at, _source_file, and _ingestion_date columns to trace data lineage back to source files.
Partition Strategy
Partition Bronze tables by ingestion date to enable efficient pruning and time-travel queries on raw data.
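Conceptually, addNewColumns makes the Bronze schema the union of every schema seen so far: existing columns keep their inferred types while newly appearing fields are appended. A toy illustration of that merge rule in plain Python (not Auto Loader's actual implementation; the helper name is ours):

```python
def merge_schema(current: dict, incoming: dict) -> dict:
    """Union of columns: existing columns keep their type, new ones are added."""
    merged = dict(current)
    for col, dtype in incoming.items():
        # setdefault only adds the column if it is not already present
        merged.setdefault(col, dtype)
    return merged

v1 = {"transaction_id": "string", "price": "double"}
v2 = {"transaction_id": "string", "price": "double", "coupon_code": "string"}
evolved = merge_schema(v1, v2)
# → {'transaction_id': 'string', 'price': 'double', 'coupon_code': 'string'}
```

By default Auto Loader additionally captures values that do not fit the inferred schema in a _rescued_data column, so unexpected data is kept rather than dropped.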
Silver Layer (Cleaning & Enrichment)
Silver Layer Purpose
The Silver layer contains cleaned, validated, and enriched data. This is where you apply data quality rules, deduplicate records, cast data types, and join related datasets together.
Data Cleaning Pipeline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable
# Read from Bronze
bronze_df = spark.read.table("lakehouse.bronze.raw_transactions")
# Step 1: Data type casting and validation
cleaned_df = (bronze_df
    # Default ISO-8601 parsing handles optional fractional seconds; a fixed
    # "yyyy-MM-dd'T'HH:mm:ss" pattern would null out values with microseconds
    .withColumn("event_time", F.to_timestamp("event_time"))
    .withColumn("price", F.col("price").cast("decimal(10,2)"))
    .withColumn("quantity", F.col("quantity").cast("integer"))
    .withColumn("total_amount",
                F.round(F.col("price") * F.col("quantity"), 2))
)
# Step 2: Filter invalid records
valid_df = (cleaned_df
    .filter(F.col("transaction_id").isNotNull())
    .filter(F.col("price") > 0)
    .filter(F.col("quantity") > 0)
    .filter(F.col("event_time").isNotNull())
)
# Step 3: Deduplicate using window function
window_spec = Window.partitionBy("transaction_id").orderBy(
    F.col("_ingested_at").desc()
)
deduped_df = (valid_df
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)
# Step 4: Enrich with derived columns
enriched_df = (deduped_df
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn("event_hour", F.hour("event_time"))
    # dayofweek: 1 = Sunday, 7 = Saturday
    .withColumn("day_of_week", F.dayofweek("event_time"))
    .withColumn("is_weekend",
                F.when(F.col("day_of_week").isin(1, 7), True)
                 .otherwise(False))
    .withColumn("_processed_at", F.current_timestamp())
)
# Step 5: MERGE into Silver (upsert pattern)
if spark.catalog.tableExists("lakehouse.silver.clean_transactions"):
    silver_table = DeltaTable.forName(
        spark, "lakehouse.silver.clean_transactions")
    (silver_table.alias("target")
        .merge(enriched_df.alias("source"),
               "target.transaction_id = source.transaction_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (enriched_df.write
        .format("delta")
        .partitionBy("event_date")
        .saveAsTable("lakehouse.silver.clean_transactions")
    )
Data Quality Checks
# Data quality validation after Silver processing
# (runs as its own notebook/task, so it needs its own import)
from pyspark.sql import functions as F

silver_df = spark.read.table("lakehouse.silver.clean_transactions")
# Check: No null transaction IDs
null_count = silver_df.filter(F.col("transaction_id").isNull()).count()
assert null_count == 0, f"Found {null_count} null transaction IDs"
# Check: All prices are positive
neg_prices = silver_df.filter(F.col("price") <= 0).count()
assert neg_prices == 0, f"Found {neg_prices} non-positive prices"
# Check: No duplicate transaction IDs
total = silver_df.count()
distinct = silver_df.select("transaction_id").distinct().count()
assert total == distinct, f"Found {total - distinct} duplicate records"
# Check: Record count within expected range
assert total > 0, "Silver table is empty"
print(f"Data quality checks passed. Total records: {total}")
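The asserts above fail fast on the first problem. For monitoring dashboards it is often better to evaluate every check and report all failures at once. A minimal, Spark-free sketch (helper name and messages are ours; in the pipeline each count would come from a filter(...).count() on the Silver table):

```python
def run_quality_checks(checks):
    """checks: iterable of (name, violation_count); returns failure messages."""
    failures = []
    for name, bad_count in checks:
        if bad_count > 0:
            failures.append(f"{name}: {bad_count} violation(s)")
    return failures

# In the pipeline each count would be e.g.
#   silver_df.filter(F.col("price") <= 0).count()
failures = run_quality_checks([
    ("null transaction_id", 0),
    ("non-positive price", 0),
    ("duplicate transaction_id", 0),
])
if failures:
    raise ValueError("Data quality failed: " + "; ".join(failures))
```

Collecting failures this way also gives you a natural payload to write into a quality-results table for alerting.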
Gold Layer (Business Aggregates)
Gold Layer Purpose
The Gold layer contains business-level aggregations and metrics optimized for consumption by analysts, dashboards, and ML models. Each Gold table answers specific business questions.
Daily Revenue Aggregation
from pyspark.sql import functions as F
silver_df = spark.read.table("lakehouse.silver.clean_transactions")
# Daily revenue by product
daily_revenue = (silver_df
    .filter(F.col("status") == "completed")
    .groupBy("event_date", "product")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("transaction_id").alias("order_count"),
        F.sum("quantity").alias("units_sold"),
        F.avg("total_amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers")
    )
    .withColumn("_computed_at", F.current_timestamp())
)
# Overwrite Gold table (full rebuild for aggregates)
(daily_revenue.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lakehouse.gold.daily_revenue")
)
Customer Lifetime Value
# Customer Lifetime Value (LTV) calculation
customer_ltv = (silver_df
    .filter(F.col("status") == "completed")
    .groupBy("customer_id")
    .agg(
        F.sum("total_amount").alias("total_spend"),
        F.count("transaction_id").alias("total_orders"),
        F.avg("total_amount").alias("avg_order_value"),
        F.min("event_date").alias("first_purchase"),
        F.max("event_date").alias("last_purchase"),
        F.countDistinct("product").alias("unique_products")
    )
    .withColumn("customer_tenure_days",
                F.datediff("last_purchase", "first_purchase"))
    .withColumn("ltv_segment",
                F.when(F.col("total_spend") >= 10000, "Platinum")
                 .when(F.col("total_spend") >= 5000, "Gold")
                 .when(F.col("total_spend") >= 1000, "Silver")
                 .otherwise("Bronze"))
    .withColumn("_computed_at", F.current_timestamp())
)
(customer_ltv.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lakehouse.gold.customer_ltv")
)
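The when/otherwise chain above encodes the segment thresholds. The same banding as a plain Python function, handy for unit-testing the logic outside Spark (function name is ours):

```python
def ltv_segment(total_spend: float) -> str:
    """Mirror of the when/otherwise thresholds in the Gold job."""
    if total_spend >= 10000:
        return "Platinum"
    if total_spend >= 5000:
        return "Gold"
    if total_spend >= 1000:
        return "Silver"
    return "Bronze"
```

Boundaries are inclusive, matching the >= comparisons in the Spark expression, so a customer at exactly 5000 lands in Gold.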
Serving Layer (SQL Warehouses + Dashboards)
SQL Warehouse Configuration
Use a Serverless SQL Warehouse for cost-efficient querying of Gold tables. Serverless warehouses start instantly and auto-scale based on demand.
Dashboard Queries
-- Revenue trend for the last 30 days
SELECT
event_date,
SUM(revenue) AS total_revenue,
SUM(order_count) AS total_orders,
ROUND(AVG(avg_order_value), 2) AS avg_order_value
FROM lakehouse.gold.daily_revenue
WHERE event_date >= current_date() - INTERVAL 30 DAYS
GROUP BY event_date
ORDER BY event_date;
-- Top customers by LTV
SELECT
customer_id,
ltv_segment,
total_spend,
total_orders,
avg_order_value,
customer_tenure_days
FROM lakehouse.gold.customer_ltv
ORDER BY total_spend DESC
LIMIT 100;
-- Product performance comparison
SELECT
product,
SUM(revenue) AS total_revenue,
SUM(units_sold) AS total_units,
ROUND(SUM(revenue) / SUM(units_sold), 2) AS revenue_per_unit
FROM lakehouse.gold.daily_revenue
GROUP BY product
ORDER BY total_revenue DESC;
End-to-End Orchestration with Jobs
Workflow Orchestration
Databricks Jobs allow you to chain multiple tasks into a directed acyclic graph (DAG). Each layer of the medallion architecture becomes a task with dependencies.
Job Configuration
{
"name": "Lakehouse ETL Pipeline",
"tasks": [
{
"task_key": "bronze_ingestion",
"notebook_task": {
"notebook_path": "/Repos/project/03_bronze_ingestion"
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "silver_processing",
"depends_on": [{ "task_key": "bronze_ingestion" }],
"notebook_task": {
"notebook_path": "/Repos/project/04_silver_transactions"
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "data_quality",
"depends_on": [{ "task_key": "silver_processing" }],
"notebook_task": {
"notebook_path": "/Repos/project/05_data_quality"
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "gold_revenue",
"depends_on": [{ "task_key": "data_quality" }],
"notebook_task": {
"notebook_path": "/Repos/project/06_gold_daily_revenue"
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "gold_customer_ltv",
"depends_on": [{ "task_key": "data_quality" }],
"notebook_task": {
"notebook_path": "/Repos/project/07_gold_customer_ltv"
},
"job_cluster_key": "etl_cluster"
}
],
"schedule": {
"quartz_cron_expression": "0 0 6 * * ?",
"timezone_id": "UTC"
},
"job_clusters": [{
"job_cluster_key": "etl_cluster",
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"autoscale": { "min_workers": 1, "max_workers": 4 }
}
}]
}
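Before registering a spec like this with the Jobs API, it is cheap to verify that every depends_on entry references a defined task and that the graph is acyclic, since a bad reference otherwise only surfaces at submission time. A standalone checker (helper name is ours):

```python
def validate_task_graph(tasks):
    """Check that every depends_on key exists and the DAG has no cycles."""
    keys = {t["task_key"] for t in tasks}
    deps = {t["task_key"]: [d["task_key"] for d in t.get("depends_on", [])]
            for t in tasks}
    for task, parents in deps.items():
        for parent in parents:
            if parent not in keys:
                raise ValueError(f"{task} depends on undefined task {parent}")
    # Kahn-style elimination: repeatedly remove tasks whose parents are done
    remaining = dict(deps)
    while remaining:
        ready = [t for t, parents in remaining.items()
                 if not any(p in remaining for p in parents)]
        if not ready:
            raise ValueError("cycle detected in task dependencies")
        for t in ready:
            del remaining[t]
    return True

# A trimmed version of the job spec above
tasks = [
    {"task_key": "bronze_ingestion"},
    {"task_key": "silver_processing",
     "depends_on": [{"task_key": "bronze_ingestion"}]},
    {"task_key": "data_quality",
     "depends_on": [{"task_key": "silver_processing"}]},
]
validate_task_graph(tasks)  # returns True for a valid DAG
```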
Monitoring & Alerting
Critical Monitoring Points
- Pipeline failures: Set email/Slack alerts on job failures
- Data quality drops: Monitor assertion failures in quality checks
- Data freshness: Alert if Gold tables are not updated within SLA
- Volume anomalies: Track record counts to detect missing or duplicate data
-- Monitor data freshness across layers
SELECT
'bronze' AS layer,
MAX(_ingested_at) AS latest_record,
timestampdiff(HOUR, MAX(_ingested_at), current_timestamp()) AS hours_since_update,
COUNT(*) AS total_records
FROM lakehouse.bronze.raw_transactions
UNION ALL
SELECT
'silver',
MAX(_processed_at),
timestampdiff(HOUR, MAX(_processed_at), current_timestamp()),
COUNT(*)
FROM lakehouse.silver.clean_transactions
UNION ALL
SELECT
'gold',
MAX(_computed_at),
timestampdiff(HOUR, MAX(_computed_at), current_timestamp()),
COUNT(*)
FROM lakehouse.gold.daily_revenue;
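For the volume-anomaly point, a simple baseline is to compare the latest run's record count against a trailing average and flag large deviations. A sketch with an illustrative 50% tolerance (function name and threshold are ours; the counts would come from the freshness query or a pipeline-runs control table):

```python
def volume_anomaly(history, today, tolerance=0.5):
    """Flag when `today` deviates from the trailing mean of `history`
    by more than `tolerance` (a fraction). Returns (is_anomaly, mean)."""
    if not history:
        return False, None  # no baseline yet
    mean = sum(history) / len(history)
    if mean == 0:
        return today != 0, 0
    return abs(today - mean) / mean > tolerance, mean

# Trailing counts around 99.5k; today's 45k is a >50% drop
flag, baseline = volume_anomaly([98000, 101000, 99500], 45000)
# flag -> True
```

A percentage threshold like this is crude but catches the common failure modes: an upstream feed going silent or a source double-delivering files.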
Practice Problems
Hard Add a Product Dimension Table
Extend the lakehouse with a product dimension:
- Create a Bronze table for product catalog data (CSV ingest)
- Build a Silver product table with SCD Type 2 tracking
- Join product attributes into the Gold revenue table
Use a Delta Lake MERGE with tracking columns is_current and effective_date / end_date to implement SCD Type 2. Note that a changed product needs two writes: the old row is closed out (is_current = false, end_date stamped) and a new current row is inserted.
# SCD Type 2 merge for product dimension
# A single MERGE cannot both close the old row and insert the new version
# of a changed product, so changed rows are staged twice: once with a null
# merge key (forces the insert branch) and once with their real key
# (hits the update branch).
from pyspark.sql import functions as F
from delta.tables import DeltaTable

products_updates = spark.read.table("lakehouse.bronze.raw_products")
target = DeltaTable.forName(spark, "lakehouse.silver.dim_products")

# Incoming rows whose tracked attributes differ from the current version
changed = (products_updates.alias("s")
    .join(target.toDF().filter("is_current = true").alias("t"),
          F.col("s.product_id") == F.col("t.product_id"))
    .where("s.product_name != t.product_name OR s.category != t.category")
    .select("s.*"))

staged = (changed.withColumn("merge_key", F.lit(None).cast("string"))
    .unionByName(products_updates.withColumn("merge_key", F.col("product_id"))))

(target.alias("t")
    .merge(staged.alias("s"),
           "t.product_id = s.merge_key AND t.is_current = true")
    .whenMatchedUpdate(
        condition="t.product_name != s.product_name OR t.category != s.category",
        set={"is_current": "false", "end_date": "current_date()"}
    )
    .whenNotMatchedInsert(values={
        "product_id": "s.product_id",
        "product_name": "s.product_name",
        "category": "s.category",
        "is_current": "true",
        "effective_date": "current_date()",
        "end_date": "null"
    })
    .execute()
)
Hard Implement Data Lineage Tracking
Build a lineage tracking system:
- Log pipeline run metadata (start time, end time, record counts) to a control table
- Track which Bronze records flow into which Silver records
- Create a dashboard showing pipeline health over time
Create a lakehouse.meta.pipeline_runs table with columns for run_id, layer, task, start_time, end_time, records_read, records_written, and status.
# Pipeline run logging utility
import uuid
from datetime import datetime

def log_pipeline_run(layer, task, start_time, end_time,
                     records_read, records_written, status):
    """Append one row to the lakehouse.meta.pipeline_runs control table."""
    run_data = [(
        str(uuid.uuid4()),
        layer, task,
        start_time.isoformat(), end_time.isoformat(),
        records_read, records_written, status
    )]
    schema = ("run_id STRING, layer STRING, task STRING, "
              "start_time STRING, end_time STRING, records_read LONG, "
              "records_written LONG, status STRING")
    (spark.createDataFrame(run_data, schema).write
        .format("delta").mode("append")
        .saveAsTable("lakehouse.meta.pipeline_runs"))

# Example call at the end of a task:
#   log_pipeline_run("silver", "clean_transactions",
#                    run_start, datetime.now(), rows_in, rows_out, "success")
Medium Add Data Retention Policies
Implement data lifecycle management:
- Configure VACUUM on Bronze tables to remove files older than 30 days
- Set up OPTIMIZE with Z-ORDER on Silver tables for query performance
- Schedule maintenance jobs to run during off-peak hours
Use VACUUM table_name RETAIN 720 HOURS and OPTIMIZE table_name ZORDER BY (column). Set delta.logRetentionDuration table property.
-- Optimize Silver table with Z-ORDER
OPTIMIZE lakehouse.silver.clean_transactions
ZORDER BY (customer_id, event_date);
-- Vacuum Bronze table (remove files older than 30 days)
VACUUM lakehouse.bronze.raw_transactions RETAIN 720 HOURS;
-- Set table properties for retention
ALTER TABLE lakehouse.bronze.raw_transactions
SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.deletedFileRetentionDuration' = 'interval 30 days'
);
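A scheduled maintenance job can generate these statements per table and execute each with spark.sql. A small builder that produces the SQL as strings (helper name is ours):

```python
def maintenance_statements(table, zorder_cols=None, retain_hours=720):
    """Build the OPTIMIZE/VACUUM statements for one table."""
    stmts = []
    optimize = f"OPTIMIZE {table}"
    if zorder_cols:
        optimize += f" ZORDER BY ({', '.join(zorder_cols)})"
    stmts.append(optimize)
    # Delta's safety check rejects retention under 168 hours (7 days)
    # unless explicitly disabled, so keep retain_hours >= 168
    stmts.append(f"VACUUM {table} RETAIN {retain_hours} HOURS")
    return stmts

# In the maintenance notebook, each statement would be run via spark.sql(stmt)
for stmt in maintenance_statements("lakehouse.silver.clean_transactions",
                                   ["customer_id", "event_date"]):
    print(stmt)
```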
Quick Reference
Medallion Architecture Cheat Sheet
| Layer | Purpose | Key Operations |
|---|---|---|
| Bronze | Raw data ingestion, preserve original format | Auto Loader, COPY INTO, append-only writes |
| Silver | Cleaned, validated, enriched data | MERGE (upsert), dedup, type casting, joins |
| Gold | Business aggregates, analytics-ready | GROUP BY, window functions, overwrite |
Essential Commands
| Command | Description |
|---|---|
| DESCRIBE HISTORY table | View Delta table version history |
| OPTIMIZE table ZORDER BY (col) | Compact files and co-locate data |
| VACUUM table RETAIN N HOURS | Remove old data files |
| SELECT * FROM table VERSION AS OF N | Time travel to a specific version |