diff --git a/docs/realtime/backend/input-streams.mdx b/docs/realtime/backend/input-streams.mdx
new file mode 100644
index 00000000000..1224e24244e
--- /dev/null
+++ b/docs/realtime/backend/input-streams.mdx
@@ -0,0 +1,155 @@
+---
+title: Input Streams
+sidebarTitle: Input Streams
+description: Send data into running tasks from your backend code
+---
+
+The Input Streams API allows you to send data into running Trigger.dev tasks from your backend code. This enables bidirectional communication — while [output streams](/realtime/backend/streams) let you read data from tasks, input streams let you push data into them.
+
+
+ To learn how to receive input stream data inside your tasks, see [Input
+ Streams](/tasks/streams#input-streams) in the Streams doc.
+
+
+## Sending data to a running task
+
+### Using defined input streams (Recommended)
+
+The recommended approach is to use [defined input streams](/tasks/streams#defining-input-streams) for full type safety:
+
+```ts
+import { cancelSignal, approval } from "./trigger/streams";
+
+// Cancel a running AI stream
+await cancelSignal.send(runId, { reason: "User clicked stop" });
+
+// Approve a draft
+await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
+```
+
+The `.send()` method is fully typed — the data parameter must match the generic type you defined on the input stream.
+
+
+ `.send()` works the same regardless of how the task is listening — whether it uses `.wait()`
+ (suspending), `.once()` (non-suspending), or `.on()` (continuous). The sender doesn't need to know
+ how the task is consuming the data. See [Input Streams](/tasks/streams#input-streams) for details on each
+ receiving method.
+
+
+## Practical examples
+
+### Cancel from a Next.js API route
+
+```ts app/api/cancel/route.ts
+import { cancelStream } from "@/trigger/streams";
+
+export async function POST(req: Request) {
+ const { runId } = await req.json();
+
+ await cancelStream.send(runId, { reason: "User clicked stop" });
+
+ return Response.json({ cancelled: true });
+}
+```
+
+### Approval workflow API
+
+```ts app/api/approve/route.ts
+import { approval } from "@/trigger/streams";
+
+export async function POST(req: Request) {
+ const { runId, approved, reviewer } = await req.json();
+
+ await approval.send(runId, {
+ approved,
+ reviewer,
+ });
+
+ return Response.json({ success: true });
+}
+```
+
+### Remix action handler
+
+```ts app/routes/api.approve.ts
+import { json, type ActionFunctionArgs } from "@remix-run/node";
+import { approval } from "~/trigger/streams";
+
+export async function action({ request }: ActionFunctionArgs) {
+ const formData = await request.formData();
+ const runId = formData.get("runId") as string;
+ const approved = formData.get("approved") === "true";
+ const reviewer = formData.get("reviewer") as string;
+
+ await approval.send(runId, { approved, reviewer });
+
+ return json({ success: true });
+}
+```
+
+### Express handler
+
+```ts
+import express from "express";
+import { cancelSignal } from "./trigger/streams";
+
+const app = express();
+app.use(express.json());
+
+app.post("/api/cancel", async (req, res) => {
+ const { runId, reason } = req.body;
+
+ await cancelSignal.send(runId, { reason });
+
+ res.json({ cancelled: true });
+});
+```
+
+### Sending from another task
+
+You can send input stream data from one task to another running task:
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { approval } from "./streams";
+
+export const reviewerTask = task({
+ id: "auto-reviewer",
+ run: async (payload: { targetRunId: string }) => {
+ // Perform automated review logic...
+ const isApproved = await performReview();
+
+ // Send approval to the waiting task
+ await approval.send(payload.targetRunId, {
+ approved: isApproved,
+ reviewer: "auto-reviewer",
+ });
+ },
+});
+```
+
+## Error handling
+
+The `.send()` method will throw if:
+
+- The run has already completed, failed, or been canceled
+- The payload exceeds the 1MB size limit
+- The run ID is invalid
+
+```ts
+import { cancelSignal } from "./trigger/streams";
+
+try {
+ await cancelSignal.send(runId, { reason: "User clicked stop" });
+} catch (error) {
+ console.error("Failed to send:", error);
+ // Handle the error — the run may have already completed
+}
+```
+
+## Important notes
+
+- Maximum payload size per `.send()` call is **1MB**
+- You cannot send data to a completed, failed, or canceled run
+- Data sent before a listener is registered inside the task is **buffered** and delivered when a listener attaches
+- Input streams require the current streams implementation (v2 is the default in SDK 4.1.0+). See [Streams](/tasks/streams) for details.
diff --git a/docs/realtime/react-hooks/streams.mdx b/docs/realtime/react-hooks/streams.mdx
index 670941e8997..f5c1ab0679e 100644
--- a/docs/realtime/react-hooks/streams.mdx
+++ b/docs/realtime/react-hooks/streams.mdx
@@ -151,7 +151,70 @@ const { parts, error } = useRealtimeStream(runId, {
});
```
-For more information on defining and using streams, see the [Realtime Streams v2](/tasks/streams) documentation.
+For more information on defining and using streams, see the [Realtime Streams](/tasks/streams) documentation.
+
+## useInputStreamSend
+
+The `useInputStreamSend` hook lets you send data from your frontend into a running task's [input stream](/tasks/streams#input-streams). Use it for cancel buttons, approval forms, or any UI that needs to push typed data into a running task.
+
+### Basic usage
+
+Pass the input stream's `id` (string), the run ID, and options such as `accessToken`. You typically get `runId` and `accessToken` from the object returned when you trigger the task (e.g. `handle.id`, `handle.publicAccessToken`). The hook returns `send`, `isLoading`, `error`, and `isReady`:
+
+```tsx
+"use client";
+
+import { useInputStreamSend } from "@trigger.dev/react-hooks";
+import { approval } from "@/trigger/streams";
+
+export function ApprovalForm({
+ runId,
+ accessToken,
+}: {
+ runId: string;
+ accessToken: string;
+}) {
+ const { send, isLoading, isReady } = useInputStreamSend(
+ approval.id,
+ runId,
+ { accessToken }
+ );
+
+ return (
+
+ );
+}
+```
+
+With a generic for type-safe payloads when not using a defined stream:
+
+```tsx
+type ApprovalPayload = { approved: boolean; reviewer: string };
+const { send } = useInputStreamSend("approval", runId, {
+ accessToken,
+});
+send({ approved: true, reviewer: "alice" });
+```
+
+### Options and return value
+
+- **`streamId`**: The input stream identifier (string). Use the `id` from your defined stream (e.g. `approval.id`) or the same string you used in `streams.input({ id: "approval" })`.
+- **`runId`**: The run to send input to. When `runId` is undefined, `isReady` is false and `send` will not trigger.
+- **`options`**: `accessToken` (required for client usage), `baseURL` (optional). See [Realtime auth](/realtime/auth) for generating a public access token with the right scopes (e.g. input streams write for that run).
+
+Return value:
+
+- **`send(data)`**: Sends typed data to the input stream. Uses SWR mutation under the hood.
+- **`isLoading`**: True while a send is in progress.
+- **`error`**: Set if the last send failed.
+- **`isReady`**: True when both `runId` and access token are available.
+
+For receiving input stream data inside a task (`.wait()`, `.once()`, `.on()`), see [Input Streams](/tasks/streams#input-streams) in the Streams doc.
## useRealtimeRunWithStreams
diff --git a/docs/tasks/streams.mdx b/docs/tasks/streams.mdx
index 2d494977a32..563ccee2639 100644
--- a/docs/tasks/streams.mdx
+++ b/docs/tasks/streams.mdx
@@ -4,17 +4,17 @@ sidebarTitle: "Streams"
description: "Stream data in realtime from your Trigger.dev tasks to your frontend or backend applications."
---
-Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow.
+Realtime Streams allow you to pipe streaming data from your Trigger.dev tasks to your frontend or backend applications in real-time. This is perfect for use cases like streaming AI completions, progress updates, or any continuous data flow. You can also **send data into** running tasks with [Input Streams](#input-streams) for bidirectional flows (e.g. cancel buttons, approvals).
- Streams v2 requires SDK version **4.1.0 or later**. Make sure to upgrade your `@trigger.dev/sdk`
- and `@trigger.dev/react-hooks` packages to use these features. If you're on an earlier version,
- see the [metadata.stream()](/runs/metadata#stream) documentation.
+ Streams require SDK version **4.1.0 or later** (`@trigger.dev/sdk` and `@trigger.dev/react-hooks`).
+ This doc describes the current streams behavior (v2 is the default). For pre-4.1.0 streams, see
+ [Pre-4.1.0 streams (legacy)](#pre-410-streams-legacy) below.
## Overview
-Streams v2 is a major upgrade that provides:
+Realtime Streams provide:
- **Unlimited stream length** (previously capped at 2000 chunks)
- **Unlimited active streams per run** (previously 5)
@@ -23,47 +23,27 @@ Streams v2 is a major upgrade that provides:
- **Multiple client streams** can pipe to a single stream
- **Enhanced dashboard visibility** for viewing stream data in real-time
-## Enabling Streams v2
-
-Streams v2 is **automatically enabled** when triggering runs from the SDK using 4.1.0 or later. If you aren't triggering via the SDK, you'll need to explicitly enable v2 streams via setting the `x-trigger-realtime-streams-version=v2` header when triggering the task.
-
-If you'd like to **opt-out** of the v2 streams, you can see so in one of the following two ways:
-
-### Option 1: Configure the SDK
-
-```ts
-import { auth } from "@trigger.dev/sdk";
-
-auth.configure({
- future: {
- v2RealtimeStreams: false,
- },
-});
-```
-
-### Option 2: Environment Variable
-
-Set the `TRIGGER_V2_REALTIME_STREAMS=0` environment variable in your backend code (where you trigger tasks).
+Streams v2 is the **default** when using SDK 4.1.0 or later. If you trigger tasks outside the SDK, set the `x-trigger-realtime-streams-version=v2` header. To opt out, use `auth.configure({ future: { v2RealtimeStreams: false } })` or `TRIGGER_V2_REALTIME_STREAMS=0`.
## Limits Comparison
-| Limit | Streams v1 | Streams v2 |
-| -------------------------------- | ---------- | ---------- |
-| Maximum stream length | 2000 | Unlimited |
-| Number of active streams per run | 5 | Unlimited |
-| Maximum streams per run | 10 | Unlimited |
-| Maximum stream TTL | 1 day | 28 days |
-| Maximum stream size | 10MB | 300 MiB |
+| Limit | Legacy (pre-4.1.0) | Current |
+| -------------------------------- | ------------------ | --------- |
+| Maximum stream length | 2000 | Unlimited |
+| Number of active streams per run | 5 | Unlimited |
+| Maximum streams per run | 10 | Unlimited |
+| Maximum stream TTL | 1 day | 28 days |
+| Maximum stream size | 10MB | 300 MiB |
## Quick Start
-The recommended workflow for using Realtime Streams v2:
+The recommended workflow for **output** streams (data from task to client):
1. **Define your streams** in a shared location using `streams.define()`
2. **Use the defined stream** in your tasks with `.pipe()`, `.append()`, or `.writer()`
3. **Read from the stream** using `.read()` or the `useRealtimeStream` hook in React
-This approach gives you full type safety, better code organization, and easier maintenance as your application grows.
+This approach gives you full type safety, better code organization, and easier maintenance as your application grows. For **input** streams (sending data into a running task), see [Input Streams](#input-streams) below.
## Defining Typed Streams (Recommended)
@@ -517,6 +497,161 @@ const { parts, error } = useRealtimeStream(streamDef, runId, {
});
```
+## Input Streams
+
+Input Streams let you send data **into** a running task from your backend or frontend. While output streams (above) send data out of tasks, input streams complete the loop — enabling bidirectional communication.
+
+
+ Input Streams require SDK version **4.4.2 or later** and use the same streams infrastructure (v2 is the default). If you're on an older SDK, calling `.on()` or `.once()` will throw with instructions to enable v2 streams. See [Pre-4.1.0 streams (legacy)](#pre-410-streams-legacy) for the older metadata-based API.
+
+
+### Input Streams overview
+
+Input Streams solve three common problems:
+
+- **Cancelling AI streams mid-generation.** When you use AI SDK's `streamText` inside a task, the LLM keeps generating until it's done — even if the user clicked "Stop." With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately.
+- **Human-in-the-loop workflows.** A task generates a draft, then pauses and waits for the user to approve or edit it before continuing.
+- **Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context.
+
+### Quick Start (Input Streams)
+
+1. **Define** input streams in a shared file with `streams.input({ id: "..." })`.
+2. **Receive** in your task with `.wait()`, `.once()`, `.on()`, or `.peek()`.
+3. **Send** from your backend with `.send(runId, data)` or from the frontend with the `useInputStreamSend` hook (see [Realtime React hooks](/realtime/react-hooks/streams#useinputstreamsend)).
+
+### Defining Input Streams
+
+Use `streams.input()` to define a typed input stream. The generic parameter controls the shape of data that can be sent:
+
+```ts
+import { streams } from "@trigger.dev/sdk";
+
+export const cancelSignal = streams.input<{ reason?: string }>({
+ id: "cancel",
+});
+
+export const approval = streams.input<{ approved: boolean; reviewer: string }>({
+ id: "approval",
+});
+
+export const userResponse = streams.input<{
+ action: "approve" | "reject" | "edit";
+ message?: string;
+ edits?: Record;
+}>({
+ id: "user-response",
+});
+```
+
+Type safety is enforced through the generic — both `.send()` and the receiving methods (`.wait()`, `.once()`, `.on()`, `.peek()`) share the same type.
+
+### Receiving data inside a task
+
+| Method | Task suspended? | Compute cost while waiting | Best for |
+|--------|-----------------|----------------------------|-----------|
+| `.wait()` | **Yes** | **None** — process freed | Approval gates, human-in-the-loop, long waits |
+| `.once()` | No | Full — process stays alive | Short waits, concurrent work; returns result object with `.unwrap()` |
+| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) |
+| `.peek()` | No | None | Non-blocking check for latest buffered value |
+
+#### `wait()` — Suspend until data arrives
+
+Suspends the task entirely, freeing compute resources. The task resumes when data arrives via `.send()`. Returns a [`ManualWaitpointPromise`](/wait-for-token) — the same type as `wait.forToken()`.
+
+```ts
+import { task } from "@trigger.dev/sdk";
+import { approval } from "./streams";
+
+export const publishPost = task({
+ id: "publish-post",
+ run: async (payload: { postId: string }) => {
+ const draft = await prepareDraft(payload.postId);
+ await notifyReviewer(draft);
+
+ const result = await approval.wait({ timeout: "7d" });
+
+ if (result.ok) {
+ if (result.output.approved) {
+ await publish(draft);
+ return { published: true, reviewer: result.output.reviewer };
+ }
+ return { published: false, reviewer: result.output.reviewer };
+ }
+ return { published: false, timedOut: true };
+ },
+});
+```
+
+Use `.unwrap()` to throw on timeout: `const data = await approval.wait({ timeout: "24h" }).unwrap();`
+
+**Options:** `timeout` (e.g. `"30s"`, `"5m"`, `"24h"`, `"7d"`), `idempotencyKey`, `idempotencyKeyTTL`, `tags`. Use `idempotencyKey` when your task has retries so the same waitpoint is resumed across retries.
+
+#### `once()` — Wait for the next value (non-suspending)
+
+Blocks until data arrives but keeps the task process alive. Returns a result object; use `.unwrap()` to get the data or throw on timeout.
+
+```ts
+const result = await approval.once({ timeoutMs: 300_000 });
+if (result.ok) {
+ console.log(result.output.approved);
+}
+// Or: const data = await approval.once({ timeoutMs: 300_000 }).unwrap();
+```
+
+`once()` also accepts a `signal` (e.g. `AbortController.signal`) for cancellation.
+
+#### `on()` — Listen for every value
+
+Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes. Call `.off()` on the returned subscription to stop listening early.
+
+```ts
+const controller = new AbortController();
+cancelSignal.on((data) => {
+ console.log("Cancelled:", data.reason);
+ controller.abort();
+});
+const result = await streamText({ ..., abortSignal: controller.signal });
+```
+
+#### `peek()` — Non-blocking check
+
+Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet.
+
+```ts
+const latest = cancelSignal.peek();
+if (latest) {
+ // A cancel was already sent before we checked
+}
+```
+
+### Sending data to a running task
+
+Use `.send(runId, data)` from your backend to push data into a running task. See the [backend input streams guide](/realtime/backend/input-streams) for API route patterns.
+
+```ts
+import { cancelSignal, approval } from "./trigger/streams";
+
+await cancelSignal.send(runId, { reason: "User clicked stop" });
+await approval.send(runId, { approved: true, reviewer: "alice@example.com" });
+```
+
+### Complete example: Cancellable AI streaming
+
+Stream an AI response while allowing the user to cancel mid-generation.
+
+**Define the streams:**
+
+```ts
+import { streams } from "@trigger.dev/sdk";
+
+export const aiOutput = streams.define({ id: "ai" });
+export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" });
+```
+
+**Task:** Register `cancelStream.on()` to abort an `AbortController`, then pipe `streamText(...).textStream` to `aiOutput`. **Backend:** POST to an API route that calls `cancelStream.send(runId, { reason: "User clicked stop" })`. **Frontend:** Use `useRealtimeStream(aiOutput, runId, { accessToken })` and a button that calls your cancel API (or use the `useInputStreamSend` hook; see [Realtime React hooks](/realtime/react-hooks/streams#useinputstreamsend)).
+
+**Important notes (input streams):** You cannot send to a completed, failed, or canceled run. Max payload per `.send()` is 1MB. Data sent before a listener is registered is buffered and delivered when a listener attaches; `.wait()` handles the buffering race automatically. Use `.wait()` for long waits to free compute; use `.once()` for short waits or concurrent work. Define input streams in a shared location and combine with output streams for full bidirectional communication.
+
## Complete Example: AI Streaming
### Define the stream
@@ -709,11 +844,14 @@ Streams are now visible in the Trigger.dev dashboard, allowing you to:
6. **Throttle frontend updates**: Use `throttleInMs` in `useRealtimeStream` to prevent excessive re-renders
7. **Use descriptive stream IDs**: Choose clear, descriptive IDs like `"ai-output"` or `"progress"` instead of generic names
+## Pre-4.1.0 streams (legacy)
+
+Prior to SDK 4.1.0, streams used the older metadata-based API. If you're on an earlier version, see [metadata.stream()](/runs/metadata#stream) for legacy usage. With 4.4.2+, [Input Streams](#input-streams) are available and documented in this page.
+
## Troubleshooting
### Stream not appearing in dashboard
-- Ensure you've enabled Streams v2 via the future flag or environment variable
- Verify your task is actually writing to the stream
- Check that the stream key matches between writing and reading
@@ -725,6 +863,11 @@ Streams are now visible in the Trigger.dev dashboard, allowing you to:
### Missing chunks
-- With v2, chunks should never be lost due to automatic resumption
+- With the current streams implementation, chunks should not be lost due to automatic resumption
- Verify you're reading from the correct stream key
- Check the `startIndex` option if you're not seeing expected chunks
+
+### Input streams not working
+
+- Input streams require SDK **4.4.2 or later** and the default streams (v2) infrastructure. Ensure you're on a recent SDK and not using the legacy metadata.stream() API.
+- If `.on()` or `.once()` throw, follow the error message to enable v2 streams (they are default in 4.1.0+).
diff --git a/docs/wait-for-token.mdx b/docs/wait-for-token.mdx
index 98d7ec96e7d..9ac13af2c20 100644
--- a/docs/wait-for-token.mdx
+++ b/docs/wait-for-token.mdx
@@ -7,6 +7,10 @@ Waitpoint tokens pause task runs until you complete the token. They're commonly
You can complete a token using the SDK or by making a POST request to the token's URL.
+
+ If you're waiting for data from an [input stream](/tasks/streams#input-streams), use [`inputStream.wait()`](/tasks/streams#wait--suspend-until-data-arrives) instead — it uses waitpoint tokens internally but provides a simpler API with full type safety from your stream definition.
+
+
## Usage
To get started using wait tokens, you need to first create a token using the `wait.createToken` function:
diff --git a/docs/wait.mdx b/docs/wait.mdx
index cfe5b2385bf..e67fbad9e6e 100644
--- a/docs/wait.mdx
+++ b/docs/wait.mdx
@@ -10,8 +10,9 @@ Waiting allows you to write complex tasks as a set of async code, without having
-| Function | What it does |
-| :--------------------------------- | :----------------------------------------------- |
-| [wait.for()](/wait-for) | Waits for a specific period of time, e.g. 1 day. |
-| [wait.until()](/wait-until) | Waits until the provided `Date`. |
-| [wait.forToken()](/wait-for-token) | Pauses runs until a token is completed. |
+| Function | What it does |
+| :------------------------------------------------ | :--------------------------------------------------------------- |
+| [wait.for()](/wait-for) | Waits for a specific period of time, e.g. 1 day. |
+| [wait.until()](/wait-until) | Waits until the provided `Date`. |
+| [wait.forToken()](/wait-for-token) | Pauses runs until a token is completed. |
+| [inputStream.wait()](/tasks/streams#wait--suspend-until-data-arrives) | Pauses runs until data arrives on an input stream. |