Jobs, Workflows & Orchestration

Medium · 25 min read

What are Databricks Jobs and Workflows?

Why Jobs and Workflows Matter

The Problem: Running notebooks manually is fine for development, but production data pipelines need automated scheduling, dependency management, error handling, and monitoring.

The Solution: Databricks Jobs and Workflows provide a fully managed orchestration platform for scheduling and running notebooks, JARs, Python scripts, and SQL queries as automated pipelines with DAG-based dependencies.

Real Impact: Teams using Databricks Workflows replace complex external orchestrators (Airflow, Step Functions) with a built-in, zero-infrastructure solution that reduces pipeline setup time by 75%.

Real-World Analogy

Think of Databricks Workflows like a factory assembly line:

  • Tasks = Individual stations on the assembly line (welding, painting, inspection)
  • DAG = The blueprint showing which stations depend on which
  • Schedule = The factory shift schedule (run at 6 AM daily)
  • Parameters = Configuration for each run (which model, which date)
  • Repair & Rerun = Fixing a defective station and resuming from that point

Single-Task vs Multi-Task Jobs

| Feature        | Single-Task Job             | Multi-Task Workflow                       |
|----------------|-----------------------------|-------------------------------------------|
| Structure      | One notebook/script per job | DAG of multiple tasks with dependencies   |
| Use Case       | Simple ETL, single notebook | Multi-step pipelines, ML workflows        |
| Error Handling | Retry the entire job        | Retry or repair individual tasks          |
| Compute        | One cluster                 | Different clusters per task, or shared clusters |
| Parameters     | Job-level parameters        | Task-level parameters, dynamic values     |

DAG-Based Workflows

Multi-task workflows use Directed Acyclic Graphs (DAGs) to define task dependencies. Tasks run in parallel when possible and wait for upstream tasks to complete before starting.

Example: ETL Workflow DAG
Tasks:

  • Ingest Orders: Auto Loader (Bronze)
  • Ingest Users: JDBC Extract (Bronze)
  • Ingest Products: CSV Auto Loader (Bronze)
  • Transform: Clean & join (Silver); depends_on: all 3 ingests
  • Revenue Metrics: Gold aggregation
  • User Segments: Gold aggregation
  • ML Features: Feature Store update
  • Notify: Slack

Execution Order:

  1. Three ingest tasks run in PARALLEL (no dependencies between them)
  2. Transform waits for ALL ingests to complete, then runs
  3. Three Gold tasks run in PARALLEL after Transform
  4. Notify runs after ALL Gold tasks complete
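The execution order above falls out of a topological sort of the DAG: a task becomes runnable once all of its upstream tasks have finished. A minimal sketch of that scheduling logic, using the example tasks above (the task names here are illustrative identifiers, not tied to any Databricks API):

```python
# Example DAG: task -> list of upstream tasks it depends on
deps = {
    "ingest_orders": [],
    "ingest_users": [],
    "ingest_products": [],
    "transform": ["ingest_orders", "ingest_users", "ingest_products"],
    "revenue_metrics": ["transform"],
    "user_segments": ["transform"],
    "ml_features": ["transform"],
    "notify": ["revenue_metrics", "user_segments", "ml_features"],
}

def execution_levels(deps):
    """Group tasks into levels; tasks within a level can run in parallel."""
    levels = []
    done = set()
    remaining = dict(deps)
    while remaining:
        # A task is ready when every upstream task has already completed
        ready = sorted(t for t, up in remaining.items() if set(up) <= done)
        if not ready:
            raise ValueError("Cycle detected - not a DAG")
        levels.append(ready)
        done.update(ready)
        for t in ready:
            del remaining[t]
    return levels

for i, level in enumerate(execution_levels(deps), 1):
    print(f"Level {i}: {level}")
```

This reproduces the four execution stages listed above: the three ingests, then Transform, then the three Gold tasks, then Notify.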

Scheduling and Triggers

Jobs can be triggered by schedules (cron), file arrival events, or API calls. You can also chain workflows together using the Runs API.
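For file-arrival triggers, the job config carries a `trigger` block instead of a cron `schedule`. A sketch of the relevant fragment, assuming Jobs API 2.1 field names; the storage URL is a placeholder and should point at a Unity Catalog external location in practice:

```python
# File-arrival trigger: run the job when new files land in the monitored path.
# The URL below is a placeholder; replace with your external location path.
file_trigger_config = {
    "trigger": {
        "file_arrival": {
            "url": "s3://my-bucket/landing/orders/",       # monitored location
            "min_time_between_triggers_seconds": 300,      # debounce bursts of arrivals
        },
        "pause_status": "UNPAUSED",
    }
}
```

This fragment would be merged into the same job config shown below, replacing the `schedule` block.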

Python - Create a Job via REST API
import requests, json

DATABRICKS_HOST = "https://my-workspace.databricks.com"
TOKEN = dbutils.secrets.get("scope", "api_token")

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json"
}

# Create a multi-task workflow job
job_config = {
    "name": "Daily ETL Pipeline",
    "tasks": [
        {
            "task_key": "ingest_orders",
            "notebook_task": {
                "notebook_path": "/Repos/etl/ingest_orders",
                "base_parameters": {
                    "date": "{{job.start_time.iso_date}}"
                }
            },
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "num_workers": 2,
                "node_type_id": "i3.xlarge"
            }
        },
        {
            "task_key": "ingest_users",
            "notebook_task": {
                "notebook_path": "/Repos/etl/ingest_users"
            },
            "existing_cluster_id": "0315-abc123-shared"
        },
        {
            "task_key": "transform_silver",
            "depends_on": [
                {"task_key": "ingest_orders"},
                {"task_key": "ingest_users"}
            ],
            "notebook_task": {
                "notebook_path": "/Repos/etl/transform_silver"
            },
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "num_workers": 4,
                "node_type_id": "i3.xlarge"
            }
        },
        {
            "task_key": "aggregate_gold",
            "depends_on": [
                {"task_key": "transform_silver"}
            ],
            "sql_task": {
                "query": {
                    "query_id": "abc123-gold-query"
                },
                "warehouse_id": "warehouse-xyz"
            }
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",  # 6 AM daily
        "timezone_id": "America/New_York"
    },
    "email_notifications": {
        "on_failure": ["[email protected]"]
    },
    "max_concurrent_runs": 1
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers=headers,
    json=job_config
)
response.raise_for_status()  # surface API errors instead of a KeyError below
job_id = response.json()["job_id"]
print(f"Created job: {job_id}")
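As noted above, jobs can also be triggered by API calls, which is one way to chain workflows: the last task of an upstream pipeline triggers the downstream job. A minimal sketch using the `run-now` and `runs/get` endpoints of the Jobs 2.1 API (`trigger_and_wait` is a hypothetical helper, not part of any SDK):

```python
import time
import requests

def trigger_and_wait(host, headers, job_id, notebook_params, poll_seconds=30):
    """Trigger a run via run-now, then poll until it reaches a terminal state."""
    run_id = requests.post(
        f"{host}/api/2.1/jobs/run-now",
        headers=headers,
        json={"job_id": job_id, "notebook_params": notebook_params},
    ).json()["run_id"]

    while True:
        state = requests.get(
            f"{host}/api/2.1/jobs/runs/get",
            headers=headers,
            params={"run_id": run_id},
        ).json()["state"]
        # life_cycle_state distinguishes "still running" from "finished";
        # result_state (SUCCESS/FAILED) is only set once the run terminates
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            return state.get("result_state")
        time.sleep(poll_seconds)

# Example (runs against a real workspace):
# result = trigger_and_wait(DATABRICKS_HOST, headers, job_id, {"date": "2024-01-15"})
```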

Parameters and Dynamic Values

Jobs support parameters at both the job and task level. Dynamic values like the current date, run ID, and task return values enable flexible, reusable workflows.

Python - Working with Job Parameters
# ── In the notebook: Access job parameters ──

# Method 1: dbutils.widgets (recommended)
dbutils.widgets.text("date", "", "Processing Date")
dbutils.widgets.text("env", "dev", "Environment")

processing_date = dbutils.widgets.get("date")
environment = dbutils.widgets.get("env")

print(f"Processing date: {processing_date}")
print(f"Environment: {environment}")

# Method 2: Task values (pass between tasks)
# In Task A: set a value
record_count = df.count()
dbutils.jobs.taskValues.set(
    key="bronze_record_count",
    value=record_count
)

# In Task B: read the value from Task A
bronze_count = dbutils.jobs.taskValues.get(
    taskKey="ingest_orders",
    key="bronze_record_count",
    default=0
)
print(f"Bronze ingested {bronze_count} records")

# Dynamic value references in job config:
# {{job.start_time.iso_date}}  -- date the run started
# {{job.id}}                   -- job ID
# {{job.run_id}}               -- run ID
# {{task.name}}                -- current task key
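Job-level parameters (as opposed to the per-task `base_parameters` shown earlier) are defined once on the job and are visible to every task. A sketch of the config fragment, assuming the Jobs 2.1 API shape; the parameter names are illustrative:

```python
# Job-level parameters: defined once on the job, shared by all tasks.
# Defaults here are illustrative; override per run when triggering the job.
job_parameters_config = {
    "parameters": [
        {"name": "env", "default": "dev"},
        {"name": "date", "default": ""},  # typically overridden at trigger time
    ]
}

# Any task can then reference them in its own parameters, e.g.:
#   "base_parameters": {"date": "{{job.parameters.date}}",
#                       "env": "{{job.parameters.env}}"}
```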

Repair and Rerun

When a task fails in a multi-task workflow, you do not need to rerun the entire pipeline. Databricks supports repair runs that restart only the failed task and its downstream dependencies, saving time and compute costs.

Python - Repair a Failed Run
# ── Repair a failed run via REST API ──

# List recent runs to find the failed one
runs = requests.get(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/list",
    headers=headers,
    params={"job_id": job_id, "limit": 5}
).json()

# Find the failed run
failed_run = None
for run in runs.get("runs", []):
    # result_state is only present once a run reaches a terminal state,
    # so use .get() to skip runs that are still in progress
    if run["state"].get("result_state") == "FAILED":
        failed_run = run
        break

if failed_run:
    # Repair: rerun only failed tasks + downstream
    repair = requests.post(
        f"{DATABRICKS_HOST}/api/2.1/jobs/runs/repair",
        headers=headers,
        json={
            "run_id": failed_run["run_id"],
            "rerun_tasks": ["transform_silver"]  # Failed task
        }
    )
    print(f"Repair started: {repair.json()}")

# Retry policies in job config:
# "max_retries": 2,
# "min_retry_interval_millis": 60000,
# "retry_on_timeout": true
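In the Jobs 2.1 API, retry settings live on individual tasks. A sketch of a task definition combining the retry keys above with a timeout and task-level failure notifications (the cluster ID and email address are placeholders):

```python
# Task definition with a retry policy; values are illustrative.
task_with_retries = {
    "task_key": "transform_silver",
    "notebook_task": {"notebook_path": "/Repos/etl/transform_silver"},
    "existing_cluster_id": "0315-abc123-shared",  # placeholder cluster ID
    "max_retries": 2,                     # rerun up to twice on failure
    "min_retry_interval_millis": 60000,   # wait 1 minute between attempts
    "retry_on_timeout": True,             # also retry if the task times out
    "timeout_seconds": 3600,              # fail the task after 1 hour
    "email_notifications": {"on_failure": ["[email protected]"]},
}
```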

Monitoring and Alerting

SQL - Monitor Job Run History
-- Query job run history from system tables
-- (system table and column names can vary by Databricks release)
SELECT
    job_id,
    run_id,
    task_key,
    start_time,
    end_time,
    TIMESTAMPDIFF(MINUTE, start_time, end_time) AS duration_min,
    result_state,
    error_message
FROM system.workflow.job_run_timeline
WHERE job_id = 12345
    AND start_time >= current_date() - INTERVAL 7 DAYS
ORDER BY start_time DESC;

-- Find slow-running tasks (performance regression)
-- Note: the duration must be recomputed here; the duration_min alias
-- from the previous query is not visible in this one.
SELECT
    task_key,
    AVG(TIMESTAMPDIFF(MINUTE, start_time, end_time)) AS avg_duration_min,
    MAX(TIMESTAMPDIFF(MINUTE, start_time, end_time)) AS max_duration_min,
    COUNT(*) AS run_count,
    SUM(CASE WHEN result_state = 'FAILED'
        THEN 1 ELSE 0 END) AS failure_count
FROM system.workflow.job_run_timeline
WHERE start_time >= current_date() - INTERVAL 30 DAYS
GROUP BY task_key
ORDER BY avg_duration_min DESC;

Practice Problems

Problem 1: Design a Multi-Task Workflow

Medium

Design a Databricks Workflow for a daily ML retraining pipeline with these requirements: (1) Refresh feature tables from Silver data. (2) Train a new model version. (3) Evaluate against the current production model. (4) If the new model is better, promote it. (5) Send a Slack notification with results.

Problem 2: Repair Strategy

Easy

Your 8-task ETL workflow failed at the 5th task (transform_orders) due to a temporary network timeout. The first 4 tasks completed successfully and wrote data to Delta tables. What is the most efficient way to recover?

Problem 3: Orchestration Architecture

Hard

Your organization has 50+ data pipelines. Some need to run hourly, others daily, and some are event-triggered. Several pipelines depend on outputs from other pipelines. Design an orchestration architecture using Databricks Workflows that handles cross-pipeline dependencies, monitoring, and alerting.