Hands-On Implementation
Complete, production-ready implementations of core patterns.
Saga Pattern - Complete Orchestration
class SagaStep:
def __init__(self, action, compensation):
self.action = action
self.compensation = compensation
class SagaOrchestrator:
def __init__(self):
self.steps = []
self.completed_steps = []
def add_step(self, action, compensation):
self.steps.append(SagaStep(action, compensation))
def execute(self):
try:
for step in self.steps:
result = step.action()
self.completed_steps.append(step)
return {"status": "success"}
except Exception as e:
# Compensate in reverse order
for step in reversed(self.completed_steps):
try:
step.compensation()
except Exception as comp_error:
print(f"Compensation failed: {comp_error}")
raise e
# Order Creation Saga
def create_order_saga(order_data):
saga = SagaOrchestrator()
# Step 1: Create Order
saga.add_step(
action=lambda: order_service.create(order_data),
compensation=lambda: order_service.cancel(order_id)
)
# Step 2: Reserve Inventory
saga.add_step(
action=lambda: inventory_service.reserve(order_data['items']),
compensation=lambda: inventory_service.release(order_data['items'])
)
# Step 3: Process Payment
saga.add_step(
action=lambda: payment_service.charge(order_data['total']),
compensation=lambda: payment_service.refund(transaction_id)
)
# Step 4: Ship Order
saga.add_step(
action=lambda: shipping_service.create_shipment(order_id),
compensation=lambda: shipping_service.cancel_shipment(shipment_id)
)
return saga.execute()
CQRS with Event-Driven Updates
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
# Command Side (Write Model)
@dataclass
class CreateOrderCommand:
customer_id: str
items: list
total: float
class OrderCommandHandler:
def __init__(self, event_bus):
self.event_bus = event_bus
def handle(self, command: CreateOrderCommand):
# Create order in write database
order = Order(
id=generate_id(),
customer_id=command.customer_id,
items=command.items,
total=command.total,
status="PENDING",
created_at=datetime.now()
)
write_db.orders.insert(order)
# Publish event for read model
self.event_bus.publish(OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
total=order.total,
timestamp=order.created_at
))
return order.id
# Query Side (Read Model)
class OrderQueryHandler:
def get_order_summary(self, order_id):
# Query optimized read model
return read_db.order_summaries.find_one({"order_id": order_id})
def get_customer_orders(self, customer_id):
# Denormalized view for fast queries
return read_db.customer_orders.find({"customer_id": customer_id})
# Event Handler - Updates Read Model
class OrderEventHandler:
def on_order_created(self, event: OrderCreatedEvent):
# Update denormalized read model
read_db.order_summaries.insert({
"order_id": event.order_id,
"customer_id": event.customer_id,
"total": event.total,
"status": "PENDING",
"created_at": event.timestamp
})
read_db.customer_orders.insert({
"customer_id": event.customer_id,
"order_id": event.order_id,
"total": event.total
})
Java Circuit Breaker with Resilience4j
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import java.time.Duration;
public class PaymentService {
private final CircuitBreaker circuitBreaker;
public PaymentService() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 50% failure rate triggers open
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(10) // Last 10 calls
.permittedNumberOfCallsInHalfOpenState(3)
.build();
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
this.circuitBreaker = registry.circuitBreaker("paymentService");
}
public PaymentResult processPayment(Order order) {
return circuitBreaker.executeSupplier(() -> {
// Call external payment gateway
return externalPaymentGateway.charge(order.getTotal());
});
}
public PaymentResult processPaymentWithFallback(Order order) {
return circuitBreaker.executeSupplier(
() -> externalPaymentGateway.charge(order.getTotal()),
throwable -> {
// Fallback: Queue payment for later processing
paymentQueue.enqueue(order);
return new PaymentResult(Status.QUEUED, "Payment queued");
}
);
}
}
Node.js API Gateway with Rate Limiting
const express = require('express');
const rateLimit = require('express-rate-limit');
const axios = require('axios');
const app = express();
app.use(express.json());
// Rate limiting middleware
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // Max 100 requests per window
message: 'Too many requests from this IP'
});
app.use('/api/', limiter);
// Service registry
const services = {
users: 'http://user-service:3001',
products: 'http://product-service:3002',
orders: 'http://order-service:3003'
};
// Route requests to appropriate service
app.all('/api/:service/*', async (req, res) => {
const serviceName = req.params.service;
const serviceUrl = services[serviceName];
if (!serviceUrl) {
return res.status(404).json({ error: 'Service not found' });
}
const targetUrl = req.url.replace(`/api/${serviceName}`, '');
try {
const response = await axios({
method: req.method,
url: serviceUrl + targetUrl,
data: req.body,
headers: {
'Authorization': req.headers.authorization
},
timeout: 5000
});
res.status(response.status).json(response.data);
} catch (error) {
if (error.code === 'ECONNABORTED') {
res.status(504).json({ error: 'Service timeout' });
} else {
res.status(500).json({ error: 'Service unavailable' });
}
}
});
app.listen(8000, () => console.log('API Gateway running on port 8000'));
Event Sourcing with Go
package main
import (
"time"
)
type Event struct {
ID string
AggregateID string
Type string
Data map[string]interface{}
Timestamp time.Time
}
type EventStore struct {
events []Event
}
func NewEventStore() *EventStore {
return &EventStore{events: make([]Event, 0)}
}
func (es *EventStore) AppendEvent(event Event) {
event.Timestamp = time.Now()
es.events = append(es.events, event)
}
func (es *EventStore) GetEvents(aggregateID string) []Event {
result := make([]Event, 0)
for _, event := range es.events {
if event.AggregateID == aggregateID {
result = append(result, event)
}
}
return result
}
// Account Aggregate
type Account struct {
ID string
Balance float64
}
func (a *Account) ApplyEvent(event Event) {
switch event.Type {
case "DEPOSITED":
a.Balance += event.Data["amount"].(float64)
case "WITHDRAWN":
a.Balance -= event.Data["amount"].(float64)
}
}
func ReconstructAccount(accountID string, store *EventStore) *Account {
account := &Account{ID: accountID, Balance: 0}
events := store.GetEvents(accountID)
for _, event := range events {
account.ApplyEvent(event)
}
return account
}
// Usage
func main() {
store := NewEventStore()
store.AppendEvent(Event{
AggregateID: "acc-123",
Type: "DEPOSITED",
Data: map[string]interface{}{"amount": 100.0},
})
store.AppendEvent(Event{
AggregateID: "acc-123",
Type: "WITHDRAWN",
Data: map[string]interface{}{"amount": 30.0},
})
account := ReconstructAccount("acc-123", store)
// account.Balance = 70.0
}
Try It Yourself
Clone these examples and experiment:
- Modify the Circuit Breaker thresholds and observe behavior
- Add more steps to the Saga and test compensation logic
- Implement a simple rate limiter using the Token Bucket algorithm
- Create event sourcing for a shopping cart aggregate