From 5cf3f5dc79276d853f9795f922c9a9921a987ba3 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 19 Dec 2022 11:26:01 -0600 Subject: [PATCH 1/4] Add failing test --- .../fixtures/package.json | 8 ++++++ .../fixtures/test.js | 26 +++++++++++++++++++ .../fixtures/worker.mjs | 9 +++++++ .../multiple-workers-are-loaded/test.js | 9 +++++++ 4 files changed, 52 insertions(+) create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/package.json create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/test.js create mode 100644 test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs create mode 100644 test/shared-workers/multiple-workers-are-loaded/test.js diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json b/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json new file mode 100644 index 000000000..54f672450 --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/package.json @@ -0,0 +1,8 @@ +{ + "type": "module", + "ava": { + "files": [ + "*.js" + ] + } +} diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js new file mode 100644 index 000000000..d7dbee1ee --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js @@ -0,0 +1,26 @@ +import test from 'ava'; +import {registerSharedWorker} from 'ava/plugin'; + +const worker1 = registerSharedWorker({ + filename: new URL('worker.mjs#1', import.meta.url), + supportedProtocols: ['ava-4'], + initialData: { + id: '1', + }, +}); + +const worker2 = registerSharedWorker({ + filename: new URL('worker.mjs#2', import.meta.url), + supportedProtocols: ['ava-4'], + initialData: { + id: '2', + }, +}); + +test('can load multiple workers', async t => { + const {value: {data: dataFromWorker1}} = await worker1.subscribe().next(); + const {value: {data: dataFromWorker2}} = await worker2.subscribe().next(); + + t.deepEqual(dataFromWorker1, {id: '1'}); + t.deepEqual(dataFromWorker2, {id: '2'}); +}); diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs b/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs new file mode 100644 index 000000000..1943f14ca --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/worker.mjs @@ -0,0 +1,9 @@ +export default async ({negotiateProtocol}) => { + const protocol = negotiateProtocol(['ava-4']); + + await protocol.ready(); + + for await (const testWorker of protocol.testWorkers()) { + testWorker.publish(protocol.initialData); + } +}; diff --git a/test/shared-workers/multiple-workers-are-loaded/test.js b/test/shared-workers/multiple-workers-are-loaded/test.js new file mode 100644 index 000000000..a15ff6a2d --- /dev/null +++ b/test/shared-workers/multiple-workers-are-loaded/test.js @@ -0,0 +1,9 @@ +import test from '@ava/test'; + +import {fixture} from '../../helpers/exec.js'; + +test('can load multiple workers', async t => { + await fixture(); + + t.pass(); +}); From 05af18eebf406c731f8d8aa0b62d9d5455c0d55b Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 19 Dec 2022 13:39:55 -0600 Subject: [PATCH 2/4] Deregister workers by filename --- lib/plugin-support/shared-workers.js | 33 ++++++++++++++----- .../fixtures/test.js | 8 +++-- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 1a23fa230..18e2f5875 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -51,11 +51,21 @@ function launchWorker(filename, initialData) { export async function observeWorkerProcess(fork, runStatus) { let registrationCount = 0; let signalDeregistered; - let launched; + const launchedWorkerMap = new Map(); + const degregisterWorker = (filename) => { + launchedWorkerMap.get(filename)?.worker.unref(); + launchedWorkerMap.delete(filename); + + registrationCount--; + + if (registrationCount === 0) { + signalDeregistered(); + } + } + const deregistered = new Promise(resolve => { signalDeregistered = () => { // Only unref the worker once all test workers have been deregistered, otherwise the worker may exit before test workers are deregistered - launched?.worker.unref(); resolve(); }; }); @@ -67,22 +77,27 @@ export async function observeWorkerProcess(fork, runStatus) { }); fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => { - launched = launchWorker(filename, initialData); + const launched = launchWorker(filename, initialData); + + launchedWorkerMap.set(filename, launched); const handleWorkerMessage = async message => { if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) { - launched.worker.off('message', handleWorkerMessage); + // launched.worker.off('message', handleWorkerMessage); + + // registrationCount--; + // if (registrationCount === 0) { + // signalDeregistered(filename); + // } - registrationCount--; - if (registrationCount === 0) { - signalDeregistered(); - } + degregisterWorker(filename); } }; launched.statePromises.error.then(error => { - signalDeregistered(); launched.worker.off('message', handleWorkerMessage); + degregisterWorker(filename); + signalDeregistered(); runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)}); signalError(); }); diff --git a/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js index d7dbee1ee..6cf895d78 100644 --- a/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js +++ b/test/shared-workers/multiple-workers-are-loaded/fixtures/test.js @@ -17,10 +17,14 @@ const worker2 = registerSharedWorker({ }, }); +const messageFromWorker1 = worker1.subscribe().next(); +const messageFromWorker2 = worker2.subscribe().next(); + test('can load multiple workers', async t => { - const {value: {data: dataFromWorker1}} = await worker1.subscribe().next(); - const {value: {data: dataFromWorker2}} = await worker2.subscribe().next(); + const {value: {data: dataFromWorker1}} = await messageFromWorker1; + const {value: {data: dataFromWorker2}} = await messageFromWorker2; t.deepEqual(dataFromWorker1, {id: '1'}); t.deepEqual(dataFromWorker2, {id: '2'}); + t.pass(); }); From 56ffe121b2552ab462fe53fa666a3226385891df Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 19 Dec 2022 16:15:36 -0600 Subject: [PATCH 3/4] Cleanup --- lib/plugin-support/shared-workers.js | 57 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 18e2f5875..3358a9cb1 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -48,56 +48,57 @@ function launchWorker(filename, initialData) { return launched; } +/** + * Handles the shared worker lifecycle within a test worker thread. + */ export async function observeWorkerProcess(fork, runStatus) { - let registrationCount = 0; - let signalDeregistered; - const launchedWorkerMap = new Map(); - const degregisterWorker = (filename) => { - launchedWorkerMap.get(filename)?.worker.unref(); - launchedWorkerMap.delete(filename); - - registrationCount--; + let signalAllDeregistered; + + const launchedSharedWorkerMap = new Map(); + + /** + * If a filename is provided, unreferences the shared worker associated with that filename. Otherwise unreferences all shared workers. + * Resolves the main de-registration promise if all shared workers have been unregistered. + */ + const deregisterSharedWorker = filename => { + if (filename) { + launchedSharedWorkerMap.get(filename)?.worker.unref(); + launchedSharedWorkerMap.delete(filename); + } else { + for (const filename of launchedSharedWorkerMap.keys()) { + deregisterSharedWorker(filename); + } + } - if (registrationCount === 0) { - signalDeregistered(); + if (launchedSharedWorkerMap.size === 0) { + signalAllDeregistered(); } - } + }; const deregistered = new Promise(resolve => { - signalDeregistered = () => { - // Only unref the worker once all test workers have been deregistered, otherwise the worker may exit before test workers are deregistered + signalAllDeregistered = () => { resolve(); }; }); fork.promise.finally(() => { - if (registrationCount === 0) { - signalDeregistered(); - } + deregisterSharedWorker(); }); fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => { const launched = launchWorker(filename, initialData); - launchedWorkerMap.set(filename, launched); + launchedSharedWorkerMap.set(filename, launched); const handleWorkerMessage = async message => { if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) { - // launched.worker.off('message', handleWorkerMessage); - - // registrationCount--; - // if (registrationCount === 0) { - // signalDeregistered(filename); - // } - - degregisterWorker(filename); + deregisterSharedWorker(filename); } }; launched.statePromises.error.then(error => { launched.worker.off('message', handleWorkerMessage); - degregisterWorker(filename); - signalDeregistered(); + deregisterSharedWorker(filename); runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)}); signalError(); }); @@ -105,8 +106,6 @@ export async function observeWorkerProcess(fork, runStatus) { try { await launched.statePromises.available; - registrationCount++; - port.postMessage({type: 'ready'}); launched.worker.postMessage({ From 22e59419b2ec1d1b756ef9fb25440a1571453a76 Mon Sep 17 00:00:00 2001 From: Mark Wubben Date: Sun, 8 Jan 2023 20:56:30 +0100 Subject: [PATCH 4/4] Bikeshed abstractions and tracking mechanism to perhaps be clearer --- lib/plugin-support/shared-workers.js | 64 ++++++++++++++-------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 3358a9cb1..2d65454ab 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -48,57 +48,55 @@ function launchWorker(filename, initialData) { return launched; } -/** - * Handles the shared worker lifecycle within a test worker thread. - */ export async function observeWorkerProcess(fork, runStatus) { - let signalAllDeregistered; - - const launchedSharedWorkerMap = new Map(); - - /** - * If a filename is provided, unreferences the shared worker associated with that filename. Otherwise unreferences all shared workers. - * Resolves the main de-registration promise if all shared workers have been unregistered. - */ - const deregisterSharedWorker = filename => { - if (filename) { - launchedSharedWorkerMap.get(filename)?.worker.unref(); - launchedSharedWorkerMap.delete(filename); - } else { - for (const filename of launchedSharedWorkerMap.keys()) { - deregisterSharedWorker(filename); - } - } - - if (launchedSharedWorkerMap.size === 0) { - signalAllDeregistered(); - } - }; + let signalDone; - const deregistered = new Promise(resolve => { - signalAllDeregistered = () => { + const done = new Promise(resolve => { + signalDone = () => { resolve(); }; }); + const activeInstances = new Set(); + + const removeInstance = instance => { + instance.worker.unref(); + activeInstances.delete(instance); + + if (activeInstances.size === 0) { + signalDone(); + } + }; + + const removeAllInstances = () => { + if (activeInstances.size === 0) { + signalDone(); + return; + } + + for (const instance of activeInstances) { + removeInstance(instance); + } + }; + fork.promise.finally(() => { - deregisterSharedWorker(); + removeAllInstances(); }); fork.onConnectSharedWorker(async ({filename, initialData, port, signalError}) => { const launched = launchWorker(filename, initialData); - - launchedSharedWorkerMap.set(filename, launched); + activeInstances.add(launched); const handleWorkerMessage = async message => { if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) { - deregisterSharedWorker(filename); + launched.worker.off('message', handleWorkerMessage); + removeInstance(launched); } }; launched.statePromises.error.then(error => { launched.worker.off('message', handleWorkerMessage); - deregisterSharedWorker(filename); + removeAllInstances(); runStatus.emitStateChange({type: 'shared-worker-error', err: serializeError('Shared worker error', true, error)}); signalError(); }); @@ -126,5 +124,5 @@ export async function observeWorkerProcess(fork, runStatus) { } catch {} }); - return deregistered; + return done; }