From ad70c71171666d0a0a7db6fc2149151644e9d420 Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Thu, 20 Feb 2020 20:27:41 +0000 Subject: [PATCH] fix #7176: gracefully terminate plugin host process without rpc connection Signed-off-by: Anton Kosyakov --- .../plugin-ext/src/common/rpc-protocol.ts | 30 +++++++--- .../src/hosted/node/hosted-plugin-process.ts | 53 +++++++++-------- .../src/hosted/node/plugin-host-rpc.ts | 4 ++ .../plugin-ext/src/hosted/node/plugin-host.ts | 30 ++++++++-- .../plugin-ext/src/plugin/command-registry.ts | 4 -- .../plugin-ext/src/plugin/plugin-manager.ts | 59 +++++++++++++++---- packages/plugin-ext/src/plugin/tasks/tasks.ts | 11 +++- 7 files changed, 141 insertions(+), 50 deletions(-) diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index c96b79fef7a5f..a0375762a5ba7 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -61,6 +61,20 @@ export function createProxyIdentifier(identifier: string): ProxyIdentifier return new ProxyIdentifier(false, identifier); } +export interface ConnectionClosedError extends Error { + code: 'RPC_PROTOCOL_CLOSED' +} +export namespace ConnectionClosedError { + const code: ConnectionClosedError['code'] = 'RPC_PROTOCOL_CLOSED'; + export function create(message: string = 'connection is closed'): ConnectionClosedError { + return Object.assign(new Error(message), { code }); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + export function is(error: any): error is ConnectionClosedError { + return !!error && typeof error === 'object' && 'code' in error && error['code'] === code; + } +} + export class RPCProtocolImpl implements RPCProtocol { private readonly locals = new Map(); @@ -82,7 +96,7 @@ export class RPCProtocolImpl implements RPCProtocol { this.toDispose.push(Disposable.create(() => { this.proxies.clear(); for (const reply of this.pendingRPCReplies.values()) { - reply.reject(new Error('connection is closed')); + reply.reject(ConnectionClosedError.create()); } this.pendingRPCReplies.clear(); })); @@ -98,7 +112,7 @@ export class RPCProtocolImpl implements RPCProtocol { getProxy(proxyId: ProxyIdentifier): T { if (this.isDisposed) { - throw new Error('connection is closed'); + throw ConnectionClosedError.create(); } let proxy = this.proxies.get(proxyId.id); if (!proxy) { @@ -110,7 +124,7 @@ export class RPCProtocolImpl implements RPCProtocol { set(identifier: ProxyIdentifier, instance: R): R { if (this.isDisposed) { - throw new Error('connection is closed'); + throw ConnectionClosedError.create(); } this.locals.set(identifier.id, instance); if (Disposable.is(instance)) { @@ -135,7 +149,7 @@ export class RPCProtocolImpl implements RPCProtocol { private remoteCall(proxyId: string, methodName: string, args: any[]): Promise { if (this.isDisposed) { - return Promise.reject(new Error('connection is closed')); + return Promise.reject(ConnectionClosedError.create()); } const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; if (cancellationToken && cancellationToken.isCancellationRequested) { @@ -320,7 +334,7 @@ class RPCMultiplexer implements Disposable { public send(msg: string): void { if (this.toDispose.disposed) { - throw new Error('connection is closed'); + throw ConnectionClosedError.create(); } if (this.messagesToSend.length === 0) { if (typeof setImmediate !== 'undefined') { @@ -460,11 +474,13 @@ function isSerializedObject(obj: any): obj is SerializedObject { return obj && obj.$type !== undefined && obj.data !== undefined; } -const enum MessageType { +export const enum MessageType { Request = 1, Reply = 2, ReplyErr = 3, - Cancel = 4 + Cancel = 4, + Terminate = 5, + Terminated = 6 } class CancelMessage { diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts index 783e44b1ebd05..b1d5bfa1f26bf 100644 --- a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts @@ -18,13 +18,12 @@ import * as path from 'path'; import * as cp from 'child_process'; import { injectable, inject, named } from 'inversify'; import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common'; -import { Emitter } from '@theia/core/lib/common/event'; import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol'; import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin } from '../../common/plugin-protocol'; -import { RPCProtocolImpl } from '../../common/rpc-protocol'; -import { MAIN_RPC_CONTEXT } from '../../common/plugin-api-rpc'; +import { MessageType } from '../../common/rpc-protocol'; import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution'; import * as psTree from 'ps-tree'; +import { Deferred } from '@theia/core/lib/common/promise-util'; export interface IPCConnectionOptions { readonly serverName: string; @@ -83,7 +82,7 @@ export class HostedPluginProcess implements ServerPluginRunner { } } - public terminatePluginServer(): void { + async terminatePluginServer(): Promise { if (this.childProcess === undefined) { return; } @@ -93,34 +92,42 @@ export class HostedPluginProcess implements ServerPluginRunner { const cp = this.childProcess; this.childProcess = undefined; - const emitter = new Emitter(); + const waitForTerminated = new Deferred(); cp.on('message', message => { - emitter.fire(JSON.parse(message)); - }); - const rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: {}) => { - if (cp.send) { - cp.send(JSON.stringify(m)); - } + const msg = JSON.parse(message); + if ('type' in msg && msg.type === MessageType.Terminated) { + waitForTerminated.resolve(); } }); - const hostedPluginManager = rpc.getProxy(MAIN_RPC_CONTEXT.HOSTED_PLUGIN_MANAGER_EXT); - hostedPluginManager.$stop().then(() => { - emitter.dispose(); - this.killProcessTree(cp.pid); - }); + cp.send(JSON.stringify({ type: MessageType.Terminate })); + // give 10 seconds to terminate gracefully and then kill the process + await Promise.race([ + waitForTerminated, + new Promise(resolve => setTimeout(resolve, 10 * 1000)) + ]); + this.killProcessTree(cp.pid); } private killProcessTree(parentPid: number): void { - psTree(parentPid, (err: Error, childProcesses: Array) => { - childProcesses.forEach((p: psTree.PS) => { - process.kill(parseInt(p.PID)); - }); - process.kill(parentPid); + psTree(parentPid, (_, childProcesses) => { + childProcesses.forEach(childProcess => + this.killProcess(parseInt(childProcess.PID)) + ); + this.killProcess(parentPid); }); } + protected killProcess(pid: number): void { + try { + process.kill(pid); + } catch (e) { + if (e && 'code' in e && e.code === 'ESRCH') { + return; + } + console.error(`[${pid}] failed to kill`, e); + } + } + public runPluginServer(): void { if (this.childProcess) { this.terminatePluginServer(); diff --git a/packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts b/packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts index 7071bfd3df3d0..8c64bddda202c 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-host-rpc.ts @@ -76,6 +76,10 @@ export class PluginHostRPC { ); } + async terminate(): Promise { + await this.pluginManager.terminate(); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any initContext(contextPath: string, plugin: Plugin): any { const { name, version } = plugin.rawModel; diff --git a/packages/plugin-ext/src/hosted/node/plugin-host.ts b/packages/plugin-ext/src/hosted/node/plugin-host.ts index 2b1a70a9f3815..62b625e84531f 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-host.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-host.ts @@ -15,7 +15,7 @@ ********************************************************************************/ import { Emitter } from '@theia/core/lib/common/event'; -import { RPCProtocolImpl } from '../../common/rpc-protocol'; +import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol'; import { PluginHostRPC } from './plugin-host-rpc'; console.log('PLUGIN_HOST(' + process.pid + ') starting instance'); @@ -50,6 +50,10 @@ process.on('unhandledRejection', (reason: any, promise: Promise) => { if (index >= 0) { promise.catch(err => { unhandledPromises.splice(index, 1); + if (terminating && (ConnectionClosedError.is(err) || ConnectionClosedError.is(reason))) { + // during termination it is expected that pending rpc requerst are rejected + return; + } console.error(`Promise rejection not handled in one second: ${err} , reason: ${reason}`); if (err && err.stack) { console.error(`With stack trace: ${err.stack}`); @@ -67,19 +71,37 @@ process.on('rejectionHandled', (promise: Promise) => { } }); +let terminating = false; const emitter = new Emitter(); const rpc = new RPCProtocolImpl({ onMessage: emitter.event, send: (m: {}) => { - if (process.send) { + if (process.send && !terminating) { process.send(JSON.stringify(m)); } } }); -process.on('message', (message: string) => { +process.on('message', async (message: string) => { + if (terminating) { + return; + } try { - emitter.fire(JSON.parse(message)); + const msg = JSON.parse(message); + if ('type' in msg && msg.type === MessageType.Terminate) { + terminating = true; + emitter.dispose(); + await Promise.race([ + pluginHostRPC.terminate(), + new Promise(resolve => setTimeout(resolve, 4 * 1000)) + ]); + rpc.dispose(); + if (process.send) { + process.send(JSON.stringify({ type: MessageType.Terminated })); + } + } else { + emitter.fire(msg); + } } catch (e) { console.error(e); } diff --git a/packages/plugin-ext/src/plugin/command-registry.ts b/packages/plugin-ext/src/plugin/command-registry.ts index 5131edde5250a..1c78066051e20 100644 --- a/packages/plugin-ext/src/plugin/command-registry.ts +++ b/packages/plugin-ext/src/plugin/command-registry.ts @@ -85,10 +85,6 @@ export class CommandRegistryImpl implements CommandRegistryExt { }); } - dispose(): void { - throw new Error('Method not implemented.'); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any $executeCommand(id: string, ...args: any[]): PromiseLike { if (this.handlers.has(id)) { diff --git a/packages/plugin-ext/src/plugin/plugin-manager.ts b/packages/plugin-ext/src/plugin/plugin-manager.ts index d1c2578e88e4c..3e56b44f0514d 100644 --- a/packages/plugin-ext/src/plugin/plugin-manager.ts +++ b/packages/plugin-ext/src/plugin/plugin-manager.ts @@ -30,7 +30,6 @@ import { import { PluginMetadata } from '../common/plugin-protocol'; import * as theia from '@theia/plugin'; import { join } from 'path'; -import { dispose } from '../common/disposable-util'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { EnvExtImpl } from './env'; import { PreferenceRegistryExtImpl } from './preference-registry'; @@ -55,7 +54,16 @@ export interface PluginHost { } interface StopFn { - (): void; + (): void | Promise; +} + +interface StopOptions { + /** + * if terminating then stopping will ignore all errors, + * since the main side is already gone and any requests are likely to fail + * or hang + */ + terminating: boolean } class ActivatedPlugin { @@ -107,8 +115,7 @@ export class PluginManagerExtImpl implements PluginManagerExt, PluginManager { async $stop(pluginId?: string): Promise { if (!pluginId) { - this.stopAll(); - return; + return this.stopAll(); } this.registry.delete(pluginId); this.pluginActivationPromises.delete(pluginId); @@ -119,28 +126,58 @@ export class PluginManagerExtImpl implements PluginManagerExt, PluginManager { return; } this.activatedPlugins.delete(pluginId); - this.stopPlugin(plugin); + return this.stopPlugin(pluginId, plugin); + } + + async terminate(): Promise { + return this.stopAll({ terminating: true }); } - protected stopAll(): void { - this.activatedPlugins.forEach(plugin => this.stopPlugin(plugin)); + protected async stopAll(options: StopOptions = { terminating: false }): Promise { + const promises = []; + for (const [id, plugin] of this.activatedPlugins) { + promises.push(this.stopPlugin(id, plugin, options)); + } this.registry.clear(); this.loadedPlugins.clear(); this.activatedPlugins.clear(); this.pluginActivationPromises.clear(); this.pluginContextsMap.clear(); + await Promise.all(promises); } - protected stopPlugin(plugin: ActivatedPlugin): void { + protected async stopPlugin(id: string, plugin: ActivatedPlugin, options: StopOptions = { terminating: false }): Promise { + let result; if (plugin.stopFn) { - plugin.stopFn(); + try { + result = plugin.stopFn(); + } catch (e) { + if (!options.terminating) { + console.error(`[${id}]: failed to stop:`, e); + } + } } - // dispose any objects const pluginContext = plugin.pluginContext; if (pluginContext) { - dispose(pluginContext.subscriptions); + for (const subscription of pluginContext.subscriptions) { + try { + subscription.dispose(); + } catch (e) { + if (!options.terminating) { + console.error(`[${id}]: failed to dispose subscription:`, e); + } + } + } + } + + try { + await result; + } catch (e) { + if (!options.terminating) { + console.error(`[${id}]: failed to stop:`, e); + } } } diff --git a/packages/plugin-ext/src/plugin/tasks/tasks.ts b/packages/plugin-ext/src/plugin/tasks/tasks.ts index 264cb9235e642..c8cd314b0cf30 100644 --- a/packages/plugin-ext/src/plugin/tasks/tasks.ts +++ b/packages/plugin-ext/src/plugin/tasks/tasks.ts @@ -24,7 +24,7 @@ import { import * as theia from '@theia/plugin'; import * as converter from '../type-converters'; import { Disposable } from '../types-impl'; -import { RPCProtocol } from '../../common/rpc-protocol'; +import { RPCProtocol, ConnectionClosedError } from '../../common/rpc-protocol'; import { TaskProviderAdapter } from './task-provider'; import { Emitter, Event } from '@theia/core/lib/common/event'; @@ -40,11 +40,17 @@ export class TasksExtImpl implements TasksExt { private readonly onDidExecuteTaskProcess: Emitter = new Emitter(); private readonly onDidTerminateTaskProcess: Emitter = new Emitter(); + private disposed = false; + constructor(rpc: RPCProtocol) { this.proxy = rpc.getProxy(PLUGIN_RPC_CONTEXT.TASKS_MAIN); this.fetchTaskExecutions(); } + dispose(): void { + this.disposed = true; + } + get taskExecutions(): ReadonlyArray { return [...this.executions.values()]; } @@ -169,6 +175,9 @@ export class TasksExtImpl implements TasksExt { const taskExecutions = await this.proxy.$taskExecutions(); taskExecutions.forEach(execution => this.getTaskExecution(execution)); } catch (error) { + if (this.disposed && ConnectionClosedError.is(error)) { + return; + } console.error(`Can not fetch running tasks: ${error}`); } }