diff --git a/CHANGELOG.md b/CHANGELOG.md index 34350df50ffd0..984069a2812d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,17 +16,6 @@ - [callhierarchy] `paths.ts` and `glob.ts` moved to `core/src/common`; `language-selector.ts` moved to `editor/src/common`. Any imports will need to be updated. - [electron] removed redundant config option `disallowReloadKeybinding` from `dev-packages/application-package/src/application-props.ts` file and corresponding test [#11099](https://github.com/eclipse-theia/theia/pull/11099) -[Breaking Changes:](#breaking_changes_1.26.0) - -- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling. - This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead. - - Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService` - - `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead. - - `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data - - `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`. - [#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics. - - ## v1.25.0 - 4/28/2022 [1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35) diff --git a/package.json b/package.json index 0be00470d7f25..d89e88b39ef76 100644 --- a/package.json +++ b/package.json @@ -10,8 +10,6 @@ "**/@types/node": "12" }, "devDependencies": { - "@types/chai": "4.3.0", - "@types/chai-spies": "1.0.3", "@types/chai-string": "^1.4.0", "@types/jsdom": "^11.0.4", "@types/node": "12", @@ -22,8 +20,6 @@ "@typescript-eslint/eslint-plugin": "^4.8.1", "@typescript-eslint/eslint-plugin-tslint": "^4.8.1", "@typescript-eslint/parser": "^4.8.1", - "chai": "4.3.4", - "chai-spies": "1.0.0", "chai-string": "^1.4.0", "chalk": "4.0.0", "concurrently": "^3.5.0", diff --git a/packages/core/README.md b/packages/core/README.md index 9ddef5f7b5b30..64537785cf2f5 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -96,6 +96,7 @@ export class SomeClass { - `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized)) - `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol)) - `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri)) + - `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc)) - `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify)) - `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express)) - `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce)) diff --git a/packages/core/package.json b/packages/core/package.json index 877fcf3a10812..733a8b75400f9 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -67,6 +67,7 @@ "uuid": "^8.3.2", "vscode-languageserver-protocol": "~3.15.3", "vscode-uri": "^2.1.1", + "vscode-ws-jsonrpc": "^0.2.0", "ws": "^7.1.2", "yargs": "^15.3.1" }, @@ -106,7 +107,8 @@ "react-dom", "react-virtualized", "vscode-languageserver-protocol", - "vscode-uri" + "vscode-uri", + "vscode-ws-jsonrpc" ], "export =": [ "dompurify as DOMPurify", diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts new file mode 100644 index 0000000000000..b11ff897103ed --- /dev/null +++ b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts @@ -0,0 +1 @@ +export * from 'vscode-ws-jsonrpc'; diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.js b/packages/core/shared/vscode-ws-jsonrpc/index.js new file mode 100644 index 0000000000000..3aed560b82d62 --- /dev/null +++ b/packages/core/shared/vscode-ws-jsonrpc/index.js @@ -0,0 +1 @@ +module.exports = require('vscode-ws-jsonrpc'); diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index 905515a70392c..f83aabda22826 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -15,11 +15,11 @@ // ***************************************************************************** import { injectable, interfaces, decorate, unmanaged } from 'inversify'; -import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common'; +import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { Endpoint } from '../endpoint'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { io, Socket } from 'socket.io-client'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; decorate(injectable(), JsonRpcProxyFactory); decorate(unmanaged(), JsonRpcProxyFactory, 0); @@ -35,8 +35,6 @@ export interface WebSocketOptions { export class WebSocketConnectionProvider extends AbstractConnectionProvider { protected readonly onSocketDidOpenEmitter: Emitter = new Emitter(); - // Socket that is used by the main channel - protected socket: Socket; get onSocketDidOpen(): Event { return this.onSocketDidOpenEmitter.event; } @@ -50,39 +48,31 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider(path, arg); } - protected createMainChannel(): Channel { + protected readonly socket: Socket; + + constructor() { + super(); const url = this.createWebSocketUrl(WebSocketChannel.wsPath); const socket = this.createWebSocket(url); - const channel = new WebSocketChannel(this.toIWebSocket(socket)); socket.on('connect', () => { this.fireSocketDidOpen(); }); - channel.onClose(() => this.fireSocketDidClose()); + socket.on('disconnect', reason => { + for (const channel of [...this.channels.values()]) { + channel.close(undefined, reason); + } + this.fireSocketDidClose(); + }); + socket.on('message', data => { + this.handleIncomingRawMessage(data); + }); socket.connect(); this.socket = socket; - - return channel; - } - - protected toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - socket.close(); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { if (this.socket.connected) { - return super.openChannel(path, handler, options); + super.openChannel(path, handler, options); } else { const openChannel = () => { this.socket.off('connect', openChannel); @@ -92,6 +82,14 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { + if (this.socket.connected) { + this.socket.send(content); + } + }); + } + /** * @param path The handler to reach in the backend. */ @@ -145,4 +143,3 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider throw new Error('abstract'); } + protected channelIdSeq = 0; + protected readonly channels = new Map(); + protected readonly onIncomingMessageActivityEmitter: Emitter = new Emitter(); get onIncomingMessageActivity(): Event { return this.onIncomingMessageActivityEmitter.event; @@ -71,39 +75,50 @@ export abstract class AbstractConnectionProvider return factory.createProxy(); } - protected channelMultiPlexer: ChannelMultiplexer; - - constructor() { - this.channelMultiPlexer = this.createMultiplexer(); - } - - protected createMultiplexer(): ChannelMultiplexer { - return new ChannelMultiplexer(this.createMainChannel()); - } - /** * Install a connection handler for the given path. */ listen(handler: ConnectionHandler, options?: AbstractOptions): void { this.openChannel(handler.path, channel => { - handler.onConnection(channel); + const connection = createWebSocketConnection(channel, this.createLogger()); + connection.onDispose(() => channel.close()); + handler.onConnection(connection); }, options); } - async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { - const newChannel = await this.channelMultiPlexer.open(path); - newChannel.onClose(() => { - const { reconnecting } = { reconnecting: true, ...options }; - if (reconnecting) { - this.openChannel(path, handler, options); + openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: AbstractOptions): void { + const id = this.channelIdSeq++; + const channel = this.createChannel(id); + this.channels.set(id, channel); + channel.onClose(() => { + if (this.channels.delete(channel.id)) { + const { reconnecting } = { reconnecting: true, ...options }; + if (reconnecting) { + this.openChannel(path, handler, options); + } + } else { + console.error('The ws channel does not exist', channel.id); } }); - handler(newChannel); + channel.onOpen(() => handler(channel)); + channel.open(path); } - /** - * Create the main connection that is used for multiplexing all channels. - */ - protected abstract createMainChannel(): Channel; + protected abstract createChannel(id: number): WebSocketChannel; + + protected handleIncomingRawMessage(data: string): void { + const message: WebSocketChannel.Message = JSON.parse(data); + const channel = this.channels.get(message.id); + if (channel) { + channel.handleMessage(message); + } else { + console.error('The ws channel does not exist', message.id); + } + this.onIncomingMessageActivityEmitter.fire(undefined); + } + + protected createLogger(): Logger { + return new ConsoleLogger(); + } } diff --git a/packages/core/src/common/messaging/connection-error-handler.ts b/packages/core/src/common/messaging/connection-error-handler.ts index 89a27b60a50db..aecfe68901e24 100644 --- a/packages/core/src/common/messaging/connection-error-handler.ts +++ b/packages/core/src/common/messaging/connection-error-handler.ts @@ -14,6 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { Message } from 'vscode-ws-jsonrpc'; import { ILogger } from '../../common'; export interface ResolvedConnectionErrorHandlerOptions { @@ -50,7 +51,7 @@ export class ConnectionErrorHandler { }; } - shouldStop(error: Error, count?: number): boolean { + shouldStop(error: Error, message?: Message, count?: number): boolean { return !count || count > this.options.maxErrors; } diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts index 1e790d38aeec3..ed03d9d331206 100644 --- a/packages/core/src/common/messaging/handler.ts +++ b/packages/core/src/common/messaging/handler.ts @@ -14,11 +14,11 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; export const ConnectionHandler = Symbol('ConnectionHandler'); export interface ConnectionHandler { readonly path: string; - onConnection(connection: Channel): void; + onConnection(connection: MessageConnection): void; } diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts index 37280e4dbfdaa..2fd0700a41034 100644 --- a/packages/core/src/common/messaging/proxy-factory.spec.ts +++ b/packages/core/src/common/messaging/proxy-factory.spec.ts @@ -15,11 +15,21 @@ // ***************************************************************************** import * as chai from 'chai'; +import { ConsoleLogger } from '../../node/messaging/logger'; import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory'; -import { ChannelPipe } from '../message-rpc/channel.spec'; +import { createMessageConnection } from 'vscode-jsonrpc/lib/main'; +import * as stream from 'stream'; const expect = chai.expect; +class NoTransform extends stream.Transform { + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + override _transform(chunk: any, encoding: string, callback: Function): void { + callback(undefined, chunk); + } +} + class TestServer { requests: string[] = []; doStuff(arg: string): Promise { @@ -92,12 +102,15 @@ function getSetup(): { const server = new TestServer(); const serverProxyFactory = new JsonRpcProxyFactory(client); - const pipe = new ChannelPipe(); - serverProxyFactory.listen(pipe.right); + const client2server = new NoTransform(); + const server2client = new NoTransform(); + const serverConnection = createMessageConnection(server2client, client2server, new ConsoleLogger()); + serverProxyFactory.listen(serverConnection); const serverProxy = serverProxyFactory.createProxy(); const clientProxyFactory = new JsonRpcProxyFactory(server); - clientProxyFactory.listen(pipe.left); + const clientConnection = createMessageConnection(client2server, server2client, new ConsoleLogger()); + clientProxyFactory.listen(clientConnection); const clientProxy = clientProxyFactory.createProxy(); return { client, diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index 765b70dd1a472..f8869449eae94 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -16,12 +16,10 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { ResponseError } from '../message-rpc/rpc-message-encoder'; +import { MessageConnection, ResponseError } from 'vscode-ws-jsonrpc'; import { ApplicationError } from '../application-error'; +import { Event, Emitter } from '../event'; import { Disposable } from '../disposable'; -import { Emitter, Event } from '../event'; -import { Channel } from '../message-rpc/channel'; -import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; export type JsonRpcServer = Disposable & { @@ -47,19 +45,13 @@ export class JsonRpcConnectionHandler implements ConnectionHan readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory ) { } - onConnection(connection: Channel): void { + onConnection(connection: MessageConnection): void { const factory = new this.factoryConstructor(); const proxy = factory.createProxy(); factory.target = this.targetFactory(proxy); factory.listen(connection); } } -/** - * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. - */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; - -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -103,14 +95,13 @@ const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandl * * @param - The type of the object to expose to JSON-RPC. */ - export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: RpcProtocol) => void; - protected connectionPromise: Promise; + protected connectionPromiseResolve: (connection: MessageConnection) => void; + protected connectionPromise: Promise; /** * Build a new JsonRpcProxyFactory. @@ -118,7 +109,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. */ - constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { + constructor(public target?: any) { this.waitForConnection(); } @@ -127,7 +118,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { this.connectionPromiseResolve = resolve ); this.connectionPromise.then(connection => { - connection.channel.onClose(() => + connection.onClose(() => this.onDidCloseConnectionEmitter.fire(undefined) ); this.onDidOpenConnectionEmitter.fire(undefined); @@ -140,10 +131,11 @@ export class JsonRpcProxyFactory implements ProxyHandler { * This connection will be used to send/receive JSON-RPC requests and * response. */ - listen(channel: Channel): void { - const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); - connection.onNotification(event => this.onNotification(event.method, ...event.args)); - + listen(connection: MessageConnection): void { + connection.onRequest((prop, ...args) => this.onRequest(prop, ...args)); + connection.onNotification((prop, ...args) => this.onNotification(prop, ...args)); + connection.onDispose(() => this.waitForConnection()); + connection.listen(); this.connectionPromiseResolve(connection); } @@ -247,10 +239,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { new Promise((resolve, reject) => { try { if (isNotify) { - connection.sendNotification(method, args); + connection.sendNotification(method, ...args); resolve(undefined); } else { - const resultPromise = connection.sendRequest(method, args) as Promise; + const resultPromise = connection.sendRequest(method, ...args) as Promise; resultPromise .catch((err: any) => reject(this.deserializeError(capturedError, err))) .then((result: any) => resolve(result)); @@ -301,4 +293,3 @@ export class JsonRpcProxyFactory implements ProxyHandler { } } - diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 74f7503e0b997..28dff9400068a 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,100 +16,157 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '../event'; -import { WriteBuffer } from '../message-rpc'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; - -/** - * A channel that manages the main websocket connection between frontend and backend. All service channels - * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation - * independent of the actual websocket implementation and its execution context (backend vs. frontend). - */ -export class WebSocketChannel implements Channel { +import { IWebSocket } from 'vscode-ws-jsonrpc/lib/socket/socket'; +import { Disposable, DisposableCollection } from '../disposable'; +import { Emitter } from '../event'; + +export class WebSocketChannel implements IWebSocket { + static wsPath = '/services'; - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; + protected readonly closeEmitter = new Emitter<[number, string]>(); + protected readonly toDispose = new DisposableCollection(this.closeEmitter); + + constructor( + readonly id: number, + protected readonly doSend: (content: string) => void + ) { } + + dispose(): void { + this.toDispose.dispose(); } - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; + protected checkNotDisposed(): void { + if (this.toDispose.disposed) { + throw new Error('The channel has been disposed.'); + } } - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; + handleMessage(message: WebSocketChannel.Message): void { + if (message.kind === 'ready') { + this.fireOpen(); + } else if (message.kind === 'data') { + this.fireMessage(message.content); + } else if (message.kind === 'close') { + this.fireClose(message.code, message.reason); + } } - constructor(protected readonly socket: IWebSocket) { - socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); - socket.onError(error => this.onErrorEmitter.fire(error)); - // eslint-disable-next-line arrow-body-style - socket.onMessage(data => this.onMessageEmitter.fire(() => { - // In the browser context socketIO receives binary messages as ArrayBuffers. - // So we have to convert them to a Uint8Array before delegating the message to the read buffer. - const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; - return new Uint8ArrayReadBuffer(buffer); + open(path: string): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'open', + id: this.id, + path })); } - getWriteBuffer(): WriteBuffer { - const result = new Uint8ArrayWriteBuffer(); + ready(): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'ready', + id: this.id + })); + } - result.onCommit(buffer => { - if (this.socket.isConnected()) { - this.socket.send(buffer); - } else { - console.warn('Could not send message. Websocket is not connected'); - } - }); + send(content: string): void { + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'data', + id: this.id, + content + })); + } - return result; + close(code: number = 1000, reason: string = ''): void { + if (this.closing) { + // Do not try to close the channel if it is already closing. + return; + } + this.checkNotDisposed(); + this.doSend(JSON.stringify({ + kind: 'close', + id: this.id, + code, + reason + })); + this.fireClose(code, reason); } - close(): void { - this.socket.close(); - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); + tryClose(code: number = 1000, reason: string = ''): void { + if (this.closing || this.toDispose.disposed) { + // Do not try to close the channel if it is already closing or disposed. + return; + } + this.doSend(JSON.stringify({ + kind: 'close', + id: this.id, + code, + reason + })); + this.fireClose(code, reason); } -} -/** - * An abstraction that enables reuse of the `{@link WebSocketChannel} class in the frontend and backend - * independent of the actual underlying socket implementation. - */ -export interface IWebSocket { - /** - * Sends the given message over the web socket in binary format. - * @param message The binary message. - */ - send(message: Uint8Array): void; - /** - * Closes the websocket from the local side. - */ - close(): void; - /** - * The connection state of the web socket. - */ - isConnected(): boolean; - /** - * Listener callback to handle incoming messages. - * @param cb The callback. - */ - onMessage(cb: (message: Uint8Array) => void): void; - /** - * Listener callback to handle socket errors. - * @param cb The callback. - */ - onError(cb: (reason: any) => void): void; - /** - * Listener callback to handle close events (Remote side). - * @param cb The callback. - */ - onClose(cb: (reason: string, code?: number) => void): void; -} + protected fireOpen: () => void = () => { }; + onOpen(cb: () => void): void { + this.checkNotDisposed(); + this.fireOpen = cb; + this.toDispose.push(Disposable.create(() => this.fireOpen = () => { })); + } + protected fireMessage: (data: any) => void = () => { }; + onMessage(cb: (data: any) => void): void { + this.checkNotDisposed(); + this.fireMessage = cb; + this.toDispose.push(Disposable.create(() => this.fireMessage = () => { })); + } + + fireError: (reason: any) => void = () => { }; + onError(cb: (reason: any) => void): void { + this.checkNotDisposed(); + this.fireError = cb; + this.toDispose.push(Disposable.create(() => this.fireError = () => { })); + } + + protected closing = false; + protected fireClose(code: number, reason: string): void { + if (this.closing) { + return; + } + this.closing = true; + try { + this.closeEmitter.fire([code, reason]); + } finally { + this.closing = false; + } + this.dispose(); + } + onClose(cb: (code: number, reason: string) => void): Disposable { + this.checkNotDisposed(); + return this.closeEmitter.event(([code, reason]) => cb(code, reason)); + } + +} +export namespace WebSocketChannel { + export interface OpenMessage { + kind: 'open' + id: number + path: string + } + export interface ReadyMessage { + kind: 'ready' + id: number + } + export interface DataMessage { + kind: 'data' + id: number + content: string + } + export interface CloseMessage { + kind: 'close' + id: number + code: number + reason: string + } + export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage; +} diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 6aa06861d2e93..b3d8ce8ab9415 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -17,11 +17,9 @@ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; -import { Emitter, Event } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -36,25 +34,17 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(path, arg); } - protected createMainChannel(): Channel { - const onMessageEmitter = new Emitter(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + constructor() { + super(); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: ElectronEvent, data: string) => { + this.handleIncomingRawMessage(data); + }); + } + + protected createChannel(id: number): WebSocketChannel { + return new WebSocketChannel(id, content => { + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); }); - return { - close: () => Event.None, - getWriteBuffer: () => { - const writer = new Uint8ArrayWriteBuffer(); - writer.onCommit(buffer => - // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array. - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) - ); - return writer; - }, - onClose: Event.None, - onError: Event.None, - onMessage: onMessageEmitter.event - }; } } diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index ac99ee85bbab8..6f75ea31d0dae 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -15,9 +15,9 @@ // ***************************************************************************** import { injectable } from 'inversify'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider'; import { FrontendApplicationContribution } from '../../browser/frontend-application'; -import { Channel } from '../../common'; /** * Customized connection provider between the frontend and the backend in electron environment. @@ -34,13 +34,16 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv onStop(): void { this.stopping = true; - // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. + // Close the websocket connection `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 - // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 }); + for (const channel of [...this.channels.values()]) { + // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. + // But we cannot use `1001`: https://github.com/TypeFox/vscode-ws-jsonrpc/issues/15 + channel.close(1000, 'The frontend is "going away"...'); + } } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void { if (!this.stopping) { super.openChannel(path, handler, options); } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index dfd8f6115c8ce..071796c5cf0ca 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -16,23 +16,24 @@ import { IpcMainEvent, ipcMain, WebContents } from '@theia/electron/shared/electron'; import { inject, injectable, named, postConstruct } from 'inversify'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; import { ContributionProvider } from '../../common/contribution-provider'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { MessagingContribution } from '../../node/messaging/messaging-contribution'; +import { ConsoleLogger } from '../../node/messaging/logger'; import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; -import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; -import { Emitter, Event, WriteBuffer } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** * This component replicates the role filled by `MessagingContribution` but for Electron. * Unlike the WebSocket based implementation, we do not expect to receive * connection events. Instead, we'll create channels based on incoming `open` * events on the `ipcMain` channel. + * * This component allows communication between renderer process (frontend) and electron main process. */ - @injectable() export class ElectronMessagingContribution implements ElectronMainApplicationContribution, ElectronMessagingService { @@ -42,112 +43,89 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon @inject(ContributionProvider) @named(ElectronConnectionHandler) protected readonly connectionHandlers: ContributionProvider; - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); - /** - * Each electron window has a main chanel and its own multiplexer to route multiple client messages the same IPC connection. - */ - protected readonly windowChannelMultiplexer = new Map(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly windowChannels = new Map>(); @postConstruct() protected init(): void { - ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => { - this.handleIpcEvent(event, data); + ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: string) => { + this.handleIpcMessage(event, data); }); } - protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { - const sender = event.sender; - // Get the multiplexer for a given window id - try { - const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); - windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); - } catch (error) { - console.error('IPC: Failed to handle message', { error, data }); - } - } - - // Creates a new multiplexer for a given sender/window - protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } { - const mainChannel = this.createWindowMainChannel(sender); - const multiPlexer = new ChannelMultiplexer(mainChannel); - multiPlexer.onDidOpenChannel(openEvent => { - const { channel, id } = openEvent; - if (this.channelHandlers.route(id, channel)) { - console.debug(`Opening channel for service path '${id}'.`); - channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); - } - }); - - sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. - sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. - const data = { channel: mainChannel, multiPlexer }; - this.windowChannelMultiplexer.set(sender.id, data); - return data; - } - - /** - * Creates the main channel to a window. - * @param sender The window that the channel should be established to. - */ - protected createWindowMainChannel(sender: WebContents): ElectronWebContentChannel { - return new ElectronWebContentChannel(sender); - } - onStart(): void { for (const contribution of this.messagingContributions.getContributions()) { contribution.configure(this); } for (const connectionHandler of this.connectionHandlers.getContributions()) { this.channelHandlers.push(connectionHandler.path, (params, channel) => { - connectionHandler.onConnection(channel); + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + connectionHandler.onConnection(connection); }); } } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ipcChannel(spec: string, callback: (params: any, channel: Channel) => void): void { - this.channelHandlers.push(spec, callback); - } -} - -/** - * Used to establish a connection between the ipcMain and the Electron frontend (window). - * Messages a transferred via electron IPC. - */ -export class ElectronWebContentChannel implements Channel { - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - // Make the message emitter public so that we can easily forward messages received from the ipcMain. - readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; + listen(spec: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void { + this.ipcChannel(spec, (params, channel) => { + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + callback(params, connection); + }); } - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ipcChannel(spec: string, callback: (params: any, channel: WebSocketChannel) => void): void { + this.channelHandlers.push(spec, callback); } - constructor(protected readonly sender: Electron.WebContents) { + protected handleIpcMessage(event: IpcMainEvent, data: string): void { + const sender = event.sender; + try { + // Get the channel map for a given window id + let channels = this.windowChannels.get(sender.id)!; + if (!channels) { + this.windowChannels.set(sender.id, channels = new Map()); + } + // Start parsing the message to extract the channel id and route + const message: WebSocketChannel.Message = JSON.parse(data.toString()); + // Someone wants to open a logical channel + if (message.kind === 'open') { + const { id, path } = message; + const channel = this.createChannel(id, sender); + if (this.channelHandlers.route(path, channel)) { + channel.ready(); + channels.set(id, channel); + channel.onClose(() => channels.delete(id)); + } else { + console.error('Cannot find a service for the path: ' + path); + } + } else { + const { id } = message; + const channel = channels.get(id); + if (channel) { + channel.handleMessage(message); + } else { + console.error('The ipc channel does not exist', id); + } + } + const close = () => { + for (const channel of Array.from(channels.values())) { + channel.close(undefined, 'webContent destroyed'); + } + channels.clear(); + }; + sender.once('did-navigate', close); // When refreshing the browser window. + sender.once('destroyed', close); // When closing the browser window. + } catch (error) { + console.error('IPC: Failed to handle message', { error, data }); + } } - getWriteBuffer(): WriteBuffer { - const writer = new Uint8ArrayWriteBuffer(); - - writer.onCommit(buffer => { - if (!this.sender.isDestroyed()) { - this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); + protected createChannel(id: number, sender: WebContents): WebSocketChannel { + return new WebSocketChannel(id, content => { + if (!sender.isDestroyed()) { + sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content); } }); - - return writer; - } - close(): void { - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); } + } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts index 874d51237b4fd..dde3fdde1d181 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts @@ -14,14 +14,20 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import type { MessageConnection } from 'vscode-jsonrpc'; +import type { WebSocketChannel } from '../../common/messaging/web-socket-channel'; export interface ElectronMessagingService { + /** + * Accept a JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + listen(path: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void; /** * Accept an ipc channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: Channel) => void): void; + ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: WebSocketChannel) => void): void; } export namespace ElectronMessagingService { export interface PathParams { diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts index d46d63efa9071..0bac13bb163b8 100644 --- a/packages/core/src/node/messaging/ipc-bootstrap.ts +++ b/packages/core/src/node/messaging/ipc-bootstrap.ts @@ -16,12 +16,20 @@ import 'reflect-metadata'; import { dynamicRequire } from '../dynamic-require'; -import { IPCChannel } from './ipc-channel'; +import { ConsoleLogger } from 'vscode-ws-jsonrpc/lib/logger'; +import { createMessageConnection, IPCMessageReader, IPCMessageWriter, Trace } from 'vscode-ws-jsonrpc'; import { checkParentAlive, IPCEntryPoint } from './ipc-protocol'; checkParentAlive(); const entryPoint = IPCEntryPoint.getScriptFromEnv(); +const reader = new IPCMessageReader(process); +const writer = new IPCMessageWriter(process); +const logger = new ConsoleLogger(); +const connection = createMessageConnection(reader, writer, logger); +connection.trace(Trace.Off, { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + log: (message: any, data?: string) => console.log(message, data) +}); -dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(new IPCChannel()); - +dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(connection); diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts index 06f57836df6c9..84c256997257a 100644 --- a/packages/core/src/node/messaging/ipc-connection-provider.ts +++ b/packages/core/src/node/messaging/ipc-connection-provider.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import * as cp from 'child_process'; -import { inject, injectable } from 'inversify'; import * as path from 'path'; -import { createInterface } from 'readline'; -import { Channel, ConnectionErrorHandler, Disposable, DisposableCollection, ILogger } from '../../common'; -import { IPCChannel } from './ipc-channel'; +import { injectable, inject } from 'inversify'; +import { Trace, Tracer, IPCMessageReader, IPCMessageWriter, createMessageConnection, MessageConnection, Message } from 'vscode-ws-jsonrpc'; +import { ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '../../common'; import { createIpcEnv } from './ipc-protocol'; export interface ResolvedIPCConnectionOptions { @@ -41,7 +40,7 @@ export class IPCConnectionProvider { @inject(ILogger) protected readonly logger: ILogger; - listen(options: IPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { + listen(options: IPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { return this.doListen({ logger: this.logger, args: [], @@ -49,21 +48,19 @@ export class IPCConnectionProvider { }, acceptor); } - protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable { + protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable { const childProcess = this.fork(options); - const channel = new IPCChannel(childProcess); + const connection = this.createConnection(childProcess, options); const toStop = new DisposableCollection(); const toCancelStop = toStop.push(Disposable.create(() => childProcess.kill())); const errorHandler = options.errorHandler; if (errorHandler) { - let errorCount = 0; - channel.onError((err: Error) => { - errorCount++; - if (errorHandler.shouldStop(err, errorCount)) { + connection.onError((e: [Error, Message | undefined, number | undefined]) => { + if (errorHandler.shouldStop(e[0], e[1], e[2])) { toStop.dispose(); } }); - channel.onClose(() => { + connection.onClose(() => { if (toStop.disposed) { return; } @@ -73,15 +70,36 @@ export class IPCConnectionProvider { } }); } - acceptor(channel); + acceptor(connection); return toStop; } + protected createConnection(childProcess: cp.ChildProcess, options: ResolvedIPCConnectionOptions): MessageConnection { + const reader = new IPCMessageReader(childProcess); + const writer = new IPCMessageWriter(childProcess); + const connection = createMessageConnection(reader, writer, { + error: (message: string) => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${message}`), + warn: (message: string) => this.logger.warn(`[${options.serverName}: ${childProcess.pid}] ${message}`), + info: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`), + log: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`) + }); + const tracer: Tracer = { + log: (message: unknown, data?: string) => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] ${message}` + (typeof data === 'string' ? ' ' + data : '')) + }; + connection.trace(Trace.Verbose, tracer); + this.logger.isDebug().then(isDebug => { + if (!isDebug) { + connection.trace(Trace.Off, tracer); + } + }); + return connection; + } + protected fork(options: ResolvedIPCConnectionOptions): cp.ChildProcess { const forkOptions: cp.ForkOptions = { + silent: true, env: createIpcEnv(options), - execArgv: [], - stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe'] + execArgv: [] }; const inspectArgPrefix = `--${options.serverName}-inspect`; const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix)); @@ -90,9 +108,8 @@ export class IPCConnectionProvider { } const childProcess = cp.fork(path.join(__dirname, 'ipc-bootstrap'), options.args, forkOptions); - - createInterface(childProcess.stdout!).on('line', line => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${line}`)); - createInterface(childProcess.stderr!).on('line', line => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${line}`)); + childProcess.stdout!.on('data', data => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); + childProcess.stderr!.on('data', data => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`)); this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC started`); childProcess.once('exit', () => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC exited`)); diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts index 03aa3944521c3..de9a77394b03e 100644 --- a/packages/core/src/node/messaging/ipc-protocol.ts +++ b/packages/core/src/node/messaging/ipc-protocol.ts @@ -15,14 +15,14 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; const THEIA_PARENT_PID = 'THEIA_PARENT_PID'; const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT'; export const ipcEntryPoint: string | undefined = process.env[THEIA_ENTRY_POINT]; -export type IPCEntryPoint = (connection: Channel) => void; +export type IPCEntryPoint = (connection: MessageConnection) => void; export namespace IPCEntryPoint { /** * Throws if `THEIA_ENTRY_POINT` is undefined or empty. diff --git a/packages/core/src/node/messaging/logger.ts b/packages/core/src/node/messaging/logger.ts new file mode 100644 index 0000000000000..45229f3f47423 --- /dev/null +++ b/packages/core/src/node/messaging/logger.ts @@ -0,0 +1,37 @@ +// ***************************************************************************** +// Copyright (C) 2017 TypeFox and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { Logger } from 'vscode-ws-jsonrpc'; + +export class ConsoleLogger implements Logger { + + error(message: string): void { + console.log(message); + } + + warn(message: string): void { + console.log(message); + } + + info(message: string): void { + console.log(message); + } + + log(message: string): void { + console.log(message); + } + +} diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index 20dea50a483e4..2ee8764854780 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -18,15 +18,19 @@ import * as http from 'http'; import * as https from 'https'; import { Server, Socket } from 'socket.io'; import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection'; +import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; +import * as launch from 'vscode-ws-jsonrpc/lib/server/launch'; import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; import { BackendApplicationContribution } from '../backend-application'; -import { MessagingService } from './messaging-service'; +import { MessagingService, WebSocketChannelConnection } from './messaging-service'; +import { ConsoleLogger } from './logger'; import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; -import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -49,7 +53,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me protected readonly messagingListener: MessagingListener; protected readonly wsHandlers = new MessagingContribution.ConnectionHandlers(); - protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); + protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers(); @postConstruct() protected init(): void { @@ -59,7 +63,21 @@ export class MessagingContribution implements BackendApplicationContribution, Me } } - wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void { + listen(spec: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void { + this.wsChannel(spec, (params, channel) => { + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + callback(params, connection); + }); + } + + forward(spec: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void { + this.wsChannel(spec, (params, channel) => { + const connection = launch.createWebSocketConnection(channel); + callback(params, WebSocketChannelConnection.create(connection, channel)); + }); + } + + wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: WebSocketChannel) => void): void { this.channelHandlers.push(spec, (params, channel) => callback(params, channel)); } @@ -107,31 +125,49 @@ export class MessagingContribution implements BackendApplicationContribution, Me } protected handleChannels(socket: Socket): void { - const socketChannel = new WebSocketChannel(this.toIWebSocket(socket)); - const mulitplexer = new ChannelMultiplexer(socketChannel); const channelHandlers = this.getConnectionChannelHandlers(socket); - mulitplexer.onDidOpenChannel(event => { - if (channelHandlers.route(event.id, event.channel)) { - console.debug(`Opening channel for service path '${event.id}'.`); - event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`)); + const channels = new Map(); + socket.on('message', data => { + try { + const message: WebSocketChannel.Message = JSON.parse(data.toString()); + if (message.kind === 'open') { + const { id, path } = message; + const channel = this.createChannel(id, socket); + if (channelHandlers.route(path, channel)) { + channel.ready(); + console.debug(`Opening channel for service path '${path}'. [ID: ${id}]`); + channels.set(id, channel); + channel.onClose(() => { + console.debug(`Closing channel on service path '${path}'. [ID: ${id}]`); + channels.delete(id); + }); + } else { + console.error('Cannot find a service for the path: ' + path); + } + } else { + const { id } = message; + const channel = channels.get(id); + if (channel) { + channel.handleMessage(message); + } else { + console.error('The ws channel does not exist', id); + } + } + } catch (error) { + console.error('Failed to handle message', { error, data }); } }); - } - - protected toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - socket.disconnect(); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', error => cb(error)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; + socket.on('error', err => { + for (const channel of channels.values()) { + channel.fireError(err); + } + }); + socket.on('disconnect', reason => { + for (const channel of channels.values()) { + channel.close(undefined, reason); + } + channels.clear(); + }); } protected createSocketContainer(socket: Socket): Container { @@ -140,7 +176,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me return connectionContainer; } - protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { + protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers { const connectionContainer = this.createSocketContainer(socket); bindContributionProvider(connectionContainer, ConnectionHandler); connectionContainer.load(...this.connectionModules.getContributions()); @@ -148,12 +184,21 @@ export class MessagingContribution implements BackendApplicationContribution, Me const connectionHandlers = connectionContainer.getNamed>(ContributionProvider, ConnectionHandler); for (const connectionHandler of connectionHandlers.getContributions(true)) { connectionChannelHandlers.push(connectionHandler.path, (_, channel) => { - connectionHandler.onConnection(channel); + const connection = createWebSocketConnection(channel, new ConsoleLogger()); + connectionHandler.onConnection(connection); }); } return connectionChannelHandlers; } + protected createChannel(id: number, socket: Socket): WebSocketChannel { + return new WebSocketChannel(id, content => { + if (socket.connected) { + socket.send(content); + } + }); + } + } export namespace MessagingContribution { diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts index 276b58734bcff..087f6d5850def 100644 --- a/packages/core/src/node/messaging/messaging-service.ts +++ b/packages/core/src/node/messaging/messaging-service.ts @@ -15,21 +15,33 @@ // ***************************************************************************** import { Socket } from 'socket.io'; -import { Channel } from '../../common/message-rpc/channel'; +import { MessageConnection } from 'vscode-ws-jsonrpc'; +import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection'; +import { WebSocketChannel } from '../../common/messaging/web-socket-channel'; export interface MessagingService { + /** + * Accept a JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + listen(path: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void; + /** + * Accept a raw JSON-RPC connection on the given path. + * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. + */ + forward(path: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void; /** * Accept a web socket channel on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. */ - wsChannel(path: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void; + wsChannel(path: string, callback: (params: MessagingService.PathParams, socket: WebSocketChannel) => void): void; /** * Accept a web socket connection on the given path. * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes. * * #### Important - * Prefer using web socket channels over establishing new web socket connection. Clients can handle only limited amount of web sockets - * and excessive amount can cause performance degradation. All web socket channels share a single web socket connection. + * Prefer JSON-RPC connections or web socket channels over web sockets. Clients can handle only limited amount of web sockets + * and excessive amount can cause performance degradation. All JSON-RPC connections and web socket channels share the single web socket connection. */ ws(path: string, callback: (params: MessagingService.PathParams, socket: Socket) => void): void; } @@ -44,3 +56,18 @@ export namespace MessagingService { configure(service: MessagingService): void; } } + +export interface WebSocketChannelConnection extends IConnection { + channel: WebSocketChannel; +} +export namespace WebSocketChannelConnection { + export function is(connection: IConnection): connection is WebSocketChannelConnection { + return (connection as WebSocketChannelConnection).channel instanceof WebSocketChannel; + } + + export function create(connection: IConnection, channel: WebSocketChannel): WebSocketChannelConnection { + const result = connection as WebSocketChannelConnection; + result.channel = channel; + return result; + } +} diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 0ef0c50186cee..2fbb17c9aa8ec 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -16,41 +16,32 @@ import * as http from 'http'; import * as https from 'https'; +import { WebSocketChannel } from '../../../common/messaging/web-socket-channel'; +import { Disposable } from '../../../common/disposable'; import { AddressInfo } from 'net'; -import { io, Socket } from 'socket.io-client'; -import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel'; -import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; +import { io } from 'socket.io-client'; -export class TestWebSocketChannelSetup { - public readonly multiplexer: ChannelMultiplexer; - public readonly channel: Channel; +export class TestWebSocketChannel extends WebSocketChannel { constructor({ server, path }: { server: http.Server | https.Server, path: string }) { + super(0, content => socket.send(content)); const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`); - this.channel = new WebSocketChannel(toIWebSocket(socket)); - this.multiplexer = new ChannelMultiplexer(this.channel); - socket.on('connect', () => { - this.multiplexer.open(path); + socket.on('error', error => + this.fireError(error) + ); + socket.on('disconnect', reason => + this.fireClose(0, reason) + ); + socket.on('message', data => { + this.handleMessage(JSON.parse(data.toString())); }); - socket.connect(); + socket.on('connect', () => + this.open(path) + ); + this.toDispose.push(Disposable.create(() => socket.close())); } -} -function toIWebSocket(socket: Socket): IWebSocket { - return { - close: () => { - socket.removeAllListeners('disconnect'); - socket.removeAllListeners('error'); - socket.removeAllListeners('message'); - socket.close(); - }, - isConnected: () => socket.connected, - onClose: cb => socket.on('disconnect', reason => cb(reason)), - onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(data)), - send: message => socket.emit('message', message) - }; } diff --git a/packages/debug/src/browser/debug-session-connection.ts b/packages/debug/src/browser/debug-session-connection.ts index 7ecafafcc6b4f..47c5ac06b8b2c 100644 --- a/packages/debug/src/browser/debug-session-connection.ts +++ b/packages/debug/src/browser/debug-session-connection.ts @@ -18,9 +18,13 @@ import { DebugProtocol } from 'vscode-debugprotocol'; import { Deferred } from '@theia/core/lib/common/promise-util'; -import { Event, Emitter, DisposableCollection, Disposable, MaybePromise, Channel } from '@theia/core'; +import { Event, Emitter, DisposableCollection, Disposable, MaybePromise } from '@theia/core'; import { OutputChannel } from '@theia/output/lib/browser/output-channel'; +import { Channel } from '../common/debug-service'; + +export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; + export interface DebugRequestTypes { 'attach': [DebugProtocol.AttachRequestArguments, DebugProtocol.AttachResponse] 'breakpointLocations': [DebugProtocol.BreakpointLocationsArguments, DebugProtocol.BreakpointLocationsResponse] @@ -112,8 +116,6 @@ const standardDebugEvents = new Set([ 'thread' ]); -export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise; - export class DebugSessionConnection implements Disposable { private sequence = 1; @@ -166,7 +168,7 @@ export class DebugSessionConnection implements Disposable { this.cancelPendingRequests(); this.onDidCloseEmitter.fire(); }); - connection.onMessage(data => this.handleMessage(data().readString())); + connection.onMessage(data => this.handleMessage(data)); return connection; } @@ -245,7 +247,7 @@ export class DebugSessionConnection implements Disposable { const dateStr = `${now.toLocaleString(undefined, { hour12: false })}.${now.getMilliseconds()}`; this.traceOutputChannel.appendLine(`${this.sessionId.substring(0, 8)} ${dateStr} theia -> adapter: ${JSON.stringify(message, undefined, 4)}`); } - connection.getWriteBuffer().writeString(messageStr).commit(); + connection.send(messageStr); } protected handleMessage(data: string): void { diff --git a/packages/debug/src/browser/debug-session-contribution.ts b/packages/debug/src/browser/debug-session-contribution.ts index a14db324cdf81..cf8b5e7424cc0 100644 --- a/packages/debug/src/browser/debug-session-contribution.ts +++ b/packages/debug/src/browser/debug-session-contribution.ts @@ -26,11 +26,10 @@ import { DebugSessionOptions } from './debug-session-options'; import { OutputChannelManager, OutputChannel } from '@theia/output/lib/browser/output-channel'; import { DebugPreferences } from './debug-preferences'; import { DebugSessionConnection } from './debug-session-connection'; -import { DebugAdapterPath } from '../common/debug-service'; +import { Channel, DebugAdapterPath } from '../common/debug-service'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from './debug-contribution'; -import { Channel } from '@theia/core/lib/common/message-rpc/channel'; import { WorkspaceService } from '@theia/workspace/lib/browser'; /** diff --git a/packages/debug/src/node/debug-adapter-session.ts b/packages/debug/src/node/debug-adapter-session.ts index b47e552ae6344..03ff950d38a90 100644 --- a/packages/debug/src/node/debug-adapter-session.ts +++ b/packages/debug/src/node/debug-adapter-session.ts @@ -26,7 +26,7 @@ import { DebugAdapterSession } from './debug-model'; import { DebugProtocol } from 'vscode-debugprotocol'; -import { Channel } from '@theia/core'; +import { Channel } from '../common/debug-service'; /** * [DebugAdapterSession](#DebugAdapterSession) implementation. @@ -53,7 +53,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { throw new Error('The session has already been started, id: ' + this.id); } this.channel = channel; - this.channel.onMessage(message => this.write(message().readString())); + this.channel.onMessage((message: string) => this.write(message)); this.channel.onClose(() => this.channel = undefined); } @@ -80,7 +80,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession { protected send(message: string): void { if (this.channel) { - this.channel.getWriteBuffer().writeString(message); + this.channel.send(message); } } diff --git a/packages/debug/src/node/debug-model.ts b/packages/debug/src/node/debug-model.ts index 3e6de9ce68fc6..a39352fabbddf 100644 --- a/packages/debug/src/node/debug-model.ts +++ b/packages/debug/src/node/debug-model.ts @@ -25,7 +25,8 @@ import { DebugConfiguration } from '../common/debug-configuration'; import { IJSONSchema, IJSONSchemaSnippet } from '@theia/core/lib/common/json-schema'; import { MaybePromise } from '@theia/core/lib/common/types'; -import { Channel, Event } from '@theia/core'; +import { Event } from '@theia/core/lib/common/event'; +import { Channel } from '../common/debug-service'; // FIXME: break down this file to debug adapter and debug adapter contribution (see Theia file naming conventions) diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index 63f789b9da2a3..95ee67c57d8de 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -846,7 +846,7 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p */ export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider { /** - * Read the contents of the given file as stream. + * Read the contents of the given file as stream. * @param resource The `URI` of the file. * * @return The `ReadableStreamEvents` for the readable stream of the given file. diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index f67e198db75f7..5edb5dbbad9e7 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -42,11 +42,11 @@ export interface RemoteFileSystemServer extends JsonRpcServer; open(resource: string, opts: FileOpenOptions): Promise; close(fd: number): Promise; - read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>; + read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }>; readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; - readFile(resource: string): Promise; - write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; - writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise; + readFile(resource: string): Promise; + write(fd: number, pos: number, data: number[], offset: number, length: number): Promise; + writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise; delete(resource: string, opts: FileDeleteOptions): Promise; mkdir(resource: string): Promise; readdir(resource: string): Promise<[string, FileType][]>; @@ -70,7 +70,7 @@ export interface RemoteFileSystemClient { notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void; notifyFileWatchError(): void; notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void; - onFileStreamData(handle: number, data: Uint8Array): void; + onFileStreamData(handle: number, data: number[]): void; onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void; } @@ -169,7 +169,7 @@ export class RemoteFileSystemProvider implements Required, D this.onFileWatchErrorEmitter.fire(); }, notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities), - onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, data]), + onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]), onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error]) }); const onInitialized = this.server.onDidOpenConnection(() => { @@ -224,7 +224,7 @@ export class RemoteFileSystemProvider implements Required, D async readFile(resource: URI): Promise { const bytes = await this.server.readFile(resource.toString()); - return bytes; + return Uint8Array.from(bytes); } readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { @@ -264,11 +264,11 @@ export class RemoteFileSystemProvider implements Required, D } write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { - return this.server.write(fd, pos, data, offset, length); + return this.server.write(fd, pos, [...data.values()], offset, length); } writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise { - return this.server.writeFile(resource.toString(), content, opts); + return this.server.writeFile(resource.toString(), [...content.values()], opts); } delete(resource: URI, opts: FileDeleteOptions): Promise { @@ -412,33 +412,34 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - async read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }> { + async read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }> { if (hasOpenReadWriteCloseCapability(this.provider)) { const buffer = BinaryBuffer.alloc(this.BUFFER_SIZE); const bytes = buffer.buffer; const bytesRead = await this.provider.read(fd, pos, bytes, 0, length); - return { bytes, bytesRead }; + return { bytes: [...bytes.values()], bytesRead }; } throw new Error('not supported'); } - write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { + write(fd: number, pos: number, data: number[], offset: number, length: number): Promise { if (hasOpenReadWriteCloseCapability(this.provider)) { - return this.provider.write(fd, pos, data, offset, length); + return this.provider.write(fd, pos, Uint8Array.from(data), offset, length); } throw new Error('not supported'); } - async readFile(resource: string): Promise { + async readFile(resource: string): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.readFile(new URI(resource)); + const buffer = await this.provider.readFile(new URI(resource)); + return [...buffer.values()]; } throw new Error('not supported'); } - writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise { + writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise { if (hasReadWriteCapability(this.provider)) { - return this.provider.writeFile(new URI(resource), content, opts); + return this.provider.writeFile(new URI(resource), Uint8Array.from(content), opts); } throw new Error('not supported'); } @@ -496,7 +497,7 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { if (hasFileReadStreamCapability(this.provider)) { const handle = this.readFileStreamSeq++; const stream = this.provider.readFileStream(new URI(resource), opts, token); - stream.on('data', data => this.client?.onFileStreamData(handle, data)); + stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()])); stream.on('error', error => { const code = error instanceof FileSystemProviderError ? error.code : undefined; const { name, message, stack } = error; diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts index ee8e43ea7c639..48ae3adb36363 100644 --- a/packages/plugin-ext/src/common/connection.ts +++ b/packages/plugin-ext/src/common/connection.ts @@ -13,38 +13,27 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { Channel } from '@theia/debug/lib/common/debug-service'; import { ConnectionExt, ConnectionMain } from './plugin-api-rpc'; -import { Emitter, Event } from '@theia/core/lib/common/event'; -import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; -import { WriteBuffer, Channel } from '@theia/core'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; +import { Emitter } from '@theia/core/lib/common/event'; /** * A channel communicating with a counterpart in a plugin host. */ export class PluginChannel implements Channel { - private messageEmitter: Emitter = new Emitter(); + private messageEmitter: Emitter = new Emitter(); private errorEmitter: Emitter = new Emitter(); - private closedEmitter: Emitter = new Emitter(); + private closedEmitter: Emitter = new Emitter(); constructor( - readonly id: string, + protected readonly id: string, protected readonly connection: ConnectionExt | ConnectionMain) { } - getWriteBuffer(): WriteBuffer { - const result = new Uint8ArrayWriteBuffer(); - result.onCommit(buffer => { - this.connection.$sendMessage(this.id, new Uint8ArrayReadBuffer(buffer).readString()); - }); - - return result; - } - send(content: string): void { this.connection.$sendMessage(this.id, content); } - fireMessageReceived(msg: MessageProvider): void { + fireMessageReceived(msg: string): void { this.messageEmitter.fire(msg); } @@ -53,19 +42,21 @@ export class PluginChannel implements Channel { } fireClosed(): void { - this.closedEmitter.fire({ reason: 'Plugin channel has been closed from the extension side' }); + this.closedEmitter.fire(); } - get onMessage(): Event { - return this.messageEmitter.event; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onMessage(cb: (data: any) => void): void { + this.messageEmitter.event(cb); } - get onError(): Event { - return this.errorEmitter.event; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onError(cb: (reason: any) => void): void { + this.errorEmitter.event(cb); } - get onClose(): Event { - return this.closedEmitter.event; + onClose(cb: (code: number, reason: string) => void): void { + this.closedEmitter.event(() => cb(-1, 'closed')); } close(): void { @@ -89,10 +80,7 @@ export class ConnectionImpl implements ConnectionMain, ConnectionExt { */ async $sendMessage(id: string, message: string): Promise { if (this.connections.has(id)) { - const writer = new Uint8ArrayWriteBuffer().writeString(message); - const reader = new Uint8ArrayReadBuffer(writer.getCurrentContents()); - writer.dispose(); - this.connections.get(id)!.fireMessageReceived(() => reader); + this.connections.get(id)!.fireMessageReceived(message); } else { console.warn(`Received message for unknown connection: ${id}`); } diff --git a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts index 176fdb246a693..b16892dccc727 100644 --- a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts +++ b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts @@ -30,7 +30,7 @@ import { TerminalOptionsExt } from '../../../common/plugin-api-rpc'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from '@theia/debug/lib/browser/debug-contribution'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; -import { Channel } from '@theia/core/lib/common/message-rpc/channel'; +import { Channel } from '@theia/debug/lib/common/debug-service'; import { WorkspaceService } from '@theia/workspace/lib/browser'; export class PluginDebugSession extends DebugSession { diff --git a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts index b34a73ae201be..b05859750198d 100644 --- a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts +++ b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts @@ -17,7 +17,7 @@ import { DebugAdapterSessionImpl } from '@theia/debug/lib/node/debug-adapter-session'; import * as theia from '@theia/plugin'; import { DebugAdapter } from '@theia/debug/lib/node/debug-model'; -import { Channel } from '@theia/core/lib/common/message-rpc/channel'; +import { Channel } from '@theia/debug/lib/common/debug-service'; /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts index 7e63ddd14927c..fbe968348d9d2 100644 --- a/packages/task/src/node/task-server.slow-spec.ts +++ b/packages/task/src/node/task-server.slow-spec.ts @@ -28,10 +28,9 @@ import { isWindows, isOSX } from '@theia/core/lib/common/os'; import { FileUri } from '@theia/core/lib/node'; import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol'; import { expectThrowsAsync } from '@theia/core/lib/common/test/expect'; -import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; import { expect } from 'chai'; import URI from '@theia/core/lib/common/uri'; -import { RpcProtocol } from '@theia/core'; // test scripts that we bundle with tasks const commandShortRunning = './task'; @@ -107,38 +106,26 @@ describe('Task server / back-end', function (): void { // hook-up to terminal's ws and confirm that it outputs expected tasks' output await new Promise((resolve, reject) => { - const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` }); - setup.multiplexer.onDidOpenChannel(event => { - const channel = event.channel; - const connection = new RpcProtocol(channel, async (method, args) => { - const error = new Error(`Received unexpected request: ${method} with args: ${args} `); - reject(error); - throw error; - }); - channel.onError(reject); - channel.onClose(() => reject(new Error('Channel has been closed'))); - connection.onNotification(not => { - // check output of task on terminal is what we expect - const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; - // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. - // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` - const currentMessage = not.args[0]; - messages.unshift(currentMessage); - if (currentMessage.indexOf(expected) !== -1) { - resolve(); - channel.close(); - return; - } - if (messages.length >= messagesToWaitFor) { - reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); - channel.close(); - } - }); - channel.onMessage(reader => { - - }); + const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); + channel.onError(reject); + channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); + channel.onMessage(msg => { + // check output of task on terminal is what we expect + const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`; + // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected. + // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n` + const currentMessage = msg.toString(); + messages.unshift(currentMessage); + if (currentMessage.indexOf(expected) !== -1) { + resolve(); + channel.close(); + return; + } + if (messages.length >= messagesToWaitFor) { + reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`)); + channel.close(); + } }); - }); }); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 04228a13c6275..47af4261c8ada 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -17,7 +17,7 @@ import { Terminal, RendererType } from 'xterm'; import { FitAddon } from 'xterm-addon-fit'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core'; +import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection } from '@theia/core'; import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon } from '@theia/core/lib/browser'; import { isOSX } from '@theia/core/lib/common'; import { WorkspaceService } from '@theia/workspace/lib/browser'; @@ -26,6 +26,7 @@ import { terminalsPath } from '../common/terminal-protocol'; import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol'; import { TerminalWatcher } from '../common/terminal-watcher'; import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions, TerminalExitStatus } from './base/terminal-widget'; +import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences'; import { TerminalContribution } from './terminal-contribution'; @@ -60,7 +61,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected hoverMessage: HTMLDivElement; protected lastTouchEnd: TouchEvent | undefined; protected isAttachedCloseListener: boolean = false; @@ -506,23 +507,16 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { - const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); - - const rpc = new RpcProtocol(connection, requestHandler); - rpc.onNotification(event => { - if (event.method === 'onData') { - this.write(event.args[0]); - } - }); + connection.onNotification('onData', (data: string) => this.write(data)); // Excludes the device status code emitted by Xterm.js const sendData = (data?: string) => { if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) { - return rpc.sendRequest('write', [data]); + return connection.sendRequest('write', data); } }; @@ -530,10 +524,12 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget disposable.push(this.term.onData(sendData)); disposable.push(this.term.onBinary(sendData)); - connection.onClose(() => disposable.dispose()); + connection.onDispose(() => disposable.dispose()); + this.toDisposeOnConnect.push(connection); + connection.listen(); if (waitForConnection) { - waitForConnection.resolve(rpc); + waitForConnection.resolve(connection); } } }, { reconnecting: false }); @@ -583,7 +579,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget sendText(text: string): void { if (this.waitForConnection) { this.waitForConnection.promise.then(connection => - connection.sendRequest('write', [text]) + connection.sendRequest('write', text) ); } } diff --git a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts index aa4a54e72deff..6d39ddd973f20 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts @@ -20,7 +20,7 @@ import { IShellTerminalServer } from '../common/shell-terminal-protocol'; import * as http from 'http'; import * as https from 'https'; import { terminalsPath } from '../common/terminal-protocol'; -import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; +import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; describe('Terminal Backend Contribution', function (): void { @@ -45,19 +45,13 @@ describe('Terminal Backend Contribution', function (): void { it('is data received from the terminal ws server', async () => { const terminalId = await shellTerminalServer.create({}); await new Promise((resolve, reject) => { - const path = `${terminalsPath}/${terminalId}`; - const { channel, multiplexer } = new TestWebSocketChannelSetup({ server, path }); + const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` }); channel.onError(reject); - channel.onClose(event => reject(new Error(`channel is closed with '${event.code}' code and '${event.reason}' reason}`))); - - multiplexer.onDidOpenChannel(event => { - if (event.id === path) { - resolve(); - channel.close(); - } + channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`))); + channel.onOpen(() => { + resolve(); + channel.close(); }); - }); }); - }); diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index dea4504e0ffea..4675b7a32290c 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -15,11 +15,10 @@ // ***************************************************************************** import { injectable, inject, named } from '@theia/core/shared/inversify'; -import { ILogger, RequestHandler } from '@theia/core/lib/common'; +import { ILogger } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; -import { RpcProtocol } from '@theia/core/'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -31,27 +30,19 @@ export class TerminalBackendContribution implements MessagingService.Contributio protected readonly logger: ILogger; configure(service: MessagingService): void { - service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => { + service.listen(`${terminalsPath}/:id`, (params: { id: string }, connection) => { const id = parseInt(params.id, 10); const termProcess = this.processManager.get(id); if (termProcess instanceof TerminalProcess) { const output = termProcess.createOutputStream(); - // Create a RPC connection to the terminal process - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const requestHandler: RequestHandler = async (method: string, args: any[]) => { - if (method === 'write' && args[0]) { - termProcess.write(args[0]); - } else { - this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args }); - } - }; - - const rpc = new RpcProtocol(channel, requestHandler); - output.on('data', data => { - rpc.sendNotification('onData', [data]); - }); - channel.onClose(() => output.dispose()); + output.on('data', data => connection.sendNotification('onData', data.toString())); + connection.onRequest('write', (data: string) => termProcess.write(data)); + connection.onClose(() => output.dispose()); + connection.listen(); + } else { + connection.dispose(); } }); } + } diff --git a/yarn.lock b/yarn.lock index 896f24662ccb3..52802cb3c200a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2177,13 +2177,6 @@ resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8" integrity sha512-6ckxMjBBD8URvjB6J3NcnuAn5Pkl7t3TizAg+xdlzzQGSPSmBcXf8KoIH0ua/i+tio+ZRUHEXp0HEmvaR4kt0w== -"@types/chai-spies@1.0.3": - version "1.0.3" - resolved "https://registry.yarnpkg.com/@types%2fchai-spies/-/chai-spies-1.0.3.tgz#a52dc61af3853ec9b80965040811d15dfd401542" - integrity sha512-RBZjhVuK7vrg4rWMt04UF5zHYwfHnpk5mIWu3nQvU3AKGDixXzSjZ6v0zke6pBcaJqMv3IBZ5ibLWPMRDL0sLw== - dependencies: - "@types/chai" "*" - "@types/chai-string@^1.4.0": version "1.4.2" resolved "https://registry.yarnpkg.com/@types/chai-string/-/chai-string-1.4.2.tgz#0f116504a666b6c6a3c42becf86634316c9a19ac" @@ -2196,11 +2189,6 @@ resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.1.tgz#e2c6e73e0bdeb2521d00756d099218e9f5d90a04" integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ== -"@types/chai@4.3.0": - version "4.3.0" - resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.0.tgz#23509ebc1fa32f1b4d50d6a66c4032d5b8eaabdc" - integrity sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw== - "@types/component-emitter@^1.2.10": version "1.2.11" resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506" @@ -3912,28 +3900,11 @@ caseless@~0.12.0: resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc" integrity sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw= -chai-spies@1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/chai-spies/-/chai-spies-1.0.0.tgz#d16b39336fb316d03abf8c375feb23c0c8bb163d" - integrity sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg== - chai-string@^1.4.0: version "1.5.0" resolved "https://registry.yarnpkg.com/chai-string/-/chai-string-1.5.0.tgz#0bdb2d8a5f1dbe90bc78ec493c1c1c180dd4d3d2" integrity sha512-sydDC3S3pNAQMYwJrs6dQX0oBQ6KfIPuOZ78n7rocW0eJJlsHPh2t3kwW7xfwYA/1Bf6/arGtSUo16rxR2JFlw== -chai@4.3.4: - version "4.3.4" - resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.4.tgz#b55e655b31e1eac7099be4c08c21964fce2e6c49" - integrity sha512-yS5H68VYOCtN1cjfwumDSuzn/9c+yza4f3reKXlE5rUg7SFcCEy90gJvydNgOYtblyf4Zi6jIWRnXOgErta0KA== - dependencies: - assertion-error "^1.1.0" - check-error "^1.0.2" - deep-eql "^3.0.1" - get-func-name "^2.0.0" - pathval "^1.1.1" - type-detect "^4.0.5" - chai@^4.2.0: version "4.3.6" resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.6.tgz#ffe4ba2d9fa9d6680cc0b370adae709ec9011e9c" @@ -11628,7 +11599,7 @@ vscode-debugprotocol@^1.32.0: resolved "https://registry.yarnpkg.com/vscode-debugprotocol/-/vscode-debugprotocol-1.51.0.tgz#c03168dac778b6c24ce17b3511cb61e89c11b2df" integrity sha512-dzKWTMMyebIMPF1VYMuuQj7gGFq7guR8AFya0mKacu+ayptJfaRuM0mdHCqiOth4FnRP8mPhEroFPx6Ift8wHA== -vscode-jsonrpc@^5.0.1: +vscode-jsonrpc@^5.0.0, vscode-jsonrpc@^5.0.1: version "5.0.1" resolved "https://registry.yarnpkg.com/vscode-jsonrpc/-/vscode-jsonrpc-5.0.1.tgz#9bab9c330d89f43fc8c1e8702b5c36e058a01794" integrity sha512-JvONPptw3GAQGXlVV2utDcHx0BiY34FupW/kI6mZ5x06ER5DdPG/tXWMVHjTNULF5uKPOUUD0SaXg5QaubJL0A== @@ -11681,6 +11652,13 @@ vscode-uri@^2.1.1: resolved "https://registry.yarnpkg.com/vscode-uri/-/vscode-uri-2.1.2.tgz#c8d40de93eb57af31f3c715dd650e2ca2c096f1c" integrity sha512-8TEXQxlldWAuIODdukIb+TR5s+9Ds40eSJrw+1iDDA9IFORPjMELarNQE3myz5XIkWWpdprmJjm1/SxMlWOC8A== +vscode-ws-jsonrpc@^0.2.0: + version "0.2.0" + resolved "https://registry.yarnpkg.com/vscode-ws-jsonrpc/-/vscode-ws-jsonrpc-0.2.0.tgz#5e9c26e10da54a1a235da7d59e74508bbcb8edd9" + integrity sha512-NE9HNRgPjCaPyTJvIudcpyIWPImxwRDtuTX16yks7SAiZgSXigxAiZOvSvVBGmD1G/OMfrFo6BblOtjVR9DdVA== + dependencies: + vscode-jsonrpc "^5.0.0" + w3c-hr-time@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"