Distributed Consensus & Coordination

Difficulty: Hard · 28 min read

Why Consensus Matters

The Problem: In a distributed system, nodes can crash, networks can partition, and messages can be delayed. How do multiple servers agree on the same value (e.g., who is the leader, what is the latest write) when any of them might fail at any time?

The Solution: Consensus algorithms ensure that a group of nodes can agree on a single value even when some nodes fail. They are the foundation of replicated state machines, distributed databases, and coordination services.

Real Impact: etcd (used by Kubernetes) relies on Raft for leader election and configuration storage. ZooKeeper (used by Kafka, Hadoop) uses a Paxos variant called ZAB. Google's Spanner uses Paxos for global replication.

Real-World Analogy

Think of consensus like a jury deliberation:

  • Jurors = Distributed nodes (each has an opinion)
  • Verdict = The agreed-upon value (all must reach the same conclusion)
  • Majority vote = Quorum (at least N/2+1 must agree)
  • Foreman = Leader node (proposes values, coordinates voting)
  • Absent juror = Failed node (the rest can still reach a verdict)

The CAP Theorem

The CAP theorem, conjectured by Eric Brewer in 2000 and formally proven by Seth Gilbert and Nancy Lynch in 2002, states that a distributed data store can provide at most two of three guarantees simultaneously: Consistency, Availability, and Partition Tolerance. Since network partitions are unavoidable in practice, the real choice is between consistency and availability during a partition.

[CAP theorem diagram: the three guarantees and example systems for each pairing -- CP: etcd, ZooKeeper, MongoDB, HBase; AP: Cassandra, DynamoDB, CouchDB; CA: single-node RDBMS]
| Choice | During Partition | Examples | Use When |
|---|---|---|---|
| CP (Consistency + Partition Tolerance) | Some requests may be rejected to maintain consistency | etcd, ZooKeeper, HBase | Financial transactions, leader election, configuration |
| AP (Availability + Partition Tolerance) | All requests get a response but data may be stale | Cassandra, DynamoDB, CouchDB | Social feeds, product catalogs, shopping carts |
| CA (Consistency + Availability) | Not possible with network partitions (single node only) | Traditional RDBMS on one server | When you do not need distribution |

Raft Consensus Algorithm

Raft was designed by Diego Ongaro and John Ousterhout in 2013 as an understandable alternative to Paxos. It breaks consensus into three sub-problems: leader election, log replication, and safety. Raft is used by etcd, Consul, CockroachDB, and TiKV.

Raft Leader Election Steps

  • Step 1: Term 1, no leader yet -- all nodes start as followers.
  • Step 2: A follower's election timeout expires; it becomes a candidate, increments the term, and votes for itself.
  • Step 3: The candidate receives votes from a majority and becomes leader (Term 2).

Log replication (normal operation): the leader appends client writes (e.g., x=1, y=2, z=3) to its log and sends them to followers via AppendEntries; an entry is committed once a majority of nodes has replicated it.

Raft Key Concepts

How Raft Works

  • Term: A logical clock that increments with each election. Used to detect stale leaders.
  • Election Timeout: Random interval (150-300ms). When a follower receives no heartbeat, it becomes a candidate.
  • Quorum: A majority of nodes (3 of 5, 2 of 3). Writes and elections require quorum agreement.
  • AppendEntries: The leader sends new log entries to followers. Also serves as heartbeat when empty.
  • Commit: A log entry is committed when replicated to a majority. Only committed entries are applied to the state machine.
  • Safety guarantee: Once committed, an entry will never be overwritten. This ensures all nodes converge on the same log.
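The election mechanics above can be condensed into a few lines of Python. This is an illustrative toy, not a real Raft node: `RaftNode` and its method names are assumptions for demonstration, and a real implementation (etcd, Consul) also handles RPCs, log replication, and persistence.

```python
import random

class RaftNode:
    """Minimal sketch of Raft follower/candidate state transitions."""

    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers        # ids of the other cluster members
        self.term = 0             # logical clock; increments per election
        self.state = 'follower'
        self.voted_for = None

    def election_timeout(self):
        # Randomized 150-300 ms timeout prevents simultaneous candidacies
        return random.uniform(0.150, 0.300)

    def on_timeout(self):
        """Follower heard no heartbeat: start an election."""
        self.state = 'candidate'
        self.term += 1
        self.voted_for = self.node_id
        return 1                  # one vote so far: our own

    def on_vote_granted(self, votes):
        """Become leader once a majority (quorum) has voted for us."""
        cluster_size = len(self.peers) + 1
        quorum = cluster_size // 2 + 1
        if votes >= quorum:
            self.state = 'leader'
        return self.state
```

In a 5-node cluster the candidate needs 3 votes (its own plus two others) before it transitions to leader; any smaller tally leaves it a candidate until the next timeout.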

Paxos Overview

Paxos, first described by Leslie Lamport in 1989 and formally published in 1998, is the foundational consensus algorithm. While theoretically elegant, it is notoriously difficult to implement correctly. Most practical systems use Raft or a Paxos variant (Multi-Paxos, Fast Paxos).

| Aspect | Paxos | Raft |
|---|---|---|
| Complexity | Highly abstract, hard to implement | Designed for understandability |
| Leader | Proposer role; any node can propose | Strong leader; only leader proposes |
| Log ordering | Entries can be committed out of order | Strictly ordered (append-only) |
| Membership changes | Requires separate protocol | Built-in joint consensus |
| Used by | Google Spanner, Chubby | etcd, Consul, CockroachDB, TiKV |

Leader Election

Many distributed systems need a single leader for coordination: the leader accepts writes, assigns work, or manages a shared resource. Leader election ensures exactly one leader exists at any time and that a new leader is elected when the current one fails.

Raft-Based Election

Nodes vote for a candidate; majority wins. Term numbers prevent split-brain. Used by etcd and Consul. Automatically detects leader failure via heartbeat timeouts.

ZooKeeper Election

Nodes create ephemeral sequential znodes. The node with the lowest sequence number becomes leader. When the leader disconnects, its znode is deleted and the next node takes over.
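The lowest-sequence-number rule can be isolated as a pure function; here is a minimal sketch. The kazoo calls in the trailing comment are real (`create` with `ephemeral`/`sequence`, `get_children`), but the znode path layout and names are assumptions for illustration.

```python
def is_lowest(my_znode, all_znodes):
    """ZooKeeper-style election: the znode with the lowest
    sequence suffix (e.g. 'candidate-0000000001') is the leader."""
    seq = lambda name: int(name.rsplit('-', 1)[-1])
    return seq(my_znode) == min(seq(c) for c in all_znodes)

# With kazoo, joining the election would look roughly like:
#   zk = KazooClient(hosts='localhost:2181'); zk.start()
#   path = zk.create('/election/candidate-', b'worker-1',
#                    ephemeral=True, sequence=True, makepath=True)
#   i_am_leader = is_lowest(path.rsplit('/', 1)[-1],
#                           zk.get_children('/election'))
```

Because the znode is ephemeral, a crashed leader's entry disappears automatically and the next-lowest candidate takes over. Kazoo also ships a ready-made `Election` recipe that wraps this pattern.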

Bully Algorithm

The node with the highest ID wins. When a node detects the leader is down, it starts an election and bullies lower-ID nodes into accepting it. Simple but not partition-tolerant.
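A toy sketch of one bully election round follows. The class name and the reachability callback are assumptions for illustration; the full protocol exchanges ELECTION/OK/COORDINATOR messages rather than resolving the winner in one call.

```python
class BullyNode:
    """Sketch of the bully algorithm: highest reachable ID wins."""

    def __init__(self, node_id, all_ids, alive):
        self.node_id = node_id
        self.all_ids = all_ids
        self.alive = alive        # callable: node id -> reachable?

    def start_election(self):
        # Challenge every node with a higher ID; if any is alive,
        # defer to it (it will run its own election and win)
        higher = [n for n in self.all_ids
                  if n > self.node_id and self.alive(n)]
        if higher:
            return max(higher)    # eventual winner among the living
        # No higher node answered: declare victory
        return self.node_id
```

Note the weakness the text mentions: during a partition, each side sees a different "highest reachable ID" and may elect its own leader, so the algorithm is not partition-tolerant.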

Distributed Locking with ZooKeeper

A distributed lock ensures that only one process across multiple machines can access a shared resource at a time. This is essential for preventing double-spending, avoiding duplicate processing, and coordinating schema migrations.

distributed_lock.py
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock
import time

class DistributedLock:
    """Distributed lock using ZooKeeper via Kazoo."""

    def __init__(self, zk_hosts='localhost:2181'):
        self.zk = KazooClient(hosts=zk_hosts)
        self.zk.start()

    def acquire(self, lock_path, timeout=30):
        """Acquire a distributed lock with timeout."""
        lock = Lock(self.zk, lock_path)
        acquired = lock.acquire(timeout=timeout)
        if not acquired:
            raise TimeoutError(f"Could not acquire lock: {lock_path}")
        return lock

    def release(self, lock):
        """Release a distributed lock."""
        lock.release()

    def close(self):
        self.zk.stop()


# Usage: ensure only one worker processes a payment
dl = DistributedLock()

# Acquire outside the try block: if acquire() times out, there is
# no lock to release (a finally clause would hit a NameError)
lock = dl.acquire('/locks/payment/order-12345')
print("Lock acquired - processing payment")

try:
    # Critical section: process the payment
    process_payment(order_id=12345)
finally:
    dl.release(lock)
    print("Lock released")
    dl.close()

Common Pitfall: Lock Expiration

Problem: Process A acquires a lock with a 30-second TTL. A long garbage collection pause makes it take 35 seconds. The lock expires, Process B acquires it, and now both processes are in the critical section simultaneously.

Solution: Use fencing tokens -- monotonically increasing numbers issued with each lock acquisition. The protected resource rejects requests with a fencing token lower than one it has already seen. This prevents stale lock holders from making changes.
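The resource side of this check fits in a few lines; `FencedStore` and `StaleTokenError` are illustrative names, and a real system would persist the highest-seen token durably rather than in memory.

```python
class StaleTokenError(Exception):
    pass

class FencedStore:
    """Sketch of a protected resource that enforces fencing tokens."""

    def __init__(self):
        self.data = {}
        self.highest_token = {}   # key -> largest fencing token seen

    def write(self, key, value, token):
        # Reject any write whose token is not strictly newer: a paused
        # process holding an expired lock presents an old token
        if token <= self.highest_token.get(key, 0):
            raise StaleTokenError(f"stale token {token} for {key}")
        self.highest_token[key] = token
        self.data[key] = value
```

In the GC-pause scenario above, Process B writes with token 34, so Process A's late write with token 33 is rejected instead of corrupting the data.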

Practice Problems

Medium CAP Trade-off Analysis

For each scenario, decide whether to prioritize consistency or availability:

  1. A bank transfer system (debit account A, credit account B)
  2. A social media "like" counter
  3. An inventory system for concert ticket sales

Ask: "What is the cost of showing stale data vs. rejecting a request?" For financial transactions, showing stale balance can lead to double-spending. For like counters, showing 999 instead of 1000 is acceptable.

# 1. Bank transfer: CP (Consistency)
#    - Showing wrong balance = double-spending risk
#    - Better to reject a transfer than allow overdraft
#    - Use: PostgreSQL with synchronous replication

# 2. Social media likes: AP (Availability)
#    - Showing 999 instead of 1000 is fine
#    - Users expect instant response when clicking "like"
#    - Use: Cassandra with eventual consistency

# 3. Concert tickets: CP (Consistency)
#    - Overselling = angry customers, refund costs
#    - Better to show "temporarily unavailable" than oversell
#    - Use: Redis with distributed lock per seat

Hard Raft Failure Scenarios

You have a 5-node Raft cluster. Analyze these failure scenarios:

  1. The leader crashes. How long until a new leader is elected?
  2. Two nodes crash (including the leader). Can the cluster still operate?
  3. A network partition splits the cluster 2-3. Which side can elect a leader?

Quorum for 5 nodes = 3. An election requires majority votes. A partition means each side can only get votes from its own members. Think about which side has enough nodes for a quorum.

# 1. Leader crashes:
# - Followers detect missing heartbeat after election timeout
# - Random timeout: 150-300ms (prevents simultaneous elections)
# - New leader elected in ~300-500ms typically
# - Client retries to new leader after ~1 second total

# 2. Two nodes crash (leader + one follower):
# - 3 remaining nodes = quorum (3/5 = majority)
# - YES, cluster can still elect a leader and accept writes
# - If 3 crash: only 2 left, no quorum -- the cluster cannot
#   elect a leader or accept writes until nodes recover

# 3. Network partition 2-3:
# - Side with 3 nodes CAN elect a leader (has quorum)
# - Side with 2 nodes CANNOT (no quorum)
# - Old leader on the 2-node side steps down
#   (can't get heartbeat acks from majority)
# - When partition heals, 2-node side rejoins and
#   syncs its log with the new leader
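The quorum arithmetic behind these answers generalizes to any cluster size. A minimal helper (illustrative, not from any particular Raft library):

```python
def quorum(n):
    """Smallest majority of an n-node cluster."""
    return n // 2 + 1

def tolerated_failures(n):
    """How many nodes can fail while a quorum survives."""
    return n - quorum(n)
```

This is also why odd sizes are preferred: a 4-node cluster needs a quorum of 3 and tolerates only 1 failure, the same as a 3-node cluster, while paying for an extra replica.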

Hard Distributed Lock Service

Design a distributed lock service for a payment processing system:

  1. Ensure at most one payment processor handles each transaction
  2. Handle lock holder crashes (locks must not be held forever)
  3. Implement fencing tokens to prevent stale lock holders from writing

Use a consensus-backed store (etcd or ZooKeeper) for the lock. Set a TTL so crashed holders' locks expire. Issue a monotonically increasing fencing token with each lock acquisition.

class FencedLockService:
    """Sketch against a hypothetical etcd client wrapper: put_if_absent,
    lease, get, and delete here are assumptions, not the real etcd3 API."""

    def __init__(self, etcd_client):
        self.client = etcd_client

    def acquire(self, resource, ttl=30):
        """Acquire lock and return a fencing token."""
        # The token must be monotonic across ALL clients, so it must be
        # issued by the consensus store itself (e.g. etcd's mod_revision),
        # never by a per-process counter
        result = self.client.put_if_absent(
            key=f"/locks/{resource}",
            value="locked",
            lease=self.client.lease(ttl=ttl)  # expires if the holder crashes
        )
        if result.succeeded:
            return result.mod_revision        # store-issued fencing token
        raise LockNotAcquired(resource)

    def release(self, resource, token):
        """Release only if we still hold the lock."""
        current = self.client.get(f"/locks/{resource}")
        if current and current.mod_revision == token:
            self.client.delete(f"/locks/{resource}")

# Payment processor uses fencing token
def process_payment(lock_service, db, order_id):
    token = lock_service.acquire(f"payment:{order_id}")
    try:
        # DB rejects stale holders: the update succeeds only when our
        # token is newer, and records it so later stale writes are refused
        db.execute(
            "UPDATE orders SET status='paid', fence_token=%s "
            "WHERE id=%s AND fence_token < %s",
            (token, order_id, token)
        )
    finally:
        lock_service.release(f"payment:{order_id}", token)

Quick Reference

| System | Algorithm | Quorum | Use Case |
|---|---|---|---|
| etcd | Raft | N/2 + 1 | Kubernetes config, service discovery |
| ZooKeeper | ZAB (Paxos variant) | N/2 + 1 | Kafka coordination, distributed locks |
| Consul | Raft | N/2 + 1 | Service mesh, KV store |
| Google Spanner | Paxos | N/2 + 1 | Global SQL database |
| CockroachDB | Raft | N/2 + 1 | Distributed SQL |

Key Takeaways

  • The CAP theorem means you must choose between consistency and availability during network partitions
  • Raft breaks consensus into understandable sub-problems: election, replication, safety
  • A quorum (majority) is needed for both elections and commits to ensure safety
  • Distributed locks need TTLs (to handle crashes) and fencing tokens (to handle stale holders)
  • Use odd cluster sizes (3, 5, 7) to avoid split-brain during network partitions
  • Consensus is expensive -- use it for metadata and coordination, not for high-throughput data