From fb4052a3a11e8e0b16620765c47c6bd7deb8fff4 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 30 May 2019 17:58:55 +0200 Subject: [PATCH] events: add EventEmitter.on to async iterate over events Fixes: https://github.com/nodejs/node/issues/27847. --- doc/api/events.md | 31 ++++ lib/events.js | 78 +++++++++ test/parallel/test-event-on-async-iterator.js | 164 ++++++++++++++++++ 3 files changed, 273 insertions(+) create mode 100644 test/parallel/test-event-on-async-iterator.js diff --git a/doc/api/events.md b/doc/api/events.md index 69f309a73bcd34..12e4101d07ec7c 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -886,6 +886,37 @@ Value: `Symbol.for('nodejs.rejection')` See how to write a custom [rejection handler][rejection]. +## events.on(emitter, eventName) + + +* `emitter` {EventEmitter} +* `eventName` {string|symbol} The name of the event being listened for +* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter` + +```js +const { on, EventEmitter } = require('events'); + +(async () => { + const ee = new EventEmitter(); + + // Emit later on + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + }); + + for await (const event of on(ee, 'foo')) { + console.log(event); // prints ['bar'] [42] + } +})(); +``` + +Returns an `AsyncIterator` that iterates `eventName` events. It will throw +if the `EventEmitter` emits `'error'`. It removes all listeners when +exiting the loop. + [WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget [`--trace-warnings`]: cli.html#cli_trace_warnings [`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners diff --git a/lib/events.js b/lib/events.js index c91792cbfbda8a..5d1c0dbe97368c 100644 --- a/lib/events.js +++ b/lib/events.js @@ -29,6 +29,7 @@ const { ObjectCreate, ObjectDefineProperty, ObjectGetPrototypeOf, + ObjectSetPrototypeOf, ObjectKeys, Promise, ReflectApply, @@ -62,6 +63,7 @@ function EventEmitter(opts) { } module.exports = EventEmitter; module.exports.once = once; +module.exports.on = on; // Backwards-compat with node 0.10.x EventEmitter.EventEmitter = EventEmitter; @@ -657,3 +659,79 @@ function once(emitter, name) { emitter.once(name, eventListener); }); } + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); + +function createIterResult(value, done) { + return { value, done }; +} + +function on(emitter, event) { + const unconsumedEvents = []; + const unconsumedPromises = []; + let error = null; + + const iterator = ObjectSetPrototypeOf({ + next() { + if (error) { + return Promise.reject(error); + } + + const value = unconsumedEvents.shift(); + if (value) { + return Promise.resolve(createIterResult(value, false)); + } + + return new Promise(function(resolve, reject) { + unconsumedPromises.push({ resolve, reject }); + }); + }, + + return() { + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + + return Promise.resolve(createIterResult(undefined, true)); + }, + + throw(err) { + if (!err || !(err instanceof Error)) { + throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', + 'Error', err); + } + error = err; + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + }, + + [Symbol.asyncIterator]() { + return this; + } + }, AsyncIteratorPrototype); + + emitter.on(event, eventHandler); + emitter.on('error', errorHandler); + + return iterator; + + function eventHandler(...args) { + const promise = unconsumedPromises.shift(); + if (promise) { + promise.resolve(createIterResult(args, false)); + } else { + unconsumedEvents.push(args); + } + } + + function errorHandler(err) { + const promise = unconsumedPromises.shift(); + if (promise) { + promise.reject(err); + } else { + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + error = err; + } + } +} diff --git a/test/parallel/test-event-on-async-iterator.js b/test/parallel/test-event-on-async-iterator.js new file mode 100644 index 00000000000000..b6815d9f3f1807 --- /dev/null +++ b/test/parallel/test-event-on-async-iterator.js @@ -0,0 +1,164 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { on, EventEmitter } = require('events'); + +async function basic() { + const ee = new EventEmitter(); + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + }); + + const iterable = on(ee, 'foo'); + + const expected = [['bar'], [42]]; + + for await (const event of iterable) { + const current = expected.shift(); + + assert.deepStrictEqual(current, event); + + if (expected.length === 0) { + break; + } + } + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function error() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + process.nextTick(() => { + ee.emit('error', _err); + }); + + const iterable = on(ee, 'foo'); + let looped = false; + let thrown = false; + + try { + // eslint-disable-next-line no-unused-vars + for await (const event of iterable) { + looped = true; + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(looped, false); +} + +async function errorDelayed() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + process.nextTick(() => { + ee.emit('foo', 42); + ee.emit('error', _err); + }); + + const iterable = on(ee, 'foo'); + const expected = [[42]]; + let thrown = false; + + try { + for await (const event of iterable) { + const current = expected.shift(); + assert.deepStrictEqual(current, event); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function throwInLoop() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + + process.nextTick(() => { + ee.emit('foo', 42); + }); + + try { + for await (const event of on(ee, 'foo')) { + assert.deepStrictEqual(event, [42]); + throw _err; + } + } catch (err) { + assert.strictEqual(err, _err); + } + + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function next() { + const ee = new EventEmitter(); + const iterable = on(ee, 'foo'); + process.nextTick(function() { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + }); + const results = await Promise.all([ + iterable.next(), + iterable.next() + ]); + assert.deepStrictEqual(results, [{ + value: ['bar'], + done: false + }, { + value: [42], + done: false + }]); +} + +async function iterableThrow() { + const ee = new EventEmitter(); + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); // lost in the queue + }); + + const iterable = on(ee, 'foo'); + const _err = new Error('kaboom'); + let thrown = false; + + try { + for await (const event of iterable) { + assert.deepStrictEqual(event, ['bar']); + assert.throws(() => { + // No argument + iterable.throw(); + }, { + message: 'The "EventEmitter.AsyncIterator" property must be' + + ' of type Error. Received type undefined', + name: 'TypeError' + }); + iterable.throw(_err); + } + } catch (err) { + thrown = true; + assert.strictEqual(err, _err); + } + assert.strictEqual(thrown, true); + assert.strictEqual(ee.listenerCount('foo'), 0); + assert.strictEqual(ee.listenerCount('error'), 0); +} + +async function run() { + await basic(); + await error(); + await errorDelayed(); + await throwInLoop(); + await next(); + await iterableThrow(); +} + +run().then(common.mustCall());