Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): add start method to worker farms #13937

Merged
merged 6 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Features

- `[jest-worker]` Add `start` method to worker farms ([#13937](https://github.com/facebook/jest/pull/13937))

### Fixes

### Chore & Maintenance
Expand Down
8 changes: 7 additions & 1 deletion packages/jest-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,17 @@ Returns a `ReadableStream` where the standard output of all workers is piped. No

Returns a `ReadableStream` where the standard error of all workers is piped. Note that the `silent` option of the child workers must be set to `true` to make it work. This is the default set by `jest-worker`, but keep it in mind when overriding options through `forkOptions`.

#### `start()`

Starts up every worker and calls their `setup` function, if it exists. Returns a `Promise` which resolves when all workers are running and have completed their `setup`.

This is useful if you want to start up all your workers eagerly before they are used to call any other functions.

#### `end()`

Finishes the workers by killing all workers. No further calls can be done to the `Worker` instance.

Returns a Promise that resolves with `{ forceExited: boolean }` once all workers are dead. If `forceExited` is `true`, at least one of the workers did not exit gracefully, which likely happened because it executed a leaky task that left handles open. This should be avoided, force exiting workers is a last resort to prevent creating lots of orphans.
Returns a `Promise` that resolves with `{ forceExited: boolean }` once all workers are dead. If `forceExited` is `true`, at least one of the workers did not exit gracefully, which likely happened because it executed a leaky task that left handles open. This should be avoided, force exiting workers is a last resort to prevent creating lots of orphans.

**Note:**

Expand Down
5 changes: 2 additions & 3 deletions packages/jest-worker/__benchmarks__/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ function testJestWorker() {

async function countToFinish() {
if (++count === calls) {
farm.end();
const endTime = performance.now();

// Let all workers go down.
await sleep(2000);
await farm.end();

resolve({
globalTime: endTime - startTime - 2000,
Expand All @@ -110,7 +109,7 @@ function testJestWorker() {
farm.getStderr().pipe(process.stderr);

// Let all workers come up.
await sleep(2000);
await farm.start();

const startProcess = performance.now();

Expand Down
2 changes: 2 additions & 0 deletions packages/jest-worker/__typetests__/jest-worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ expectError(typedWorkerFarm.runTestAsync());
expectError(typedWorkerFarm.setup());
expectError(typedWorkerFarm.teardown());

expectType<Promise<void>>(typedWorkerFarm.start());

expectError<Promise<void>>(typedWorkerFarm.end());
expectType<Promise<{forceExited: boolean}>>(typedWorkerFarm.end());

Expand Down
1 change: 1 addition & 0 deletions packages/jest-worker/src/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ it('exposes the right API using passed worker', () => {
getStdout: jest.fn(),
getWorkers: jest.fn(),
send: jest.fn(),
start: jest.fn(),
}));

const farm = new WorkerFarm('/tmp/baz.js', {
Expand Down
24 changes: 24 additions & 0 deletions packages/jest-worker/src/base/BaseWorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import mergeStream = require('merge-stream');
import {
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
PoolExitResult,
WorkerInterface,
Expand Down Expand Up @@ -87,6 +88,29 @@ export default class BaseWorkerPool {
throw Error('Missing method createWorker in WorkerPool');
}

async start(): Promise<void> {
await Promise.all(
this._workers.map(async worker => {
await worker.waitForWorkerReady();

await new Promise<void>((resolve, reject) => {
worker.send(
[CHILD_MESSAGE_CALL_SETUP],
emptyMethod,
error => {
if (error) {
reject(error);
} else {
resolve();
}
},
emptyMethod,
);
});
}),
);
}

async end(): Promise<PoolExitResult> {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
Expand Down
4 changes: 4 additions & 0 deletions packages/jest-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ export class Worker {
return this._workerPool.getStdout();
}

async start(): Promise<void> {
await this._workerPool.start();
}

async end(): Promise<PoolExitResult> {
if (this._ending) {
throw new Error('Farm is ended, no more calls can be done to it');
Expand Down
7 changes: 6 additions & 1 deletion packages/jest-worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const CHILD_MESSAGE_INITIALIZE = 0;
export const CHILD_MESSAGE_CALL = 1;
export const CHILD_MESSAGE_END = 2;
export const CHILD_MESSAGE_MEM_USAGE = 3;
export const CHILD_MESSAGE_CALL_SETUP = 4;

export const PARENT_MESSAGE_OK = 0;
export const PARENT_MESSAGE_CLIENT_ERROR = 1;
Expand All @@ -61,6 +62,7 @@ export interface WorkerPoolInterface {
getWorkers(): Array<WorkerInterface>;
createWorker(options: WorkerOptions): WorkerInterface;
send: WorkerCallback;
start(): Promise<void>;
end(): Promise<PoolExitResult>;
}

Expand Down Expand Up @@ -223,11 +225,14 @@ export type ChildMessageEnd = [

export type ChildMessageMemUsage = [type: typeof CHILD_MESSAGE_MEM_USAGE];

export type ChildMessageCallSetup = [type: typeof CHILD_MESSAGE_CALL_SETUP];

export type ChildMessage =
| ChildMessageInitialize
| ChildMessageCall
| ChildMessageEnd
| ChildMessageMemUsage;
| ChildMessageMemUsage
| ChildMessageCallSetup;

// Messages passed from the children to the parent.

Expand Down
23 changes: 23 additions & 0 deletions packages/jest-worker/src/workers/processChild.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import {isPromise} from 'jest-util';
import {
CHILD_MESSAGE_CALL,
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
CHILD_MESSAGE_INITIALIZE,
CHILD_MESSAGE_MEM_USAGE,
Expand Down Expand Up @@ -61,6 +62,28 @@ const messageListener: NodeJS.MessageListener = (request: any) => {
reportMemoryUsage();
break;

case CHILD_MESSAGE_CALL_SETUP:
if (initialized) {
reportSuccess(void 0);
} else {
const main = require(file!);

initialized = true;

if (main.setup) {
execFunction(
main.setup,
main,
setupArgs,
reportSuccess,
reportInitializeError,
);
} else {
reportSuccess(void 0);
}
}
break;

default:
throw new TypeError(
`Unexpected request from parent process: ${request[0]}`,
Expand Down
23 changes: 23 additions & 0 deletions packages/jest-worker/src/workers/threadChild.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {isMainThread, parentPort} from 'worker_threads';
import {isPromise} from 'jest-util';
import {
CHILD_MESSAGE_CALL,
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
CHILD_MESSAGE_INITIALIZE,
CHILD_MESSAGE_MEM_USAGE,
Expand Down Expand Up @@ -63,6 +64,28 @@ const messageListener = (request: any) => {
reportMemoryUsage();
break;

case CHILD_MESSAGE_CALL_SETUP:
if (initialized) {
reportSuccess(void 0);
} else {
const main = require(file!);

initialized = true;

if (main.setup) {
execFunction(
main.setup,
main,
setupArgs,
reportSuccess,
reportInitializeError,
);
} else {
reportSuccess(void 0);
}
}
break;

default:
throw new TypeError(
`Unexpected request from parent process: ${request[0]}`,
Expand Down