-
Notifications
You must be signed in to change notification settings - Fork 250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(child process): Make all child processes silent #1039
Changes from 54 commits
01a9b36
187855e
a6feb8c
8e50b4f
09eb2f2
c50d69d
98c2bfb
c002d5c
7d2f3df
6590624
691e3a7
042117e
188bada
d5d483e
4fdc250
1729794
c14a520
0a30c08
5a92cc3
be5477f
3339086
10bed57
a820577
ee1efda
33aa4d9
c2882b9
f59d0c2
9d6a4f6
82db388
145e583
d10b523
cbe94cc
c37a500
055cf87
e1a5191
6054561
556b80f
f6b9d91
142e879
f776825
9e91a25
85c9d3a
9d62efe
b35cfe6
075f84f
f060649
06e9998
dd2ee86
cb63eab
4df4ae5
61af88a
787e249
86e7fe2
3caa27f
81cc92f
60360e5
16d29a9
4d3150c
44823ad
36e8984
b99d21a
364d2af
71154e8
5b0e941
42d95a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import StrykerError from '../utils/StrykerError'; | ||
|
||
export default class ChildProcessCrashedError extends StrykerError { | ||
constructor( | ||
public readonly pid: number, | ||
public readonly exitCode?: number, | ||
message = `Child process exited unexpectedly (code ${exitCode === null ? 'unknown' : exitCode})`, | ||
innerError?: Error) { | ||
super(message, innerError); | ||
Error.captureStackTrace(this, ChildProcessCrashedError); | ||
// TS recommendation: https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work | ||
Object.setPrototypeOf(this, ChildProcessCrashedError.prototype); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,80 +1,114 @@ | ||
import * as os from 'os'; | ||
import { fork, ChildProcess } from 'child_process'; | ||
import { File } from 'stryker-api/core'; | ||
import { getLogger } from 'stryker-api/logging'; | ||
import { WorkerMessage, WorkerMessageKind, ParentMessage, autoStart, ParentMessageKind } from './messageProtocol'; | ||
import { serialize, deserialize } from '../utils/objectUtils'; | ||
import Task from '../utils/Task'; | ||
import { serialize, deserialize, kill, isErrnoException, padLeft } from '../utils/objectUtils'; | ||
import { Task, ExpirableTask } from '../utils/Task'; | ||
import LoggingClientContext from '../logging/LoggingClientContext'; | ||
import ChildProcessCrashedError from './ChildProcessCrashedError'; | ||
import OutOfMemoryError from './OutOfMemoryError'; | ||
|
||
export type ChildProxy<T> = { | ||
[K in keyof T]: (...args: any[]) => Promise<any>; | ||
type MethodPromised = { (...args: any[]): Promise<any> }; | ||
|
||
export type Promisified<T> = { | ||
[K in keyof T]: T[K] extends MethodPromised ? T[K] : T[K] extends Function ? MethodPromised : () => Promise<T[K]>; | ||
}; | ||
|
||
const BROKEN_PIPE_ERROR_CODE = 'EPIPE'; | ||
const IPC_CHANNEL_CLOSED_ERROR_CODE = 'ERR_IPC_CHANNEL_CLOSED'; | ||
const TIMEOUT_FOR_DISPOSE = 2000; | ||
const MESSAGE_QUEUE_SIZE = 20; | ||
|
||
export default class ChildProcessProxy<T> { | ||
readonly proxy: ChildProxy<T> = {} as ChildProxy<T>; | ||
readonly proxy: Promisified<T>; | ||
|
||
private worker: ChildProcess; | ||
private initTask: Task; | ||
private disposeTask: Task<void>; | ||
private disposeTask: ExpirableTask<void> | undefined; | ||
private currentError: ChildProcessCrashedError | undefined; | ||
private workerTasks: Task<any>[] = []; | ||
private log = getLogger(ChildProcessProxy.name); | ||
private recentMessagesQueue: string[] = []; | ||
private isDisposed = false; | ||
|
||
private constructor(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], private constructorFunction: { new(...params: any[]): T }, constructorParams: any[]) { | ||
this.worker = fork(require.resolve('./ChildProcessProxyWorker'), [autoStart], { silent: false, execArgv: [] }); | ||
private constructor(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, constructorParams: any[]) { | ||
this.worker = fork(require.resolve('./ChildProcessProxyWorker'), [autoStart], { silent: true, execArgv: [] }); | ||
this.initTask = new Task(); | ||
this.log.debug('Starting %s in a child process', requirePath); | ||
this.send({ | ||
kind: WorkerMessageKind.Init, | ||
loggingContext, | ||
plugins, | ||
requirePath, | ||
constructorArgs: constructorParams | ||
constructorArgs: constructorParams, | ||
workingDirectory | ||
}); | ||
this.listenToWorkerMessages(); | ||
this.initProxy(); | ||
this.listenForMessages(); | ||
this.listenToStdoutAndStderr(); | ||
// This is important! Be sure to bind to `this` | ||
this.handleUnexpectedExit = this.handleUnexpectedExit.bind(this); | ||
this.handleError = this.handleError.bind(this); | ||
this.worker.on('exit', this.handleUnexpectedExit); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we save the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean? The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean NodeJS's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's a node thing. It works based on events. Not too great if you ask me, but that's how it works. |
||
this.worker.on('error', this.handleError); | ||
this.proxy = this.initProxy(); | ||
} | ||
|
||
/** | ||
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process | ||
*/ | ||
static create<T, P1>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(arg: P1): T }, arg: P1): ChildProcessProxy<T>; | ||
static create<T, P1>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, constructorFunction: { new(arg: P1): T }, arg: P1): ChildProcessProxy<T>; | ||
/** | ||
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process | ||
*/ | ||
static create<T, P1, P2>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(arg: P1, arg2: P2): T }, arg1: P1, arg2: P2): ChildProcessProxy<T>; | ||
static create<T, P1, P2>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, constructorFunction: { new(arg: P1, arg2: P2): T }, arg1: P1, arg2: P2): ChildProcessProxy<T>; | ||
/** | ||
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process | ||
*/ | ||
static create<T>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(...params: any[]): T }, ...constructorArgs: any[]) { | ||
return new ChildProcessProxy(requirePath, loggingContext, plugins, constructorFunction, constructorArgs); | ||
static create<T>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, _: { new(...params: any[]): T }, ...constructorArgs: any[]) { | ||
return new ChildProcessProxy(requirePath, loggingContext, plugins, workingDirectory, constructorArgs); | ||
} | ||
|
||
private send(message: WorkerMessage) { | ||
this.worker.send(serialize(message)); | ||
} | ||
|
||
private initProxy() { | ||
Object.keys(this.constructorFunction.prototype).forEach(methodName => { | ||
this.proxyMethod(methodName as keyof T); | ||
private initProxy(): Promisified<T> { | ||
// This proxy is a genuine javascript `Proxy` class | ||
// More info: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy | ||
const self = this; | ||
return new Proxy({} as Promisified<T>, { | ||
get(_, propertyKey) { | ||
if (typeof propertyKey === 'string') { | ||
return self.forward(propertyKey); | ||
} else { | ||
return undefined; | ||
} | ||
} | ||
}); | ||
} | ||
|
||
private proxyMethod(methodName: any) { | ||
this.proxy[(methodName as keyof T)] = (...args: any[]) => { | ||
const workerTask = new Task<any>(); | ||
this.initTask.promise.then(() => { | ||
private forward(methodName: string) { | ||
return (...args: any[]) => { | ||
if (this.currentError) { | ||
return Promise.reject(this.currentError); | ||
} else { | ||
const workerTask = new Task<any>(); | ||
const correlationId = this.workerTasks.push(workerTask) - 1; | ||
this.send({ | ||
kind: WorkerMessageKind.Work, | ||
correlationId, | ||
methodName, | ||
args | ||
this.initTask.promise.then(() => { | ||
this.send({ | ||
kind: WorkerMessageKind.Call, | ||
correlationId, | ||
methodName, | ||
args | ||
}); | ||
}); | ||
}); | ||
return workerTask.promise; | ||
return workerTask.promise; | ||
} | ||
}; | ||
} | ||
|
||
private listenToWorkerMessages() { | ||
private listenForMessages() { | ||
this.worker.on('message', (serializedMessage: string) => { | ||
const message: ParentMessage = deserialize(serializedMessage, [File]); | ||
switch (message.kind) { | ||
|
@@ -83,12 +117,16 @@ export default class ChildProcessProxy<T> { | |
break; | ||
case ParentMessageKind.Result: | ||
this.workerTasks[message.correlationId].resolve(message.result); | ||
delete this.workerTasks[message.correlationId]; | ||
break; | ||
case ParentMessageKind.Rejection: | ||
this.workerTasks[message.correlationId].reject(new Error(message.error)); | ||
delete this.workerTasks[message.correlationId]; | ||
break; | ||
case ParentMessageKind.DisposeCompleted: | ||
this.disposeTask.resolve(undefined); | ||
if (this.disposeTask) { | ||
this.disposeTask.resolve(undefined); | ||
} | ||
break; | ||
default: | ||
this.logUnidentifiedMessage(message); | ||
|
@@ -97,12 +135,93 @@ export default class ChildProcessProxy<T> { | |
}); | ||
} | ||
|
||
private listenToStdoutAndStderr() { | ||
const traceEnabled = this.log.isTraceEnabled(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this value ever change during execution? If not, I would suggest just reading the value on the line There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Will change that |
||
const handleData = (data: Buffer) => { | ||
const message = data.toString(); | ||
this.recentMessagesQueue.push(message); | ||
if (this.recentMessagesQueue.length > MESSAGE_QUEUE_SIZE) { | ||
this.recentMessagesQueue.shift(); | ||
} | ||
|
||
if (traceEnabled) { | ||
this.log.trace(message); | ||
} | ||
}; | ||
|
||
if (this.worker.stdout) { | ||
this.worker.stdout.on('data', handleData); | ||
} | ||
|
||
if (this.worker.stderr) { | ||
this.worker.stderr.on('data', handleData); | ||
} | ||
} | ||
|
||
private reportError(error: Error) { | ||
this.workerTasks | ||
.filter(task => !task.isResolved) | ||
.forEach(task => task.reject(error)); | ||
} | ||
|
||
private handleUnexpectedExit(code: number, signal: string) { | ||
this.isDisposed = true; | ||
const output = this.recentMessagesQueue.join(os.EOL); | ||
|
||
if (processOutOfMemory()) { | ||
this.currentError = new OutOfMemoryError(this.worker.pid, code); | ||
this.log.warn(`Child process [pid ${this.currentError.pid}] ran out of memory. Stdout and stderr are logged on debug level.`); | ||
this.log.debug(stdoutAndStderr()); | ||
} else { | ||
this.currentError = new ChildProcessCrashedError(this.worker.pid, code); | ||
this.log.warn(`Child process [pid ${this.worker.pid}] exited unexpectedly with exit code ${code} (${signal || 'without signal'}). ${stdoutAndStderr()}`, this.currentError); | ||
} | ||
|
||
this.reportError(this.currentError); | ||
|
||
function processOutOfMemory() { | ||
return output.indexOf('JavaScript heap out of memory') >= 0; | ||
} | ||
|
||
function stdoutAndStderr() { | ||
if (output.length) { | ||
return `Last part of stdout and stderr was:${os.EOL}${padLeft(output)}`; | ||
} else { | ||
return 'Stdout and stderr were empty.'; | ||
} | ||
} | ||
} | ||
|
||
private handleError(error: Error) { | ||
if (this.innerProcessIsCrashed(error)) { | ||
this.log.warn(`Child process [pid ${this.worker.pid}] has crashed. See other warning messages for more info.`, error); | ||
this.reportError(new ChildProcessCrashedError(this.worker.pid, undefined, undefined, error)); | ||
} else { | ||
this.reportError(error); | ||
} | ||
} | ||
|
||
private innerProcessIsCrashed(error: any) { | ||
return isErrnoException(error) && (error.code === BROKEN_PIPE_ERROR_CODE || error.code === IPC_CHANNEL_CLOSED_ERROR_CODE); | ||
} | ||
|
||
public dispose(): Promise<void> { | ||
this.disposeTask = new Task(); | ||
this.send({ kind: WorkerMessageKind.Dispose }); | ||
return this.disposeTask.promise | ||
.then(() => this.worker.kill()) | ||
.catch(() => this.worker.kill()); | ||
this.worker.removeListener('exit', this.handleUnexpectedExit); | ||
if (this.isDisposed) { | ||
return Promise.resolve(); | ||
} else { | ||
this.log.debug('Disposing of worker process %s', this.worker.pid); | ||
const killWorker = () => { | ||
this.log.debug('Kill %s', this.worker.pid); | ||
kill(this.worker.pid); | ||
this.isDisposed = true; | ||
}; | ||
this.disposeTask = new ExpirableTask(TIMEOUT_FOR_DISPOSE); | ||
this.send({ kind: WorkerMessageKind.Dispose }); | ||
return this.disposeTask.promise | ||
.then(killWorker) | ||
.catch(killWorker); | ||
} | ||
} | ||
|
||
private logUnidentifiedMessage(message: never) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In what case would you cant to use
T[K] extends MethodPromised ? T[K]
? Can't you just always call back onT[K] extends Function ? MethodPromised
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you are getting at. There are 3 scenarios supported here. In case the method already returns a promise, it is left alone. In case the method returns something else, it is changed to returning a promise. In case it's not a method, it is mapped to a method returning a promise of that type. This is not perfect (still missing the type arguments in some cases), but is the best I could came up with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see what you mean. You mean change it to this:
The problem is that you don't really want to use
MethodPromised
if you don't have to. It doesn't help you with method types, converting it to(...args: any[]): Promise<any>
. It would be awesome if we could keep the method signature, but I don't know how. I've created a stack overflow question for this:https://stackoverflow.com/questions/51587576/using-generics-with-mapped-conditional-types-in-typescript
I would say: let's keep it like this for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow! We have an answer on stack overflow! Fast. It seems that it is possible using
infer
. The problem now is that we do not have support for rest type parameters yet. So needs to be implemented using the death by a thousand overloads syntax. Let's keep it like it is for now and improve once TS 3 is out.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As TS 3 is out since a couple of hours, I've implemented this in the latest commit. Awesome!