
stream: experimental stream/iter implementation #62066

Open
jasnell wants to merge 42 commits into nodejs:main from jasnell:jasnell/new-streams-prototype

Conversation

@jasnell
Member

@jasnell jasnell commented Mar 1, 2026

Opening this for discussion. Not intending to land this yet. It adds an implementation of the "new streams" to core and adds support to FileHandle with tests and benchmarks just to explore implementation feasibility, performance, etc.

This is an implementation of the "new streams" API for Node.js along with an example integration with FileHandle. This covers the core part of the implementation.

The module is stream/iter. It is gated behind the --experimental-stream-iter CLI flag.

Benchmark results comparing Node.js streams, Web streams, and stream/iter (higher number is better)

| Benchmark        | classic | webstream | iter   | iter-sync | iter vs classic |
| ---------------- | ------- | --------- | ------ | --------- | --------------- |
| Identity 1MB     | 1,245   | 582       | 3,110  | 16,658    | 2.5x            |
| Identity 64MB    | 31,410  | 14,980    | 33,894 | 62,111    | 1.1x            |
| Transform 1MB    | 287     | 227       | 325    | 327       | 1.1x            |
| Transform 64MB   | 595     | 605       | 605    | 573       | 1.0x            |
| Compression 1MB  | 123     | 98        | 110    | --        | 0.9x            |
| Compression 64MB | 329     | 303       | 308    | --        | 0.9x            |
| pipeTo 1MB       | 1,137   | 494       | 2,740  | 13,611    | 2.4x            |
| pipeTo 64MB      | 22,081  | 15,377    | 30,036 | 60,976    | 1.4x            |
| Broadcast 1c 1MB | 1,365   | 521       | 1,991  | --        | 1.5x            |
| Broadcast 2c 1MB | 1,285   | 439       | 1,962  | --        | 1.5x            |
| Broadcast 4c 1MB | 1,217   | 322       | 750    | --        | 0.6x            |
| File read 16MB   | 1,469   | 537       | 1,639  | --        | 1.1x            |

It's worth noting that the performance of the FileHandle benchmark added here, which reads a file, converts it to upper case, and then compresses it, is on par with classic Node.js streams and twice as fast as web streams (though web streams are not perf-optimized in any way, so take that 2x with a grain of salt). The majority of the perf cost in that benchmark is compression overhead. Without the compression transform, the new stream can be up to 15% faster than reading the file with classic Node.js streams.

The main thing this shows is that the new streams impl can (a) perform reasonably and (b) sit comfortably alongside the existing impls without any backwards compat concerns.

Benchmark runs:

fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="classic": 0.4520276595366672
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="classic": 0.5974527572097321
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="classic": 0.6425952035725405
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="webstream": 0.1911778984563999
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="webstream": 0.2179878501077266
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="webstream": 0.2446390516960688
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="pull": 0.5118129753083176
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="pull": 0.6280697056085692
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="pull": 0.596177892010514
--- 
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="classic": 0.44890689503274533
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="classic": 0.5922959407897667
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="classic": 0.6151916200977057
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="webstream": 0.22796906713941217
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="webstream": 0.2517499148269662
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="webstream": 0.2613608248108332
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=1048576 api="pull": 0.4725187688512099
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=16777216 api="pull": 0.5180217625521253
fs/bench-filehandle-pull-vs-webstream.js n=5 filesize=67108864 api="pull": 0.616770183722841

Opencode/Opus 4.6 were leveraged heavily in the process of creating this PR following a strict iterative jasnell-in-the-loop process.

@jasnell jasnell requested review from mcollina and ronag March 1, 2026 18:37
@nodejs-github-bot
Collaborator

Review requested:

  • @nodejs/performance
  • @nodejs/streams

@nodejs-github-bot nodejs-github-bot added lib / src Issues and PRs related to general changes in the lib or src directory. needs-ci PRs that need a full CI run. labels Mar 1, 2026
Member

@ronag ronag left a comment


Super impressed! This is amazing.

One note. Since this is supposed to be "web compatible", it looks to me like everything is based on Uint8Array, which is a bit unfortunate for Node. Could the Node implementation use Buffer? It would still be compatible; it's just that we could access the Buffer prototype methods without doing hacks like Buffer.prototype.write.call(...).

@ronag
Member

ronag commented Mar 2, 2026

Also could you do some mitata based benchmarks so that we can see the gc and memory pressure relative to node streams?

@ronag
Member

ronag commented Mar 2, 2026

Another thing: in the async generator case, can we pass an optional AbortSignal? i.e. async function * (src, { signal }). We could maybe even check the function signature and, if it doesn't take a second parameter, not allocate the AbortController at all.

@jasnell
Member Author

jasnell commented Mar 2, 2026

One note. Since this is supposed to be "web compatible", it looks to me like everything is based on Uint8Array, which is a bit unfortunate for Node. Could the Node implementation use Buffer? It would still be compatible; it's just that we could access the Buffer prototype methods without doing hacks like Buffer.prototype.write.call(...).

This makes me a bit nervous for code portability. If someone starts working with this in Node.js, they would end up writing code that depends on the values being Buffer and not just Uint8Array. If they then move that code to another runtime or to a standalone implementation like https://github.com/jasnell/new-streams, that assumption suddenly breaks.

Member

@benjamingr benjamingr left a comment


just to explore implementation feasibility, performance, etc

Sounds fine, as this isn't exposed externally at this time.


```js
// Buffer is full
switch (this._backpressure) {
case 'strict':
```
Member


I'm not sure strict should be the default and not block here.

Member Author


That'll be a big part of the discussion around this. A central challenge with web streams is that backpressure can be fully ignored. One of the design principles for this new approach is to apply it strictly by default. We'll need to debate this. Recommend opening an issue at https://github.com/jasnell/new-streams

Member

@benjamingr benjamingr left a comment


Sorry, I meant to approve. Regardless of design changes/suggestions regarding timing and a lot of other stuff, as experimental this is fine.

I would maybe update the docs to emphasize the experimental status even more than usual.

@jasnell
Member Author

jasnell commented Mar 3, 2026

@ronag ... implemented a couple of mitata benchmarks in the https://github.com/jasnell/new-streams repo (the reference impl)

--

Memory Benchmark Results

Environment: Node 25.6.0, Intel Xeon w9-3575X, --expose-gc, mitata with .gc('inner')

Per-Operation Allocations (New Streams vs Web Streams)

| Scenario                          | Speed        | Heap/iter (new) | Heap/iter (web) |
| --------------------------------- | ------------ | --------------- | --------------- |
| Push write/read (1K x 4KB)        | 2.24x faster | 2.06 MB         | 1.43 MB         |
| Pull + transform (1K x 4KB)       | 2.44x faster | 334 KB          | 5.57 MB         |
| pipeTo + transform (1K x 4KB)     | 3.15x faster | 303 KB          | 7.47 MB         |
| Broadcast 2 consumers (500 x 4KB) | 1.04x faster | 1.92 MB         | 1.81 MB         |
| Large pull 40MB (10K x 4KB)       | 1.26x faster | 2.62 MB         | 52.35 MB        |

Pipeline scenarios (pull, pipeTo) show the biggest gains: 16-25x less heap because transforms are inline function calls, not stream-to-stream pipes with internal queues. Push is faster but uses slightly more heap due to batch iteration (Uint8Array[]). Broadcast/tee are comparable at this scale.

Sustained Load (97.7 MB volume)

| Scenario              | Peak Heap (new) | Peak Heap (web stream) |
| --------------------- | --------------- | ---------------------- |
| pipeTo + transform    | 6.9 MB          | 50.6 MB                |
| Broadcast 2 consumers | 0.5 MB          | 42.8 MB                |
| Push write/read       | 5.9 MB          | 2.5 MB                 |
| Pull + transform      | 6.1 MB          | 2.8 MB                 |

pipeTo and broadcast show the largest sustained-load heap difference. Web Streams' pipeThrough chain buffers ~50% of total volume in flight, while the new streams' pipeTo pulls synchronously through the transform; likewise, broadcast's shared ring buffer peaks at 0.5 MB versus tee's per-branch queues at 42.8 MB.

Zero retained memory for both APIs after completion -- no leaks.

@jedwards1211

jedwards1211 commented Mar 3, 2026

@ronag passing a signal to an async generator allows the underlying source to abort it, but we're lacking a builtin way for the consumer iterating the async generator to safely cancel the stream. It can call .return() on its iterator when it's done, but that won't break the async generator out of a pending await until it receives the next chunk, which isn't guaranteed to happen if the underlying source is something nondeterministic like pubsub events. In this case, there would be leaks that are kind of awkward to blame on user error.

Barring an improvement at the language level, the consumer can only safely cancel the underlying source if it has a reference to an AbortController that signals it.

WHATWG Streams don't have this problem if the consumer .cancel()s its reader, though they do if the consumer is async iterating them.

Happy to create examples to reproduce this if it's not clear what I'm talking about.
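A minimal reproduction of the pitfall (example mine): return() on an async generator that is suspended at an await is queued behind that await, so cleanup doesn't run until the source produces its next chunk:

```javascript
async function main() {
  let release;
  const gate = new Promise((resolve) => { release = resolve; });
  let cleanedUp = false;

  async function* gen() {
    try {
      yield await gate; // suspended here, like waiting on a pubsub event
    } finally {
      cleanedUp = true; // resource cleanup
    }
  }

  const it = gen();
  it.next();                // generator is now awaiting `gate`
  const done = it.return(); // consumer tries to cancel...
  await new Promise((resolve) => setTimeout(resolve, 10));
  console.log(cleanedUp);   // false: cleanup has not run yet

  release('next chunk');    // only when the source finally produces a chunk...
  await done;
  console.log(cleanedUp);   // true: ...does the finally block execute
}
main();
```

If the gated promise never settles, the finally block never runs and whatever the generator holds open leaks.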

@ronag
Member

ronag commented Mar 3, 2026

I think you misunderstand. The signal would be for any async calls inside the generator.

@jedwards1211

jedwards1211 commented Mar 3, 2026

Yes, I'm just saying that doesn't allow the consumer to abort calls the async generator is making, but the consumer often decides when streaming should be aborted.

For example say I'm using a library that handles subscriptions from the frontend. When it gets a subscription it asks me to build an async iterable of events to stream back. Then it's responsible for iterating, then cancelling once the frontend unsubscribes. If the iterable I pass to that library is from an async generator, I'll have to also pass an AbortController to that library for it to safely clean up once the client unsubscribes. If all it has is an AsyncIterable interface, it may leak resources after the client unsubscribes.

This is a fundamental weakness in using async generators for transformation and my longtime frustration with async iteration in general.

In contrast, with WHATWG streams, when a consumer cancels its reader, the underlying source and any TransformStreams get notified to clean up right away.
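For comparison (example mine, standard API): with a WHATWG ReadableStream, reader.cancel() invokes the underlying source's cancel() hook immediately, regardless of whether the source has another chunk coming:

```javascript
async function main() {
  let cleanedUp = false;
  const rs = new ReadableStream({
    pull(controller) { controller.enqueue('event'); },
    cancel(reason) { cleanedUp = true; }, // runs as soon as the reader cancels
  });

  const reader = rs.getReader();
  await reader.read();                 // consume one chunk
  await reader.cancel('unsubscribed'); // consumer-driven teardown
  console.log(cleanedUp); // true: the source was notified right away
}
main();
```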

@jedwards1211

jedwards1211 commented Mar 3, 2026

@benjamingr was actually talking about the same thing I'm trying to resurrect awareness of in this old issue in the async-iteration proposal

Note one of his comments: tc39/proposal-async-iteration#126 (comment)

This was eight years ago but there hasn't been much improvement on this front, unfortunately.

I'm really hoping I can get everyone to fully understand this pitfall and have a good plan for how to help people avoid it before getting too far along with this new proposed API.

@jasnell jasnell force-pushed the jasnell/new-streams-prototype branch from 9f8af01 to e1e1911 Compare March 3, 2026 17:07
jasnell added 10 commits March 18, 2026 00:42
Invalid backpressure values like `'banana'` would fall through
switch statements at write time with a confusing `ERR_INVALID_STATE`
error about "`writeSync` should have handled non-strict policy".

Add `validateBackpressure()` in utils.js and call it from the
`PushQueue`, `BroadcastImpl`, `ShareImpl`, and `SyncShareImpl`
constructors. Invalid values now throw `ERR_INVALID_ARG_VALUE`
immediately at construction.
Optimize the stream/iter implementation based on benchmark analysis
comparing classic streams, web streams, and stream/iter.

- Eliminate `withFlushSignalAsync`/`withFlushSignalSync` generator wrappers
  from the stateless transform pipeline. Stateless transforms now handle
  their own flush (`null`) signal internally after the for-await loop,
  removing an entire async generator layer per pipeline. Stateful
  transforms retain the wrapper since their cost is dominated by the
  transform operation itself (compression, encryption, etc).
- Hoist writer capability checks in `pipeTo`/`pipeToSync`. Property lookups
  for `writeSync`/`writevSync`/`endSync`/`failSync` are done once before the
  loop instead of per-chunk via optional chaining. Split signal/no-signal
  loops to avoid per-batch null checks. Added `writevSync` batch write
  support to `pipeToSync`.
- Optimize `isUint8ArrayBatch` with single-element fast path and plain
  for loop. Replaces `ArrayPrototypeEvery` (function call per element)
  with direct indexed loop. Short-circuits on length 1 (most common
  from transforms) and checks first/last before iterating middle.
- Make broadcast consumer `next()`/`return()`/`throw()` non-async. Returns
  `PromiseResolve()` directly on the fast path (data in buffer) instead
  of wrapping through async function machinery. Caches the done result.
- `RingBuffer`: replace modulo with bitwise AND. Capacity is always a
  power of 2, so index computation uses & mask instead of % capacity.
- `Broadcast`: incremental min-cursor tracking. Replaces O(N) full scan
  of all consumers on every `next()` call with a cached min cursor that
  is only recomputed when dirty (consumer at the minimum advances or
  detaches). Eliminates O(N^2) per-write-cycle scaling.
- `Broadcast`: separate waiters list for `notifyConsumers`. Only iterates
  consumers with pending resolve callbacks instead of scanning all
  consumers on every write.
- `concatBytes`: cache per-chunk byte lengths to avoid reading `byteLength`
  twice per chunk (once for total, once for offset advance). Remove
  dead `totalByteLength` function.

Benchmark results (MB/s, higher is better):

| Benchmark        | classic | webstream | iter   | iter-sync | iter vs classic |
| ---------------- | ------- | --------- | ------ | --------- | --------------- |
| Identity 1MB     | 1,245   | 582       | **3,110**  | 16,658    | 2.5x            |
| Identity 64MB    | 31,410  | 14,980    | **33,894** | 62,111    | 1.1x            |
| Transform 1MB    | 287     | 227       | **325**    | 327       | 1.1x            |
| Transform 64MB   | 595     | 605       | **605**    | 573       | 1.0x            |
| Compression 1MB  | **123**     | 98        | 110    | --        | 0.9x            |
| Compression 64MB | **329**     | 303       | 308    | --        | 0.9x            |
| pipeTo 1MB       | 1,137   | 494       | **2,740**  | 13,611    | 2.4x            |
| pipeTo 64MB      | 22,081  | 15,377    | **30,036** | 60,976    | 1.4x            |
| Broadcast 1c 1MB | 1,365   | 521       | **1,991**  | --        | 1.5x            |
| Broadcast 2c 1MB | 1,285   | 439       | **1,962**  | --        | 1.5x            |
| Broadcast 4c 1MB | **1,217**   | 322       | 750    | --        | 0.6x            |
| File read 16MB   | 1,469   | 537       | **1,639**  | --        | 1.1x            |

The creation benchmarks show the raw cost of constructing the various
objects without any other activity. The `classic` Node.js streams are
faster here simply because they do less work on actual creation.

| Creation (ops/sec) | classic   | webstream | iter      | iter vs classic |
| ------------------ | --------- | --------- | --------- | --------------- |
| readable           | 8,662,361 | 505,889   | 1,144,385 | 0.1x            |
| writable           | 3,856,139 | 269,950   | 1,285,210 | 0.3x            |
| pair               | 3,120,224 | 141,988   | 349,176   | 0.1x            |
@jasnell jasnell marked this pull request as ready for review March 18, 2026 19:44
@jasnell jasnell requested review from Qard, benjamingr and ronag March 18, 2026 19:47
@jasnell jasnell added stream Issues and PRs related to the stream subsystem. semver-minor PRs that contain new features and should be released in the next minor version. experimental Issues and PRs related to experimental features. labels Mar 18, 2026
@jasnell jasnell changed the title [DRAFT] stream: prototype for new stream implementation stream: experimental stream/iter implementation Mar 18, 2026

@jasnell
Member Author

jasnell commented Mar 18, 2026

I've updated the implementation to address the remaining outstanding issues, round out tests, add benchmarks, fix bugs, etc. It's also now behind an experimental cli flag.

This is ready for review.



@codecov

codecov bot commented Mar 19, 2026

Codecov Report

❌ Patch coverage is 85.43011% with 813 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.50%. Comparing base (9fc6b64) to head (ecf1053).
⚠️ Report is 7 commits behind head on main.

| Files with missing lines               | Patch % | Lines                        |
| -------------------------------------- | ------- | ---------------------------- |
| lib/internal/streams/iter/pull.js      | 74.59%  | 187 Missing and 3 partials ⚠️ |
| lib/internal/streams/iter/broadcast.js | 80.37%  | 150 Missing and 6 partials ⚠️ |
| lib/internal/streams/iter/share.js     | 81.93%  | 123 Missing and 2 partials ⚠️ |
| lib/internal/streams/iter/from.js      | 82.08%  | 112 Missing ⚠️                |
| lib/internal/streams/iter/push.js      | 91.12%  | 56 Missing and 1 partial ⚠️   |
| lib/internal/fs/promises.js            | 80.91%  | 54 Missing ⚠️                 |
| lib/internal/streams/iter/consumers.js | 91.52%  | 44 Missing and 2 partials ⚠️  |
| lib/internal/streams/iter/transform.js | 93.83%  | 31 Missing and 4 partials ⚠️  |
| lib/internal/streams/iter/ringbuffer.js | 80.79% | 29 Missing ⚠️                 |
| lib/internal/streams/iter/duplex.js    | 90.10%  | 9 Missing ⚠️                  |
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main   #62066      +/-   ##
==========================================
- Coverage   89.68%   89.50%   -0.18%
==========================================
  Files         676      688      +12
  Lines      206575   211999    +5424
  Branches    39549    40498     +949
==========================================
+ Hits       185262   189756    +4494
- Misses      13446    14355     +909
- Partials     7867     7888      +21
```
| Files with missing lines                | Coverage Δ                |
| --------------------------------------- | ------------------------- |
| lib/internal/bootstrap/realm.js         | 96.21% <100.00%> (ø)      |
| lib/internal/process/pre_execution.js   | 97.47% <100.00%> (+0.54%) ⬆️ |
| lib/internal/streams/iter/types.js      | 100.00% <100.00%> (ø)     |
| lib/internal/streams/iter/utils.js      | 100.00% <100.00%> (ø)     |
| lib/stream/iter.js                      | 100.00% <100.00%> (ø)     |
| src/node_builtins.cc                    | 76.14% <100.00%> (ø)      |
| src/node_options.cc                     | 76.47% <100.00%> (+0.02%) ⬆️ |
| src/node_options.h                      | 97.94% <100.00%> (+0.01%) ⬆️ |
| lib/internal/streams/iter/duplex.js     | 90.10% <90.10%> (ø)       |
| lib/internal/streams/iter/ringbuffer.js | 80.79% <80.79%> (ø)       |
... and 8 more

... and 56 files with indirect coverage changes

