What is Medallion Architecture?
Why Medallion Architecture Matters
The Problem: Raw data is messy, inconsistent, and not immediately useful. Without a clear organizational pattern, data lakes become "data swamps" where nobody trusts the data or knows which version to use.
The Solution: The Medallion Architecture organizes your lakehouse into three progressive layers -- Bronze, Silver, and Gold -- each adding more structure, cleanliness, and business value to the data.
Real Impact: Teams using the Medallion Architecture often report dramatic reductions in time-to-insight, because analysts always know where to find trusted, query-ready data.
Real-World Analogy
Think of Medallion Architecture like a water treatment plant:
- Bronze (Raw Intake) = Water pumped directly from the source -- unfiltered, may contain debris
- Silver (Filtration) = Water passed through filters -- cleaned, standardized, safe to use
- Gold (Bottled/Tap Ready) = Purified water ready for specific uses -- drinking, cooking, industrial
The Three Layers at a Glance
Bronze -- Raw Ingestion
Land raw data exactly as it arrives from source systems. Append-only, full history, no transformations. The "single source of truth" for replay and auditing.
Silver -- Cleaned & Conformed
Apply data quality rules: deduplicate, enforce schemas, cast types, handle nulls, join reference data. This is your enterprise-wide, validated data layer.
Gold -- Business Aggregations
Produce purpose-built datasets: aggregated KPIs, star-schema dimensional models, ML feature tables, and dashboard-ready summaries for specific business needs.
Bronze Layer: Raw Ingestion
The Bronze layer is where all data lands first. The guiding principle is ingest everything, transform nothing. You want a complete, immutable record of every piece of data that enters your lakehouse.
Bronze Layer Design Principles
Key Rules for Bronze
- Append-only: Never update or delete records in Bronze -- always add new rows
- Schema-on-read: Store data in its original format; do not force a schema at write time
- Metadata enrichment: Add ingestion timestamp, source file name, and batch ID
- Full fidelity: Preserve every field from the source, including fields you do not currently need
- Delta format: Use Delta Lake for ACID transactions and time travel capability
PySpark: Ingesting to Bronze
from pyspark.sql import functions as F
# ── Bronze Ingestion: Raw JSON events from cloud storage ──
# Read raw data (JSON files landing in cloud storage)
raw_df = (
spark.read
.format("json")
.option("multiLine", True)
.option("mode", "PERMISSIVE") # Don't fail on bad records
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load("s3://raw-data/events/2024/03/")
)
# Add Bronze metadata columns
bronze_df = (
    raw_df
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    # Derive _ingestion_date -- it backs the partitionBy below
    .withColumn("_ingestion_date", F.to_date("_ingestion_timestamp"))
    .withColumn("_source_file", F.input_file_name())
    .withColumn("_batch_id", F.lit("batch_20240315_001"))
)
# Write to Bronze Delta table (append-only)
(
bronze_df.write
.format("delta")
.mode("append")
.partitionBy("_ingestion_date")
.saveAsTable("bronze.raw_events")
)
print("Bronze ingestion complete")
print(f"Records ingested: {bronze_df.count()}")
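The `_batch_id` above is hardcoded for the example; in practice it is usually derived from the run date plus a sequence number. A small illustrative helper (the `batch_YYYYMMDD_NNN` format mirrors the literal in the snippet and is a convention, not a Spark requirement):

```python
from datetime import date

def make_batch_id(run_date: date, seq: int) -> str:
    """Build a batch ID such as batch_20240315_001.

    The batch_YYYYMMDD_NNN format is an illustrative convention,
    matching the hardcoded literal in the ingestion example.
    """
    return f"batch_{run_date.strftime('%Y%m%d')}_{seq:03d}"

print(make_batch_id(date(2024, 3, 15), 1))  # batch_20240315_001
```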
Auto Loader for Continuous Bronze Ingestion
# Auto Loader automatically picks up new files as they arrive
# Much more efficient than manual file listing
bronze_stream = (
spark.readStream
.format("cloudFiles") # Auto Loader
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation",
"s3://checkpoints/bronze/events/schema")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.load("s3://raw-data/events/")
.withColumn("_ingestion_timestamp", F.current_timestamp())
.withColumn("_source_file", F.input_file_name())
)
# Write as streaming Delta table
(
bronze_stream.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation",
"s3://checkpoints/bronze/events/checkpoint")
.trigger(availableNow=True) # Process all available, then stop
.toTable("bronze.raw_events")
)
Silver Layer: Cleaned & Conformed
The Silver layer is where data gets cleaned, validated, and conformed to an enterprise data model. This is the most labor-intensive transformation step, but it is where you build trust in your data.
Silver Layer Transformations
| Transformation | Purpose | Example |
|---|---|---|
| Deduplication | Remove duplicate records | dropDuplicates(["event_id"]) |
| Schema Enforcement | Cast to expected types | col("price").cast("decimal(10,2)") |
| Null Handling | Fill or filter invalid data | fillna({"status": "unknown"}) |
| Standardization | Consistent formats | Dates to ISO 8601, emails to lowercase |
| Reference Joins | Enrich with dimension data | Join user_id to users dimension table |
| Quality Checks | Validate business rules | price > 0, valid email format |
PySpark: Bronze to Silver
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType
# ── Silver Transformation: Clean and validate events ──
# Read from Bronze
bronze_events = spark.table("bronze.raw_events")
# Apply Silver transformations
silver_events = (
bronze_events
# 1. Remove duplicates based on event_id
.dropDuplicates(["event_id"])
# 2. Enforce schema -- cast to correct types
.withColumn("event_timestamp",
F.to_timestamp("event_timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
.withColumn("amount",
F.col("amount").cast(DecimalType(10, 2)))
.withColumn("user_id",
F.col("user_id").cast("long"))
# 3. Handle nulls
.filter(F.col("event_id").isNotNull())
.fillna({"country": "unknown", "device_type": "unknown"})
# 4. Standardize values
.withColumn("email", F.lower(F.trim(F.col("email"))))
.withColumn("event_type", F.upper(F.trim(F.col("event_type"))))
# 5. Add derived columns
.withColumn("event_date", F.to_date("event_timestamp"))
.withColumn("event_hour", F.hour("event_timestamp"))
# 6. Drop Bronze metadata columns
.drop("_corrupt_record", "_source_file", "_batch_id")
)
# Data quality filter -- quarantine bad records
# Comparisons against NULL are neither true nor false in Spark, so
# define one condition and negate it null-safely for the bad side
quality_cond = (
    (F.col("amount") >= 0) &
    (F.col("event_timestamp").isNotNull()) &
    (F.col("user_id").isNotNull())
)
good_records = silver_events.filter(quality_cond)
bad_records = silver_events.filter(~F.coalesce(quality_cond, F.lit(False)))
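One subtlety in the split above is SQL's three-valued logic: in Spark, a comparison against NULL evaluates to NULL, which is neither true nor false, so a record with a NULL `amount` can fail both the good filter and a naive `amount < 0` bad filter and silently disappear. A null-safe version of the same split in plain Python (an illustrative sketch, not the Spark API):

```python
def split_records(records):
    """Partition records into (good, bad).

    A record is good only when amount is present and non-negative
    and user_id is present; everything else -- including records
    with missing fields -- is quarantined rather than dropped.
    """
    good, bad = [], []
    for r in records:
        ok = (
            r.get("amount") is not None and r["amount"] >= 0
            and r.get("user_id") is not None
        )
        (good if ok else bad).append(r)
    return good, bad

rows = [
    {"amount": 10.0, "user_id": 1},   # good
    {"amount": -5.0, "user_id": 2},   # bad: negative amount
    {"amount": None, "user_id": 3},   # bad: NULL amount must not vanish
]
good, bad = split_records(rows)
print(len(good), len(bad))  # 1 2
```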
# Write good records to Silver
(
good_records.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", True)
.partitionBy("event_date")
.saveAsTable("silver.events")
)
# Write bad records to quarantine table
(
bad_records.write
.format("delta")
.mode("append")
.saveAsTable("silver.events_quarantine")
)
print(f"Silver: {good_records.count()} good, {bad_records.count()} quarantined")
SQL: Silver Layer Quality Checks
-- Check Silver layer data quality metrics
SELECT
COUNT(*) AS total_records,
COUNT(DISTINCT event_id) AS unique_events,
COUNT(*) - COUNT(DISTINCT event_id) AS duplicate_count,
SUM(CASE WHEN amount IS NULL
THEN 1 ELSE 0 END) AS null_amounts,
ROUND(AVG(amount), 2) AS avg_amount,
MIN(event_timestamp) AS earliest_event,
MAX(event_timestamp) AS latest_event
FROM silver.events
WHERE event_date = current_date();
Gold Layer: Business Aggregations
The Gold layer is tailored for specific business use cases. Unlike Bronze and Silver, which serve the entire organization, Gold tables are purpose-built for analytics dashboards, ML features, or reporting needs.
Common Gold Layer Patterns
Aggregation Tables
Pre-computed metrics like daily revenue, hourly active users, or weekly conversion rates. Dramatically speeds up dashboard queries.
Star Schema / Dimensional
Fact tables (transactions, events) with dimension tables (users, products, dates) for BI tool compatibility and efficient analytical queries.
ML Feature Tables
Pre-computed features for machine learning models: user behavior aggregates, rolling averages, frequency counts, and encoded categoricals.
Report Snapshots
Point-in-time snapshots of key business metrics for regulatory reporting, compliance audits, and historical trend analysis.
PySpark: Silver to Gold Aggregation
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# ── Gold Layer: Daily Revenue Dashboard ──
silver_events = spark.table("silver.events")
# Aggregate daily revenue metrics
gold_daily_revenue = (
silver_events
.filter(F.col("event_type") == "PURCHASE")
.groupBy("event_date", "country", "device_type")
.agg(
F.count("*").alias("total_transactions"),
F.sum("amount").alias("total_revenue"),
F.avg("amount").alias("avg_order_value"),
F.countDistinct("user_id").alias("unique_customers"),
F.min("amount").alias("min_order"),
F.max("amount").alias("max_order"),
F.percentile_approx("amount", 0.5).alias("median_order")
)
    # Add rolling 7-day average; partition by country AND device_type so
    # each frame sees one row per date (rowsBetween counts rows, not days)
    .withColumn("revenue_7d_avg",
        F.avg("total_revenue").over(
            Window.partitionBy("country", "device_type")
            .orderBy("event_date")
            .rowsBetween(-6, 0)
        ))
.withColumn("_gold_updated_at", F.current_timestamp())
)
# Write to Gold
(
gold_daily_revenue.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", True)
.saveAsTable("gold.daily_revenue")
)
print("Gold daily revenue table updated")
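The `rowsBetween(-6, 0)` frame in the window above averages the current row and the six preceding rows, which equals seven calendar days only when no dates are missing from the partition. The same trailing frame in plain Python, as a sketch:

```python
def trailing_avg(values, frame=7):
    """Trailing average over the current value plus up to frame-1
    preceding values -- the analogue of rowsBetween(-(frame - 1), 0)."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - frame + 1): i + 1]
        out.append(sum(window) / len(window))
    return out

daily_revenue = [100, 200, 300, 400, 500, 600, 700, 800]
# Early rows average a shorter window; from the 7th row on, a full 7 rows.
print(trailing_avg(daily_revenue))
```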
SQL: Gold Layer Star Schema
-- Gold Fact Table: fact_orders
CREATE OR REPLACE TABLE gold.fact_orders AS
SELECT
e.event_id AS order_id,
e.user_id,
e.event_date AS order_date,
e.event_hour AS order_hour,
e.amount AS order_amount,
e.device_type,
u.age_group,
u.signup_date,
u.lifetime_value,
p.product_category,
p.product_name,
g.region,
g.continent
FROM silver.events e
LEFT JOIN silver.users u ON e.user_id = u.user_id
LEFT JOIN silver.products p ON e.product_id = p.product_id
LEFT JOIN silver.geo_lookup g ON e.country = g.country_code
WHERE e.event_type = 'PURCHASE';
-- Gold Dimension Table: dim_date
CREATE OR REPLACE TABLE gold.dim_date AS
SELECT DISTINCT
event_date AS date_key,
year(event_date) AS year,
month(event_date) AS month,
day(event_date) AS day,
dayofweek(event_date) AS day_of_week,
weekofyear(event_date) AS week_of_year,
quarter(event_date) AS quarter,
CASE WHEN dayofweek(event_date) IN (1, 7)
THEN TRUE ELSE FALSE END AS is_weekend
FROM silver.events;
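A convention worth double-checking in `dim_date`: Spark SQL's `dayofweek` numbers Sunday as 1 and Saturday as 7, which is why `IN (1, 7)` marks weekends, whereas Python's `datetime.weekday()` numbers Monday as 0. A quick cross-check in plain Python:

```python
from datetime import date

def spark_dayofweek(d: date) -> int:
    """Replicate Spark SQL's dayofweek(): Sunday=1 ... Saturday=7.
    Python's weekday() is Monday=0 ... Sunday=6, so shift accordingly."""
    return (d.weekday() + 1) % 7 + 1

def is_weekend(d: date) -> bool:
    return spark_dayofweek(d) in (1, 7)

print(spark_dayofweek(date(2024, 3, 17)))  # a Sunday -> 1
print(is_weekend(date(2024, 3, 18)))       # a Monday -> False
```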
Full PySpark Implementation
Here is a complete end-to-end implementation that ties all three layers together in a single reusable pipeline class.
from pyspark.sql import SparkSession, functions as F
class MedallionPipeline:
"""Reusable Medallion Architecture pipeline."""
def __init__(self, spark, source_path, catalog="main"):
self.spark = spark
self.source_path = source_path
self.catalog = catalog
def ingest_to_bronze(self, table_name, file_format="json"):
"""Ingest raw data to Bronze layer using Auto Loader."""
return (
self.spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", file_format)
.option("cloudFiles.schemaLocation",
f"dbfs:/checkpoints/bronze/{table_name}/schema")
.option("cloudFiles.inferColumnTypes", True)
.load(self.source_path)
.withColumn("_ingested_at", F.current_timestamp())
.withColumn("_source_file", F.input_file_name())
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation",
f"dbfs:/checkpoints/bronze/{table_name}/cp")
.trigger(availableNow=True)
.toTable(f"{self.catalog}.bronze.{table_name}")
)
def bronze_to_silver(self, source_table, target_table,
dedup_cols, quality_rules):
"""Transform Bronze data to Silver with quality checks."""
bronze_df = self.spark.table(
f"{self.catalog}.bronze.{source_table}")
# Deduplicate
cleaned = bronze_df.dropDuplicates(dedup_cols)
        # Apply quality rules (keep good, quarantine bad)
        # Negate null-safely: NULL comparisons must land in quarantine
        good = cleaned.filter(quality_rules)
        bad = cleaned.filter(~F.coalesce(quality_rules, F.lit(False)))
# Write Silver
good.write.format("delta").mode("overwrite") \
.saveAsTable(f"{self.catalog}.silver.{target_table}")
# Quarantine bad records
bad.write.format("delta").mode("append") \
.saveAsTable(
f"{self.catalog}.silver.{target_table}_quarantine")
return good.count(), bad.count()
def silver_to_gold(self, query, target_table):
"""Aggregate Silver data into Gold using SQL."""
gold_df = self.spark.sql(query)
gold_df = gold_df.withColumn(
"_gold_updated_at", F.current_timestamp())
gold_df.write.format("delta").mode("overwrite") \
.saveAsTable(f"{self.catalog}.gold.{target_table}")
return gold_df.count()
# ── Usage Example ──
pipeline = MedallionPipeline(spark, "s3://raw-data/orders/")
# Step 1: Ingest to Bronze (block until the availableNow batch completes)
pipeline.ingest_to_bronze("raw_orders").awaitTermination()
# Step 2: Clean to Silver
quality = (F.col("order_id").isNotNull()) & (F.col("amount") > 0)
good, bad = pipeline.bronze_to_silver(
"raw_orders", "orders", ["order_id"], quality)
print(f"Silver: {good} good, {bad} quarantined")
# Step 3: Aggregate to Gold
count = pipeline.silver_to_gold("""
SELECT order_date, country,
COUNT(*) as total_orders,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM main.silver.orders
GROUP BY order_date, country
""", "daily_orders_summary")
print(f"Gold: {count} rows written")
Data Quality at Each Layer
Data quality is not a one-time check -- it should be enforced progressively at each layer of the Medallion Architecture. Each layer adds stricter quality gates.
| Layer | Quality Level | Checks Applied | Action on Failure |
|---|---|---|---|
| Bronze | Minimal | File format valid, not empty, parseable | Log to _corrupt_record column |
| Silver | Strict | Schema, nulls, ranges, uniqueness, referential integrity | Quarantine bad records to separate table |
| Gold | Business Rules | KPI thresholds, completeness, freshness, consistency | Alert data team, block dashboard refresh |
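The Gold row in the table describes a gate rather than a filter: when upstream metrics miss their thresholds, the whole refresh is held back and the team alerted. A minimal pure-Python sketch of such a gate (the metric names and threshold values here are illustrative assumptions):

```python
def passes_quality_gate(metrics: dict) -> bool:
    """Return True only when every metric clears its threshold.

    Metric names and thresholds are examples, not prescribed values.
    """
    checks = {
        "uniqueness_pct": lambda v: v >= 99.5,    # key uniqueness
        "null_rate_pct": lambda v: v < 0.1,       # nulls on key columns
        "freshness_hours": lambda v: v <= 4.0,    # data recency
    }
    return all(check(metrics[name]) for name, check in checks.items())

silver_metrics = {
    "uniqueness_pct": 99.9,
    "null_rate_pct": 0.02,
    "freshness_hours": 1.5,
}
if passes_quality_gate(silver_metrics):
    print("quality gate passed -- safe to rebuild Gold")
else:
    print("quality gate failed -- alert the data team, skip refresh")
```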
Delta Lake Constraints for Quality
-- Add quality constraints directly to Delta tables
-- Silver: Enforce NOT NULL on key columns
ALTER TABLE silver.events
ADD CONSTRAINT valid_event_id CHECK (event_id IS NOT NULL);
ALTER TABLE silver.events
ADD CONSTRAINT valid_timestamp CHECK (event_timestamp IS NOT NULL);
ALTER TABLE silver.events
ADD CONSTRAINT positive_amount CHECK (amount >= 0);
-- Gold: Ensure aggregation completeness
ALTER TABLE gold.daily_revenue
ADD CONSTRAINT valid_revenue CHECK (total_revenue >= 0);
ALTER TABLE gold.daily_revenue
ADD CONSTRAINT valid_count CHECK (total_transactions > 0);
Monitoring Data Quality Metrics
def compute_quality_metrics(table_name, key_col, date_col):
"""Compute data quality metrics for any table."""
df = spark.table(table_name)
metrics = df.agg(
F.count("*").alias("total_rows"),
F.countDistinct(key_col).alias("unique_keys"),
F.count(F.when(F.col(key_col).isNull(), 1))
.alias("null_key_count"),
F.max(date_col).alias("latest_record"),
F.min(date_col).alias("earliest_record"),
).withColumn("duplicate_count",
F.col("total_rows") - F.col("unique_keys")
).withColumn("quality_score",
F.round(
(F.col("unique_keys") / F.col("total_rows")) * 100, 2)
).withColumn("measured_at", F.current_timestamp())
return metrics
# Run quality checks across all layers
bronze_q = compute_quality_metrics(
"bronze.raw_events", "event_id", "_ingested_at")
silver_q = compute_quality_metrics(
"silver.events", "event_id", "event_timestamp")
gold_q = compute_quality_metrics(
"gold.daily_revenue", "event_date", "event_date")
display(bronze_q.union(silver_q).union(gold_q))
Practice Problems
Problem 1: Design a Medallion Pipeline
Medium: Your e-commerce company receives order data from three sources: a web application (JSON), a mobile app (Avro), and a POS system (CSV). Design a Medallion Architecture pipeline that unifies this data into a single Gold-layer table for the finance dashboard.
Problem 2: Handle Late-Arriving Data
Medium: Your IoT pipeline receives sensor data that can arrive up to 48 hours late. How do you handle this in a Medallion Architecture without reprocessing the entire Silver and Gold layers?
Problem 3: Quality Gate Implementation
Hard: Design a quality gate system where the Gold layer is only updated if Silver-layer data quality metrics meet minimum thresholds: at least 99.5% uniqueness, less than 0.1% null rate on key columns, and data freshness within 4 hours.