📡 Channels in Go

Master goroutine communication with channels. Learn synchronization patterns, select statements, and build robust concurrent systems.

📚 Theory: Understanding Channels

Channels are the pipes that connect concurrent goroutines. They allow you to pass values between goroutines with synchronization, ensuring safe communication without explicit locks or condition variables.

Channel Communication Flow Unbuffered Channel Goroutine 1 Sender ch Goroutine 2 Receiver Blocks until both ready Buffered Channel (capacity: 3) Goroutine 1 Sender buffer Goroutine 2 Receiver Non-blocking until full Select Statement Select Multiplexer ch1 ch2 ch3 ch4 Waits on multiple channels Channel States Open Closed Nil Blocked

🔍 Channel Axioms

  • Send on nil channel: Blocks forever
  • Receive from nil channel: Blocks forever
  • Send on closed channel: Panic!
  • Receive from closed channel: Returns zero value immediately
  • Close nil channel: Panic!
  • Close already closed channel: Panic!

🎯 Channel Fundamentals

Creating and Using Channels

// Channel creation and basic operations
package main

import (
    "fmt"
    "time"
)

func main() {
    // Creating channels
    unbuffered := make(chan int)        // Unbuffered channel
    buffered := make(chan string, 5)   // Buffered with capacity 5
    
    // Channel directions in function signatures
    var sendOnly chan<- int = unbuffered    // Send-only
    var receiveOnly <-chan int = unbuffered // Receive-only
    
    // Basic send and receive
    go func() {
        unbuffered <- 42 // Send value
    }()
    value := <-unbuffered // Receive value
    fmt.Println("Received:", value)
    
    // Check channel state
    select {
    case buffered <- "test":
        fmt.Println("Sent without blocking")
    default:
        fmt.Println("Would block")
    }
}

// Direction-restricted functions
func send(ch chan<- int, value int) {
    ch <- value // Can only send
}

func receive(ch <-chan int) int {
    return <-ch // Can only receive
}

Unbuffered Channels

  • Synchronous communication
  • Sender blocks until receiver ready
  • Receiver blocks until sender ready
  • Guarantees handoff occurred
  • Zero capacity
Synchronous
Blocking

Buffered Channels

  • Asynchronous communication
  • Sender blocks only when buffer full
  • Receiver blocks only when buffer empty
  • Decouples sender and receiver
  • Configurable capacity
Asynchronous
Non-blocking

⚡ Select Statement and Patterns

Select for Non-blocking Operations

package main

import (
    "fmt"
    "time"
)

func selectPatterns() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    // Multiple producers
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from ch2"
    }()
    
    // Select waits on multiple channels
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received:", msg2)
        case <-time.After(3 * time.Second):
            fmt.Println("Timeout!")
            return
        }
    }
}

// Non-blocking channel operations
func nonBlockingOps() {
    messages := make(chan string, 1)
    signals := make(chan bool)
    
    // Non-blocking receive
    select {
    case msg := <-messages:
        fmt.Println("Received message:", msg)
    default:
        fmt.Println("No message received")
    }
    
    // Non-blocking send
    msg := "hi"
    select {
    case messages <- msg:
        fmt.Println("Sent message:", msg)
    default:
        fmt.Println("No message sent")
    }
    
    // Multi-way non-blocking select
    select {
    case msg := <-messages:
        fmt.Println("Received message:", msg)
    case sig := <-signals:
        fmt.Println("Received signal:", sig)
    default:
        fmt.Println("No activity")
    }
}

Priority Select Pattern

// Priority select - prefer one channel over another
func prioritySelect(highPriority, lowPriority <-chan string) string {
    // Try high priority first
    select {
    case msg := <-highPriority:
        return msg
    default:
        // Fall through to check both
    }
    
    // Check both channels
    select {
    case msg := <-highPriority:
        return msg
    case msg := <-lowPriority:
        return msg
    }
}

🔧 Advanced Channel Patterns

Pipeline Pattern

Chain operations where output of one stage is input to the next.

Composable
Clean

Fan-In/Fan-Out

Distribute work to multiple goroutines and collect results.

Scalable
Complex

Worker Pool

Fixed number of workers processing from a job queue.

Resource Control
Efficient

Pub-Sub

Broadcast messages to multiple subscribers.

Broadcasting
Decoupled

Rate Limiting

Control the rate of operations using time.Ticker.

Throttling
Controlled

Semaphore

Limit concurrent access to resources.

Synchronization
Blocking

Pipeline Implementation

package main

import "fmt"

