
Build a Streaming AI API Backend: FastAPI + LangChain + SSE


Your LLM call takes eight seconds. The user stares at a spinner. They leave. The fix is streaming — pushing tokens to the client the instant the model generates them. This tutorial builds the full pipeline: async generators yielding tokens from LangChain, a FastAPI route wrapping them in Server-Sent Events, structured error handling mid-stream, and the deployment patterns I rely on in production.

Why Streaming Changes Everything

A GPT-4o call on a 2,000-token response takes 4-10 seconds. Without streaming, your frontend sits idle the entire time. With streaming, the first token arrives in 200-400ms and the user starts reading immediately. Perceived latency drops by an order of magnitude.

The architecture is straightforward once you see it. The LLM generates tokens one at a time. LangChain exposes an async iterator over those tokens. FastAPI wraps that iterator in a StreamingResponse. The client reads the stream using the SSE protocol. Four components, one data flow.

The complete data flow: client request → LangChain `.astream()` iterator → FastAPI `StreamingResponse` → SSE over HTTP → client.

Async Generators — The Core Abstraction

Before we touch FastAPI or LangChain, we need to understand the Python primitive that makes all of this work. An async generator is a function that uses both async def and yield. It produces values one at a time, and the caller can pause between values to do other work — like handling another HTTP request.

Async generator basics
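A minimal, self-contained example of the idea (the delay values are arbitrary; `count_slowly` is the generator the next paragraphs discuss):

```python
import asyncio

async def count_slowly(n: int, delay: float = 0.5):
    """Async generator: produces one number at a time."""
    for i in range(1, n + 1):
        await asyncio.sleep(delay)  # the event loop is free to run other tasks here
        yield i

async def main():
    async for value in count_slowly(3, delay=0.1):
        print(value)  # values arrive one at a time, not all at once

asyncio.run(main())
```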

The async for loop pulls one value at a time from the generator. While count_slowly is sleeping, the event loop can serve other requests. This is why async generators are perfect for streaming — your server stays responsive even while waiting for the LLM.

I spent a long time early on confusing regular generators with async generators. The difference matters: a regular generator does its blocking work on the event-loop thread and never hands control back; an async generator awaits between yields, releasing the loop. In a FastAPI server handling hundreds of concurrent streams, that distinction is the difference between smooth performance and a frozen server.

LangChain's Streaming Interface

Streaming with LangChain's astream
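A sketch of the call described below. It assumes `langchain-openai` is installed and `OPENAI_API_KEY` is set; the model name is an example:

```python
import asyncio

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI

# streaming=True makes the API deliver tokens incrementally
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

async def main():
    async for chunk in llm.astream([HumanMessage(content="Tell me a joke")]):
        if chunk.content:  # skip empty metadata-only chunks
            print(chunk.content, end="", flush=True)

asyncio.run(main())
```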

That's the entire LangChain streaming layer. The .astream() method returns an async iterator of AIMessageChunk objects. Each chunk contains a .content string — usually one to three tokens. Some chunks arrive with empty content (metadata updates), so we filter those out.

Notice we set streaming=True on the model. Without it, .astream() still works but receives the entire response in a single chunk — defeating the purpose. I've debugged this exact misconfiguration more times than I care to admit.

Streaming with Chains and Prompts

In a real application, you rarely call the LLM directly. You run a chain — a prompt template piped into the model, sometimes with an output parser. LangChain's LCEL (LangChain Expression Language) makes every chain streamable by default.

Streaming through a LangChain chain
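A sketch of such a chain (the prompt text and the helper name `stream_answer` are illustrative; assumes `langchain-openai` is installed):

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Answer concisely: {question}")
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

# The | operator composes prompt -> model -> parser into one runnable
chain = prompt | llm | StrOutputParser()

async def stream_answer(question: str):
    # StrOutputParser has already extracted the string, so token is a plain str
    async for token in chain.astream({"question": question}):
        yield token
```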

The | operator pipes the prompt into the model into the parser. When you call .astream() on the resulting chain, LangChain streams through every component. The StrOutputParser extracts the string content from each chunk, so token is already a plain string.

The SSE Protocol — What Actually Goes Over the Wire

Server-Sent Events is a simple text protocol built on HTTP. The server sends a stream of events, each formatted as data: <content>\n\n. The client reads them using the EventSource API (built into every browser) or any HTTP client that supports streaming.

Here is what a raw SSE stream looks like on the wire. Each event is a text line starting with data:, followed by two newlines to mark the boundary.

Raw SSE wire format
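To make the format concrete, here is a tiny helper that produces those wire bytes (the helper name `sse_event` is illustrative):

```python
import json

def sse_event(payload: dict) -> str:
    # One SSE event: a "data:" line, then a blank line as the event boundary
    return f"data: {json.dumps(payload)}\n\n"

wire = (
    sse_event({"token": "Hello"})
    + sse_event({"token": " world"})
    + "data: [DONE]\n\n"  # sentinel marking the end of the stream
)
print(wire)
```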

Quick check: If a server sends data: {"token": "Hi"}\n\ndata: {"token": " there"}\n\n, how many SSE events does the client receive? Two. Each data: line followed by a blank line is one event. If the server accidentally sent data: {"token": "Hi"}\ndata: {"token": " there"}\n\n (single newline between them), the client would receive ONE event with both lines concatenated.

FastAPI Streaming Endpoint — Putting It Together

This is where the pieces connect. FastAPI's StreamingResponse accepts any async generator and sends each yielded value to the client. We set the content type to text/event-stream so clients know to parse it as SSE.

Complete FastAPI streaming endpoint
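A sketch of the endpoint, assuming the chain setup from the previous section (route path, model name, and request field names are examples):

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

app = FastAPI()

prompt = ChatPromptTemplate.from_template("{question}")
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = prompt | llm | StrOutputParser()

class AskRequest(BaseModel):
    question: str

async def generate_stream(question: str):
    async for token in chain.astream({"question": question}):
        yield f"data: {json.dumps({'token': token})}\n\n"  # double newline required
    yield "data: [DONE]\n\n"

@app.post("/ask")
async def ask(req: AskRequest):
    return StreamingResponse(
        generate_stream(req.question),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",   # stop intermediaries from buffering
            "X-Accel-Buffering": "no",     # tell Nginx to pass chunks through
        },
    )
```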

Three things to notice. First, generate_stream is an async generator — it yields SSE-formatted strings. Second, each event is data: <json>\n\n — the double newline is non-negotiable. Third, those response headers matter: Cache-Control: no-cache prevents intermediate caches from buffering, and X-Accel-Buffering: no tells Nginx to pass chunks through immediately.

I learned the Nginx buffering lesson the hard way. My stream worked perfectly in development but chunked into 4KB batches behind Nginx in production. One header fixed it, but it took two hours of debugging to find.

Build an Async Token Generator
Write Code

Write an async generator function called token_generator that takes a text string, splits it into words, and yields each word one at a time. After all words are yielded, yield the string "[DONE]" as a final signal.

The function should yield plain strings (the words themselves), not SSE-formatted data.


Error Handling Mid-Stream

What happens when the LLM throws an error after you've already started streaming? You can't return a 500 status code — the response headers (including 200 OK) were sent with the first chunk. This is the hardest part of streaming architecture.

The solution: send the error as an SSE event with a distinct event type. The client watches for error events and handles them separately from token events.

Error handling in the stream generator
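A sketch of the pattern, factored as a wrapper around any async token iterator so the error path is easy to see (the name `sse_stream` is illustrative):

```python
import json

async def sse_stream(token_iter):
    """Wrap an async token iterator in SSE, converting exceptions to error events."""
    try:
        async for token in token_iter:
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as exc:
        # Named SSE event: clients handle it via addEventListener("error", ...)
        payload = json.dumps({"error": True, "message": str(exc)})
        yield f"event: error\ndata: {payload}\n\n"
```

In the endpoint, you would pass `sse_stream(chain.astream(...))` to `StreamingResponse` instead of the bare generator.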

The event: error prefix is an SSE feature. Regular data events have no event: line. Named events let the client use addEventListener("error", ...) instead of onmessage. This separation means token events and error events never collide.

Timeout Protection

LLM APIs occasionally hang — the connection stays open but no tokens arrive. Without a timeout, your server holds that connection forever. Here is a pattern I use on every streaming endpoint.

Adding timeout protection to the stream
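One way to implement it. Because `asyncio.wait_for` cannot wrap an async generator directly, this sketch enforces the total cap with a deadline while keeping the per-token `wait_for` on `__anext__()` described below (the function name and timeout defaults are illustrative):

```python
import asyncio
import json

async def stream_with_timeouts(token_iter, total_timeout=60.0, token_timeout=10.0):
    """Yield SSE events with a per-token stall timeout and a total-duration cap."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total_timeout
    it = token_iter.__aiter__()
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:  # total stream duration exceeded
            yield 'event: error\ndata: {"error": true, "message": "stream timeout"}\n\n'
            return
        try:
            # Per-token stall protection: bail if no token arrives in time
            token = await asyncio.wait_for(
                it.__anext__(), timeout=min(token_timeout, remaining)
            )
        except StopAsyncIteration:
            break
        except asyncio.TimeoutError:
            yield 'event: error\ndata: {"error": true, "message": "token timeout"}\n\n'
            return
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"
```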

Two layers of protection here. The outer asyncio.wait_for caps the total stream duration. The inner wait_for on __anext__() catches stalls — if 10 seconds pass with no new token, we bail. In production, I typically set the total timeout to 60 seconds and the per-token timeout to 15.

Quick check: Why can't we return a normal HTTP 500 error when the LLM fails mid-stream? Because the HTTP status code (200 OK) was already sent with the first byte of the response. HTTP status codes live in the response headers, and headers are sent before the body. Once streaming starts, the only way to communicate errors is inside the stream itself — hence the event: error SSE pattern.

Request Validation and Rate Limiting

A streaming endpoint is expensive — each request holds a server connection and an LLM API call. Without validation and rate limiting, a single abusive client can exhaust your OpenAI quota in minutes.

Request model with Pydantic validation
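A sketch of the request model (Pydantic v2 constraint names; the whitelisted model IDs are examples):

```python
from typing import Literal

from pydantic import BaseModel, Field

class AskRequest(BaseModel):
    # Cap payload size so clients can't stuff enormous prompts
    question: str = Field(..., min_length=1, max_length=4000)
    # Whitelist models so clients can't switch to something expensive
    model: Literal["gpt-4o-mini", "gpt-4o"] = "gpt-4o-mini"
```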

Pydantic validates the request before we even touch the LLM. The 4,000-character cap stops abusive payloads that inflate token costs and blunts oversized prompt-stuffing attempts. The model whitelist stops clients from switching to expensive models you haven't budgeted for.

Client-Side SSE Parsing in Python

Whether you're testing your API or building a Python client, you need to parse the SSE stream. The httpx library handles streaming HTTP responses cleanly.

Python SSE client with httpx
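A sketch of such a client, assuming the `/ask` endpoint and `{"token": ...}` payloads from earlier (the function name is illustrative):

```python
import asyncio
import json

import httpx

async def consume_stream(url: str, question: str) -> str:
    tokens = []
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"question": question}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank lines and "event:" lines
                payload = line[6:]  # strip the 6-character "data: " prefix
                if payload == "[DONE]":
                    break
                token = json.loads(payload)["token"]
                print(token, end="", flush=True)  # flush so tokens appear live
                tokens.append(token)
    print()
    return "".join(tokens)

# asyncio.run(consume_stream("http://localhost:8000/ask", "What is SSE?"))
```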

The response.aiter_lines() method yields one line at a time as they arrive — it does not buffer the entire response. We strip the data: prefix (6 characters), check for the [DONE] sentinel, and parse the JSON payload. The flush=True on print ensures tokens appear immediately in the terminal.

Parse an SSE Stream
Write Code

Write a function parse_sse_events that takes a list of raw SSE lines (strings) and returns a list of parsed token strings. The function should:

1. Skip empty lines

2. Skip lines starting with "event:"

3. For lines starting with "data: ", extract the payload after the prefix

4. If the payload is "[DONE]", stop processing and do NOT include it in the output

5. Otherwise, parse the JSON payload and extract the "token" value

Return the list of extracted token strings.


Multi-Turn Conversations with Streaming

A single-question endpoint is a demo. A real chatbot maintains conversation history. The client sends the full message history with each request, and the server passes it to the LLM. The streaming layer doesn't change — only the input format gets richer.

Multi-turn streaming endpoint
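A sketch of the request models for the multi-turn version (Pydantic v2 list constraints; field and class names are illustrative). The server would map each `role` to the matching LangChain message class before calling `.astream()`:

```python
from typing import Literal

from pydantic import BaseModel, Field

class Message(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str = Field(..., max_length=4000)

class ChatRequest(BaseModel):
    # Cap history length so a client can't blow through the context window
    messages: list[Message] = Field(..., min_length=1, max_length=50)
```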

The key detail is max_length=50 on the messages list. Without a cap, a client could send thousands of messages and blow through your context window (and your budget). Fifty messages covers most conversations. Adjust based on your model's context limit.

CORS, Security, and Middleware

Your frontend lives on app.example.com. Your API lives on api.example.com. The browser blocks the request unless your API explicitly allows cross-origin requests. Here is the minimal CORS setup for a streaming endpoint.

CORS configuration for streaming
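A minimal sketch (the origin and header names are examples; adjust to your domains):

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # whitelist, never "*" in production
    allow_methods=["POST"],
    allow_headers=["Content-Type", "X-API-Key"],
)
```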

Never set allow_origins=["*"] in production. That opens your expensive LLM endpoint to any website. Whitelist your specific frontend domains.

API Key Authentication

API key dependency for FastAPI
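A sketch of the dependency, checking the key against an environment variable (the `API_KEY` variable name and stub route are illustrative):

```python
import os

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    # Compare against a key from the environment; swap in a DB lookup later
    if not api_key or api_key != os.environ.get("API_KEY"):
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

@app.post("/ask", dependencies=[Depends(verify_api_key)])
async def ask():
    ...
```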

The Security dependency reads the X-API-Key header from every request. Invalid keys get a 403 before the stream starts. In production, I store keys in a database with per-key rate limits and usage tracking, but environment variables work for smaller deployments.

Full Application — All Pieces Together

Here is the complete server file. Copy it into main.py, set your environment variables, and run with uvicorn main:app --reload.

main.py — Complete streaming API server
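A condensed sketch assembling the pieces covered above: CORS, API-key auth, validation, mid-stream error handling, health check, and the SSE endpoint. Names, origins, and model IDs are examples; assumes `OPENAI_API_KEY` and `API_KEY` are set:

```python
import json
import os
from typing import Literal

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from fastapi.security import APIKeyHeader
from pydantic import BaseModel, Field
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_methods=["POST"],
    allow_headers=["Content-Type", "X-API-Key"],
)

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    if not api_key or api_key != os.environ.get("API_KEY"):
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

class AskRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=4000)
    model: Literal["gpt-4o-mini", "gpt-4o"] = "gpt-4o-mini"

prompt = ChatPromptTemplate.from_template("{question}")

async def generate_stream(question: str, model: str):
    llm = ChatOpenAI(model=model, streaming=True)
    chain = prompt | llm | StrOutputParser()
    try:
        async for token in chain.astream({"question": question}):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as exc:
        payload = json.dumps({"error": True, "message": str(exc)})
        yield f"event: error\ndata: {payload}\n\n"

@app.get("/health")
async def health():
    return {"status": "ok"}  # for load-balancer liveness checks

@app.post("/ask", dependencies=[Depends(verify_api_key)])
async def ask(req: AskRequest):
    return StreamingResponse(
        generate_stream(req.question, req.model),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```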

Deployment Patterns and Production Considerations

Development servers and production servers have different constraints. Here are the patterns I've found reliable across several streaming AI deployments.

Gunicorn with Uvicorn Workers

Production launch command
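A typical launch command (worker count and timeout are starting points, not gospel; `main:app` assumes the module layout above):

```shell
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120
```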

Each worker can handle hundreds of concurrent streaming connections because the async event loop releases the thread during await. Four workers on a 2-core machine gives you capacity for roughly 400-800 simultaneous streams. The bottleneck is almost always the LLM API rate limit, not your server.

Docker Configuration

Dockerfile for the streaming API
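A minimal sketch (base image version and file names are examples):

```dockerfile
FROM python:3.12-slim

# Unbuffered stdout so logs appear immediately in `docker logs`
ENV PYTHONUNBUFFERED=1

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

EXPOSE 8000
CMD ["gunicorn", "main:app", "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
```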

Nginx Configuration

If Nginx sits in front of your API (which it should in production), you need to disable response buffering. Otherwise Nginx collects chunks into 4KB batches and the stream becomes jerky.

Nginx config for SSE passthrough
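A sketch of the relevant location block (upstream address, path, and timeout are examples):

```nginx
location /ask {
    proxy_pass http://127.0.0.1:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection "";

    proxy_buffering off;       # the critical directive: forward chunks immediately
    proxy_cache off;
    proxy_read_timeout 120s;   # streams stay open longer than normal requests
}
```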

The proxy_buffering off directive is the important one. Without it, Nginx buffers the response body before forwarding it. Combined with the X-Accel-Buffering: no header we set in FastAPI, this ensures tokens flow through immediately.

Common Mistakes and How to Fix Them

Wrong: Missing streaming=True on LLM

```python
# Receives entire response as one chunk
llm = ChatOpenAI(model="gpt-4o-mini")
async for chunk in llm.astream([msg]):
    print(chunk.content)  # Prints everything at once
```

Fixed: Enable streaming on the model

```python
# Receives tokens incrementally
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
async for chunk in llm.astream([msg]):
    print(chunk.content)  # Prints token by token
```

Wrong: Using a sync generator with StreamingResponse

```python
def generate_sync(question):
    for token in chain.stream({"question": question}):
        yield f"data: {token}\n\n"
# This blocks the event loop!
```

Fixed: Use async generator with astream

```python
async def generate_async(question):
    async for token in chain.astream({"question": question}):
        yield f"data: {json.dumps({'token': token})}\n\n"
# Event loop stays responsive
```

Other mistakes I see regularly:

Buffered print statements in the generator. Adding print() calls inside your async generator for debugging is fine, but Python buffers stdout by default. In a Docker container, you won't see logs until the buffer flushes. Set PYTHONUNBUFFERED=1 in your environment or use logging instead.

No health check endpoint. Load balancers need a /health route to know your service is alive. Without it, they send traffic to crashed workers. A one-line GET /health that returns {"status": "ok"} is enough.

Forgetting `flush=True` on the client side. When printing tokens in a Python client, print(token, end="") buffers output. Add flush=True to see tokens as they arrive.

Build an SSE Event Formatter
Write Code

Write a function format_sse_events that takes a list of token strings and an optional include_done boolean (default True). It should return a list of properly formatted SSE event strings.

Each token should be wrapped in a JSON object with a "token" key and formatted as data: {"token": "..."} followed by two newlines (\n\n).

If include_done is True, append a final data: [DONE]\n\n event.

Also handle a special case: if a token dict is passed with an "error" key set to True, format it as a named event: event: error\ndata: {"error": true, "message": "..."}\n\n


Frequently Asked Questions

Can I use WebSockets instead of SSE for streaming?

Yes, and WebSockets are the right choice when the client also sends messages mid-stream (e.g., a "stop generating" button). For pure LLM streaming where the client sends one request and receives many tokens, SSE is simpler because it uses plain HTTP, works through all proxies, and auto-reconnects. Most AI APIs use SSE.

How do I implement a "Stop Generating" button with SSE?

SSE is server-to-client only, so the client can't send a stop signal through the same connection. Two approaches work: (1) the client closes the EventSource connection — FastAPI detects the disconnect and you cancel the LLM call, or (2) use a separate POST endpoint that sets a cancellation flag the generator checks between tokens.

Detecting client disconnect
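A sketch of approach (1), assuming the `chain` from earlier sections. The endpoint takes the FastAPI `Request` and hands it to the generator, which polls for disconnects between tokens:

```python
import json

from fastapi import Request

async def generate_stream(request: Request, question: str):
    async for token in chain.astream({"question": question}):  # chain from earlier
        if await request.is_disconnected():
            return  # client closed the EventSource; stop paying for tokens
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

# In the endpoint:
#   async def ask(req: AskRequest, request: Request):
#       return StreamingResponse(generate_stream(request, req.question), ...)
```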

How do I stream from Anthropic/Claude instead of OpenAI?

Swap ChatOpenAI for ChatAnthropic from langchain-anthropic. The streaming interface is identical — .astream() yields AIMessageChunk objects. The rest of the code (FastAPI route, SSE formatting, error handling) stays exactly the same. That's the point of LangChain's abstraction.

Switching to Claude
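A sketch of the swap. It requires `langchain-anthropic` and `ANTHROPIC_API_KEY`; the model ID is an example, so check Anthropic's docs for current names:

```python
from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Answer concisely: {question}")

# Drop-in replacement for ChatOpenAI; .astream() yields chunks the same way
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")

chain = prompt | llm | StrOutputParser()  # everything downstream is unchanged
```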

What about streaming with multiple LLM calls in sequence (chains with retrieval)?

RAG chains (retrieval + generation) stream the generation step automatically. The retrieval step runs first and blocks, then the generation streams. If you want the client to know the retrieval is happening, yield a status event before starting the generation: yield "data: {\"status\": \"searching\"}\n\n".
