Concurrency Patterns

Hard · 40 min read

Producer / Consumer with asyncio.Queue

Why Async Patterns Matter

The Problem: Raw coroutines plus gather are fine for toy programs. Real systems need rate limits, cancellation, error handling, and lifecycle guarantees.

The Solution: asyncio.Semaphore caps concurrency, Lock serializes critical sections, Queue routes work, TaskGroup (3.11+) gives structured concurrency that cancels siblings on failure.

Real Impact: These patterns are how production async services stay correct under load — without them, async code accumulates leaks and partial-failure modes.

Real-World Analogy

Think of async patterns as kitchen stations with rules:

  • Semaphore = only N cooks at the grill at once
  • Lock = only one person uses the slicer at a time
  • Queue = tickets in order; cooks pull when free
  • TaskGroup = if any cook drops a plate, everyone stops and resets together
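  • Cancellation = calling 'stop' reaches every in-flight task at its next await

Producer/consumer with a bounded queue: one producer enqueues work, several consumers drain it, and one None sentinel per worker signals shutdown.

import asyncio

NUM_WORKERS = 3                        # defined before the coroutines that use it

async def producer(q, n):
    for i in range(n):
        await q.put(i)                 # waits while the queue is full (backpressure)
    for _ in range(NUM_WORKERS):
        await q.put(None)              # sentinel per worker

async def consumer(name, q):
    while True:
        item = await q.get()
        if item is None:               # sentinel: no more work for this worker
            break
        print(f"{name} got {item}")
        await asyncio.sleep(0.1)       # simulate processing time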

async def main():
    q = asyncio.Queue(maxsize=10)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(q, 20))
        for i in range(NUM_WORKERS):
            tg.create_task(consumer(f"w{i}", q))

asyncio.run(main())
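
An alternative to per-worker sentinels is to let the queue track completion itself. The following is a minimal sketch, not part of the original example: the names worker, run_pipeline, and results are illustrative, and the doubling step merely stands in for real work. Each worker calls task_done() after every item, the coordinator awaits q.join(), and the now-idle workers are cancelled.

```python
import asyncio


async def worker(q: asyncio.Queue, results: list) -> None:
    # Loop forever; the coordinator cancels us once the queue is drained.
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.01)      # stand-in for real work
            results.append(item * 2)
        finally:
            q.task_done()                  # always mark the item as handled


async def run_pipeline(n_items: int, n_workers: int) -> list:
    q: asyncio.Queue = asyncio.Queue(maxsize=5)   # small bound gives backpressure
    results: list = []
    workers = [asyncio.create_task(worker(q, results)) for _ in range(n_workers)]

    for i in range(n_items):
        await q.put(i)                     # waits while the queue is full

    await q.join()                         # returns once every item is task_done()
    for w in workers:
        w.cancel()                         # workers are idle in q.get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pipeline(20, 3))))
```

Sentinels suit a single producer that knows when it is finished. join() plus cancellation tends to be easier when several producers feed the same queue.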

Rate Limiting with Semaphore

Cap how many coroutines are in flight at once, which is essential when calling rate-limited APIs. Note that a semaphore bounds concurrency, not requests per second.

import asyncio
import aiohttp                     # third-party: pip install aiohttp

sem = asyncio.Semaphore(10)        # at most 10 concurrent

async def fetch_with_limit(session, url):
    async with sem:                # waits (without blocking the loop) if 10 are in flight
        async with session.get(url) as r:
            return await r.text()

async def main(urls):              # urls: iterable of URL strings supplied by the caller
    async with aiohttp.ClientSession() as s:
        return await asyncio.gather(*[fetch_with_limit(s, u) for u in urls])
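
To check that the semaphore really caps concurrency, one option is to replace the HTTP call with a short sleep and record the peak number of jobs inside the guarded section. This is a small sketch under that assumption. The names limited_job, measure_peak, and stats are made up for illustration.

```python
import asyncio


async def limited_job(sem: asyncio.Semaphore, stats: dict) -> None:
    async with sem:                         # wait here if the limit is reached
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)           # stand-in for the HTTP request
        stats["active"] -= 1


async def measure_peak(n_jobs: int, limit: int) -> int:
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, stats) for _ in range(n_jobs)))
    return stats["peak"]                    # never exceeds `limit`


if __name__ == "__main__":
    print(asyncio.run(measure_peak(n_jobs=50, limit=10)))
```

Creating the semaphore inside the coroutine, rather than at module level, keeps the limit local to one run and makes the helper easy to test.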

Locks and Events

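import asyncio

# asyncio.Lock: serialize a critical section
lock = asyncio.Lock()

async def update_balance(account, amount):
    # read_balance / write_balance are placeholders for your own async storage calls.
    # Without the lock, another task could run between the read and the write,
    # and its update would be overwritten.
    async with lock:
        bal = await read_balance(account)
        await write_balance(account, bal + amount)

# asyncio.Event: one signal, many awaiters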
ready = asyncio.Event()

async def waiter(name):
    await ready.wait()
    print(f"{name} got the signal")

async def main():
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            tg.create_task(waiter(f"w{i}"))
        await asyncio.sleep(1)
        ready.set()            # wakes all waiters
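
Why a read-modify-write like update_balance needs a lock can be seen in a small simulation. The sketch below is illustrative only: it uses an in-memory dict instead of real storage, asyncio.sleep(0) to imitate the await in a database call, and the invented names deposit and run_deposits.

```python
import asyncio


async def deposit(account: dict, amount: int, lock=None) -> None:
    async def read_modify_write() -> None:
        bal = account["balance"]            # read
        await asyncio.sleep(0)              # yield, as a real storage call would
        account["balance"] = bal + amount   # write back a possibly stale value

    if lock is None:
        await read_modify_write()           # unprotected: updates can be lost
    else:
        async with lock:                    # one task at a time in this section
            await read_modify_write()


async def run_deposits(use_lock: bool) -> int:
    account = {"balance": 0}
    lock = asyncio.Lock() if use_lock else None
    await asyncio.gather(*(deposit(account, 1, lock) for _ in range(100)))
    return account["balance"]


if __name__ == "__main__":
    print("with lock:   ", asyncio.run(run_deposits(True)))
    print("without lock:", asyncio.run(run_deposits(False)))
```

With the lock, all 100 deposits are applied. Without it, tasks interleave at the await and overwrite each other's writes, so the final balance comes out lower.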

Structured Concurrency with TaskGroup

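TaskGroup (Python 3.11+) is the modern, safer way to manage groups of tasks. It guarantees:

  • Every task has finished by the time the async with block exits
  • If one task fails, the remaining tasks are cancelled
  • Failures are raised together as an ExceptionGroup, which except* can filter by type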

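import asyncio

async def main():
    # might_fail, also_might_fail, normal_task and log are placeholders.
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(might_fail())
            tg.create_task(also_might_fail())
            tg.create_task(normal_task())
    except* ValueError as eg:
        # eg is an ExceptionGroup holding only the ValueErrors;
        # other exception types keep propagating.
        log(f"value errors: {eg.exceptions}")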

Prefer TaskGroup over gather

By default, gather doesn't cancel siblings when one fails, so the remaining tasks keep running unobserved and can leak. TaskGroup makes failure semantics explicit and is what new code should use.
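
The difference is easy to reproduce. In this sketch (the names slow, boom, and log are invented for the demonstration), gather raises as soon as one coroutine fails, yet the other one keeps running in the background and finishes later.

```python
import asyncio


async def gather_leaves_sibling_running() -> list:
    log = []

    async def slow() -> None:
        await asyncio.sleep(0.05)
        log.append("slow finished")          # still runs after the failure

    async def boom() -> None:
        raise RuntimeError("boom")

    try:
        await asyncio.gather(slow(), boom())
    except RuntimeError:
        log.append("gather raised")

    await asyncio.sleep(0.1)                 # give the orphaned task time to finish
    return log


if __name__ == "__main__":
    print(asyncio.run(gather_leaves_sibling_running()))
```

Here the orphaned task is harmless. In a real service it might hold a connection or keep writing to shared state after the caller has already handled the error.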

Bridging Sync and Async

Run sync code in a thread pool

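import asyncio

async def main():
    # Don't block the event loop with sync I/O; offload it to a worker thread
    # (asyncio.to_thread, Python 3.9+).
    # blocking_function, arg1 and arg2 are placeholders.
    result = await asyncio.to_thread(blocking_function, arg1, arg2)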
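
One way to confirm that the loop stays responsive is to run a heartbeat task alongside the offloaded call. This is a rough sketch in which blocking_io uses time.sleep to imitate a synchronous driver. The names heartbeat and run_offloaded are illustrative.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    time.sleep(seconds)                      # stands in for a sync driver call
    return "done"


async def heartbeat(counter: dict) -> None:
    while True:
        counter["ticks"] += 1                # only advances if the loop is free
        await asyncio.sleep(0.01)


async def run_offloaded() -> tuple:
    counter = {"ticks": 0}
    beat = asyncio.create_task(heartbeat(counter))
    result = await asyncio.to_thread(blocking_io, 0.2)   # loop keeps running
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)   # absorb the cancellation
    return result, counter["ticks"]


if __name__ == "__main__":
    print(asyncio.run(run_offloaded()))
```

If blocking_io were called directly inside the coroutine, the heartbeat would tick once and then stall until the call returned.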

Run async from sync code

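# Top-level entry point: creates a new event loop, runs main(), then closes it.
# Usually called once per program.
asyncio.run(main())

# From a sync thread while a loop is already running in another thread
# (`loop` is that running loop; run_until_complete would raise RuntimeError on it):
future = asyncio.run_coroutine_threadsafe(main(), loop)
result = future.result()           # blocks the calling thread until main() finishes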
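
A complete version of the cross-thread case might look like the sketch below. It assumes the synchronous caller owns a dedicated background loop. The names compute and call_from_sync are hypothetical.

```python
import asyncio
import threading


async def compute(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2


def call_from_sync(x: int) -> int:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()                           # loop now runs in the background
    try:
        # Thread-safe submission; returns a concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(compute(x), loop)
        return future.result(timeout=5)      # blocks this sync thread only
    finally:
        loop.call_soon_threadsafe(loop.stop) # ask the loop to stop from outside
        thread.join()
        loop.close()


if __name__ == "__main__":
    print(call_from_sync(21))
```

A long-lived application would normally start the background loop once and reuse it, rather than creating one per call as this demonstration does.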

Common Pitfalls

⚠️ asyncio gotchas

  • Calling sync I/O in async code: blocks the whole event loop. Use async libraries or asyncio.to_thread.
  • Forgetting to await: fetch(url) returns a coroutine object that never runs. Always await or wrap in a Task.
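  • Mixing event loops: Don't call asyncio.run from inside an already-running loop; it raises RuntimeError. Await the coroutine instead.
  • CPU-bound work in coroutines: A tight CPU loop in async code blocks everything else. Offload it to a process pool (loop.run_in_executor with a ProcessPoolExecutor, or multiprocessing); to_thread rarely helps pure-Python CPU work because of the GIL.
  • Unawaited Tasks: If you create a Task and never await it, its exception only surfaces as a "Task exception was never retrieved" log message when the task is garbage-collected. The event loop keeps only weak references to tasks, so hold a reference to every task you spawn.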
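
For fire-and-forget work, a common remedy for the last pitfall is to keep spawned tasks in a set and inspect their result in a done callback. The sketch below follows that idea. The names spawn, _finalize, background_tasks, and flaky are illustrative, and the print call stands in for real logging.

```python
import asyncio

background_tasks: set = set()


def _finalize(task: asyncio.Task) -> None:
    background_tasks.discard(task)           # drop the reference once finished
    if not task.cancelled() and task.exception() is not None:
        # Retrieving the exception here reports the failure right away.
        print(f"background task failed: {task.exception()!r}")


def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)               # strong reference keeps it alive
    task.add_done_callback(_finalize)
    return task


async def flaky() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("oops")


async def demo() -> int:
    spawn(flaky())
    await asyncio.sleep(0.05)                # let the task run and fail
    return len(background_tasks)             # callback has removed the task


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

When the tasks belong to one logical operation, a TaskGroup is usually the simpler choice. The set-plus-callback approach fits work that must outlive the coroutine that started it.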

🎯 Practice Exercises

Exercise 1: Bounded crawler

Crawl a list of URLs with a 5-concurrent limit using Semaphore. Print each as it completes.

Exercise 2: TaskGroup error handling

Spawn 5 tasks where one raises after 2 seconds. Verify siblings are cancelled.

Exercise 3: Event signal

Build a server-startup pattern: many workers await ready.wait() before processing.

Exercise 4: to_thread bridge

Call a blocking function (e.g. a synchronous DB driver) from async code without blocking the event loop.