ACID Transactions
Why Delta Lake Matters
The Problem: Traditional data lakes (Parquet/CSV on cloud storage) have no transactions, no schema enforcement, no versioning. Concurrent writes corrupt data, failed jobs leave partial results, and there is no way to roll back mistakes.
The Solution: Delta Lake adds a transaction log on top of Parquet files, providing ACID transactions, schema enforcement, time travel, and data versioning -- bringing data warehouse reliability to data lake economics.
Key Insight: Delta Lake is the default storage format in Databricks. Every CREATE TABLE statement creates a Delta table unless you specify otherwise.
-- Create a Delta table
CREATE TABLE catalog.schema.users (
user_id BIGINT,
name STRING,
email STRING,
created_at TIMESTAMP
) USING delta;
-- INSERT (atomic -- all or nothing)
INSERT INTO catalog.schema.users VALUES
(1, 'Alice', '[email protected]', current_timestamp()),
(2, 'Bob', '[email protected]', current_timestamp());
-- UPDATE
UPDATE catalog.schema.users
SET email = '[email protected]'
WHERE user_id = 1;
-- DELETE
DELETE FROM catalog.schema.users WHERE user_id = 2;
-- View transaction history
DESCRIBE HISTORY catalog.schema.users;
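To see why these operations are atomic, it helps to picture the transaction log itself: Delta records every commit as a numbered JSON entry, and DESCRIBE HISTORY simply reads those entries back. The sketch below is a conceptual model in plain Python, not the real implementation; the class and method names (TransactionLog, commit, history) are invented for illustration (the real log lives in numbered JSON files under _delta_log/).

```python
import json

# Conceptual sketch of a Delta-style transaction log (names invented).
class TransactionLog:
    def __init__(self):
        self.commits = []  # one JSON entry per committed version

    def commit(self, operation, actions):
        # A commit is all-or-nothing: it becomes visible only once the
        # whole entry is appended -- this is what makes writes atomic.
        entry = {"version": len(self.commits),
                 "operation": operation,
                 "actions": actions}
        self.commits.append(json.dumps(entry))
        return entry["version"]

    def history(self):
        # Analogue of DESCRIBE HISTORY: newest version first.
        return [json.loads(c) for c in reversed(self.commits)]

log = TransactionLog()
log.commit("CREATE TABLE", [])
log.commit("WRITE", [{"add": "part-0001.parquet"}])
log.commit("DELETE", [{"remove": "part-0001.parquet"}])
print([h["operation"] for h in log.history()])
# ['DELETE', 'WRITE', 'CREATE TABLE']
```

A failed job never appends its entry, so readers only ever see complete versions.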
Time Travel
Delta Lake maintains a complete history of all changes, allowing you to query any previous version of your data. This is invaluable for auditing, debugging, and recovering from mistakes.
-- Query a specific version
SELECT * FROM catalog.schema.users VERSION AS OF 2;
-- Query by timestamp
SELECT * FROM catalog.schema.users TIMESTAMP AS OF '2024-03-10';
-- Compare versions (what changed?)
SELECT * FROM catalog.schema.users VERSION AS OF 3
EXCEPT
SELECT * FROM catalog.schema.users VERSION AS OF 2;
-- Restore to a previous version (undo a bad operation)
RESTORE TABLE catalog.schema.users TO VERSION AS OF 2;
# PySpark time travel
df_v2 = spark.read.format("delta").option("versionAsOf", 2).load("/path/to/table")
df_ts = spark.read.format("delta").option("timestampAsOf", "2024-03-10").load("/path/to/table")
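Under the hood, VERSION AS OF works by replaying the log's add/remove file actions up to the requested version. A minimal sketch of that replay, assuming invented file names and a simplified action format:

```python
# Conceptual sketch of time travel: rebuild the set of live data files at a
# given version by replaying add/remove actions up to that point.
def files_as_of(actions_by_version, version):
    live = set()
    for v in range(version + 1):
        for kind, path in actions_by_version.get(v, []):
            if kind == "add":
                live.add(path)
            elif kind == "remove":
                live.discard(path)
    return live

log = {
    0: [("add", "part-0.parquet")],
    1: [("add", "part-1.parquet")],
    2: [("remove", "part-0.parquet")],  # e.g. a DELETE rewrote the data
}
print(files_as_of(log, 1))  # {'part-0.parquet', 'part-1.parquet'}
print(files_as_of(log, 2))  # {'part-1.parquet'}
```

This is also why RESTORE is cheap: it commits a new version whose live-file set matches an old one, rather than copying data.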
Schema Evolution
-- Schema enforcement: rejects mismatched writes by default
-- Schema evolution: allows adding new columns with mergeSchema
-- Add a new column via ALTER TABLE
ALTER TABLE catalog.schema.users ADD COLUMN phone STRING;
# Auto-merge schema during write (PySpark)
new_data.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("catalog.schema.users")
# Overwrite schema entirely
new_data.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("catalog.schema.users")
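The effect of mergeSchema can be sketched in plain Python: the merged schema is the union of existing and incoming columns, and rows that lack a column read back as null. The helper names below (merge_schema, conform) are invented for illustration:

```python
# Conceptual sketch of mergeSchema semantics: take the union of columns,
# fill missing values with None (null).
def merge_schema(existing_cols, incoming_cols):
    return existing_cols + [c for c in incoming_cols if c not in existing_cols]

def conform(row, schema):
    # Rows written before a column existed read back as null for it.
    return {col: row.get(col) for col in schema}

schema = merge_schema(["user_id", "name"], ["user_id", "phone"])
print(schema)  # ['user_id', 'name', 'phone']
print(conform({"user_id": 1, "name": "Alice"}, schema))
# {'user_id': 1, 'name': 'Alice', 'phone': None}
```

Schema enforcement is the inverse: without mergeSchema (or an ALTER TABLE), a write containing the extra column is rejected instead of merged.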
MERGE (Upsert) Operations
MERGE is one of Delta Lake's most powerful features -- it combines INSERT, UPDATE, and DELETE in a single atomic operation. This is essential for CDC (Change Data Capture) and SCD (Slowly Changing Dimensions) patterns.
-- Upsert: insert new records, update existing ones, and delete
-- target rows that no longer appear in the source
MERGE INTO catalog.schema.users AS target
USING staging.new_users AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.email = source.email,
target.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (user_id, name, email, created_at)
VALUES (source.user_id, source.name, source.email, current_timestamp())
WHEN NOT MATCHED BY SOURCE THEN
DELETE;
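The three MERGE clauses map onto simple dictionary logic, which can make the semantics easier to internalize. A sketch keyed on user_id (the merge function is invented for illustration):

```python
# Conceptual sketch of MERGE semantics, keyed on user_id.
def merge(target, source):
    result = {}
    for key, row in source.items():
        if key in target:
            result[key] = {**target[key], **row}  # WHEN MATCHED: update
        else:
            result[key] = row                     # WHEN NOT MATCHED: insert
    # target keys absent from source fall through:
    # WHEN NOT MATCHED BY SOURCE -> deleted
    return result

target = {1: {"name": "Alice"}, 2: {"name": "Bob"}}
source = {1: {"name": "Alice Smith"}, 3: {"name": "Carol"}}
print(merge(target, source))
# {1: {'name': 'Alice Smith'}, 3: {'name': 'Carol'}}  (Bob deleted)
```

The crucial difference from running three separate statements is that Delta commits all three outcomes as one atomic version.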
-- SCD Type 2: Track history of changes
-- Note: this statement only expires the changed current row; inserting the
-- replacement row requires a second step (typically a staged union in the source)
MERGE INTO catalog.schema.users_scd2 AS target
USING (
SELECT * FROM staging.updates
WHERE updated_at > (
SELECT MAX(effective_date) FROM catalog.schema.users_scd2
)
) AS source
ON target.user_id = source.user_id AND target.is_current = true
WHEN MATCHED AND target.email != source.email THEN
UPDATE SET is_current = false, end_date = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (user_id, email, is_current, effective_date, end_date)
VALUES (source.user_id, source.email, true, current_timestamp(), null);
OPTIMIZE & Z-ORDER
-- OPTIMIZE: Compact small files into larger ones (target: 1GB)
OPTIMIZE catalog.schema.orders;
-- OPTIMIZE with Z-ORDER: Co-locate data by specified columns
-- This dramatically speeds up queries that filter on these columns
OPTIMIZE catalog.schema.orders
ZORDER BY (customer_id, order_date);
-- OPTIMIZE with WHERE clause (incremental)
OPTIMIZE catalog.schema.orders
WHERE order_date >= '2024-03-01'
ZORDER BY (customer_id);
-- Auto-optimize: enable at table level
ALTER TABLE catalog.schema.orders
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
);
-- Liquid Clustering (next-gen replacement for Z-ORDER)
ALTER TABLE catalog.schema.orders
CLUSTER BY (customer_id, order_date);
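Z-ordering works by mapping multi-column values onto a one-dimensional sort key that preserves locality in every clustered column at once, classically via a Morton (Z-order) code that interleaves the bits of each column. A minimal sketch of that interleaving (the morton function is invented for illustration; Delta's actual implementation differs in detail):

```python
# Conceptual sketch of a Z-order (Morton) code: interleave the bits of two
# column values so rows close in both dimensions sort near each other and
# land in the same files -- fewer files to read when filtering on either.
def morton(x, y, bits=16):
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)       # x bits at even positions
        code |= ((y >> i) & 1) << (2 * i + 1)   # y bits at odd positions
    return code

rows = [(5, 9), (5, 10), (200, 3), (6, 9)]
rows.sort(key=lambda r: morton(*r))
print(rows)  # [(5, 9), (6, 9), (5, 10), (200, 3)] -- nearby pairs cluster
```

A plain sort on (customer_id, order_date) clusters only the leading column well; the interleaved key is what lets filters on either column skip files.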
VACUUM
VACUUM removes old data files that are no longer referenced by the transaction log. This reclaims storage space but removes time travel capability for the vacuumed versions.
-- VACUUM: Remove files older than retention period (default: 7 days)
VACUUM catalog.schema.orders;
-- VACUUM with custom retention
VACUUM catalog.schema.orders RETAIN 168 HOURS; -- 7 days
-- DRY RUN: See what would be deleted without deleting
VACUUM catalog.schema.orders RETAIN 168 HOURS DRY RUN;
-- WARNING: Setting retention below 7 days is dangerous
-- It can break concurrent readers. Only do this if you understand the risk:
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM catalog.schema.orders RETAIN 0 HOURS; -- Danger! Removes all history
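The retention check boils down to a simple rule: a file is eligible for deletion only once it has been unreferenced for longer than the retention window. A conceptual sketch of that eligibility test (the vacuum_candidates function and file names are invented for illustration):

```python
from datetime import datetime, timedelta

# Conceptual sketch of VACUUM eligibility: a file already removed from the
# current table version is deletable only if it was removed longer ago than
# the retention window (default 168 hours = 7 days).
def vacuum_candidates(removed_files, now, retention_hours=168):
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(path for path, removed_at in removed_files.items()
                  if removed_at < cutoff)

now = datetime(2024, 3, 10)
removed = {
    "part-old.parquet": datetime(2024, 2, 1),  # well past retention
    "part-new.parquet": datetime(2024, 3, 9),  # still inside the window
}
print(vacuum_candidates(removed, now))  # ['part-old.parquet']
```

This is also why RETAIN 0 HOURS is dangerous: files a concurrent reader or an in-flight time-travel query still needs become immediately deletable.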
Practice Problems
Problem 1: Data Recovery
Easy: A team member accidentally ran DELETE FROM orders WHERE 1=1, deleting all 50 million rows. The table is a Delta table with default settings. How do you recover the data?
Problem 2: CDC Pipeline with MERGE
Medium: Write a MERGE statement that implements a CDC pattern: a staging table receives daily inserts, updates, and deletes (marked with an operation column: I/U/D). Apply these changes to the target Delta table atomically.
Problem 3: Performance Optimization
Medium: A 2TB Delta table has 500,000 small files (avg 4MB each). Queries that filter by region and date take 3 minutes. Design an optimization strategy to get queries under 15 seconds.
Quick Reference
| Command | Purpose | Key Detail |
|---|---|---|
| DESCRIBE HISTORY | View table change history | Shows operations, timestamps, versions |
| VERSION AS OF | Query historical version | Works with SELECT and DataFrameReader |
| RESTORE TABLE | Revert to previous version | Creates a new version (does not delete history) |
| MERGE INTO | Upsert (insert/update/delete) | Atomic CDC and SCD operations |
| OPTIMIZE | Compact small files | Target file size: 1GB |
| ZORDER BY | Co-locate data by columns | Best for 1-4 frequently filtered columns |
| VACUUM | Remove old files | Default retention: 7 days, breaks time travel |