diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index cff56221df2a8..8dddd0583b88f 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -180,6 +180,10 @@ export class ChannelMultiplexer implements Disposable { } + protected getUnderlyingWriteBuffer(): WriteBuffer { + return this.underlyingChannel.getWriteBuffer(); + } + protected handleMessage(buffer: ReadBuffer): void { const type = buffer.readUint8(); const id = buffer.readString(); @@ -206,7 +210,7 @@ export class ChannelMultiplexer implements Disposable { const channel = this.createChannel(id); this.pendingOpen.delete(id); this.openChannels.set(id, channel); - resolve!(channel); + resolve(channel); this.onOpenChannelEmitter.fire({ id, channel }); } } @@ -220,7 +224,7 @@ export class ChannelMultiplexer implements Disposable { // edge case: both side try to open a channel at the same time. resolve(channel); } - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); + this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); this.onOpenChannelEmitter.fire({ id, channel }); } } @@ -236,7 +240,7 @@ export class ChannelMultiplexer implements Disposable { protected handleData(id: string, data: ReadBuffer): void { const channel = this.openChannels.get(id); if (channel) { - channel.onMessageEmitter.fire(() => data); + channel.onMessageEmitter.fire(() => data.sliceAtReadPosition()); } } @@ -247,14 +251,14 @@ export class ChannelMultiplexer implements Disposable { // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded // and written to the buffer before the actual message. protected prepareWriteBuffer(id: string): WriteBuffer { - const underlying = this.underlyingChannel.getWriteBuffer(); + const underlying = this.getUnderlyingWriteBuffer(); underlying.writeUint8(MessageTypes.Data); underlying.writeString(id); return underlying; } protected closeChannel(id: string): void { - this.underlyingChannel.getWriteBuffer() + this.getUnderlyingWriteBuffer() .writeUint8(MessageTypes.Close) .writeString(id) .commit(); @@ -263,10 +267,14 @@ export class ChannelMultiplexer implements Disposable { } open(id: string): Promise { + const existingChannel = this.getOpenChannel(id); + if (existingChannel) { + return Promise.resolve(existingChannel); + } const result = new Promise((resolve, reject) => { this.pendingOpen.set(id, resolve); }); - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); + this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); return result; } diff --git a/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts b/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts new file mode 100644 index 0000000000000..21cc4bcc63d22 --- /dev/null +++ b/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts @@ -0,0 +1,87 @@ +// ***************************************************************************** +// Copyright (C) 2022 STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { addExtension } from 'msgpackr'; +import { ResponseError } from './rpc-message-encoder'; + +/** + * Handles the global registration of custom MsgPackR extensions + * required for the default RPC communication. MsgPackR extensions + * are installed globally on both ends of the communication channel. + * (frontend-backend, pluginExt-pluginMain). + * Is implemented as singleton as it is also used in plugin child processes which have no access to inversify. + */ +export class MsgPackExtensionManager { + private static readonly INSTANCE = new MsgPackExtensionManager(); + public static getInstance(): MsgPackExtensionManager { + return this.INSTANCE; + } + + private extensions = new Map(); + + private constructor() { + } + + registerExtensions(...extensions: MsgPackExtension[]): void { + extensions.forEach(extension => { + if (extension.tag < 1 || extension.tag > 100) { + // MsgPackR reserves the tag range 1-100 for custom extensions. + throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`); + } + if (this.extensions.has(extension.tag)) { + throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`); + } + this.extensions.set(extension.tag, extension); + addExtension({ + Class: extension.class, + type: extension.tag, + write: extension.serialize, + read: extension.deserialize + }); + }); + } + + getExtension(tag: number): MsgPackExtension | undefined { + return this.extensions.get(tag); + } +} + +// Register custom msgPack extension for ResponseErrors. +MsgPackExtensionManager.getInstance().registerExtensions({ + class: ResponseError, + tag: 1, + serialize: (instance: ResponseError) => { + const { code, data, message, name, stack } = instance; + return { code, data, message, name, stack }; + }, + deserialize: data => { + const error = new ResponseError(data.code, data.message, data.data); + error.name = data.name; + error.stack = data.stack; + return error; + } +}); + +export interface MsgPackExtension { + class: Function, + tag: number, + serialize(instance: unknown): unknown, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + deserialize(serialized: any): unknown +} + +export type Constructor = new (...params: unknown[]) => T; + diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index f8a91857d8ec6..03b93755a5251 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -15,7 +15,7 @@ // ***************************************************************************** /* eslint-disable @typescript-eslint/no-explicit-any */ -import { addExtension, Packr as MsgPack } from 'msgpackr'; +import { Packr as MsgPack } from 'msgpackr'; import { ReadBuffer, WriteBuffer } from './message-buffer'; /** @@ -121,27 +121,10 @@ export interface RpcMessageEncoder { } export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false }); -// Add custom msgpackR extension for ResponseErrors. -addExtension({ - Class: ResponseError, - type: 1, - write: (instance: ResponseError) => { - const { code, data, message, name, stack } = instance; - return { code, data, message, name, stack }; - }, - read: data => { - const error = new ResponseError(data.code, data.message, data.data); - error.name = data.name; - error.stack = data.stack; - return error; - } -}); export class MsgPackMessageEncoder implements RpcMessageEncoder { - constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { - - } + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { } cancel(buf: WriteBuffer, requestId: number): void { this.encode(buf, { type: RpcMessageType.Cancel, id: requestId }); @@ -169,13 +152,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder { throw err; } } - } export class MsgPackMessageDecoder implements RpcMessageDecoder { - constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { } - } decode(buf: ReadBuffer): T { const bytes = buf.readBytes(); return this.msgPack.decode(bytes); @@ -184,5 +165,4 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder { parse(buffer: ReadBuffer): RpcMessage { return this.decode(buffer); } - } diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 61be9c73d3881..4dbf422e0b877 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -16,15 +16,14 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { CancellationToken, CancellationTokenSource } from '../cancellation'; -import { DisposableCollection } from '../disposable'; +import { Disposable, DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; import { MsgPackMessageDecoder, MsgPackMessageEncoder, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; -import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; /** - * Handles request messages received by the {@link RpcServer}. + * Handles request messages received by the {@link RPCProtocol}. */ export type RequestHandler = (method: string, args: any[]) => Promise; @@ -39,15 +38,20 @@ export interface RpcProtocolOptions { /** * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. */ - decoder?: RpcMessageDecoder + decoder?: RpcMessageDecoder, + /** + * The runtime mode determines whether the RPC protocol is bi-directional (default) or acts as a client or server only. + */ + mode?: 'default' | 'clientOnly' | 'serverOnly' } /** - * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send - * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. + * Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send + * requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server). * Clients can get a promise for a remote request result that will be either resolved or * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request * Currently, there is no timeout handling for long running requests implemented. + * The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct an RPC protocol instance that acts only as client or server instead. */ export class RpcProtocol { static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; @@ -58,6 +62,7 @@ export class RpcProtocol { protected readonly encoder: RpcMessageEncoder; protected readonly decoder: RpcMessageDecoder; + protected readonly mode: 'default' | 'clientOnly' | 'serverOnly'; protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); protected readonly cancellationTokenSources = new Map(); @@ -68,37 +73,50 @@ export class RpcProtocol { protected toDispose = new DisposableCollection(); - constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler | undefined, options: RpcProtocolOptions = {}) { this.encoder = options.encoder ?? new MsgPackMessageEncoder(); this.decoder = options.decoder ?? new MsgPackMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); - this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); channel.onClose(() => this.toDispose.dispose()); + this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); + this.mode = options.mode ?? 'default'; + + if (this.mode !== 'clientOnly' && requestHandler === undefined) { + console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.'); + } } handleMessage(message: RpcMessage): void { - switch (message.type) { - case RpcMessageType.Cancel: { - this.handleCancel(message.id); - break; - } - case RpcMessageType.Request: { - this.handleRequest(message.id, message.method, message.args); - break; - } - case RpcMessageType.Notification: { - this.handleNotify(message.id, message.method, message.args); - break; + if (this.mode !== 'clientOnly') { + switch (message.type) { + case RpcMessageType.Cancel: { + this.handleCancel(message.id); + return; + } + case RpcMessageType.Request: { + this.handleRequest(message.id, message.method, message.args); + return; + } + case RpcMessageType.Notification: { + this.handleNotify(message.id, message.method, message.args); + return; + } } - case RpcMessageType.Reply: { - this.handleReply(message.id, message.res); - break; - } - case RpcMessageType.ReplyErr: { - this.handleReplyErr(message.id, message.err); - break; + } + if (this.mode !== 'serverOnly') { + switch (message.type) { + case RpcMessageType.Reply: { + this.handleReply(message.id, message.res); + return; + } + case RpcMessageType.ReplyErr: { + this.handleReplyErr(message.id, message.err); + return; + } } } + // If the message was not handled until here, it is incompatible with the mode. + console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`); } protected handleReply(id: number, value: any): void { @@ -126,13 +144,13 @@ export class RpcProtocol { } sendRequest(method: string, args: any[]): Promise { - const id = this.nextMessageId++; - const reply = new Deferred(); - // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the // args array and the `CANCELLATION_TOKEN_KEY` string instead. const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; + const id = this.nextMessageId++; + const reply = new Deferred(); + if (cancellationToken) { args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); } @@ -153,6 +171,13 @@ export class RpcProtocol { } sendNotification(method: string, args: any[]): void { + // If the notification supports a CancellationToken, it needs to be treated like a request + // because cancellation does not work with the simplified "fire and forget" approach of simple notifications. + if (args.length && CancellationToken.is(args[args.length - 1])) { + this.sendRequest(method, args); + return; + } + const output = this.channel.getWriteBuffer(); this.encoder.notification(output, this.nextMessageId++, method, args); output.commit(); @@ -167,7 +192,6 @@ export class RpcProtocol { protected handleCancel(id: number): void { const cancellationTokenSource = this.cancellationTokenSources.get(id); if (cancellationTokenSource) { - this.cancellationTokenSources.delete(id); cancellationTokenSource.cancel(); } } @@ -185,14 +209,14 @@ export class RpcProtocol { } try { - const result = await this.requestHandler(method, args); + const result = await this.requestHandler!(method, args); this.cancellationTokenSources.delete(id); this.encoder.replyOK(output, id, result); output.commit(); } catch (err) { // In case of an error the output buffer might already contains parts of an message. // => Dispose the current buffer and retrieve a new, clean one for writing the response error. - if (output instanceof Uint8ArrayWriteBuffer) { + if (Disposable.is(output)) { output.dispose(); } const errorOutput = this.channel.getWriteBuffer(); diff --git a/packages/core/src/node/messaging/index.ts b/packages/core/src/node/messaging/index.ts index fd161d93a9df0..23da1fe350586 100644 --- a/packages/core/src/node/messaging/index.ts +++ b/packages/core/src/node/messaging/index.ts @@ -16,3 +16,4 @@ export * from './messaging-service'; export * from './ipc-connection-provider'; +export * from './ipc-channel'; diff --git a/packages/plugin-ext/src/common/index.ts b/packages/plugin-ext/src/common/index.ts index f77f053e2d32d..d39f1fc134210 100644 --- a/packages/plugin-ext/src/common/index.ts +++ b/packages/plugin-ext/src/common/index.ts @@ -18,3 +18,7 @@ export * from './plugin-protocol'; export * from './plugin-api-rpc'; export * from './plugin-ext-api-contribution'; + +import { registerMsgPackExtensions } from './rpc-protocol'; + +registerMsgPackExtensions(); diff --git a/packages/plugin-ext/src/common/plugin-api-rpc.ts b/packages/plugin-ext/src/common/plugin-api-rpc.ts index cc04bd3b288c5..dc1083bb98e21 100644 --- a/packages/plugin-ext/src/common/plugin-api-rpc.ts +++ b/packages/plugin-ext/src/common/plugin-api-rpc.ts @@ -1914,7 +1914,6 @@ export const PLUGIN_RPC_CONTEXT = { STATUS_BAR_MESSAGE_REGISTRY_MAIN: >createProxyIdentifier('StatusBarMessageRegistryMain'), ENV_MAIN: createProxyIdentifier('EnvMain'), NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'), - NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'), TERMINAL_MAIN: createProxyIdentifier('TerminalServiceMain'), TREE_VIEWS_MAIN: createProxyIdentifier('TreeViewsMain'), PREFERENCE_REGISTRY_MAIN: createProxyIdentifier('PreferenceRegistryMain'), @@ -1946,7 +1945,6 @@ export const MAIN_RPC_CONTEXT = { QUICK_OPEN_EXT: createProxyIdentifier('QuickOpenExt'), WINDOW_STATE_EXT: createProxyIdentifier('WindowStateExt'), NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'), - NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'), WORKSPACE_EXT: createProxyIdentifier('WorkspaceExt'), TEXT_EDITORS_EXT: createProxyIdentifier('TextEditorsExt'), EDITORS_AND_DOCUMENTS_EXT: createProxyIdentifier('EditorsAndDocumentsExt'), diff --git a/packages/plugin-ext/src/common/plugin-protocol.ts b/packages/plugin-ext/src/common/plugin-protocol.ts index d4aeea1ac05cc..f4e84cbbf9d77 100644 --- a/packages/plugin-ext/src/common/plugin-protocol.ts +++ b/packages/plugin-ext/src/common/plugin-protocol.ts @@ -838,7 +838,7 @@ export function buildFrontendModuleName(plugin: PluginPackage | PluginModel): st export const HostedPluginClient = Symbol('HostedPluginClient'); export interface HostedPluginClient { - postMessage(pluginHost: string, message: string): Promise; + postMessage(pluginHost: string, buffer: Uint8Array): Promise; log(logPart: LogPart): void; @@ -901,7 +901,7 @@ export interface HostedPluginServer extends JsonRpcServer { getExtPluginAPI(): Promise; - onMessage(targetHost: string, message: string): Promise; + onMessage(targetHost: string, message: Uint8Array): Promise; } @@ -944,9 +944,9 @@ export interface PluginServer { export const ServerPluginRunner = Symbol('ServerPluginRunner'); export interface ServerPluginRunner { // eslint-disable-next-line @typescript-eslint/no-explicit-any - acceptMessage(pluginHostId: string, jsonMessage: string): boolean; + acceptMessage(pluginHostId: string, jsonMessage: Uint8Array): boolean; // eslint-disable-next-line @typescript-eslint/no-explicit-any - onMessage(pluginHostId: string, jsonMessage: string): void; + onMessage(pluginHostId: string, jsonMessage: Uint8Array): void; setClient(client: HostedPluginClient): void; setDefault(defaultRunner: ServerPluginRunner): void; clientClosed(): void; diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts new file mode 100644 index 0000000000000..d106d5dc915c7 --- /dev/null +++ b/packages/plugin-ext/src/common/proxy-handler.ts @@ -0,0 +1,116 @@ +/******************************************************************************** + * Copyright (C) 2022 STMicroelectronics and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Channel, MaybePromise, RpcProtocol, RpcProtocolOptions } from '@theia/core/'; +import { RpcMessageDecoder, RpcMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; +import { Deferred } from '@theia/core/lib/common/promise-util'; + +export interface RpcMessageCodec { + encoder: RpcMessageEncoder, + decoder: RpcMessageDecoder +} +/** + * A proxy handler that will send any method invocation on the proxied object + * as a rcp protocol message over a channel. + */ +export class ClientProxyHandler implements ProxyHandler { + private rpcDeferred: Deferred = new Deferred(); + private isRpcInitialized = false; + + constructor(protected readonly id: string, protected readonly codec: RpcMessageCodec, protected readonly channelProvider: () => MaybePromise) { } + + private async initializeRpc(): Promise { + const clientOptions: RpcProtocolOptions = { ...this.codec, mode: 'clientOnly' }; + const channel = await this.channelProvider(); + const rpc = new RpcProtocol(channel, undefined, clientOptions); + this.rpcDeferred.resolve(rpc); + this.isRpcInitialized = true; + } + + get(target: any, name: string, receiver: any): any { + if (!this.isRpcInitialized) { + this.initializeRpc(); + } + + if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) { + return target[name]; + } + const isNotify = this.isNotification(name); + return (...args: any[]) => { + const method = name.toString(); + return this.rpcDeferred.promise.then((connection: RpcProtocol) => + new Promise((resolve, reject) => { + try { + if (isNotify) { + connection.sendNotification(method, args); + resolve(undefined); + } else { + const resultPromise = connection.sendRequest(method, args) as Promise; + resultPromise.then((result: any) => { + resolve(result); + }).catch(e => { + reject(e); + }); + } + } catch (err) { + reject(err); + } + }) + ); + }; + } + + /** + * Return whether the given property represents a notification. If true, + * the promise returned from the invocation will resolve immediately to `undefined` + * + * A property leads to a notification rather than a method call if its name + * begins with `notify` or `on`. + * + * @param p - The property being called on the proxy. + * @return Whether `p` represents a notification. + */ + protected isNotification(p: PropertyKey): boolean { + let propertyString = p.toString(); + if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) { + propertyString = propertyString.substring(1); + } + return propertyString.startsWith('notify') || propertyString.startsWith('on'); + } +} + +export class RpcInvocationHandler { + private rpcDeferred: Deferred = new Deferred(); + + constructor(readonly id: string, readonly target: any, protected readonly codec: RpcMessageCodec) { + } + + listen(channel: Channel): void { + const serverOptions: RpcProtocolOptions = { ...this.codec, mode: 'serverOnly' }; + const server = new RpcProtocol(channel, (method: string, args: any[]) => this.handleRequest(method, args), serverOptions); + server.onNotification((e: { method: string, args: any }) => this.onNotification(e.method, e.args)); + this.rpcDeferred.resolve(server); + } + + protected handleRequest(method: string, args: any[]): Promise { + return this.rpcDeferred.promise.then(() => this.target[method](...args)); + } + + protected onNotification(method: string, args: any[]): void { + this.target[method](...args); + } +} + diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts index efd5b9503bee3..55b985c782339 100644 --- a/packages/plugin-ext/src/common/rpc-protocol.ts +++ b/packages/plugin-ext/src/common/rpc-protocol.ts @@ -22,14 +22,17 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '@theia/core/lib/common/event'; -import { DisposableCollection, Disposable } from '@theia/core/lib/common/disposable'; -import { Deferred } from '@theia/core/lib/common/promise-util'; +import { Channel, Disposable, DisposableCollection, ReadBuffer, WriteBuffer } from '@theia/core'; +import { Event } from '@theia/core/lib/common/event'; +import { ChannelMultiplexer } from '@theia/core/lib/common/message-rpc/channel'; +import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ClientProxyHandler, RpcInvocationHandler, RpcMessageCodec } from './proxy-handler'; +import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager'; import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri'; import URI from '@theia/core/lib/common/uri'; -import { CancellationToken, CancellationTokenSource } from '@theia/core/shared/vscode-languageserver-protocol'; -import { Range, Position } from '../plugin/types-impl'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; +import { Range, Position } from '../plugin/types-impl'; export interface MessageConnection { send(msg: string): void; @@ -76,49 +79,32 @@ export namespace ConnectionClosedError { } export class RPCProtocolImpl implements RPCProtocol { - - private readonly locals = new Map(); + private readonly locals = new Map(); private readonly proxies = new Map(); - private lastMessageId = 0; - private readonly cancellationTokenSources = new Map(); - private readonly pendingRPCReplies = new Map>(); - private readonly multiplexer: RPCMultiplexer; - - private replacer: (key: string | undefined, value: any) => any; - private reviver: (key: string | undefined, value: any) => any; + private readonly multiplexer: ChannelMultiplexer; + private messageCodec: RpcMessageCodec; private readonly toDispose = new DisposableCollection( Disposable.create(() => { /* mark as no disposed */ }) ); - constructor(connection: MessageConnection, transformations?: { - replacer?: (key: string | undefined, value: any) => any, - reviver?: (key: string | undefined, value: any) => any - }) { - this.toDispose.push( - this.multiplexer = new RPCMultiplexer(connection) - ); - this.multiplexer.onMessage(msg => this.receiveOneMessage(msg)); - this.toDispose.push(Disposable.create(() => { - this.proxies.clear(); - for (const reply of this.pendingRPCReplies.values()) { - reply.reject(ConnectionClosedError.create()); - } - this.pendingRPCReplies.clear(); - })); - - this.reviver = transformations?.reviver || ObjectsTransferrer.reviver; - this.replacer = transformations?.replacer || ObjectsTransferrer.replacer; - } - - private get isDisposed(): boolean { - return this.toDispose.disposed; + constructor(channel: Channel) { + this.messageCodec = { + encoder: new MsgPackMessageEncoder(), + decoder: new MsgPackMessageDecoder(), + }; + this.toDispose.push(this.multiplexer = new QueuingChannelMultiplexer(channel)); + this.toDispose.push(Disposable.create(() => this.proxies.clear())); } dispose(): void { this.toDispose.dispose(); } + protected get isDisposed(): boolean { + return this.toDispose.disposed; + } + getProxy(proxyId: ProxyIdentifier): T { if (this.isDisposed) { throw ConnectionClosedError.create(); @@ -131,416 +117,154 @@ export class RPCProtocolImpl implements RPCProtocol { return proxy; } - set(identifier: ProxyIdentifier, instance: R): R { - if (this.isDisposed) { - throw ConnectionClosedError.create(); - } - this.locals.set(identifier.id, instance); - if (Disposable.is(instance)) { - this.toDispose.push(instance); - } - this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id))); - return instance; - } - - private createProxy(proxyId: string): T { - const handler = { - get: (target: any, name: string) => { - if (!target[name] && name.charCodeAt(0) === 36 /* CharCode.DollarSign */) { - target[name] = (...myArgs: any[]) => - this.remoteCall(proxyId, name, myArgs); - } - return target[name]; - } - }; + protected createProxy(proxyId: string): T { + const handler = new ClientProxyHandler(proxyId, this.messageCodec, () => this.multiplexer.open(proxyId)); return new Proxy(Object.create(null), handler); } - private remoteCall(proxyId: string, methodName: string, args: any[]): Promise { - if (this.isDisposed) { - return Promise.reject(ConnectionClosedError.create()); - } - const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; - if (cancellationToken && cancellationToken.isCancellationRequested) { - return Promise.reject(canceled()); - } - - const callId = String(++this.lastMessageId); - const result = new Deferred(); - - if (cancellationToken) { - args.push('add.cancellation.token'); - cancellationToken.onCancellationRequested(() => - this.multiplexer.send(this.cancel(callId)) - ); - } - - this.pendingRPCReplies.set(callId, result); - this.multiplexer.send(this.request(callId, proxyId, methodName, args)); - return result.promise; - } - - private receiveOneMessage(rawmsg: string): void { + set(identifier: ProxyIdentifier, instance: R): R { if (this.isDisposed) { - return; + throw ConnectionClosedError.create(); } - try { - const msg = JSON.parse(rawmsg, this.reviver); + const invocationHandler = this.locals.get(identifier.id); + if (!invocationHandler) { + const handler = new RpcInvocationHandler(identifier.id, instance, this.messageCodec); - switch (msg.type) { - case MessageType.Request: - this.receiveRequest(msg); - break; - case MessageType.Reply: - this.receiveReply(msg); - break; - case MessageType.ReplyErr: - this.receiveReplyErr(msg); - break; - case MessageType.Cancel: - this.receiveCancel(msg); - break; + const channel = this.multiplexer.getOpenChannel(identifier.id); + if (channel) { + handler.listen(channel); + } else { + const channelOpenListener = this.multiplexer.onDidOpenChannel(event => { + if (event.id === identifier.id) { + handler.listen(event.channel); + channelOpenListener.dispose(); + } + }); } - } catch (e) { - // exception does not show problematic content: log it! - console.log('failed to parse message: ' + rawmsg); - throw e; - } - - } - - private receiveCancel(msg: CancelMessage): void { - const cancellationTokenSource = this.cancellationTokenSources.get(msg.id); - if (cancellationTokenSource) { - cancellationTokenSource.cancel(); - } - } - - private receiveRequest(msg: RequestMessage): void { - const callId = msg.id; - const proxyId = msg.proxyId; - // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs - const args = msg.args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null - - const addToken = args.length && args[args.length - 1] === 'add.cancellation.token' ? args.pop() : false; - if (addToken) { - const tokenSource = new CancellationTokenSource(); - this.cancellationTokenSources.set(callId, tokenSource); - args.push(tokenSource.token); - } - const invocation = this.invokeHandler(proxyId, msg.method, args); - - invocation.then(result => { - this.cancellationTokenSources.delete(callId); - this.multiplexer.send(this.replyOK(callId, result)); - }, error => { - this.cancellationTokenSources.delete(callId); - this.multiplexer.send(this.replyErr(callId, error)); - }); - } - private receiveReply(msg: ReplyMessage): void { - const callId = msg.id; - const pendingReply = this.pendingRPCReplies.get(callId); - if (!pendingReply) { - return; - } - this.pendingRPCReplies.delete(callId); - pendingReply.resolve(msg.res); - } - - private receiveReplyErr(msg: ReplyErrMessage): void { - const callId = msg.id; - const pendingReply = this.pendingRPCReplies.get(callId); - if (!pendingReply) { - return; - } - this.pendingRPCReplies.delete(callId); - - let err: Error | undefined = undefined; - if (msg.err && msg.err.$isError) { - err = new Error(); - err.name = msg.err.name; - err.message = msg.err.message; - err.stack = msg.err.stack; - } - pendingReply.reject(err); - } - - private invokeHandler(proxyId: string, methodName: string, args: any[]): Promise { - try { - return Promise.resolve(this.doInvokeHandler(proxyId, methodName, args)); - } catch (err) { - return Promise.reject(err); - } - } - - private doInvokeHandler(proxyId: string, methodName: string, args: any[]): any { - const actor = this.locals.get(proxyId); - if (!actor) { - throw new Error('Unknown actor ' + proxyId); - } - const method = actor[methodName]; - if (typeof method !== 'function') { - throw new Error('Unknown method ' + methodName + ' on actor ' + proxyId); - } - return method.apply(actor, args); - } - - private cancel(req: string): string { - return `{"type":${MessageType.Cancel},"id":"${req}"}`; - } - - private request(req: string, rpcId: string, method: string, args: any[]): string { - return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args, this.replacer)}}`; - } - - private replyOK(req: string, res: any): string { - if (typeof res === 'undefined') { - return `{"type":${MessageType.Reply},"id":"${req}"}`; - } - return `{"type":${MessageType.Reply},"id":"${req}","res":${safeStringify(res, this.replacer)}}`; - } + this.locals.set(identifier.id, handler); + if (Disposable.is(instance)) { + this.toDispose.push(instance); + } + this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id))); - private replyErr(req: string, err: any): string { - err = typeof err === 'string' ? new Error(err) : err; - if (err instanceof Error) { - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${safeStringify(transformErrorForSerialization(err))}}`; } - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`; + return instance; } } -function canceled(): Error { - const error = new Error('Canceled'); - error.name = error.message; - return error; -} - /** * Sends/Receives multiple messages in one go: * - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`. * - each incoming message is handled in a separate `process.nextTick`. */ -class RPCMultiplexer implements Disposable, MessageConnection { - - private readonly connection: MessageConnection; - private readonly sendAccumulatedBound: () => void; - - private messagesToSend: string[]; - - private readonly messageEmitter = new Emitter(); - private readonly toDispose = new DisposableCollection(); - - constructor(connection: MessageConnection) { - this.connection = connection; - this.sendAccumulatedBound = this.sendAccumulated.bind(this); +export class QueuingChannelMultiplexer extends ChannelMultiplexer { + protected messagesToSend: Uint8Array[] = []; + constructor(underlyingChannel: Channel) { + super(underlyingChannel); this.toDispose.push(Disposable.create(() => this.messagesToSend = [])); - this.toDispose.push(this.connection.onMessage((msg: string) => { - const messages = JSON.parse(msg); - for (const message of messages) { - this.messageEmitter.fire(message); - } - })); - this.toDispose.push(this.messageEmitter); - - this.messagesToSend = []; } - dispose(): void { - this.toDispose.dispose(); + protected override getUnderlyingWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => this.commitSingleMessage(buffer)); + return writer; } - get onMessage(): Event { - return this.messageEmitter.event; - } - - private sendAccumulated(): void { - const tmp = this.messagesToSend; - this.messagesToSend = []; - this.connection.send(JSON.stringify(tmp)); - } - - public send(msg: string): void { + protected commitSingleMessage(msg: Uint8Array): void { if (this.toDispose.disposed) { throw ConnectionClosedError.create(); } if (this.messagesToSend.length === 0) { if (typeof setImmediate !== 'undefined') { - setImmediate(this.sendAccumulatedBound); + setImmediate(() => this.sendAccumulated()); } else { - setTimeout(this.sendAccumulatedBound, 0); + setTimeout(() => this.sendAccumulated(), 0); } } this.messagesToSend.push(msg); } -} -/** - * These functions are responsible for correct transferring objects via rpc channel. - * - * To reach that some specific kind of objects is converted to json in some custom way - * and then, after receiving, revived to objects again, - * so there is feeling that object was transferred via rpc channel. - * - * To distinguish between regular and altered objects, field $type is added to altered ones. - * Also value of that field specifies kind of the object. - */ -export namespace ObjectsTransferrer { + protected sendAccumulated(): void { + const cachedMessages = this.messagesToSend; + this.messagesToSend = []; + const writer = this.underlyingChannel.getWriteBuffer(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - export function replacer(key: string | undefined, value: any): any { - if (value instanceof URI) { - return { - $type: SerializedObjectType.THEIA_URI, - data: value.toString() - } as SerializedObject; - } else if (value instanceof Range) { - const range = value as Range; - const serializedValue = { - start: { - line: range.start.line, - character: range.start.character - }, - end: { - line: range.end.line, - character: range.end.character - } - }; - return { - $type: SerializedObjectType.THEIA_RANGE, - data: JSON.stringify(serializedValue) - } as SerializedObject; - } else if (value && value['$mid'] === 1) { - // Given value is VSCode URI - // We cannot use instanceof here because VSCode URI has toJSON method which is invoked before this replacer. - const uri = VSCodeURI.revive(value); - return { - $type: SerializedObjectType.VSCODE_URI, - data: uri.toString() - } as SerializedObject; - } else if (value instanceof BinaryBuffer) { - const bytes = [...value.buffer.values()]; - return { - $type: SerializedObjectType.TEXT_BUFFER, - data: JSON.stringify({ bytes }) - }; - } + if (cachedMessages.length > 0) { + writer.writeLength(cachedMessages.length); + cachedMessages.forEach(msg => { + writer.writeBytes(msg); + }); - return value; + } + writer.commit(); } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - export function reviver(key: string | undefined, value: any): any { - if (isSerializedObject(value)) { - switch (value.$type) { - case SerializedObjectType.THEIA_URI: - return new URI(value.data); - case SerializedObjectType.VSCODE_URI: - return VSCodeURI.parse(value.data); - case SerializedObjectType.THEIA_RANGE: - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const obj: any = JSON.parse(value.data); - const start = new Position(obj.start.line, obj.start.character); - const end = new Position(obj.end.line, obj.end.character); - return new Range(start, end); - case SerializedObjectType.TEXT_BUFFER: - const data: { bytes: number[] } = JSON.parse(value.data); - return BinaryBuffer.wrap(Uint8Array.from(data.bytes)); + protected override handleMessage(buffer: ReadBuffer): void { + // Read in the list of messages and handle each message individually + const length = buffer.readLength(); + if (length > 0) { + for (let index = 0; index < length; index++) { + const message = buffer.readBytes(); + this.handleSingleMessage(new Uint8ArrayReadBuffer(message)); + } } - - return value; } -} - -interface SerializedObject { - $type: SerializedObjectType; - data: string; -} - -enum SerializedObjectType { - THEIA_URI, - VSCODE_URI, - THEIA_RANGE, - TEXT_BUFFER -} - -function isSerializedObject(obj: unknown): obj is SerializedObject { - const serializedObject = obj as SerializedObject; - return !!obj && typeof obj === 'object' && serializedObject.$type !== undefined && serializedObject.data !== undefined; -} - -export const enum MessageType { - Request = 1, - Reply = 2, - ReplyErr = 3, - Cancel = 4, - Terminate = 5, - Terminated = 6 -} - -class CancelMessage { - type: MessageType.Cancel; - id: string; -} - -class RequestMessage { - type: MessageType.Request; - id: string; - proxyId: string; - method: string; - args: any[]; -} - -class ReplyMessage { - type: MessageType.Reply; - id: string; - res: any; -} - -class ReplyErrMessage { - type: MessageType.ReplyErr; - id: string; - err: SerializedError; -} - -type RPCMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage; - -export interface SerializedError { - readonly $isError: true; - readonly name: string; - readonly message: string; - readonly stack: string; -} - -export function transformErrorForSerialization(error: Error): SerializedError { - if (error instanceof Error) { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - return { - $isError: true, - name, - message, - stack - }; + protected handleSingleMessage(buffer: ReadBuffer): void { + return super.handleMessage(buffer); } - - // return as is - return error; } -interface JSONStringifyReplacer { - (key: string, value: any): any; -} - -function safeStringify(obj: any, replacer?: JSONStringifyReplacer): string { - try { - return JSON.stringify(obj, replacer); - } catch (err) { - console.error('error stringifying response: ', err); - return 'null'; - } +export function registerMsgPackExtensions(): void { + MsgPackExtensionManager.getInstance().registerExtensions( + { + class: URI, + tag: 2, + serialize: (instance: URI) => instance.toString(), + deserialize: data => new URI(data) + }, + { + class: Range, + tag: 3, + serialize: (range: Range) => ({ + start: { + line: range.start.line, + character: range.start.character + }, + end: { + line: range.end.line, + character: range.end.character + } + }), + deserialize: data => { + const start = new Position(data.start.line, data.start.character); + const end = new Position(data.end.line, data.end.character); + return new Range(start, end); + } + }, + { + class: VSCodeURI, + tag: 4, + // eslint-disable-next-line arrow-body-style + serialize: (instance: URI) => { + return instance.toString(); + }, + deserialize: data => VSCodeURI.parse(data) + }, + { + class: BinaryBuffer, + tag: 5, + // eslint-disable-next-line arrow-body-style + serialize: (instance: BinaryBuffer) => { + return instance.buffer; + }, + // eslint-disable-next-line arrow-body-style + deserialize: buffer => { + return BinaryBuffer.wrap(buffer); + } + } + ); } diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts index 3c80f8d33d410..6fb3a0c55932b 100644 --- a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts +++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts @@ -21,7 +21,8 @@ import { LogPart } from '../../common/types'; @injectable() export class HostedPluginWatcher { - private onPostMessage = new Emitter<{ pluginHostId: string, message: string }>(); + private onPostMessage = new Emitter<{ pluginHostId: string, message: Uint8Array }>(); + private onLogMessage = new Emitter(); private readonly onDidDeployEmitter = new Emitter(); @@ -31,7 +32,7 @@ export class HostedPluginWatcher { const messageEmitter = this.onPostMessage; const logEmitter = this.onLogMessage; return { - postMessage(pluginHostId, message: string): Promise { + postMessage(pluginHostId, message: Uint8Array): Promise { messageEmitter.fire({ pluginHostId, message }); return Promise.resolve(); }, @@ -43,7 +44,7 @@ export class HostedPluginWatcher { }; } - get onPostMessageEvent(): Event<{ pluginHostId: string, message: string }> { + get onPostMessageEvent(): Event<{ pluginHostId: string, message: Uint8Array }> { return this.onPostMessage.event; } diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts index e2c167f02dc68..b947b556c91c8 100644 --- a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts +++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts @@ -33,7 +33,7 @@ import { RPCProtocol, RPCProtocolImpl } from '../../common/rpc-protocol'; import { Disposable, DisposableCollection, Emitter, isCancelled, ILogger, ContributionProvider, CommandRegistry, WillExecuteCommandEvent, - CancellationTokenSource, JsonRpcProxy, ProgressService, nls + CancellationTokenSource, JsonRpcProxy, ProgressService, nls, ChannelCloseEvent, MessageProvider } from '@theia/core'; import { PreferenceServiceImpl, PreferenceProviderProvider } from '@theia/core/lib/browser/preferences'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -66,6 +66,7 @@ import { StandaloneServices } from '@theia/monaco-editor-core/esm/vs/editor/stan import { ILanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/languages/language'; import { LanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/services/languageService'; import { Measurement, Stopwatch } from '@theia/core/lib/common'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; export type PluginHost = 'frontend' | string; export type DebugActivationEvent = 'onDebugResolve' | 'onDebugInitialConfigurations' | 'onDebugAdapterProtocolTracker' | 'onDebugDynamicConfigurations'; @@ -534,18 +535,37 @@ export class HostedPluginSupport { } protected createServerRpc(pluginHostId: string): RPCProtocol { - const emitter = new Emitter(); + + const onCloseEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + + // Create RPC protocol before adding the listener to the watcher to receive the watcher's cached messages after the rpc protocol was created. + const rpc = new RPCProtocolImpl({ + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => { + this.server.onMessage(pluginHostId, buffer); + }); + return writer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event + }); + this.watcher.onPostMessageEvent(received => { if (pluginHostId === received.pluginHostId) { - emitter.fire(received.message); - } - }); - return new RPCProtocolImpl({ - onMessage: emitter.event, - send: message => { - this.server.onMessage(pluginHostId, message); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(received.message)); } }); + + return rpc; } protected async updateStoragePath(): Promise { diff --git a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts index 874ad820448e6..c4ddf674bb339 100644 --- a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts +++ b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts @@ -16,6 +16,8 @@ import { injectable } from '@theia/core/shared/inversify'; import { Emitter } from '@theia/core/lib/common/event'; import { RPCProtocol, RPCProtocolImpl } from '../../common/rpc-protocol'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc'; @injectable() export class PluginWorker { @@ -25,22 +27,41 @@ export class PluginWorker { public readonly rpc: RPCProtocol; constructor() { - const emitter = new Emitter(); - this.worker = new Worker(new URL('./worker/worker-main', // @ts-expect-error (TS1343) // We compile to CommonJS but `import.meta` is still available in the browser import.meta.url)); - this.worker.onmessage = m => emitter.fire(m.data); - this.worker.onerror = e => console.error(e); + const onCloseEmitter = new Emitter(); + const onMessageEmitter = new Emitter(); + const onErrorEmitter = new Emitter(); this.rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - this.worker.postMessage(m); - } + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => { + this.worker.postMessage(buffer); + }); + return writer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event + }); + + // eslint-disable-next-line arrow-body-style + this.worker.onmessage = buffer => onMessageEmitter.fire(() => { + return new Uint8ArrayReadBuffer(buffer.data); }); + + this.worker.onerror = e => onErrorEmitter.fire(e); + + this.worker.onerror = e => console.error(e); } } diff --git a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts index 4176d07b499c0..157d62620798b 100644 --- a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts +++ b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts @@ -13,9 +13,9 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** - +// eslint-disable-next-line import/no-extraneous-dependencies +import 'reflect-metadata'; import { Emitter } from '@theia/core/lib/common/event'; -import { RPCProtocolImpl } from '../../../common/rpc-protocol'; import { PluginManagerExtImpl } from '../../../plugin/plugin-manager'; import { MAIN_RPC_CONTEXT, Plugin, emptyPlugin, TerminalServiceExt } from '../../../common/plugin-api-rpc'; import { createAPIFactory } from '../../../plugin/plugin-context'; @@ -33,8 +33,10 @@ import { KeyValueStorageProxy } from '../../../plugin/plugin-storage'; import { WebviewsExtImpl } from '../../../plugin/webviews'; import { loadManifest } from './plugin-manifest-loader'; import { TerminalServiceExtImpl } from '../../../plugin/terminal-ext'; -import { reviver } from '../../../plugin/types-impl'; import { SecretsExtImpl } from '../../../plugin/secrets-ext'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { ChannelCloseEvent, MessageProvider } from '@theia/core'; +import { RPCProtocolImpl } from '../../../common/rpc-protocol'; // eslint-disable-next-line @typescript-eslint/no-explicit-any const ctx = self as any; @@ -42,20 +44,31 @@ const ctx = self as any; const pluginsApiImpl = new Map(); const pluginsModulesNames = new Map(); -const emitter = new Emitter(); -const rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - ctx.postMessage(m); - }, -}, -{ - reviver: reviver -}); +const onCloseEmitter = new Emitter(); +const onErrorEmitter = new Emitter(); +const onMessageEmitter = new Emitter(); // eslint-disable-next-line @typescript-eslint/no-explicit-any addEventListener('message', (message: any) => { - emitter.fire(message.data); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message.data)); +}); + +const rpc = new RPCProtocolImpl({ + close: () => { + onCloseEmitter.dispose(); + onErrorEmitter.dispose(); + onMessageEmitter.dispose(); + }, + getWriteBuffer: () => { + const writeBuffer = new Uint8ArrayWriteBuffer(); + writeBuffer.onCommit(buffer => { + ctx.postMessage(buffer); + }); + return writeBuffer; + }, + onClose: onCloseEmitter.event, + onError: onErrorEmitter.event, + onMessage: onMessageEmitter.event }); const scripts = new Set(); diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts index 2c0db80da4a72..85df499084cae 100644 --- a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts @@ -14,16 +14,18 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import * as cp from 'child_process'; -import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common'; +import { ConnectionErrorHandler, ContributionProvider, ILogger, MessageService } from '@theia/core/lib/common'; +import { Deferred } from '@theia/core/lib/common/promise-util'; import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol'; -import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin, PLUGIN_HOST_BACKEND, PluginIdentifiers } from '../../common/plugin-protocol'; -import { MessageType } from '../../common/rpc-protocol'; +import { inject, injectable, named } from '@theia/core/shared/inversify'; +import * as cp from 'child_process'; import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution'; -import * as psTree from 'ps-tree'; -import { Deferred } from '@theia/core/lib/common/promise-util'; import { HostedPluginLocalizationService } from './hosted-plugin-localization-service'; +import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol'; +import { BinaryMessagePipe } from '@theia/core/lib/node/messaging/binary-message-pipe'; +import { DeployedPlugin, HostedPluginClient, PluginHostEnvironmentVariable, PluginIdentifiers, PLUGIN_HOST_BACKEND, ServerPluginRunner } from '../../common/plugin-protocol'; +import psTree = require('ps-tree'); +import { Duplex } from 'stream'; export interface IPCConnectionOptions { readonly serverName: string; @@ -60,6 +62,7 @@ export class HostedPluginProcess implements ServerPluginRunner { protected readonly localizationService: HostedPluginLocalizationService; private childProcess: cp.ChildProcess | undefined; + private messagePipe?: BinaryMessagePipe; private client: HostedPluginClient; private terminatingPluginServer = false; @@ -82,14 +85,14 @@ export class HostedPluginProcess implements ServerPluginRunner { } // eslint-disable-next-line @typescript-eslint/no-explicit-any - public acceptMessage(pluginHostId: string, message: string): boolean { + public acceptMessage(pluginHostId: string, message: Uint8Array): boolean { return pluginHostId === 'main'; } // eslint-disable-next-line @typescript-eslint/no-explicit-any - public onMessage(pluginHostId: string, jsonMessage: string): void { - if (this.childProcess) { - this.childProcess.send(jsonMessage); + public onMessage(pluginHostId: string, message: Uint8Array): void { + if (this.messagePipe) { + this.messagePipe.send(message); } } @@ -106,12 +109,12 @@ export class HostedPluginProcess implements ServerPluginRunner { const waitForTerminated = new Deferred(); cp.on('message', message => { const msg = JSON.parse(message as string); - if ('type' in msg && msg.type === MessageType.Terminated) { + if (ProcessTerminatedMessage.is(msg)) { waitForTerminated.resolve(); } }); const stopTimeout = this.cli.pluginHostStopTimeout; - cp.send(JSON.stringify({ type: MessageType.Terminate, stopTimeout })); + cp.send(JSON.stringify({ type: ProcessTerminateMessage.TYPE, stopTimeout })); const terminateTimeout = this.cli.pluginHostTerminateTimeout; if (terminateTimeout) { @@ -156,9 +159,11 @@ export class HostedPluginProcess implements ServerPluginRunner { logger: this.logger, args: [] }); - this.childProcess.on('message', message => { + + this.messagePipe = new BinaryMessagePipe(this.childProcess.stdio[4] as Duplex); + this.messagePipe.onMessage(buffer => { if (this.client) { - this.client.postMessage(PLUGIN_HOST_BACKEND, message as string); + this.client.postMessage(PLUGIN_HOST_BACKEND, buffer); } }); } @@ -184,7 +189,11 @@ export class HostedPluginProcess implements ServerPluginRunner { silent: true, env: env, execArgv: [], - stdio: ['pipe', 'pipe', 'pipe', 'ipc'] + // 5th element MUST be 'overlapped' for it to work properly on Windows. + // 'overlapped' works just like 'pipe' on non-Windows platforms. + // See: https://nodejs.org/docs/latest-v14.x/api/child_process.html#child_process_options_stdio + // Note: For some reason `@types/node` does not know about 'overlapped'. + stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'overlapped' as 'pipe'] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts new file mode 100644 index 0000000000000..429c2168d92ac --- /dev/null +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts @@ -0,0 +1,49 @@ +// ***************************************************************************** +// Copyright (C) 2022 STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +// Custom message protocol between `HostedPluginProcess` and its `PluginHost` child process. + +/** + * Sent to initiate termination of the counterpart process. + */ +export interface ProcessTerminateMessage { + type: typeof ProcessTerminateMessage.TYPE, + stopTimeout?: number +} + +export namespace ProcessTerminateMessage { + export const TYPE = 0; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + export function is(object: any): object is ProcessTerminateMessage { + return typeof object === 'object' && object.type === TYPE; + } +} + +/** + * Sent to inform the counter part process that the process termination has been completed. + */ +export interface ProcessTerminatedMessage { + type: typeof ProcessTerminateMessage.TYPE, +} + +export namespace ProcessTerminatedMessage { + export const TYPE = 1; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + export function is(object: any): object is ProcessTerminateMessage { + return typeof object === 'object' && object.type === TYPE; + } +} + diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts index 119822628bd1e..81d03a156ea20 100644 --- a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts +++ b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts @@ -71,7 +71,7 @@ export class HostedPluginSupport { } } - onMessage(pluginHostId: string, message: string): void { + onMessage(pluginHostId: string, message: Uint8Array): void { // need to perform routing // eslint-disable-next-line @typescript-eslint/no-explicit-any if (this.pluginRunners.length > 0) { diff --git a/packages/plugin-ext/src/hosted/node/plugin-host.ts b/packages/plugin-ext/src/hosted/node/plugin-host.ts index fa54f21beb537..24b67a6304b83 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-host.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-host.ts @@ -13,11 +13,12 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** - -import { Emitter } from '@theia/core/lib/common/event'; -import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol'; +// eslint-disable-next-line import/no-extraneous-dependencies +import 'reflect-metadata'; +import { ConnectionClosedError, RPCProtocolImpl } from '../../common/rpc-protocol'; +import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol'; import { PluginHostRPC } from './plugin-host-rpc'; -import { reviver } from '../../plugin/types-impl'; +import { IPCChannel } from '@theia/core/lib/node'; console.log('PLUGIN_HOST(' + process.pid + ') starting instance'); @@ -74,18 +75,8 @@ process.on('rejectionHandled', (promise: Promise) => { }); let terminating = false; -const emitter = new Emitter(); -const rpc = new RPCProtocolImpl({ - onMessage: emitter.event, - send: (m: string) => { - if (process.send && !terminating) { - process.send(m); - } - } -}, -{ - reviver: reviver -}); +const channel = new IPCChannel(); +const rpc = new RPCProtocolImpl(channel); process.on('message', async (message: string) => { if (terminating) { @@ -93,10 +84,9 @@ process.on('message', async (message: string) => { } try { const msg = JSON.parse(message); - if ('type' in msg && msg.type === MessageType.Terminate) { + if (ProcessTerminateMessage.is(msg)) { terminating = true; - emitter.dispose(); - if ('stopTimeout' in msg && typeof msg.stopTimeout === 'number' && msg.stopTimeout) { + if (msg.stopTimeout) { await Promise.race([ pluginHostRPC.terminate(), new Promise(resolve => setTimeout(resolve, msg.stopTimeout)) @@ -106,10 +96,9 @@ process.on('message', async (message: string) => { } rpc.dispose(); if (process.send) { - process.send(JSON.stringify({ type: MessageType.Terminated })); + process.send(JSON.stringify({ type: ProcessTerminatedMessage.TYPE })); } - } else { - emitter.fire(message); + } } catch (e) { console.error(e); diff --git a/packages/plugin-ext/src/hosted/node/plugin-service.ts b/packages/plugin-ext/src/hosted/node/plugin-service.ts index fcd24918ed2f6..4789ffb38f0ac 100644 --- a/packages/plugin-ext/src/hosted/node/plugin-service.ts +++ b/packages/plugin-ext/src/hosted/node/plugin-service.ts @@ -164,7 +164,7 @@ export class HostedPluginServerImpl implements HostedPluginServer { return Promise.all(plugins.map(plugin => this.localizationService.localizePlugin(plugin))); } - onMessage(pluginHostId: string, message: string): Promise { + onMessage(pluginHostId: string, message: Uint8Array): Promise { this.hostedPlugin.onMessage(pluginHostId, message); return Promise.resolve(); } diff --git a/packages/plugin-ext/src/main/browser/main-context.ts b/packages/plugin-ext/src/main/browser/main-context.ts index 74288f2d67b36..eee7fd8cd7335 100644 --- a/packages/plugin-ext/src/main/browser/main-context.ts +++ b/packages/plugin-ext/src/main/browser/main-context.ts @@ -56,7 +56,6 @@ import { CustomEditorsMainImpl } from './custom-editors/custom-editors-main'; import { SecretsMainImpl } from './secrets-main'; import { WebviewViewsMainImpl } from './webview-views/webview-views-main'; import { MonacoLanguages } from '@theia/monaco/lib/browser/monaco-languages'; -import { NotificationExtImpl } from '../../plugin/notification'; import { UntitledResourceResolver } from '@theia/core/lib/common/resource'; import { ThemeService } from '@theia/core/lib/browser/theming'; @@ -110,9 +109,6 @@ export function setUpPluginApi(rpc: RPCProtocol, container: interfaces.Container const notificationMain = new NotificationMainImpl(rpc, container); rpc.set(PLUGIN_RPC_CONTEXT.NOTIFICATION_MAIN, notificationMain); - const notificationExt = new NotificationExtImpl(rpc); - rpc.set(MAIN_RPC_CONTEXT.NOTIFICATION_EXT, notificationExt); - const terminalMain = new TerminalServiceMainImpl(rpc, container); rpc.set(PLUGIN_RPC_CONTEXT.TERMINAL_MAIN, terminalMain); diff --git a/packages/plugin-ext/src/plugin/types-impl.ts b/packages/plugin-ext/src/plugin/types-impl.ts index cf9eea96158cb..5d4abbdee56d1 100644 --- a/packages/plugin-ext/src/plugin/types-impl.ts +++ b/packages/plugin-ext/src/plugin/types-impl.ts @@ -30,22 +30,8 @@ import { startsWithIgnoreCase } from '@theia/core/lib/common/strings'; import { SymbolKind } from '../common/plugin-api-rpc-model'; import { FileSystemProviderErrorCode, markAsFileSystemProviderError } from '@theia/filesystem/lib/common/files'; import * as paths from 'path'; -import { ObjectsTransferrer } from '../common/rpc-protocol'; import { es5ClassCompat } from '../common/types'; -/** - * A reviver that takes URI's transferred via JSON.stringify() and makes - * instances of our local plugin API URI class (below) - */ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -export function reviver(key: string | undefined, value: any): any { - const revived = ObjectsTransferrer.reviver(key, value); - if (CodeURI.isUri(revived)) { - return URI.revive(revived); - } - return revived; -} - /** * This is an implementation of #theia.Uri based on vscode-uri. * This is supposed to fix https://github.com/eclipse-theia/theia/issues/8752