Skip to content

Commit

Permalink
feat(child process): Make all child processes silent (#1039)
Browse files Browse the repository at this point in the history
Make all test runner and transpiler child processes silent. The standard out and standard error (stdout and stderr) are now only visible when `loglevel: 'trace'`. If a child process crashes, the last 10 messages received are logged as warning. 

This is also a refactoring of the way we spawn child processes. Instead of having 2 similar implementations (one for transpiler and one for test runners), they are both consolidated in one coherent `ChildProcessProxy` abstraction. 

Also clean up the test runner decorator pattern. Timeouts and retries are now implemented only once. Recognizing that the child process crashed is done by validating that the error is an instance of `ChildProcessCrashedError`. No process specifics other than the name of the error is known from the outside.

The `Task` class is also refactored. Instead of relying on a custom implementation, it uses the `Promise.race` method for timeout functionality.

Fixes #1038 #976
  • Loading branch information
nicojs authored and simondel committed Aug 2, 2018
1 parent efa0242 commit 80b044a
Show file tree
Hide file tree
Showing 43 changed files with 1,103 additions and 998 deletions.
20 changes: 9 additions & 11 deletions packages/stryker/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
{
"files": {
"exclude": {
".git": "",
".tscache": "",
"**/*.js": {
"when": "$(basename).ts"
},
"**/*.d.ts": true,
"**/*.map": {
"when": "$(basename)"
}
"files.exclude": {
".git": true,
".tscache": true,
"**/*.js": {
"when": "$(basename).ts"
},
"**/*.d.ts": true,
"**/*.map": {
"when": "$(basename)"
}
}
}
4 changes: 2 additions & 2 deletions packages/stryker/src/Sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { RunResult, RunnerOptions } from 'stryker-api/test_runner';
import { File } from 'stryker-api/core';
import { TestFramework } from 'stryker-api/test_framework';
import { wrapInClosure, normalizeWhiteSpaces } from './utils/objectUtils';
import TestRunnerDecorator from './isolated-runner/TestRunnerDecorator';
import ResilientTestRunnerFactory from './isolated-runner/ResilientTestRunnerFactory';
import TestRunnerDecorator from './test-runner/TestRunnerDecorator';
import ResilientTestRunnerFactory from './test-runner/ResilientTestRunnerFactory';
import { TempFolder } from './utils/TempFolder';
import { writeFile, findNodeModules, symlinkJunction } from './utils/fileUtils';
import TestableMutant, { TestSelectionResult } from './TestableMutant';
Expand Down
15 changes: 15 additions & 0 deletions packages/stryker/src/child-proxy/ChildProcessCrashedError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import StrykerError from '../utils/StrykerError';

export default class ChildProcessCrashedError extends StrykerError {
constructor(
public readonly pid: number,
message: string,
public readonly exitCode?: number,
public readonly signal?: string,
innerError?: Error) {
super(message, innerError);
Error.captureStackTrace(this, ChildProcessCrashedError);
// TS recommendation: https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, ChildProcessCrashedError.prototype);
}
}
198 changes: 156 additions & 42 deletions packages/stryker/src/child-proxy/ChildProcessProxy.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,114 @@
import * as os from 'os';
import { fork, ChildProcess } from 'child_process';
import { File } from 'stryker-api/core';
import { getLogger } from 'stryker-api/logging';
import { WorkerMessage, WorkerMessageKind, ParentMessage, autoStart, ParentMessageKind } from './messageProtocol';
import { serialize, deserialize } from '../utils/objectUtils';
import Task from '../utils/Task';
import { serialize, deserialize, kill, isErrnoException, padLeft } from '../utils/objectUtils';
import { Task, ExpirableTask } from '../utils/Task';
import LoggingClientContext from '../logging/LoggingClientContext';
import ChildProcessCrashedError from './ChildProcessCrashedError';
import OutOfMemoryError from './OutOfMemoryError';
import StringBuilder from '../utils/StringBuilder';

export type ChildProxy<T> = {
[K in keyof T]: (...args: any[]) => Promise<any>;
interface Func<TS extends any[], R> {
(...args: TS): R;
}
interface PromisifiedFunc<TS extends any[], R> {
(...args: TS): Promise<R>;
}
interface Constructor<T, TS extends any[]> {
new (...args: TS): T;
}
export type Promisified<T> = {
[K in keyof T]: T[K] extends PromisifiedFunc<any, any> ? T[K] : T[K] extends Func<infer TS, infer R> ? PromisifiedFunc<TS, R> : () => Promise<T[K]>;
};

const BROKEN_PIPE_ERROR_CODE = 'EPIPE';
const IPC_CHANNEL_CLOSED_ERROR_CODE = 'ERR_IPC_CHANNEL_CLOSED';
const TIMEOUT_FOR_DISPOSE = 2000;

export default class ChildProcessProxy<T> {
readonly proxy: ChildProxy<T> = {} as ChildProxy<T>;
readonly proxy: Promisified<T>;

private worker: ChildProcess;
private initTask: Task;
private disposeTask: Task<void>;
private disposeTask: ExpirableTask<void> | undefined;
private currentError: ChildProcessCrashedError | undefined;
private workerTasks: Task<any>[] = [];
private log = getLogger(ChildProcessProxy.name);
private stdoutAndStderrBuilder = new StringBuilder();
private isDisposed = false;

private constructor(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], private constructorFunction: { new(...params: any[]): T }, constructorParams: any[]) {
this.worker = fork(require.resolve('./ChildProcessProxyWorker'), [autoStart], { silent: false, execArgv: [] });
private constructor(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, constructorParams: any[]) {
this.worker = fork(require.resolve('./ChildProcessProxyWorker'), [autoStart], { silent: true, execArgv: [] });
this.initTask = new Task();
this.log.debug('Starting %s in child process %s', requirePath, this.worker.pid);
this.send({
kind: WorkerMessageKind.Init,
loggingContext,
plugins,
requirePath,
constructorArgs: constructorParams
constructorArgs: constructorParams,
workingDirectory
});
this.listenToWorkerMessages();
this.initProxy();
this.listenForMessages();
this.listenToStdoutAndStderr();
// This is important! Be sure to bind to `this`
this.handleUnexpectedExit = this.handleUnexpectedExit.bind(this);
this.handleError = this.handleError.bind(this);
this.worker.on('exit', this.handleUnexpectedExit);
this.worker.on('error', this.handleError);
this.proxy = this.initProxy();
}

/**
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process
*/
static create<T, P1>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(arg: P1): T }, arg: P1): ChildProcessProxy<T>;
/**
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process
*/
static create<T, P1, P2>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(arg: P1, arg2: P2): T }, arg1: P1, arg2: P2): ChildProcessProxy<T>;
/**
* Creates a proxy where each function of the object created using the constructorFunction arg is ran inside of a child process
*/
static create<T>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], constructorFunction: { new(...params: any[]): T }, ...constructorArgs: any[]) {
return new ChildProcessProxy(requirePath, loggingContext, plugins, constructorFunction, constructorArgs);
static create<T, TS extends any[]>(requirePath: string, loggingContext: LoggingClientContext, plugins: string[], workingDirectory: string, _: Constructor<T, TS>, ...constructorArgs: TS):
ChildProcessProxy<T> {
return new ChildProcessProxy(requirePath, loggingContext, plugins, workingDirectory, constructorArgs);
}

private send(message: WorkerMessage) {
this.worker.send(serialize(message));
}

private initProxy() {
Object.keys(this.constructorFunction.prototype).forEach(methodName => {
this.proxyMethod(methodName as keyof T);
private initProxy(): Promisified<T> {
// This proxy is a genuine javascript `Proxy` class
// More info: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy
const self = this;
return new Proxy({} as Promisified<T>, {
get(_, propertyKey) {
if (typeof propertyKey === 'string') {
return self.forward(propertyKey);
} else {
return undefined;
}
}
});
}

private proxyMethod(methodName: any) {
this.proxy[(methodName as keyof T)] = (...args: any[]) => {
const workerTask = new Task<any>();
this.initTask.promise.then(() => {
private forward(methodName: string) {
return (...args: any[]) => {
if (this.currentError) {
return Promise.reject(this.currentError);
} else {
const workerTask = new Task<any>();
const correlationId = this.workerTasks.push(workerTask) - 1;
this.send({
kind: WorkerMessageKind.Work,
correlationId,
methodName,
args
this.initTask.promise.then(() => {
this.send({
kind: WorkerMessageKind.Call,
correlationId,
methodName,
args
});
});
});
return workerTask.promise;
return workerTask.promise;
}
};
}

private listenToWorkerMessages() {
private listenForMessages() {
this.worker.on('message', (serializedMessage: string) => {
const message: ParentMessage = deserialize(serializedMessage, [File]);
switch (message.kind) {
Expand All @@ -83,12 +117,16 @@ export default class ChildProcessProxy<T> {
break;
case ParentMessageKind.Result:
this.workerTasks[message.correlationId].resolve(message.result);
delete this.workerTasks[message.correlationId];
break;
case ParentMessageKind.Rejection:
this.workerTasks[message.correlationId].reject(new Error(message.error));
delete this.workerTasks[message.correlationId];
break;
case ParentMessageKind.DisposeCompleted:
this.disposeTask.resolve(undefined);
if (this.disposeTask) {
this.disposeTask.resolve(undefined);
}
break;
default:
this.logUnidentifiedMessage(message);
Expand All @@ -97,12 +135,88 @@ export default class ChildProcessProxy<T> {
});
}

private listenToStdoutAndStderr() {
const handleData = (data: Buffer | string) => {
const output = data.toString();
this.stdoutAndStderrBuilder.append(output);
if (this.log.isTraceEnabled()) {
this.log.trace(output);
}
};

if (this.worker.stdout) {
this.worker.stdout.on('data', handleData);
}

if (this.worker.stderr) {
this.worker.stderr.on('data', handleData);
}
}

private reportError(error: Error) {
this.workerTasks
.filter(task => !task.isCompleted)
.forEach(task => task.reject(error));
}

private handleUnexpectedExit(code: number, signal: string) {
this.isDisposed = true;
const output = this.stdoutAndStderrBuilder.toString();

if (processOutOfMemory()) {
this.currentError = new OutOfMemoryError(this.worker.pid, code);
this.log.warn(`Child process [pid ${this.currentError.pid}] ran out of memory. Stdout and stderr are logged on debug level.`);
this.log.debug(stdoutAndStderr());
} else {
this.currentError = new ChildProcessCrashedError(this.worker.pid, `Child process [pid ${this.worker.pid}] exited unexpectedly with exit code ${code} (${signal || 'without signal'}). ${stdoutAndStderr()}`, code, signal);
this.log.warn(this.currentError.message, this.currentError);
}

this.reportError(this.currentError);

function processOutOfMemory() {
return output.indexOf('JavaScript heap out of memory') >= 0;
}

function stdoutAndStderr() {
if (output.length) {
return `Last part of stdout and stderr was:${os.EOL}${padLeft(output)}`;
} else {
return 'Stdout and stderr were empty.';
}
}
}

private handleError(error: Error) {
if (this.innerProcessIsCrashed(error)) {
this.log.warn(`Child process [pid ${this.worker.pid}] has crashed. See other warning messages for more info.`, error);
this.reportError(new ChildProcessCrashedError(this.worker.pid, `Child process [pid ${this.worker.pid}] has crashed`, undefined, undefined, error));
} else {
this.reportError(error);
}
}

private innerProcessIsCrashed(error: any) {
return isErrnoException(error) && (error.code === BROKEN_PIPE_ERROR_CODE || error.code === IPC_CHANNEL_CLOSED_ERROR_CODE);
}

public dispose(): Promise<void> {
this.disposeTask = new Task();
this.send({ kind: WorkerMessageKind.Dispose });
return this.disposeTask.promise
.then(() => this.worker.kill())
.catch(() => this.worker.kill());
this.worker.removeListener('exit', this.handleUnexpectedExit);
if (this.isDisposed) {
return Promise.resolve();
} else {
this.log.debug('Disposing of worker process %s', this.worker.pid);
const killWorker = () => {
this.log.debug('Kill %s', this.worker.pid);
kill(this.worker.pid);
this.isDisposed = true;
};
this.disposeTask = new ExpirableTask(TIMEOUT_FOR_DISPOSE);
this.send({ kind: WorkerMessageKind.Dispose });
return this.disposeTask.promise
.then(killWorker)
.catch(killWorker);
}
}

private logUnidentifiedMessage(message: never) {
Expand Down
Loading

0 comments on commit 80b044a

Please sign in to comment.