Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: t31135461 / Refine jest-worker API #6676

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added jest-worker
Empty file.
10 changes: 6 additions & 4 deletions packages/jest-resolve/src/is_builtin_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ declare var process: {
binding(type: string): {},
};

const EXPERIMENTAL_MODULES = ['worker_threads'];

const BUILTIN_MODULES =
builtinModules ||
Object.keys(process.binding('natives')).filter(
(module: string) => !/^internal\//.test(module),
);
builtinModules.concat(EXPERIMENTAL_MODULES) ||
Object.keys(process.binding('natives'))
.filter((module: string) => !/^internal\//.test(module))
.concat([EXPERIMENTAL_MODULES]);

export default function isBuiltinModule(module: string): boolean {
return BUILTIN_MODULES.indexOf(module) !== -1;
Expand Down
127 changes: 127 additions & 0 deletions packages/jest-worker/src/QueueManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import type {QueueChildMessage} from './types';

export default class QueueManager {
_callback: Function;
_last: Array<QueueChildMessage>;
_locks: Array<boolean>;
_numOfWorkers: number;
_offset: number;
_queue: Array<?QueueChildMessage>;

constructor(numOfWorkers: number, callback: Function) {
this._callback = callback;
this._numOfWorkers = numOfWorkers;
this._queue = [];
this._last = [];
this._locks = [];
this._offset = 0;

// If we exceeded the amount of retries, we will emulate an error reply
// coming from the child. This avoids code duplication related with cleaning
// the queue, and scheduling the next call.

// if (this._retries > this._options.maxRetries) {
// const error = new Error('Call retries were exceeded');

// this.onMessage([
// PARENT_MESSAGE_ERROR,
// error.name,
// error.message,
// error.stack,
// {type: 'WorkerError'},
// ]);
// }
}

_process(workerId: number): QueueManager {
if (this.isLocked(workerId)) {
return this;
}

const job = this.getNextJob(workerId);

if (!job) {
return this;
}

const onEnd = (error: ?Error, result: mixed) => {
job.onEnd(error, result);
this.unlock(workerId);
this._process(workerId);
};

this.lock(workerId);

this._callback(workerId, job.request, job.onStart, onEnd);

job.request[1] = true;

return this;
}

getNextJob(workerId: number): ?QueueChildMessage {
let queueHead = this._queue[workerId];

if (!queueHead) {
return null;
}

while (queueHead && queueHead.request[1]) {
queueHead = queueHead.next;
}

this._queue[workerId] = queueHead;

return queueHead;
}

enqueue(task: QueueChildMessage, workerId: number): QueueManager {
if (task.request[1]) {
return this;
}

if (this._queue[workerId]) {
this._last[workerId].next = task;
} else {
this._queue[workerId] = task;
}

this._last[workerId] = task;
this._process(workerId);

return this;
}

push(task: QueueChildMessage): QueueManager {
for (let i = 0; i < this._numOfWorkers; i++) {
const workerIdx = (this._offset + i) % this._numOfWorkers;
this.enqueue(task, workerIdx);
}
this._offset++;

return this;
}

lock(workerId: number): void {
this._locks[workerId] = true;
}

unlock(workerId: number): void {
this._locks[workerId] = false;
}

isLocked(workerId: number): boolean {
return this._locks[workerId];
}
}
42 changes: 42 additions & 0 deletions packages/jest-worker/src/WorkerPool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
Copy link
Member

@SimenB SimenB Jul 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

18? I have to admit I don't know what these should say

*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import BaseWorkerPool from './base/BaseWorkerPool';
import ChildProcessWorker from './workers/ChildProcessWorker';
import NodeThreadsWorker from './workers/NodeThreadsWorker';

import type {
ChildMessage,
WorkerOptions,
OnStart,
OnEnd,
WorkerPoolInterface,
WorkerInterface,
} from './types';

class WorkerPool extends BaseWorkerPool implements WorkerPoolInterface {
send(
workerId: number,
request: ChildMessage,
onStart: OnStart,
onEnd: OnEnd,
): void {
this.getWorkerById(workerId).send(request, onStart, onEnd);
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
return this._options.useWorkers
? new NodeThreadsWorker(workerOptions)
: new ChildProcessWorker(workerOptions);
}
}

export default WorkerPool;
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ beforeEach(() => {
return forkInterface;
});

Worker = require('../worker').default;
Worker = require('../workers/ChildProcessWorker').default;
});

