From 6971636e8e69c75a42d1565f641848627496eccb Mon Sep 17 00:00:00 2001 From: Sam Thorogood Date: Sat, 14 Sep 2024 07:38:21 +1000 Subject: [PATCH] sequencer --- src/index.ts | 1 + src/sequence.ts | 95 ++++++++++++++++++++++++++++++++++++++++++++++++ test/index.ts | 1 + test/sequence.ts | 20 ++++++++++ tsconfig.json | 2 +- 5 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 src/sequence.ts create mode 100644 test/sequence.ts diff --git a/src/index.ts b/src/index.ts index 6b8f9cb..45e1bd0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ export * from './record.js'; export * from './promise.js'; export * from './primitives.js'; export * from './queue.js'; +export * from './sequence.js'; export * from './signal.js'; export * from './stream.js'; export * from './task.js'; diff --git a/src/sequence.ts b/src/sequence.ts new file mode 100644 index 0000000..6538f68 --- /dev/null +++ b/src/sequence.ts @@ -0,0 +1,95 @@ +import { promiseWithResolvers, unresolvedPromise } from './promise.js'; +import { promiseForSignal } from './signal.js'; + +export type SequenceListener = ( + value: T, + next: (args?: { signal?: AbortSignal }) => Promise, +) => any; + +/** + * Sequencer is a reduced form of an event listener, which allows listeners to be run + * when a `notify()` call is made, but which provides a way for those listeners to internally get + * future events. + * + * It does not support `removeListener`, rather, pass a {@link AbortSignal}. + */ +export type Sequencer = { + addListener: (fn: SequenceListener, args?: { signal?: AbortSignal; once?: boolean }) => void; + notify(value: T): void; +}; + +type Node = { + next?: Node; + p: Promise; +}; + +/** + * Builds a sequencer. This can be spread onto another object as it does not use `this`. + */ +export function buildSequencer(): Sequencer { + let { promise: p, resolve } = promiseWithResolvers(); + let head: Node & { p: Promise } = { p }; + + const listeners: SequenceListener[] = []; + + return { + notify: (value: T) => { + // #1: resolve head (release prior listeners) + resolve(value); + const oldHead: Node = head; + + ({ promise: p, resolve } = promiseWithResolvers()); + head = { p }; + oldHead.next = head; + + // #2: fire all listeners + for (const listener of listeners) { + let target: Node = head; + + const next = async (args?: { signal?: AbortSignal }): Promise => { + const ps = args?.signal ? promiseForSignal(args?.signal) : unresolvedPromise; + const out = await Promise.race([ps, target.p]); + target = target.next!; + return out; + }; + listener(value, next); + } + }, + + addListener: (fn, args) => { + if (args?.signal?.aborted) { + return; + } + + let removed = false; + const remove = () => { + if (removed) { + return; + } + removed = true; + + const index = listeners.indexOf(fn); + if (index === -1) { + return; + } + listeners.splice(index, 1); + // nb. we don't check for listeners.length === 0; we might have some in-flight + }; + + // allow duplicates + if (args?.once) { + const realFn = fn; + fn = (value, next) => { + remove(); + realFn(value, next); + }; + } else if (listeners.includes(fn)) { + const realFn = fn; + fn = (value, next) => realFn(value, next); + } + + args?.signal?.addEventListener('abort', remove); + listeners.push(fn); + }, + }; +} diff --git a/test/index.ts b/test/index.ts index 86706a5..477704b 100644 --- a/test/index.ts +++ b/test/index.ts @@ -13,6 +13,7 @@ import './primitives.js'; import './promise.js'; import './record.js'; import './queue.js'; +import './sequence.js'; import './stream.js'; import './support.js'; import './task.js'; diff --git a/test/sequence.ts b/test/sequence.ts new file mode 100644 index 0000000..42e9275 --- /dev/null +++ b/test/sequence.ts @@ -0,0 +1,20 @@ +import test from 'node:test'; +import * as assert from 'node:assert'; +import { buildSequencer } from '../src/sequence.js'; + +test('buildSequenceListener', () => { + const { addListener, notify } = buildSequencer(); + + const values: number[] = []; + + addListener(async (value, next) => { + values.push(value); + const v2 = await next(); + values.push(v2); + }); + + notify(1); + notify(2); + + assert.deepStrictEqual(values, [1, 2]); +}); diff --git a/tsconfig.json b/tsconfig.json index beb6670..d7575c9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -14,7 +14,7 @@ // configure as you like: these are my preferred defaults! "strict": true, - "skipLibCheck": true, + "skipLibCheck": false, "forceConsistentCasingInFileNames": true, // "strict" implies this, but you'll want to enable it when you're