Skip to content

Commit

Permalink
fix: errors occuring during application boot fail silently (#316)
Browse files Browse the repository at this point in the history
* fix: errors occuring during application boot fail silently

* fix: Cluster#reload does not work Cluster#workers is empty

* fix: prevent Cluster#fork from never resolving

* style: move .from() to a second line

* refactor: simplify type decls in Cluster
  • Loading branch information
zacharygolba authored Aug 17, 2016
1 parent b8c5815 commit bace2de
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 32 deletions.
7 changes: 6 additions & 1 deletion src/packages/compiler/utils/create-boot-script.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ export default async function createBootScript(dir: string, {
path: CWD,
port: PORT
})
);
).catch(err => {
process.send({
error: err ? err.stack : void 0,
message: 'error'
});
});
`;

if (useStrict) {
Expand Down
98 changes: 67 additions & 31 deletions src/packages/pm/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { red, green } from 'chalk';

import { NODE_ENV } from '../../../constants';

import { line } from '../../logger';

import omit from '../../../utils/omit';
import range from '../../../utils/range';

import type { Worker } from 'cluster';
Expand Down Expand Up @@ -83,7 +86,7 @@ class Cluster extends EventEmitter {
this.forkAll().then(() => this.emit('ready'));
}

fork(retry: boolean = true): Promise<Worker> {
fork(retry: boolean = true) {
return new Promise(resolve => {
if (this.workers.size < this.maxWorkers) {
const worker = cluster.fork({
Expand All @@ -94,9 +97,6 @@ class Cluster extends EventEmitter {
const timeout = setTimeout(() => {
handleError();

worker.removeListener('exit', handleExit);
worker.kill();

if (retry) {
this.fork(false);
}
Expand All @@ -108,36 +108,67 @@ class Cluster extends EventEmitter {
worker.removeListener('message', handleMessage);

if (typeof code === 'number') {
this.logger.info(
`Worker process: ${red(`${pid}`)} exited with code ${code}`
);
this.logger.info(line`
Worker process: ${red(`${pid}`)} exited with code ${code}
`);
}

this.logger.info(`Removing worker process: ${red(`${pid}`)}`);

this.workers.delete(worker);
cleanUp(true);

this.fork();
};

const handleError = () => {
this.logger.info(
`Removing worker process: ${red(`${worker.process.pid}`)}`
);
const handleError = (err?: string) => {
if (err) {
this.logger.error(err);
}

this.workers.delete(worker);
clearTimeout(timeout);
this.logger.info(line`
Removing worker process: ${red(`${worker.process.pid}`)}
`);

cleanUp(true);
resolve(worker);
};

const handleMessage = (message: string) => {
if (message === 'ready') {
this.logger.info(
`Adding worker process: ${green(`${worker.process.pid}`)}`
);
const handleMessage = (message: string | Object) => {
let data = {};

if (typeof message === 'object') {
data = omit(message, 'message');
message = message.message;
}

switch (message) {
case 'ready':
this.logger.info(line`
Adding worker process: ${green(`${worker.process.pid}`)}
`);

this.workers.add(worker);
this.workers.add(worker);

clearTimeout(timeout);
resolve(worker);
cleanUp(false);
resolve(worker);
break;

case 'error':
handleError(data.error);
break;
}
};

const cleanUp = (remove: boolean) => {
clearTimeout(timeout);

if (remove) {
worker.kill();
worker.removeAllListeners();

this.workers.delete(worker);
} else {
worker.removeListener('error', handleError);
}
};

Expand All @@ -148,7 +179,7 @@ class Cluster extends EventEmitter {
});
}

shutdown(worker: Worker): Promise<Worker> {
shutdown<T: Worker>(worker: T): Promise<T> {
return new Promise(resolve => {
this.workers.delete(worker);

Expand All @@ -164,18 +195,23 @@ class Cluster extends EventEmitter {
});
}

async reload(): Promise<void> {
const workers: Array<[Worker, Worker]> = Array.from(this.workers)
.reduce((arr, item, idx, src) => {
return (idx + 1) % 2 ? [...arr, src.slice(idx, idx + 2)] : arr;
}, []);
async reload() {
if (this.workers.size) {
const workers = Array
.from(this.workers)
.reduce((arr, item, idx, src) => {
return (idx + 1) % 2 ? [...arr, src.slice(idx, idx + 2)] : arr;
}, []);

for (const group of workers) {
await Promise.all(group.map(worker => this.shutdown(worker)));
for (const group of workers) {
await Promise.all(group.map(worker => this.shutdown(worker)));
}
} else {
await this.fork();
}
}

forkAll(): Promise<Worker> {
forkAll() {
return Promise.race(Array.from(range(1, this.maxWorkers)).map(() => {
return this.fork();
}));
Expand Down

0 comments on commit bace2de

Please sign in to comment.