Skip to content

Commit

Permalink
[7.8] [kbn/optimizer] poll parent process to avoid zombie processes (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Spencer authored May 20, 2020
1 parent e439eb4 commit 46f916d
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 28 deletions.
1 change: 1 addition & 0 deletions packages/kbn-optimizer/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export * from './bundle';
export * from './bundle_cache';
export * from './worker_config';
export * from './worker_messages';
export * from './parent_messages';
export * from './compiler_messages';
export * from './ts_helpers';
export * from './rxjs_helpers';
Expand Down
33 changes: 33 additions & 0 deletions packages/kbn-optimizer/src/common/parent_messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

export interface ParentPongMsg {
type: 'pong';
}

export const isParentPong = (value: any): value is ParentPongMsg =>
typeof value === 'object' && value && value.type === 'pong';

export class ParentMsgs {
pong(): ParentPongMsg {
return {
type: 'pong',
};
}
}
19 changes: 18 additions & 1 deletion packages/kbn-optimizer/src/common/worker_messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ import {
CompilerErrorMsg,
} from './compiler_messages';

export type WorkerMsg =
export type InternalWorkerMsg =
| WorkerPingMsg
| CompilerRunningMsg
| CompilerIssueMsg
| CompilerSuccessMsg
| CompilerErrorMsg
| WorkerErrorMsg;

// ping messages are internal, they don't apper in public message streams
export type WorkerMsg = Exclude<InternalWorkerMsg, WorkerPingMsg>;

/**
* Message sent when the worker encounters an error that it can't
* recover from, no more messages will be sent and the worker
Expand All @@ -42,6 +46,10 @@ export interface WorkerErrorMsg {
errorStack?: string;
}

export interface WorkerPingMsg {
type: 'ping';
}

const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
'running',
'compiler issue',
Expand All @@ -50,10 +58,19 @@ const WORKER_STATE_TYPES: ReadonlyArray<WorkerMsg['type']> = [
'worker error',
];

export const isWorkerPing = (value: any): value is WorkerPingMsg =>
typeof value === 'object' && value && value.type === 'ping';

export const isWorkerMsg = (value: any): value is WorkerMsg =>
typeof value === 'object' && value && WORKER_STATE_TYPES.includes(value.type);

export class WorkerMsgs {
ping(): WorkerPingMsg {
return {
type: 'ping',
};
}

error(error: Error): WorkerErrorMsg {
return {
type: 'worker error',
Expand Down
16 changes: 14 additions & 2 deletions packages/kbn-optimizer/src/optimizer/observe_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import { Readable } from 'stream';
import { inspect } from 'util';

import * as Rx from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';
import { map, filter, takeUntil } from 'rxjs/operators';

import { isWorkerMsg, WorkerConfig, WorkerMsg, Bundle } from '../common';
import { isWorkerMsg, isWorkerPing, WorkerConfig, WorkerMsg, Bundle, ParentMsgs } from '../common';

import { OptimizerConfig } from './optimizer_config';

const parentMsgs = new ParentMsgs();

export interface WorkerStdio {
type: 'worker stdio';
stream: 'stdout' | 'stderr';
Expand Down Expand Up @@ -146,6 +148,16 @@ export function observeWorker(
observeStdio$(proc.stderr, 'stderr'),
Rx.fromEvent<[unknown]>(proc, 'message')
.pipe(
// filter out ping messages so they don't end up in the general message stream
filter(([msg]) => {
if (!isWorkerPing(msg)) {
return true;
}

proc.send(parentMsgs.pong());
return false;
}),

// validate the messages from the process
map(([msg]) => {
if (!isWorkerMsg(msg)) {
Expand Down
178 changes: 178 additions & 0 deletions packages/kbn-optimizer/src/worker/observe_parent_offline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { EventEmitter } from 'events';
import { inspect } from 'util';

import * as Rx from 'rxjs';
import { tap, takeUntil } from 'rxjs/operators';

import { observeParentOffline, Process } from './observe_parent_offline';
import { WorkerMsgs, ParentMsgs, isWorkerPing } from '../common';

jest.useFakeTimers();

beforeEach(() => {
jest.clearAllTimers();
});

const workerMsgs = new WorkerMsgs();
const parentMsgs = new ParentMsgs();
class MockProcess extends EventEmitter implements Process {
connected?: boolean;
send?: jest.Mock;

constructor(options: { connected?: boolean; send?: jest.Mock | false } = {}) {
super();

this.connected = options.connected ?? true;
this.send = options.send === false ? undefined : options.send ?? jest.fn();
}
}

async function record(observable: Rx.Observable<any>): Promise<string[]> {
const notes: string[] = [];

await observable
.pipe(
tap({
next(value) {
notes.push(`next: ${inspect(value)}`);
},
error(error) {
notes.push(`error: ${inspect(error)}`);
},
complete() {
notes.push(`complete`);
},
})
)
.toPromise();

return notes;
}

async function waitForTick() {
await new Promise(resolve => {
process.nextTick(resolve);
});
}

describe('emits and completes when parent exists because:', () => {
test('"disconnect" event', async () => {
const mockProc = new MockProcess();
const promise = record(observeParentOffline(mockProc, workerMsgs));
mockProc.emit('disconnect');
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnect event)'",
"complete",
]
`);
});

test('process.connected is false', async () => {
const mockProc = new MockProcess({
connected: false,
});

const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnected)'",
"complete",
]
`);
});

test('process.send is falsey', async () => {
const mockProc = new MockProcess({
send: false,
});

const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (disconnected)'",
"complete",
]
`);
});

test('process.send throws "ERR_IPC_CHANNEL_CLOSED"', async () => {
const mockProc = new MockProcess({
send: jest.fn(() => {
const error = new Error();
(error as any).code = 'ERR_IPC_CHANNEL_CLOSED';
throw error;
}),
});

const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersToNextTimer();
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (ipc channel exception)'",
"complete",
]
`);
});

test('ping timeout', async () => {
const mockProc = new MockProcess({});

const promise = record(observeParentOffline(mockProc, workerMsgs));
jest.advanceTimersByTime(10000);
expect(await promise).toMatchInlineSnapshot(`
Array [
"next: 'parent offline (ping timeout)'",
"complete",
]
`);
});
});

test('it emits nothing if parent responds with pongs', async () => {
const send = jest.fn((msg: any) => {
if (isWorkerPing(msg)) {
process.nextTick(() => {
mockProc.emit('message', parentMsgs.pong(), undefined);
});
}
});

const mockProc = new MockProcess({ send });
const unsub$ = new Rx.Subject();
const promise = record(observeParentOffline(mockProc, workerMsgs).pipe(takeUntil(unsub$)));

jest.advanceTimersByTime(5000);
await waitForTick();
jest.advanceTimersByTime(5000);
await waitForTick();
unsub$.next();

expect(await promise).toMatchInlineSnapshot(`
Array [
"complete",
]
`);
expect(send).toHaveBeenCalledTimes(2);
});
Loading

0 comments on commit 46f916d

Please sign in to comment.