# Python AsyncIO for Data Engineers: Building High-Throughput Production Pipelines
A practical guide to async Python for data engineering — covering asyncio patterns, async database queries, concurrent API calls, and the mistakes that kill throughput in production.

AsyncIO transformed Python from a "good enough" language for data engineering to a genuinely high-performance one. But async Python has sharp edges — and the mistakes are subtle enough that they don't show up in development, only at production scale.
This post covers the async patterns I use in production pipelines, from the AI Notes Generator (async PDF generation with GCS) to the E-News personalization engine.
## When to Use AsyncIO (and When Not To)
AsyncIO shines for I/O-bound tasks: HTTP requests, database queries, file reads. It does not help with CPU-bound tasks — those still block the event loop.
- ✅ AsyncIO: API calls, database queries, file I/O, WebSocket connections
- ❌ AsyncIO: NumPy computations, model inference, image processing, sorting large arrays
- ✅ CPU-bound: use `ProcessPoolExecutor` or `run_in_executor`
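As a minimal sketch of that last bullet, here's how CPU-bound work can be offloaded to a process pool via `run_in_executor` (the `crunch` function is a hypothetical stand-in for your computation):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    # CPU-bound work runs in a separate process, so the event loop stays free
    return sum(i * i for i in range(n))

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor returns awaitable futures; gather runs them concurrently
        futures = [loop.run_in_executor(pool, crunch, n) for n in (10_000, 20_000)]
        return await asyncio.gather(*futures)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The same `run_in_executor` call with the default (thread) executor is useful for blocking I/O, but for CPU-bound work only a process pool sidesteps the GIL.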
## Pattern 1: Concurrent API Calls with gather()
The most common use case — calling multiple APIs and waiting for all results:
```python
import asyncio
import httpx

async def fetch_article(client: httpx.AsyncClient, article_id: str) -> dict:
    response = await client.get(f"https://api.example.com/articles/{article_id}")
    return response.json()

async def fetch_all_articles(article_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [fetch_article(client, aid) for aid in article_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions
        return [r for r in results if not isinstance(r, Exception)]

# Fetch 100 articles concurrently instead of sequentially
articles = asyncio.run(fetch_all_articles(article_ids))
```
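To see the concurrency win without a live API, here is a toy benchmark that simulates network latency with `asyncio.sleep` (the numbers are illustrative, not from the production pipeline):

```python
import asyncio
import time

async def fake_fetch(i: int) -> int:
    await asyncio.sleep(0.1)  # simulate one network round-trip
    return i

async def timed_batch(n: int) -> float:
    # Time n concurrent "requests"; sequentially this would take n * 0.1s
    start = time.perf_counter()
    await asyncio.gather(*(fake_fetch(i) for i in range(n)))
    return time.perf_counter() - start

elapsed = asyncio.run(timed_batch(20))
print(f"20 'requests' in {elapsed:.2f}s")  # roughly 0.1s concurrent vs ~2s sequential
```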
## Pattern 2: Semaphore for Rate-Limited APIs
Calling `gather()` on 1,000 items fires 1,000 concurrent requests, which will get you rate-limited or banned. Use an `asyncio.Semaphore` to cap concurrency:
```python
async def fetch_with_semaphore(semaphore: asyncio.Semaphore,
                               client: httpx.AsyncClient,
                               url: str) -> dict:
    async with semaphore:
        response = await client.get(url)
        return response.json()

async def fetch_batch(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_semaphore(semaphore, client, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
```
## Pattern 3: Producer-Consumer with asyncio.Queue
For streaming pipelines where you produce data faster than you consume it:
```python
NUM_CONSUMERS = 8  # Module-level so producer() and pipeline() agree on the count

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)
    # Signal consumers to stop
    for _ in range(NUM_CONSUMERS):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, results: list, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        result = await process_item(item)  # process_item: your async per-item work
        results.append(result)
        queue.task_done()

async def pipeline(items: list) -> list:
    queue = asyncio.Queue(maxsize=100)  # Backpressure via maxsize
    results = []
    producers = [asyncio.create_task(producer(queue, items))]
    consumers = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(NUM_CONSUMERS)
    ]
    await asyncio.gather(*producers, *consumers)
    return results
```
## The 3 Most Common Async Mistakes in Production
### Mistake 1: Blocking the Event Loop with Sync Code
```python
import asyncio
import time

# BAD — time.sleep() blocks the entire event loop
async def bad_handler():
    time.sleep(2)  # All other requests frozen for 2 seconds

# GOOD — asyncio.sleep() yields control back
async def good_handler():
    await asyncio.sleep(2)  # Other requests can proceed
```
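When the blocking call comes from a library you don't control (a sync database driver, a legacy SDK), `asyncio.to_thread` (Python 3.9+) moves it off the event loop. A sketch, where `blocking_read` is a hypothetical stand-in for such a call:

```python
import asyncio
import time

def blocking_read(path_hint: str) -> str:
    # Stand-in for a sync library call you can't make async
    time.sleep(0.1)
    return f"data from {path_hint}"

async def handler() -> str:
    # Runs blocking_read in a worker thread; the event loop stays responsive
    return await asyncio.to_thread(blocking_read, "report.csv")

print(asyncio.run(handler()))
```

Note that threads only rescue blocking I/O; for CPU-bound work you still need a process pool.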
### Mistake 2: Creating a New Event Loop in Every Function
```python
# BAD — creates a new loop each time, can't nest
def bad_wrapper(coroutine):
    return asyncio.run(coroutine)  # Fails if already in async context

# GOOD — use asyncio.create_task() inside async context
async def good_wrapper(coroutine):
    return await asyncio.create_task(coroutine)
```
### Mistake 3: Forgetting Exceptions in gather()
```python
# BAD — the first exception propagates immediately and you lose every
# other result (the remaining tasks keep running, unobserved)
results = await asyncio.gather(*tasks)

# GOOD — return_exceptions collects failures alongside successes
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
```
## Real Throughput Numbers
In the AI Notes Generator pipeline, switching from sequential to async PDF generation:
- Sequential (before): ~12 PDFs/minute
- Async with 8 workers (after): ~94 PDFs/minute — 7.8× speedup
- Bottleneck shifted from I/O to WeasyPrint CPU rendering


