pool.ts
type PhpInstance = any;
type request = (instance: PhpInstance) => Promise<any>;
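
/**
 * Options accepted by the Pool constructor. Only `spawn` is required;
 * the remaining fields fall back to the defaults set in the constructor.
 */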
export type PoolOptions = {
	spawn: () => Promise<PhpInstance>;
	reap?: (instance: PhpInstance) => void;
	fatal?: (instance: PhpInstance, error: any) => any;
	maxRequests?: number;
	maxJobs?: number;
};
let childCount = 0;

/**
 * Tracks stats of instances in a pool.
 * @private
 */
class PoolInfo {
	id: number; // Unique ID for debugging purposes.
	started: number; // Time spawned.
	requests = 0; // Total requests processed.
	active = false; // Whether instance is considered active.

	constructor() {
		this.id = childCount++;
		this.started = Date.now();
	}
}

/**
 * Spawns new instances if the pool is not full.
 * Returns a list of new instances.
 * @param pool the pool object to work on
 * @private
 */
const spawn = async (pool: Pool) => {
	const newInstances = new Set();

	if (pool.maxJobs <= 0) return newInstances;

	while (pool.instanceInfo.size < pool.maxJobs) {
		const info = new PoolInfo();
		const instance = await pool.spawn();
		pool.instanceInfo.set(instance, info);
		info.active = true;
		newInstances.add(instance);
	}

	return newInstances;
};

/**
 * Reaps instances once they've passed the maxRequests count.
 * @param pool the pool object to work on
 * @private
 */
const reap = (pool: Pool) => {
	for (const [instance, info] of pool.instanceInfo) {
		if (pool.maxRequests > 0 && info.requests >= pool.maxRequests) {
			info.active = false;
			pool.instanceInfo.delete(instance);
			pool.reap(instance);
			continue;
		}
	}
};

/**
 * Handle fatal errors gracefully.
 * @param pool the pool object to work on
 * @param instance the php instance to clean up
 * @param error the actual error that got us here
 * @private
 */
const fatal = (pool: Pool, instance: PhpInstance, error: Error) => {
	console.error(error);

	if (instance && pool.instanceInfo.has(instance)) {
		const info = pool.instanceInfo.get(instance);
		info.active = false;
		pool.instanceInfo.delete(instance);
	}

	return pool.fatal(instance, error);
};

/**
 * Find the next available idle instance.
 * @private
 */
const getIdleInstance = (pool: Pool) => {
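	// Prefer instances that have served the fewest requests, so work is
	// spread evenly and no single instance races toward the reap limit.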
	const sorted = [...pool.instanceInfo].sort(
		(a, b) => a[1].requests - b[1].requests
	);

	for (const [instance, info] of sorted) {
		if (pool.running.has(instance)) {
			continue;
		}

		if (!info.active) {
			continue;
		}

		return instance;
	}

	return false;
};

/**
 * Maintains and refreshes a list of PHP instances such that each one
 * is only fed a fixed number of requests before being discarded and
 * replaced.
 *
 * Since we're dealing with a linear, "physical" memory array, as opposed to
 * the virtual memory system afforded by most modern OSes, we're prone to
 * memory fragmentation. In that situation, the entire gigabyte could be empty
 * except for a few sparse allocations, yet an allocation will still fail if no
 * contiguous region of the requested length exists. This tends to happen when
 * a new request attempts to initialize a heap structure but cannot find a
 * contiguous 2 MB chunk of memory.
 *
 * We could go as far as debugging PHP itself and contributing a fix upstream,
 * but even then we could not guarantee that a third-party extension won't
 * introduce a leak sometime in the future. So we need a solution that is
 * robust to memory leaks coming from upstream code, and following PHP-FPM's
 * native max-requests strategy is the most practical way to get one:
 *
 * https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-requests
 *
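 * @example
 * // Illustrative sketch only: `loadPhp()` stands in for whatever async
 * // factory creates a PHP instance in your environment, and `php.run()`
 * // stands in for that instance's request method.
 * const pool = new Pool({
 *     maxJobs: 4,
 *     maxRequests: 128,
 *     spawn: () => loadPhp(),
 * });
 *
 * const output = await pool.enqueue((php) => php.run('<?php echo "Hello!";'));
 *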
 */
export class Pool {
	instanceInfo = new Map(); // php => PoolInfo
	spawn: () => Promise<any>; // Async callback to create new instances.
	fatal: (instance: PhpInstance, error: any) => any; // Callback called on instance fatal errors.
	reap: (instance: PhpInstance) => void; // Callback called on destroyed instances.
	maxRequests: number; // Max requests to feed each instance.
	maxJobs: number; // Max number of instances to maintain at once.
	resolvers = new Map(); // Deferred promises used to settle backlogged requests once processed.
	running = new Set(); // Set of busy PHP instances.
	backlog = []; // Queue of request callbacks waiting to be run.

	/**
	 * Create a new pool.
	 * @param options - {spawn, reap, fatal, maxRequests, maxJobs}
	 */
	constructor({
		maxRequests = 128,
		maxJobs = 1,
		spawn = undefined,
		fatal = (instance: PhpInstance, error: any) => error,
		reap = (instance: PhpInstance) => {},
	} = {}) {
		if (!spawn) {
			throw new Error('Spawn method is required for pool.');
		}
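
		// defineProperties (with only `value` set) makes these read-only,
		// so the pool's settings and callbacks can't be reassigned later.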
		Object.defineProperties(this, {
			maxRequests: { value: maxRequests },
			maxJobs: { value: maxJobs },
			spawn: { value: spawn },
			fatal: { value: fatal },
			reap: { value: reap },
		});
	}

	/**
	 * Queue up a callback that will make a request when an
	 * instance becomes idle.
	 * @param item Callback to run when an instance becomes available. It
	 * receives the instance as its first and only param, and should return
	 * a promise that resolves when the request is complete.
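	 * @example
	 * // Sketch only: assumes the spawned PHP instance exposes an async
	 * // `run()` method; use whatever API your instances actually provide.
	 * const output = await pool.enqueue((php) => php.run('<?php phpinfo();'));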
	 * @public
	 */
	async enqueue(item: request): Promise<any> {
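		// Retire any instances that have hit maxRequests, then top the
		// pool back up before looking for an idle instance to use.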
		reap(this);

		let idleInstance;

		try {
			await spawn(this);
			idleInstance = getIdleInstance(this);
		} catch (error) {
			return Promise.reject(fatal(this, idleInstance, error));
		}

		// Defer the callback if we don't have an idle instance available.
		if (!idleInstance) {
			this.backlog.push(item);

			// Split a promise open so it can be resolved or
			// rejected later when the item is processed.
			const notifier = new Promise((resolve, reject) =>
				this.resolvers.set(item, { resolve, reject })
			);

			// Return the notifier so async calling code
			// can still respond correctly when the item
			// is finally processed.
			return notifier;
		}

		// Given an instance, create a new callback that will clean up
		// after the instance processes a request, and optionally
		// will also kick off the next request.
		const onCompleted = (instance) => async () => {
			this.running.delete(instance);
			reap(this);

			// Break out here if the backlog is empty.
			if (!this.backlog.length) {
				return;
			}

			// This is the instance that completed,
			// so we can re-use it...
			let nextInstance = instance;
			const next = this.backlog.shift();

			// Hold on to it while we check for replacements.
			this.running.add(nextInstance);

			let request;

			try {
				const newInstances = await spawn(this);

				// ... but, if we've just spawned a fresh
				// instance, use that one instead.
				if (newInstances.size) {
					for (const fresh of newInstances) {
						this.running.delete(nextInstance);
						nextInstance = fresh;
						this.running.add(nextInstance);
						break;
					}
				}

				// Count the request against the instance that will actually run it.
				const info = this.instanceInfo.get(nextInstance);

				if (info) {
					info.requests++;
				}

				request = next(nextInstance);
			} catch (error) {
				// Grab the reject handler from the notifier
				// promise and run it if there is an error.
				const resolver = this.resolvers.get(next);
				this.resolvers.delete(next);
				resolver.reject(fatal(this, nextInstance, error));
				return;
			}

			const completed = onCompleted(nextInstance);

			// Make sure onCompleted & running.delete run
			// no matter how the request resolves.
			request.finally(() => {
				this.running.delete(nextInstance);
				completed();
			});

			// Grab the resolve handler from the notifier
			// promise and run it if the request resolves.
			request.then((ret) => {
				const resolver = this.resolvers.get(next);
				this.resolvers.delete(next);
				resolver.resolve(ret);
			});

			// Grab the reject handler from the notifier
			// promise and run it if the request rejects.
			request.catch((error) => {
				const resolver = this.resolvers.get(next);
				this.resolvers.delete(next);
				resolver.reject(fatal(this, nextInstance, error));
			});
		};

		const info = this.instanceInfo.get(idleInstance);
		this.running.add(idleInstance);
		info.requests++;

		let request;

		// If we've got an instance available, run the provided callback.
		try {
			request = item(idleInstance);
		} catch (error) {
			return Promise.reject(fatal(this, idleInstance, error));
		}

		// Make sure onCompleted runs no matter how the request resolves.
		request.finally(onCompleted(idleInstance));

		// Catch any errors, log them to the console,
		// and deactivate the instance.
		request.catch((error) => fatal(this, idleInstance, error));

		return request;
	}
}