🎨 Concurrency Patterns in Go

Master advanced concurrency patterns for building scalable, maintainable systems. Learn worker pools, pipelines, rate limiting, and production-ready patterns.

📚 Pattern Overview

Concurrency patterns are reusable solutions to common problems in concurrent programming. They help manage complexity, improve performance, and ensure correctness in multi-threaded environments.

[Diagram: concurrency patterns landscape — a worker pool with a job queue, a four-stage pipeline, fan-in/out, a token-bucket rate limiter (5 req/sec), circuit breaker state transitions, a semaphore with limit 3, and a pub-sub broker connecting a publisher to subscribers]

Pattern Categories

Work Distribution:
  • Worker Pool - Fixed workers process jobs
  • Fan-Out - Distribute work to multiple workers

Data Processing:
  • Pipeline - Sequential stages
  • Fan-In - Merge multiple sources

Flow Control:
  • Rate Limiting - Control throughput
  • Circuit Breaker - Fault tolerance

Communication:
  • Pub-Sub - Event broadcasting
  • Request-Reply - Bidirectional messaging

Resource Management:
  • Semaphore - Limit concurrent access
  • Object Pool - Reuse expensive resources

👷 Worker Pool Pattern

Manage a fixed number of goroutines to process work from a queue, controlling resource usage and preventing goroutine explosion.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Job represents a unit of work. The handler returns an output
// value so it can populate Result.Output.
type Job struct {
    ID      int
    Data    interface{}
    Handler func(context.Context, interface{}) (interface{}, error)
}

// Result contains job execution results
type Result struct {
    JobID    int
    Output   interface{}
    Error    error
    Duration time.Duration
}

// WorkerPool manages concurrent workers
type WorkerPool struct {
    workers    int
    jobQueue   chan Job
    results    chan Result
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(ctx context.Context, workers int, queueSize int) *WorkerPool {
    poolCtx, cancel := context.WithCancel(ctx)
    
    wp := &WorkerPool{
        workers:  workers,
        jobQueue: make(chan Job, queueSize),
        results:  make(chan Result, queueSize),
        ctx:      poolCtx,
        cancel:   cancel,
    }
    
    wp.start()
    return wp
}

func (wp *WorkerPool) start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    for {
        select {
        case job, ok := <-wp.jobQueue:
            if !ok {
                return
            }
            
            start := time.Now()
            output, err := wp.executeJob(job)
            
            wp.results <- Result{
                JobID:    job.ID,
                Output:   output,
                Error:    err,
                Duration: time.Since(start),
            }
            
        case <-wp.ctx.Done():
            return
        }
    }
}

func (wp *WorkerPool) executeJob(job Job) (interface{}, error) {
    // Create a per-job timeout context derived from the pool context
    jobCtx, cancel := context.WithTimeout(wp.ctx, 30*time.Second)
    defer cancel()
    
    // Execute with panic recovery
    var output interface{}
    var err error
    
    // Buffered so the handler goroutine can always send and exit,
    // even if the timeout fires first and nobody receives.
    done := make(chan struct{}, 1)
    go func() {
        defer func() {
            if r := recover(); r != nil {
                err = fmt.Errorf("panic: %v", r)
            }
            done <- struct{}{}
        }()
        
        output, err = job.Handler(jobCtx, job.Data)
    }()
    
    select {
    case <-done:
        return output, err
    case <-jobCtx.Done():
        return nil, jobCtx.Err()
    }
}

// Submit adds a job to the queue
func (wp *WorkerPool) Submit(job Job) error {
    select {
    case wp.jobQueue <- job:
        return nil
    case <-wp.ctx.Done():
        return wp.ctx.Err()
    default:
        return fmt.Errorf("job queue full")
    }
}

// Shutdown gracefully stops the worker pool. Callers must keep
// draining the results channel while Shutdown runs: once the
// results buffer fills, workers block on sending and wg.Wait
// would never return. Submitting after Shutdown panics (send on
// a closed channel).
func (wp *WorkerPool) Shutdown() {
    close(wp.jobQueue)
    wp.wg.Wait()
    close(wp.results)
    wp.cancel()
}
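
A minimal usage sketch, assuming the types above live in the same package; the handler, worker count, and queue size are illustrative. Note that results are drained concurrently, as Shutdown requires:

func main() {
    wp := NewWorkerPool(context.Background(), 4, 16)
    
    // Drain results concurrently so workers never block on the
    // results channel while Shutdown waits for them.
    var drained sync.WaitGroup
    drained.Add(1)
    go func() {
        defer drained.Done()
        for res := range wp.results {
            if res.Error != nil {
                fmt.Printf("job %d failed: %v\n", res.JobID, res.Error)
                continue
            }
            fmt.Printf("job %d -> %v in %v\n", res.JobID, res.Output, res.Duration)
        }
    }()
    
    // Illustrative handler: square the input integer.
    square := func(ctx context.Context, data interface{}) (interface{}, error) {
        n := data.(int)
        return n * n, nil
    }
    
    for i := 1; i <= 8; i++ {
        if err := wp.Submit(Job{ID: i, Data: i, Handler: square}); err != nil {
            fmt.Printf("submit %d: %v\n", i, err)
        }
    }
    
    wp.Shutdown() // close queue, wait for workers, close results
    drained.Wait()
}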

🔄 Pipeline Pattern

There are three recurring concerns when structuring a pipeline:

  • Sequential Processing - Each stage processes data and passes it to the next stage. Composable and clean.
  • Concurrent Stages - Each stage runs in its own goroutine for parallel processing. Better performance, more complex.
  • Error Handling - Propagate errors through a dedicated error channel (a sketch follows below). Robust and essential in production.
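The Pipeline implementation below propagates only cancellation through its context; for the dedicated error channel mentioned above, one option is a stage variant whose signature also accepts an error channel. A minimal sketch under that assumption (the ErrStage and parseStage names are illustrative, not part of the implementation below):

// ErrStage is a hypothetical stage variant that reports failures
// on a shared error channel instead of swallowing them.
type ErrStage func(ctx context.Context, in <-chan interface{}, errc chan<- error) <-chan interface{}

// parseStage is an illustrative ErrStage: values that fail to
// convert are reported on errc and dropped from the stream.
func parseStage(parse func(interface{}) (interface{}, error)) ErrStage {
    return func(ctx context.Context, in <-chan interface{}, errc chan<- error) <-chan interface{} {
        out := make(chan interface{})
        go func() {
            defer close(out)
            for val := range in {
                result, err := parse(val)
                if err != nil {
                    select {
                    case errc <- err:
                    case <-ctx.Done():
                        return
                    }
                    continue
                }
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
}
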
// Pipeline composes stages and propagates cancellation via context
type Pipeline struct {
    stages []Stage
    ctx    context.Context
    cancel context.CancelFunc
}

type Stage func(context.Context, <-chan interface{}) <-chan interface{}

func NewPipeline(ctx context.Context) *Pipeline {
    pipeCtx, cancel := context.WithCancel(ctx)
    return &Pipeline{
        ctx:    pipeCtx,
        cancel: cancel,
    }
}

func (p *Pipeline) AddStage(stage Stage) {
    p.stages = append(p.stages, stage)
}

func (p *Pipeline) Run(input <-chan interface{}) <-chan interface{} {
    var output <-chan interface{} = input
    
    for _, stage := range p.stages {
        output = stage(p.ctx, output)
    }
    
    return output
}

// Example stages
func filterStage(predicate func(interface{}) bool) Stage {
    return func(ctx context.Context, in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        
        go func() {
            defer close(out)
            
            for {
                select {
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    if predicate(val) {
                        select {
                        case out <- val:
                        case <-ctx.Done():
                            return
                        }
                    }
                case <-ctx.Done():
                    return
                }
            }
        }()
        
        return out
    }
}

func transformStage(transform func(interface{}) interface{}) Stage {
    return func(ctx context.Context, in <-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        
        go func() {
            defer close(out)
            
            for {
                select {
                case val, ok := <-in:
                    if !ok {
                        return
                    }
                    result := transform(val)
                    select {
                    case out <- result:
                    case <-ctx.Done():
                        return
                    }
                case <-ctx.Done():
                    return
                }
            }
        }()
        
        return out
    }
}
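
A short usage sketch wiring the stages above; the function name and input values are illustrative:

func runPipeline() {
    p := NewPipeline(context.Background())
    p.AddStage(filterStage(func(v interface{}) bool {
        return v.(int)%2 == 0 // keep even numbers
    }))
    p.AddStage(transformStage(func(v interface{}) interface{} {
        return v.(int) * 10
    }))
    
    // Feed the pipeline and close the input when done so each
    // stage can close its own output in turn.
    in := make(chan interface{})
    go func() {
        defer close(in)
        for i := 0; i < 10; i++ {
            in <- i
        }
    }()
    
    for v := range p.Run(in) {
        fmt.Println(v) // 0, 20, 40, 60, 80
    }
    p.cancel() // release the pipeline's context
}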

🚦 Rate Limiting Patterns

Algorithm        Characteristics                             Use Case
Token Bucket     Allows bursts, refills at a constant rate   API rate limiting
Leaky Bucket     Smooth output rate, no bursts               Network traffic shaping
Sliding Window   Accurate rate calculation                   Real-time monitoring
Fixed Window     Simple, resets at intervals                 Billing periods

// Token Bucket Rate Limiter
type TokenBucket struct {
    tokens     chan struct{}
    maxTokens  int
    refillRate time.Duration
    stop       chan struct{}
}

func NewTokenBucket(maxTokens int, refillRate time.Duration) *TokenBucket {
    tb := &TokenBucket{
        tokens:     make(chan struct{}, maxTokens),
        maxTokens:  maxTokens,
        refillRate: refillRate,
        stop:       make(chan struct{}),
    }
    
    // Fill initial tokens
    for i := 0; i < maxTokens; i++ {
        tb.tokens <- struct{}{}
    }
    
    go tb.refill()
    return tb
}

func (tb *TokenBucket) refill() {
    ticker := time.NewTicker(tb.refillRate)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            select {
            case tb.tokens <- struct{}{}:
            default: // Bucket full
            }
        case <-tb.stop:
            return
        }
    }
}

func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

func (tb *TokenBucket) Wait(ctx context.Context) error {
    select {
    case <-tb.tokens:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Stop terminates the refill goroutine; without it, the ticker
// goroutine leaks once the bucket is no longer needed.
func (tb *TokenBucket) Stop() {
    close(tb.stop)
}
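
A brief usage sketch, assuming the TokenBucket above; the capacity and refill interval are illustrative (one refill per 200ms approximates 5 req/sec, with bursts up to 5):

func rateLimitedWork() {
    tb := NewTokenBucket(5, 200*time.Millisecond)
    defer tb.Stop() // stop the refill goroutine when done
    
    for i := 0; i < 10; i++ {
        if tb.Allow() {
            fmt.Printf("request %d: allowed immediately\n", i)
            continue
        }
        // Out of tokens: block until a refill or a one-second timeout.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        err := tb.Wait(ctx)
        cancel()
        if err != nil {
            fmt.Printf("request %d: rejected (%v)\n", i, err)
            continue
        }
        fmt.Printf("request %d: allowed after waiting\n", i)
    }
}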

🏆 Best Practices

✅ Pattern Selection Guidelines

  • ✓ Use Worker Pool for CPU-bound tasks with fixed concurrency
  • ✓ Use Pipeline for sequential data transformations
  • ✓ Use Fan-In/Out for parallel processing with aggregation (see the fan-in sketch after this list)
  • ✓ Use Rate Limiting to protect downstream services
  • ✓ Use Circuit Breaker for fault tolerance
  • ✓ Always provide graceful shutdown mechanisms
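
Fan-in is referenced above but not implemented elsewhere in this section; here is a minimal sketch of merging several input channels into one, using only the standard library:

// fanIn merges multiple input channels into a single output
// channel; the output closes once every input is drained.
func fanIn(ctx context.Context, inputs ...<-chan interface{}) <-chan interface{} {
    out := make(chan interface{})
    var wg sync.WaitGroup
    
    for _, in := range inputs {
        wg.Add(1)
        go func(in <-chan interface{}) {
            defer wg.Done()
            for val := range in {
                select {
                case out <- val:
                case <-ctx.Done():
                    return
                }
            }
        }(in)
    }
    
    // Close out only after all forwarding goroutines finish,
    // preventing both premature closes and goroutine leaks.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}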

❌ Common Mistakes

  • ✗ Creating unlimited goroutines
  • ✗ Not handling context cancellation
  • ✗ Ignoring backpressure in pipelines
  • ✗ Missing error propagation
  • ✗ Not closing channels properly
  • ✗ Deadlocks from improper channel usage (see the sketch after this list)
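
As a concrete illustration of the last two mistakes, compare a bare send with no guaranteed receiver against the cancellation-aware send used throughout this section:

// Deadlock risk: an unbuffered send blocks forever if no one
// receives, leaking the goroutine that performs it.
func leakySend(ch chan int) {
    ch <- 1
}

// Safer: pair every send with a cancellation case so the
// goroutine can always exit, even if the receiver has gone away.
func safeSend(ctx context.Context, ch chan int, v int) error {
    select {
    case ch <- v:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}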

🎯 Practice Exercises

Exercise 1: Advanced Worker Pool

Extend the worker pool with priority queues, dynamic worker scaling, and metrics collection.

Exercise 2: Multi-Stage Pipeline

Build a data processing pipeline with error handling, retries, and progress tracking.

Exercise 3: Adaptive Rate Limiter

Implement a rate limiter that adjusts limits based on system load and response times.

Exercise 4: Circuit Breaker

Create a circuit breaker with half-open state, configurable thresholds, and event logging.

Challenge: Stream Processor

Design a real-time stream processing system combining multiple patterns: fan-out for distribution, pipelines for transformation, and rate limiting for flow control.