Skip to content

Commit

Permalink
Add signal option (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
MoLow authored Jun 14, 2023
1 parent 93d16fb commit 021f863
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 0 deletions.
5 changes: 5 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export interface Options<EmittedType extends unknown | unknown[]> {
```
*/
readonly filter?: FilterFunction<EmittedType>;

/**
An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort waiting for the event.
*/
readonly signal?: AbortSignal;
}

export interface MultiArgumentsOptions<EmittedType extends unknown[]>
Expand Down
12 changes: 12 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export function pEventMultiple(emitter, event, options) {
throw new TypeError('The `count` option should be at least 0 or more');
}

options.signal?.throwIfAborted();

// Allow multiple events
const events = [event].flat();

Expand Down Expand Up @@ -73,6 +75,10 @@ export function pEventMultiple(emitter, event, options) {
addListener(rejectionEvent, rejectHandler);
}

if (options.signal) {
options.signal.addEventListener('abort', () => rejectHandler(options.signal.reason), {once: true});
}

if (options.resolveImmediately) {
resolve(items);
}
Expand Down Expand Up @@ -129,6 +135,8 @@ export function pEventIterator(emitter, event, options) {
throw new TypeError('The `limit` option should be a non-negative integer or Infinity');
}

options.signal?.throwIfAborted();

if (limit === 0) {
// Return an empty async iterator to avoid any further cost
return {
Expand Down Expand Up @@ -244,6 +252,10 @@ export function pEventIterator(emitter, event, options) {
addListener(resolutionEvent, resolveHandler);
}

if (options.signal) {
options.signal.addEventListener('abort', () => rejectHandler(options.signal.reason), {once: true});
}

return {
[Symbol.asyncIterator]() {
return this;
Expand Down
6 changes: 6 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ const result = await pEvent(emitter, '🦄', value => value > 3);
// Do something with first 🦄 event with a value greater than 3
```

##### signal

Type: `AbortSignal`

An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) to abort waiting for the event.

### pEventMultiple(emitter, event, options)

Wait for multiple event emissions. Returns an array.
Expand Down
58 changes: 58 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,36 @@ test('filter option returned with `multiArgs`', async t => {
}), [10_000, '💩']);
});

test('AbortSignal rejects when aborted', async t => {
const emitter = new EventEmitter();

(async () => {
await delay(200);
emitter.emit('🦄', '🌈');
})();

await t.throwsAsync(pEvent(emitter, '🦄', {signal: AbortSignal.timeout(5)}), {
message: 'The operation was aborted due to timeout',
});
t.is(emitter.listenerCount('🦄'), 0);
});

test('AbortSignal that is already aborted rejects immediately', async t => {
const emitter = new EventEmitter();
const controller = new AbortController();
controller.abort(new Error('reason'));

(async () => {
await delay(200);
emitter.emit('🦄', '🌈');
})();

await t.throwsAsync(pEvent(emitter, '🦄', {signal: controller.signal}), {
message: 'reason',
});
t.is(emitter.listenerCount('🦄'), 0);
});

test('event to AsyncIterator', async t => {
const emitter = new EventEmitter();
const iterator = pEventIterator(emitter, '🦄');
Expand Down Expand Up @@ -420,6 +450,34 @@ test('resolve event resolves pending promises and finishes the iterator - when f
await t.deepEqual(await iterator.next(), {done: true, value: undefined});
});

test('AsyncIterator - AbortSignal rejects when aborted', async t => {
const emitter = new EventEmitter();
const controller = new AbortController();
const iterator = pEventIterator(emitter, '🦄', {signal: controller.signal});

(async () => {
await delay(200);
emitter.emit('🦄', '🌈');
emitter.emit('🦄', 'Something else.');
await delay(1);
controller.abort(new Error('reason'));
emitter.emit('🦄', 'Some third thing.');
})();

t.deepEqual(await iterator.next(), {done: false, value: '🌈'});
t.deepEqual(await iterator.next(), {done: false, value: 'Something else.'});
await t.throwsAsync(iterator.next(), {message: 'reason'});
t.is(emitter.listenerCount('🦄'), 0);
});

test('AsyncIterator - AbortSignal that is already aborted rejects immediately', t => {
const emitter = new EventEmitter();
const controller = new AbortController();
controller.abort(new Error('reason'));
t.throws(() => pEventIterator(emitter, '🦄', {signal: controller.signal}), {message: 'reason'});
t.is(emitter.listenerCount('🦄'), 0);
});

test('.multiple()', async t => {
const emitter = new EventEmitter();

Expand Down

0 comments on commit 021f863

Please sign in to comment.