Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let the user choose worker implementation #64

Merged
merged 36 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c94cf75
Let the user choose worker implementation
yurynix Jun 18, 2023
0ae9e68
Limit concurrency even further to see if it pleases CI
yurynix Jun 18, 2023
61f76a5
Add a version field to test it in some projects
yurynix Jun 18, 2023
7ed2373
This test was passing only because threads get parent process' uptime
yurynix Jun 18, 2023
208b183
Add the new option to types
yurynix Jun 18, 2023
19d5f26
Please the linter
yurynix Jun 18, 2023
a177b9b
Refine the type
yurynix Jun 18, 2023
2a3ebf6
Remove version, as it was.
yurynix Jun 18, 2023
1c28b1a
Refine the test to better assert expectations
yurynix Jun 18, 2023
5ee7a31
Test raw data transfer on process as well
yurynix Jun 19, 2023
dfaacdc
Bring Transport back from v1
yurynix Jun 19, 2023
2d37dfd
Install required dependency
yurynix Jun 19, 2023
cb058fe
Put a version so I can install it in a different project
yurynix Jun 19, 2023
fd8f9c2
Since maxWorkers & minWorkers set to 2, we should expect 2 workers to…
yurynix Jun 19, 2023
eecff9a
Produce less noise when exiting
yurynix Jun 19, 2023
e0fec4d
It can take a bit for a worker to start
yurynix Jun 19, 2023
66c2b50
Bring back forgotten tests
yurynix Jun 19, 2023
579a00d
Refactor data transport to use named pipes directly
yurynix Jun 19, 2023
e6aeb1a
Try differen test structure
yurynix Jun 19, 2023
289117c
Kickoff pool termination on process exit
yurynix Jun 19, 2023
4307da4
Tell AVA to use processes instead of worker_threads to run tests
yurynix Jun 19, 2023
2998f1c
Try to see why CI fails on this one
yurynix Jun 19, 2023
89fb4c1
Lets ensure the order
yurynix Jun 19, 2023
e14280f
Print more data on the test
yurynix Jun 19, 2023
ef3f75e
Wait explictly for workers to startup
yurynix Jun 19, 2023
8010dc0
Remove no longer needed printout
yurynix Jun 19, 2023
615751c
Run prettier on the files to fix identation
yurynix Jun 20, 2023
b130070
Add threadId getter
yurynix Jun 20, 2023
4d24f1a
Identifiy worker by it's apropriate identifier
yurynix Jun 20, 2023
408f509
Split the workers start time more
yurynix Jun 20, 2023
9dc2115
This can be simplified
yurynix Jun 20, 2023
782da08
Improve test
yurynix Jun 20, 2023
49ffad8
Allow worker to become idle
yurynix Jun 20, 2023
74f93a9
Bring back the code that was breaking the test
yurynix Jun 20, 2023
3feaa2c
Remove printout
yurynix Jun 20, 2023
cdaa597
Remove version field to align with main branch
yurynix Jun 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Describes a WorkerNodes options.
* [.workerEndurance](#WorkerNodesOptions+workerEndurance) : <code>Number</code>
* [.workerStopTimeout](#WorkerNodesOptions+workerStopTimeout) : <code>Number</code>
* [.resourceLimits](#WorkerNodesOptions+resourceLimits) : <code>Object</code>
* [.workerType](#WorkerNodesOptions+workerType) : <code>string</code>

<a name="WorkerNodesOptions+autoStart"></a>

Expand Down Expand Up @@ -195,7 +196,7 @@ The timeout value (in milliseconds) for the worker to stop before sending SIGKIL
<a name="WorkerNodesOptions+resourceLimits"></a>

### options.resourceLimits : <code>Object</code>
Provides the set of JS engine resource constraints inside this Worker thread.
Provides the set of JS engine resource constraints inside this Worker thread. (Usable when using `workerType: thread` only)

**Kind**: instance property of [<code>WorkerNodesOptions</code>](#WorkerNodesOptions)
**Properties**
Expand All @@ -207,6 +208,10 @@ Provides the set of JS engine resource constraints inside this Worker thread.
| codeRangeSizeMb | <code>Number</code> | The size of a pre-allocated memory range used for generated code |
| stackSizeMb | <code>Number</code> | The default maximum stack size for the thread. Small values may lead to unusable Worker instances |

### options.workerType : <code>string</code>
Can be either `process` or `thread` (default), that controls the underlying implementation used, either `child_process` or `worker_threads`.
Most usecases are perfectly fine with `thread` implementation, some work loads though, might need to use `process`, for example, if you are using
`process.chdir()` call which is [not supported](https://github.com/nodejs/node/issues/41673) in `worker_threads`.

## Example

Expand Down
3 changes: 3 additions & 0 deletions e2e/async-initialization-process.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./async-initialization.base");

testBase("process");
3 changes: 3 additions & 0 deletions e2e/async-initialization-thread.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./async-initialization.base");

testBase("thread");
35 changes: 35 additions & 0 deletions e2e/async-initialization.base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const test = require('ava');

const WorkerNodes = require('../');
const { fixture } = require('./utils');

module.exports = function describe(workerType) {
test(`should not mark worker as ready until module fully initialized`, t => {
// given
const workerNodes = new WorkerNodes(fixture('async-initialization'), {
maxWorkers: 1,
asyncWorkerInitialization: true,
autoStart: true,
workerType
});

// then
t.falsy(workerNodes.pickWorker());
});

test(`should correctly handle task after initialization`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-initialization'), {
maxWorkers: 1,
asyncWorkerInitialization: true,
autoStart: true,
workerType
});

// when
const result = await workerNodes.call.result();

// then
t.is(result, 'result');
});
}
31 changes: 0 additions & 31 deletions e2e/async-initialization.spec.js

This file was deleted.

3 changes: 3 additions & 0 deletions e2e/auto-start-process.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./auto-start.base");

testBase("process");
3 changes: 3 additions & 0 deletions e2e/auto-start-thread.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./auto-start.base");

testBase("thread");
76 changes: 76 additions & 0 deletions e2e/auto-start.base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
const test = require('ava');

const WorkerNodes = require('../');
const { fixture, unique, repeatCall, eventually } = require('./utils');

module.exports = function describe(workerType) {
test(`should be disabled by default`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('process-info'), { workerType });

// when
const callTime = new Date();
await workerNodes.call.noop();

// then
workerNodes.workersQueue.forEach(worker => {
const startDate = worker.process.startDate;

if (startDate) {
t.true(startDate >= callTime);
}
});
});

test(`should result in spawn of the workers before the first call if active`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('process-info'), { autoStart: true, minWorkers: 1, maxWorkers: 1, workerType });
await workerNodes.ready();

// when
const callTime = Date.now();
const workerStartTime = await workerNodes.call.getStartTime();

// then
t.true(workerStartTime <= callTime);
});

