
Python AsyncIO for Data Engineers: Building High-Throughput Production Pipelines

A practical guide to async Python for data engineering — covering asyncio patterns, async database queries, concurrent API calls, and the mistakes that kill throughput in production.

Rishabh Bhartiya · 8 min read

AsyncIO transformed Python from a "good enough" language for data engineering to a genuinely high-performance one. But async Python has sharp edges — and the mistakes are subtle enough that they don't show up in development, only at production scale.

This post covers the async patterns I use in production pipelines, from the AI Notes Generator (async PDF generation with GCS) to the E-News personalization engine.

When to Use AsyncIO (and When Not To)

AsyncIO shines for I/O-bound tasks: HTTP requests, database queries, file reads. It does not help with CPU-bound tasks — those still block the event loop.

  • ✅ Good fit for AsyncIO: API calls, database queries, file I/O, WebSocket connections
  • ❌ Poor fit: NumPy computations, model inference, image processing, sorting large arrays
  • For CPU-bound work, use ProcessPoolExecutor (directly or via run_in_executor) instead
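For the CPU-bound case, here's a minimal sketch of handing work to a process pool via run_in_executor — cpu_heavy is a hypothetical stand-in for whatever actually burns CPU in your pipeline:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # Pure-Python loop: holds the GIL, so threads won't help here
    return sum(i * i for i in range(n))

async def offload(n: int) -> int:
    loop = asyncio.get_running_loop()
    # A process pool sidesteps the GIL; the event loop stays free while it runs
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, cpu_heavy, n)

if __name__ == "__main__":
    # Guard matters: worker processes re-import this module on spawn platforms
    total = asyncio.run(offload(100_000))
```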

Pattern 1: Concurrent API Calls with gather()

The most common use case — calling multiple APIs and waiting for all results:


import asyncio
import httpx

async def fetch_article(client: httpx.AsyncClient, article_id: str) -> dict:
    response = await client.get(f"https://api.example.com/articles/{article_id}")
    return response.json()

async def fetch_all_articles(article_ids: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [fetch_article(client, aid) for aid in article_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out exceptions
    return [r for r in results if not isinstance(r, Exception)]

# Fetch 100 articles concurrently instead of sequentially
articles = asyncio.run(fetch_all_articles(article_ids))

Pattern 2: Semaphore for Rate-Limited APIs

gather() on 1000 items will fire 1000 concurrent requests — which will get you rate-limited or banned. Use a Semaphore to cap concurrency:


async def fetch_with_semaphore(semaphore: asyncio.Semaphore,
                                client: httpx.AsyncClient,
                                url: str) -> dict:
    async with semaphore:
        response = await client.get(url)
        return response.json()

async def fetch_batch(urls: list[str], max_concurrent: int = 10) -> list[dict | Exception]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_semaphore(semaphore, client, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
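To see the cap working without hitting a real API, here's a self-contained sketch that swaps the HTTP call for asyncio.sleep and records peak concurrency:

```python
import asyncio

active = 0   # "requests" currently in flight
peak = 0     # highest concurrency observed

async def fake_request(semaphore: asyncio.Semaphore) -> None:
    global active, peak
    async with semaphore:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for client.get(url)
        active -= 1

async def run_demo(n_tasks: int, max_concurrent: int) -> int:
    global active, peak
    active = peak = 0
    semaphore = asyncio.Semaphore(max_concurrent)
    await asyncio.gather(*(fake_request(semaphore) for _ in range(n_tasks)))
    return peak

peak_seen = asyncio.run(run_demo(50, 10))  # peak never exceeds 10
```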

Pattern 3: Producer-Consumer with asyncio.Queue

For streaming pipelines where you produce data faster than you consume it:


async def producer(queue: asyncio.Queue, items: list, num_consumers: int):
    for item in items:
        await queue.put(item)
    # One sentinel per consumer signals them to stop
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, results: list, worker_id: int):
    while True:
        item = await queue.get()
        if item is None:
            break
        result = await process_item(item)
        results.append(result)
        queue.task_done()

async def pipeline(items: list, num_consumers: int = 8) -> list:
    queue = asyncio.Queue(maxsize=100)  # Backpressure via maxsize
    results = []

    producer_task = asyncio.create_task(producer(queue, items, num_consumers))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, results, i))
        for i in range(num_consumers)
    ]

    await asyncio.gather(producer_task, *consumer_tasks)
    return results
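process_item is left undefined above; with a stub that doubles each item, the same pattern can also be shut down via queue.join() plus cancellation instead of sentinels — a sketch:

```python
import asyncio

async def process_item(item: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return item * 2

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        results.append(await process_item(item))
        queue.task_done()

async def join_pipeline(items: list, num_workers: int = 4) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    results: list = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(num_workers)]
    for item in items:
        await queue.put(item)
    await queue.join()       # blocks until every item is task_done()
    for w in workers:
        w.cancel()           # workers are idle now; no sentinels needed
    return results

out = asyncio.run(join_pipeline(list(range(10))))
```

The trade-off: join() tracks completed work rather than queue drain, so a consumer that forgets task_done() hangs the pipeline — sentinels are more forgiving.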

The 3 Most Common Async Mistakes in Production

Mistake 1: Blocking the Event Loop with Sync Code


# BAD — time.sleep() blocks the entire event loop
async def bad_handler():
    time.sleep(2)  # All other requests frozen for 2 seconds

# GOOD — asyncio.sleep() yields control back
async def good_handler():
    await asyncio.sleep(2)  # Other requests can proceed
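When the blocking call can't be rewritten (a sync-only SDK, say), asyncio.to_thread (Python 3.9+) moves it to a worker thread so the loop stays responsive — legacy_fetch here is a hypothetical stand-in:

```python
import asyncio
import time

def legacy_fetch() -> str:
    time.sleep(0.05)  # blocking sync call — would freeze the loop if called directly
    return "payload"

async def handler() -> str:
    # Runs in a thread from the default executor; other coroutines keep running
    return await asyncio.to_thread(legacy_fetch)

result = asyncio.run(handler())
```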

Mistake 2: Creating a New Event Loop in Every Function


# BAD — creates a new loop each time, can't nest
def bad_wrapper(coroutine):
    return asyncio.run(coroutine)   # Fails if already in async context

# GOOD — inside async code, just await the coroutine
# (or asyncio.create_task() to schedule it and await later)
async def good_wrapper(coroutine):
    return await coroutine
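The failure mode is easy to reproduce — calling asyncio.run from inside a running loop raises RuntimeError:

```python
import asyncio

async def inner() -> int:
    return 42

async def outer() -> str:
    coro = inner()
    try:
        asyncio.run(coro)        # nested run: not allowed
    except RuntimeError as exc:
        coro.close()             # avoid the "never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(outer())   # message mentions the running event loop
```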

Mistake 3: Forgetting Exceptions in gather()


# BAD — the first exception propagates immediately; the other
# tasks keep running in the background and their results are lost
results = await asyncio.gather(*tasks)

# GOOD — return_exceptions collects failures without cancelling others
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures  = [r for r in results if isinstance(r, Exception)]
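A self-contained sketch of that success/failure split — flaky is a toy coroutine that fails for every third id:

```python
import asyncio

async def flaky(i: int) -> int:
    if i % 3 == 0:
        raise ValueError(f"task {i} failed")
    return i

async def run_all(n: int) -> tuple[list, list]:
    # gather preserves input order, so results line up with the ids
    results = await asyncio.gather(*(flaky(i) for i in range(n)),
                                   return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

successes, failures = asyncio.run(run_all(10))
# successes == [1, 2, 4, 5, 7, 8]; four ValueErrors land in failures
```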

Real Throughput Numbers

In the AI Notes Generator pipeline, switching from sequential to async PDF generation:

  • Sequential (before): ~12 PDFs/minute
  • Async with 8 workers (after): ~94 PDFs/minute — 7.8× speedup
  • Bottleneck shifted from I/O to WeasyPrint CPU rendering

Tags

Python · AsyncIO · Data Pipelines · Concurrency · Production Engineering

Author

Rishabh Bhartiya

ML Engineer · NatrajX
