Skip to content

Commit

Permalink
sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
samthor committed Sep 13, 2024
1 parent f4fa1b0 commit 6971636
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export * from './record.js';
export * from './promise.js';
export * from './primitives.js';
export * from './queue.js';
export * from './sequence.js';
export * from './signal.js';
export * from './stream.js';
export * from './task.js';
Expand Down
95 changes: 95 additions & 0 deletions src/sequence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { promiseWithResolvers, unresolvedPromise } from './promise.js';
import { promiseForSignal } from './signal.js';

export type SequenceListener<T> = (
value: T,
next: (args?: { signal?: AbortSignal }) => Promise<T>,
) => any;

/**
* Sequencer is a reduced form of an event listener, which allows listeners to be run
* when a `notify()` call is made, but which provides a way for those listeners to internally get
* future events.
*
* It does not support `removeListener`, rather, pass a {@link AbortSignal}.
*/
export type Sequencer<T> = {
addListener: (fn: SequenceListener<T>, args?: { signal?: AbortSignal; once?: boolean }) => void;
notify(value: T): void;
};

type Node<T> = {
next?: Node<T>;
p: Promise<T>;
};

/**
* Builds a sequencer. This can be spread onto another object as it does not use `this`.
*/
export function buildSequencer<T>(): Sequencer<T> {
let { promise: p, resolve } = promiseWithResolvers<T>();
let head: Node<T> & { p: Promise<T> } = { p };

const listeners: SequenceListener<T>[] = [];

return {
notify: (value: T) => {
// #1: resolve head (release prior listeners)
resolve(value);
const oldHead: Node<T> = head;

({ promise: p, resolve } = promiseWithResolvers<T>());
head = { p };
oldHead.next = head;

// #2: fire all listeners
for (const listener of listeners) {
let target: Node<T> = head;

const next = async (args?: { signal?: AbortSignal }): Promise<T> => {
const ps = args?.signal ? promiseForSignal(args?.signal) : unresolvedPromise;
const out = await Promise.race([ps, target.p]);
target = target.next!;
return out;
};
listener(value, next);
}
},

addListener: (fn, args) => {
if (args?.signal?.aborted) {
return;
}

let removed = false;
const remove = () => {
if (removed) {
return;
}
removed = true;

const index = listeners.indexOf(fn);
if (index === -1) {
return;
}
listeners.splice(index, 1);
// nb. we don't check for listeners.length === 0; we might have some in-flight
};

// allow duplicates
if (args?.once) {
const realFn = fn;
fn = (value, next) => {
remove();
realFn(value, next);
};
} else if (listeners.includes(fn)) {
const realFn = fn;
fn = (value, next) => realFn(value, next);
}

args?.signal?.addEventListener('abort', remove);
listeners.push(fn);
},
};
}
1 change: 1 addition & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import './primitives.js';
import './promise.js';
import './record.js';
import './queue.js';
import './sequence.js';
import './stream.js';
import './support.js';
import './task.js';
20 changes: 20 additions & 0 deletions test/sequence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import test from 'node:test';
import * as assert from 'node:assert';
import { buildSequencer } from '../src/sequence.js';

test('buildSequenceListener', () => {
const { addListener, notify } = buildSequencer<number>();

const values: number[] = [];

addListener(async (value, next) => {
values.push(value);
const v2 = await next();
values.push(v2);
});

notify(1);
notify(2);

assert.deepStrictEqual(values, [1, 2]);
});
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

// configure as you like: these are my preferred defaults!
"strict": true,
"skipLibCheck": true,
"skipLibCheck": false,
"forceConsistentCasingInFileNames": true,

// "strict" implies this, but you'll want to enable it when you're
Expand Down

0 comments on commit 6971636

Please sign in to comment.