Producer / Consumer with asyncio.Queue
Why Async Patterns Matter
The Problem: Raw coroutines + gather is fine for toy programs. Real systems need rate limits, cancellation, error handling, and lifecycle guarantees.
The Solution: asyncio.Semaphore caps concurrency, Lock serializes critical sections, Queue routes work, TaskGroup (3.11+) gives structured concurrency that cancels siblings on failure.
Real Impact: These patterns are how production async services stay correct under load — without them, async code accumulates leaks and partial-failure modes.
Real-World Analogy
Think of async patterns as kitchen stations with rules:
- Semaphore = only N cooks at the grill at once
- Lock = only one person uses the slicer at a time
- Queue = tickets in order; cooks pull when free
- TaskGroup = if any cook drops a plate, everyone stops and resets together
- Cancellation = calling 'stop' propagates instantly through every in-flight task
import asyncio

NUM_WORKERS = 3

async def producer(q, n):
    for i in range(n):
        await q.put(i)
    for _ in range(NUM_WORKERS):
        await q.put(None)  # one sentinel per worker so each one shuts down

async def consumer(name, q):
    while True:
        item = await q.get()
        if item is None:
            break
        print(f"{name} got {item}")
        await asyncio.sleep(0.1)

async def main():
    q = asyncio.Queue(maxsize=10)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(q, 20))
        for i in range(NUM_WORKERS):
            tg.create_task(consumer(f"w{i}", q))

asyncio.run(main())
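An alternative to sentinels is Queue's built-in completion tracking: consumers call q.task_done() after each item, the coordinator awaits q.join(), then cancels the workers. A minimal sketch (the results list is added here just to make the outcome observable):

```python
import asyncio

async def consumer(name, q, results):
    # Loop forever; the task is cancelled once the queue drains.
    while True:
        item = await q.get()
        results.append((name, item))
        q.task_done()  # mark this item as fully processed

async def main():
    q = asyncio.Queue()
    results = []
    for i in range(10):
        q.put_nowait(i)
    workers = [asyncio.create_task(consumer(f"w{i}", q, results)) for i in range(3)]
    await q.join()   # resumes only when every item has been task_done()'d
    for w in workers:
        w.cancel()   # no sentinels needed
    return results

results = asyncio.run(main())
print(len(results))  # 10
```

This shape is convenient when the number of workers can change at runtime, since you don't have to push one sentinel per worker.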
Rate Limiting with Semaphore
Limit how many coroutines can run concurrently — essential for rate-limited APIs.
import asyncio
import aiohttp

sem = asyncio.Semaphore(10)  # at most 10 concurrent

async def fetch_with_limit(session, url):
    async with sem:  # blocks if 10 already in flight
        async with session.get(url) as r:
            return await r.text()

async def main():
    # urls: an iterable of URL strings defined elsewhere
    async with aiohttp.ClientSession() as s:
        await asyncio.gather(*[fetch_with_limit(s, u) for u in urls])
Locks and Events
# asyncio.Lock — serialize a critical section
lock = asyncio.Lock()

async def update_balance(account, amount):
    async with lock:
        bal = await read_balance(account)
        await write_balance(account, bal + amount)
# asyncio.Event — one signal, many awaiters
ready = asyncio.Event()

async def waiter(name):
    await ready.wait()
    print(f"{name} got the signal")

async def main():
    async with asyncio.TaskGroup() as tg:
        for i in range(5):
            tg.create_task(waiter(f"w{i}"))
        await asyncio.sleep(1)
        ready.set()  # wakes all waiters
Structured Concurrency with TaskGroup
TaskGroup (Python 3.11+) is the modern, safer way to manage groups of tasks. It guarantees:
- All child tasks finish before the async with block exits.
- If any task raises, the remaining siblings are cancelled.
- Exceptions are collected into an ExceptionGroup.
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(might_fail())
            tg.create_task(also_might_fail())
            tg.create_task(normal_task())
    except* ValueError as eg:
        log(f"value errors: {eg.exceptions}")
Prefer TaskGroup over gather
gather doesn't cancel siblings when one fails — you can leak tasks. TaskGroup makes failure semantics explicit and is what new code should use.
Bridging Sync and Async
Run sync code in a thread pool
async def main():
    # Don't block the event loop with sync I/O — offload it
    result = await asyncio.to_thread(blocking_function, arg1, arg2)
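A self-contained sketch of the same idea, where blocking_io is a stand-in for sync work such as a DB call or file read. Because each call runs in the default thread pool, three 50 ms blocking calls overlap instead of serializing the event loop:

```python
import asyncio
import time

def blocking_io(n):
    # Stand-in for synchronous I/O (a DB driver, file read, requests.get, ...)
    time.sleep(0.05)
    return n * 2

async def main():
    start = time.monotonic()
    # Each to_thread call runs in the default ThreadPoolExecutor.
    results = await asyncio.gather(*[asyncio.to_thread(blocking_io, i) for i in range(3)])
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # [0, 2, 4]
print(elapsed)  # ~0.05 s, not the 0.15 s a sequential run would take
```

Note that to_thread helps with blocking I/O; for CPU-bound work the GIL still applies, and a process pool is the better fit.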
Run async from sync code
# Top-level entry point — only one per program
asyncio.run(main())
# From sync code when no loop is running and you manage it yourself
# (run_until_complete cannot be called from inside a running loop):
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(main())
finally:
    loop.close()
Common Pitfalls
⚠️ asyncio gotchas
- Calling sync I/O in async code: blocks the whole event loop. Use async libraries or asyncio.to_thread.
- Forgetting to await: fetch(url) returns a coroutine object that never runs. Always await or wrap in a Task.
- Mixing event loops: Don't call asyncio.run from inside an already-running loop. Use await.
- CPU-bound work in coroutines: A tight CPU loop in async code blocks everything else. Offload with to_thread or use multiprocessing.
- Unawaited Tasks: If you create a Task and never await it, exceptions are silently logged when the task is garbage-collected.
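The "forgetting to await" pitfall is easy to demonstrate: calling a coroutine function only builds a coroutine object; nothing executes until it is awaited. A tiny sketch with a toy fetch:

```python
import asyncio

async def fetch():
    return "data"

coro = fetch()                      # builds a coroutine object; runs nothing
is_coro = asyncio.iscoroutine(coro)

result = asyncio.run(coro)          # only now does the body actually execute
print(is_coro, result)  # True data
```

If coro were simply dropped instead of awaited, Python would log a "coroutine 'fetch' was never awaited" RuntimeWarning when the object is garbage-collected.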
🎯 Practice Exercises
Exercise 1: Bounded crawler
Crawl a list of URLs with a 5-concurrent limit using Semaphore. Print each as it completes.
Exercise 2: TaskGroup error handling
Spawn 5 tasks where one raises after 2 seconds. Verify siblings are cancelled.
Exercise 3: Event signal
Build a server-startup pattern: many workers await ready.wait() before processing.
Exercise 4: to_thread bridge
Call a blocking function (e.g. a synchronous DB driver) from async code without blocking the event loop.