This repository has been archived by the owner on Jan 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 112
/
cluster.js
186 lines (170 loc) · 6.32 KB
/
cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
const drivers = require('./drivers.js');
const cp = require('child_process');
const util = require('util');
class Cluster {
    /**
     * A helper class that wraps all communication with worker processes that do the actual import/export.
     *
     * Forks `numWorkers` child processes from `exports.workerPath`, sends each an 'Initialize' message and tracks
     * every worker's state ('ready', 'working' or 'end') from the messages the workers send back.
     *
     * @param {Environment} env Transfer environment. `env.statistics.source.docs.total` is the number of documents
     *                          expected; `env.statistics.memory.peak`/`.ratio` receive the highest heap usage
     *                          reported by any worker.
     * @param {number} numWorkers Number of worker processes to fork.
     * @constructor
     */
    constructor(env, numWorkers) {
        this.workers = {};
        this.workListeners = [];
        this.errorListeners = [];
        this.endListeners = [];
        this.processed = 0;
        this.total = env.statistics.source.docs.total;

        /**
         * Handles one IPC message from a worker:
         *  - 'Error': forwards `m.message` to the error listeners.
         *  - 'Done':  marks worker `m.id` ready, adds `m.processed` to the running count, notifies the work
         *             listeners and records the peak heap usage from `m.memUsage`.
         *  - 'End':   marks worker `m.id` as ended.
         * Once every worker is idle or ended and all expected documents are processed, the end listeners fire
         * and every worker is sent a 'Done' message.
         */
        this.messageReceiver = m => {
            let allDone = false;
            switch (m.type) {
                case 'Error':
                    this.errorListeners.forEach(listener => listener(m.message));
                    break;
                case 'Done':
                    this.workers[m.id].state = 'ready';
                    this.processed += m.processed;
                    this.workListeners.forEach(listener => listener(m.processed));
                    // Finished only when *every* worker is idle or ended; a worker that is still 'working'
                    // has results outstanding.
                    allDone = Object.keys(this.workers).every(id => {
                        const state = this.workers[id].state;
                        return state === 'ready' || state === 'end';
                    });
                    // Compare against and update the same env object this cluster was created with.
                    if (m.memUsage.heapUsed > env.statistics.memory.peak) {
                        env.statistics.memory.peak = m.memUsage.heapUsed;
                        env.statistics.memory.ratio = m.memUsage.ratio;
                    }
                    break;
                case 'End':
                    this.workers[m.id].state = 'end';
                    break;
            }
            // `==` kept on purpose: `total` comes from the source driver and its type is not checked here.
            if (allDone && this.processed == this.total) {
                this.endListeners.forEach(listener => listener());
                // Tell every worker there is no more work.
                for (const doneId in this.workers) {
                    this.send.done(doneId);
                }
            }
        };

        // Outgoing messages; each also updates the locally tracked state of the addressed worker.
        this.send = {
            initialize: (id, env) => {
                this.workers[id].process.send({ type: 'Initialize', id, env });
                this.workers[id].state = 'ready';
            },
            message: (id, from, size) => {
                this.workers[id].process.send({ type: 'Work', from, size });
                this.workers[id].state = 'working';
            },
            done: id => this.workers[id].process.send({ type: 'Done' })
        };

        for (let i = 0; i < numWorkers; i++) {
            // NOTE(review): the second argument of cp.fork() is the child's script argument list (process.argv),
            // not Node/V8 runtime options, so these flags do not configure the worker's runtime unless the worker
            // reads them itself. Moving them to `{ execArgv: [...] }` would apply them, but unsupported V8 flags
            // there can stop the child from starting — confirm against the target Node version before changing.
            const worker = cp.fork(exports.workerPath, ["--nouse-idle-notification", "--expose-gc", "--always_compact"]);
            worker.on('message', this.messageReceiver);
            this.workers[i] = { process: worker };
            this.send.initialize(i, env);
        }
    }

