diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 2d65454ab..74b2422c9 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -104,7 +104,7 @@ export async function observeWorkerProcess(fork, runStatus) { try { await launched.statePromises.available; - port.postMessage({type: 'ready'}); + port.postMessage({ava: {type: 'ready'}}); launched.worker.postMessage({ type: 'register-test-worker', diff --git a/lib/worker/channel.cjs b/lib/worker/channel.cjs index 5d4449810..9bcc6808e 100644 --- a/lib/worker/channel.cjs +++ b/lib/worker/channel.cjs @@ -7,42 +7,14 @@ const timers = require('../now-and-timers.cjs'); const {isRunningInChildProcess, isRunningInThread} = require('./utils.cjs'); -let pEvent = async (emitter, event, options) => { - // We need to import p-event, but import() is asynchronous. Buffer any events - // emitted in the meantime. Don't handle errors. - const buffer = []; - const addToBuffer = (...args) => buffer.push(args); - emitter.on(event, addToBuffer); - - try { - ({pEvent} = await import('p-event')); - } finally { - emitter.off(event, addToBuffer); - } - - if (buffer.length === 0) { - return pEvent(emitter, event, options); - } - - // Now replay buffered events. - const replayEmitter = new events.EventEmitter(); - const promise = pEvent(replayEmitter, event, options); - for (const args of buffer) { - replayEmitter.emit(event, ...args); - } - - const replay = (...args) => replayEmitter.emit(event, ...args); - emitter.on(event, replay); - - try { - return await promise; - } finally { - emitter.off(event, replay); +const selectAvaMessage = async (channel, type) => { + for await (const [message] of events.on(channel, 'message')) { + if (message.ava?.type === type) { + return message; + } } }; -const selectAvaMessage = type => message => message.ava && message.ava.type === type; - class RefCounter { constructor() { this.count = 0; @@ -133,8 +105,8 @@ if (isRunningInChildProcess) { // Node.js. In order to keep track, explicitly reference before attaching. handle.ref(); -exports.options = pEvent(handle.channel, 'message', selectAvaMessage('options')).then(message => message.ava.options); // eslint-disable-line unicorn/prefer-top-level-await -exports.peerFailed = pEvent(handle.channel, 'message', selectAvaMessage('peer-failed')); +exports.options = selectAvaMessage(handle.channel, 'options').then(message => message.ava.options); // eslint-disable-line unicorn/prefer-top-level-await +exports.peerFailed = selectAvaMessage(handle.channel, 'peer-failed'); // eslint-disable-line unicorn/prefer-top-level-await exports.send = handle.send.bind(handle); exports.unref = handle.unref.bind(handle); @@ -143,7 +115,7 @@ async function flush() { handle.ref(); const promise = pendingPings.then(async () => { handle.send({type: 'ping'}); - await pEvent(handle.channel, 'message', selectAvaMessage('pong')); + await selectAvaMessage(handle.channel, 'pong'); if (promise === pendingPings) { handle.unref(); } @@ -202,7 +174,7 @@ function registerSharedWorker(filename, initialData) { // The attaching of message listeners will cause the port to be referenced by // Node.js. In order to keep track, explicitly reference before attaching. sharedWorkerHandle.ref(); - const ready = pEvent(ourPort, 'message', ({type}) => type === 'ready').then(() => { + const ready = selectAvaMessage(ourPort, 'ready').then(() => { currentlyAvailable = error === null; }).finally(() => { // Once ready, it's up to user code to subscribe to messages, which (see @@ -214,7 +186,7 @@ function registerSharedWorker(filename, initialData) { // Errors are received over the test worker channel, not the message port // dedicated to the shared worker. - pEvent(channelEmitter, 'shared-worker-error').then(() => { + events.once(channelEmitter, 'shared-worker-error').then(() => { unsubscribe(); sharedWorkerHandle.forceUnref(); error = new Error('The shared worker is no longer available'); diff --git a/package-lock.json b/package-lock.json index 609477430..8bd4475d8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39,7 +39,6 @@ "matcher": "^5.0.0", "mem": "^9.0.2", "ms": "^2.1.3", - "p-event": "^6.0.0", "p-map": "^6.0.0", "picomatch": "^2.3.1", "pkg-conf": "^4.0.0", @@ -6936,20 +6935,6 @@ "node": ">=4" } }, - "node_modules/p-event": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/p-event/-/p-event-6.0.0.tgz", - "integrity": "sha512-Xbfxd0CfZmHLGKXH32k1JKjQYX6Rkv0UtQdaFJ8OyNcf+c0oWCeXHc1C4CX/IESZLmcvfPa5aFIO/vCr5gqtag==", - "dependencies": { - "p-timeout": "^6.1.2" - }, - "engines": { - "node": ">=16.17" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/p-limit": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", @@ -6991,17 +6976,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/p-timeout": { - "version": "6.1.2", - "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.2.tgz", - "integrity": "sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==", - "engines": { - "node": ">=14.16" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", diff --git a/package.json b/package.json index 58bb83637..db55f216e 100644 --- a/package.json +++ b/package.json @@ -110,7 +110,6 @@ "matcher": "^5.0.0", "mem": "^9.0.2", "ms": "^2.1.3", - "p-event": "^6.0.0", "p-map": "^6.0.0", "picomatch": "^2.3.1", "pkg-conf": "^4.0.0", diff --git a/test-tap/fixture/fail-fast/multiple-files/passes-slow.cjs b/test-tap/fixture/fail-fast/multiple-files/passes-slow.cjs index b2793f723..9dd4ebbd0 100644 --- a/test-tap/fixture/fail-fast/multiple-files/passes-slow.cjs +++ b/test-tap/fixture/fail-fast/multiple-files/passes-slow.cjs @@ -1,3 +1,4 @@ +const events = require('node:events'); const {parentPort} = require('node:worker_threads'); const test = require('../../../../entrypoints/main.cjs'); @@ -6,14 +7,12 @@ test.serial('first pass', async t => { t.pass(); const timer = setTimeout(() => {}, 60_000); // Ensure process stays alive. const source = parentPort || process; - const {pEvent} = await import('p-event'); - await pEvent(source, 'message', message => { - if (message.ava) { - return message.ava.type === 'peer-failed'; + for await (const [message] of events.on(source, 'message')) { + if (message.ava?.type === 'peer-failed') { + break; } + } - return false; - }); clearTimeout(timer); });