📚 Theory: Synchronization Primitives
The sync package provides basic synchronization primitives for safe concurrent access to shared resources. These are the building blocks for writing correct concurrent programs.
🔍 Key Concepts
- Mutex: Mutual exclusion lock for protecting shared state
- RWMutex: Reader/Writer mutex allowing multiple readers or one writer
- WaitGroup: Wait for a collection of goroutines to finish
- Once: Perform an action exactly once
- Atomic: Lock-free synchronization for simple values
- Memory Model: Defines happens-before relationships
🔒 Mutexes and Locking
Basic Mutex Usage
// Thread-safe counter with Mutex package main import ( "fmt" "sync" "time" ) type Counter struct { mu sync.Mutex value int } func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() // Always use defer for unlock c.value++ } func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value } // Avoid deadlocks with proper locking order type Account struct { mu sync.Mutex balance int id int } func transfer(from, to *Account, amount int) { // Always lock in consistent order to avoid deadlock if from.id < to.id { from.mu.Lock() defer from.mu.Unlock() to.mu.Lock() defer to.mu.Unlock() } else { to.mu.Lock() defer to.mu.Unlock() from.mu.Lock() defer from.mu.Unlock() } from.balance -= amount to.balance += amount }
RWMutex for Read-Heavy Workloads
type Cache struct { mu sync.RWMutex items map[string]interface{} hits int64 miss int64 } func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() // Multiple readers allowed defer c.mu.RUnlock() val, ok := c.items[key] if ok { atomic.AddInt64(&c.hits, 1) } else { atomic.AddInt64(&c.miss, 1) } return val, ok } func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() // Exclusive write lock defer c.mu.Unlock() if c.items == nil { c.items = make(map[string]interface{}) } c.items[key] = value } // Upgradeable locks pattern func (c *Cache) GetOrCompute(key string, compute func() interface{}) interface{} { // Try with read lock first c.mu.RLock() if val, ok := c.items[key]; ok { c.mu.RUnlock() return val } c.mu.RUnlock() // Upgrade to write lock c.mu.Lock() defer c.mu.Unlock() // Double-check after acquiring write lock if val, ok := c.items[key]; ok { return val } val := compute() c.items[key] = val return val }
⏳ WaitGroup and Coordination
WaitGroup
Wait for multiple goroutines to complete before proceeding.
Cond
Signal waiting goroutines when conditions change.
Once
Execute initialization code exactly once.
WaitGroup Patterns
// Parallel task execution with error handling func parallelTasks(tasks []func() error) []error { var wg sync.WaitGroup errors := make([]error, len(tasks)) for i, task := range tasks { wg.Add(1) go func(index int, fn func() error) { defer wg.Done() errors[index] = fn() }(i, task) } wg.Wait() return errors } // Bounded parallelism func boundedParallel(items []string, maxWorkers int, process func(string)) { var wg sync.WaitGroup semaphore := make(chan struct{}, maxWorkers) for _, item := range items { wg.Add(1) go func(item string) { defer wg.Done() semaphore <- struct{}{} // Acquire defer func() { <-semaphore }() // Release process(item) }(item) } wg.Wait() } // Context-aware WaitGroup type ContextWaitGroup struct { wg sync.WaitGroup ctx context.Context } func (cwg *ContextWaitGroup) Go(fn func(context.Context)) { cwg.wg.Add(1) go func() { defer cwg.wg.Done() fn(cwg.ctx) }() } func (cwg *ContextWaitGroup) Wait() { cwg.wg.Wait() }
⚡ Atomic Operations
Operation | Mutex | Atomic | Use Case |
---|---|---|---|
Simple counter | ~50ns | ~5ns | High-frequency updates |
Complex state | ✅ Flexible | ❌ Limited | Multi-field updates |
Memory overhead | Higher | Minimal | Resource-constrained |
Contention handling | Blocking | Lock-free | High concurrency |
Atomic Operations Examples
import ( "sync/atomic" "unsafe" ) // Atomic counter with methods type AtomicInt64 struct { value int64 } func (a *AtomicInt64) Add(delta int64) int64 { return atomic.AddInt64(&a.value, delta) } func (a *AtomicInt64) Load() int64 { return atomic.LoadInt64(&a.value) } func (a *AtomicInt64) Store(val int64) { atomic.StoreInt64(&a.value, val) } func (a *AtomicInt64) CompareAndSwap(old, new int64) bool { return atomic.CompareAndSwapInt64(&a.value, old, new) } // Atomic boolean flag type AtomicBool struct { flag int32 } func (b *AtomicBool) Set() { atomic.StoreInt32(&b.flag, 1) } func (b *AtomicBool) Unset() { atomic.StoreInt32(&b.flag, 0) } func (b *AtomicBool) IsSet() bool { return atomic.LoadInt32(&b.flag) != 0 } func (b *AtomicBool) SetIfUnset() bool { return atomic.CompareAndSwapInt32(&b.flag, 0, 1) } // atomic.Value for complex types type Config struct { settings atomic.Value // stores *Settings } type Settings struct { MaxConnections int Timeout time.Duration Features []string } func (c *Config) Update(s *Settings) { c.settings.Store(s) } func (c *Config) Load() *Settings { return c.settings.Load().(*Settings) } // Atomic pointer operations type Node struct { value int next *Node } type LockFreeStack struct { head unsafe.Pointer // *Node } func (s *LockFreeStack) Push(val int) { newNode := &Node{value: val} for { oldHead := (*Node)(atomic.LoadPointer(&s.head)) newNode.next = oldHead if atomic.CompareAndSwapPointer( &s.head, unsafe.Pointer(oldHead), unsafe.Pointer(newNode), ) { break } } }
🔄 Advanced Synchronization
sync.Once for Initialization
// Singleton pattern with sync.Once type Singleton struct { data string } var ( instance *Singleton once sync.Once ) func GetInstance() *Singleton { once.Do(func() { fmt.Println("Creating singleton instance") instance = &Singleton{ data: "initialized", } }) return instance } // Lazy initialization with error handling type LazyResource struct { once sync.Once resource *Resource err error } func (l *LazyResource) Get() (*Resource, error) { l.once.Do(func() { l.resource, l.err = initializeResource() }) return l.resource, l.err }
sync.Pool for Object Reuse
// Buffer pool to reduce allocations var bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } func processData(data []byte) string { buf := bufferPool.Get().(*bytes.Buffer) defer func() { buf.Reset() bufferPool.Put(buf) }() buf.Write(data) // Process buffer... return buf.String() } // Benchmark showing pool benefits func BenchmarkWithPool(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { data := []byte("test data") _ = processData(data) } }) }
sync.Map for Concurrent Access
// Thread-safe map without explicit locking type SafeCache struct { data sync.Map } func (c *SafeCache) Set(key, value interface{}) { c.data.Store(key, value) } func (c *SafeCache) Get(key interface{}) (interface{}, bool) { return c.data.Load(key) } func (c *SafeCache) GetOrStore(key, value interface{}) (interface{}, bool) { return c.data.LoadOrStore(key, value) } func (c *SafeCache) Delete(key interface{}) { c.data.Delete(key) } func (c *SafeCache) Range(fn func(key, value interface{}) bool) { c.data.Range(fn) }
🏆 Best Practices
✅ DO's
- ✓ Always use
defer
for unlocking mutexes - ✓ Keep critical sections as small as possible
- ✓ Use RWMutex for read-heavy workloads
- ✓ Prefer channels over mutexes for communication
- ✓ Use atomic operations for simple counters
- ✓ Test with
-race
flag - ✓ Document which data is protected by which mutex
❌ DON'Ts
- ✗ Don't copy sync types (pass pointers)
- ✗ Don't call WaitGroup.Add inside goroutines
- ✗ Don't forget to call cancel functions
- ✗ Don't hold locks during I/O operations
- ✗ Don't use busy-waiting instead of proper synchronization
- ✗ Don't ignore the race detector warnings
⚠️ Mutex vs Channel Decision
Use Mutex when:
- Protecting internal state of a struct
- Critical section is short
- Performance is critical
Use Channels when:
- Passing ownership of data
- Coordinating goroutines
- Implementing pipelines
🎯 Practice Exercises
Exercise 1: Thread-Safe LRU Cache
Implement an LRU cache with concurrent access support using sync primitives. Include Get, Set, and eviction logic.
Exercise 2: Read-Write Lock Benchmark
Create benchmarks comparing Mutex vs RWMutex performance with varying read/write ratios.
Exercise 3: Barrier Implementation
Implement a cyclic barrier using sync primitives that allows goroutines to wait for each other at a synchronization point.
Exercise 4: Lock-Free Queue
Build a lock-free queue using atomic operations and compare its performance with a mutex-based implementation.
Challenge: Distributed Counter
Design a high-performance distributed counter that minimizes contention using sharding and periodic aggregation.