    /**
     * Sends a 'Work' message (`from`, `size`) to the first idle worker, then calls `callback()` with no arguments.
     *
     * If no worker is idle, the call is retried on a later event-loop turn (via setImmediate) until one becomes
     * idle. This does not block, and worker IPC messages are still delivered while waiting.
     * If every worker has already ended but fewer documents than expected were processed, `callback` is called
     * once with an error message instead.
     *
     * @param {number} from
     * @param {number} size
     * @param {function(string=)} callback
     */
    work(from, size, callback) {
        let allEnded = true;
        for (const id in this.workers) {
            const worker = this.workers[id];
            if (worker.state === 'ready') {
                this.send.message(id, from, size);
                callback();
                return;
            }
            if (worker.state !== 'end') {
                allEnded = false;
            }
        }
        // Decide "all ended" only after inspecting every worker, and report it exactly once.
        if (allEnded && this.processed != this.total) {
            callback("The export has finished, but fewer documents than expected have been exported.");
            return;
        }
        // setImmediate, not process.nextTick: recursive nextTick callbacks run before any I/O, so the 'message'
        // events that mark workers ready would never be processed and this would spin forever.
        setImmediate(() => this.work(from, size, callback));
    }

    /**
     * Add a listener here to be notified whenever a worker reports a finished work step.
     * @param {Cluster~workDone} callback
     */
    onWorkDone(callback) {
        this.workListeners.push(callback);
    }

    /**
     * @callback Cluster~workDone
     * @param {number} processed Number of documents processed during this work step
     */

    /**
     * Add a listener here to receive the error messages workers report.
     * @param {errorCb} callback
     */
    onError(callback) {
        this.errorListeners.push(callback);
    }

    /**
     * Add a listener that fires once every worker is idle or ended and all expected documents were processed.
     * @param {emptyCb} callback
     */
    onEnd(callback) {
        this.endListeners.push(callback);
    }
}
/**
 * A stand-in for {@link Cluster} that forks nothing: it loads the worker module into this process and calls it
 * directly, wiring the worker's `send.done` / `send.error` hooks to this object's listeners.
 * @param {Environment} env
 */
class NoCluster extends Cluster {
    constructor(env) {
        // Zero workers: the parent sets up listeners and counters but forks no child processes.
        super(env, 0);
        const worker = require(exports.workerPath);
        this.worker = worker;
        worker.env = env;
        worker.id = 0;
        worker.initialize_transform();

        // Give the source driver, then the target driver, a chance to prepare (if it defines prepareTransfer);
        // the flag passed is true for the source driver and false for the target driver.
        const sides = [['source', true], ['target', false]];
        sides.forEach(([side, isSource]) => {
            const driver = drivers.get(env.options.drivers[side]).driver;
            if (driver.prepareTransfer) {
                driver.prepareTransfer(env, isSource);
            }
        });

        // Hook invoked by the worker after finishing a chunk of `processed` documents.
        worker.send.done = processed => {
            this.processed += processed;
            this.workListeners.forEach(listener => listener(processed));
            if (this.processed == this.total) {
                // Everything expected has been transferred: notify end listeners, then shut the worker down.
                this.endListeners.forEach(listener => listener());
                this.worker.end();
            }
            // Resume whoever called work(); this also happens after the final chunk.
            this.workDoneListener();
        };
        // Hook invoked by the worker to report an error.
        worker.send.error = exception => {
            this.errorListeners.forEach(listener => listener(exception));
        };
        worker.state = 'ready';
    }

    /**
     * Runs one work step directly on the in-process worker instead of messaging a child process.
     * `callback` is not called on dispatch: it is remembered and later invoked with no arguments from the
     * worker's `send.done` hook.
     *
     * @param {number} from
     * @param {number} size
     * @param {function()} callback
     */
    work(from, size, callback) {
        this.workDoneListener = callback;
        this.worker.work(from, size);
    }
}
exports.workerPath = './worker.js';
/**
* Create a new adapter for the data transfer which will be either a Clustered implementation (if numWorkers is
* greater 1) or a direct calling implementation that will be executed in the same process.
*
* @param {Environment} env
* @param {number} numWorkers
* @returns {Cluster}
*/
exports.run = (env, numWorkers) => {
return numWorkers < 2 ? new NoCluster(env) : new Cluster(env, numWorkers);
};