diff --git a/lib/events.js b/lib/events.js index 1950f19ddcb75d..9b97517f3c72ab 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1164,14 +1164,8 @@ function on(emitter, event, options = kEmptyObject) { addEventListener(emitter, closeEvents[i], closeHandler); } } - if (signal) { - kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - eventTargetAgnosticAddListener( - signal, - 'abort', - abortListener, - { __proto__: null, once: true, [kResistStopPropagation]: true }); - } + + const abortListenerDisposable = signal ? addAbortListener(signal, abortListener) : null; return iterator; @@ -1198,6 +1192,7 @@ function on(emitter, event, options = kEmptyObject) { } function closeHandler() { + abortListenerDisposable?.[SymbolDispose](); removeAll(); finished = true; const doneResult = createIterResult(undefined, true); diff --git a/test/parallel/test-events-on-async-iterator.js b/test/parallel/test-events-on-async-iterator.js index 94f66a81edb0c0..8e53849122b958 100644 --- a/test/parallel/test-events-on-async-iterator.js +++ b/test/parallel/test-events-on-async-iterator.js @@ -6,6 +6,7 @@ const assert = require('assert'); const { on, EventEmitter } = require('events'); const { NodeEventTarget, + kEvents } = require('internal/event_target'); async function basic() { @@ -363,6 +364,36 @@ async function abortableOnAfterDone() { }); } +async function abortListenerRemovedAfterComplete() { + const ee = new EventEmitter(); + const ac = new AbortController(); + + const i = setInterval(() => ee.emit('foo', 'foo'), 1); + try { + // Below: either the kEvents map is empty or the 'abort' listener list is empty + + // Return case + const endedIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + endedIterator.return(); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + + // Throw case + const throwIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + throwIterator.throw(new Error()); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + + // Abort case + on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + ac.abort(new Error()); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + } finally { + clearInterval(i); + } +} + async function run() { const funcs = [ basic, @@ -382,6 +413,7 @@ async function run() { eventTargetAbortableOnAfter, eventTargetAbortableOnAfter2, abortableOnAfterDone, + abortListenerRemovedAfterComplete, ]; for (const fn of funcs) {