Skip to content

Commit

Permalink
events: add EventEmitter.on to async iterate over events
Browse files Browse the repository at this point in the history
Fixes: nodejs#27847.
  • Loading branch information
mcollina committed Dec 21, 2019
1 parent 118b28a commit fb4052a
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 0 deletions.
31 changes: 31 additions & 0 deletions doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,37 @@ Value: `Symbol.for('nodejs.rejection')`

See how to write a custom [rejection handler][rejection].

## events.on(emitter, eventName)
<!-- YAML
added: REPLACEME
-->

* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`

```js
const { on, EventEmitter } = require('events');

(async () => {
const ee = new EventEmitter();

// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

for await (const event of on(ee, 'foo')) {
console.log(event); // prints ['bar'] [42]
}
})();
```

Returns an `AsyncIterator` that iterates `eventName` events. It will throw
if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop.

[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
[`--trace-warnings`]: cli.html#cli_trace_warnings
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners
Expand Down
78 changes: 78 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const {
ObjectCreate,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
ObjectKeys,
Promise,
ReflectApply,
Expand Down Expand Up @@ -62,6 +63,7 @@ function EventEmitter(opts) {
}
module.exports = EventEmitter;
module.exports.once = once;
module.exports.on = on;

// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
Expand Down Expand Up @@ -657,3 +659,79 @@ function once(emitter, name) {
emitter.once(name, eventListener);
});
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function createIterResult(value, done) {
return { value, done };
}

function on(emitter, event) {
const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;

const iterator = ObjectSetPrototypeOf({
next() {
if (error) {
return Promise.reject(error);
}

const value = unconsumedEvents.shift();
if (value) {
return Promise.resolve(createIterResult(value, false));
}

return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},

return() {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);

return Promise.resolve(createIterResult(undefined, true));
},

throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
},

[Symbol.asyncIterator]() {
return this;
}
}, AsyncIteratorPrototype);

emitter.on(event, eventHandler);
emitter.on('error', errorHandler);

return iterator;

function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEvents.push(args);
}
}

function errorHandler(err) {
const promise = unconsumedPromises.shift();
if (promise) {
promise.reject(err);
} else {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
error = err;
}
}
}
164 changes: 164 additions & 0 deletions test/parallel/test-event-on-async-iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { on, EventEmitter } = require('events');

async function basic() {
const ee = new EventEmitter();
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

const iterable = on(ee, 'foo');

const expected = [['bar'], [42]];

for await (const event of iterable) {
const current = expected.shift();

assert.deepStrictEqual(current, event);

if (expected.length === 0) {
break;
}
}
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function error() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('error', _err);
});

const iterable = on(ee, 'foo');
let looped = false;
let thrown = false;

try {
// eslint-disable-next-line no-unused-vars
for await (const event of iterable) {
looped = true;
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(looped, false);
}

async function errorDelayed() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('foo', 42);
ee.emit('error', _err);
});

const iterable = on(ee, 'foo');
const expected = [[42]];
let thrown = false;

try {
for await (const event of iterable) {
const current = expected.shift();
assert.deepStrictEqual(current, event);
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function throwInLoop() {
const ee = new EventEmitter();
const _err = new Error('kaboom');

process.nextTick(() => {
ee.emit('foo', 42);
});

try {
for await (const event of on(ee, 'foo')) {
assert.deepStrictEqual(event, [42]);
throw _err;
}
} catch (err) {
assert.strictEqual(err, _err);
}

assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function next() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
process.nextTick(function() {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});
const results = await Promise.all([
iterable.next(),
iterable.next()
]);
assert.deepStrictEqual(results, [{
value: ['bar'],
done: false
}, {
value: [42],
done: false
}]);
}

async function iterableThrow() {
const ee = new EventEmitter();
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42); // lost in the queue
});

const iterable = on(ee, 'foo');
const _err = new Error('kaboom');
let thrown = false;

try {
for await (const event of iterable) {
assert.deepStrictEqual(event, ['bar']);
assert.throws(() => {
// No argument
iterable.throw();
}, {
message: 'The "EventEmitter.AsyncIterator" property must be' +
' of type Error. Received type undefined',
name: 'TypeError'
});
iterable.throw(_err);
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function run() {
await basic();
await error();
await errorDelayed();
await throwInLoop();
await next();
await iterableThrow();
}

run().then(common.mustCall());

0 comments on commit fb4052a

Please sign in to comment.