# Python Kafka Libraries
| Library | Type | Performance | Best For |
|---|---|---|---|
| confluent-kafka | C wrapper (librdkafka) | Excellent | Production applications |
| kafka-python | Pure Python | Good | Simple scripts, learning |
| faust | Stream processing | Good | Python Kafka Streams alternative |
```bash
pip install confluent-kafka
```
## Producer Example
```python
from confluent_kafka import Producer
import json

def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}')

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
})

# Send events
for i in range(10):
    event = {'user_id': f'user_{i}', 'action': 'page_view', 'page': '/home'}
    producer.produce(
        topic='user-events',
        key=f'user_{i}',
        value=json.dumps(event),
        callback=delivery_report,
    )

producer.flush()  # Wait for all deliveries
## Consumer Example
```python
from confluent_kafka import Consumer
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit for safety
})
consumer.subscribe(['user-events'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue
        event = json.loads(msg.value())
        print(f"User: {event['user_id']}, Action: {event['action']}")
        consumer.commit()  # Commit after processing
finally:
    consumer.close()
```
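Committing after every single message, as above, is the safest pattern but costs a round-trip per message. A common compromise is to commit once per batch of N messages, accepting that a crash may replay up to N-1 already-processed messages (at-least-once semantics). The sketch below shows only that batching logic; `process_batch`, `handle_fn`, and `commit_fn` are illustrative names (`commit_fn` stands in for `Consumer.commit`), not confluent-kafka API.

```python
# Hedged sketch: handle messages and commit offsets every `batch_size`
# messages, plus one final commit for any trailing partial batch.
def process_batch(messages, handle_fn, commit_fn, batch_size=100):
    processed = 0
    for msg in messages:
        handle_fn(msg)
        processed += 1
        if processed % batch_size == 0:
            commit_fn()  # commit offsets for everything handled so far
    if processed % batch_size != 0:
        commit_fn()  # commit the final partial batch
    return processed
```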
**Key Takeaway:** Use `confluent-kafka` for production Python applications. Always use manual commits (`enable.auto.commit=False`) for reliable processing, and register a `delivery_report` callback to catch send failures.

## Practice Exercises
**Production Scenario (Hard):** Design a solution using these concepts for a real-world production system.

**Performance Analysis (Hard):** Benchmark two different approaches and explain which is better and why.
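For the performance exercise, a minimal timing harness is enough to compare two approaches (for example, flushing per message versus one flush at the end). The sketch below uses only the standard library; `benchmark` is an illustrative helper name, not from any Kafka client.

```python
import time

# Minimal benchmarking harness: run a candidate callable n times and
# return elapsed wall-clock seconds, so two approaches can be compared
# as benchmark(approach_a) vs. benchmark(approach_b).
def benchmark(fn, n=10_000):
    start = time.perf_counter()
    for _ in range(n):
        fn()
    return time.perf_counter() - start
```

Run each candidate several times and compare the best (or median) of the runs, since a single wall-clock measurement is noisy.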