Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { ASeq } from "./seqs/aseq.class.js"
export { aseq } from "./seqs/aseq.ctor.js"
export { Seq } from "./seqs/seq.class.js"
export { seq } from "./seqs/seq.ctor.js"
export { PSeq } from "./pseq.js"
84 changes: 84 additions & 0 deletions src/pseq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { _aiter } from "./utils.js"

const MAX_CONCURRENCY = 12

class PSeqFlatteningIterator<T> implements AsyncIterator<T> {
private readonly outer: AsyncIterator<AsyncIterable<T>>
private readonly inners: {
iterator: AsyncIterator<T>
next: Promise<IteratorResult<T>>
}[] = []
private outerDone = false
private outerPending: Promise<void> | undefined

constructor(source: AsyncIterable<AsyncIterable<T>>) {
this.outer = _aiter(source)
}

private ensureFetch() {
if (
this.inners.length >= MAX_CONCURRENCY ||
this.outerDone ||
this.outerPending
) {
return
}
this.outerPending = (async () => {
const { done, value } = await this.outer.next()
this.outerPending = undefined
if (done) {
this.outerDone = true
return
}
const iterator = _aiter(value)
this.inners.push({ iterator, next: iterator.next() })
})().then(() => {
this.ensureFetch()
})
}

async next(): Promise<IteratorResult<T>> {
for (;;) {
this.ensureFetch()
if (this.inners.length === 0) {
if (this.outerDone) {
return { done: true, value: undefined as any }
}
if (this.outerPending) {
await this.outerPending
continue
}
await Promise.resolve()
continue
}
const races: Promise<
| { type: "inner"; index: number; result: IteratorResult<T> }
| { type: "outer" }
>[] = this.inners.map((s, i) =>
s.next.then(result => ({ type: "inner", index: i, result }))
)
if (this.outerPending) {
races.push(this.outerPending.then(() => ({ type: "outer" as const })))
}
const winner = await Promise.race(races)
if (winner.type === "outer") {
continue
}
const { index, result } = winner
if (result.done) {
this.inners.splice(index, 1)
continue
}
this.inners[index].next = this.inners[index].iterator.next()
return { done: false, value: result.value }
}
}
}

export class PSeq<T> implements AsyncIterable<T> {
constructor(private readonly source: AsyncIterable<AsyncIterable<T>>) {}
[Symbol.asyncIterator](): AsyncIterator<T> {
return new PSeqFlatteningIterator(this.source)
}
}

41 changes: 41 additions & 0 deletions test/pseq/matrix.helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const FRAME = 20

const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

const isDigit = (ch: string) => ch >= '0' && ch <= '9'

export function matrix(diagram: string): AsyncIterable<AsyncIterable<number>> {
const lines = diagram
.split('\n')
.map(line => line.trimEnd())
.filter(line => line.trim().length > 0)

const innerFrom = (timeline: string): AsyncIterable<number> => ({
async *[Symbol.asyncIterator]() {
for (let i = 0; i < timeline.length; i++) {
if (i > 0) {
await delay(FRAME)
}
const ch = timeline[i]
if (isDigit(ch)) {
yield Number(ch)
}
}
}
})

return (async function* () {
let current = 0
for (const line of lines) {
const index = line.indexOf('|')
if (index === -1) continue
const start = index * FRAME
const wait = start - current
if (wait > 0) {
await delay(wait)
}
current = start
yield innerFrom(line.slice(index + 1))
}
})()
}
19 changes: 19 additions & 0 deletions test/pseq/pseq.class.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { PSeq } from "@lib"
import { matrix } from "./matrix.helper"

describe("PSeq", () => {
it("flattens nested async iterables based on earliest emissions", async () => {
const diagram = `
~~|-1--2--
~~~~~~|-1--1
~~~~~~~~|-1
`
const source = matrix(diagram)
const seq = new PSeq(source)
const result: number[] = []
for await (const x of seq) {
result.push(x)
}
expect(result).toEqual([1, 2, 1, 1, 1])
})
})
Loading