Hands-On: Building Communication Patterns
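Complete, runnable code examples you can modify. They use in-memory mock data, placeholder secrets, and development servers, so harden them before deploying to production.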
Example 1: REST API with JWT Authentication
import hmac
import os
from functools import wraps

from flask import Flask, jsonify, request
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
app = Flask(__name__)
# Read the signing key from the environment so a real secret never has to be
# committed to source control. The placeholder fallback keeps the example
# runnable locally and must not be used in production.
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'your-secret-key')
jwt = JWTManager(app)
# Mock database: in-memory stand-ins for real storage.
# NOTE: passwords are kept in plain text here purely for demonstration.
users = {
    "user1": dict(password="pass123", role="admin"),
    "user2": dict(password="pass456", role="user"),
}

# Product catalogue served and extended by the /products endpoints.
products = [
    dict(id=1, name="Laptop", price=999.99),
    dict(id=2, name="Mouse", price=29.99),
]
@app.route('/login', methods=['POST'])
def login():
    """Exchange a username/password pair for a JWT access token.

    Expects a JSON object body with string ``username`` and ``password``
    fields.

    Returns:
        200 with ``{"access_token": ...}`` on success. The token's identity is
        the username and it carries the user's ``role`` as an additional claim.
        400 when the body is not a JSON object containing both string fields.
        401 when the credentials do not match an entry in ``users``.
    """
    # silent=True yields None instead of raising on a missing/malformed body,
    # so bad input becomes a clean 400 rather than an unhandled error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    username = payload.get('username')
    password = payload.get('password')
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "username and password are required"}), 400

    user = users.get(username)
    # Unknown users are compared against an empty placeholder so both failure
    # paths perform a comparison. Values are encoded because compare_digest
    # only accepts ASCII-only str, and passwords may contain other characters.
    expected = user['password'] if user else ''
    password_ok = hmac.compare_digest(
        password.encode('utf-8'), expected.encode('utf-8')
    )
    if not user or not password_ok:
        return jsonify({"error": "Invalid credentials"}), 401

    access_token = create_access_token(
        identity=username,
        additional_claims={"role": user['role']},
    )
    return jsonify({"access_token": access_token})
@app.route('/products', methods=['GET'])
@jwt_required()
def get_products():
    """Return the product list together with the identity from the caller's JWT."""
    response_body = {
        "user": get_jwt_identity(),
        "products": products,
    }
    return jsonify(response_body)
@app.route('/products', methods=['POST'])
@jwt_required()
def create_product():
    """Create a product from a JSON body; restricted to tokens with role ``admin``.

    Expects a JSON object with a non-empty string ``name`` and a non-negative
    numeric ``price``.

    Returns:
        201 with the created product on success.
        400 when the body is missing, is not a JSON object, or has an invalid
        ``name``/``price``.
        403 when the token's ``role`` claim is not ``admin``.
    """
    # Admin-only endpoint. get_jwt is imported locally because the
    # module-level flask_jwt_extended import does not include it.
    from flask_jwt_extended import get_jwt

    if get_jwt().get('role') != 'admin':
        return jsonify({"error": "Admin access required"}), 403

    # silent=True returns None for a missing or malformed body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    name = data.get('name')
    if not isinstance(name, str) or not name.strip():
        return jsonify({"error": "name must be a non-empty string"}), 400

    price = data.get('price')
    # bool is a subclass of int, so exclude it explicitly.
    is_number = isinstance(price, (int, float)) and not isinstance(price, bool)
    if not is_number or price < 0:
        return jsonify({"error": "price must be a non-negative number"}), 400

    new_product = {
        # Derive the id from the highest existing one so ids stay unique even
        # if the list length stops matching the last id.
        "id": max((p["id"] for p in products), default=0) + 1,
        "name": name,
        "price": price,
    }
    products.append(new_product)
    return jsonify(new_product), 201
if __name__ == '__main__':
    # Debug mode enables Werkzeug's interactive debugger, which can execute
    # arbitrary code, so it is opt-in (FLASK_DEBUG=1) instead of always on.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', port=5000)
# Usage Example:
# 1. Login: POST http://localhost:5000/login
# Body: {"username": "user1", "password": "pass123"}
# Response: {"access_token": "eyJ0eXAi..."}
#
# 2. Get Products: GET http://localhost:5000/products
# Headers: Authorization: Bearer eyJ0eXAi...
#
# 3. Create Product: POST http://localhost:5000/products
# Headers: Authorization: Bearer eyJ0eXAi...
# Body: {"name": "Keyboard", "price": 79.99}
Example 2: gRPC Bi-directional Streaming
syntax = "proto3";

package chat;

// Chat API: one live message stream and one history lookup.
service ChatService {
  // Bi-directional streaming for real-time chat: the client streams
  // ChatMessage values in and receives a stream of ChatMessage values back.
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);

  // Server streaming for chat history: one request, a stream of stored
  // ChatMessage values in response.
  rpc GetHistory(HistoryRequest) returns (stream ChatMessage);
}

// A single chat message.
message ChatMessage {
  string user_id = 1;   // presumably the sender's id -- confirm with clients
  string message = 2;   // message text
  int64 timestamp = 3;  // unit/epoch is not specified in this schema -- TODO confirm
  string room_id = 4;   // room the message belongs to
}

// Parameters for a GetHistory call.
message HistoryRequest {
  string room_id = 1;  // room whose stored messages are requested
  int32 limit = 2;     // number of most recent messages to return
}
import grpc
from concurrent import futures
import chat_pb2
import chat_pb2_grpc
import time
from collections import defaultdict
class ChatServicer(chat_pb2_grpc.ChatServiceServicer):
    """In-memory chat service that stores messages per room and relays them.

    All state lives in this process only: ``rooms`` keeps every message per
    room and ``active_streams`` keeps one outbox list per connected client.
    """

    def __init__(self):
        # Imported locally so the class does not rely on the module import list.
        import threading

        self.rooms = defaultdict(list)  # room_id -> [messages]
        self.active_streams = defaultdict(list)  # room_id -> [client outboxes]
        # Handlers run concurrently on the server's thread pool, and every
        # stream reads and mutates the two dicts above.
        self._lock = threading.Lock()

    def Chat(self, request_iterator, context):
        """Bi-directional streaming for real-time chat.

        Every incoming message is stored in its room and appended to the
        outbox of each client registered for that room, including the sender.
        A client is registered for a room when it first sends a message to it
        and is unregistered when its stream ends.

        Note: pending messages are delivered to a client right after it sends
        a message of its own, because delivery happens inside the request loop.
        """
        outbox = []
        joined_rooms = set()
        try:
            for message in request_iterator:
                room_id = message.room_id
                with self._lock:
                    # Register before broadcasting so the sender receives its
                    # own first message like every later one. Membership is
                    # tracked per room rather than with `outbox in list`,
                    # because list membership compares by equality and any
                    # two empty outboxes are equal.
                    if room_id not in joined_rooms:
                        self.active_streams[room_id].append(outbox)
                        joined_rooms.add(room_id)
                    # Store message
                    self.rooms[room_id].append(message)
                    # Broadcast to all clients in this room
                    for peer_outbox in self.active_streams[room_id]:
                        peer_outbox.append(message)
                    # Drain this client's outbox while still holding the lock.
                    pending = outbox[:]
                    outbox.clear()
                # Yield outside the lock so a slow consumer cannot block
                # other streams.
                yield from pending
        finally:
            # Unregister this client's outbox so finished streams do not
            # accumulate; compare by identity, not equality.
            with self._lock:
                for room_id in joined_rooms:
                    streams = self.active_streams[room_id]
                    streams[:] = [q for q in streams if q is not outbox]
                    if not streams:
                        del self.active_streams[room_id]

    def GetHistory(self, request, context):
        """Server streaming for chat history.

        Yields the most recent ``request.limit`` stored messages of
        ``request.room_id``; a zero or negative limit falls back to 100.
        Unknown rooms yield nothing.
        """
        limit = request.limit if request.limit > 0 else 100
        with self._lock:
            # .get avoids creating an empty entry for rooms that do not exist;
            # slicing copies, so iteration happens outside the lock.
            messages = self.rooms.get(request.room_id, [])[-limit:]
        yield from messages
def serve():
    """Start the chat server on insecure port 50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    chat_pb2_grpc.add_ChatServiceServicer_to_server(ChatServicer(), grpc_server)
    grpc_server.add_insecure_port('[::]:50051')
    print("gRPC Chat Server started on port 50051")
    grpc_server.start()
    grpc_server.wait_for_termination()


if __name__ == '__main__':
    serve()
Example 3: GraphQL Server with Mutations
from flask import Flask
from flask_graphql import GraphQLView
import graphene
from datetime import datetime
# Mock database: in-memory product records read and mutated by the resolvers.
# Ids are strings, matching the values compared against GraphQL ID arguments.
products_db = [
    dict(id="1", name="Laptop", price=999.99, stock=50),
    dict(id="2", name="Mouse", price=29.99, stock=200),
]
class Product(graphene.ObjectType):
    # GraphQL object type with the same four fields as a products_db record.
    # Documented with comments on purpose: graphene exposes a class docstring
    # as the type's schema description, which would change the schema.
    id = graphene.ID()
    name = graphene.String()
    price = graphene.Float()
    stock = graphene.Int()
class Query(graphene.ObjectType):
    # Root query type: single-product lookup, full listing, and name search.
    # Documented with comments because graphene exposes class docstrings as
    # schema descriptions.
    product = graphene.Field(Product, id=graphene.ID(required=True))
    products = graphene.List(Product)
    search_products = graphene.List(Product, name=graphene.String())

    def resolve_product(self, info, id):
        # Return the record whose id equals the argument, or None if absent.
        # The parameter must be called `id` to match the GraphQL argument.
        return next((p for p in products_db if p['id'] == id), None)

    def resolve_products(self, info):
        # Return every record.
        return products_db

    def resolve_search_products(self, info, name=None):
        # Case-insensitive substring match on the product name.
        # `name` is optional in the schema, so it may be omitted or null;
        # in that case there is nothing to filter on and all records match.
        if name is None:
            return products_db
        needle = name.lower()
        return [p for p in products_db if needle in p['name'].lower()]
class CreateProduct(graphene.Mutation):
    # Mutation that appends a new record to products_db and returns it.
    # Documented with comments because graphene exposes class docstrings as
    # schema descriptions.
    class Arguments:
        name = graphene.String(required=True)
        price = graphene.Float(required=True)
        stock = graphene.Int(required=True)

    product = graphene.Field(Product)

    def mutate(self, info, name, price, stock):
        # The new id is the current record count plus one, as a string.
        created = dict(
            id=str(len(products_db) + 1),
            name=name,
            price=price,
            stock=stock,
        )
        products_db.append(created)
        return CreateProduct(product=created)
class UpdateProduct(graphene.Mutation):
    # Mutation that updates the supplied fields of an existing record in place.
    # Documented with comments because graphene exposes class docstrings as
    # schema descriptions.
    class Arguments:
        id = graphene.ID(required=True)
        name = graphene.String()
        price = graphene.Float()
        stock = graphene.Int()

    product = graphene.Field(Product)

    def mutate(self, info, id, name=None, price=None, stock=None):
        # Raises Exception("Product not found") when no record has this id.
        product = next((p for p in products_db if p['id'] == id), None)
        if product is None:
            raise Exception("Product not found")
        # Test against None rather than truthiness so legitimate falsy values
        # (price 0.0, stock 0, empty name) are applied instead of ignored.
        for field, value in (('name', name), ('price', price), ('stock', stock)):
            if value is not None:
                product[field] = value
        return UpdateProduct(product=product)
class Mutation(graphene.ObjectType):
    # Root mutation type exposing the create and update product mutations.
    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()
# Build the schema from the root types and serve it at /graphql with the
# GraphiQL browser UI enabled.
schema = graphene.Schema(query=Query, mutation=Mutation)
app = Flask(__name__)
graphql_view = GraphQLView.as_view('graphql', schema=schema, graphiql=True)
app.add_url_rule('/graphql', view_func=graphql_view)
if __name__ == '__main__':
    import os

    # Debug mode enables Werkzeug's interactive debugger, which can execute
    # arbitrary code, so it is opt-in (FLASK_DEBUG=1) instead of always on.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', port=5000)
# Access GraphiQL at http://localhost:5000/graphql
# Example Queries:
# query {
# products {
# id name price stock
# }
# }
#
# mutation {
# createProduct(name: "Keyboard", price: 79.99, stock: 100) {
# product { id name price }
# }
# }
Example 4: Production RabbitMQ with Retry Logic
import pika
import json
import time
from functools import wraps
class RobustRabbitMQConsumer:
    """Consume the ``orders`` queue with bounded retries and a dead-letter queue.

    A message whose processing fails is republished to ``orders`` with an
    incremented ``x-retry-count`` header. Once ``MAX_RETRIES`` is reached, or
    when the payload is not valid JSON, the message is rejected without
    requeue so the broker routes it through the ``dlx`` exchange to
    ``failed_orders``.
    """

    MAX_CONNECT_ATTEMPTS = 5  # connection attempts before giving up
    MAX_RETRIES = 3  # republish attempts per message before dead-lettering

    def __init__(self):
        self.connection = None
        self.channel = None
        self.setup_connection()
        self.setup_queues()

    def setup_connection(self):
        """Connect to RabbitMQ on localhost, retrying with exponential backoff.

        Raises:
            pika.exceptions.AMQPConnectionError: if the final attempt fails.
        """
        for attempt in range(self.MAX_CONNECT_ATTEMPTS):
            try:
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        'localhost',
                        heartbeat=600,
                        blocked_connection_timeout=300
                    )
                )
                self.channel = self.connection.channel()
                print("Connected to RabbitMQ")
                return
            except pika.exceptions.AMQPConnectionError:
                if attempt >= self.MAX_CONNECT_ATTEMPTS - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, 8s

    def setup_queues(self):
        """Declare the dead-letter exchange and queue, and the main queue."""
        # Dead Letter Exchange for failed messages
        self.channel.exchange_declare(
            exchange='dlx',
            exchange_type='direct',
            durable=True
        )
        # Dead Letter Queue - messages that failed all retries
        self.channel.queue_declare(
            queue='failed_orders',
            durable=True
        )
        self.channel.queue_bind(
            exchange='dlx',
            queue='failed_orders',
            routing_key='failed'
        )
        # Main queue: rejected messages are dead-lettered to 'dlx' with
        # routing key 'failed', which the binding above sends to failed_orders.
        self.channel.queue_declare(
            queue='orders',
            durable=True,
            arguments={
                'x-dead-letter-exchange': 'dlx',
                'x-dead-letter-routing-key': 'failed'
            }
        )
        # Fair dispatch: at most one unacknowledged message per consumer.
        self.channel.basic_qos(prefetch_count=1)

    def process_order(self, order_data):
        """Business logic - may fail.

        Returns True on success. Raises Exception when ``order_data`` has a
        truthy ``should_fail`` key, and KeyError when ``order_id`` is missing.
        """
        print(f"Processing order: {order_data['order_id']}")
        # Simulate processing
        if order_data.get('should_fail'):
            raise Exception("Payment gateway timeout")
        # Success
        print(f"Order {order_data['order_id']} processed successfully")
        return True

    def callback(self, ch, method, properties, body):
        """Handle one delivery: process, then ack, retry, or dead-letter it.

        Retries are immediate: the message is republished to the back of the
        ``orders`` queue with no delay between attempts.
        """
        try:
            order_data = json.loads(body)
        except (ValueError, TypeError) as e:
            # A payload that cannot be parsed will never succeed, so send it
            # straight to the dead-letter queue instead of crashing the consumer.
            print(f"Discarding malformed message: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return

        # Track retry count
        headers = properties.headers or {}
        retry_count = headers.get('x-retry-count', 0)

        try:
            self.process_order(order_data)
        except Exception as e:
            print(f"Error processing order: {e}")
            if retry_count < self.MAX_RETRIES:
                retry_count += 1
                print(f"Retrying... (attempt {retry_count}/{self.MAX_RETRIES})")
                # Republish with updated retry count, keeping any other
                # headers the message carried.
                ch.basic_publish(
                    exchange='',
                    routing_key='orders',
                    body=body,
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                        headers={**headers, 'x-retry-count': retry_count}
                    )
                )
                # Ack the original delivery; the republished copy replaces it.
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # Max retries exceeded - send to DLQ. Look the id up
                # defensively: a missing id may be the very reason processing
                # failed, and raising here would leave the message un-nacked.
                order_id = (
                    order_data.get('order_id')
                    if isinstance(order_data, dict) else None
                )
                print(f"Max retries exceeded for order {order_id}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Ack outside the try so an ack failure is not mistaken for a
            # processing failure and does not trigger a republish.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def start(self):
        """Start consuming messages; always close the connection on exit."""
        self.channel.basic_consume(
            queue='orders',
            on_message_callback=self.callback
        )
        print("Waiting for orders. Press CTRL+C to exit")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
        finally:
            # Only close a connection that is still open; it may already have
            # been closed by the error that ended consumption.
            if self.connection and self.connection.is_open:
                self.connection.close()


if __name__ == '__main__':
    consumer = RobustRabbitMQConsumer()
    consumer.start()
Try It Yourself
- Install dependencies:
pip install flask flask-jwt-extended grpcio grpcio-tools graphene flask-graphql pika kafka-python
- Run RabbitMQ:
docker run -p 5672:5672 -p 15672:15672 rabbitmq:management
- Run Kafka:
docker-compose up -d kafka zookeeper
- Test each example and modify parameters to see how they behave