🔐 Sync Package in Go

Master synchronization primitives for safe concurrent access. Learn mutexes, atomic operations, and advanced patterns for high-performance Go applications.

📚 Theory: Synchronization Primitives

The sync package provides basic synchronization primitives for safe concurrent access to shared resources. These are the building blocks for writing correct concurrent programs.

[Diagram: Go sync primitives hierarchy]
  • sync, mutual exclusion: Mutex, RWMutex
  • sync, coordination: WaitGroup, Cond
  • sync, one-time: Once
  • sync, advanced: Pool, Map
  • sync/atomic: AddInt32, CAS, Value
  • Go memory model (happens-before): a channel send happens-before the corresponding receive completes; an Unlock happens-before any later Lock of the same mutex

🔍 Key Concepts

  • Mutex: Mutual exclusion lock for protecting shared state
  • RWMutex: Reader/Writer mutex allowing multiple readers or one writer
  • WaitGroup: Wait for a collection of goroutines to finish
  • Once: Perform an action exactly once
  • Atomic: Lock-free synchronization for simple values
  • Memory Model: Defines happens-before relationships
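
To make the Memory Model bullet concrete, here is a minimal sketch of one happens-before edge. The function name publish is made up for illustration; the guarantee it relies on (closing a channel happens-before a receive that observes the close) comes from the Go memory model.

```go
package main

import "fmt"

// publish is a hypothetical helper showing a happens-before edge:
// the write to msg is sequenced before close(done), and close(done)
// happens-before the receive that returns because the channel is closed.
func publish() string {
	var msg string
	done := make(chan struct{})
	go func() {
		msg = "hello" // plain, unsynchronized write
		close(done)   // publishes every write made before it
	}()
	<-done     // after this receive, the write to msg is guaranteed visible
	return msg // safe read, no data race
}

func main() {
	fmt.Println(publish()) // prints hello
}
```

Without the channel operation there would be no ordering between the write and the read, and the race detector would flag the program.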

🔒 Mutexes and Locking

Basic Mutex Usage

// Thread-safe counter with Mutex
package main

import "sync" // fmt and time were unused here; Go rejects unused imports

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock() // Always use defer for unlock
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

// Avoid deadlocks with proper locking order
type Account struct {
    mu      sync.Mutex
    balance int
    id      int
}

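func transfer(from, to *Account, amount int) {
    // A transfer to the same account would lock one mutex twice and deadlock
    if from == to {
        return
    }
    // Always lock in consistent order (ascending id, assuming ids are unique) to avoid deadlock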
    if from.id < to.id {
        from.mu.Lock()
        defer from.mu.Unlock()
        to.mu.Lock()
        defer to.mu.Unlock()
    } else {
        to.mu.Lock()
        defer to.mu.Unlock()
        from.mu.Lock()
        defer from.mu.Unlock()
    }
    
    from.balance -= amount
    to.balance += amount
}
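
As a usage sketch for the counter above, the following program hammers it from many goroutines. The helper name countConcurrently is an assumption for illustration; the Counter type is repeated so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter mirrors the mutex-protected counter from this section.
type Counter struct {
	mu    sync.Mutex
	value int
}

func (c *Counter) Increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countConcurrently is a hypothetical helper: it starts n goroutines that
// each call Increment once, waits for all of them, and returns the total.
func countConcurrently(n int) int {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // Add before the go statement, never inside the goroutine
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(1000)) // prints 1000
}
```

Removing the Lock/Unlock pair from Increment would make the result unpredictable, and running with the -race flag would report the conflicting writes.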

RWMutex for Read-Heavy Workloads

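// Requires imports: "sync" and "sync/atomic"
type Cache struct {
    mu    sync.RWMutex
    items map[string]interface{}
    hits  int64 // updated with atomic.AddInt64, so safe to change under RLock
    miss  int64 // updated with atomic.AddInt64, so safe to change under RLock
}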

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock() // Multiple readers allowed
    defer c.mu.RUnlock()
    
    val, ok := c.items[key]
    if ok {
        atomic.AddInt64(&c.hits, 1)
    } else {
        atomic.AddInt64(&c.miss, 1)
    }
    return val, ok
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock() // Exclusive write lock
    defer c.mu.Unlock()
    
    if c.items == nil {
        c.items = make(map[string]interface{})
    }
    c.items[key] = value
}

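// Check-then-lock pattern. RWMutex cannot upgrade a read lock in place,
// so release it, take the write lock, and check again.
func (c *Cache) GetOrCompute(key string, compute func() interface{}) interface{} {
    // Try with read lock first
    c.mu.RLock()
    if val, ok := c.items[key]; ok {
        c.mu.RUnlock()
        return val
    }
    c.mu.RUnlock()
    
    // Acquire the write lock (another goroutine may have stored the key in between)
    c.mu.Lock()
    defer c.mu.Unlock()
    
    // Double-check after acquiring write lock
    if val, ok := c.items[key]; ok {
        return val
    }
    
    // Writing to a nil map panics, so initialize it on first use
    if c.items == nil {
        c.items = make(map[string]interface{})
    }
    val := compute() // Runs under the write lock, so keep compute fast
    c.items[key] = val
    return val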
}

⏳ WaitGroup and Coordination

  • WaitGroup (coordination, simple): Wait for multiple goroutines to complete before proceeding.
  • Cond (advanced, complex): Signal waiting goroutines when conditions change.
  • Once (singleton, thread-safe): Execute initialization code exactly once.
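
Cond is described above but has no example in this section, so here is a minimal sketch. The Queue type and its method names are hypothetical; sync.NewCond, Wait, and Signal are the real API.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical FIFO whose Pop blocks until an item is available.
type Queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu) // the Cond shares the queue's mutex
	return q
}

func (q *Queue) Push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // wake one waiting consumer, if any
}

func (q *Queue) Pop() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Wait releases the mutex while blocked and reacquires it before returning.
	// Recheck in a loop: another consumer may have taken the item first.
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := NewQueue()
	result := make(chan int)
	go func() { result <- q.Pop() }() // blocks until Push is called
	q.Push(42)
	fmt.Println(<-result) // prints 42
}
```

For a simple producer/consumer like this, a buffered channel is usually the more idiomatic choice; Cond tends to pay off when many goroutines wait on a condition over shared state that a channel cannot express directly.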

WaitGroup Patterns

// Parallel task execution with error handling.
// Each goroutine writes only to its own slice index, so no mutex is needed.
func parallelTasks(tasks []func() error) []error {
    var wg sync.WaitGroup
    errs := make([]error, len(tasks)) // named errs to avoid shadowing the errors package
    
    for i, task := range tasks {
        wg.Add(1)
        
        go func(index int, fn func() error) {
            defer wg.Done()
            errs[index] = fn()
        }(i, task)
    }
    
    wg.Wait()
    return errs
}

// Bounded parallelism
func boundedParallel(items []string, maxWorkers int, process func(string)) {
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, maxWorkers)
    
    for _, item := range items {
        wg.Add(1)
        semaphore <- struct{}{} // Acquire before spawning, so at most maxWorkers goroutines exist
        
        go func(item string) {
            defer wg.Done()
            defer func() { <-semaphore }() // Release
            
            process(item)
        }(item)
    }
    
    wg.Wait()
}

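// Context-aware WaitGroup: hands one shared context to every task (requires import "context").
// Wait still blocks until all tasks return; tasks must watch ctx.Done() to stop early.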
type ContextWaitGroup struct {
    wg  sync.WaitGroup
    ctx context.Context
}

func (cwg *ContextWaitGroup) Go(fn func(context.Context)) {
    cwg.wg.Add(1)
    go func() {
        defer cwg.wg.Done()
        fn(cwg.ctx)
    }()
}

func (cwg *ContextWaitGroup) Wait() {
    cwg.wg.Wait()
}
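
The wrapper above passes a context to each task, but its Wait cannot be interrupted. One possible way to bound the wait is sketched below; waitWithContext and runWorker are hypothetical names, not part of the sync package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext is a hypothetical helper. It returns nil once wg finishes,
// or ctx.Err() if the context ends first. The helper goroutine it starts
// stays alive until wg finishes, so the tasks must still terminate eventually.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWorker is a hypothetical demo: one worker sleeps for work while the
// caller waits for at most timeout.
func runWorker(work, timeout time.Duration) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitWithContext(ctx, &wg)
}

func main() {
	fmt.Println(runWorker(10*time.Millisecond, time.Second)) // prints <nil>
	fmt.Println(runWorker(time.Second, 10*time.Millisecond)) // prints context deadline exceeded
}
```

Returning early does not stop the workers; it only stops the caller from waiting for them.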

⚡ Atomic Operations

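Operation           | Mutex       | Atomic     | Use Case
Simple counter      | ~50ns       | ~5ns       | High-frequency updates
Complex state       | ✅ Flexible | ❌ Limited | Multi-field updates
Memory overhead     | Higher      | Minimal    | Resource-constrained
Contention handling | Blocking    | Lock-free  | High concurrency

Timings are rough, illustrative figures; actual costs depend on hardware, Go version, and contention, so benchmark on your own workload.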
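
To check the comparison on your own machine, a rough timing sketch like the one below may help. The function names mutexCount and atomicCount are assumptions for illustration, and wall-clock timing of a single run is only a coarse signal; a testing.B benchmark gives more reliable numbers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// mutexCount increments a shared counter under a mutex from several goroutines.
func mutexCount(workers, perWorker int) int64 {
	var (
		mu sync.Mutex
		n  int64
		wg sync.WaitGroup
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				mu.Lock()
				n++
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // all increments happen-before Wait returns
	return n
}

// atomicCount performs the same work with atomic.AddInt64 instead of a lock.
func atomicCount(workers, perWorker int) int64 {
	var n int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				atomic.AddInt64(&n, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&n)
}

func main() {
	const workers, perWorker = 8, 100000

	start := time.Now()
	m := mutexCount(workers, perWorker)
	fmt.Println("mutex: ", m, time.Since(start))

	start = time.Now()
	a := atomicCount(workers, perWorker)
	fmt.Println("atomic:", a, time.Since(start))
}
```

Both functions return the same total; only the elapsed time differs, and the gap typically widens as contention grows.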

Atomic Operations Examples

import (
    "sync/atomic"
    "time" // needed for Settings.Timeout below
    "unsafe"
)

// Atomic counter with methods (Go 1.19+ provides the equivalent built-in type atomic.Int64)
type AtomicInt64 struct {
    value int64
}

func (a *AtomicInt64) Add(delta int64) int64 {
    return atomic.AddInt64(&a.value, delta)
}

func (a *AtomicInt64) Load() int64 {
    return atomic.LoadInt64(&a.value)
}

func (a *AtomicInt64) Store(val int64) {
    atomic.StoreInt64(&a.value, val)
}

func (a *AtomicInt64) CompareAndSwap(old, new int64) bool {
    return atomic.CompareAndSwapInt64(&a.value, old, new)
}

// Atomic boolean flag (Go 1.19+ provides the built-in type atomic.Bool)
type AtomicBool struct {
    flag int32
}

func (b *AtomicBool) Set() {
    atomic.StoreInt32(&b.flag, 1)
}

func (b *AtomicBool) Unset() {
    atomic.StoreInt32(&b.flag, 0)
}

func (b *AtomicBool) IsSet() bool {
    return atomic.LoadInt32(&b.flag) != 0
}

func (b *AtomicBool) SetIfUnset() bool {
    return atomic.CompareAndSwapInt32(&b.flag, 0, 1)
}

// atomic.Value for complex types
type Config struct {
    settings atomic.Value // stores *Settings
}

type Settings struct {
    MaxConnections int
    Timeout        time.Duration
    Features       []string
}

func (c *Config) Update(s *Settings) {
    c.settings.Store(s)
}

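func (c *Config) Load() *Settings {
    // The comma-ok assertion returns nil instead of panicking when nothing has been stored yet
    s, _ := c.settings.Load().(*Settings)
    return s
}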

// Atomic pointer operations
type Node struct {
    value int
    next  *Node
}

type LockFreeStack struct {
    head unsafe.Pointer // *Node
}

func (s *LockFreeStack) Push(val int) {
    newNode := &Node{value: val}
    for {
        oldHead := (*Node)(atomic.LoadPointer(&s.head))
        newNode.next = oldHead
        if atomic.CompareAndSwapPointer(
            &s.head,
            unsafe.Pointer(oldHead),
            unsafe.Pointer(newNode),
        ) {
            break
        }
    }
}
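
The stack above only shows Push. A possible companion sketch with Pop follows, written with the typed atomic.Pointer available since Go 1.19 instead of unsafe.Pointer; the type names Stack and node are chosen for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type node struct {
	value int
	next  *node
}

// Stack is a Treiber-style lock-free stack built on atomic.Pointer (Go 1.19+).
type Stack struct {
	head atomic.Pointer[node]
}

// Push retries the compare-and-swap until the head has not changed
// between the Load and the swap.
func (s *Stack) Push(v int) {
	n := &node{value: v}
	for {
		old := s.head.Load()
		n.next = old
		if s.head.CompareAndSwap(old, n) {
			return
		}
	}
}

// Pop returns the top value, or false when the stack is empty.
func (s *Stack) Pop() (int, bool) {
	for {
		old := s.head.Load()
		if old == nil {
			return 0, false
		}
		if s.head.CompareAndSwap(old, old.next) {
			return old.value, true
		}
	}
}

func main() {
	var s Stack
	s.Push(1)
	s.Push(2)
	v, ok := s.Pop()
	fmt.Println(v, ok) // prints 2 true
}
```

Because nodes are never reused and the garbage collector keeps a node alive while any goroutine still holds a pointer to it, this sketch sidesteps the classic ABA problem; recycling nodes through a pool would reintroduce it.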

🔄 Advanced Synchronization

sync.Once for Initialization

// Singleton pattern with sync.Once
type Singleton struct {
    data string
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        fmt.Println("Creating singleton instance")
        instance = &Singleton{
            data: "initialized",
        }
    })
    return instance
}

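// Lazy initialization with error handling.
// Resource and initializeResource are placeholders defined elsewhere.
// Once runs its function a single time, so a failed initialization is cached and never retried.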
type LazyResource struct {
    once     sync.Once
    resource *Resource
    err      error
}

func (l *LazyResource) Get() (*Resource, error) {
    l.once.Do(func() {
        l.resource, l.err = initializeResource()
    })
    return l.resource, l.err
}
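
If a failed initialization should be retried on the next call, sync.Once is not a fit because it never runs its function again. One possible alternative is a mutex plus a done flag, sketched below; RetryableLazy and flakyInit are hypothetical names, and a string stands in for the resource.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// RetryableLazy is a hypothetical lazy initializer that caches only success.
type RetryableLazy struct {
	mu    sync.Mutex
	done  bool
	value string
	init  func() (string, error)
}

func (l *RetryableLazy) Get() (string, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.done {
		return l.value, nil
	}
	v, err := l.init()
	if err != nil {
		return "", err // done stays false, so the next call tries again
	}
	l.value, l.done = v, true
	return v, nil
}

// flakyInit returns a hypothetical initializer that fails on its first call only.
func flakyInit() func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("temporary failure")
		}
		return "ready", nil
	}
}

func main() {
	l := &RetryableLazy{init: flakyInit()}
	_, err := l.Get()
	fmt.Println(err) // prints temporary failure
	v, _ := l.Get()
	fmt.Println(v) // prints ready
}
```

The trade-off is that every Get takes the mutex, whereas sync.Once has a lock-free fast path after the first call.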

sync.Pool for Object Reuse

// Buffer pool to reduce allocations (requires imports: "bytes", "sync", and "testing" for the benchmark)
var bufferPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func processData(data []byte) string {
    buf := bufferPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        bufferPool.Put(buf)
    }()
    
    buf.Write(data)
    // Process buffer...
    return buf.String()
}

// Benchmark showing pool benefits
func BenchmarkWithPool(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            data := []byte("test data")
            _ = processData(data)
        }
    })
}

sync.Map for Concurrent Access

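// Thread-safe map without explicit locking.
// sync.Map is tuned for keys that are written once and read many times, or for
// goroutines working on disjoint key sets; otherwise a plain map with a Mutex is often simpler.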
type SafeCache struct {
    data sync.Map
}

func (c *SafeCache) Set(key, value interface{}) {
    c.data.Store(key, value)
}

func (c *SafeCache) Get(key interface{}) (interface{}, bool) {
    return c.data.Load(key)
}

func (c *SafeCache) GetOrStore(key, value interface{}) (interface{}, bool) {
    return c.data.LoadOrStore(key, value)
}

func (c *SafeCache) Delete(key interface{}) {
    c.data.Delete(key)
}

func (c *SafeCache) Range(fn func(key, value interface{}) bool) {
    c.data.Range(fn)
}
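
As a usage sketch for LoadOrStore, the example below keeps one atomic counter per key. HitCounter and countHits are hypothetical names; atomic.Int64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// HitCounter is a hypothetical per-key counter: sync.Map holds one
// *atomic.Int64 per key, so increments never need an explicit lock.
type HitCounter struct {
	m sync.Map // string -> *atomic.Int64
}

// Inc adds one to the counter for key and returns the new value.
func (h *HitCounter) Inc(key string) int64 {
	v, ok := h.m.Load(key) // fast path: the key already exists
	if !ok {
		// LoadOrStore keeps whichever counter was stored first,
		// so concurrent callers always share a single counter per key.
		v, _ = h.m.LoadOrStore(key, new(atomic.Int64))
	}
	return v.(*atomic.Int64).Add(1)
}

// Get returns the current count for key, or 0 if it was never incremented.
func (h *HitCounter) Get(key string) int64 {
	v, ok := h.m.Load(key)
	if !ok {
		return 0
	}
	return v.(*atomic.Int64).Load()
}

// countHits is a hypothetical demo: n goroutines each record one hit.
func countHits(key string, n int) int64 {
	var h HitCounter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.Inc(key)
		}()
	}
	wg.Wait()
	return h.Get(key)
}

func main() {
	fmt.Println(countHits("home", 100)) // prints 100
}
```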

🏆 Best Practices

✅ DO's

  • ✓ Always use defer for unlocking mutexes
  • ✓ Keep critical sections as small as possible
  • ✓ Use RWMutex for read-heavy workloads
  • ✓ Prefer channels over mutexes for communication
  • ✓ Use atomic operations for simple counters
  • ✓ Test with -race flag
  • ✓ Document which data is protected by which mutex

❌ DON'Ts

  • ✗ Don't copy sync types after first use (pass pointers)
  • ✗ Don't call WaitGroup.Add inside the goroutine it tracks; call it before the go statement
  • ✗ Don't forget to call cancel functions
  • ✗ Don't hold locks during I/O operations
  • ✗ Don't use busy-waiting instead of proper synchronization
  • ✗ Don't ignore the race detector warnings

⚠️ Mutex vs Channel Decision

Use Mutex when:

  • Protecting internal state of a struct
  • Critical section is short
  • Performance is critical

Use Channels when:

  • Passing ownership of data
  • Coordinating goroutines
  • Implementing pipelines
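
For contrast with the mutex examples earlier, here is a minimal pipeline sketch for the channel side of the decision. The stage names generate, square, and sum are chosen for illustration; each value is owned by exactly one stage at a time, so no locks are needed.

```go
package main

import "fmt"

// generate emits the given numbers and then closes its output channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads values, squares them, and passes ownership downstream.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// sum drains the channel and returns the total.
func sum(in <-chan int) int {
	total := 0
	for n := range in {
		total += n
	}
	return total
}

func main() {
	fmt.Println(sum(square(generate(1, 2, 3)))) // prints 14
}
```

Closing each output channel is what lets the next stage's range loop end, so the pipeline shuts down cleanly once the input is exhausted.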

🎯 Practice Exercises

Exercise 1: Thread-Safe LRU Cache

Implement an LRU cache with concurrent access support using sync primitives. Include Get, Set, and eviction logic.

Exercise 2: Read-Write Lock Benchmark

Create benchmarks comparing Mutex vs RWMutex performance with varying read/write ratios.

Exercise 3: Barrier Implementation

Implement a cyclic barrier using sync primitives that allows goroutines to wait for each other at a synchronization point.

Exercise 4: Lock-Free Queue

Build a lock-free queue using atomic operations and compare its performance with a mutex-based implementation.

Challenge: Distributed Counter

Design a high-performance distributed counter that minimizes contention using sharding and periodic aggregation.