How Consumers Work
A Kafka consumer reads messages from topics. Unlike traditional queues where the broker pushes messages to consumers, Kafka uses a pull model — consumers ask for messages when they're ready. This gives consumers full control over their pace and allows them to re-read old messages by resetting their offset.
The Poll Loop
Every Kafka consumer follows the same basic pattern: subscribe to topics, poll for messages in a loop, process each message, and commit the offset to record progress.
```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-application',
    'auto.offset.reset': 'earliest',  # Start from beginning if no committed offset
})
consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # Wait up to 1 second for messages
        if msg is None:
            continue  # No message available
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        # Process the message
        print(f"Partition: {msg.partition()}, Offset: {msg.offset()}")
        print(f"Key: {msg.key()}, Value: {msg.value().decode('utf-8')}")
finally:
    consumer.close()  # Commit final offsets and leave consumer group
```
Offset Management
The offset tracks where each consumer group is in each partition. The committed offset is the position of the next message to read, not the last one processed: a consumer that has successfully processed everything up to and including offset 42 commits 43. If the consumer crashes and restarts, it resumes from offset 43.
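Kafka clients store a commit as the offset of the next message to read, that is, the last processed offset plus one. The sketch below checks that arithmetic with a small in-memory model. It is not Kafka client code: `PartitionLog` and its methods are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PartitionLog:
    """Hypothetical in-memory stand-in for one partition read by one group."""
    messages: list
    committed: Optional[int] = None  # next offset to read; None if never committed

    def read_from(self, position: int):
        """Yield (offset, value) pairs starting at `position`."""
        for offset in range(position, len(self.messages)):
            yield offset, self.messages[offset]

    def commit_after(self, last_processed_offset: int) -> None:
        # Convention: commit the offset of the NEXT message to read.
        self.committed = last_processed_offset + 1


log = PartitionLog(messages=[f"event-{i}" for i in range(50)])

# First run: process offsets 0..42, commit, then "crash".
last_done = None
for offset, _value in log.read_from(0):
    if offset > 42:
        break
    last_done = offset
log.commit_after(last_done)

# Restart: resume from the committed position.
resumed_at = next(log.read_from(log.committed))[0]
print(log.committed, resumed_at)  # prints: 43 43
```

In confluent_kafka, passing a message to `consumer.commit(message=msg)` applies the same plus-one rule for you.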
| Strategy | Setting | Behavior | Risk |
|---|---|---|---|
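| Auto commit | enable.auto.commit=true (default) | Commits every 5 seconds automatically (auto.commit.interval.ms) | May lose messages if a crash happens after the commit but before processing finishes |
| Manual sync | consumer.commit(asynchronous=False) | Blocks until the broker confirms the commit; call it after processing each batch | Slower but safer: at-least-once delivery |
| Manual async | consumer.commit() (asynchronous=True is the default in confluent_kafka) | Commits without waiting for confirmation | Faster, but a failed commit can go unnoticed and cause duplicates after a restart |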
⚠️ Common Mistake: Auto-commit with slow processing
Wrong: Using auto-commit (default) when message processing takes 30+ seconds.
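Why: Auto-commit runs every 5 seconds by default, whether or not processing has finished. If the client commits offset 101 for messages it has already fetched and then crashes while still processing message 95, messages 95-100 are lost: the consumer resumes from 101.
Instead: Set enable.auto.commit=false and commit manually after processing, for example with consumer.commit(asynchronous=False).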
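One way to commit only after processing can be sketched as follows. This is a minimal illustration rather than a recommended production loop: `consume_with_manual_commit`, `handle`, and `MANUAL_COMMIT_CONFIG` are hypothetical names, and the broker address, group, and topic are reused from the example above. The helper accepts any object that behaves like `confluent_kafka.Consumer`, so it can be tried without a broker.

```python
MANUAL_COMMIT_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-application',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # offsets are committed explicitly below
}


def consume_with_manual_commit(consumer, handle, max_messages):
    """Poll until `max_messages` have been handled, committing after each one.

    `consumer` is expected to behave like confluent_kafka.Consumer;
    `handle` is a hypothetical callback that processes one message.
    """
    handled = 0
    while handled < max_messages:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # nothing arrived within the timeout
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        handle(msg)  # if this raises, nothing is committed for msg
        # Synchronous commit of msg.offset() + 1 for this message's partition.
        consumer.commit(message=msg, asynchronous=False)
        handled += 1
    return handled


def main():
    # Imported here so the helper above can be exercised without a broker.
    from confluent_kafka import Consumer

    consumer = Consumer(MANUAL_COMMIT_CONFIG)
    consumer.subscribe(['user-events'])
    try:
        consume_with_manual_commit(consumer, lambda m: print(m.value()), max_messages=10)
    finally:
        consumer.close()
```

Committing after every message is the simplest form of at-least-once delivery but costs a broker round trip each time. Committing once per batch is a common trade-off: fewer round trips, more messages reprocessed after a crash.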
auto.offset.reset
What happens when a consumer starts for the first time (no committed offset exists)?
- earliest: Start reading from the beginning of the topic. Use this when you need to process all historical data.
- latest: Start reading only new messages (produced after the consumer started). Use this for real-time processing where history doesn't matter.
- none: Throw an error if no offset exists. Use this when you want to explicitly manage starting positions. (This is the Java client's name; librdkafka-based clients such as confluent_kafka call this value error.)
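The decision can be written out as a small function. This is a simplified model of the rule, not client code: `starting_offset` is a hypothetical name, and `low` and `high` stand for the first available offset and the log end offset of a partition. It also covers a committed offset that is out of range (for example, because the data has been deleted by retention), which triggers the same policy.

```python
def starting_offset(committed, policy, low, high):
    """Return the offset a consumer would start reading from.

    committed: committed offset for the group, or None if there is none
    policy:    auto.offset.reset value ('earliest', 'latest', 'none';
               'error' is accepted as the librdkafka spelling of 'none')
    low, high: first available offset and the offset the next produced
               message will get (the log end) for the partition
    """
    if committed is not None and low <= committed <= high:
        return committed  # a valid committed offset always wins
    if policy == 'earliest':
        return low
    if policy == 'latest':
        return high
    if policy in ('none', 'error'):
        raise LookupError("no valid committed offset and no reset policy")
    raise ValueError(f"unknown auto.offset.reset value: {policy!r}")


print(starting_offset(None, 'earliest', low=0, high=100))  # prints: 0
print(starting_offset(None, 'latest', low=0, high=100))    # prints: 100
print(starting_offset(43, 'latest', low=0, high=100))      # prints: 43
```

Note that the policy is consulted only when there is no usable committed offset; changing it does not rewind a group that has already committed.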
Use earliest for data pipelines and ETL (need all data). Use latest for real-time monitoring and alerts (only care about new events). Always disable auto-commit for critical processing.
Consumer Rebalancing
When a consumer joins or leaves a group, Kafka rebalances: it reassigns the group's partitions among the current members. With eager rebalancing, no messages are processed while the rebalance runs, so frequent rebalancing kills throughput.
Common causes of unnecessary rebalancing:
- Processing takes longer than max.poll.interval.ms (default 5 min), so Kafka thinks the consumer is dead
- Consumer instances scaling up/down frequently
- Network issues causing heartbeat timeouts
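The first and third causes are governed by a handful of timing settings. The sketch below shows where they go in a confluent_kafka configuration. The values are illustrative assumptions, not recommendations, and `exceeds_poll_interval` is a hypothetical helper for checking a processing-time budget.

```python
REBALANCE_TUNING = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-application',
    'max.poll.interval.ms': 600000,  # allow up to 10 min between poll() calls
    'session.timeout.ms': 45000,     # considered dead after 45 s without heartbeats
    'heartbeat.interval.ms': 3000,   # send a heartbeat every 3 s
}


def exceeds_poll_interval(worst_case_processing_s, config=REBALANCE_TUNING):
    """True if the gap between two poll() calls could reach max.poll.interval.ms."""
    return worst_case_processing_s * 1000 >= config['max.poll.interval.ms']


print(exceeds_poll_interval(30))   # prints: False
print(exceeds_poll_interval(900))  # prints: True
```

If the check comes back True, raising the interval is one option. Another is to hand slow work to a separate worker so the loop can keep calling poll().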
🔍 Deep Dive: Cooperative Incremental Rebalancing
Traditional "eager" rebalancing revokes ALL partitions from ALL consumers during rebalancing, causing a full stop-the-world pause. Cooperative incremental rebalancing (Kafka 2.4+) only moves the specific partitions that need to change, minimizing downtime. In the Java client, enable it with partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor. In librdkafka-based clients such as confluent_kafka, the equivalent setting is partition.assignment.strategy=cooperative-sticky.
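For the Python client used in this guide, a minimal sketch of enabling cooperative rebalancing and logging what moves could look like this. `COOPERATIVE_CONFIG`, `describe`, and `subscribe_with_logging` are hypothetical names. The callbacks only log and leave the actual assignment to the client.

```python
COOPERATIVE_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-application',
    'partition.assignment.strategy': 'cooperative-sticky',
}


def describe(partitions):
    """Format partitions as 'topic[partition]' strings for logging."""
    return [f"{p.topic}[{p.partition}]" for p in partitions]


def on_assign(consumer, partitions):
    # With cooperative rebalancing, only newly added partitions are listed.
    print(f"Assigned: {describe(partitions)}")


def on_revoke(consumer, partitions):
    # Only partitions being moved away are listed; the rest keep flowing.
    print(f"Revoked: {describe(partitions)}")


def subscribe_with_logging(consumer, topics):
    """Subscribe and log each incremental assignment change."""
    consumer.subscribe(topics, on_assign=on_assign, on_revoke=on_revoke)
```

To use it, create the consumer with `Consumer(COOPERATIVE_CONFIG)` and call `subscribe_with_logging(consumer, ['user-events'])` before entering the poll loop. All members of a group should use the same strategy.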
Practice Exercises
Easy: Hello World Variant
Modify the example to accept user input and print a personalized greeting.
Easy: Code Reading
Read through the code examples above and predict the output before running them.
Medium: Extend the Example
Take one code example and add error handling, input validation, or a new feature.