---
title: Streaming Responses
description: Implementing real-time streaming responses with the Claude API
---
Streaming allows you to receive Claude's response in real-time as it's generated, providing a better user experience and enabling progressive rendering. This guide covers implementing streaming in various contexts.
Why Use Streaming?
- Better UX: Users see responses immediately instead of waiting
- Progressive rendering: Display partial results as they arrive
- Long responses: Handle lengthy outputs without timeout issues
- Cancellation: Users can stop generation mid-stream
Basic Streaming Implementation
Node.js / TypeScript
import Anthropic from "@anthropic-ai/sdk";
const anthropic = new Anthropic();
// Stream a completion and echo each text delta to stdout as it arrives,
// then report the final token usage.
async function streamResponse() {
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Explain quantum computing" }],
  });

  // Guard-clause style: skip every event that is not a text delta.
  for await (const event of stream) {
    if (event.type !== "content_block_delta") continue;
    if (event.delta.type !== "text_delta") continue;
    process.stdout.write(event.delta.text);
  }

  // Resolves once the stream has fully completed.
  const finalMessage = await stream.finalMessage();
  console.log("\n\nTotal tokens:", finalMessage.usage.output_tokens);
}
Python
import anthropic
client = anthropic.Anthropic()
def stream_response():
    """Stream a completion and print text chunks as they arrive."""
    request = client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Explain quantum computing"}]
    )
    # The context manager ensures the stream is closed when we exit.
    with request as stream:
        for chunk in stream.text_stream:
            print(chunk, end="", flush=True)
Server-Sent Events (SSE)
Next.js API Route
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";
const anthropic = new Anthropic();
/**
 * POST /api/chat — proxy a Claude stream to the browser as Server-Sent Events.
 *
 * Emits one `data: {"text": ...}` frame per text delta, then `data: [DONE]`.
 * Fix: upstream/SDK failures inside `start` previously left the controller
 * open forever (the client would hang); they now error the stream.
 */
export async function POST(request: Request) {
  const { message } = await request.json();

  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: message }],
  });

  // Create readable stream for SSE
  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      try {
        for await (const event of stream) {
          if (
            event.type === "content_block_delta" &&
            event.delta.type === "text_delta"
          ) {
            const data = JSON.stringify({ text: event.delta.text });
            controller.enqueue(encoder.encode(`data: ${data}\n\n`));
          }
        }
        controller.enqueue(encoder.encode("data: [DONE]\n\n"));
        controller.close();
      } catch (err) {
        // Propagate the failure to the client instead of hanging the response.
        controller.error(err);
      }
    },
  });

  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
Client-Side Consumption
/**
 * Consume the SSE chat endpoint and log each text delta.
 *
 * Fixes over the naive version:
 * - SSE frames can be split across network reads; we keep a carry-over buffer
 *   and only parse complete lines (the last partial line waits for more data).
 * - `decode(value, { stream: true })` so multi-byte UTF-8 characters that
 *   straddle a chunk boundary are not corrupted.
 * - No non-null `reader!` assertion; a missing body simply returns.
 */
async function chat(message: string) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body?.getReader();
  if (!reader) return;
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the trailing partial line for next read

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const data = line.slice(6);
      if (data === "[DONE]") continue;
      const parsed = JSON.parse(data);
      // Update UI with parsed.text
      console.log(parsed.text);
    }
  }
}
Stream Events
Understanding the event types:
| Event Type | Description |
|------------|-------------|
| message_start | Stream beginning, contains message metadata |
| content_block_start | New content block starting |
| content_block_delta | Incremental content update |
| content_block_stop | Content block complete |
| message_delta | Message-level updates (stop reason, usage) |
| message_stop | Stream complete |
Processing All Events
const stream = anthropic.messages.stream({
  model: "claude-sonnet-4-20250514",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Hello" }],
});
// Lifecycle hook: fires once with the message metadata (id, model, ...).
stream.on("message_start", (event) => {
  console.log("Message started:", event.message.id);
});
// Fires for each new content block; `index` is its position in the message.
stream.on("content_block_start", (event) => {
  console.log("Content block started:", event.index);
});
stream.on("text", (text) => {
  // Simplified text-only handler
  process.stdout.write(text);
});
stream.on("message_stop", () => {
  console.log("\nMessage complete");
});
// Always register an error handler so stream failures are not silent.
stream.on("error", (error) => {
  console.error("Stream error:", error);
});
// Wait for the stream to finish; handlers above fire while this is pending.
await stream.finalMessage();
Streaming with Tool Use
When Claude uses tools, the stream delivers each tool call's name up front and its JSON input incrementally as `input_json_delta` events:
const stream = anthropic.messages.stream({
  model: "claude-sonnet-4-20250514",
  max_tokens: 1024,
  tools: [
    {
      name: "get_weather",
      description: "Get current weather for a location",
      input_schema: {
        type: "object",
        properties: {
          location: { type: "string" },
        },
        required: ["location"],
      },
    },
  ],
  messages: [{ role: "user", content: "What's the weather in Tokyo?" }],
});
// Tool input arrives as partial JSON fragments; accumulate until the
// content block completes, then parse the full string.
let currentToolInput = "";
for await (const event of stream) {
  if (event.type === "content_block_start") {
    if (event.content_block.type === "tool_use") {
      console.log("Tool call:", event.content_block.name);
    }
  }
  if (event.type === "content_block_delta") {
    // input_json_delta events only occur inside tool_use blocks.
    if (event.delta.type === "input_json_delta") {
      currentToolInput += event.delta.partial_json;
    }
  }
  if (event.type === "content_block_stop") {
    // Non-empty accumulator means the block that just closed was a tool call.
    if (currentToolInput) {
      const toolInput = JSON.parse(currentToolInput);
      console.log("Tool input:", toolInput);
      currentToolInput = ""; // reset for any subsequent tool_use block
    }
  }
}
Cancellation
Client-Side Cancellation
const controller = new AbortController();

