What are Databricks Jobs and Workflows?
Why Jobs and Workflows Matter
The Problem: Running notebooks manually is fine for development, but production data pipelines need automated scheduling, dependency management, error handling, and monitoring.
The Solution: Databricks Jobs and Workflows provide a fully managed orchestration platform for scheduling and running notebooks, JARs, Python scripts, and SQL queries as automated pipelines with DAG-based dependencies.
Real Impact: Teams using Databricks Workflows can replace complex external orchestrators (Airflow, Step Functions) with a built-in, zero-infrastructure solution, sharply reducing pipeline setup and maintenance effort.
Real-World Analogy
Think of Databricks Workflows like a factory assembly line:
- Tasks = Individual stations on the assembly line (welding, painting, inspection)
- DAG = The blueprint showing which stations depend on which
- Schedule = The factory shift schedule (run at 6 AM daily)
- Parameters = Configuration for each run (which model, which date)
- Repair & Rerun = Fixing a defective station and resuming from that point
Single-Task vs Multi-Task Jobs
| Feature | Single-Task Job | Multi-Task Workflow |
|---|---|---|
| Structure | One notebook/script per job | DAG of multiple tasks with dependencies |
| Use Case | Simple ETL, single notebook | Multi-step pipelines, ML workflows |
| Error Handling | Retry the entire job | Retry or repair individual tasks |
| Compute | One cluster | Different clusters per task, shared clusters |
| Parameters | Job-level parameters | Task-level parameters, dynamic values |
DAG-Based Workflows
Multi-task workflows use Directed Acyclic Graphs (DAGs) to define task dependencies. Tasks run in parallel when possible and wait for upstream tasks to complete before starting.
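The resolution order a DAG implies can be sketched with Python's standard-library `graphlib`. The task names mirror the example job later in this section; the wave-by-wave loop is a simplified illustration of how a scheduler decides what can run in parallel, not the Databricks implementation.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of upstream tasks it depends on.
deps = {
    "ingest_orders": set(),
    "ingest_users": set(),
    "transform_silver": {"ingest_orders", "ingest_users"},
    "aggregate_gold": {"transform_silver"},
}

ts = TopologicalSorter(deps)
ts.prepare()

waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # tasks runnable in parallel right now
    waves.append(ready)
    for task in ready:              # mark the wave finished before advancing
        ts.done(task)

print(waves)
# Both ingest tasks form the first wave; gold waits for silver to finish.
```

The two ingest tasks have no dependencies, so they run concurrently; `transform_silver` only becomes ready once both complete, exactly as the `depends_on` entries in the job config below declare.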
Scheduling and Triggers
Jobs can be triggered by schedules (cron), file arrival events, or API calls. You can also chain workflows together using the Runs API.
```python
import requests

DATABRICKS_HOST = "https://my-workspace.databricks.com"
TOKEN = dbutils.secrets.get("scope", "api_token")

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json"
}

# Create a multi-task workflow job
job_config = {
    "name": "Daily ETL Pipeline",
    "tasks": [
        {
            "task_key": "ingest_orders",
            "notebook_task": {
                "notebook_path": "/Repos/etl/ingest_orders",
                "base_parameters": {
                    "date": "{{job.start_time.iso_date}}"
                }
            },
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "num_workers": 2,
                "node_type_id": "i3.xlarge"
            }
        },
        {
            "task_key": "ingest_users",
            "notebook_task": {
                "notebook_path": "/Repos/etl/ingest_users"
            },
            "existing_cluster_id": "0315-abc123-shared"
        },
        {
            "task_key": "transform_silver",
            "depends_on": [
                {"task_key": "ingest_orders"},
                {"task_key": "ingest_users"}
            ],
            "notebook_task": {
                "notebook_path": "/Repos/etl/transform_silver"
            },
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "num_workers": 4,
                "node_type_id": "i3.xlarge"
            }
        },
        {
            "task_key": "aggregate_gold",
            "depends_on": [
                {"task_key": "transform_silver"}
            ],
            "sql_task": {
                "query": {
                    "query_id": "abc123-gold-query"
                },
                "warehouse_id": "warehouse-xyz"
            }
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",  # 6 AM daily
        "timezone_id": "America/New_York"
    },
    "email_notifications": {
        "on_failure": ["data-team@example.com"]
    },
    "max_concurrent_runs": 1
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers=headers,
    json=job_config
)
response.raise_for_status()  # surface API errors instead of a KeyError below
job_id = response.json()["job_id"]
print(f"Created job: {job_id}")
```
Parameters and Dynamic Values
Jobs support parameters at both the job and task level. Dynamic values like the current date, run ID, and task return values enable flexible, reusable workflows.
```python
# ── In the notebook: Access job parameters ──

# Method 1: dbutils.widgets (recommended)
dbutils.widgets.text("date", "", "Processing Date")
dbutils.widgets.text("env", "dev", "Environment")

processing_date = dbutils.widgets.get("date")
environment = dbutils.widgets.get("env")
print(f"Processing date: {processing_date}")
print(f"Environment: {environment}")

# Method 2: Task values (pass small values between tasks)
# In Task A: set a value (df is the DataFrame this task wrote)
record_count = df.count()
dbutils.jobs.taskValues.set(
    key="bronze_record_count",
    value=record_count
)

# In Task B: read the value from Task A
bronze_count = dbutils.jobs.taskValues.get(
    taskKey="ingest_orders",
    key="bronze_record_count",
    default=0
)
print(f"Bronze ingested {bronze_count} records")

# Dynamic value references in job config:
# {{job.start_time.iso_date}}  -- current run date
# {{job.id}}                   -- job ID
# {{run.id}}                   -- run ID
# {{task.key}}                 -- current task key
```
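For intuition, the substitution the service performs on these references can be mimicked with a toy resolver. This is an illustration only, not the real templating engine, and the path and context values are made up.

```python
import re

def resolve(template, context):
    """Replace {{name}} placeholders with values from a context dict."""
    return re.sub(
        r"\{\{(.+?)\}\}",
        lambda m: str(context[m.group(1).strip()]),
        template,
    )

# At run time the platform supplies values like these for each run.
ctx = {"job.start_time.iso_date": "2024-06-01", "run.id": 98765}
print(resolve("s3://lake/bronze/{{job.start_time.iso_date}}/run-{{run.id}}", ctx))
# → s3://lake/bronze/2024-06-01/run-98765
```

Because the date resolves per run, the same `base_parameters` template serves every daily run without edits, which is the point of dynamic values.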
Repair and Rerun
When a task fails in a multi-task workflow, you do not need to rerun the entire pipeline. Databricks supports repair runs that restart only the failed task and its downstream dependencies, saving time and compute costs.
```python
# ── Repair a failed run via REST API ──

# List recent runs to find the failed one
runs = requests.get(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/list",
    headers=headers,
    params={"job_id": job_id, "limit": 5}
).json()

# Find the failed run (result_state is absent while a run is still in progress,
# so use .get to avoid a KeyError)
failed_run = None
for run in runs.get("runs", []):
    if run["state"].get("result_state") == "FAILED":
        failed_run = run
        break

if failed_run:
    # Repair: rerun only failed tasks + their downstream dependents
    repair = requests.post(
        f"{DATABRICKS_HOST}/api/2.1/jobs/runs/repair",
        headers=headers,
        json={
            "run_id": failed_run["run_id"],
            "rerun_tasks": ["transform_silver"]  # the failed task
        }
    )
    print(f"Repair started: {repair.json()}")

# Retry policies in job config:
# "max_retries": 2,
# "min_retry_interval_millis": 60000,
# "retry_on_timeout": true
```
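The retry fields noted in the comments above are per-task settings in the job configuration, so flaky tasks can retry automatically before a repair run is ever needed. A minimal sketch of where they sit, with illustrative values:

```python
# A task definition with automatic retries (values are illustrative).
retrying_task = {
    "task_key": "transform_silver",
    "notebook_task": {"notebook_path": "/Repos/etl/transform_silver"},
    "max_retries": 2,                     # retry the task up to twice
    "min_retry_interval_millis": 60_000,  # wait 1 minute between attempts
    "retry_on_timeout": True,             # also retry if the task times out
}
print(retrying_task["task_key"], "retries:", retrying_task["max_retries"])
```

Automatic retries handle transient failures (network timeouts, spot-instance loss); repair runs remain the tool for failures that need a code or data fix first.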
Monitoring and Alerting
```sql
-- Query job run history from system tables
-- (table/column names assume the jobs system tables are enabled; newer
--  workspaces expose them under the system.lakeflow schema instead)
SELECT
  job_id,
  run_id,
  task_key,
  start_time,
  end_time,
  TIMESTAMPDIFF(MINUTE, start_time, end_time) AS duration_min,
  result_state,
  error_message
FROM system.workflow.job_run_timeline
WHERE job_id = 12345
  AND start_time >= current_date() - INTERVAL 7 DAYS
ORDER BY start_time DESC;

-- Find slow-running tasks (performance regression)
-- Note: duration must be computed inline here; the duration_min alias
-- from the query above is not a column of the underlying table.
SELECT
  task_key,
  AVG(TIMESTAMPDIFF(MINUTE, start_time, end_time)) AS avg_duration_min,
  MAX(TIMESTAMPDIFF(MINUTE, start_time, end_time)) AS max_duration_min,
  COUNT(*) AS run_count,
  SUM(CASE WHEN result_state = 'FAILED'
      THEN 1 ELSE 0 END) AS failure_count
FROM system.workflow.job_run_timeline
WHERE start_time >= current_date() - INTERVAL 30 DAYS
GROUP BY task_key
ORDER BY avg_duration_min DESC;
```
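The same run history can drive automated alerting instead of a dashboard someone has to watch. A minimal sketch, assuming run records shaped like the query results above (`task_key`, `duration_min`, `result_state`); the thresholds, record shape, and function name are assumptions for illustration, not a Databricks API.

```python
from collections import defaultdict

def find_alerts(runs, max_failure_rate=0.2, max_avg_minutes=30):
    """Flag tasks whose recent failure rate or average duration is too high."""
    stats = defaultdict(lambda: {"runs": 0, "failures": 0, "total_min": 0})
    for r in runs:
        s = stats[r["task_key"]]
        s["runs"] += 1
        s["total_min"] += r["duration_min"]
        if r["result_state"] == "FAILED":
            s["failures"] += 1

    alerts = []
    for task, s in stats.items():
        if s["failures"] / s["runs"] > max_failure_rate:
            alerts.append((task, "failure_rate"))
        elif s["total_min"] / s["runs"] > max_avg_minutes:
            alerts.append((task, "slow"))
    return alerts

history = [
    {"task_key": "transform_silver", "duration_min": 45, "result_state": "SUCCESS"},
    {"task_key": "transform_silver", "duration_min": 50, "result_state": "SUCCESS"},
    {"task_key": "ingest_orders", "duration_min": 5, "result_state": "FAILED"},
    {"task_key": "ingest_orders", "duration_min": 6, "result_state": "SUCCESS"},
]
print(find_alerts(history))
# transform_silver is slow (avg 47.5 min); ingest_orders fails half its runs
```

In practice this logic would run as its own small scheduled job, querying the system table and posting to email or a chat webhook on any hit.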
Practice Problems
Problem 1: Design a Multi-Task Workflow
Medium: Design a Databricks Workflow for a daily ML retraining pipeline with these requirements: (1) Refresh feature tables from Silver data. (2) Train a new model version. (3) Evaluate against the current production model. (4) If the new model is better, promote it. (5) Send a Slack notification with results.
Problem 2: Repair Strategy
Easy: Your 8-task ETL workflow failed at the 5th task (transform_orders) due to a temporary network timeout. The first 4 tasks completed successfully and wrote data to Delta tables. What is the most efficient way to recover?
Problem 3: Orchestration Architecture
Hard: Your organization has 50+ data pipelines. Some need to run hourly, others daily, and some are event-triggered. Several pipelines depend on outputs from other pipelines. Design an orchestration architecture using Databricks Workflows that handles cross-pipeline dependencies, monitoring, and alerting.