// Pipeline stages
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func filter(in <-chan int, threshold int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n > threshold {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // Set up the pipeline
    numbers := generate(2, 3, 4, 5, 6)
    squares := square(numbers)
    filtered := filter(squares, 10)
    
    // Consume the output
    for n := range filtered {
        fmt.Println(n) // Prints: 16, 25, 36
    }
}

Fan-In/Fan-Out Pattern

// Fan-out: distribute work to multiple workers
// Fan-in: combine results from multiple workers
package main

import (
    "fmt"
    "sync"
)

func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    
    for i := 0; i < workers; i++ {
        out := make(chan int)
        outs[i] = out
        
        go func() {
            for n := range in {
                out <- process(n) // Heavy computation
            }
            close(out)
        }()
    }
    
    return outs
}

func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // Start a goroutine for each input channel
    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(ch)
    }
    
    // Close out channel when all inputs are done
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func process(n int) int {
    // Simulate heavy computation
    return n * n
}

🚨 Deadlock Prevention and Error Handling

Deadlock Scenario Cause Solution
Unbuffered Send/Receive No goroutine to complete handshake Use goroutines or buffered channels
All Goroutines Blocked Circular channel dependencies Use select with timeout/default
Forgotten Channel Close Range loop waiting forever Always close channels when done sending
Select Without Default All cases blocking Add default case or timeout
Nil Channel Operations Operating on uninitialized channel Initialize channels before use

Common Deadlock Examples and Fixes

// DEADLOCK: Unbuffered channel without goroutine
func deadlockExample() {
    ch := make(chan int)
    ch <- 42 // Deadlock! No receiver
    fmt.Println(<-ch)
}

// FIX 1: Use goroutine
func fixWithGoroutine() {
    ch := make(chan int)
    go func() {
        ch <- 42
    }()
    fmt.Println(<-ch)
}

// FIX 2: Use buffered channel
func fixWithBuffer() {
    ch := make(chan int, 1)
    ch <- 42 // Doesn't block
    fmt.Println(<-ch)
}

// DEADLOCK: Range without close
func rangeDeadlock() {
    ch := make(chan int)
    go func() {
        ch <- 1
        ch <- 2
        // Forgot to close!
    }()
    
    for n := range ch { // Waits forever
        fmt.Println(n)
    }
}

// FIX: Always close when done sending
func fixRangeDeadlock() {
    ch := make(chan int)
    go func() {
        defer close(ch)
        ch <- 1
        ch <- 2
    }()
    
    for n := range ch {
        fmt.Println(n)
    }
}

⚙️ Performance Considerations

📊 Channel Performance Tips

  • Buffered vs Unbuffered: Buffered channels reduce context switches but use more memory
  • Channel Size: Benchmark to find optimal buffer size for your use case
  • Select Performance: Random case selection prevents starvation
  • Channel Passing: Channels are reference types, cheap to pass around
  • Close Channels: Closing channels signals completion and prevents leaks

Benchmarking Channels

package main

import (
    "testing"
)

func BenchmarkUnbufferedChannel(b *testing.B) {
    ch := make(chan int)
    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

func BenchmarkBufferedChannel(b *testing.B) {
    ch := make(chan int, 100)
    go func() {
        for i := 0; i < b.N; i++ {
            <-ch
        }
    }()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        ch <- i
    }
}

🏆 Best Practices

✅ DO's

  • ✓ Close channels from the sender side only
  • ✓ Use channel directions in function signatures
  • ✓ Handle the two-value receive form for closed channels
  • ✓ Use select for timeouts and cancellation
  • ✓ Pass channels as parameters for better composition
  • ✓ Use context.Context for cancellation propagation

❌ DON'Ts

  • ✗ Don't close channels from the receiver side
  • ✗ Don't send on closed channels (causes panic)
  • ✗ Don't leave channels unclosed when using range
  • ✗ Don't use channels when shared memory is simpler
  • ✗ Don't create goroutines without knowing their lifetime
  • ✗ Don't ignore potential deadlocks in tests

⚠️ Channel vs Mutex Decision

Use Channels when:

  • Transferring ownership of data
  • Distributing work to multiple goroutines
  • Communicating async results

Use Mutex when:

  • Protecting internal state
  • Caching or reference counting
  • Performance-critical sections

🎯 Practice Exercises

Exercise 1: Rate Limiter

Implement a rate limiter using channels that allows N operations per second. Use time.Ticker for timing.

Exercise 2: Pub-Sub System

Build a publish-subscribe system where multiple subscribers can listen to a single publisher. Handle subscriber disconnection gracefully.

Exercise 3: Timeout Service

Create a service that processes requests with a timeout. If processing takes too long, cancel the operation and return an error.

Exercise 4: Pipeline with Error Handling

Build a multi-stage pipeline that handles errors at each stage. Errors should propagate through a separate error channel.

Challenge: Concurrent Map

Implement a thread-safe map using channels instead of mutexes. Support Get, Set, Delete operations with proper synchronization.