From 015e234ecb2dcdb61bc182f082edb3335bfcd6c4 Mon Sep 17 00:00:00 2001 From: Greg Ros Date: Sat, 9 Aug 2025 01:01:59 +0300 Subject: [PATCH] test: add async iterable matrix helper --- src/index.ts | 1 + src/pseq.ts | 84 ++++++++++++++++++++++++++++++++++++ test/pseq/matrix.helper.ts | 41 ++++++++++++++++++ test/pseq/pseq.class.test.ts | 19 ++++++++ 4 files changed, 145 insertions(+) create mode 100644 src/pseq.ts create mode 100644 test/pseq/matrix.helper.ts create mode 100644 test/pseq/pseq.class.test.ts diff --git a/src/index.ts b/src/index.ts index 4769d9d..670e3b1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,3 +3,4 @@ export { ASeq } from "./seqs/aseq.class.js" export { aseq } from "./seqs/aseq.ctor.js" export { Seq } from "./seqs/seq.class.js" export { seq } from "./seqs/seq.ctor.js" +export { PSeq } from "./pseq.js" diff --git a/src/pseq.ts b/src/pseq.ts new file mode 100644 index 0000000..bf7fef7 --- /dev/null +++ b/src/pseq.ts @@ -0,0 +1,84 @@ +import { _aiter } from "./utils.js" + +const MAX_CONCURRENCY = 12 + +class PSeqFlatteningIterator implements AsyncIterator { + private readonly outer: AsyncIterator> + private readonly inners: { + iterator: AsyncIterator + next: Promise> + }[] = [] + private outerDone = false + private outerPending: Promise | undefined + + constructor(source: AsyncIterable>) { + this.outer = _aiter(source) + } + + private ensureFetch() { + if ( + this.inners.length >= MAX_CONCURRENCY || + this.outerDone || + this.outerPending + ) { + return + } + this.outerPending = (async () => { + const { done, value } = await this.outer.next() + this.outerPending = undefined + if (done) { + this.outerDone = true + return + } + const iterator = _aiter(value) + this.inners.push({ iterator, next: iterator.next() }) + })().then(() => { + this.ensureFetch() + }) + } + + async next(): Promise> { + for (;;) { + this.ensureFetch() + if (this.inners.length === 0) { + if (this.outerDone) { + return { done: true, value: undefined as any } + } + if (this.outerPending) { + await this.outerPending + continue + } + await Promise.resolve() + continue + } + const races: Promise< + | { type: "inner"; index: number; result: IteratorResult } + | { type: "outer" } + >[] = this.inners.map((s, i) => + s.next.then(result => ({ type: "inner", index: i, result })) + ) + if (this.outerPending) { + races.push(this.outerPending.then(() => ({ type: "outer" as const }))) + } + const winner = await Promise.race(races) + if (winner.type === "outer") { + continue + } + const { index, result } = winner + if (result.done) { + this.inners.splice(index, 1) + continue + } + this.inners[index].next = this.inners[index].iterator.next() + return { done: false, value: result.value } + } + } +} + +export class PSeq implements AsyncIterable { + constructor(private readonly source: AsyncIterable>) {} + [Symbol.asyncIterator](): AsyncIterator { + return new PSeqFlatteningIterator(this.source) + } +} + diff --git a/test/pseq/matrix.helper.ts b/test/pseq/matrix.helper.ts new file mode 100644 index 0000000..53e5764 --- /dev/null +++ b/test/pseq/matrix.helper.ts @@ -0,0 +1,41 @@ +const FRAME = 20 + +const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)) + +const isDigit = (ch: string) => ch >= '0' && ch <= '9' + +export function matrix(diagram: string): AsyncIterable> { + const lines = diagram + .split('\n') + .map(line => line.trimEnd()) + .filter(line => line.trim().length > 0) + + const innerFrom = (timeline: string): AsyncIterable => ({ + async *[Symbol.asyncIterator]() { + for (let i = 0; i < timeline.length; i++) { + if (i > 0) { + await delay(FRAME) + } + const ch = timeline[i] + if (isDigit(ch)) { + yield Number(ch) + } + } + } + }) + + return (async function* () { + let current = 0 + for (const line of lines) { + const index = line.indexOf('|') + if (index === -1) continue + const start = index * FRAME + const wait = start - current + if (wait > 0) { + await delay(wait) + } + current = start + yield innerFrom(line.slice(index + 1)) + } + })() +} diff --git a/test/pseq/pseq.class.test.ts b/test/pseq/pseq.class.test.ts new file mode 100644 index 0000000..3fb7851 --- /dev/null +++ b/test/pseq/pseq.class.test.ts @@ -0,0 +1,19 @@ +import { PSeq } from "@lib" +import { matrix } from "./matrix.helper" + +describe("PSeq", () => { + it("flattens nested async iterables based on earliest emissions", async () => { + const diagram = ` +~~|-1--2-- +~~~~~~|-1--1 +~~~~~~~~|-1 +` + const source = matrix(diagram) + const seq = new PSeq(source) + const result: number[] = [] + for await (const x of seq) { + result.push(x) + } + expect(result).toEqual([1, 2, 1, 1, 1]) + }) +})