-
-
Notifications
You must be signed in to change notification settings - Fork 3k
/
buffered-worker-pool.js
174 lines (159 loc) · 4.74 KB
/
buffered-worker-pool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
* A wrapper around a third-party child process worker pool implementation.
* Used by {@link module:buffered-runner}.
* @private
* @module buffered-worker-pool
*/
'use strict';
const serializeJavascript = require('serialize-javascript');
const workerpool = require('workerpool');
const {deserialize} = require('./serializer');
const debug = require('debug')('mocha:parallel:buffered-worker-pool');
const {createInvalidArgumentTypeError} = require('../errors');
const WORKER_PATH = require.resolve('./worker.js');
/**
* A mapping of Mocha `Options` objects to their serialized (string) values.
*
* This is helpful because we tend to send the same options over and over
* via IPC; caching lets each distinct options object be serialized only once.
*
* Declared with `let` (not `const`) because
* `BufferedWorkerPool.resetOptionsCache()` replaces it with a fresh `WeakMap`.
* @type {WeakMap<Options,string>}
*/
let optionsCache = new WeakMap();
/**
* These options are passed into the [workerpool](https://npm.im/workerpool) module.
*
* They act as defaults: the `BufferedWorkerPool` constructor merges any
* caller-supplied options over them and then overrides `maxWorkers` with a
* value clamped to at least 1.
* @type {Partial<WorkerPoolOptions>}
*/
const WORKER_POOL_DEFAULT_OPTS = {
// use child processes, not worker threads!
workerType: 'process',
// ensure the same flags sent to `node` for this `mocha` invocation are passed
// along to children
forkOpts: {execArgv: process.execArgv},
// one fewer than `workerpool.cpus`; this evaluates to 0 when `workerpool.cpus`
// is 1, which is why the constructor clamps the final value to a minimum of 1
maxWorkers: workerpool.cpus - 1
};
/**
 * A wrapper around a third-party worker pool implementation.
 *
 * Owns a `workerpool` pool whose workers execute `worker.js`, serializes the
 * Mocha options sent to those workers, and deserializes the results they
 * send back.
 * @private
 */
class BufferedWorkerPool {
  /**
   * Creates an underlying worker pool instance; determines max worker count.
   *
   * The effective `maxWorkers` is `opts.maxWorkers` when it is provided,
   * otherwise the module default, and is never allowed to drop below 1.
   * The merged options are exposed as `this.options`.
   * @param {Partial<WorkerPoolOptions>} [opts] - Options
   */
  constructor(opts = {}) {
    const requestedWorkers =
      typeof opts.maxWorkers === 'undefined'
        ? WORKER_POOL_DEFAULT_OPTS.maxWorkers
        : opts.maxWorkers;
    // the default is `cpus - 1`, which may be 0; always keep at least one worker
    const maxWorkers = Math.max(1, requestedWorkers);

    /* istanbul ignore next */
    if (workerpool.cpus < 2) {
      // TODO: decide whether we should warn
      debug(
        'not enough CPU cores available to run multiple jobs; avoid --parallel on this machine'
      );
    } else if (maxWorkers >= workerpool.cpus) {
      // TODO: decide whether we should warn
      debug(
        '%d concurrent job(s) requested, but only %d core(s) available',
        maxWorkers,
        workerpool.cpus
      );
    }

    // NOTE(review): this message is prefixed `run():` although it is emitted
    // from the constructor; left untouched so debug output does not change.
    /* istanbul ignore next */
    debug(
      'run(): starting worker pool of max size %d, using node args: %s',
      maxWorkers,
      process.execArgv.join(' ')
    );

    // later sources win: defaults < caller options < the clamped `maxWorkers`
    this.options = Object.assign({}, WORKER_POOL_DEFAULT_OPTS, opts, {
      maxWorkers
    });
    this._pool = workerpool.pool(WORKER_PATH, this.options);
  }

  /**
   * Terminates all workers in the pool.
   * @param {boolean} [force] - Whether to force-kill workers. By default, lets workers finish their current task before termination.
   * @private
   * @returns {Promise<void>}
   */
  async terminate(force = false) {
    /* istanbul ignore next */
    debug('terminate(): terminating with force = %s', force);
    return this._pool.terminate(force);
  }

  /**
   * Adds a test file run to the worker pool queue for execution by a worker process.
   *
   * Handles serialization/deserialization: `options` are serialized (and
   * cached) via {@link BufferedWorkerPool.serializeOptions}, and the value the
   * worker resolves with is passed through `deserialize()` before being
   * returned.
   *
   * @param {string} filepath - Filepath of test
   * @param {Options} [options] - Options for Mocha instance; `null` is treated
   * the same as omitting the argument
   * @private
   * @returns {Promise<SerializedWorkerResult>}
   * @throws Rejects with an "invalid argument type" error if `filepath` is
   * empty or not a string
   */
  async run(filepath, options = {}) {
    if (!filepath || typeof filepath !== 'string') {
      throw createInvalidArgumentTypeError(
        'Expected a non-empty filepath',
        'filepath',
        'string'
      );
    }
    const serializedOptions = BufferedWorkerPool.serializeOptions(options);
    const result = await this._pool.exec('run', [filepath, serializedOptions]);
    return deserialize(result);
  }

  /**
   * Returns stats about the state of the worker processes in the pool.
   *
   * Used for debugging. The value is whatever the underlying pool's `stats()`
   * returns.
   *
   * @private
   */
  stats() {
    return this._pool.stats();
  }

  /**
   * Instantiates a {@link WorkerPool}.
   *
   * Forwards all arguments to the {@link BufferedWorkerPool} constructor.
   * @private
   */
  static create(...args) {
    return new BufferedWorkerPool(...args);
  }

  /**
   * Given Mocha options object `opts`, serialize into a format suitable for
   * transmission over IPC.
   *
   * Results are cached per options object (by identity), so passing the same
   * object again returns the previously serialized string without
   * re-serializing. Functions in `opts` are not serialized.
   *
   * @param {Options} [opts] - Mocha options; `null` is treated like an omitted
   * argument (an empty options object) instead of failing, since `null`
   * cannot be used as a `WeakMap` key
   * @private
   * @returns {string} Serialized options
   */
  static serializeOptions(opts = {}) {
    // the default parameter only covers `undefined`; normalize `null` as well
    const options = opts === null ? {} : opts;

    // use `has()` rather than testing the cached value, so that any cached
    // value (whatever it is) counts as a hit
    if (optionsCache.has(options)) {
      return optionsCache.get(options);
    }

    const serialized = serializeJavascript(options, {
      unsafe: true, // this means we don't care about XSS
      ignoreFunction: true // do not serialize functions
    });
    optionsCache.set(options, serialized);
    /* istanbul ignore next */
    debug(
      'serializeOptions(): serialized options %O to: %s',
      options,
      serialized
    );
    return serialized;
  }

  /**
   * Resets internal cache of serialized options objects.
   *
   * For testing/debugging
   * @private
   */
  static resetOptionsCache() {
    optionsCache = new WeakMap();
  }
}
exports.BufferedWorkerPool = BufferedWorkerPool;