Async Programming, Event Loops, and ASGI

Ten thousand clients, one thread

Author

Karsten Naert

Published

February 9, 2026

Introduction

We’ve come a long way. In Lecture 1 we learned what processes and threads are. In Lecture 2 we combined them into a live TKinter application. In Lecture 3 we sent bytes over the network with raw sockets. In Lecture 4 we built a proper REST API with Flask, and understood how WSGI separates the server from the application.

But we left a loose thread (pun intended). Lecture 4 ended with a warning:

WSGI is inherently synchronous. Each request ties up a thread for its entire duration. This becomes a problem when you have many slow clients or long-lived connections.

This lecture resolves that. By the end you’ll understand:

  • Why threads don’t scale to thousands of concurrent connections.
  • What async def and await actually do under the hood.
  • How an event loop replaces a thread pool.
  • What ASGI is and how it relates to WSGI.
  • How Uvicorn + FastAPI form the modern async web stack.

This is the fifth and final lecture in the series:

  1. Processes and Threads
  2. Multiprocessing and Multithreading in Practice
  3. Interprocess Communication and Sockets
  4. Client-Server Architectures and RESTful APIs
  5. Async Programming, Event Loops, and ASGI (you are here)

The Thread Scalability Wall

Recap: Thread-Per-Client

In Lecture 3 we built a threaded echo server. The pattern was straightforward:

while True:
    conn, addr = server.accept()
    threading.Thread(target=handle_client, args=(conn, addr)).start()

Each client gets its own thread. The thread calls conn.recv(), blocks until data arrives, processes it, sends a response, and loops. Simple, correct, and easy to reason about.

In Lecture 4 we saw that WSGI servers like Waitress use the same idea: a pool of threads, each handling one HTTP request at a time.

So what’s the problem?

Threads Are Not Free

Every thread costs resources:

  • Memory: each thread gets its own call stack, typically 1–8 MB depending on the OS. On Windows, the default is 1 MB.
  • Context switching: the OS must save and restore CPU registers, stack pointers, and other state every time it switches between threads. With dozens of threads, this is negligible. With thousands, it becomes a real cost.
  • Scheduling overhead: the OS kernel maintains data structures for each thread. More threads means more work for the scheduler.

Let’s do some napkin math:

Concurrent clients   Threads needed   Stack memory (1 MB each)
100                  100              100 MB
1,000                1,000            1 GB
10,000               10,000           10 GB
100,000              100,000          100 GB

By 10,000 concurrent connections you’re spending 10 GB on stack memory alone—and most of those threads are doing nothing but sitting in recv(), waiting for bytes that haven’t arrived yet. That’s 10 GB of memory for the privilege of waiting.

The C10K Problem

In 1999, Dan Kegel published a famous essay asking: “How do you handle 10,000 concurrent connections on a single server?” This was called the C10K problem, and at the time it was genuinely hard. The thread-per-client model couldn’t do it. The solutions that emerged—select, poll, epoll, kqueue—are the OS-level mechanisms that event loops are built on. Python’s asyncio sits on top of these.

The Real Issue: Waiting

Here’s the insight that motivates everything in this lecture. Look at what a typical web server thread actually does:

Thread timeline for one HTTP request:
──────────────────────────────────────────────────
[recv request]  ██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
[parse headers] ██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
[query database]░░░░░░░░░░░░░░░░██████████████░░░  ← waiting for DB
[build response]██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░
[send response] ██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░

██ = actually computing     ░░ = blocked, waiting for IO

The thread spends the vast majority of its life blocked on IO—waiting for the network, waiting for the database, waiting for a file read. The CPU sits idle. And yet the thread still consumes its full stack memory and still occupies a slot in the OS scheduler.

What if, instead of dedicating a whole thread to each client (most of which is spent waiting), we had one thread that could juggle many clients by working on whichever one is ready right now?

That’s async programming.

Cooperative Multitasking

Two Styles of Multitasking

We’ve already seen preemptive multitasking: the OS decides when to switch between threads. Your code has no say in the matter. The OS can interrupt a thread between any two bytecode instructions—which is why race conditions happen and why we need locks.

Cooperative multitasking flips this around: your code decides when to yield control. Nobody interrupts you mid-calculation. You explicitly say “I’m about to wait for IO—go do something else in the meantime, and come back to me when my data is ready.”

The Barista Analogy

Imagine a coffee shop with one barista serving 10 customers.

Threaded model (preemptive): hire 10 baristas, one per customer. Each barista takes an order, walks to the machine, waits for the espresso to brew (staring at it), serves the drink, then stands around until their customer wants something else. Expensive. Most of the time they’re just standing there.

Async model (cooperative): one barista. She takes Customer 1’s order, starts the espresso machine, and while the machine is running she takes Customer 2’s order and starts steaming milk. When the espresso finishes (an “event”), she pours it and moves on to whatever task is ready next. She never waits idle—she’s always working on whichever order has something ready to do.

This is exactly how async code works:

  • Starting the espresso machine = initiating an IO operation (sending a database query, making an HTTP request).
  • await = “I’ve started the IO. Go handle other tasks. Wake me up when the result is ready.”
  • The event loop = the barista’s brain, deciding what to work on next.

This Only Works for IO-Bound Work

If the barista needs to manually grind every bean (CPU-bound work), she can’t multitask—she’s physically occupied. One barista can juggle 10 espresso machines (IO-bound), but she can’t grind 10 batches of beans simultaneously. For CPU-bound work, you still need multiple processes—just like we discussed in Lecture 1.

No Locks Needed

Here’s a subtle but important benefit. In threaded code, the OS can switch threads at any moment, so shared data needs locks. In async code, you control the yield points (await statements). Between two awaits, your code runs uninterrupted on a single thread. No other coroutine can sneak in and modify your variables.

This doesn’t mean async code is immune to all concurrency bugs—you can still have logical race conditions if you’re not careful about the order of awaits—but the most common class of data races simply doesn’t exist.

async and await — The Mechanics

Enough theory. Let’s write some async code.

Coroutines

A regular function runs from start to finish when called. A coroutine can pause partway through (at an await), let other things happen, and resume later. In Python, you create a coroutine by using async def instead of def:

import asyncio

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)  # Pause for 1 second (non-blocking!)
    print(f"Goodbye, {name}!")

A few things to notice:

  1. async def makes this a coroutine function. Calling it doesn’t execute the body—it returns a coroutine object.
  2. await asyncio.sleep(1) is an async sleep. Unlike time.sleep(1) (which blocks the entire thread), asyncio.sleep(1) yields control back to the event loop for 1 second. Other coroutines can run during that time.
  3. You can only use await inside an async def. Trying to await in a regular function is a syntax error.

Let’s try calling it:

async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

# This does NOT run the coroutine:
result = greet("Alice")
print(type(result))  # <class 'coroutine'>

# You need asyncio.run() to actually execute it:
asyncio.run(greet("Alice"))

asyncio.run() is the entry point from the synchronous world into the async world. It creates an event loop, runs the coroutine to completion, and tears down the loop. Think of it as the root.mainloop() of the async world (remember Lecture 2?).

Sequential vs. Concurrent

Here’s where it gets interesting. Let’s time two coroutines running sequentially:

import asyncio
import time

async def brew_coffee(name, seconds):
    print(f"  Starting {name}...")
    await asyncio.sleep(seconds)
    print(f"  {name} done!")
    return name

async def main_sequential():
    t0 = time.perf_counter()
    result1 = await brew_coffee("Espresso", 3)
    result2 = await brew_coffee("Latte", 2)
    print(f"  Total: {time.perf_counter() - t0:.1f}s — got {result1} and {result2}")

asyncio.run(main_sequential())

Output:

  Starting Espresso...
  Espresso done!
  Starting Latte...
  Latte done!
  Total: 5.0s — got Espresso and Latte

Five seconds. The latte didn’t start until the espresso finished. Each await paused main_sequential until that particular coroutine was done. This is sequential—no better than synchronous code.

Now let’s run them concurrently with asyncio.gather():

async def main_concurrent():
    t0 = time.perf_counter()
    result1, result2 = await asyncio.gather(
        brew_coffee("Espresso", 3),
        brew_coffee("Latte", 2),
    )
    print(f"  Total: {time.perf_counter() - t0:.1f}s — got {result1} and {result2}")

asyncio.run(main_concurrent())

Output:

  Starting Espresso...
  Starting Latte...
  Latte done!
  Espresso done!
  Total: 3.0s — got Espresso and Latte

Three seconds—the time of the longest task, not the sum. Both coroutines started immediately. When the espresso was “brewing” (sleeping), the event loop switched to the latte. When the latte finished (after 2s), the loop waited for the espresso to finish (1 more second). Total: 3 seconds.

This is the barista in action. One thread, two concurrent operations, no time wasted waiting.

asyncio.gather() vs. asyncio.create_task()

gather() is convenient when you want to launch several coroutines and wait for all of them. For more control, use asyncio.create_task():

async def main_tasks():
    t0 = time.perf_counter()

    # Schedule both coroutines as tasks
    task1 = asyncio.create_task(brew_coffee("Espresso", 3))
    task2 = asyncio.create_task(brew_coffee("Latte", 2))

    # Do other work while they run...
    print("  Tasks are running, I can do other things!")
    await asyncio.sleep(0.5)
    print("  Still doing things...")

    # Now wait for the results
    result1 = await task1
    result2 = await task2
    print(f"  Total: {time.perf_counter() - t0:.1f}s — got {result1} and {result2}")

asyncio.run(main_tasks())

create_task() schedules the coroutine to start immediately (well, as soon as the event loop gets a chance). You get back a Task object that you can await later to get the result. Between create_task() and await task, other code can run—including code in the same function.

When to Use Which
  • await some_coroutine() — run one thing, wait for it. Sequential.
  • asyncio.gather(coro1(), coro2(), ...) — run many things concurrently, wait for all of them.
  • asyncio.create_task(coro()) — start something in the background, await it later when you need the result.

The Yield Point: Where the Magic Happens

Here’s the most important mental model for async code:

Between two await statements, your code runs uninterrupted on a single thread.

The event loop can only switch to another coroutine at an await. This is what “cooperative” means—you cooperate by yielding at await points.

async def transfer(account_a, account_b, amount):
    # No other coroutine can interrupt this block
    balance = account_a.balance
    account_a.balance = balance - amount
    account_b.balance = account_b.balance + amount
    # ↑ All of the above runs atomically (no await in between)
    
    await save_to_database(account_a)  # ← yield point! Other coroutines may run here
    await save_to_database(account_b)  # ← another yield point

Between the first line and the await save_to_database call, no other coroutine can touch account_a or account_b. No locks needed for the in-memory manipulation. But between the two await calls, another coroutine could read stale data from the database—so you’d still need database-level transactions. Async removes threading bugs, not architectural ones.

A Common Mistake: Forgetting await

This bites everyone at least once:

async def main():
    asyncio.sleep(1)  # Missing await! This does nothing.
    print("This prints immediately — no sleep happened.")

asyncio.run(main())

Without await, calling a coroutine function just creates a coroutine object and discards it. Python will usually warn you: RuntimeWarning: coroutine 'sleep' was never awaited. Read those warnings—they’re trying to help.

Another Common Mistake: Blocking the Event Loop

import time

async def bad_handler():
    time.sleep(5)  # BAD! This blocks the entire event loop for 5 seconds.
    return "done"

time.sleep() is a synchronous blocking call. It freezes the thread—including the event loop running on that thread. While it sleeps, no other coroutine can run. It’s like the barista standing in front of the espresso machine staring at it instead of serving other customers.

The fix: use await asyncio.sleep() for delays, await for async IO operations, and if you must call blocking synchronous code (like a CPU-heavy function or a library that doesn’t support async), run it in a thread pool:

import asyncio
import time

def blocking_computation():
    """A sync function that takes a long time."""
    time.sleep(2)
    return 42

async def main():
    # Run the blocking function in a thread pool, without blocking the event loop
    result = await asyncio.to_thread(blocking_computation)
    print(f"Result: {result}")

asyncio.run(main())

asyncio.to_thread() (Python 3.9+) offloads a synchronous function to a background thread and wraps it in an awaitable. The event loop stays free to handle other coroutines while the blocking call runs in the thread.

The Event Loop

What It Actually Is

We’ve been saying “the event loop decides what to run next.” But what is it? At its core, an event loop is surprisingly simple—it’s a while True loop that:

  1. Checks which IO operations are ready (data arrived on a socket, a timer expired, etc.).
  2. Resumes the coroutines that were waiting for those operations.
  3. Runs each resumed coroutine until it hits the next await.
  4. Goes back to step 1.

Here’s a (very simplified) pseudocode sketch:

# Pseudocode — NOT real asyncio internals, but captures the idea
def event_loop(initial_coroutine):
    ready_queue = [initial_coroutine]
    waiting = {}  # {io_event: coroutine}

    while ready_queue or waiting:
        # Step 1: check which IO operations completed
        completed_events = poll_os_for_ready_io(timeout=0.01)
        for event in completed_events:
            coro = waiting.pop(event)
            ready_queue.append(coro)

        # Step 2-3: run each ready coroutine until it awaits again
        for coro in ready_queue:
            try:
                next_event = coro.resume()  # run until next await
                waiting[next_event] = coro   # park it until IO completes
            except StopIteration:
                pass  # coroutine finished

        ready_queue.clear()

The poll_os_for_ready_io() call is the key—it asks the operating system “which of these sockets/timers/file descriptors have data ready?” On Linux this is epoll, on macOS it’s kqueue, on Windows it’s IOCP. These are the efficient OS mechanisms that solved the C10K problem. Python’s asyncio wraps them in a cross-platform API so you never have to think about them.

Connection to TKinter’s mainloop()

This should feel familiar. In Lecture 2 we used TKinter, where root.mainloop() runs an event loop that:

  1. Checks for user events (mouse clicks, key presses, window resize).
  2. Dispatches them to your callbacks.
  3. Checks for root.after() scheduled callbacks.
  4. Repeats.

The async event loop is the same pattern, but instead of GUI events, it watches for IO events (data arriving on sockets, DNS lookups completing, timers expiring). Same idea, different domain.

asyncio.run() Under the Hood

When you call asyncio.run(main()), Python:

  1. Creates a new event loop.
  2. Schedules main() as the first task.
  3. Runs the loop until main() completes.
  4. Cleans up pending tasks and closes the loop.

You almost never need to interact with the event loop directly. asyncio.run() is the standard entry point. The only exception is when you’re inside an already-running event loop (e.g., in Jupyter notebooks, which run their own loop)—but for scripts run from CMD, asyncio.run() is all you need.

A Note on uvloop

The default asyncio event loop is written in pure Python (with some C acceleration). uvloop is a drop-in replacement written in Cython around libuv (the same library that powers Node.js). It’s typically 2–4× faster. Uvicorn uses it by default on Linux/macOS. On Windows, uvloop is not available, so Uvicorn falls back to the standard asyncio loop—which is still plenty fast for most use cases.

Async in Practice: Concurrent HTTP Requests

The coffee-brewing examples were illustrative, but let’s do something real. One of the most common async use cases is making many HTTP requests concurrently—fetching data from multiple APIs, scraping pages, checking URLs.

The Sync Baseline

First, the synchronous version using requests (which you know from Lecture 4):

import requests
import time

urls = [
    "https://httpbin.org/delay/1",  # Each takes ~1 second to respond
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

t0 = time.perf_counter()
for url in urls:
    r = requests.get(url)
    print(f"  {url}{r.status_code}")
print(f"  Total: {time.perf_counter() - t0:.1f}s")

Each request takes ~1 second. Five requests in sequence = ~5 seconds. The thread sits idle for 80% of that time, waiting for the network.

The Async Version with httpx

httpx is a modern HTTP client that supports both sync and async. Install it:

pip install httpx

Now the async version:

import httpx
import asyncio
import time

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

async def fetch(client, url):
    r = await client.get(url)
    print(f"  {url}{r.status_code}")
    return r.status_code

async def main():
    t0 = time.perf_counter()
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"  Total: {time.perf_counter() - t0:.1f}s")

asyncio.run(main())

Expected output:

  https://httpbin.org/delay/1 → 200
  https://httpbin.org/delay/1 → 200
  https://httpbin.org/delay/1 → 200
  https://httpbin.org/delay/1 → 200
  https://httpbin.org/delay/1 → 200
  Total: 1.2s

~1.2 seconds instead of ~5. All five requests were in flight simultaneously. The event loop started all of them, then waited for whichever finished first, then the next, and so on. One thread, five concurrent network operations.

async with

Notice async with httpx.AsyncClient() as client. This is an async context manager—the async equivalent of the with statement we covered in the Architecture series. It ensures the HTTP client’s connection pool is properly cleaned up when we’re done, even if an exception occurs. The __aenter__ and __aexit__ methods are coroutines instead of regular methods.

When Async Shines (and When It Doesn’t)

Scenario                            Sync time   Async time   Speedup
5 HTTP requests, each 1s            ~5s         ~1s          ~5×
100 HTTP requests, each 1s          ~100s       ~2–3s        30–50×
5 CPU-heavy computations, each 1s   ~5s         ~5s          1× (no speedup!)

Async gives dramatic speedups for IO-bound work with high concurrency. For CPU-bound work, it’s useless—the event loop runs on one thread, and CPU work doesn’t yield at await points. That’s when you reach for multiprocessing (or asyncio.to_thread() as a quick workaround).

From WSGI to ASGI

WSGI’s Limitation

In Lecture 4 we celebrated WSGI for cleanly separating the server from the application. The entire interface was this:

def application(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"Hello, World!"]

Beautiful in its simplicity. But notice: it’s a regular def. The server calls it, and it blocks until it returns. If your handler needs to query a database, the thread sits idle during that query. If you have 100 concurrent requests, you need 100 threads. We’ve just spent several sections explaining why that doesn’t scale.

WSGI was designed in 2003, before async/await existed. It fundamentally cannot support coroutines. You can’t await inside a regular function, and WSGI expects a regular function.

ASGI: The Async Successor

ASGI (Asynchronous Server Gateway Interface) is the async replacement for WSGI. It was created by the Django Channels project and has since been adopted across the Python web ecosystem.

The ASGI interface looks like this:

async def application(scope, receive, send):
    ...

Three parameters instead of WSGI’s two:

Parameter   Type             What it provides
scope       dict             Request metadata: type ("http", "websocket"), method, path, headers, query string. Similar to WSGI's environ.
receive     async callable   Call await receive() to read incoming data (the request body, or a WebSocket message).
send        async callable   Call await send(...) to write outgoing data (response headers, response body).

The key difference: everything is async. The application is a coroutine. Reading the request body is an await. Sending the response is an await. This means the event loop can handle other requests while yours is waiting for IO.

A Minimal ASGI Application

Let’s build the simplest possible ASGI app—by hand, no framework:

# minimal_asgi.py
async def application(scope, receive, send):
    if scope["type"] != "http":
        return

    # Read the request (we don't need the body for this example)
    await receive()

    # Extract request info from scope
    method = scope["method"]
    path = scope["path"]

    # Simple routing
    if path == "/" and method == "GET":
        body = b"Welcome to the minimal ASGI app!"
        status = 200
    elif path == "/hello" and method == "GET":
        body = b"Hello from ASGI!"
        status = 200
    else:
        body = b"Not found."
        status = 404

    # Send the response (two messages: headers, then body)
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [
            [b"content-type", b"text/plain"],
            [b"content-length", str(len(body)).encode()],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": body,
    })

Notice the response is sent in two steps: first the headers (http.response.start), then the body (http.response.body). This two-phase design allows for streaming responses—you can start sending headers before the entire body is ready.

Compare this to the WSGI equivalent from Lecture 4. The structure is similar, but everything is async, and the interface is message-based (receive/send) rather than callback-based (start_response).

WSGI vs ASGI: Side by Side

               WSGI                                 ASGI
Interface      def app(environ, start_response)     async def app(scope, receive, send)
Concurrency    One thread per request               One event loop, many concurrent requests
Protocols      HTTP only                            HTTP, WebSocket, and custom protocols
Request body   Read from environ["wsgi.input"]      await receive()
Response       start_response() + return iterable   await send() (streaming)
Servers        Waitress, Gunicorn, uWSGI            Uvicorn, Hypercorn, Daphne
Frameworks     Flask, Django (traditional)          FastAPI, Starlette, Django (with Channels)
Spec           PEP 3333                             ASGI spec

You Won’t Write Raw ASGI

Just as you rarely write raw WSGI (Flask wraps it for you), you’ll rarely write raw ASGI. FastAPI and Starlette provide the friendly layer on top. But understanding what’s underneath—scope, receive, send—helps you debug issues and appreciate what the framework does for you.

Uvicorn: The ASGI Server

What Uvicorn Is

Uvicorn is to ASGI what Waitress is to WSGI: the server layer that handles TCP connections, parses HTTP, and calls your application.

WSGI world:                    ASGI world:
Client ↔ Waitress ↔ Flask     Client ↔ Uvicorn ↔ FastAPI

Under the hood, Uvicorn runs an asyncio event loop (with uvloop on Linux/macOS for extra speed). For each incoming HTTP request, it constructs the scope dict and provides receive/send callables, then calls your ASGI application as a coroutine. While your coroutine is await-ing (database query, file read, etc.), the event loop handles other connections.

Installing and Running

pip install uvicorn

Run our minimal ASGI app:

uvicorn minimal_asgi:application --host 127.0.0.1 --port 8000

Test it:

curl http://127.0.0.1:8000/
curl http://127.0.0.1:8000/hello
curl http://127.0.0.1:8000/nope

Same experience as the WSGI version from Lecture 4—but the server is fundamentally different inside. Instead of a thread pool, there’s a single event loop handling all connections concurrently.

Uvicorn’s Worker Model

For production, Uvicorn can spawn multiple worker processes, each running its own event loop. This combines the best of both worlds:

  • Within each process: an event loop handles thousands of concurrent connections (async).
  • Across processes: multiple CPU cores are utilized (multiprocessing).

uvicorn myapp:application --host 0.0.0.0 --port 8000 --workers 4

This is the full-circle connection back to Lecture 1: processes for CPU parallelism, async for IO concurrency. The two are complementary, not competing.

┌──────────────────────────────────────────────────────────────────────────┐
│  Uvicorn with 4 workers                                                  │
│                                                                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  │
│  │ Worker 1     │  │ Worker 2     │  │ Worker 3     │  │ Worker 4     │  │
│  │ (PID 1001)   │  │ (PID 1002)   │  │ (PID 1003)   │  │ (PID 1004)   │  │
│  │              │  │              │  │              │  │              │  │
│  │ Event loop   │  │ Event loop   │  │ Event loop   │  │ Event loop   │  │
│  │ handles 100s │  │ handles 100s │  │ handles 100s │  │ handles 100s │  │
│  │ of conns     │  │ of conns     │  │ of conns     │  │ of conns     │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘  │
│                                                                          │
│  Total capacity: thousands of concurrent connections                     │
└──────────────────────────────────────────────────────────────────────────┘

FastAPI: An ASGI Framework

What FastAPI Is

FastAPI is to ASGI what Flask is to WSGI: a framework that wraps the raw protocol in a friendly, Pythonic API. It was created by Sebastián Ramírez in 2018 and has rapidly become one of the most popular Python web frameworks.

FastAPI is built on top of Starlette (a lightweight ASGI toolkit) and Pydantic (a data validation library). The combination gives you:

  • Async-native request handling.
  • Automatic request validation and serialization via Python type hints.
  • Auto-generated interactive API documentation (Swagger UI and ReDoc).
  • Excellent performance (on par with Node.js and Go for IO-bound workloads).

Install it (along with Uvicorn):

pip install fastapi uvicorn

A Minimal FastAPI App

# hello_fastapi.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello from FastAPI!"}

@app.get("/hello/{name}")
async def greet(name: str):
    return {"message": f"Hello, {name}!"}

Run it:

uvicorn hello_fastapi:app --host 127.0.0.1 --port 8000 --reload

The --reload flag watches for file changes and restarts the server automatically—perfect for development.

Test it:

curl http://127.0.0.1:8000/
curl http://127.0.0.1:8000/hello/Alice

If you’re coming from Flask, this should feel extremely familiar. The main differences:

  • @app.get("/") instead of @app.route("/", methods=["GET"]).
  • Functions are async def (though def works too—more on that shortly).
  • Return a dict and FastAPI automatically serializes it to JSON with proper Content-Type headers.
  • Path parameters like {name} are extracted via function argument names and type hints.

The Bookstore API — FastAPI Edition

Let’s rewrite the Flask bookstore from Lecture 4 in FastAPI. Side by side, you’ll see how the two compare.

# bookstore_fastapi.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Bookstore API", version="1.0")


# --- Pydantic models for request validation ---

class BookCreate(BaseModel):
    title: str
    author: str
    year: int | None = None

class Book(BookCreate):
    id: int


# --- In-memory "database" ---

books: dict[int, Book] = {
    1: Book(id=1, title="Dune", author="Frank Herbert", year=1965),
    2: Book(id=2, title="Neuromancer", author="William Gibson", year=1984),
    3: Book(id=3, title="Snow Crash", author="Neal Stephenson", year=1992),
}
next_id = 4


# --- Endpoints ---

@app.get("/books", response_model=list[Book])
async def list_books(author: str | None = None):
    """List all books, with optional author filter."""
    if author:
        return [b for b in books.values() if b.author == author]
    return list(books.values())

@app.get("/books/{book_id}", response_model=Book)
async def get_book(book_id: int):
    """Get a single book by ID."""
    if book_id not in books:
        raise HTTPException(status_code=404, detail="Book not found")
    return books[book_id]

@app.post("/books", response_model=Book, status_code=201)
async def create_book(book: BookCreate):
    """Create a new book."""
    global next_id
    new_book = Book(id=next_id, **book.model_dump())
    books[next_id] = new_book
    next_id += 1
    return new_book

@app.put("/books/{book_id}", response_model=Book)
async def replace_book(book_id: int, book: BookCreate):
    """Replace a book entirely."""
    if book_id not in books:
        raise HTTPException(status_code=404, detail="Book not found")
    updated = Book(id=book_id, **book.model_dump())
    books[book_id] = updated
    return updated

@app.delete("/books/{book_id}", status_code=204)
async def delete_book(book_id: int):
    """Delete a book."""
    if book_id not in books:
        raise HTTPException(status_code=404, detail="Book not found")
    del books[book_id]

Run it:

uvicorn bookstore_fastapi:app --host 127.0.0.1 --port 8000 --reload

Test with curl (same commands as the Flask version from Lecture 4):

curl http://127.0.0.1:8000/books
curl http://127.0.0.1:8000/books/1
curl -X POST http://127.0.0.1:8000/books -H "Content-Type: application/json" -d "{\"title\": \"Foundation\", \"author\": \"Isaac Asimov\", \"year\": 1951}"
curl -X DELETE http://127.0.0.1:8000/books/3

What’s Different from Flask?

Pydantic models for validation. In the Flask version, we manually checked if not data or "title" not in data. In FastAPI, we define BookCreate as a Pydantic model with typed fields. FastAPI automatically:

  • Parses the JSON request body.
  • Validates that title and author are strings, year is an optional int.
  • Returns a detailed 422 Unprocessable Entity error if validation fails.

Try sending invalid data:

curl -X POST http://127.0.0.1:8000/books -H "Content-Type: application/json" -d "{\"title\": 42}"

You’ll get a structured error response telling you exactly what’s wrong—for free.

Type hints drive the API. The book_id: int in the path tells FastAPI to parse the URL parameter as an integer. The author: str | None = None in list_books tells it to look for an optional query parameter. The response_model=Book tells it to serialize the response using the Book model. Type hints aren’t just documentation—they’re the API contract.

Automatic documentation. Open your browser and visit:

  • http://127.0.0.1:8000/docs — Swagger UI (interactive!)
  • http://127.0.0.1:8000/redoc — ReDoc (readable alternative)

You’ll see a complete, interactive API documentation page—generated entirely from your code and type hints. You can even test endpoints directly from the browser. This is one of FastAPI’s killer features: your documentation is always in sync with your code because it is your code.

async def vs def in FastAPI

FastAPI accepts both async def and regular def endpoints:

@app.get("/async-endpoint")
async def async_handler():
    result = await some_async_database_query()
    return {"data": result}

@app.get("/sync-endpoint")
def sync_handler():
    result = some_blocking_database_query()
    return {"data": result}

What happens under the hood:

  • async def — FastAPI runs the coroutine directly on the event loop. This is the ideal case for async database drivers, async HTTP clients, etc.
  • def — FastAPI runs the function in a worker thread (via Starlette’s run_in_threadpool, which is built on AnyIO’s thread pool). The event loop isn’t blocked—other requests can still be handled. This is why existing synchronous code (like SQLAlchemy with a sync driver) works fine in FastAPI without freezing the server.

This is a pragmatic design choice. You can migrate from Flask to FastAPI incrementally: start with regular def endpoints (they work correctly, just not as efficiently) and convert to async def as you adopt async libraries.

Rule of Thumb
  • If your endpoint calls await (async DB, async HTTP, etc.) → use async def.
  • If your endpoint uses only synchronous libraries → use def (FastAPI handles the threading for you).
  • Never use async def with synchronous blocking calls inside—that blocks the event loop. Either use def (so FastAPI moves it to a thread) or use await asyncio.to_thread(...) explicitly.
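To make the last rule concrete, here’s a small self-contained experiment in plain asyncio (no FastAPI involved) comparing a blocking time.sleep inside a coroutine with the same sleep offloaded via asyncio.to_thread; the task names and the 0.2-second delay are made up for the demo:

```python
import asyncio
import time

async def blocking_task():
    time.sleep(0.2)  # WRONG in async code: freezes the entire event loop

async def polite_task():
    # Offload the blocking call to a worker thread; the loop stays free
    await asyncio.to_thread(time.sleep, 0.2)

async def main():
    t0 = time.perf_counter()
    await asyncio.gather(blocking_task(), blocking_task())
    blocked = time.perf_counter() - t0     # ~0.4 s: the sleeps run one after another

    t0 = time.perf_counter()
    await asyncio.gather(polite_task(), polite_task())
    offloaded = time.perf_counter() - t0   # ~0.2 s: the sleeps overlap in threads
    print(f"blocking: {blocked:.2f}s, offloaded: {offloaded:.2f}s")
    return blocked, offloaded

asyncio.run(main())
```

Two blocking sleeps serialize (roughly 0.4 s total) because each one holds the loop hostage; the offloaded versions overlap (roughly 0.2 s). This is exactly the difference between a sync driver inside async def and the same driver behind a thread.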

The Complete Architecture

Let’s zoom out and see the full picture. Here’s the WSGI stack from Lecture 4 next to the ASGI stack we’ve built in this lecture:

WSGI Stack                         ASGI Stack
──────────                         ──────────
Client (browser, curl, ...)        Client (browser, curl, ...)
  ↕ HTTP over TCP                    ↕ HTTP over TCP
Waitress / Gunicorn                Uvicorn / Hypercorn
  (thread pool)                      (async event loop)
  ↕ WSGI interface                   ↕ ASGI interface
  environ, start_response            scope, receive, send
  ↕                                  ↕
Flask / Django                     FastAPI / Starlette
  (your routes + logic)              (your routes + logic)

Both stacks solve the same problem—running your Python web application—but with fundamentally different concurrency models. The choice depends on your workload:

| Use WSGI (Flask + Waitress) when… | Use ASGI (FastAPI + Uvicorn) when… |
|---|---|
| Your app is CPU-bound (number crunching, image processing) | Your app is IO-bound (many API calls, DB queries) |
| You need mature ecosystem support (some libraries are sync-only) | You need high concurrency (thousands of connections) |
| Simplicity is a priority—no need to think about async/await | You want automatic API docs and validation (FastAPI) |
| You’re maintaining an existing Flask/Django codebase | You’re starting a new API project |

Neither is universally better. WSGI is simpler, battle-tested, and has a massive ecosystem. ASGI shines for high-concurrency IO-bound workloads and modern API design. Many production systems use both—a FastAPI frontend for the high-concurrency API, with Flask microservices handling specific tasks behind the scenes.

The Full Series Arc

Let’s trace the path from Lecture 1 to here:

Processes & threads (L1)
  → Threads share memory, processes are isolated
  → GIL limits threaded CPU parallelism
  → Use multiprocessing for CPU-bound work

Combining threads & processes (L2)
  → TKinter event loop + background threads + worker processes
  → The pattern: event loop polls for results from workers

Sockets & networking (L3)
  → TCP sockets for reliable communication
  → HTTP is just text over TCP
  → Thread-per-client works but doesn't scale

Client-server & REST (L4)
  → REST principles for API design
  → WSGI separates server from application
  → Flask is a WSGI application; Waitress is a WSGI server

Async & ASGI (L5)
  → Event loops replace thread pools for IO-bound concurrency
  → async/await for cooperative multitasking
  → ASGI is the async successor to WSGI
  → FastAPI is an ASGI application; Uvicorn is an ASGI server
  → Uvicorn workers = multiprocessing + async (the best of both worlds)

Every lecture built on the previous one. The TKinter event loop from Lecture 2 was a preview of the asyncio event loop. The threaded server from Lecture 3 was the problem that async solves. The WSGI interface from Lecture 4 was the synchronous predecessor to ASGI. And Uvicorn’s worker model combines the multiprocessing from Lecture 1 with the async from this lecture.

It’s all connected.

What We Didn’t Cover

This lecture (and this series) focused on building a clear mental model from processes all the way up to async web servers. But the real-world async ecosystem is much larger. Here are some important topics we didn’t have time for, with pointers so you know where to look.

WebSockets

HTTP is request-response: the client asks, the server answers, the connection (logically) closes. WebSockets are a different protocol that provides a persistent, bidirectional channel—both sides can send messages at any time, without waiting for the other to ask first.

Remember the chat server from Lecture 3? We built it on raw TCP sockets. WebSockets give you the same “both sides can talk anytime” capability, but over an HTTP-compatible connection that works through firewalls, proxies, and browsers.

ASGI natively supports WebSockets (notice the scope["type"] check in our minimal ASGI app—it can be "websocket" instead of "http"). FastAPI has built-in WebSocket support:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        await websocket.send_text(f"Echo: {message}")

This is one of ASGI’s advantages over WSGI—WSGI simply cannot handle WebSockets because it’s designed around the request-response cycle.

Middleware

Middleware is code that wraps every request/response cycle. It sits between the server and your application and can inspect, modify, or short-circuit requests. Common uses:

  • CORS (Cross-Origin Resource Sharing) — adding headers that allow browsers on different domains to call your API.
  • Authentication — checking API keys or JWTs before the request reaches your endpoint.
  • Logging — recording every request’s method, path, status code, and duration.
  • Compression — gzip-encoding response bodies.

FastAPI (via Starlette) supports middleware out of the box:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])

Middleware is essentially the decorator pattern from the Architecture series applied to the ASGI application itself.
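To see that decorator pattern explicitly, here’s a hand-rolled ASGI middleware sketch. TimingMiddleware, inner_app, and the hand-written driver at the bottom are all hypothetical names for this demo; in real life a server like Uvicorn would be the one calling the app:

```python
import asyncio
import time

class TimingMiddleware:
    """Wraps any ASGI app (the decorator pattern): times each HTTP request."""
    def __init__(self, app):
        self.app = app  # the wrapped ASGI application

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)  # pass non-HTTP traffic through
            return
        start = time.perf_counter()
        await self.app(scope, receive, send)
        ms = (time.perf_counter() - start) * 1000
        print(f"{scope['method']} {scope['path']} took {ms:.1f} ms")

# A minimal inner ASGI app to demonstrate the wrapper
async def inner_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/plain")]})
    await send({"type": "http.response.body", "body": b"hello"})

app = TimingMiddleware(inner_app)  # same callable shape: Uvicorn could serve `app`

# Drive it by hand, the way an ASGI server would:
async def demo():
    sent = []
    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}
    async def send(message):
        sent.append(message)
    await app({"type": "http", "method": "GET", "path": "/books"}, receive, send)
    return sent

messages = asyncio.run(demo())
print([m["type"] for m in messages])
# → ['http.response.start', 'http.response.body']
```

Note that the middleware never touched the response—it only observed it. A compression or auth middleware would instead intercept the send calls or short-circuit before calling self.app at all.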

Async Database Drivers

To get the full benefit of async, your database driver needs to be async too. Some popular options:

  • asyncpg — async PostgreSQL driver (very fast).
  • aiosqlite — async wrapper around SQLite.
  • motor — async MongoDB driver.
  • SQLAlchemy 2.0 — supports async via create_async_engine and AsyncSession.

Using a synchronous driver in an async def endpoint blocks the event loop—exactly the mistake we warned about earlier. Either use def (so FastAPI offloads it to a thread) or use an async driver.
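As an illustration of the first option, here’s a sketch that keeps the ordinary blocking sqlite3 driver but runs it via asyncio.to_thread so the event loop stays responsive. The file name bookstore_demo.db and the one-table schema are made up for the demo:

```python
import asyncio
import sqlite3

DB = "bookstore_demo.db"  # hypothetical demo database file

def setup():
    # Ordinary blocking sqlite3 calls to create some sample data
    conn = sqlite3.connect(DB)
    conn.execute("CREATE TABLE IF NOT EXISTS books (id INTEGER PRIMARY KEY, title TEXT)")
    conn.execute("DELETE FROM books")
    conn.executemany("INSERT INTO books (title) VALUES (?)",
                     [("Dune",), ("Foundation",)])
    conn.commit()
    conn.close()

def get_titles_sync():
    # A plain synchronous query — would block the loop if awaited directly
    conn = sqlite3.connect(DB)
    try:
        return [row[0] for row in conn.execute("SELECT title FROM books ORDER BY id")]
    finally:
        conn.close()

async def get_titles():
    # Run the blocking driver in a worker thread; safe to call from async def
    return await asyncio.to_thread(get_titles_sync)

setup()
titles = asyncio.run(get_titles())
print(titles)  # → ['Dune', 'Foundation']
```

A true async driver like aiosqlite avoids the thread hop entirely, but this pattern is a pragmatic bridge while you migrate.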

Structured Concurrency and Task Groups

Python 3.11 introduced asyncio.TaskGroup, which provides a safer way to manage concurrent tasks:

async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(fetch_url(url1))
    task2 = tg.create_task(fetch_url(url2))
# Both tasks are guaranteed to be done (or cancelled) when you exit the block

This is the async equivalent of the with pattern we love—guaranteed cleanup, even if a task raises an exception. It’s the recommended approach over raw gather() for new code.

Deployment

Running uvicorn myapp:app from the command line is fine for development. For production, you’d typically add:

  • A reverse proxy like Nginx in front of Uvicorn, handling TLS termination, static files, and load balancing.
  • Process management via systemd (Linux), Docker, or a cloud platform like AWS/GCP/Azure.
  • Environment-based configuration for secrets, database URLs, etc. (often via .env files and Pydantic’s BaseSettings).

These are operational concerns beyond the scope of this course, but they’re important to know about.

Exercises & Project Ideas

Exercise 1: Sequential vs. Concurrent Fetcher

Write two scripts that fetch the same list of 10 URLs:

  1. sync_fetcher.py — uses requests in a for loop.
  2. async_fetcher.py — uses httpx.AsyncClient with asyncio.gather().

Use URLs like https://httpbin.org/delay/1 that take a known amount of time. Compare the total wall-clock times. Then try with 20, 50, and 100 URLs. Plot the results.

Bonus: add a third version using concurrent.futures.ThreadPoolExecutor with requests. How does it compare to the async version?
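Before wiring in real HTTP, you can sanity-check the shape of the experiment with a simulated fetch—asyncio.sleep standing in for the network delay, and dummy URLs. The speedup pattern you should see with httpx is the same:

```python
import asyncio
import time

async def fake_fetch(url, delay=0.05):
    # Stand-in for an HTTP GET that takes `delay` seconds
    await asyncio.sleep(delay)
    return url

async def main():
    urls = [f"https://example.invalid/{i}" for i in range(10)]

    t0 = time.perf_counter()
    for u in urls:                                        # sequential: delays add up
        await fake_fetch(u)
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    await asyncio.gather(*(fake_fetch(u) for u in urls))  # concurrent: delays overlap
    concurrent = time.perf_counter() - t0

    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
    return sequential, concurrent

asyncio.run(main())
```

Ten simulated 0.05-second fetches take about 0.5 s sequentially but about 0.05 s with gather—the same tenfold gap you should measure against httpbin.org with a 1-second delay.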

Exercise 2: Convert the Flask Bookstore to FastAPI

Take the Flask bookstore API from Lecture 4 and convert it to FastAPI yourself (without looking at the FastAPI version in this lecture). Focus on:

  1. Defining Pydantic models for request/response validation.
  2. Using the correct HTTP method decorators (@app.get, @app.post, etc.).
  3. Returning proper status codes (201 for creation, 204 for deletion).
  4. Adding query parameter filtering.

Once done, visit /docs and explore the auto-generated documentation. Try sending invalid data and observe how FastAPI’s validation errors differ from what your Flask version returned.

Exercise 3: Async Echo Server

Rewrite the threaded echo server from Lecture 3 using asyncio instead of threads. The asyncio module has built-in support for TCP servers:

import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Client {addr} connected")

    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
        await writer.drain()

    print(f"Client {addr} disconnected")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, "127.0.0.1", 9999)
    print("Async echo server on 127.0.0.1:9999 ...")
    async with server:
        await server.serve_forever()

asyncio.run(main())

This handles multiple clients concurrently on a single thread—no threading.Thread needed. Test it with the same tcp_echo_client.py from Lecture 3. Open multiple clients simultaneously and verify they all work concurrently.

Compare this code to the threaded version. Which is simpler? Which would you choose for 10 clients? For 10,000?

Exercise 4: Benchmark Waitress vs. Uvicorn

Create two identical API endpoints—one in Flask (served by Waitress), one in FastAPI (served by Uvicorn). Each endpoint should simulate a slow database query with a 0.5-second sleep.

Write a load-testing script that sends 50 concurrent requests and measures total time:

import httpx
import asyncio
import time

async def load_test(url, n=50):
    async with httpx.AsyncClient() as client:
        t0 = time.perf_counter()
        tasks = [client.get(url) for _ in range(n)]
        responses = await asyncio.gather(*tasks)
        dt = time.perf_counter() - t0
        statuses = [r.status_code for r in responses]
        print(f"  {url}: {n} requests in {dt:.2f}s ({n/dt:.0f} req/s)")
        print(f"  Status codes: {set(statuses)}")

asyncio.run(load_test("http://127.0.0.1:8000/slow"))

Run it against both servers. The Waitress version (limited by thread pool size, default 4 threads) should take significantly longer than the Uvicorn version (async, all 50 requests handled concurrently).

Important: for the Flask version, use time.sleep(0.5). For the FastAPI version, use await asyncio.sleep(0.5). This ensures the async version actually yields during the simulated wait.

Project Idea: Async Web Scraper

Build a command-line tool that:

  1. Takes a list of URLs from a file (one per line).
  2. Fetches all of them concurrently using httpx.AsyncClient.
  3. Extracts the <title> tag from each HTML response.
  4. Writes the results to a CSV file: url, status_code, title, response_time.
  5. Limits concurrency to at most 10 simultaneous requests (use asyncio.Semaphore).

The heart of step 5 is a shared semaphore:

import asyncio

# Use a semaphore to limit concurrency
sem = asyncio.Semaphore(10)

async def fetch_with_limit(client, url):
    async with sem:  # At most 10 coroutines enter this block at once
        response = await client.get(url)
        return response

This is a realistic async use case. The semaphore pattern is the async equivalent of the Semaphore from Lecture 1—same concept, different concurrency model.
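If you want to convince yourself the limit works before making real requests, here’s a simulated version—the limit is lowered to 3 and asyncio.sleep stands in for the HTTP call—that tracks the peak number of coroutines inside the semaphore at once:

```python
import asyncio

async def demo():
    sem = asyncio.Semaphore(3)  # limit lowered to 3 for the demo
    in_flight = 0
    peak = 0

    async def fetch(i):
        nonlocal in_flight, peak
        async with sem:                  # at most 3 coroutines pass this point at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)    # simulated network wait
            in_flight -= 1

    await asyncio.gather(*(fetch(i) for i in range(20)))
    print(f"peak concurrency: {peak}")   # never exceeds the semaphore's limit
    return peak

asyncio.run(demo())
```

All 20 coroutines are created up front, but the semaphore admits only three at a time—the rest park on `async with sem` until a slot frees up.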

Bonus: add a progress bar using a simple counter and periodic prints, or integrate the tqdm library (which has async support via tqdm.asyncio).

Project Idea: Full-Stack Bookstore

Combine everything from Lectures 4 and 5:

  1. Build a FastAPI backend with the bookstore API (from this lecture) backed by an SQLite database using aiosqlite.
  2. Add Pydantic models for validation and response_model for serialization.
  3. Add a middleware that logs every request (method, path, status code, duration).
  4. Write an async Python client using httpx that exercises every endpoint.
  5. Deploy with Uvicorn using 2 worker processes.
  6. Benchmark with the load-testing script from Exercise 4.

This touches on every major concept from the series: processes (Uvicorn workers), async (event loop), TCP/HTTP (the protocol), REST (API design), ASGI (the interface), and FastAPI (the framework).


This concludes the REST API lecture series. We started with os.getpid() and ended with a production-grade async web server. Along the way, we built chat servers, estimated π with multiprocessing, hand-crafted HTTP over raw sockets, and designed RESTful APIs. The abstractions keep getting higher, but the foundations stay the same: it’s processes, threads, sockets, and bytes—all the way down.