// In UI: provide a cancel button
document.getElementById("cancel")?.addEventListener("click", () => {
  controller.abort();
});

try {
  const response = await fetch("/api/chat", {
    method: "POST",
    body: JSON.stringify({ message: "Write a long story" }),
    signal: controller.signal,
  });
  // Process stream...
} catch (error) {
  // `catch` bindings are `unknown` under strict TS — narrow before reading `.name`.
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream cancelled by user");
  }
}
Server-Side Cancellation
const stream = anthropic.messages.stream({
  model: "claude-sonnet-4-20250514",
  max_tokens: 8192,
  messages: [{ role: "user", content: "Write a very long story" }],
});

// Abort after 10 seconds to cap generation time and cost.
setTimeout(() => {
  stream.abort();
}, 10000);

try {
  for await (const event of stream) {
    // Process events...
  }
} catch (error) {
  // `catch` bindings are `unknown` under strict TS — narrow before reading `.name`.
  if (error instanceof Error && error.name === "AbortError") {
    console.log("Stream aborted");
  }
}
React Integration
Custom Hook for Streaming
import { useState, useCallback } from "react";
/** Public contract of the streaming-chat hook. */
interface UseStreamingChatResult {
  /** Completed turns, alternating user message and assistant reply. */
  messages: string[];
  /** True while a request is in flight. */
  isLoading: boolean;
  /** Partial assistant text for the in-progress turn ("" when idle). */
  streamingContent: string;
  /** Send a user message and stream the reply. */
  sendMessage: (message: string) => Promise<void>;
  /** Abort the in-flight request, if any. */
  stopGeneration: () => void;
}
/**
 * React hook that streams chat completions from /api/chat.
 *
 * Fix: SSE frames can be split across network reads, so complete lines are
 * parsed from a carry-over buffer, and `decode(value, { stream: true })`
 * prevents multi-byte characters from being corrupted at chunk boundaries.
 */
export function useStreamingChat(): UseStreamingChatResult {
  const [messages, setMessages] = useState<string[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [streamingContent, setStreamingContent] = useState("");
  const [abortController, setAbortController] =
    useState<AbortController | null>(null);

  const sendMessage = useCallback(async (message: string) => {
    setIsLoading(true);
    setStreamingContent("");
    const controller = new AbortController();
    setAbortController(controller);
    try {
      const response = await fetch("/api/chat", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
        signal: controller.signal,
      });
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();
      let fullContent = "";
      let buffer = "";
      while (reader) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? ""; // keep trailing partial line for next read
        for (const line of lines) {
          if (line.startsWith("data: ") && line !== "data: [DONE]") {
            const { text } = JSON.parse(line.slice(6));
            fullContent += text;
            setStreamingContent(fullContent);
          }
        }
      }
      // Commit the finished turn and clear the streaming preview.
      setMessages((prev) => [...prev, message, fullContent]);
      setStreamingContent("");
    } catch (error) {
      // AbortError is an expected outcome of stopGeneration(), not a failure.
      if ((error as Error).name !== "AbortError") {
        console.error("Chat error:", error);
      }
    } finally {
      setIsLoading(false);
      setAbortController(null);
    }
  }, []);

  const stopGeneration = useCallback(() => {
    abortController?.abort();
  }, [abortController]);

  return {
    messages,
    isLoading,
    streamingContent,
    sendMessage,
    stopGeneration,
  };
}
Best Practices
1. Handle Reconnection
/**
 * Call streamResponse with linear-backoff retries.
 *
 * @param message     User message to stream.
 * @param maxRetries  Total attempts before giving up (default 3).
 * @param baseDelayMs Backoff unit; attempt k waits k * baseDelayMs
 *                    (default 1000, generalized from the hard-coded value).
 * @throws the last error once all attempts are exhausted.
 */
async function streamWithRetry(message: string, maxRetries = 3, baseDelayMs = 1000) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      await streamResponse(message);
      return;
    } catch (error) {
      // Out of attempts: surface the failure to the caller.
      if (attempt === maxRetries - 1) throw error;
      await new Promise((r) => setTimeout(r, baseDelayMs * (attempt + 1)));
    }
  }
}
2. Progress Indication
Show users that something is happening:
let totalChars = 0;
for await (const event of stream) {
  // Guard on text_delta: input_json_delta deltas have no `.text`, so the
  // unguarded version breaks as soon as a tool call appears in the stream.
  if (
    event.type === "content_block_delta" &&
    event.delta.type === "text_delta"
  ) {
    totalChars += event.delta.text.length;
    updateProgress(`Received ${totalChars} characters...`);
  }
}
3. Buffer for Word Boundaries
Avoid showing partial words:
let buffer = "";
for await (const event of stream) {
  // Guard on text_delta: input_json_delta deltas have no `.text`, so the
  // unguarded version breaks as soon as a tool call appears in the stream.
  if (
    event.type === "content_block_delta" &&
    event.delta.type === "text_delta"
  ) {
    buffer += event.delta.text;
    // Only display complete words; hold the trailing partial word back.
    const lastSpace = buffer.lastIndexOf(" ");
    if (lastSpace > 0) {
      display(buffer.slice(0, lastSpace));
      buffer = buffer.slice(lastSpace + 1);
    }
  }
}
// Display remaining buffer
display(buffer);