Skip to content

Commit

Permalink
fix #7176: gracefully terminate plugin host process without rpc conne…
Browse files Browse the repository at this point in the history
…ction

Signed-off-by: Anton Kosyakov <anton.kosyakov@typefox.io>
  • Loading branch information
akosyakov committed Feb 20, 2020
1 parent 532525f commit ad70c71
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 50 deletions.
30 changes: 23 additions & 7 deletions packages/plugin-ext/src/common/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export function createProxyIdentifier<T>(identifier: string): ProxyIdentifier<T>
return new ProxyIdentifier(false, identifier);
}

export interface ConnectionClosedError extends Error {
code: 'RPC_PROTOCOL_CLOSED'
}
export namespace ConnectionClosedError {
const code: ConnectionClosedError['code'] = 'RPC_PROTOCOL_CLOSED';
export function create(message: string = 'connection is closed'): ConnectionClosedError {
return Object.assign(new Error(message), { code });
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function is(error: any): error is ConnectionClosedError {
return !!error && typeof error === 'object' && 'code' in error && error['code'] === code;
}
}

export class RPCProtocolImpl implements RPCProtocol {

private readonly locals = new Map<string, any>();
Expand All @@ -82,7 +96,7 @@ export class RPCProtocolImpl implements RPCProtocol {
this.toDispose.push(Disposable.create(() => {
this.proxies.clear();
for (const reply of this.pendingRPCReplies.values()) {
reply.reject(new Error('connection is closed'));
reply.reject(ConnectionClosedError.create());
}
this.pendingRPCReplies.clear();
}));
Expand All @@ -98,7 +112,7 @@ export class RPCProtocolImpl implements RPCProtocol {

getProxy<T>(proxyId: ProxyIdentifier<T>): T {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
let proxy = this.proxies.get(proxyId.id);
if (!proxy) {
Expand All @@ -110,7 +124,7 @@ export class RPCProtocolImpl implements RPCProtocol {

set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
if (this.isDisposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
this.locals.set(identifier.id, instance);
if (Disposable.is(instance)) {
Expand All @@ -135,7 +149,7 @@ export class RPCProtocolImpl implements RPCProtocol {

private remoteCall(proxyId: string, methodName: string, args: any[]): Promise<any> {
if (this.isDisposed) {
return Promise.reject(new Error('connection is closed'));
return Promise.reject(ConnectionClosedError.create());
}
const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
if (cancellationToken && cancellationToken.isCancellationRequested) {
Expand Down Expand Up @@ -320,7 +334,7 @@ class RPCMultiplexer implements Disposable {

public send(msg: string): void {
if (this.toDispose.disposed) {
throw new Error('connection is closed');
throw ConnectionClosedError.create();
}
if (this.messagesToSend.length === 0) {
if (typeof setImmediate !== 'undefined') {
Expand Down Expand Up @@ -460,11 +474,13 @@ function isSerializedObject(obj: any): obj is SerializedObject {
return obj && obj.$type !== undefined && obj.data !== undefined;
}

const enum MessageType {
export const enum MessageType {
Request = 1,
Reply = 2,
ReplyErr = 3,
Cancel = 4
Cancel = 4,
Terminate = 5,
Terminated = 6
}

class CancelMessage {
Expand Down
53 changes: 30 additions & 23 deletions packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import * as path from 'path';
import * as cp from 'child_process';
import { injectable, inject, named } from 'inversify';
import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common';
import { Emitter } from '@theia/core/lib/common/event';
import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol';
import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin } from '../../common/plugin-protocol';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { MAIN_RPC_CONTEXT } from '../../common/plugin-api-rpc';
import { MessageType } from '../../common/rpc-protocol';
import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution';
import * as psTree from 'ps-tree';
import { Deferred } from '@theia/core/lib/common/promise-util';

export interface IPCConnectionOptions {
readonly serverName: string;
Expand Down Expand Up @@ -83,7 +82,7 @@ export class HostedPluginProcess implements ServerPluginRunner {
}
}

public terminatePluginServer(): void {
async terminatePluginServer(): Promise<void> {
if (this.childProcess === undefined) {
return;
}
Expand All @@ -93,34 +92,42 @@ export class HostedPluginProcess implements ServerPluginRunner {
const cp = this.childProcess;
this.childProcess = undefined;

const emitter = new Emitter();
const waitForTerminated = new Deferred<void>();
cp.on('message', message => {
emitter.fire(JSON.parse(message));
});
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (cp.send) {
cp.send(JSON.stringify(m));
}
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminated) {
waitForTerminated.resolve();
}
});
const hostedPluginManager = rpc.getProxy(MAIN_RPC_CONTEXT.HOSTED_PLUGIN_MANAGER_EXT);
hostedPluginManager.$stop().then(() => {
emitter.dispose();
this.killProcessTree(cp.pid);
});
cp.send(JSON.stringify({ type: MessageType.Terminate }));
// give 10 seconds to terminate gracefully and then kill the process
await Promise.race([
waitForTerminated,
new Promise(resolve => setTimeout(resolve, 10 * 1000))
]);
this.killProcessTree(cp.pid);
}

private killProcessTree(parentPid: number): void {
psTree(parentPid, (err: Error, childProcesses: Array<psTree.PS>) => {
childProcesses.forEach((p: psTree.PS) => {
process.kill(parseInt(p.PID));
});
process.kill(parentPid);
psTree(parentPid, (_, childProcesses) => {
childProcesses.forEach(childProcess =>
this.killProcess(parseInt(childProcess.PID))
);
this.killProcess(parentPid);
});
}

protected killProcess(pid: number): void {
try {
process.kill(pid);
} catch (e) {
if (e && 'code' in e && e.code === 'ESRCH') {
return;
}
console.error(`[${pid}] failed to kill`, e);
}
}

public runPluginServer(): void {
if (this.childProcess) {
this.terminatePluginServer();
Expand Down
4 changes: 4 additions & 0 deletions packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export class PluginHostRPC {
);
}

async terminate(): Promise<void> {
await this.pluginManager.terminate();
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
initContext(contextPath: string, plugin: Plugin): any {
const { name, version } = plugin.rawModel;
Expand Down
30 changes: 26 additions & 4 deletions packages/plugin-ext/src/hosted/node/plugin-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
********************************************************************************/

import { Emitter } from '@theia/core/lib/common/event';
import { RPCProtocolImpl } from '../../common/rpc-protocol';
import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol';
import { PluginHostRPC } from './plugin-host-rpc';
console.log('PLUGIN_HOST(' + process.pid + ') starting instance');

Expand Down Expand Up @@ -50,6 +50,10 @@ process.on('unhandledRejection', (reason: any, promise: Promise<any>) => {
if (index >= 0) {
promise.catch(err => {
unhandledPromises.splice(index, 1);
if (terminating && (ConnectionClosedError.is(err) || ConnectionClosedError.is(reason))) {
// during termination it is expected that pending rpc requerst are rejected
return;
}
console.error(`Promise rejection not handled in one second: ${err} , reason: ${reason}`);
if (err && err.stack) {
console.error(`With stack trace: ${err.stack}`);
Expand All @@ -67,19 +71,37 @@ process.on('rejectionHandled', (promise: Promise<any>) => {
}
});

let terminating = false;
const emitter = new Emitter();
const rpc = new RPCProtocolImpl({
onMessage: emitter.event,
send: (m: {}) => {
if (process.send) {
if (process.send && !terminating) {
process.send(JSON.stringify(m));
}
}
});

process.on('message', (message: string) => {
process.on('message', async (message: string) => {
if (terminating) {
return;
}
try {
emitter.fire(JSON.parse(message));
const msg = JSON.parse(message);
if ('type' in msg && msg.type === MessageType.Terminate) {
terminating = true;
emitter.dispose();
await Promise.race([
pluginHostRPC.terminate(),
new Promise(resolve => setTimeout(resolve, 4 * 1000))
]);
rpc.dispose();
if (process.send) {
process.send(JSON.stringify({ type: MessageType.Terminated }));
}
} else {
emitter.fire(msg);
}
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 0 additions & 4 deletions packages/plugin-ext/src/plugin/command-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ export class CommandRegistryImpl implements CommandRegistryExt {
});
}

dispose(): void {
throw new Error('Method not implemented.');
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
$executeCommand<T>(id: string, ...args: any[]): PromiseLike<T | undefined> {
if (this.handlers.has(id)) {
Expand Down
59 changes: 48 additions & 11 deletions packages/plugin-ext/src/plugin/plugin-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import {
import { PluginMetadata } from '../common/plugin-protocol';
import * as theia from '@theia/plugin';
import { join } from 'path';
import { dispose } from '../common/disposable-util';
import { Deferred } from '@theia/core/lib/common/promise-util';
import { EnvExtImpl } from './env';
import { PreferenceRegistryExtImpl } from './preference-registry';
Expand All @@ -55,7 +54,16 @@ export interface PluginHost {
}

interface StopFn {
(): void;
(): void | Promise<void>;
}

interface StopOptions {
/**
* if terminating then stopping will ignore all errors,
* since the main side is already gone and any requests are likely to fail
* or hang
*/
terminating: boolean
}

class ActivatedPlugin {
Expand Down Expand Up @@ -107,8 +115,7 @@ export class PluginManagerExtImpl implements PluginManagerExt, PluginManager {

async $stop(pluginId?: string): Promise<void> {
if (!pluginId) {
this.stopAll();
return;
return this.stopAll();
}
this.registry.delete(pluginId);
this.pluginActivationPromises.delete(pluginId);
Expand All @@ -119,28 +126,58 @@ export class PluginManagerExtImpl implements PluginManagerExt, PluginManager {
return;
}
this.activatedPlugins.delete(pluginId);
this.stopPlugin(plugin);
return this.stopPlugin(pluginId, plugin);
}

async terminate(): Promise<void> {
return this.stopAll({ terminating: true });
}

protected stopAll(): void {
this.activatedPlugins.forEach(plugin => this.stopPlugin(plugin));
protected async stopAll(options: StopOptions = { terminating: false }): Promise<void> {
const promises = [];
for (const [id, plugin] of this.activatedPlugins) {
promises.push(this.stopPlugin(id, plugin, options));
}

this.registry.clear();
this.loadedPlugins.clear();
this.activatedPlugins.clear();
this.pluginActivationPromises.clear();
this.pluginContextsMap.clear();
await Promise.all(promises);
}

protected stopPlugin(plugin: ActivatedPlugin): void {
protected async stopPlugin(id: string, plugin: ActivatedPlugin, options: StopOptions = { terminating: false }): Promise<void> {
let result;
if (plugin.stopFn) {
plugin.stopFn();
try {
result = plugin.stopFn();
} catch (e) {
if (!options.terminating) {
console.error(`[${id}]: failed to stop:`, e);
}
}
}

// dispose any objects
const pluginContext = plugin.pluginContext;
if (pluginContext) {
dispose(pluginContext.subscriptions);
for (const subscription of pluginContext.subscriptions) {
try {
subscription.dispose();
} catch (e) {
if (!options.terminating) {
console.error(`[${id}]: failed to dispose subscription:`, e);
}
}
}
}

try {
await result;
} catch (e) {
if (!options.terminating) {
console.error(`[${id}]: failed to stop:`, e);
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion packages/plugin-ext/src/plugin/tasks/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
import * as theia from '@theia/plugin';
import * as converter from '../type-converters';
import { Disposable } from '../types-impl';
import { RPCProtocol } from '../../common/rpc-protocol';
import { RPCProtocol, ConnectionClosedError } from '../../common/rpc-protocol';
import { TaskProviderAdapter } from './task-provider';
import { Emitter, Event } from '@theia/core/lib/common/event';

Expand All @@ -40,11 +40,17 @@ export class TasksExtImpl implements TasksExt {
private readonly onDidExecuteTaskProcess: Emitter<theia.TaskProcessStartEvent> = new Emitter<theia.TaskProcessStartEvent>();
private readonly onDidTerminateTaskProcess: Emitter<theia.TaskProcessEndEvent> = new Emitter<theia.TaskProcessEndEvent>();

private disposed = false;

constructor(rpc: RPCProtocol) {
this.proxy = rpc.getProxy(PLUGIN_RPC_CONTEXT.TASKS_MAIN);
this.fetchTaskExecutions();
}

dispose(): void {
this.disposed = true;
}

get taskExecutions(): ReadonlyArray<theia.TaskExecution> {
return [...this.executions.values()];
}
Expand Down Expand Up @@ -169,6 +175,9 @@ export class TasksExtImpl implements TasksExt {
const taskExecutions = await this.proxy.$taskExecutions();
taskExecutions.forEach(execution => this.getTaskExecution(execution));
} catch (error) {
if (this.disposed && ConnectionClosedError.is(error)) {
return;
}
console.error(`Can not fetch running tasks: ${error}`);
}
}
Expand Down

0 comments on commit ad70c71

Please sign in to comment.