test(`should force the workerNodes to wait for all the required workers to start before reporting ready`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('process-info'), { autoStart: true, minWorkers: 4, maxWorkers: 4, workerType });
await workerNodes.ready();
const callStartTime = Date.now();

// when
const results = await repeatCall(workerNodes.call.getStartTime, 4);

// then
t.is(results.length, 4);

results.forEach(result => t.true(result <= callStartTime));
});

test(`should only use workers that are fully initialized`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('slow-module'), {
autoStart: true,
minWorkers: 2,
maxWorkers: 2,
taskMaxRetries: Infinity,
workerType
});
await workerNodes.ready();

await repeatCall(workerNodes.call.getPid, 4);

// when
workerNodes.workersQueue.storage[0].process.exit();
await repeatCall(workerNodes.call.getPid, 4);

// then
// we wait for all workers to come back a live
const getLiveWorkers = () => workerNodes.workersQueue.filter(worker => worker.isProcessAlive);
await eventually(() => unique(getLiveWorkers()).length === 2);
t.is(unique(getLiveWorkers()).length, 2);
});
}
71 changes: 0 additions & 71 deletions e2e/auto-start.spec.js

This file was deleted.

3 changes: 3 additions & 0 deletions e2e/call-timeout-process.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./call-timeout.base");

testBase("process");
3 changes: 3 additions & 0 deletions e2e/call-timeout-thread.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const testBase = require("./call-timeout.base");

testBase("thread");
92 changes: 92 additions & 0 deletions e2e/call-timeout.base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
const test = require('ava');

const WorkerNodes = require('../');
const errors = require('../lib/errors');
const { fixture, wait, eventually } = require('./utils');

module.exports = function describe(workerType) {
test(`should not affect fast method calls`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-tasks'), { taskTimeout: 500, maxWorkers: 1, workerType });
await workerNodes.ready();

// then
await t.notThrowsAsync(workerNodes.call.task100ms);
});

test(`should result in an error when a single method call takes too long`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-tasks'), { taskTimeout: 250, maxWorkers: 1, workerType });
await workerNodes.ready();

// then
await t.throwsAsync(workerNodes.call.task500ms, { instanceOf: errors.TimeoutError });
});

test(`should kill the worker that was involved in processing the task`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-tasks'), { taskTimeout: 250, maxWorkers: 1, workerType });
await workerNodes.ready();

// when
let executingWorkerId;
await t.throwsAsync(async () => {
const p = workerNodes.call.task500ms();
workerNodes.workersQueue.storage.forEach(worker => executingWorkerId = worker.id);
await p;
}, { instanceOf: errors.TimeoutError });
await eventually(() => workerNodes.workersQueue.storage.filter(worker => worker.id === executingWorkerId).length === 0);

// then
t.is(workerNodes.workersQueue.storage.filter(worker => worker.id === executingWorkerId).length, 0);
});

test(`should result with rejection of all the calls that the worker was processing at the moment`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-tasks'), {
autoStart: true,
minWorkers: 1,
maxWorkers: 1,
maxTasksPerWorker: 2,
taskTimeout: 250,
workerType
});
await workerNodes.ready();

// when

const failingCall = workerNodes.call.task500ms().catch(error => error);
await wait(200);

const secondCall = workerNodes.call.task100ms().catch(error => error);
const results = await Promise.all([failingCall, secondCall]);

// then
results.forEach(result => {
t.true(result instanceof errors.TimeoutError);
});
});

test(`should result in the spawn of a new worker`, async t => {
// given
const workerNodes = new WorkerNodes(fixture('async-tasks'), {
maxWorkers: 1,
maxTasksPerWorker: Infinity,
taskTimeout: 250,
workerType
});
await workerNodes.ready();

// when
await workerNodes.call.noop();
const idBefore = workerNodes.workersQueue.storage[0].id;
const callResult = await workerNodes.call.task500ms().catch(error => error);
await workerNodes.call.noop();
const idAfter = workerNodes.workersQueue.storage[0].id;

// then
t.true(callResult instanceof errors.TimeoutError);
t.not(idBefore, idAfter);
t.is(typeof idBefore, 'number');
});
}
Loading