-
Notifications
You must be signed in to change notification settings - Fork 47
/
worker-pool.ts
234 lines (195 loc) · 6.63 KB
/
worker-pool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// @ts-ignore
import { Worker } from 'worker_threads'
import v8 from 'v8'
import os from 'os'
import path from 'path'
import { Task, WorkerWrapper, SetupConfig } from './interfaces'
const WORKER_STATE_READY = 'ready'
const WORKER_STATE_SPAWNING = 'spawning'
const WORKER_STATE_BUSY = 'busy'
const WORKER_STATE_OFF = 'off'
const WORKER_POOL_STATE_ON = 'on'
const WORKER_POOL_STATE_OFF = 'off'
const AVAILABLE_CPUS = os.cpus().length
class WorkerPool {
private maxWorkers = AVAILABLE_CPUS
private taskQueue: Task[] = []
private workers: WorkerWrapper[] = []
private state = WORKER_POOL_STATE_ON
resurrect(deadWorker: WorkerWrapper): void {
// self healing procedure
const worker = new Worker(path.resolve(__dirname, './worker.js'))
deadWorker.status = WORKER_STATE_SPAWNING
deadWorker.worker = worker
worker.once('online', () =>
// next tick, so the worker js gets interpreted
process.nextTick(() => {
deadWorker.status = WORKER_STATE_READY
// remove previous listeners, like the startup error handler
// @ts-ignore
worker.removeAllListeners()
this.tick()
})
)
// startup error handler: should not be thrown or at least handled
worker.once('error', (error: Error) => {
console.error(error)
deadWorker.status = WORKER_STATE_OFF
// @ts-ignore
worker.removeAllListeners()
this.tick()
})
}
tick(): void {
// check for dead threads and resurrect them
this.workers
.filter(({ status }) => status === WORKER_STATE_OFF)
.forEach((deadWorker: WorkerWrapper) => this.resurrect(deadWorker))
if (this.taskQueue.length === 0) return
let availableWorker: WorkerWrapper
for (let i = 0; i < this.workers.length; i++) {
if (this.workers[i].status === WORKER_STATE_READY) {
availableWorker = this.workers[i]
break
}
}
if (typeof availableWorker === 'undefined') return
const work = this.taskQueue.shift()
availableWorker.status = WORKER_STATE_BUSY
const { worker } = availableWorker
const { handler, config, resolve, reject } = work
try {
let variables = ''
for (const key in config.ctx) {
if (!config.ctx.hasOwnProperty(key)) continue
let variable
switch (typeof config.ctx[key]) {
case 'string':
variable = `'${config.ctx[key]}'`
break
case 'object':
variable = JSON.stringify(config.ctx[key])
break
default:
variable = config.ctx[key]
}
variables += `let ${key} = ${variable}\n`
}
// @ts-ignore
const dataSerialized = v8.serialize(config.data)
const dataStr = JSON.stringify(dataSerialized)
const workerStr = `
async function __executor__() {
const v8 = require('v8')
${variables}
const dataParsed = JSON.parse('${dataStr}')
const dataBuffer = Buffer.from(dataParsed.data)
const dataDeserialized = v8.deserialize(dataBuffer)
return await (${handler.toString()})(dataDeserialized)
}
`
// @ts-ignore
worker.once('message', (message: any) => {
this.free(worker)
if (typeof message.error === 'undefined' || message.error === null)
return resolve(message.data)
const error = new Error(message.error.message)
error.stack = message.error.stack
reject(error)
})
// @ts-ignore
worker.once('error', (error: Error) => {
availableWorker.status = WORKER_STATE_OFF
reject(error)
this.tick()
})
worker.postMessage(workerStr)
} catch (err) {
this.free(worker)
reject(err)
}
}
enqueue({ handler, config, resolve, reject }: Task): void {
this.taskQueue.push({ handler, config, resolve, reject })
this.tick()
}
free(worker: any): void {
for (let i = 0; i < this.workers.length; i++) {
// @ts-ignore
if (worker.threadId === this.workers[i].worker.threadId) {
this.workers[i].status = WORKER_STATE_READY
// remove previous listeners
// @ts-ignore
this.workers[i].worker.removeAllListeners()
this.tick()
break
}
}
}
setup(config: SetupConfig = { maxWorkers: AVAILABLE_CPUS }): Promise<void> {
this.maxWorkers = config.maxWorkers > 0 ? config.maxWorkers : AVAILABLE_CPUS
if (this.maxWorkers > 10) console.warn(`Worker pool has more than 10 workers.\nYou should also increase the Max Listeners of Node.js (https://nodejs.org/docs/latest/api/events.html#events_emitter_setmaxlisteners_n)\nOtherwise, limit them with start({maxWorkers: 10})`)
return new Promise((resolve, reject) => {
let counterSuccess = 0
let counterFailure = 0
for (let i = 0; i < this.maxWorkers; i++) {
const worker = new Worker(`${__dirname}/worker.js`)
this.workers.push({
status: WORKER_STATE_SPAWNING,
worker
})
worker.once(
'online',
(index => () => {
// next tick, so the worker js gets interpreted
process.nextTick(() => {
this.workers[index].status = WORKER_STATE_READY
// remove previous listeners, like the startup error handler
// @ts-ignore
this.workers[index].worker.removeAllListeners()
counterSuccess++
// if there's at least one working thread, go ahead
if (
counterSuccess > 0 &&
counterSuccess + counterFailure === this.maxWorkers
)
resolve()
})
})(i)
)
// startup error handler: should not be thrown or at least handled
worker.once(
'error',
(index => (error: Error) => {
this.workers[index].status = WORKER_STATE_OFF
// @ts-ignore
this.workers[index].worker.removeAllListeners()
counterFailure++
// stop the worker pool if no worker is spawned
if (counterFailure === this.maxWorkers) {
this.state = WORKER_POOL_STATE_OFF
reject(error)
}
})(i)
)
}
})
}
teardown(): Promise<void> {
return new Promise(resolve => {
let counter = 0
for (let i = 0; i < this.workers.length; i++) {
// @ts-ignore
this.workers[i].worker.terminate(() => {
counter++
if (counter === this.workers.length) {
this.state = WORKER_POOL_STATE_OFF
this.workers = []
resolve()
}
})
}
})
}
}
export default new WorkerPool()