Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gracefully terminate plugin host process without rpc connection #7192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export function createProxyIdentifier<T>(identifier: string): ProxyIdentifier<T>
return new ProxyIdentifier(false, identifier);
}

export interface ConnectionClosedError extends Error {
code: 'RPC_PROTOCOL_CLOSED'
}
export namespace ConnectionClosedError {
const code: ConnectionClosedError['code'] = 'RPC_PROTOCOL_CLOSED';
export function create(message: string = 'connection is closed'): ConnectionClosedError {
return Object.assign(new Error(message), { code });
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function is(error: any): error is ConnectionClosedError {
return !!error && typeof error === 'object' && 'code' in error && error['code'] === code;
}
}

export class RPCProtocolImpl implements RPCProtocol {

private readonly locals = new Map<string, any>();
Expand All @@ -82,7 +96,7 @@ export class RPCProtocolImpl implements RPCProtocol {
this.toDispose.push(Disposable.create(() => {
this.proxies.clear();
for (const reply of this.pendingRPCReplies.values()) {
reply.reject(new Error('connection is closed'));
reply.reject(ConnectionClosedError.create());
}
this.pendingRPCReplies.clear();
}));
Expand All @@ -98,7 +112,7 @@ export class RPCProtocolImpl implements RPCProtocol {

getProxy<T>(proxyId: ProxyIdentifier<T>): T {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
let proxy = this.proxies.get(proxyId.id);
if (!proxy) {
Expand All @@ -110,7 +124,7 @@ export class RPCProtocolImpl implements RPCProtocol {

set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
this.locals.set(identifier.id, instance);
if (Disposable.is(instance)) {
Expand All @@ -135,7 +149,7 @@ export class RPCProtocolImpl implements RPCProtocol {

private remoteCall(proxyId: string, methodName: string, args: any[]): Promise<any> {
if (this.isDisposed) {
return Promise.reject(new Error('connection is closed'));
return Promise.reject(ConnectionClosedError.create());
}
const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
if (cancellationToken && cancellationToken.isCancellationRequested) {
Expand Down Expand Up @@ -320,7 +334,7 @@ class RPCMultiplexer implements Disposable {

public send(msg: string): void {
if (this.toDispose.disposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
if (this.messagesToSend.length === 0) {
if (typeof setImmediate !== 'undefined') {
Expand Down Expand Up @@ -460,11 +474,13 @@ function isSerializedObject(obj: any): obj is SerializedObject {
return obj && obj.$type !== undefined && obj.data !== undefined;
}

const enum MessageType {
export const enum MessageType {
Request = 1,
Reply = 2,
ReplyErr = 3,
Cancel = 4
Cancel = 4,
Terminate = 5,
Terminated = 6
}

class CancelMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,58 @@ import { injectable } from 'inversify';
import { Argv, Arguments } from 'yargs';
import { CliContribution } from '@theia/core/lib/node';

let pluginHostTerminateTimeout = 10 * 1000;
if (process.env.PLUGIN_HOST_TERMINATE_TIMEOUT) {
pluginHostTerminateTimeout = Number.parseInt(process.env.PLUGIN_HOST_TERMINATE_TIMEOUT);
}

let pluginHostStopTimeout = 4 * 1000;
if (process.env.PLUGIN_HOST_STOP_TIMEOUT) {
pluginHostStopTimeout = Number.parseInt(process.env.PLUGIN_HOST_STOP_TIMEOUT);
}

@injectable()
export class HostedPluginCliContribution implements CliContribution {

static EXTENSION_TESTS_PATH = 'extensionTestsPath';
static PLUGIN_HOST_TERMINATE_TIMEOUT = 'pluginHostTerminateTimeout';
static PLUGIN_HOST_STOP_TIMEOUT = 'pluginHostStopTimeout';

protected _extensionTestsPath: string | undefined;
get extensionTestsPath(): string | undefined {
return this._extensionTestsPath;
}

protected _pluginHostTerminateTimeout = pluginHostTerminateTimeout;
get pluginHostTerminateTimeout(): number {
return this._pluginHostTerminateTimeout;
}

protected _pluginHostStopTimeout = pluginHostStopTimeout;
get pluginHostStopTimeout(): number {
return this._pluginHostStopTimeout;
}

configure(conf: Argv): void {
conf.option(HostedPluginCliContribution.EXTENSION_TESTS_PATH, {
type: 'string'
});
conf.option(HostedPluginCliContribution.PLUGIN_HOST_TERMINATE_TIMEOUT, {
type: 'number',
default: pluginHostTerminateTimeout,
description: 'Timeout in milliseconds to wait for the plugin host process to terminate before killing it. Use 0 for no timeout.'
});
conf.option(HostedPluginCliContribution.PLUGIN_HOST_STOP_TIMEOUT, {
type: 'number',
default: pluginHostStopTimeout,
description: 'Timeout in milliseconds to wait for the plugin host process to stop internal services. Use 0 for no timeout.'
});
}

setArguments(args: Arguments): void {
this._extensionTestsPath = args[HostedPluginCliContribution.EXTENSION_TESTS_PATH];
this._pluginHostTerminateTimeout = args[HostedPluginCliContribution.PLUGIN_HOST_TERMINATE_TIMEOUT];
this._pluginHostStopTimeout = args[HostedPluginCliContribution.PLUGIN_HOST_STOP_TIMEOUT];
}

}
60 changes: 37 additions & 23 deletions packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import * as path from 'path';
import * as cp from 'child_process';
import { injectable, inject, named } from 'inversify';
import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common';
import { Emitter } from '@theia/core/lib/common/event';
import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol';
import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin } from '../../common/plugin-protocol';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { MAIN_RPC_CONTEXT } from '../../common/plugin-api-rpc';
import { MessageType } from '../../common/rpc-protocol';
import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution';
import * as psTree from 'ps-tree';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface IPCConnectionOptions {
readonly serverName: string;
Expand Down Expand Up @@ -83,7 +82,7 @@ export class HostedPluginProcess implements ServerPluginRunner {
}
}

public terminatePluginServer(): void {
async terminatePluginServer(): Promise<void> {
if (this.childProcess === undefined) {
return;
}
Expand All @@ -93,34 +92,49 @@ export class HostedPluginProcess implements ServerPluginRunner {
const cp = this.childProcess;
this.childProcess = undefined;

const emitter = new Emitter();
const waitForTerminated = new Deferred<void>();
cp.on('message', message => {
emitter.fire(JSON.parse(message));
});
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (cp.send) {
cp.send(JSON.stringify(m));
}
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminated) {
waitForTerminated.resolve();
}
});
const hostedPluginManager = rpc.getProxy(MAIN_RPC_CONTEXT.HOSTED_PLUGIN_MANAGER_EXT);
hostedPluginManager.$stop().then(() => {
emitter.dispose();
this.killProcessTree(cp.pid);
});
const stopTimeout = this.cli.pluginHostStopTimeout;
cp.send(JSON.stringify({ type: MessageType.Terminate, stopTimeout }));

const terminateTimeout = this.cli.pluginHostTerminateTimeout;
if (terminateTimeout) {
await Promise.race([
waitForTerminated.promise,
new Promise(resolve => setTimeout(resolve, terminateTimeout))
]);
} else {
await waitForTerminated.promise;
}

this.killProcessTree(cp.pid);
}

private killProcessTree(parentPid: number): void {
psTree(parentPid, (err: Error, childProcesses: Array<psTree.PS>) => {
childProcesses.forEach((p: psTree.PS) => {
process.kill(parseInt(p.PID));
});
process.kill(parentPid);
psTree(parentPid, (_, childProcesses) => {
childProcesses.forEach(childProcess =>
this.killProcess(parseInt(childProcess.PID))
);
this.killProcess(parentPid);
});
}

protected killProcess(pid: number): void {
try {
process.kill(pid);
} catch (e) {
if (e && 'code' in e && e.code === 'ESRCH') {
return;
}
this.logger.error(`[${pid}] failed to kill`, e);
}
}

public runPluginServer(): void {
if (this.childProcess) {
this.terminatePluginServer();
Expand Down
4 changes: 4 additions & 0 deletions packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export class PluginHostRPC {
);
}

async terminate(): Promise<void> {
akosyakov marked this conversation as resolved.
Show resolved Hide resolved
await this.pluginManager.terminate();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
initContext(contextPath: string, plugin: Plugin): any {
const { name, version } = plugin.rawModel;
Expand Down
34 changes: 30 additions & 4 deletions packages/plugin-ext/src/hosted/node/plugin-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
********************************************************************************/

import { Emitter } from '@theia/core/lib/common/event';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol';
import { PluginHostRPC } from './plugin-host-rpc';
console.log('PLUGIN_HOST(' + process.pid + ') starting instance');

Expand Down Expand Up @@ -50,6 +50,10 @@ process.on('unhandledRejection', (reason: any, promise: Promise<any>) => {
if (index >= 0) {
promise.catch(err => {
unhandledPromises.splice(index, 1);
if (terminating && (ConnectionClosedError.is(err) || ConnectionClosedError.is(reason))) {
// during termination it is expected that pending rpc requerst are rejected
return;
}
console.error(`Promise rejection not handled in one second: ${err} , reason: ${reason}`);
if (err && err.stack) {
console.error(`With stack trace: ${err.stack}`);
Expand All @@ -67,19 +71,41 @@ process.on('rejectionHandled', (promise: Promise<any>) => {
}
});

let terminating = false;
const emitter = new Emitter();
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (process.send) {
if (process.send && !terminating) {
process.send(JSON.stringify(m));
}
}
});

process.on('message', (message: string) => {
process.on('message', async (message: string) => {
if (terminating) {
return;
}
try {
emitter.fire(JSON.parse(message));
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminate) {
terminating = true;
emitter.dispose();
if ('stopTimeout' in msg && typeof msg.stopTimeout === 'number' && msg.stopTimeout) {
await Promise.race([
pluginHostRPC.terminate(),
new Promise(resolve => setTimeout(resolve, msg.stopTimeout))
]);
} else {
await pluginHostRPC.terminate();
}
rpc.dispose();
if (process.send) {
process.send(JSON.stringify({ type: MessageType.Terminated }));
}
} else {
emitter.fire(msg);
}
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 0 additions & 4 deletions packages/plugin-ext/src/plugin/command-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ export class CommandRegistryImpl implements CommandRegistryExt {
});
}

dispose(): void {
throw new Error('Method not implemented.');
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
$executeCommand<T>(id: string, ...args: any[]): PromiseLike<T | undefined> {
if (this.handlers.has(id)) {
Expand Down
Loading