📚 Pattern Overview
Concurrency patterns are reusable solutions to common problems in concurrent programming. They help manage complexity, improve performance, and ensure correctness in multi-threaded environments.
👷 Worker Pool Pattern
Manage a fixed number of goroutines to process work from a queue, controlling resource usage and preventing goroutine explosion.
package main import ( "context" "fmt" "sync" "time" ) // Job represents a unit of work type Job struct { ID int Data interface{} Handler func(context.Context, interface{}) error } // Result contains job execution results type Result struct { JobID int Output interface{} Error error Duration time.Duration } // WorkerPool manages concurrent workers type WorkerPool struct { workers int jobQueue chan Job results chan Result ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // NewWorkerPool creates a new worker pool func NewWorkerPool(ctx context.Context, workers int, queueSize int) *WorkerPool { poolCtx, cancel := context.WithCancel(ctx) wp := &WorkerPool{ workers: workers, jobQueue: make(chan Job, queueSize), results: make(chan Result, queueSize), ctx: poolCtx, cancel: cancel, } wp.start() return wp } func (wp *WorkerPool) start() { for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go wp.worker(i) } } func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() for { select { case job, ok := <-wp.jobQueue: if !ok { return } start := time.Now() output, err := wp.executeJob(job) wp.results <- Result{ JobID: job.ID, Output: output, Error: err, Duration: time.Since(start), } case <-wp.ctx.Done(): return } } } func (wp *WorkerPool) executeJob(job Job) (interface{}, error) { // Create timeout context for job jobCtx, cancel := context.WithTimeout(wp.ctx, 30*time.Second) defer cancel() // Execute with panic recovery var result interface{} var err error done := make(chan bool) go func() { defer func() { if r := recover(); r != nil { err = fmt.Errorf("panic: %v", r) } done <- true }() err = job.Handler(jobCtx, job.Data) }() select { case <-done: return result, err case <-jobCtx.Done(): return nil, jobCtx.Err() } } // Submit adds a job to the queue func (wp *WorkerPool) Submit(job Job) error { select { case wp.jobQueue <- job: return nil case <-wp.ctx.Done(): return wp.ctx.Err() default: return fmt.Errorf("job queue full") } } // Shutdown gracefully stops the worker pool func (wp *WorkerPool) Shutdown() { close(wp.jobQueue) wp.wg.Wait() close(wp.results) wp.cancel() }
🔄 Pipeline Pattern
Sequential Processing
Each stage processes data and passes it to the next stage.
Concurrent Stages
Each stage runs in its own goroutine for parallel processing.
Error Handling
Propagate errors through dedicated error channel.
// Pipeline with error handling and cancellation type Pipeline struct { stages []Stage ctx context.Context cancel context.CancelFunc } type Stage func(context.Context, <-chan interface{}) <-chan interface{} func NewPipeline(ctx context.Context) *Pipeline { pipeCtx, cancel := context.WithCancel(ctx) return &Pipeline{ ctx: pipeCtx, cancel: cancel, } } func (p *Pipeline) AddStage(stage Stage) { p.stages = append(p.stages, stage) } func (p *Pipeline) Run(input <-chan interface{}) <-chan interface{} { var output <-chan interface{} = input for _, stage := range p.stages { output = stage(p.ctx, output) } return output } // Example stages func filterStage(predicate func(interface{}) bool) Stage { return func(ctx context.Context, in <-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) for { select { case val, ok := <-in: if !ok { return } if predicate(val) { select { case out <- val: case <-ctx.Done(): return } } case <-ctx.Done(): return } } }() return out } } func transformStage(transform func(interface{}) interface{}) Stage { return func(ctx context.Context, in <-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) for { select { case val, ok := <-in: if !ok { return } result := transform(val) select { case out <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() return out } }
🚦 Rate Limiting Patterns
Algorithm | Characteristics | Use Case |
---|---|---|
Token Bucket | Allows bursts, refills at constant rate | API rate limiting |
Leaky Bucket | Smooth output rate, no bursts | Network traffic shaping |
Sliding Window | Accurate rate calculation | Real-time monitoring |
Fixed Window | Simple, resets at intervals | Billing periods |
// Token Bucket Rate Limiter type TokenBucket struct { tokens chan struct{} maxTokens int refillRate time.Duration stop chan struct{} } func NewTokenBucket(maxTokens int, refillRate time.Duration) *TokenBucket { tb := &TokenBucket{ tokens: make(chan struct{}, maxTokens), maxTokens: maxTokens, refillRate: refillRate, stop: make(chan struct{}), } // Fill initial tokens for i := 0; i < maxTokens; i++ { tb.tokens <- struct{}{} } go tb.refill() return tb } func (tb *TokenBucket) refill() { ticker := time.NewTicker(tb.refillRate) defer ticker.Stop() for { select { case <-ticker.C: select { case tb.tokens <- struct{}{}: default: // Bucket full } case <-tb.stop: return } } } func (tb *TokenBucket) Allow() bool { select { case <-tb.tokens: return true default: return false } } func (tb *TokenBucket) Wait(ctx context.Context) error { select { case <-tb.tokens: return nil case <-ctx.Done(): return ctx.Err() } }
🏆 Best Practices
✅ Pattern Selection Guidelines
- ✓ Use Worker Pool for CPU-bound tasks with fixed concurrency
- ✓ Use Pipeline for sequential data transformations
- ✓ Use Fan-In/Out for parallel processing with aggregation
- ✓ Use Rate Limiting to protect downstream services
- ✓ Use Circuit Breaker for fault tolerance
- ✓ Always provide graceful shutdown mechanisms
❌ Common Mistakes
- ✗ Creating unlimited goroutines
- ✗ Not handling context cancellation
- ✗ Ignoring backpressure in pipelines
- ✗ Missing error propagation
- ✗ Not closing channels properly
- ✗ Deadlocks from improper channel usage
🎯 Practice Exercises
Exercise 1: Advanced Worker Pool
Extend the worker pool with priority queues, dynamic worker scaling, and metrics collection.
Exercise 2: Multi-Stage Pipeline
Build a data processing pipeline with error handling, retries, and progress tracking.
Exercise 3: Adaptive Rate Limiter
Implement a rate limiter that adjusts limits based on system load and response times.
Exercise 4: Circuit Breaker
Create a circuit breaker with half-open state, configurable thresholds, and event logging.
Challenge: Stream Processor
Design a real-time stream processing system combining multiple patterns: fan-out for distribution, pipelines for transformation, and rate limiting for flow control.