📊 Topological Sort Made Easy

Master dependency ordering in Directed Acyclic Graphs (DAGs)

⏱️ 30 min read 🎯 Graph Algorithms 💡 Intermediate

🎯 Understanding Topological Sort

What is Topological Sort?

A topological sort is a linear ordering of the vertices of a Directed Acyclic Graph (DAG) such that for every directed edge u → v, vertex u comes before vertex v in the ordering.

  • 📌 Only possible for DAGs (no cycles)
  • 📊 Multiple valid orderings may exist
  • 🔄 Used for dependency resolution
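The definition can be checked mechanically. Below is a minimal sketch of a validator; the helper name is_valid_topological_order and the tiny example graph are illustrative assumptions, not part of any library.

```python
def is_valid_topological_order(order, vertices, edges):
    """Return True if `order` lists every vertex once and respects every edge."""
    # The ordering must be a permutation of the vertex set.
    if len(order) != len(vertices) or set(order) != set(vertices):
        return False
    # Record each vertex's position, then check u comes before v for every edge u -> v.
    position = {v: i for i, v in enumerate(order)}
    return all(position[u] < position[v] for u, v in edges)


# Hypothetical example graph: A -> C and B -> C
vertices = ["A", "B", "C"]
edges = [("A", "C"), ("B", "C")]
print(is_valid_topological_order(["A", "B", "C"], vertices, edges))  # True
print(is_valid_topological_order(["B", "A", "C"], vertices, edges))  # True
print(is_valid_topological_order(["A", "C", "B"], vertices, edges))  # False
```

The first two calls both succeed, which illustrates that more than one valid ordering can exist for the same graph.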

Real-World Examples

Course Prerequisites

Taking courses in a university where some courses require others as prerequisites.

Courses: [Math101, CS101, Math201, CS201, CS301]
Prerequisites:
  - Math101 → Math201
  - CS101 → CS201
  - Math201 → CS301
  - CS201 → CS301
Valid Order: Math101, CS101, Math201, CS201, CS301
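For a quick check of this example, Python's standard library (3.9+) ships graphlib.TopologicalSorter. The sketch below assumes the prerequisite list above is encoded as a mapping from each course to the set of courses it depends on; the variable names are illustrative.

```python
from graphlib import TopologicalSorter

# Map each course to the set of courses that must be taken first.
prerequisites = {
    "Math201": {"Math101"},
    "CS201": {"CS101"},
    "CS301": {"Math201", "CS201"},
}

# static_order() yields every course, including ones that appear only as prerequisites.
order = list(TopologicalSorter(prerequisites).static_order())
print(order)  # one valid order; the exact sequence may differ between runs
```

Any order it prints should place Math101 before Math201, CS101 before CS201, and both Math201 and CS201 before CS301.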

DAG Visualization

[Figure: DAG with vertices A, B, C, D, E]

Valid Orders: A→B→C→D→E or A→C→B→D→E

🌟 Kahn's Algorithm (BFS-based)

Algorithm Steps

  1. Calculate in-degree for all vertices
  2. Add all vertices with in-degree 0 to a queue
  3. While queue is not empty:
    • Remove vertex from queue and add to result
    • Reduce in-degree of all neighbors
    • If neighbor's in-degree becomes 0, add to queue
  4. If the result contains all vertices, it is a valid ordering; otherwise the graph has a cycle

Implementation

from collections import deque, defaultdict


def topological_sort_kahns(vertices, edges):
    """
    Kahn's algorithm for topological sorting
    Time: O(V + E), Space: O(V + E) including the adjacency list
    """
    # Build adjacency list and in-degree count
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}

    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    # Initialize queue with vertices having 0 in-degree
    queue = deque([v for v in vertices if in_degree[v] == 0])
    result = []

    while queue:
        vertex = queue.popleft()
        result.append(vertex)

        # Reduce in-degree of neighbors
        for neighbor in graph[vertex]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Check if topological sort is possible
    if len(result) != len(vertices):
        return None  # Cycle detected

    return result


# Example usage
vertices = ['A', 'B', 'C', 'D', 'E']
edges = [('A', 'D'), ('B', 'D'), ('C', 'E'), ('D', 'E')]
print(topological_sort_kahns(vertices, edges))
# Output: ['A', 'B', 'C', 'D', 'E']

Cycle Detection

from collections import defaultdict, deque


def has_cycle_kahns(vertices, edges):
    """
    Detect cycle using Kahn's algorithm
    Returns True if cycle exists
    """
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}

    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    queue = deque([v for v in vertices if in_degree[v] == 0])
    processed = 0

    while queue:
        vertex = queue.popleft()
        processed += 1

        for neighbor in graph[vertex]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Vertices on (or downstream of) a cycle never reach in-degree 0
    return processed != len(vertices)
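Beyond a yes/no answer, the same bookkeeping can report which vertices were never processed. The following is a rough sketch under the same input format; the function name vertices_blocked_by_cycle is an assumption made for illustration. Note that it returns vertices on a cycle and also any vertex reachable only through one.

```python
from collections import defaultdict, deque


def vertices_blocked_by_cycle(vertices, edges):
    """Return vertices Kahn's algorithm never processes (on or downstream of a cycle)."""
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    queue = deque(v for v in vertices if in_degree[v] == 0)
    while queue:
        vertex = queue.popleft()
        for neighbor in graph[vertex]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # A vertex is processed exactly when its in-degree reaches 0,
    # so anything still positive was blocked.
    return [v for v in vertices if in_degree[v] > 0]


# Hypothetical graph: B and C form a cycle, D hangs off the cycle, E is isolated.
vertices = ["A", "B", "C", "D", "E"]
edges = [("A", "B"), ("B", "C"), ("C", "B"), ("C", "D")]
print(vertices_blocked_by_cycle(vertices, edges))  # ['B', 'C', 'D']
```

An empty list means the graph is a DAG.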

🎯 DFS-based Topological Sort

DFS Algorithm

Uses post-order traversal with a stack to build the topological order.

  • 🔍 Visit each unvisited vertex
  • 📚 Recursively visit all neighbors
  • 🎯 Add vertex to stack after visiting all neighbors
  • ↩️ Reverse the stack for final order

Implementation with Cycle Detection

from collections import defaultdict


def topological_sort_dfs(vertices, edges):
    """
    DFS-based topological sorting
    Time: O(V + E), Space: O(V + E) including the adjacency list
    """
    # Build adjacency list
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    # Color states: WHITE (0) = unvisited, GRAY (1) = visiting, BLACK (2) = visited
    color = {v: 0 for v in vertices}
    stack = []
    has_cycle = False

    def dfs(vertex):
        nonlocal has_cycle
        if has_cycle:
            return

        color[vertex] = 1  # Mark as visiting (GRAY)

        for neighbor in graph[vertex]:
            if color[neighbor] == 1:  # Back edge (cycle detected)
                has_cycle = True
                return
            elif color[neighbor] == 0:  # Unvisited
                dfs(neighbor)
                if has_cycle:  # Stop early once a cycle has been found
                    return

        color[vertex] = 2  # Mark as visited (BLACK)
        stack.append(vertex)  # Add to stack in post-order

    # Visit all vertices
    for vertex in vertices:
        if color[vertex] == 0:
            dfs(vertex)

    if has_cycle:
        return None  # Cycle detected

    return stack[::-1]  # Reverse for topological order


# Example without a cycle
vertices = ['A', 'B', 'C', 'D']
edges = [('A', 'B'), ('B', 'C'), ('C', 'D')]
print(topological_sort_dfs(vertices, edges))
# Output: ['A', 'B', 'C', 'D']

# Example with cycle
edges_with_cycle = [('A', 'B'), ('B', 'C'), ('C', 'A')]
print(topological_sort_dfs(['A', 'B', 'C'], edges_with_cycle))
# Output: None (cycle detected)
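A recursive DFS can hit Python's recursion limit on long dependency chains. One possible workaround is an explicit stack, sketched below with the same three-color scheme; the function name topological_sort_dfs_iterative is an illustrative assumption.

```python
from collections import defaultdict


def topological_sort_dfs_iterative(vertices, edges):
    """Return a topological order, or None if the graph has a cycle."""
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    WHITE, GRAY, BLACK = 0, 1, 2
    color = {v: WHITE for v in vertices}
    post_order = []

    for start in vertices:
        if color[start] != WHITE:
            continue
        color[start] = GRAY
        # Each stack entry pairs a vertex with an iterator over its neighbors,
        # so the scan resumes where it left off after a child finishes.
        stack = [(start, iter(graph[start]))]
        while stack:
            vertex, neighbors = stack[-1]
            advanced = False
            for neighbor in neighbors:
                if color[neighbor] == GRAY:
                    return None  # Back edge: cycle detected
                if color[neighbor] == WHITE:
                    color[neighbor] = GRAY
                    stack.append((neighbor, iter(graph[neighbor])))
                    advanced = True
                    break
            if not advanced:
                # All neighbors finished: emit the vertex in post-order.
                color[vertex] = BLACK
                post_order.append(vertex)
                stack.pop()

    return post_order[::-1]


print(topological_sort_dfs_iterative(['A', 'B', 'C', 'D'],
                                     [('A', 'B'), ('B', 'C'), ('C', 'D')]))
# ['A', 'B', 'C', 'D']
```

The result is built the same way as in the recursive version: vertices are collected in post-order and reversed at the end.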

All Topological Sorts

from collections import defaultdict


def all_topological_sorts(vertices, edges):
    """
    Generate all possible topological sorts
    """
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}

    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    all_sorts = []

    def backtrack(current_sort, remaining):
        if not remaining:
            all_sorts.append(current_sort[:])
            return

        # Try each vertex with in-degree 0 (sorted for a deterministic result order)
        for vertex in sorted(remaining):
            if in_degree[vertex] == 0:
                # Choose vertex
                current_sort.append(vertex)
                new_remaining = remaining - {vertex}

                # Update in-degrees
                for neighbor in graph[vertex]:
                    in_degree[neighbor] -= 1

                # Recurse
                backtrack(current_sort, new_remaining)

                # Backtrack
                for neighbor in graph[vertex]:
                    in_degree[neighbor] += 1
                current_sort.pop()

    backtrack([], set(vertices))
    return all_sorts


# Example: the only constraint is that A comes before C
vertices = ['A', 'B', 'C']
edges = [('A', 'C')]
print(all_topological_sorts(vertices, edges))
# Output: [['A', 'B', 'C'], ['A', 'C', 'B'], ['B', 'A', 'C']]
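Enumerating every ordering can grow factorially with the number of vertices. When a single deterministic order is enough, one common option is Kahn's algorithm with a min-heap in place of the queue, which yields the lexicographically smallest order. The sketch below assumes vertices are mutually comparable (for example, all strings); the function name is illustrative.

```python
import heapq
from collections import defaultdict


def smallest_topological_order(vertices, edges):
    """Return the lexicographically smallest topological order, or None on a cycle."""
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    # Min-heap of vertices whose prerequisites are all satisfied.
    ready = [v for v in vertices if in_degree[v] == 0]
    heapq.heapify(ready)

    order = []
    while ready:
        vertex = heapq.heappop(ready)  # Always take the smallest available vertex
        order.append(vertex)
        for neighbor in graph[vertex]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                heapq.heappush(ready, neighbor)

    return order if len(order) == len(vertices) else None


print(smallest_topological_order(['C', 'B', 'A'], [('A', 'C')]))
# ['A', 'B', 'C']
```

The heap raises the running time to O((V + E) log V), a small price for a reproducible result.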

⚡ Real-World Applications

1. Build System Dependencies

from collections import defaultdict, deque


class BuildSystem:
    def __init__(self):
        self.tasks = {}  # task -> list of tasks it depends on
        self.dependencies = defaultdict(list)  # task -> tasks that depend on it

    def add_task(self, task, deps=None):
        """Add a build task with dependencies"""
        self.tasks[task] = deps or []
        for dep in self.tasks[task]:
            self.dependencies[dep].append(task)

    def build_order(self):
        """Determine build order using topological sort"""
        in_degree = {task: len(deps) for task, deps in self.tasks.items()}
        queue = deque([task for task, degree in in_degree.items() if degree == 0])
        order = []

        while queue:
            task = queue.popleft()
            order.append(task)

            for dependent in self.dependencies[task]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        # None signals a cycle or a dependency that was never added as a task
        return order if len(order) == len(self.tasks) else None


# Example: Building a project
build = BuildSystem()
build.add_task('compile_utils', [])
build.add_task('compile_core', ['compile_utils'])
build.add_task('compile_app', ['compile_core'])
build.add_task('run_tests', ['compile_app'])
build.add_task('package', ['run_tests'])

print(build.build_order())
# Output: ['compile_utils', 'compile_core', 'compile_app', 'run_tests', 'package']
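Build tools often go one step further and run independent tasks in parallel. A rough sketch of that idea uses the standard library's graphlib.TopologicalSorter (Python 3.9+), which hands out tasks as soon as their dependencies are marked done. The function name build_batches and the task names compile_ui and link_app are hypothetical, chosen only to give the graph a branch.

```python
from graphlib import TopologicalSorter


def build_batches(task_deps):
    """Group tasks into batches; tasks within a batch do not depend on each other."""
    sorter = TopologicalSorter(task_deps)
    sorter.prepare()  # Raises graphlib.CycleError if the dependencies are cyclic
    batches = []
    while sorter.is_active():
        # get_ready() returns every task whose dependencies are all done.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)  # Mark the batch finished to unlock its dependents
    return batches


# Hypothetical project: each task maps to the set of tasks it depends on.
task_deps = {
    "compile_utils": set(),
    "compile_core": {"compile_utils"},
    "compile_ui": {"compile_utils"},
    "link_app": {"compile_core", "compile_ui"},
}
print(build_batches(task_deps))
# [['compile_utils'], ['compile_core', 'compile_ui'], ['link_app']]
```

Each inner list could be dispatched to a worker pool; the sorting inside a batch only makes the printed output stable.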

2. Course Schedule Problem

from collections import defaultdict, deque


def can_finish_courses(num_courses, prerequisites):
    """
    Determine if all courses can be completed
    prerequisites[i] = [course, prerequisite]
    """
    graph = defaultdict(list)
    in_degree = [0] * num_courses

    # Build graph
    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1

    # Start with courses having no prerequisites
    queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
    completed = 0

    while queue:
        course = queue.popleft()
        completed += 1

        for next_course in graph[course]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return completed == num_courses


def find_course_order(num_courses, prerequisites):
    """Find order to take all courses (empty list if impossible)"""
    graph = defaultdict(list)
    in_degree = [0] * num_courses

    for course, prereq in prerequisites:
        graph[prereq].append(course)
        in_degree[course] += 1

    queue = deque([i for i in range(num_courses) if in_degree[i] == 0])
    order = []

    while queue:
        course = queue.popleft()
        order.append(course)

        for next_course in graph[course]:
            in_degree[next_course] -= 1
            if in_degree[next_course] == 0:
                queue.append(next_course)

    return order if len(order) == num_courses else []


# Example
num_courses = 4
prerequisites = [[1, 0], [2, 0], [3, 1], [3, 2]]
print(can_finish_courses(num_courses, prerequisites))  # True
print(find_course_order(num_courses, prerequisites))  # [0, 1, 2, 3]

3. Task Scheduling with Deadlines

from collections import defaultdict, deque


def schedule_tasks_with_deadlines(tasks, dependencies):
    """
    Schedule tasks considering dependencies and deadlines
    tasks = [(name, duration, deadline), ...]
    dependencies = [(prerequisite, task), ...]
    """
    # Build the dependency graph
    task_names = [t[0] for t in tasks]
    task_info = {t[0]: {'duration': t[1], 'deadline': t[2]} for t in tasks}

    graph = defaultdict(list)
    in_degree = {name: 0 for name in task_names}

    for prereq, task in dependencies:
        graph[prereq].append(task)
        in_degree[task] += 1

    # Process tasks in topological order, one at a time
    queue = deque([t for t in task_names if in_degree[t] == 0])
    schedule = []
    current_time = 0

    while queue:
        # Among available tasks, choose one with earliest deadline
        queue = deque(sorted(queue, key=lambda x: task_info[x]['deadline']))
        task = queue.popleft()

        start_time = current_time
        end_time = current_time + task_info[task]['duration']

        schedule.append({
            'task': task,
            'start': start_time,
            'end': end_time,
            'deadline': task_info[task]['deadline'],
            'on_time': end_time <= task_info[task]['deadline']
        })

        current_time = end_time

        # Update available tasks
        for next_task in graph[task]:
            in_degree[next_task] -= 1
            if in_degree[next_task] == 0:
                queue.append(next_task)

    return schedule


# Example
tasks = [
    ('Design', 3, 5),
    ('Implement', 5, 12),
    ('Test', 2, 15),
    ('Deploy', 1, 16)
]
dependencies = [
    ('Design', 'Implement'),
    ('Implement', 'Test'),
    ('Test', 'Deploy')
]

schedule = schedule_tasks_with_deadlines(tasks, dependencies)
for item in schedule:
    print(f"{item['task']}: Day {item['start']}-{item['end']} "
          f"(Deadline: Day {item['deadline']}, "
          f"On time: {item['on_time']})")

🚀 Advanced Problems

1. Alien Dictionary

Difficulty: Hard

from collections import defaultdict, deque


def alien_order(words):
    """
    Derive alien alphabet order from sorted words
    """
    # Build graph from character ordering
    graph = defaultdict(set)
    in_degree = {}

    # Add all characters
    for word in words:
        for char in word:
            if char not in in_degree:
                in_degree[char] = 0

    # Compare adjacent words
    for i in range(len(words) - 1):
        w1, w2 = words[i], words[i + 1]
        min_len = min(len(w1), len(w2))

        # Find first different character
        for j in range(min_len):
            if w1[j] != w2[j]:
                if w2[j] not in graph[w1[j]]:
                    graph[w1[j]].add(w2[j])
                    in_degree[w2[j]] += 1
                break
        else:
            # No difference found: invalid if w1 is longer than its prefix w2
            if len(w1) > len(w2):
                return ""

    # Topological sort using Kahn's algorithm
    queue = deque([c for c in in_degree if in_degree[c] == 0])
    result = []

    while queue:
        char = queue.popleft()
        result.append(char)

        for next_char in graph[char]:
            in_degree[next_char] -= 1
            if in_degree[next_char] == 0:
                queue.append(next_char)

    # Check if all characters are included
    if len(result) != len(in_degree):
        return ""  # Invalid order (cycle exists)

    return ''.join(result)


# Example
words = ["wrt", "wrf", "er", "ett", "rftt"]
print(alien_order(words))  # "wertf"

2. Parallel Courses

Difficulty: Medium

from collections import defaultdict, deque


def minimum_semesters(n, relations):
    """
    Find minimum number of semesters to take all courses
    Can take multiple courses per semester if no dependencies
    relations = [[prerequisite, course], ...], courses numbered 1..n
    """
    graph = defaultdict(list)
    in_degree = [0] * (n + 1)

    for prereq, course in relations:
        graph[prereq].append(course)
        in_degree[course] += 1

    queue = deque([i for i in range(1, n + 1) if in_degree[i] == 0])
    semesters = 0
    studied = 0

    while queue:
        # Take all available courses this semester
        semester_size = len(queue)
        semesters += 1

        for _ in range(semester_size):
            course = queue.popleft()
            studied += 1

            for next_course in graph[course]:
                in_degree[next_course] -= 1
                if in_degree[next_course] == 0:
                    queue.append(next_course)

    # -1 means a cycle makes some courses impossible to take
    return semesters if studied == n else -1


# Example
n = 3
relations = [[1, 3], [2, 3]]
print(minimum_semesters(n, relations))  # 2 semesters

3. Sequence Reconstruction

Difficulty: Hard

from collections import defaultdict, deque


def sequence_reconstruction(org, seqs):
    """
    Check if org is the unique topological sort of seqs
    """
    if not seqs:
        return False

    # Build graph
    graph = defaultdict(set)
    in_degree = defaultdict(int)
    nodes = set()

    for seq in seqs:
        for num in seq:
            nodes.add(num)
            if num not in in_degree:
                in_degree[num] = 0

        for i in range(len(seq) - 1):
            if seq[i + 1] not in graph[seq[i]]:
                graph[seq[i]].add(seq[i + 1])
                in_degree[seq[i + 1]] += 1

    # Check if all elements in org are present
    if len(nodes) != len(org) or nodes != set(org):
        return False

    # Topological sort
    queue = deque([n for n in nodes if in_degree[n] == 0])
    result = []

    while queue:
        # Must have exactly one choice at each step
        if len(queue) != 1:
            return False

        num = queue.popleft()
        result.append(num)

        for next_num in graph[num]:
            in_degree[next_num] -= 1
            if in_degree[next_num] == 0:
                queue.append(next_num)

    return result == org


# Example
org = [1, 2, 3]
seqs = [[1, 2], [1, 3], [2, 3]]
print(sequence_reconstruction(org, seqs))  # True

💪 Practice Exercises

💡 Key Points to Remember:
  • Topological sort only works on DAGs (no cycles)
  • Multiple valid orderings may exist
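  • Use Kahn's algorithm when you need level-by-level processing or a processed-vertex count; it detects a cycle whenever some vertices are never processed
  • Use DFS for a compact recursive implementation; it detects a cycle when it meets a vertex that is still being visited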
  • Time complexity: O(V + E) for both approaches

Problem Set

Exercise 1: Verify DAG

Write a function to check if a directed graph is acyclic.

def is_dag(vertices, edges):
    """
    Check if graph is a DAG
    Your implementation here
    """
    # Hint: Use either DFS with colors or Kahn's algorithm
    pass


# Test cases
print(is_dag([1, 2, 3], [(1, 2), (2, 3)]))  # True
print(is_dag([1, 2, 3], [(1, 2), (2, 3), (3, 1)]))  # False

Exercise 2: Longest Path in DAG

Find the longest path in a DAG using topological sort.

from collections import defaultdict, deque


def longest_path_dag(vertices, edges):
    """
    Find the length (in edges) of the longest path in a DAG
    """
    # Step 1: Topological sort
    # Step 2: Process vertices in topological order
    # Step 3: Update distances

    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}

    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    # Initialize distances
    dist = {v: 0 for v in vertices}

    # Topological sort
    queue = deque([v for v in vertices if in_degree[v] == 0])
    topo_order = []

    while queue:
        vertex = queue.popleft()
        topo_order.append(vertex)

        for neighbor in graph[vertex]:
            # Update distance
            dist[neighbor] = max(dist[neighbor], dist[vertex] + 1)

            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    return max(dist.values())


# Test
vertices = ['A', 'B', 'C', 'D', 'E']
edges = [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('D', 'E')]
print(longest_path_dag(vertices, edges))  # 3
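To recover the path itself rather than only its length, one option is to remember which predecessor produced each best distance. The sketch below assumes the input is a non-empty DAG; the function name longest_path_vertices and the parent map are illustrative additions.

```python
from collections import defaultdict, deque


def longest_path_vertices(vertices, edges):
    """Return the vertices along one longest path in a DAG (assumes no cycles)."""
    graph = defaultdict(list)
    in_degree = {v: 0 for v in vertices}
    for u, v in edges:
        graph[u].append(v)
        in_degree[v] += 1

    dist = {v: 0 for v in vertices}
    parent = {v: None for v in vertices}  # Predecessor on the best path so far

    queue = deque(v for v in vertices if in_degree[v] == 0)
    while queue:
        vertex = queue.popleft()
        for neighbor in graph[vertex]:
            if dist[vertex] + 1 > dist[neighbor]:
                dist[neighbor] = dist[vertex] + 1
                parent[neighbor] = vertex
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Walk back from the farthest vertex to recover the path.
    end = max(vertices, key=lambda v: dist[v])
    path = []
    while end is not None:
        path.append(end)
        end = parent[end]
    return path[::-1]


vertices = ['A', 'B', 'C', 'D', 'E']
edges = [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('D', 'E')]
print(longest_path_vertices(vertices, edges))  # ['A', 'B', 'D', 'E']
```

When several paths tie for longest (here A → C → D → E is equally long), the strict comparison keeps whichever was found first.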

Exercise 3: Critical Path Method

Find the critical path in a project network.

from collections import deque


def critical_path(tasks):
    """
    Find critical path in project network
    tasks = [(name, duration, dependencies), ...]
    """
    # Build graph
    task_map = {t[0]: {'duration': t[1], 'deps': t[2]} for t in tasks}

    # Topological sort
    in_degree = {name: len(info['deps']) for name, info in task_map.items()}
    queue = deque([name for name in in_degree if in_degree[name] == 0])

    # Calculate earliest start times (forward pass)
    earliest = {name: 0 for name in task_map}
    topo_order = []

    while queue:
        task = queue.popleft()
        topo_order.append(task)

        # Update dependent tasks
        for other_task, info in task_map.items():
            if task in info['deps']:
                earliest[other_task] = max(
                    earliest[other_task],
                    earliest[task] + task_map[task]['duration']
                )
                in_degree[other_task] -= 1
                if in_degree[other_task] == 0:
                    queue.append(other_task)

    # Calculate latest start times (backward pass)
    project_duration = max(earliest[t] + task_map[t]['duration']
                           for t in topo_order)
    # A task with no dependents can start as late as (project end - its duration)
    latest = {name: project_duration - info['duration']
              for name, info in task_map.items()}

    for task in reversed(topo_order):
        for dep_task in task_map[task]['deps']:
            latest[dep_task] = min(
                latest[dep_task],
                latest[task] - task_map[dep_task]['duration']
            )

    # Find critical path (tasks with zero slack: earliest == latest)
    critical = [task for task in topo_order
                if earliest[task] == latest[task]]

    return critical, project_duration


# Example
tasks = [
    ('A', 3, []),
    ('B', 2, []),
    ('C', 4, ['A']),
    ('D', 3, ['A', 'B']),
    ('E', 2, ['C', 'D'])
]
critical, duration = critical_path(tasks)
print(f"Critical Path: {critical}")  # Critical Path: ['A', 'C', 'E']
print(f"Project Duration: {duration}")  # Project Duration: 9

Challenge Problems

Problem                    Difficulty  Key Technique
Course Schedule III        Hard        Topological Sort + Greedy
Find Eventual Safe States  Medium      Reverse Graph + Topological Sort
Reconstruct Itinerary      Medium      Modified DFS