afterEach(() => {
Expand All @@ -47,7 +47,7 @@ afterEach(() => {
});

it('passes fork options down to child_process.fork, adding the defaults', () => {
const child = require.resolve('../child');
const child = require.resolve('../workers/processChild');

process.execArgv = ['--inspect', '-p'];

Expand Down
8 changes: 4 additions & 4 deletions packages/jest-worker/src/__tests__/child.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ beforeEach(() => {
mockCount = 0;

jest.mock(
'../my-fancy-worker',
'../workers/my-fancy-worker',
() => {
mockCount++;

Expand Down Expand Up @@ -74,15 +74,15 @@ beforeEach(() => {
);

jest.mock(
'../my-fancy-standalone-worker',
'../workers/my-fancy-standalone-worker',
() => jest.fn().mockImplementation(() => 12345),
{virtual: true},
);

// This mock emulates a transpiled Babel module that carries a default export
// that corresponds to a method.
jest.mock(
'../my-fancy-babel-worker',
'../workers/my-fancy-babel-worker',
() => ({
__esModule: true,
default: jest.fn().mockImplementation(() => 67890),
Expand All @@ -94,7 +94,7 @@ beforeEach(() => {
process.send = jest.fn();

// Require the child!
require('../child');
require('../workers/processChild');
});

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions packages/jest-worker/src/__tests__/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ beforeEach(() => {
// The worker mock returns a worker with custom methods, plus it stores them
// in a global list, so that they can be accessed later. This list is reset in
// every test.
jest.mock('../worker', () => {
jest.mock('../workers/ChildProcessWorker', () => {
const fakeClass = jest.fn(() => {
const fakeWorker = {
getStderr: () => ({once() {}, pipe() {}}),
Expand Down Expand Up @@ -63,8 +63,8 @@ beforeEach(() => {
virtual: true,
});

Worker = require('../worker').default;
Farm = require('../index').default;
Worker = require('../workers/ChildProcessWorker').default;
Farm = require('..').default;
});

afterEach(() => {
Expand Down
100 changes: 100 additions & 0 deletions packages/jest-worker/src/base/BaseWorkerPool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/

'use strict';

import mergeStream from 'merge-stream';
import path from 'path';

import {CHILD_MESSAGE_END} from '../types';

import type {Readable} from 'stream';
import type {WorkerPoolOptions, WorkerOptions, WorkerInterface} from '../types';

/* istanbul ignore next */
const emptyMethod = () => {};

export default class BaseWorkerPool {
_stderr: Readable;
_stdout: Readable;
_options: WorkerPoolOptions;
_workers: Array<WorkerInterface>;

constructor(workerPath: string, options: WorkerPoolOptions) {
this._options = options;
this._workers = new Array(options.numWorkers);

if (!path.isAbsolute(workerPath)) {
workerPath = require.resolve(workerPath);
}

const stdout = mergeStream();
const stderr = mergeStream();

const {forkOptions, maxRetries} = options;

for (let i = 0; i < options.numWorkers; i++) {
const workerOptions: WorkerOptions = {
forkOptions,
maxRetries,
workerId: i,
workerPath,
};

const worker = this.createWorker(workerOptions);
const workerStdout = worker.getStdout();
const workerStderr = worker.getStderr();

if (workerStdout) {
stdout.add(workerStdout);
}

if (workerStderr) {
stderr.add(workerStderr);
}

this._workers[i] = worker;
}

this._stdout = stdout;
this._stderr = stderr;
}

getStderr(): Readable {
return this._stderr;
}

getStdout(): Readable {
return this._stdout;
}

getWorkers(): Array<WorkerInterface> {
return this._workers;
}

getWorkerById(workerId: number): WorkerInterface {
return this._workers[workerId];
}

createWorker(workerOptions: WorkerOptions): WorkerInterface {
throw Error('Missing method createWorker in WorkerPool');
}

end(): void {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
for (let i = 0; i < this._workers.length; i++) {
this._workers[i].send(
[CHILD_MESSAGE_END, false],
emptyMethod,
emptyMethod,
);
}
}
}
Loading