Project: End-to-End Data Lakehouse

Difficulty: Hard · 40 min read

Project Overview & Requirements

What You Will Build

Goal: Build a production-grade data lakehouse on Databricks using the medallion architecture pattern.

Data: E-commerce transaction data from multiple sources (CSV files in S3/ADLS, JSON event streams, relational database exports).

Outcome: A fully orchestrated pipeline that ingests raw data, cleans and enriches it, creates business-level aggregates, and serves dashboards via SQL Warehouses.

Prerequisites

  • Databricks workspace with Unity Catalog enabled
  • Access to a cloud storage account (S3 or ADLS)
  • Familiarity with PySpark and Delta Lake basics
  • Understanding of the medallion architecture pattern

Architecture Overview

End-to-End Lakehouse Architecture
Sources (S3/ADLS files, JSON events, database CDC, CSV uploads)
  → Bronze (raw): Auto Loader lands raw transactions, events, and products
  → Silver (clean): clean transactions, enriched events, product catalog, customer 360
  → Gold (business): daily revenue, product metrics, customer LTV, funnel analysis
  → Consumers: SQL Warehouse dashboards, BI tools, ML models

The whole flow is orchestrated by scheduled Databricks Jobs workflows.

Data Source Setup (S3/ADLS)

Supported Data Sources

This project uses cloud object storage as the primary landing zone. The same patterns apply whether you use AWS S3 or Azure Data Lake Storage Gen2 (ADLS).
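To make the "same patterns" claim concrete, the only thing that changes between clouds is the URI scheme of the landing-zone path. The bucket, container, and storage-account names below are placeholders, not real resources:

```python
# Equivalent landing-zone URIs for each cloud (all names are placeholders).
S3_LANDING = "s3://my-lakehouse-landing/transactions/"
ADLS_LANDING = (
    "abfss://landing@mylakehousestorage.dfs.core.windows.net/transactions/"
)

def landing_uri(cloud: str) -> str:
    """Return the landing-zone URI for the given cloud provider."""
    uris = {"aws": S3_LANDING, "azure": ADLS_LANDING}
    return uris[cloud]

print(landing_uri("aws"))  # s3://my-lakehouse-landing/transactions/
```

Every `load(...)` and `save(...)` path in the notebooks below would swap the same way; nothing else in the pipeline code needs to change.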

Configure External Storage Access

01_setup_storage.py
# Create a Unity Catalog external location
# This allows Databricks to read from your cloud storage

# Step 1: Create a storage credential (Admin UI or SQL)
# NOTE: a storage credential must reference a cloud identity (an AWS IAM
# role or an Azure managed identity). The simplified statement below is a
# placeholder; in practice you supply that identity via the Catalog
# Explorer UI or the cloud-specific WITH clause.
spark.sql("""
CREATE STORAGE CREDENTIAL IF NOT EXISTS lakehouse_cred
COMMENT 'Storage credential for lakehouse project'
""")

# Step 2: Create external location pointing to your landing zone
spark.sql("""
CREATE EXTERNAL LOCATION IF NOT EXISTS landing_zone
URL 's3://my-lakehouse-landing/'
WITH (STORAGE CREDENTIAL lakehouse_cred)
COMMENT 'Raw data landing zone for lakehouse project'
""")

# Step 3: Create the catalog and schemas for medallion layers
spark.sql("CREATE CATALOG IF NOT EXISTS lakehouse")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.gold")

Generate Sample Data

02_generate_sample_data.py
from pyspark.sql import functions as F
from pyspark.sql.types import *
import random, uuid
from datetime import datetime, timedelta

# Generate transactions data
num_records = 100000
products = ["laptop", "phone", "tablet", "headphones", "monitor"]
statuses = ["completed", "pending", "refunded", "cancelled"]

data = []
for i in range(num_records):
    data.append((
        str(uuid.uuid4()),
        f"CUST-{random.randint(1, 5000):05d}",
        random.choice(products),
        random.randint(1, 5),
        round(random.uniform(29.99, 2499.99), 2),
        random.choice(statuses),
        (datetime.now() - timedelta(days=random.randint(0, 365))).isoformat()
    ))

schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("product", StringType()),
    StructField("quantity", IntegerType()),
    StructField("price", DoubleType()),
    StructField("status", StringType()),
    StructField("event_time", StringType()),
])

df = spark.createDataFrame(data, schema)
df.write.format("json").mode("overwrite").save(
    "s3://my-lakehouse-landing/transactions/"
)

Bronze Layer (Auto Loader Ingestion)

Bronze Layer Purpose

The Bronze layer is your raw data landing zone. Data is ingested as-is with minimal transformation, preserving the original schema and adding ingestion metadata (source file, timestamp). Auto Loader handles incremental ingestion efficiently.

Auto Loader Configuration

03_bronze_ingestion.py
from pyspark.sql import functions as F

# Auto Loader: Incrementally ingest new files from landing zone
# cloudFiles source automatically tracks which files have been processed

bronze_transactions = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation",
                "/mnt/checkpoints/bronze/transactions/schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .load("s3://my-lakehouse-landing/transactions/")
        # Add ingestion metadata
        # (input_file_name() is not supported on Unity Catalog compute;
        #  read the source path from the _metadata column instead)
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("_source_file", F.col("_metadata.file_path"))
        .withColumn("_ingestion_date", F.current_date())
)

# Write to Bronze Delta table with partitioning
(bronze_transactions.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation",
            "/mnt/checkpoints/bronze/transactions")
    .option("mergeSchema", "true")
    .partitionBy("_ingestion_date")
    .trigger(availableNow=True)
    .toTable("lakehouse.bronze.raw_transactions")
)

Bronze Layer Best Practices

Schema Evolution

Use schemaEvolutionMode = "addNewColumns" so new fields in source data are automatically captured without breaking the pipeline.
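To make the addNewColumns behavior concrete, here is a plain-Python sketch (not Auto Loader itself) of how the tracked schema grows when a source file arrives with an extra field:

```python
def evolve_schema(tracked: list, incoming: list) -> list:
    """Mimic addNewColumns: keep existing columns, append unseen ones."""
    return tracked + [c for c in incoming if c not in tracked]

schema = ["transaction_id", "customer_id", "price"]
# A new source file arrives with an extra 'discount_code' field.
schema = evolve_schema(
    schema, ["transaction_id", "customer_id", "price", "discount_code"]
)
print(schema)  # existing columns preserved, new column appended at the end
```

Auto Loader persists the evolved schema at `cloudFiles.schemaLocation`, so subsequent runs pick up where the last one left off.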

Ingestion Metadata

Always add _ingested_at, _source_file, and _ingestion_date columns to trace data lineage back to source files.

Partition Strategy

Partition Bronze tables by ingestion date to enable efficient pruning and time-travel queries on raw data.

Silver Layer (Cleaning & Enrichment)

Silver Layer Purpose

The Silver layer contains cleaned, validated, and enriched data. This is where you apply data quality rules, deduplicate records, cast data types, and join related datasets together.
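The deduplication rule this layer applies (keep the most recently ingested copy of each transaction_id) can be expressed in plain Python to show the intent, independent of the PySpark window function:

```python
def dedupe_latest(records):
    """Keep, per transaction_id, the record with the greatest _ingested_at."""
    latest = {}
    for rec in records:
        key = rec["transaction_id"]
        if key not in latest or rec["_ingested_at"] > latest[key]["_ingested_at"]:
            latest[key] = rec
    return list(latest.values())

rows = [
    {"transaction_id": "t1", "_ingested_at": 1, "price": 10.0},
    {"transaction_id": "t1", "_ingested_at": 2, "price": 12.0},  # later copy wins
    {"transaction_id": "t2", "_ingested_at": 1, "price": 5.0},
]
result = dedupe_latest(rows)
print(len(result))  # 2 -- one record per transaction_id
```

The PySpark version below does the same thing at scale with `row_number()` over a window partitioned by `transaction_id` and ordered by `_ingested_at` descending.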

Data Cleaning Pipeline

04_silver_transactions.py
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Read from Bronze
bronze_df = spark.read.table("lakehouse.bronze.raw_transactions")

# Step 1: Data type casting and validation
cleaned_df = (bronze_df
    .withColumn("event_time",
                F.to_timestamp("event_time", "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("price", F.col("price").cast("decimal(10,2)"))
    .withColumn("quantity", F.col("quantity").cast("integer"))
    .withColumn("total_amount",
                F.round(F.col("price") * F.col("quantity"), 2))
)

# Step 2: Filter invalid records
valid_df = (cleaned_df
    .filter(F.col("transaction_id").isNotNull())
    .filter(F.col("price") > 0)
    .filter(F.col("quantity") > 0)
    .filter(F.col("event_time").isNotNull())
)

# Step 3: Deduplicate using window function
window_spec = Window.partitionBy("transaction_id").orderBy(
    F.col("_ingested_at").desc()
)
deduped_df = (valid_df
    .withColumn("row_num", F.row_number().over(window_spec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)

# Step 4: Enrich with derived columns
enriched_df = (deduped_df
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn("event_hour", F.hour("event_time"))
    .withColumn("day_of_week", F.dayofweek("event_time"))
    .withColumn("is_weekend",
                F.when(F.col("day_of_week").isin(1, 7), True)
                .otherwise(False))
    .withColumn("_processed_at", F.current_timestamp())
)

# Step 5: MERGE into Silver (upsert pattern)
if spark.catalog.tableExists("lakehouse.silver.clean_transactions"):
    silver_table = DeltaTable.forName(spark,
        "lakehouse.silver.clean_transactions")

    (silver_table.alias("target")
        .merge(enriched_df.alias("source"),
               "target.transaction_id = source.transaction_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (enriched_df.write
        .format("delta")
        .partitionBy("event_date")
        .saveAsTable("lakehouse.silver.clean_transactions")
    )

Data Quality Checks

05_data_quality.py
# Data quality validation after Silver processing
from pyspark.sql import functions as F

silver_df = spark.read.table("lakehouse.silver.clean_transactions")

# Check: No null transaction IDs
null_count = silver_df.filter(F.col("transaction_id").isNull()).count()
assert null_count == 0, f"Found {null_count} null transaction IDs"

# Check: All prices are positive
neg_prices = silver_df.filter(F.col("price") <= 0).count()
assert neg_prices == 0, f"Found {neg_prices} non-positive prices"

# Check: No duplicate transaction IDs
total = silver_df.count()
distinct = silver_df.select("transaction_id").distinct().count()
assert total == distinct, f"Found {total - distinct} duplicate records"

# Check: Record count within expected range
assert total > 0, "Silver table is empty"

print(f"Data quality checks passed. Total records: {total}")

Gold Layer (Business Aggregates)

Gold Layer Purpose

The Gold layer contains business-level aggregations and metrics optimized for consumption by analysts, dashboards, and ML models. Each Gold table answers specific business questions.

Daily Revenue Aggregation

06_gold_daily_revenue.py
from pyspark.sql import functions as F

silver_df = spark.read.table("lakehouse.silver.clean_transactions")

# Daily revenue by product
daily_revenue = (silver_df
    .filter(F.col("status") == "completed")
    .groupBy("event_date", "product")
    .agg(
        F.sum("total_amount").alias("revenue"),
        F.count("transaction_id").alias("order_count"),
        F.sum("quantity").alias("units_sold"),
        F.avg("total_amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers")
    )
    .withColumn("_computed_at", F.current_timestamp())
)

# Overwrite Gold table (full rebuild for aggregates)
(daily_revenue.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("lakehouse.gold.daily_revenue")
)

Customer Lifetime Value

07_gold_customer_ltv.py
# Customer Lifetime Value (LTV) calculation
from pyspark.sql import functions as F

silver_df = spark.read.table("lakehouse.silver.clean_transactions")

customer_ltv = (silver_df
    .filter(F.col("status") == "completed")
    .groupBy("customer_id")
    .agg(
        F.sum("total_amount").alias("total_spend"),
        F.count("transaction_id").alias("total_orders"),
        F.avg("total_amount").alias("avg_order_value"),
        F.min("event_date").alias("first_purchase"),
        F.max("event_date").alias("last_purchase"),
        F.countDistinct("product").alias("unique_products")
    )
    .withColumn("customer_tenure_days",
                F.datediff("last_purchase", "first_purchase"))
    .withColumn("ltv_segment",
        F.when(F.col("total_spend") >= 10000, "Platinum")
        .when(F.col("total_spend") >= 5000, "Gold")
        .when(F.col("total_spend") >= 1000, "Silver")
        .otherwise("Bronze"))
    .withColumn("_computed_at", F.current_timestamp())
)

(customer_ltv.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("lakehouse.gold.customer_ltv")
)
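The segment thresholds in the `F.when` chain above map directly onto a plain-Python function, which is a convenient place to unit-test the boundary values before baking them into the Gold table:

```python
def ltv_segment(total_spend: float) -> str:
    """Mirror of the Gold-layer segmentation thresholds (inclusive lower bounds)."""
    if total_spend >= 10000:
        return "Platinum"
    if total_spend >= 5000:
        return "Gold"
    if total_spend >= 1000:
        return "Silver"
    return "Bronze"

for spend in (15000, 5000, 999.99):
    print(spend, ltv_segment(spend))
```

Note that each boundary is inclusive on the higher segment: a customer at exactly 5000 lands in "Gold", mirroring `F.when(F.col("total_spend") >= 5000, "Gold")`.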

Serving Layer (SQL Warehouses + Dashboards)

SQL Warehouse Configuration

Use a Serverless SQL Warehouse for cost-efficient querying of Gold tables. Serverless warehouses start instantly and auto-scale based on demand.

Dashboard Queries

08_dashboard_queries.sql
-- Revenue trend for the last 30 days
SELECT
    event_date,
    SUM(revenue) AS total_revenue,
    SUM(order_count) AS total_orders,
    ROUND(AVG(avg_order_value), 2) AS avg_order_value
FROM lakehouse.gold.daily_revenue
WHERE event_date >= current_date() - INTERVAL 30 DAYS
GROUP BY event_date
ORDER BY event_date;

-- Top customers by LTV
SELECT
    customer_id,
    ltv_segment,
    total_spend,
    total_orders,
    avg_order_value,
    customer_tenure_days
FROM lakehouse.gold.customer_ltv
ORDER BY total_spend DESC
LIMIT 100;

-- Product performance comparison
SELECT
    product,
    SUM(revenue) AS total_revenue,
    SUM(units_sold) AS total_units,
    ROUND(SUM(revenue) / SUM(units_sold), 2) AS revenue_per_unit
FROM lakehouse.gold.daily_revenue
GROUP BY product
ORDER BY total_revenue DESC;

End-to-End Orchestration with Jobs

Workflow Orchestration

Databricks Jobs allow you to chain multiple tasks into a directed acyclic graph (DAG). Each layer of the medallion architecture becomes a task with dependencies.
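A minimal sketch of how those `depends_on` edges resolve into a valid execution order (Kahn's algorithm; task names match the job configuration in this project):

```python
from collections import deque

def topo_order(deps):
    """Return an execution order for tasks given their upstream dependencies."""
    indegree = {t: len(d) for t, d in deps.items()}
    dependents = {t: [] for t in deps}
    for task, upstream in deps.items():
        for u in upstream:
            dependents[u].append(task)
    ready = deque(sorted(t for t, n in indegree.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for d in dependents[task]:
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)
    return order

tasks = {
    "bronze_ingestion": [],
    "silver_processing": ["bronze_ingestion"],
    "data_quality": ["silver_processing"],
    "gold_revenue": ["data_quality"],
    "gold_customer_ltv": ["data_quality"],
}
print(topo_order(tasks))
```

In the real job, `gold_revenue` and `gold_customer_ltv` have no edge between them, so Databricks runs them in parallel once `data_quality` succeeds.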

Job Configuration

09_job_config.json
{
  "name": "Lakehouse ETL Pipeline",
  "tasks": [
    {
      "task_key": "bronze_ingestion",
      "notebook_task": {
        "notebook_path": "/Repos/project/03_bronze_ingestion"
      },
      "job_cluster_key": "etl_cluster"
    },
    {
      "task_key": "silver_processing",
      "depends_on": [{ "task_key": "bronze_ingestion" }],
      "notebook_task": {
        "notebook_path": "/Repos/project/04_silver_transactions"
      },
      "job_cluster_key": "etl_cluster"
    },
    {
      "task_key": "data_quality",
      "depends_on": [{ "task_key": "silver_processing" }],
      "notebook_task": {
        "notebook_path": "/Repos/project/05_data_quality"
      },
      "job_cluster_key": "etl_cluster"
    },
    {
      "task_key": "gold_revenue",
      "depends_on": [{ "task_key": "data_quality" }],
      "notebook_task": {
        "notebook_path": "/Repos/project/06_gold_daily_revenue"
      },
      "job_cluster_key": "etl_cluster"
    },
    {
      "task_key": "gold_customer_ltv",
      "depends_on": [{ "task_key": "data_quality" }],
      "notebook_task": {
        "notebook_path": "/Repos/project/07_gold_customer_ltv"
      },
      "job_cluster_key": "etl_cluster"
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 6 * * ?",
    "timezone_id": "UTC"
  },
  "job_clusters": [{
    "job_cluster_key": "etl_cluster",
    "new_cluster": {
      "spark_version": "14.3.x-scala2.12",
      "node_type_id": "i3.xlarge",
      "num_workers": 2,
      "autoscale": { "min_workers": 1, "max_workers": 4 }
    }
  }]
}

Monitoring & Alerting

Critical Monitoring Points

  • Pipeline failures: Set email/Slack alerts on job failures
  • Data quality drops: Monitor assertion failures in quality checks
  • Data freshness: Alert if Gold tables are not updated within SLA
  • Volume anomalies: Track record counts to detect missing or duplicate data

10_monitoring.sql
-- Monitor data freshness across layers
SELECT
    'bronze' AS layer,
    MAX(_ingested_at) AS latest_record,
    timestampdiff(HOUR, MAX(_ingested_at), current_timestamp()) AS hours_since_update,
    COUNT(*) AS total_records
FROM lakehouse.bronze.raw_transactions
UNION ALL
SELECT
    'silver',
    MAX(_processed_at),
    timestampdiff(HOUR, MAX(_processed_at), current_timestamp()),
    COUNT(*)
FROM lakehouse.silver.clean_transactions
UNION ALL
SELECT
    'gold',
    MAX(_computed_at),
    timestampdiff(HOUR, MAX(_computed_at), current_timestamp()),
    COUNT(*)
FROM lakehouse.gold.daily_revenue;
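The "volume anomalies" bullet above can be automated with a simple tolerance check against a trailing average of daily record counts. The 50% tolerance here is illustrative; tune it to your data's natural variance:

```python
def volume_anomaly(history, today, tolerance=0.5):
    """Flag today's count if it deviates from the trailing average of
    prior daily counts by more than `tolerance` (as a fraction)."""
    if not history:
        return False  # nothing to compare against yet
    baseline = sum(history) / len(history)
    return abs(today - baseline) > tolerance * baseline

counts = [98_000, 101_000, 99_500, 100_200]
print(volume_anomaly(counts, 100_500))  # within range -> no alert
print(volume_anomaly(counts, 12_000))   # sudden drop -> alert
```

In practice you would feed `history` from the record counts logged in the monitoring query above and wire the boolean into a job-failure or Slack alert.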

Practice Problems

Hard: Add a Product Dimension Table

Extend the lakehouse with a product dimension:

  1. Create a Bronze table for product catalog data (CSV ingest)
  2. Build a Silver product table with SCD Type 2 tracking
  3. Join product attributes into the Gold revenue table

Hint: implement SCD Type 2 with a Delta Lake MERGE, using an is_current flag plus effective_date / end_date columns to bound each version's validity window.

# SCD Type 2 merge for product dimension
from delta.tables import DeltaTable
from pyspark.sql import functions as F

products_updates = spark.read.table("lakehouse.bronze.raw_products")
target = DeltaTable.forName(spark, "lakehouse.silver.dim_products")
current = target.toDF().filter("is_current = true")

# A single MERGE pass can only close changed rows; their new versions
# must also be inserted. Standard trick: stage changed rows twice --
# once with the real key (matches -> close the old version) and once
# with a NULL merge key (never matches -> falls through to the insert).
changed = (products_updates.alias("s")
    .join(current.alias("t"), "product_id")
    .filter("s.product_name != t.product_name OR s.category != t.category")
    .select("s.*"))

staged = (products_updates.withColumn("merge_key", F.col("product_id"))
    .unionByName(changed.withColumn("merge_key",
                                    F.lit(None).cast("string"))))

(target.alias("t")
    .merge(staged.alias("s"),
           "t.product_id = s.merge_key AND t.is_current = true")
    .whenMatchedUpdate(
        condition="t.product_name != s.product_name OR t.category != s.category",
        set={"is_current": "false", "end_date": "current_date()"}
    )
    .whenNotMatchedInsert(values={
        "product_id": "s.product_id",
        "product_name": "s.product_name",
        "category": "s.category",
        "is_current": "true",
        "effective_date": "current_date()",
        "end_date": "null"
    })
    .execute()
)
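The versioning semantics are easier to see stripped of the MERGE machinery. A plain-Python sketch of one SCD Type 2 update: close the current row for a changed product, then append the new version (column names match the dimension table above):

```python
from datetime import date

def scd2_apply(dim, update, today):
    """Close the current row for a changed product, then append the new version."""
    for row in dim:
        if row["product_id"] == update["product_id"] and row["is_current"]:
            if row["product_name"] == update["product_name"]:
                return dim  # attributes unchanged: nothing to do
            row["is_current"] = False  # close the old version
            row["end_date"] = today
    dim.append({**update, "is_current": True,
                "effective_date": today, "end_date": None})
    return dim

dim = [{"product_id": "P1", "product_name": "laptop",
        "is_current": True, "effective_date": date(2024, 1, 1), "end_date": None}]
dim = scd2_apply(dim, {"product_id": "P1", "product_name": "laptop pro"},
                 date(2024, 6, 1))
print(len(dim))  # 2: one closed historical row, one current row
```

The invariant to test for in your Silver table: every product_id has exactly one row with is_current = true, and historical rows have non-null end_date.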

Hard: Implement Data Lineage Tracking

Build a lineage tracking system:

  1. Log pipeline run metadata (start time, end time, record counts) to a control table
  2. Track which Bronze records flow into which Silver records
  3. Create a dashboard showing pipeline health over time

Create a lakehouse.meta.pipeline_runs table with columns for run_id, layer, task, start_time, end_time, records_read, records_written, and status.

# Pipeline run logging utility
import uuid
from datetime import datetime  # callers pass datetime start/end times

def log_pipeline_run(layer, task, start_time, end_time,
                     records_read, records_written, status):
    """Append one row describing a pipeline run to the control table."""
    run_data = [(
        str(uuid.uuid4()),
        layer, task,
        start_time.isoformat(), end_time.isoformat(),
        records_read, records_written, status
    )]
    schema = ("run_id STRING, layer STRING, task STRING, "
              "start_time STRING, end_time STRING, "
              "records_read LONG, records_written LONG, status STRING")
    (spark.createDataFrame(run_data, schema).write
        .format("delta").mode("append")
        .saveAsTable("lakehouse.meta.pipeline_runs"))
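Once runs are logged, the pipeline-health dashboard (step 3) reduces to a success-rate aggregation over the control table. A plain-Python equivalent over a list of run records shows the calculation:

```python
def success_rate(runs, layer):
    """Fraction of logged runs for a layer whose status is 'success'."""
    layer_runs = [r for r in runs if r["layer"] == layer]
    if not layer_runs:
        return 0.0
    ok = sum(1 for r in layer_runs if r["status"] == "success")
    return ok / len(layer_runs)

runs = [
    {"layer": "silver", "status": "success"},
    {"layer": "silver", "status": "failed"},
    {"layer": "gold", "status": "success"},
]
print(success_rate(runs, "silver"))  # 0.5
```

In the dashboard itself this becomes a GROUP BY over lakehouse.meta.pipeline_runs, typically bucketed by day so failures show up as dips in the trend line.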

Medium: Add Data Retention Policies

Implement data lifecycle management:

  1. Configure VACUUM on Bronze tables to remove files older than 30 days
  2. Set up OPTIMIZE with Z-ORDER on Silver tables for query performance
  3. Schedule maintenance jobs to run during off-peak hours

Use VACUUM table_name RETAIN 720 HOURS (720 hours = 30 days) and OPTIMIZE table_name ZORDER BY (column). Set the delta.logRetentionDuration table property to control how long transaction history is kept.

-- Optimize Silver table with Z-ORDER
OPTIMIZE lakehouse.silver.clean_transactions
ZORDER BY (customer_id, event_date);

-- Vacuum Bronze table (remove files older than 30 days)
VACUUM lakehouse.bronze.raw_transactions RETAIN 720 HOURS;

-- Set table properties for retention
ALTER TABLE lakehouse.bronze.raw_transactions
SET TBLPROPERTIES (
    'delta.logRetentionDuration' = 'interval 30 days',
    'delta.deletedFileRetentionDuration' = 'interval 30 days'
);

Quick Reference

Medallion Architecture Cheat Sheet

Layer  | Purpose                                      | Key Operations
Bronze | Raw data ingestion, preserve original format | Auto Loader, COPY INTO, append-only writes
Silver | Cleaned, validated, enriched data            | MERGE (upsert), dedup, type casting, joins
Gold   | Business aggregates, analytics-ready         | GROUP BY, window functions, overwrite

Essential Commands

Command                             | Description
DESCRIBE HISTORY table              | View Delta table version history
OPTIMIZE table ZORDER BY (col)      | Compact files and co-locate data
VACUUM table RETAIN N HOURS         | Remove old data files
SELECT * FROM table VERSION AS OF N | Time travel to a specific version

Useful Resources