Build a Streaming AI API Backend: FastAPI + LangChain + SSE
Your LLM call takes eight seconds. The user stares at a spinner. They leave. The fix is streaming — pushing tokens to the client the instant the model generates them. This tutorial builds the full pipeline: async generators yielding tokens from LangChain, a FastAPI route wrapping them in Server-Sent Events, structured error handling mid-stream, and the deployment patterns I rely on in production.
Why Streaming Changes Everything
A GPT-4o call on a 2,000-token response takes 4-10 seconds. Without streaming, your frontend sits idle the entire time. With streaming, the first token arrives in 200-400ms and the user starts reading immediately. Perceived latency drops by an order of magnitude.
The architecture is straightforward once you see it. The LLM generates tokens one at a time. LangChain exposes an async iterator over those tokens. FastAPI wraps that iterator in a StreamingResponse. The client reads the stream using the SSE protocol. Four components, one data flow.
Async Generators — The Core Abstraction
Before we touch FastAPI or LangChain, we need to understand the Python primitive that makes all of this work. An async generator is a function that uses both async def and yield. It produces values one at a time, and the caller can pause between values to do other work — like handling another HTTP request.
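Here is a minimal example (the `count_slowly` name is the one discussed below; the delay value is illustrative):

```python
import asyncio

async def count_slowly(n: int, delay: float = 1.0):
    """Async generator: pauses between values without blocking the event loop."""
    for i in range(n):
        await asyncio.sleep(delay)  # the event loop is free to do other work here
        yield i

async def main():
    async for value in count_slowly(3, delay=0.1):
        print(value)  # prints 0, 1, 2 — one value at a time

asyncio.run(main())
```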
The async for loop pulls one value at a time from the generator. While count_slowly is sleeping, the event loop can serve other requests. This is why async generators are perfect for streaming — your server stays responsive even while waiting for the LLM.
I spent a long time early on confusing regular generators with async generators. The difference matters: a regular generator doing blocking work (a synchronous API call, say) freezes the event loop until that work finishes. An async generator awaits the work and releases the loop. In a FastAPI server handling hundreds of concurrent streams, that distinction is the difference between smooth performance and a frozen server.
LangChain's Streaming Interface
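Here is the minimal version, using `langchain-openai` (the model name and question are illustrative, and an `OPENAI_API_KEY` environment variable is assumed):

```python
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# streaming=True makes the model emit tokens incrementally
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

async def stream_answer(question: str):
    async for chunk in llm.astream([HumanMessage(content=question)]):
        if chunk.content:  # skip empty metadata-only chunks
            print(chunk.content, end="", flush=True)

asyncio.run(stream_answer("Explain SSE in one sentence."))
```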
That's the entire LangChain streaming layer. The .astream() method returns an async iterator of AIMessageChunk objects. Each chunk contains a .content string — usually one to three tokens. Some chunks arrive with empty content (metadata updates), so we filter those out.
Notice we set streaming=True on the model. Without it, .astream() still works but receives the entire response in a single chunk — defeating the purpose. I've debugged this exact misconfiguration more times than I care to admit.
Streaming with Chains and Prompts
In a real application, you rarely call the LLM directly. You run a chain — a prompt template piped into the model, sometimes with an output parser. LangChain's LCEL (LangChain Expression Language) makes every chain streamable by default.
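A minimal chain looks like this (the prompt template is illustrative):

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("Answer concisely: {question}")
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = prompt | llm | StrOutputParser()  # prompt -> model -> string parser

async def run(question: str):
    async for token in chain.astream({"question": question}):
        print(token, end="", flush=True)  # token is already a plain string
```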
The | operator pipes the prompt into the model into the parser. When you call .astream() on the resulting chain, LangChain streams through every component. The StrOutputParser extracts the string content from each chunk, so token is already a plain string.
The SSE Protocol — What Actually Goes Over the Wire
Server-Sent Events is a simple text protocol built on HTTP. The server sends a stream of events, each formatted as data: <content>\n\n. The client reads them using the EventSource API (built into every browser) or any HTTP client that supports streaming.
Here is what a raw SSE stream looks like on the wire. Each event is a text line starting with data:, followed by two newlines to mark the boundary.
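A short stream of four events might look like this (the token values are illustrative):

```text
data: {"token": "The"}

data: {"token": " answer"}

data: {"token": " is"}

data: [DONE]

```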
Quick check: If a server sends data: {"token": "Hi"}\n\ndata: {"token": " there"}\n\n, how many SSE events does the client receive? Two. Each data: line followed by a blank line is one event. If the server accidentally sent data: {"token": "Hi"}\ndata: {"token": " there"}\n\n (single newline between them), the client would receive ONE event with both lines concatenated.
FastAPI Streaming Endpoint — Putting It Together
This is where the pieces connect. FastAPI's StreamingResponse accepts any async generator and sends each yielded value to the client. We set the content type to text/event-stream so clients know to parse it as SSE.
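A sketch of the endpoint (it assumes the `chain` object built in the LCEL section; the route path is illustrative):

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    question: str

async def generate_stream(question: str):
    # `chain` is the prompt | llm | parser pipeline from the previous section
    async for token in chain.astream({"question": question}):
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat")
async def chat(req: ChatRequest):
    return StreamingResponse(
        generate_stream(req.question),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```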
Three things to notice. First, generate_stream is an async generator — it yields SSE-formatted strings. Second, each event is data: <json>\n\n — the double newline is non-negotiable. Third, those response headers matter: Cache-Control: no-cache prevents intermediate caches from buffering, and X-Accel-Buffering: no tells Nginx to pass chunks through immediately.
I learned the Nginx buffering lesson the hard way. My stream worked perfectly in development but chunked into 4KB batches behind Nginx in production. One header fixed it, but it took two hours of debugging to find.
Write an async generator function called token_generator that takes a text string, splits it into words, and yields each word one at a time. After all words are yielded, yield the string "[DONE]" as a final signal.
The function should yield plain strings (the words themselves), not SSE-formatted data.
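One possible solution, for reference:

```python
async def token_generator(text: str):
    """Yield each word of `text`, then a [DONE] sentinel."""
    for word in text.split():
        yield word
    yield "[DONE]"
```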
Error Handling Mid-Stream
What happens when the LLM throws an error after you've already started streaming? You can't return a 500 status code — the response headers (including 200 OK) were sent with the first chunk. This is the hardest part of streaming architecture.
The solution: send the error as an SSE event with a distinct event type. The client watches for error events and handles them separately from token events.
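A sketch of the pattern (again assuming the `chain` object from earlier):

```python
import json

async def generate_with_errors(question: str):
    try:
        async for token in chain.astream({"question": question}):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as exc:
        # Named SSE event: the client handles it via addEventListener("error", ...)
        payload = json.dumps({"error": True, "message": str(exc)})
        yield f"event: error\ndata: {payload}\n\n"
```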
The event: error prefix is an SSE feature. Regular data events have no event: line. Named events let the client use addEventListener("error", ...) instead of onmessage. This separation means token events and error events never collide.
Timeout Protection
LLM APIs occasionally hang — the connection stays open but no tokens arrive. Without a timeout, your server holds that connection forever. Here is a pattern I use on every streaming endpoint.
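A sketch of both layers (timeout values are illustrative; I've implemented the total cap as a monotonic deadline and the stall check as a `wait_for` on `__anext__()`):

```python
import asyncio
import json
import time

TOTAL_TIMEOUT = 60.0   # cap on the whole stream
TOKEN_TIMEOUT = 15.0   # max wait for any single token

async def generate_with_timeout(question: str):
    # `chain` is the LCEL pipeline from earlier
    iterator = chain.astream({"question": question}).__aiter__()
    deadline = time.monotonic() + TOTAL_TIMEOUT
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise asyncio.TimeoutError
            try:
                # Bail if no token arrives within TOKEN_TIMEOUT (or the deadline)
                token = await asyncio.wait_for(
                    iterator.__anext__(), timeout=min(TOKEN_TIMEOUT, remaining)
                )
            except StopAsyncIteration:
                break
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    except asyncio.TimeoutError:
        payload = json.dumps({"error": True, "message": "stream timed out"})
        yield f"event: error\ndata: {payload}\n\n"
```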
Two layers of protection here. The outer asyncio.wait_for caps the total stream duration. The inner wait_for on __anext__() catches stalls — if 10 seconds pass with no new token, we bail. In production, I typically set the total timeout to 60 seconds and the per-token timeout to 15.
Quick check: Why can't we return a normal HTTP 500 error when the LLM fails mid-stream? Because the HTTP status code (200 OK) was already sent with the first byte of the response. HTTP status codes live in the response headers, and headers are sent before the body. Once streaming starts, the only way to communicate errors is inside the stream itself — hence the event: error SSE pattern.
Request Validation and Rate Limiting
A streaming endpoint is expensive — each request holds a server connection and an LLM API call. Without validation and rate limiting, a single abusive client can exhaust your OpenAI quota in minutes.
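The request model might look like this (field names and the whitelist are illustrative; rate limiting itself can sit on top, e.g. via a library like `slowapi`):

```python
from typing import Literal

from pydantic import BaseModel, Field

class ChatRequest(BaseModel):
    # Reject oversized prompts before any LLM call is made
    question: str = Field(..., min_length=1, max_length=4000)
    # Whitelist of models clients may request
    model: Literal["gpt-4o-mini", "gpt-4o"] = "gpt-4o-mini"
```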
Pydantic validates the request before we even touch the LLM. The 4,000-character cap blocks abusive payloads that would inflate your token costs. The model whitelist stops clients from switching to expensive models you haven't budgeted for.
Client-Side SSE Parsing in Python
Whether you're testing your API or building a Python client, you need to parse the SSE stream. The httpx library handles streaming HTTP responses cleanly.
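A sketch of a client (the URL and request body match the endpoint shape used in this tutorial, but adjust to your API):

```python
import asyncio
import json

import httpx

async def consume_stream(url: str, question: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json={"question": question}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[6:]  # strip the "data: " prefix
                if payload == "[DONE]":
                    break
                print(json.loads(payload)["token"], end="", flush=True)

# asyncio.run(consume_stream("http://localhost:8000/chat", "Hello"))
```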
The response.aiter_lines() method yields one line at a time as they arrive — it does not buffer the entire response. We strip the data: prefix (6 characters), check for the [DONE] sentinel, and parse the JSON payload. The flush=True on print ensures tokens appear immediately in the terminal.
Write a function parse_sse_events that takes a list of raw SSE lines (strings) and returns a list of parsed token strings. The function should:
1. Skip empty lines
2. Skip lines starting with "event:"
3. For lines starting with "data: ", extract the payload after the prefix
4. If the payload is "[DONE]", stop processing and do NOT include it in the output
5. Otherwise, parse the JSON payload and extract the "token" value
Return the list of extracted token strings.
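One possible solution, for reference:

```python
import json

def parse_sse_events(lines: list[str]) -> list[str]:
    tokens = []
    for line in lines:
        if not line:
            continue                 # 1. skip empty lines
        if line.startswith("event:"):
            continue                 # 2. skip named-event lines
        if line.startswith("data: "):
            payload = line[6:]       # 3. extract the payload after "data: "
            if payload == "[DONE]":
                break                # 4. stop; don't include the sentinel
            tokens.append(json.loads(payload)["token"])  # 5. extract "token"
    return tokens
```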
Multi-Turn Conversations with Streaming
A single-question endpoint is a demo. A real chatbot maintains conversation history. The client sends the full message history with each request, and the server passes it to the LLM. The streaming layer doesn't change — only the input format gets richer.
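The request models might look like this (field caps are illustrative, except the 50-message limit discussed below):

```python
from typing import Literal

from pydantic import BaseModel, Field

class Message(BaseModel):
    role: Literal["system", "user", "assistant"]
    content: str = Field(..., max_length=4000)

class ConversationRequest(BaseModel):
    # Cap the history so a client can't blow through the context window
    messages: list[Message] = Field(..., min_length=1, max_length=50)
```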
The key detail is max_length=50 on the messages list. Without a cap, a client could send thousands of messages and blow through your context window (and your budget). Fifty messages covers most conversations. Adjust based on your model's context limit.
CORS, Security, and Middleware
Your frontend lives on app.example.com. Your API lives on api.example.com. The browser blocks the request unless your API explicitly allows cross-origin requests. Here is the minimal CORS setup for a streaming endpoint.
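A sketch (the origin is illustrative; substitute your own frontend domain):

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # your frontend, never "*"
    allow_methods=["POST"],
    allow_headers=["Content-Type", "Authorization"],
)
```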
Never set allow_origins=["*"] in production. That opens your expensive LLM endpoint to any website. Whitelist your specific frontend domains.
API Key Authentication
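Streaming endpoints cost real money, so gate them behind a key. A common FastAPI pattern checks an `X-API-Key` header via a dependency (the header name and `API_KEY` environment variable here are illustrative):

```python
import os

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def require_api_key(api_key: str = Security(api_key_header)):
    # Compare against a key loaded from the environment
    if not api_key or api_key != os.environ.get("API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return api_key

# Attach to any route:
# @app.post("/chat", dependencies=[Depends(require_api_key)])
```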
Full Application — All Pieces Together
Here is the complete server file. Copy it into main.py, set your environment variables, and run with uvicorn main:app --reload.
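A condensed sketch of that file, combining the pieces from the earlier sections (prompt, route path, and origins are illustrative; it assumes `OPENAI_API_KEY` is set):

```python
# main.py — condensed sketch of the full streaming server
import json
import os

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=os.environ.get("ALLOWED_ORIGINS", "http://localhost:3000").split(","),
    allow_methods=["POST", "GET"],
    allow_headers=["Content-Type", "Authorization"],
)

prompt = ChatPromptTemplate.from_template("Answer concisely: {question}")
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = prompt | llm | StrOutputParser()

class ChatRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=4000)

async def generate_stream(question: str):
    try:
        async for token in chain.astream({"question": question}):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as exc:
        payload = json.dumps({"error": True, "message": str(exc)})
        yield f"event: error\ndata: {payload}\n\n"

@app.post("/chat")
async def chat(req: ChatRequest):
    return StreamingResponse(
        generate_stream(req.question),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

@app.get("/health")
async def health():
    return {"status": "ok"}
```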
Deployment Patterns and Production Considerations
Development servers and production servers have different constraints. Here are the patterns I've found reliable across several streaming AI deployments.
Gunicorn with Uvicorn Workers
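The launch command (worker count and port are illustrative):

```shell
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 120
```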
Each worker can handle hundreds of concurrent streaming connections because the async event loop releases the thread during await. Four workers on a 2-core machine gives you capacity for roughly 400-800 simultaneous streams. The bottleneck is almost always the LLM API rate limit, not your server.
Docker Configuration
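A minimal Dockerfile for the setup above (Python version and file names are illustrative; note `PYTHONUNBUFFERED=1` so logs flush immediately):

```dockerfile
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "main:app", "-k", "uvicorn.workers.UvicornWorker", "-w", "4", "-b", "0.0.0.0:8000"]
```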
Nginx Configuration
If Nginx sits in front of your API (which it should in production), you need to disable response buffering. Otherwise Nginx collects chunks into 4KB batches and the stream becomes jerky.
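A sketch of the relevant location block (the upstream address and timeout are illustrative):

```nginx
location /chat {
    proxy_pass http://127.0.0.1:8000;
    proxy_http_version 1.1;
    proxy_buffering off;        # forward chunks immediately
    proxy_cache off;
    proxy_read_timeout 120s;    # allow long-running streams
    proxy_set_header Connection "";
}
```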
The proxy_buffering off directive is the important one. Without it, Nginx buffers the response body before forwarding it. Combined with the X-Accel-Buffering: no header we set in FastAPI, this ensures tokens flow through immediately.
Common Mistakes and How to Fix Them
```python
# Mistake: forgetting streaming=True
# Receives entire response as one chunk
llm = ChatOpenAI(model="gpt-4o-mini")
async for chunk in llm.astream([msg]):
    print(chunk.content)  # Prints everything at once
```

```python
# Fix: enable streaming on the model
# Receives tokens incrementally
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
async for chunk in llm.astream([msg]):
    print(chunk.content)  # Prints token by token
```

```python
# Mistake: a synchronous generator
def generate_sync(question):
    for token in chain.invoke(question):
        yield f"data: {token}\n\n"
    # This blocks the event loop!
```

```python
# Fix: an async generator
async def generate_async(question):
    async for token in chain.astream({"question": question}):
        yield f"data: {json.dumps({'token': token})}\n\n"
    # Event loop stays responsive
```

Other mistakes I see regularly:
Buffered print statements in the generator. Adding print() calls inside your async generator for debugging is fine, but Python buffers stdout by default. In a Docker container, you won't see logs until the buffer flushes. Set PYTHONUNBUFFERED=1 in your environment or use logging instead.
No health check endpoint. Load balancers need a /health route to know your service is alive. Without it, they send traffic to crashed workers. A one-line GET /health that returns {"status": "ok"} is enough.
Forgetting `flush=True` on the client side. When printing tokens in a Python client, print(token, end="") buffers output. Add flush=True to see tokens as they arrive.
Write a function format_sse_events that takes a list of token strings and an optional include_done boolean (default True). It should return a list of properly formatted SSE event strings.
Each token should be wrapped in a JSON object with a "token" key and formatted as data: {"token": "..."} followed by two newlines (\n\n).
If include_done is True, append a final data: [DONE]\n\n event.
Also handle a special case: if a token dict is passed with an "error" key set to True, format it as a named event: event: error\ndata: {"error": true, "message": "..."}\n\n
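One possible solution, for reference:

```python
import json

def format_sse_events(tokens, include_done: bool = True) -> list[str]:
    events = []
    for item in tokens:
        if isinstance(item, dict) and item.get("error"):
            # Named error event: event: error + JSON payload
            payload = json.dumps({"error": True, "message": item.get("message", "")})
            events.append(f"event: error\ndata: {payload}\n\n")
        else:
            # Regular token event
            events.append(f"data: {json.dumps({'token': item})}\n\n")
    if include_done:
        events.append("data: [DONE]\n\n")
    return events
```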
Frequently Asked Questions
Can I use WebSockets instead of SSE for streaming?
Yes, and WebSockets are the right choice when the client also sends messages mid-stream (e.g., a "stop generating" button). For pure LLM streaming where the client sends one request and receives many tokens, SSE is simpler because it uses plain HTTP, works through all proxies, and auto-reconnects. Most AI APIs use SSE.
How do I implement a "Stop Generating" button with SSE?
SSE is server-to-client only, so the client can't send a stop signal through the same connection. Two approaches work: (1) the client closes the EventSource connection — FastAPI detects the disconnect and you cancel the LLM call, or (2) use a separate POST endpoint that sets a cancellation flag the generator checks between tokens.
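Approach (1) can be sketched with Starlette's disconnect check inside the generator (assumes the `chain` object from earlier):

```python
import json

from fastapi import Request

async def generate_stream(request: Request, question: str):
    async for token in chain.astream({"question": question}):
        if await request.is_disconnected():  # client closed the EventSource
            break                            # stop consuming; the LLM call winds down
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"
```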
How do I stream from Anthropic/Claude instead of OpenAI?
Swap ChatOpenAI for ChatAnthropic from langchain-anthropic. The streaming interface is identical — .astream() yields AIMessageChunk objects. The rest of the code (FastAPI route, SSE formatting, error handling) stays exactly the same. That's the point of LangChain's abstraction.
What about streaming with multiple LLM calls in sequence (chains with retrieval)?
RAG chains (retrieval + generation) stream the generation step automatically. The retrieval step runs first and blocks, then the generation streams. If you want the client to know the retrieval is happening, yield a status event before starting the generation: yield "data: {\"status\": \"searching\"}\n\n".