Medallion Architecture


What is Medallion Architecture?

Why Medallion Architecture Matters

The Problem: Raw data is messy, inconsistent, and not immediately useful. Without a clear organizational pattern, data lakes become "data swamps" where nobody trusts the data or knows which version to use.

The Solution: The Medallion Architecture organizes your lakehouse into three progressive layers -- Bronze, Silver, and Gold -- each adding more structure, cleanliness, and business value to the data.

Real Impact: Teams adopting the Medallion Architecture report up to a 70% reduction in time-to-insight, because analysts always know where to find trusted, query-ready data.

Real-World Analogy

Think of Medallion Architecture like a water treatment plant:

  • Bronze (Raw Intake) = Water pumped directly from the source -- unfiltered, may contain debris
  • Silver (Filtration) = Water passed through filters -- cleaned, standardized, safe to use
  • Gold (Bottled/Tap Ready) = Purified water ready for specific uses -- drinking, cooking, industrial

The Three Layers at a Glance

Medallion Architecture: Bronze to Silver to Gold Data Flow
[Diagram: data flows from the sources into Bronze, is cleaned and validated into Silver, then aggregated and modeled into Gold.]

  • Data sources: Kafka topics, REST APIs, databases, CSV/JSON files, IoT sensors
  • Bronze Layer (Raw Ingestion) -- the landing zone, keep everything: raw data as-is, append-only ingestion, metadata columns added, schema-on-read, full history retained, Delta format
  • Silver Layer (Cleaned & Conformed) -- the enterprise data model: deduplication, schema enforcement, type casting and null handling, reference-data joins, data quality checks, SCD Type 2 tracking
  • Gold Layer (Business-Ready) -- consumption-ready: aggregated metrics, business KPIs, star-schema facts, dimension tables, ML feature tables, dashboard-ready

Bronze -- Raw Ingestion

Land raw data exactly as it arrives from source systems. Append-only, full history, no transformations. The "single source of truth" for replay and auditing.

Silver -- Cleaned & Conformed

Apply data quality rules: deduplicate, enforce schemas, cast types, handle nulls, join reference data. This is your enterprise-wide, validated data layer.

Gold -- Business Aggregations

Produce purpose-built datasets: aggregated KPIs, star-schema dimensional models, ML feature tables, and dashboard-ready summaries for specific business needs.

Bronze Layer: Raw Ingestion

The Bronze layer is where all data lands first. The guiding principle is ingest everything, transform nothing. You want a complete, immutable record of every piece of data that enters your lakehouse.

Bronze Layer Design Principles

Key Rules for Bronze

  • Append-only: Never update or delete records in Bronze -- always add new rows
  • Schema-on-read: Store data in its original format; do not force a schema at write time
  • Metadata enrichment: Add ingestion timestamp, source file name, and batch ID
  • Full fidelity: Preserve every field from the source, including fields you do not currently need
  • Delta format: Use Delta Lake for ACID transactions and time travel capability

PySpark: Ingesting to Bronze

PySpark - Bronze Layer Ingestion
from pyspark.sql import functions as F

# ── Bronze Ingestion: Raw JSON events from cloud storage ──

# Read raw data (JSON files landing in cloud storage)
raw_df = (
    spark.read
    .format("json")
    .option("multiLine", True)
    .option("mode", "PERMISSIVE")          # Don't fail on bad records
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load("s3://raw-data/events/2024/03/")
)

# Add Bronze metadata columns
bronze_df = (
    raw_df
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    .withColumn("_ingestion_date",
        F.to_date("_ingestion_timestamp"))  # partition column used below
    .withColumn("_source_file", F.input_file_name())
    .withColumn("_batch_id", F.lit("batch_20240315_001"))
)

# Write to Bronze Delta table (append-only)
(
    bronze_df.write
    .format("delta")
    .mode("append")
    .partitionBy("_ingestion_date")
    .saveAsTable("bronze.raw_events")
)

print("Bronze ingestion complete")
print(f"Records ingested: {bronze_df.count()}")
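Because Bronze is stored in Delta format, the replay and auditing goals are served directly by Delta time travel: earlier versions of the table remain queryable. A minimal sketch (the version number and timestamp are illustrative):

SQL - Querying Bronze History with Time Travel
-- List the table's commit history (one row per write)
DESCRIBE HISTORY bronze.raw_events;

-- Re-read the table exactly as it existed at an earlier version
SELECT * FROM bronze.raw_events VERSION AS OF 12;

-- Or as of a point in time, e.g. for an audit
SELECT * FROM bronze.raw_events TIMESTAMP AS OF '2024-03-15 00:00:00';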

Auto Loader for Continuous Bronze Ingestion

PySpark - Auto Loader for Bronze
# Auto Loader automatically picks up new files as they arrive --
# much more efficient than manually listing files
from pyspark.sql import functions as F

bronze_stream = (
    spark.readStream
    .format("cloudFiles")                   # Auto Loader
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "s3://checkpoints/bronze/events/schema")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://raw-data/events/")
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    .withColumn("_source_file", F.input_file_name())
)

# Write as a streaming Delta table
query = (
    bronze_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation",
            "s3://checkpoints/bronze/events/checkpoint")
    .trigger(availableNow=True)            # Process all available, then stop
    .toTable("bronze.raw_events")
)
query.awaitTermination()                   # Block until the backlog is processed

Silver Layer: Cleaned & Conformed

The Silver layer is where data gets cleaned, validated, and conformed to an enterprise data model. This is the most labor-intensive transformation step, but it is where you build trust in your data.

Silver Layer Transformations

  • Deduplication: remove duplicate records -- dropDuplicates(["event_id"])
  • Schema Enforcement: cast to expected types -- col("price").cast("decimal(10,2)")
  • Null Handling: fill or filter invalid data -- fillna({"status": "unknown"})
  • Standardization: consistent formats -- dates to ISO 8601, emails to lowercase
  • Reference Joins: enrich with dimension data -- join user_id to the users dimension table
  • Quality Checks: validate business rules -- price > 0, valid email format

PySpark: Bronze to Silver

PySpark - Silver Layer Transformation
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType

# ── Silver Transformation: Clean and validate events ──

# Read from Bronze
bronze_events = spark.table("bronze.raw_events")

# Apply Silver transformations
silver_events = (
    bronze_events
    # 1. Remove duplicates based on event_id
    .dropDuplicates(["event_id"])

    # 2. Enforce schema -- cast to correct types
    .withColumn("event_timestamp",
        F.to_timestamp("event_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    .withColumn("amount",
        F.col("amount").cast(DecimalType(10, 2)))
    .withColumn("user_id",
        F.col("user_id").cast("long"))

    # 3. Handle nulls
    .filter(F.col("event_id").isNotNull())
    .fillna({"country": "unknown", "device_type": "unknown"})

    # 4. Standardize values
    .withColumn("email", F.lower(F.trim(F.col("email"))))
    .withColumn("event_type", F.upper(F.trim(F.col("event_type"))))

    # 5. Add derived columns
    .withColumn("event_date", F.to_date("event_timestamp"))
    .withColumn("event_hour", F.hour("event_timestamp"))

    # 6. Drop Bronze metadata columns
    .drop("_corrupt_record", "_source_file", "_batch_id")
)

# Data quality filter -- quarantine bad records
# Cache so the transformation above is not recomputed for both filters
silver_events.cache()

good_records = silver_events.filter(
    (F.col("amount") >= 0) &
    (F.col("event_timestamp").isNotNull()) &
    (F.col("user_id").isNotNull())
)

bad_records = silver_events.filter(
    (F.col("amount") < 0) |
    (F.col("event_timestamp").isNull()) |
    (F.col("user_id").isNull())
)

# Write good records to Silver
(
    good_records.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .partitionBy("event_date")
    .saveAsTable("silver.events")
)

# Write bad records to quarantine table
(
    bad_records.write
    .format("delta")
    .mode("append")
    .saveAsTable("silver.events_quarantine")
)

print(f"Silver: {good_records.count()} good, {bad_records.count()} quarantined")
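The layer overview also lists SCD Type 2 tracking as a Silver-layer technique. A common Delta pattern is a two-step MERGE-then-INSERT that closes out the changed row and appends a new current version. This is a minimal sketch assuming a hypothetical silver.users dimension (with valid_from, valid_to, and is_current columns) and a staged updates view of incoming changes:

SQL - SCD Type 2 Upsert (Sketch)
-- Step 1: close out the current row for users whose attributes changed
MERGE INTO silver.users AS t
USING updates AS s
ON t.user_id = s.user_id AND t.is_current = TRUE
WHEN MATCHED AND t.email <> s.email THEN UPDATE SET
    is_current = FALSE,
    valid_to = s.effective_date;

-- Step 2: insert a new current row for new users and just-closed users
INSERT INTO silver.users
SELECT s.user_id, s.email,
       s.effective_date        AS valid_from,
       CAST(NULL AS DATE)      AS valid_to,
       TRUE                    AS is_current
FROM updates s
LEFT ANTI JOIN (
    SELECT user_id FROM silver.users WHERE is_current = TRUE
) cur ON s.user_id = cur.user_id;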

SQL: Silver Layer Quality Checks

SQL - Data Quality Validation
-- Check Silver layer data quality metrics
SELECT
    COUNT(*)                                    AS total_records,
    COUNT(DISTINCT event_id)                    AS unique_events,
    COUNT(*) - COUNT(DISTINCT event_id)         AS duplicate_count,
    SUM(CASE WHEN amount IS NULL
        THEN 1 ELSE 0 END)                     AS null_amounts,
    ROUND(AVG(amount), 2)                      AS avg_amount,
    MIN(event_timestamp)                        AS earliest_event,
    MAX(event_timestamp)                        AS latest_event
FROM silver.events
WHERE event_date = current_date();

Gold Layer: Business Aggregations

The Gold layer is tailored for specific business use cases. Unlike Bronze and Silver, which serve the entire organization, Gold tables are purpose-built for analytics dashboards, ML features, or reporting needs.

Common Gold Layer Patterns

Aggregation Tables

Pre-computed metrics like daily revenue, hourly active users, or weekly conversion rates. Dramatically speeds up dashboard queries.

Star Schema / Dimensional

Fact tables (transactions, events) with dimension tables (users, products, dates) for BI tool compatibility and efficient analytical queries.

ML Feature Tables

Pre-computed features for machine learning models: user behavior aggregates, rolling averages, frequency counts, and encoded categoricals.
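As a sketch of this pattern, the query below derives simple per-user behavioral features from the silver.events table used throughout this guide; the 30-day lookback and the table name gold.user_features are illustrative:

SQL - Gold ML Feature Table (Sketch)
CREATE OR REPLACE TABLE gold.user_features AS
SELECT
    user_id,
    COUNT(*)                                    AS events_30d,
    COUNT(DISTINCT event_date)                  AS active_days_30d,
    SUM(CASE WHEN event_type = 'PURCHASE'
        THEN amount ELSE 0 END)                 AS spend_30d,
    AVG(CASE WHEN event_type = 'PURCHASE'
        THEN amount END)                        AS avg_order_value_30d,
    MAX(event_timestamp)                        AS last_seen_at
FROM silver.events
WHERE event_date >= date_sub(current_date(), 30)
GROUP BY user_id;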

Report Snapshots

Point-in-time snapshots of key business metrics for regulatory reporting, compliance audits, and historical trend analysis.
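A snapshot table can be as simple as appending each day's Gold metrics tagged with a snapshot date, so reported figures are preserved even if upstream data is later restated. A sketch (the snapshot table name is assumed; gold.daily_revenue is built later in this guide):

SQL - Report Snapshot (Sketch)
-- Append yesterday's finalized metrics, stamped with the snapshot date
INSERT INTO gold.daily_revenue_snapshots
SELECT current_date() AS snapshot_date, *
FROM gold.daily_revenue
WHERE event_date = date_sub(current_date(), 1);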

PySpark: Silver to Gold Aggregation

PySpark - Gold Layer: Daily Revenue Metrics
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ── Gold Layer: Daily Revenue Dashboard ──

silver_events = spark.table("silver.events")

# Aggregate daily revenue metrics
gold_daily_revenue = (
    silver_events
    .filter(F.col("event_type") == "PURCHASE")
    .groupBy("event_date", "country", "device_type")
    .agg(
        F.count("*").alias("total_transactions"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value"),
        F.countDistinct("user_id").alias("unique_customers"),
        F.min("amount").alias("min_order"),
        F.max("amount").alias("max_order"),
        F.percentile_approx("amount", 0.5).alias("median_order")
    )
    # Add rolling 7-day average per country/device combination
    # (partition must match the groupBy grain, or the 7-row window
    #  would mix device types within a country)
    .withColumn("revenue_7d_avg",
        F.avg("total_revenue").over(
            Window.partitionBy("country", "device_type")
            .orderBy("event_date")
            .rowsBetween(-6, 0)
        ))
    .withColumn("_gold_updated_at", F.current_timestamp())
)

# Write to Gold
(
    gold_daily_revenue.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("gold.daily_revenue")
)

print("Gold daily revenue table updated")

SQL: Gold Layer Star Schema

SQL - Gold Layer: Dimensional Model
-- Gold Fact Table: fact_orders
CREATE OR REPLACE TABLE gold.fact_orders AS
SELECT
    e.event_id              AS order_id,
    e.user_id,
    e.event_date            AS order_date,
    e.event_hour            AS order_hour,
    e.amount                AS order_amount,
    e.device_type,
    u.age_group,
    u.signup_date,
    u.lifetime_value,
    p.product_category,
    p.product_name,
    g.region,
    g.continent
FROM silver.events e
LEFT JOIN silver.users u ON e.user_id = u.user_id
LEFT JOIN silver.products p ON e.product_id = p.product_id
LEFT JOIN silver.geo_lookup g ON e.country = g.country_code
WHERE e.event_type = 'PURCHASE';

-- Gold Dimension Table: dim_date
CREATE OR REPLACE TABLE gold.dim_date AS
SELECT DISTINCT
    event_date                              AS date_key,
    year(event_date)                        AS year,
    month(event_date)                       AS month,
    day(event_date)                         AS day,
    dayofweek(event_date)                   AS day_of_week,
    weekofyear(event_date)                  AS week_of_year,
    quarter(event_date)                     AS quarter,
    CASE WHEN dayofweek(event_date) IN (1, 7)
        THEN TRUE ELSE FALSE END            AS is_weekend
FROM silver.events;

Full PySpark Implementation

Here is a compact, end-to-end implementation that ties all three layers together in a single reusable pipeline class.

PySpark - Complete Medallion Pipeline Class
from pyspark.sql import functions as F

class MedallionPipeline:
    """Reusable Medallion Architecture pipeline."""

    def __init__(self, spark, source_path, catalog="main"):
        self.spark = spark
        self.source_path = source_path
        self.catalog = catalog

    def ingest_to_bronze(self, table_name, file_format="json"):
        """Ingest raw data to Bronze layer using Auto Loader."""
        return (
            self.spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", file_format)
            .option("cloudFiles.schemaLocation",
                    f"dbfs:/checkpoints/bronze/{table_name}/schema")
            .option("cloudFiles.inferColumnTypes", True)
            .load(self.source_path)
            .withColumn("_ingested_at", F.current_timestamp())
            .withColumn("_source_file", F.input_file_name())
            .writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation",
                    f"dbfs:/checkpoints/bronze/{table_name}/cp")
            .trigger(availableNow=True)
            .toTable(f"{self.catalog}.bronze.{table_name}")
        )

    def bronze_to_silver(self, source_table, target_table,
                         dedup_cols, quality_rules):
        """Transform Bronze data to Silver with quality checks."""
        bronze_df = self.spark.table(
            f"{self.catalog}.bronze.{source_table}")

        # Deduplicate
        cleaned = bronze_df.dropDuplicates(dedup_cols)

        # Apply quality rules (keep good, quarantine bad)
        quality_filter = quality_rules  # Column expression
        good = cleaned.filter(quality_filter)
        bad = cleaned.filter(~quality_filter)

        # Write Silver
        good.write.format("delta").mode("overwrite") \
            .saveAsTable(f"{self.catalog}.silver.{target_table}")

        # Quarantine bad records
        bad.write.format("delta").mode("append") \
            .saveAsTable(
                f"{self.catalog}.silver.{target_table}_quarantine")

        return good.count(), bad.count()

    def silver_to_gold(self, query, target_table):
        """Aggregate Silver data into Gold using SQL."""
        gold_df = self.spark.sql(query)
        gold_df = gold_df.withColumn(
            "_gold_updated_at", F.current_timestamp())
        gold_df.write.format("delta").mode("overwrite") \
            .saveAsTable(f"{self.catalog}.gold.{target_table}")
        return gold_df.count()

# ── Usage Example ──
pipeline = MedallionPipeline(spark, "s3://raw-data/orders/")

# Step 1: Ingest to Bronze (wait for the availableNow run to finish)
pipeline.ingest_to_bronze("raw_orders").awaitTermination()

# Step 2: Clean to Silver
quality = (F.col("order_id").isNotNull()) & (F.col("amount") > 0)
good, bad = pipeline.bronze_to_silver(
    "raw_orders", "orders", ["order_id"], quality)
print(f"Silver: {good} good, {bad} quarantined")

# Step 3: Aggregate to Gold
count = pipeline.silver_to_gold("""
    SELECT order_date, country,
           COUNT(*) as total_orders,
           SUM(amount) as total_revenue,
           AVG(amount) as avg_order_value
    FROM main.silver.orders
    GROUP BY order_date, country
""", "daily_orders_summary")
print(f"Gold: {count} rows written")

Data Quality at Each Layer

Data quality is not a one-time check -- it should be enforced progressively at each layer of the Medallion Architecture. Each layer adds stricter quality gates.

  • Bronze (Minimal): file format valid, not empty, parseable -- on failure, log to the _corrupt_record column
  • Silver (Strict): schema, nulls, ranges, uniqueness, referential integrity -- on failure, quarantine bad records to a separate table
  • Gold (Business Rules): KPI thresholds, completeness, freshness, consistency -- on failure, alert the data team and block the dashboard refresh

Delta Lake Constraints for Quality

SQL - Delta Table Constraints
-- Add quality constraints directly to Delta tables

-- Silver: Enforce NOT NULL on key columns
ALTER TABLE silver.events
ADD CONSTRAINT valid_event_id CHECK (event_id IS NOT NULL);

ALTER TABLE silver.events
ADD CONSTRAINT valid_timestamp CHECK (event_timestamp IS NOT NULL);

ALTER TABLE silver.events
ADD CONSTRAINT positive_amount CHECK (amount >= 0);

-- Gold: Ensure aggregation completeness
ALTER TABLE gold.daily_revenue
ADD CONSTRAINT valid_revenue CHECK (total_revenue >= 0);

ALTER TABLE gold.daily_revenue
ADD CONSTRAINT valid_count CHECK (total_transactions > 0);

Monitoring Data Quality Metrics

PySpark - Quality Monitoring
from pyspark.sql import functions as F

def compute_quality_metrics(table_name, key_col, date_col):
    """Compute data quality metrics for any table."""
    df = spark.table(table_name)

    metrics = df.agg(
        F.count("*").alias("total_rows"),
        F.countDistinct(key_col).alias("unique_keys"),
        F.count(F.when(F.col(key_col).isNull(), 1))
            .alias("null_key_count"),
        F.max(date_col).alias("latest_record"),
        F.min(date_col).alias("earliest_record"),
    ).withColumn("duplicate_count",
        F.col("total_rows") - F.col("unique_keys")
    ).withColumn("quality_score",
        F.round(
            (F.col("unique_keys") / F.col("total_rows")) * 100, 2)
    ).withColumn("measured_at", F.current_timestamp())

    return metrics

# Run quality checks across all layers
bronze_q = compute_quality_metrics(
    "bronze.raw_events", "event_id", "_ingested_at")
silver_q = compute_quality_metrics(
    "silver.events", "event_id", "event_timestamp")
gold_q = compute_quality_metrics(
    "gold.daily_revenue", "event_date", "event_date")

display(bronze_q.union(silver_q).union(gold_q))

Practice Problems

Problem 1: Design a Medallion Pipeline

Medium

Your e-commerce company receives order data from three sources: a web application (JSON), a mobile app (Avro), and a POS system (CSV). Design a Medallion Architecture pipeline that unifies this data into a single Gold-layer table for the finance dashboard.

Problem 2: Handle Late-Arriving Data

Medium

Your IoT pipeline receives sensor data that can arrive up to 48 hours late. How do you handle this in a Medallion Architecture without reprocessing the entire Silver and Gold layers?

Problem 3: Quality Gate Implementation

Hard

Design a quality gate system where the Gold layer is only updated if Silver-layer data quality metrics meet minimum thresholds: at least 99.5% uniqueness, less than 0.1% null rate on key columns, and data freshness within 4 hours.