diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7765aeca41716..35cf3cfc706c6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,17 @@
- [plugin] Introduce `DebugSession#workspaceFolder` [#11090](https://github.com/eclipse-theia/theia/pull/11090) - Contributed on behalf of STMicroelectronics
+[Breaking Changes:](#breaking_changes_1.26.0)
+
+- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
+ This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
+ - Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
+ - `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
+ - `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
+ - `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
+ [#11011](https://github.com/eclipse-theia/theia/pull/11011) - Contributed on behalf of STMicroelectronics.
+
+
## v1.25.0 - 4/28/2022
[1.25.0 Milestone](https://github.com/eclipse-theia/theia/milestone/35)
diff --git a/package.json b/package.json
index 950701b6136a3..6b294c3fed0c2 100644
--- a/package.json
+++ b/package.json
@@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
+ "@types/chai": "4.3.0",
+ "@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
@@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
+ "chai": "4.3.4",
+ "chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
diff --git a/packages/core/README.md b/packages/core/README.md
index 64537785cf2f5..9ddef5f7b5b30 100644
--- a/packages/core/README.md
+++ b/packages/core/README.md
@@ -96,7 +96,6 @@ export class SomeClass {
- `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized))
- `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol))
- `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri))
- - `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc))
- `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify))
- `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express))
- `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce))
diff --git a/packages/core/package.json b/packages/core/package.json
index 01c2c52c39c8d..df71d5dfcc1ee 100644
--- a/packages/core/package.json
+++ b/packages/core/package.json
@@ -68,7 +68,6 @@
"uuid": "^8.3.2",
"vscode-languageserver-protocol": "~3.15.3",
"vscode-uri": "^2.1.1",
- "vscode-ws-jsonrpc": "^0.2.0",
"ws": "^7.1.2",
"yargs": "^15.3.1"
},
@@ -108,8 +107,7 @@
"react-dom",
"react-virtualized",
"vscode-languageserver-protocol",
- "vscode-uri",
- "vscode-ws-jsonrpc"
+ "vscode-uri"
],
"export =": [
"dompurify as DOMPurify",
diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts b/packages/core/shared/vscode-ws-jsonrpc/index.d.ts
deleted file mode 100644
index b11ff897103ed..0000000000000
--- a/packages/core/shared/vscode-ws-jsonrpc/index.d.ts
+++ /dev/null
@@ -1 +0,0 @@
-export * from 'vscode-ws-jsonrpc';
diff --git a/packages/core/shared/vscode-ws-jsonrpc/index.js b/packages/core/shared/vscode-ws-jsonrpc/index.js
deleted file mode 100644
index 3aed560b82d62..0000000000000
--- a/packages/core/shared/vscode-ws-jsonrpc/index.js
+++ /dev/null
@@ -1 +0,0 @@
-module.exports = require('vscode-ws-jsonrpc');
diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts
index f83aabda22826..905515a70392c 100644
--- a/packages/core/src/browser/messaging/ws-connection-provider.ts
+++ b/packages/core/src/browser/messaging/ws-connection-provider.ts
@@ -15,11 +15,11 @@
// *****************************************************************************
import { injectable, interfaces, decorate, unmanaged } from 'inversify';
-import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
+import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';
+import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';
decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
@@ -35,6 +35,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider {
protected readonly onSocketDidOpenEmitter: Emitter = new Emitter();
+ // Socket that is used by the main channel
+ protected socket: Socket;
get onSocketDidOpen(): Event {
return this.onSocketDidOpenEmitter.event;
}
@@ -48,31 +50,39 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider(path, arg);
}
- protected readonly socket: Socket;
-
- constructor() {
- super();
+ protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
+ const channel = new WebSocketChannel(this.toIWebSocket(socket));
socket.on('connect', () => {
this.fireSocketDidOpen();
});
- socket.on('disconnect', reason => {
- for (const channel of [...this.channels.values()]) {
- channel.close(undefined, reason);
- }
- this.fireSocketDidClose();
- });
- socket.on('message', data => {
- this.handleIncomingRawMessage(data);
- });
+ channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;
+
+ return channel;
+ }
+
+ protected toIWebSocket(socket: Socket): IWebSocket {
+ return {
+ close: () => {
+ socket.removeAllListeners('disconnect');
+ socket.removeAllListeners('error');
+ socket.removeAllListeners('message');
+ socket.close();
+ },
+ isConnected: () => socket.connected,
+ onClose: cb => socket.on('disconnect', reason => cb(reason)),
+ onError: cb => socket.on('error', reason => cb(reason)),
+ onMessage: cb => socket.on('message', data => cb(data)),
+ send: message => socket.emit('message', message)
+ };
}
- override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
+ override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise {
if (this.socket.connected) {
- super.openChannel(path, handler, options);
+ return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
@@ -82,14 +92,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider {
- if (this.socket.connected) {
- this.socket.send(content);
- }
- });
- }
-
/**
* @param path The handler to reach in the backend.
*/
@@ -143,3 +145,4 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => {
+ const leftWrite = new Uint8ArrayWriteBuffer();
+ leftWrite.onCommit(buffer => {
+ this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
+ });
+ return leftWrite;
+ });
+ readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => {
+ const rightWrite = new Uint8ArrayWriteBuffer();
+ rightWrite.onCommit(buffer => {
+ this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
+ });
+ return rightWrite;
+ });
+}
+describe('Message Channel', () => {
+ describe('Channel multiplexer', () => {
+ it('should forward messages to intended target channel', async () => {
+ const pipe = new ChannelPipe();
+
+ const leftMultiplexer = new ChannelMultiplexer(pipe.left);
+ const rightMultiplexer = new ChannelMultiplexer(pipe.right);
+ const openChannelSpy = spy(() => {
+ });
+
+ rightMultiplexer.onDidOpenChannel(openChannelSpy);
+ leftMultiplexer.onDidOpenChannel(openChannelSpy);
+
+ const leftFirst = await leftMultiplexer.open('first');
+ const leftSecond = await leftMultiplexer.open('second');
+
+ const rightFirst = rightMultiplexer.getOpenChannel('first');
+ const rightSecond = rightMultiplexer.getOpenChannel('second');
+
+ assert.isNotNull(rightFirst);
+ assert.isNotNull(rightSecond);
+
+ const leftSecondSpy = spy((buf: MessageProvider) => {
+ const message = buf().readString();
+ expect(message).equal('message for second');
+ });
+
+ leftSecond.onMessage(leftSecondSpy);
+
+ const rightFirstSpy = spy((buf: MessageProvider) => {
+ const message = buf().readString();
+ expect(message).equal('message for first');
+ });
+
+ rightFirst!.onMessage(rightFirstSpy);
+
+ leftFirst.getWriteBuffer().writeString('message for first').commit();
+ rightSecond!.getWriteBuffer().writeString('message for second').commit();
+
+ expect(leftSecondSpy).to.be.called();
+ expect(rightFirstSpy).to.be.called();
+
+ expect(openChannelSpy).to.be.called.exactly(4);
+ });
+ });
+});
diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts
new file mode 100644
index 0000000000000..e4279b43d5173
--- /dev/null
+++ b/packages/core/src/common/message-rpc/channel.ts
@@ -0,0 +1,248 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+import { Emitter, Event } from '../event';
+import { ReadBuffer, WriteBuffer } from './message-buffer';
+
+/**
+ * A channel is a bidirectional communications channel with lifecycle and
+ * error signalling. Note that creation of channels is specific to particular
+ * implementations and thus not part of the protocol.
+ */
+export interface Channel {
+
+ /**
+ * The remote side has closed the channel
+ */
+ onClose: Event;
+
+ /**
+ * An error has occurred while writing to or reading from the channel
+ */
+ onError: Event;
+
+ /**
+ * A message has arrived and can be read by listeners using a {@link MessageProvider}.
+ */
+ onMessage: Event;
+
+ /**
+ * Obtain a {@link WriteBuffer} to write a message to the channel.
+ */
+ getWriteBuffer(): WriteBuffer;
+
+ /**
+ * Close this channel. No {@link onClose} event should be sent
+ */
+ close(): void;
+}
+
+/**
+ * The event that is emitted when a channel is closed from the remote side.
+ */
+export interface ChannelCloseEvent {
+ reason: string,
+ code?: number
+};
+
+/**
+ * The `MessageProvider` is emitted when a channel receives a new message.
+ * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message.
+ * This ensures that each listener has its own isolated {@link ReadBuffer} instance.
+ *
+ */
+export type MessageProvider = () => ReadBuffer;
+
+/**
+ * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
+ * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
+ */
+export class ForwardingChannel implements Channel {
+
+ constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
+ }
+
+ onCloseEmitter: Emitter = new Emitter();
+ get onClose(): Event {
+ return this.onCloseEmitter.event;
+ };
+
+ onErrorEmitter: Emitter = new Emitter();
+ get onError(): Event {
+ return this.onErrorEmitter.event;
+ };
+
+ onMessageEmitter: Emitter = new Emitter();
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
+ };
+
+ getWriteBuffer(): WriteBuffer {
+ return this.writeBufferSource();
+ }
+
+ send(message: Uint8Array): void {
+ const writeBuffer = this.getWriteBuffer();
+ writeBuffer.writeBytes(message);
+ writeBuffer.commit();
+ }
+
+ close(): void {
+ this.closeHandler();
+ }
+}
+
+/**
+ * The different message types used in the messaging protocol of the {@link ChannelMultiplexer}
+ */
+export enum MessageTypes {
+ Open = 1,
+ Close = 2,
+ AckOpen = 3,
+ Data = 4
+}
+
+/**
+ * The write buffers in this implementation immediately write to the underlying
+ * channel, so we rely on writers to the multiplexed channels to always commit their
+ * messages and always in one go.
+ */
+export class ChannelMultiplexer {
+ protected pendingOpen: Map void> = new Map();
+ protected openChannels: Map = new Map();
+
+ protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>();
+ get onDidOpenChannel(): Event<{ id: string, channel: Channel }> {
+ return this.onOpenChannelEmitter.event;
+ }
+
+ constructor(protected readonly underlyingChannel: Channel) {
+ this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer()));
+ this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event));
+ this.underlyingChannel.onError(error => this.handleError(error));
+ }
+
+ protected handleError(error: unknown): void {
+ this.openChannels.forEach(channel => {
+ channel.onErrorEmitter.fire(error);
+ });
+ }
+
+ closeUnderlyingChannel(event?: ChannelCloseEvent): void {
+
+ this.pendingOpen.clear();
+ this.openChannels.forEach(channel => {
+ channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' });
+ });
+
+ this.openChannels.clear();
+ }
+
+ protected handleMessage(buffer: ReadBuffer): void {
+ const type = buffer.readUint8();
+ const id = buffer.readString();
+ switch (type) {
+ case MessageTypes.AckOpen: {
+ return this.handleAckOpen(id);
+ }
+ case MessageTypes.Open: {
+ return this.handleOpen(id);
+ }
+ case MessageTypes.Close: {
+ return this.handleClose(id);
+ }
+ case MessageTypes.Data: {
+ return this.handleData(id, buffer.sliceAtReadPosition());
+ }
+ }
+ }
+
+ protected handleAckOpen(id: string): void {
+ // edge case: both side try to open a channel at the same time.
+ const resolve = this.pendingOpen.get(id);
+ if (resolve) {
+ const channel = this.createChannel(id);
+ this.pendingOpen.delete(id);
+ this.openChannels.set(id, channel);
+ resolve!(channel);
+ this.onOpenChannelEmitter.fire({ id, channel });
+ }
+ }
+
+ protected handleOpen(id: string): void {
+ if (!this.openChannels.has(id)) {
+ const channel = this.createChannel(id);
+ this.openChannels.set(id, channel);
+ const resolve = this.pendingOpen.get(id);
+ if (resolve) {
+ // edge case: both side try to open a channel at the same time.
+ resolve(channel);
+ }
+ this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
+ this.onOpenChannelEmitter.fire({ id, channel });
+ }
+ }
+
+ protected handleClose(id: string): void {
+ const channel = this.openChannels.get(id);
+ if (channel) {
+ channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' });
+ this.openChannels.delete(id);
+ }
+ }
+
+ protected handleData(id: string, data: ReadBuffer): void {
+ const channel = this.openChannels.get(id);
+ if (channel) {
+ channel.onMessageEmitter.fire(() => data);
+ }
+ }
+
+ protected createChannel(id: string): ForwardingChannel {
+ return new ForwardingChannel(id, () => this.closeChannel(id), () => this.prepareWriteBuffer(id));
+ }
+
+ // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded
+ // and written to the buffer before the actual message.
+ protected prepareWriteBuffer(id: string): WriteBuffer {
+ const underlying = this.underlyingChannel.getWriteBuffer();
+ underlying.writeUint8(MessageTypes.Data);
+ underlying.writeString(id);
+ return underlying;
+ }
+
+ protected closeChannel(id: string): void {
+ this.underlyingChannel.getWriteBuffer()
+ .writeUint8(MessageTypes.Close)
+ .writeString(id)
+ .commit();
+
+ this.openChannels.delete(id);
+ }
+
+ open(id: string): Promise {
+ const result = new Promise((resolve, reject) => {
+ this.pendingOpen.set(id, resolve);
+ });
+ this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
+ return result;
+ }
+
+ getOpenChannel(id: string): Channel | undefined {
+ return this.openChannels.get(id);
+ }
+}
+
diff --git a/packages/core/src/node/messaging/logger.ts b/packages/core/src/common/message-rpc/index.ts
similarity index 64%
rename from packages/core/src/node/messaging/logger.ts
rename to packages/core/src/common/message-rpc/index.ts
index 45229f3f47423..671540a31c42b 100644
--- a/packages/core/src/node/messaging/logger.ts
+++ b/packages/core/src/common/message-rpc/index.ts
@@ -1,5 +1,5 @@
// *****************************************************************************
-// Copyright (C) 2017 TypeFox and others.
+// Copyright (C) 2022 STMicroelectronics and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
@@ -13,25 +13,6 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-
-import { Logger } from 'vscode-ws-jsonrpc';
-
-export class ConsoleLogger implements Logger {
-
- error(message: string): void {
- console.log(message);
- }
-
- warn(message: string): void {
- console.log(message);
- }
-
- info(message: string): void {
- console.log(message);
- }
-
- log(message: string): void {
- console.log(message);
- }
-
-}
+export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
+export { Channel, ChannelCloseEvent, MessageProvider } from './channel';
+export { ReadBuffer, WriteBuffer } from './message-buffer';
diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts
new file mode 100644
index 0000000000000..396ba95d93d73
--- /dev/null
+++ b/packages/core/src/common/message-rpc/message-buffer.ts
@@ -0,0 +1,99 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+/**
+ * A buffer maintaining a write position capable of writing primitive values
+ */
+export interface WriteBuffer {
+ writeUint8(byte: number): this
+ writeUint16(value: number): this
+ writeUint32(value: number): this
+ writeString(value: string): this
+ writeBytes(value: Uint8Array): this
+ writeNumber(value: number): this
+ writeLength(value: number): this
+ /**
+ * Makes any writes to the buffer permanent, for example by sending the writes over a channel.
+ * You must obtain a new write buffer after committing
+ */
+ commit(): void;
+}
+
+export class ForwardingWriteBuffer implements WriteBuffer {
+ constructor(protected readonly underlying: WriteBuffer) {
+ }
+
+ writeUint8(byte: number): this {
+ this.underlying.writeUint8(byte);
+ return this;
+ }
+
+ writeUint16(value: number): this {
+ this.underlying.writeUint16(value);
+ return this;
+ }
+
+ writeUint32(value: number): this {
+ this.underlying.writeUint32(value);
+ return this;
+ }
+
+ writeLength(value: number): this {
+ this.underlying.writeLength(value);
+ return this;
+ }
+
+ writeString(value: string): this {
+ this.underlying.writeString(value);
+ return this;
+ }
+
+ writeBytes(value: Uint8Array): this {
+ this.underlying.writeBytes(value);
+ return this;
+ }
+
+ writeNumber(value: number): this {
+ this.underlying.writeNumber(value);
+ return this;
+ }
+
+ commit(): void {
+ this.underlying.commit();
+ }
+}
+
+/**
+ * A buffer maintaining a read position in a buffer containing a received message capable of
+ * reading primitive values.
+ */
+export interface ReadBuffer {
+ readUint8(): number;
+ readUint16(): number;
+ readUint32(): number;
+ readString(): string;
+ readNumber(): number,
+ readLength(): number,
+ readBytes(): Uint8Array;
+
+ /**
+ * Returns a new read buffer whose starting read position is the current read position of this buffer.
+ * This is useful to create read buffers sub messages.
+ * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part
+ * of the message reader that is sent to the target channel)
+ */
+ sliceAtReadPosition(): ReadBuffer
+}
diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts
new file mode 100644
index 0000000000000..01a8aa2099ffb
--- /dev/null
+++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts
@@ -0,0 +1,42 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+import { expect } from 'chai';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
+import { RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder';
+
+describe('PPC Message Codex', () => {
+ describe('RPC Message Encoder & Decoder', () => {
+ it('should encode object into binary message and decode the message back into the original object', () => {
+ const buffer = new Uint8Array(1024);
+ const writer = new Uint8ArrayWriteBuffer(buffer);
+
+ const encoder = new RpcMessageEncoder();
+ const jsonMangled = JSON.parse(JSON.stringify(encoder));
+
+ encoder.writeTypedValue(writer, encoder);
+
+ const written = writer.getCurrentContents();
+
+ const reader = new Uint8ArrayReadBuffer(written);
+
+ const decoder = new RpcMessageDecoder();
+ const decoded = decoder.readTypedValue(reader);
+
+ expect(decoded).deep.equal(jsonMangled);
+ });
+ });
+});
diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts
new file mode 100644
index 0000000000000..8b0c237187747
--- /dev/null
+++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts
@@ -0,0 +1,497 @@
+// *****************************************************************************
+// Copyright (C) 2022 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+/* eslint-disable @typescript-eslint/no-explicit-any */
+
+import { ReadBuffer, WriteBuffer } from './message-buffer';
+
+/**
+ * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel)
+ * into a channel write buffer and decode the same messages from a read buffer.
+ * Custom encoders/decoders can be registered to specially handling certain types of values
+ * to be encoded. Clients are responsible for ensuring that the set of tags for encoders
+ * is distinct and the same at both ends of a channel.
+ */
+
+export type RpcMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage;
+
+export const enum RpcMessageType {
+ Request = 1,
+ Notification = 2,
+ Reply = 3,
+ ReplyErr = 4,
+ Cancel = 5,
+}
+
+export interface CancelMessage {
+ type: RpcMessageType.Cancel;
+ id: number;
+}
+
+export interface RequestMessage {
+ type: RpcMessageType.Request;
+ id: number;
+ method: string;
+ args: any[];
+}
+
+export interface NotificationMessage {
+ type: RpcMessageType.Notification;
+ id: number;
+ method: string;
+ args: any[];
+}
+
+export interface ReplyMessage {
+ type: RpcMessageType.Reply;
+ id: number;
+ res: any;
+}
+
+export interface ReplyErrMessage {
+ type: RpcMessageType.ReplyErr;
+ id: number;
+ err: any;
+}
+
+export interface SerializedError {
+ readonly $isError: true;
+ readonly name: string;
+ readonly message: string;
+ readonly stack: string;
+}
+
+/**
+ * A special error that can be returned in case a request
+ * has failed. Provides additional information i.e. an error code
+ * and additional error data.
+ */
+export class ResponseError extends Error {
+ constructor(readonly code: number, message: string, readonly data: any) {
+ super(message);
+ }
+}
+
+/**
+ * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s
+ */
+
+export enum ObjectType {
+ JSON = 0,
+ ByteArray = 1,
+ ObjectArray = 2,
+ Undefined = 3,
+ Object = 4,
+ String = 5,
+ Boolean = 6,
+ Number = 7,
+ // eslint-disable-next-line @typescript-eslint/no-shadow
+ ResponseError = 8,
+ Error = 9
+
+}
+
+/**
+ * A value encoder writes javascript values to a write buffer. Encoders will be asked
+ * in turn (ordered by their tag value, descending) whether they can encode a given value
+ * This means encoders with higher tag values have priority. Since the default encoders
+ * have tag values from 1-7, they can be easily overridden.
+ */
+export interface ValueEncoder {
+ /**
+ * Returns true if this encoder wants to encode this value.
+ * @param value the value to be encoded
+ */
+ is(value: any): boolean;
+ /**
+ * Write the given value to the buffer. Will only be called if {@link is(value)} returns true.
+ * @param buf The buffer to write to
+ * @param value The value to be written
+ * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder}
+ * to write a value to the underlying buffer. This is used mostly to write structures like an array
+ * without having to know how to encode the values in the array
+ */
+ write(buf: WriteBuffer, value: any, recursiveEncode?: (buf: WriteBuffer, value: any) => void): void;
+}
+
+/**
+ * Reads javascript values from a read buffer
+ */
+export interface ValueDecoder {
+ /**
+ * Reads a value from a read buffer. This method will be called for the decoder that is
+ * registered for the tag associated with the value encoder that encoded this value.
+ * @param buf The read buffer to read from
+ * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder}
+ * to read values from the underlying read buffer. This is used mostly to decode structures like an array
+ * without having to know how to decode the values in the array.
+ */
+ read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown;
+}
+
+/**
+ * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage}
+ */
+export class RpcMessageDecoder {
+
+ protected decoders: Map = new Map();
+
+ constructor() {
+ this.registerDecoder(ObjectType.JSON, {
+ read: buf => {
+ const json = buf.readString();
+ return JSON.parse(json);
+ }
+ });
+ this.registerDecoder(ObjectType.Error, {
+ read: buf => {
+ const serializedError: SerializedError = JSON.parse(buf.readString());
+ const error = new Error(serializedError.message);
+ Object.assign(error, serializedError);
+ return error;
+ }
+ });
+
+ this.registerDecoder(ObjectType.ResponseError, {
+ read: buf => {
+ const error = JSON.parse(buf.readString());
+ return new ResponseError(error.code, error.message, error.data);
+ }
+ });
+ this.registerDecoder(ObjectType.ByteArray, {
+ read: buf => buf.readBytes()
+ });
+
+ this.registerDecoder(ObjectType.ObjectArray, {
+ read: buf => this.readArray(buf)
+ });
+
+ this.registerDecoder(ObjectType.Undefined, {
+ read: () => undefined
+ });
+
+ this.registerDecoder(ObjectType.Object, {
+ read: (buf, recursiveRead) => {
+ const propertyCount = buf.readLength();
+ const result = Object.create({});
+ for (let i = 0; i < propertyCount; i++) {
+ const key = buf.readString();
+ const value = recursiveRead(buf);
+ result[key] = value;
+ }
+ return result;
+ }
+ });
+
+ this.registerDecoder(ObjectType.String, {
+ read: (buf, recursiveRead) => buf.readString()
+ });
+
+ this.registerDecoder(ObjectType.Boolean, {
+ read: buf => buf.readUint8() === 1
+ });
+
+ this.registerDecoder(ObjectType.Number, {
+ read: buf => buf.readNumber()
+ });
+
+ }
+
+ /**
+ * Registers a new {@link ValueDecoder} for the given tag.
+ * After the successful registration the {@link tagIntType} is recomputed
+ * by retrieving the highest tag value and calculating the required Uint size to store it.
+ * @param tag the tag for which the decoder should be registered.
+ * @param decoder the decoder that should be registered.
+ */
+ registerDecoder(tag: number, decoder: ValueDecoder): void {
+ if (this.decoders.has(tag)) {
+ throw new Error(`Decoder already registered: ${tag}`);
+ }
+ this.decoders.set(tag, decoder);
+ }
+ parse(buf: ReadBuffer): RpcMessage {
+ try {
+ const msgType = buf.readUint8();
+
+ switch (msgType) {
+ case RpcMessageType.Request:
+ return this.parseRequest(buf);
+ case RpcMessageType.Notification:
+ return this.parseNotification(buf);
+ case RpcMessageType.Reply:
+ return this.parseReply(buf);
+ case RpcMessageType.ReplyErr:
+ return this.parseReplyErr(buf);
+ case RpcMessageType.Cancel:
+ return this.parseCancel(buf);
+ }
+ throw new Error(`Unknown message type: ${msgType}`);
+ } catch (e) {
+ // exception does not show problematic content: log it!
+ console.log('failed to parse message: ' + buf);
+ throw e;
+ }
+ }
+
+ protected parseCancel(msg: ReadBuffer): CancelMessage {
+ const callId = msg.readUint32();
+ return {
+ type: RpcMessageType.Cancel,
+ id: callId
+ };
+ }
+
+ protected parseRequest(msg: ReadBuffer): RequestMessage {
+ const callId = msg.readUint32();
+ const method = msg.readString();
+ let args = this.readArray(msg);
+ // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs
+ args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null
+
+ return {
+ type: RpcMessageType.Request,
+ id: callId,
+ method: method,
+ args: args
+ };
+ }
+
+ protected parseNotification(msg: ReadBuffer): NotificationMessage {
+ const callId = msg.readUint32();
+ const method = msg.readString();
+ let args = this.readArray(msg);
+ // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs
+ args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null
+
+ return {
+ type: RpcMessageType.Notification,
+ id: callId,
+ method: method,
+ args: args
+ };
+ }
+
+ parseReply(msg: ReadBuffer): ReplyMessage {
+ const callId = msg.readUint32();
+ const value = this.readTypedValue(msg);
+ return {
+ type: RpcMessageType.Reply,
+ id: callId,
+ res: value
+ };
+ }
+
+ parseReplyErr(msg: ReadBuffer): ReplyErrMessage {
+ const callId = msg.readUint32();
+
+ const err = this.readTypedValue(msg);
+
+ return {
+ type: RpcMessageType.ReplyErr,
+ id: callId,
+ err
+ };
+ }
+
+ readArray(buf: ReadBuffer): any[] {
+ const length = buf.readLength();
+ const result = new Array(length);
+ for (let i = 0; i < length; i++) {
+ result[i] = this.readTypedValue(buf);
+ }
+ return result;
+ }
+
+ readTypedValue(buf: ReadBuffer): any {
+ const type = buf.readUint8();
+ const decoder = this.decoders.get(type);
+ if (!decoder) {
+ throw new Error(`No decoder for tag ${type}`);
+ }
+ return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer));
+ }
+}
+
+/**
+ * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is
+ * up to clients to commit the message. This allows for multiple messages being
+ * encoded before sending.
+ */
+export class RpcMessageEncoder {
+
+ protected readonly encoders: [number, ValueEncoder][] = [];
+ protected readonly registeredTags: Set = new Set();
+
+ constructor() {
+ this.registerEncoders();
+ }
+
+ protected registerEncoders(): void {
+ // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last
+ this.registerEncoder(ObjectType.JSON, {
+ is: () => true,
+ write: (buf, value) => {
+ buf.writeString(JSON.stringify(value));
+ }
+ });
+
+ this.registerEncoder(ObjectType.Object, {
+ is: value => typeof value === 'object',
+ write: (buf, object, recursiveEncode) => {
+ const properties = Object.keys(object);
+ const relevant = [];
+ for (const property of properties) {
+ const value = object[property];
+ if (typeof value !== 'function') {
+ relevant.push([property, value]);
+ }
+ }
+
+ buf.writeLength(relevant.length);
+ for (const [property, value] of relevant) {
+ buf.writeString(property);
+ recursiveEncode?.(buf, value);
+ }
+ }
+ });
+
+ this.registerEncoder(ObjectType.Error, {
+ is: value => value instanceof Error,
+ write: (buf, error: Error) => {
+ const { name, message } = error;
+ const stack: string = (error).stacktrace || error.stack;
+ const serializedError = {
+ $isError: true,
+ name,
+ message,
+ stack
+ };
+ buf.writeString(JSON.stringify(serializedError));
+ }
+ });
+
+ this.registerEncoder(ObjectType.ResponseError, {
+ is: value => value instanceof ResponseError,
+ write: (buf, value) => buf.writeString(JSON.stringify(value))
+ });
+
+ this.registerEncoder(ObjectType.Undefined, {
+ // eslint-disable-next-line no-null/no-null
+ is: value => value == null,
+ write: () => { }
+ });
+
+ this.registerEncoder(ObjectType.ObjectArray, {
+ is: value => Array.isArray(value),
+ write: (buf, value) => {
+ this.writeArray(buf, value);
+ }
+ });
+
+ this.registerEncoder(ObjectType.ByteArray, {
+ is: value => value instanceof Uint8Array,
+ write: (buf, value) => {
+ buf.writeBytes(value);
+ }
+ });
+
+ this.registerEncoder(ObjectType.String, {
+ is: value => typeof value === 'string',
+ write: (buf, value) => {
+ buf.writeString(value);
+ }
+ });
+
+ this.registerEncoder(ObjectType.Boolean, {
+ is: value => typeof value === 'boolean',
+ write: (buf, value) => {
+ buf.writeUint8(value === true ? 1 : 0);
+ }
+ });
+
+ this.registerEncoder(ObjectType.Number, {
+ is: value => typeof value === 'number',
+ write: (buf, value) => {
+ buf.writeNumber(value);
+ }
+ });
+ }
+
+ /**
+ * Registers a new {@link ValueEncoder} for the given tag.
+ * After the successful registration the {@link tagIntType} is recomputed
+ * by retrieving the highest tag value and calculating the required Uint size to store it.
+ * @param tag the tag for which the encoder should be registered.
+ * @param decoder the encoder that should be registered.
+ */
+ registerEncoder(tag: number, encoder: ValueEncoder): void {
+ if (this.registeredTags.has(tag)) {
+ throw new Error(`Tag already registered: ${tag}`);
+ }
+ this.registeredTags.add(tag);
+ this.encoders.push([tag, encoder]);
+ }
+
+ cancel(buf: WriteBuffer, requestId: number): void {
+ buf.writeUint8(RpcMessageType.Cancel);
+ buf.writeUint32(requestId);
+ }
+
+ notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
+ buf.writeUint8(RpcMessageType.Notification);
+ buf.writeUint32(requestId);
+ buf.writeString(method);
+ this.writeArray(buf, args);
+ }
+
+ request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
+ buf.writeUint8(RpcMessageType.Request);
+ buf.writeUint32(requestId);
+ buf.writeString(method);
+ this.writeArray(buf, args);
+ }
+
+ replyOK(buf: WriteBuffer, requestId: number, res: any): void {
+ buf.writeUint8(RpcMessageType.Reply);
+ buf.writeUint32(requestId);
+ this.writeTypedValue(buf, res);
+ }
+
+ replyErr(buf: WriteBuffer, requestId: number, err: any): void {
+ buf.writeUint8(RpcMessageType.ReplyErr);
+ buf.writeUint32(requestId);
+ this.writeTypedValue(buf, err);
+ }
+
+ writeTypedValue(buf: WriteBuffer, value: any): void {
+ for (let i: number = this.encoders.length - 1; i >= 0; i--) {
+ if (this.encoders[i][1].is(value)) {
+ buf.writeUint8(this.encoders[i][0]);
+ this.encoders[i][1].write(buf, value, (innerBuffer, innerValue) => {
+ this.writeTypedValue(innerBuffer, innerValue);
+ });
+ return;
+ }
+ }
+ }
+
+ writeArray(buf: WriteBuffer, value: any[]): void {
+ buf.writeLength(value.length);
+ for (let i = 0; i < value.length; i++) {
+ this.writeTypedValue(buf, value[i]);
+ }
+ }
+}
diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts
new file mode 100644
index 0000000000000..4854fa124d612
--- /dev/null
+++ b/packages/core/src/common/message-rpc/rpc-protocol.ts
@@ -0,0 +1,212 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+/* eslint-disable @typescript-eslint/no-explicit-any */
+
+import { CancellationToken, CancellationTokenSource } from '../cancellation';
+import { Emitter, Event } from '../event';
+import { Deferred } from '../promise-util';
+import { Channel } from './channel';
+import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder';
+import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
+
+/**
+ * Handles request messages received by the {@link RpcServer}.
+ */
+export type RequestHandler = (method: string, args: any[]) => Promise;
+
+/**
+ * Initialization options for a {@link RpcProtocol}.
+ */
+export interface RpcProtocolOptions {
+ /**
+ * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used.
+ */
+ encoder?: RpcMessageEncoder,
+ /**
+ * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used.
+ */
+ decoder?: RpcMessageDecoder
+}
+/**
+ * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
+ * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
+ * Clients can get a promise for a remote request result that will be either resolved or
+ * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request
+ * Currently, there is no timeout handling for long running requests implemented.
+ */
+export class RpcProtocol {
+ static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token';
+
+ protected readonly pendingRequests: Map> = new Map();
+
+ protected nextMessageId: number = 0;
+
+ protected readonly encoder: RpcMessageEncoder;
+ protected readonly decoder: RpcMessageDecoder;
+
+ protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter();
+ protected readonly cancellationTokenSources = new Map();
+
+ get onNotification(): Event<{ method: string; args: any[]; }> {
+ return this.onNotificationEmitter.event;
+ }
+
+ constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
+ this.encoder = options.encoder ?? new RpcMessageEncoder();
+ this.decoder = options.decoder ?? new RpcMessageDecoder();
+ const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())));
+ channel.onClose(() => registration.dispose());
+
+ }
+
+ handleMessage(message: RpcMessage): void {
+ switch (message.type) {
+ case RpcMessageType.Cancel: {
+ this.handleCancel(message.id);
+ break;
+ }
+ case RpcMessageType.Request: {
+ this.handleRequest(message.id, message.method, message.args);
+ break;
+ }
+ case RpcMessageType.Notification: {
+ this.handleNotify(message.id, message.method, message.args);
+ break;
+ }
+ case RpcMessageType.Reply: {
+ this.handleReply(message.id, message.res);
+ break;
+ }
+ case RpcMessageType.ReplyErr: {
+ this.handleReplyErr(message.id, message.err);
+ break;
+ }
+ }
+ }
+
+ protected handleReply(id: number, value: any): void {
+ const replyHandler = this.pendingRequests.get(id);
+ if (replyHandler) {
+ this.pendingRequests.delete(id);
+ replyHandler.resolve(value);
+ } else {
+ console.warn(`reply: no handler for message: ${id}`);
+ }
+ }
+
+ protected handleReplyErr(id: number, error: any): void {
+ try {
+ const replyHandler = this.pendingRequests.get(id);
+ if (replyHandler) {
+ this.pendingRequests.delete(id);
+ replyHandler.reject(error);
+ } else {
+ console.warn(`error: no handler for message: ${id}`);
+ }
+ } catch (err) {
+ throw err;
+ }
+ }
+
+ sendRequest(method: string, args: any[]): Promise {
+
+ const id = this.nextMessageId++;
+ const reply = new Deferred();
+
+ // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the
+ // args array and the `CANCELLATION_TOKEN_KEY` string instead.
+ const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
+ if (cancellationToken && cancellationToken.isCancellationRequested) {
+ return Promise.reject(this.cancelError());
+ }
+
+ if (cancellationToken) {
+ args.push(RpcProtocol.CANCELLATION_TOKEN_KEY);
+ cancellationToken.onCancellationRequested(() => {
+ this.sendCancel(id);
+ this.pendingRequests.get(id)?.reject(this.cancelError());
+ }
+ );
+ }
+ this.pendingRequests.set(id, reply);
+
+ const output = this.channel.getWriteBuffer();
+ this.encoder.request(output, id, method, args);
+ output.commit();
+ return reply.promise;
+ }
+
+ sendNotification(method: string, args: any[]): void {
+ const output = this.channel.getWriteBuffer();
+ this.encoder.notification(output, this.nextMessageId++, method, args);
+ output.commit();
+ }
+
+ sendCancel(requestId: number): void {
+ const output = this.channel.getWriteBuffer();
+ this.encoder.cancel(output, requestId);
+ output.commit();
+ }
+
+ cancelError(): Error {
+ const error = new Error('"Request has already been canceled by the sender"');
+ error.name = 'Cancel';
+ return error;
+ }
+
+ protected handleCancel(id: number): void {
+ const cancellationTokenSource = this.cancellationTokenSources.get(id);
+ if (cancellationTokenSource) {
+ this.cancellationTokenSources.delete(id);
+ cancellationTokenSource.cancel();
+ }
+ }
+
+ protected async handleRequest(id: number, method: string, args: any[]): Promise {
+
+ const output = this.channel.getWriteBuffer();
+
+ // Check if the last argument of the received args is the key for indicating that a cancellation token should be used
+ // If so remove the key from the args and create a new cancellation token.
+ const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false;
+ if (addToken) {
+ const tokenSource = new CancellationTokenSource();
+ this.cancellationTokenSources.set(id, tokenSource);
+ args.push(tokenSource.token);
+ }
+
+ try {
+ const result = await this.requestHandler(method, args);
+ this.cancellationTokenSources.delete(id);
+ this.encoder.replyOK(output, id, result);
+ output.commit();
+ } catch (err) {
+ // In case of an error the output buffer might already contains parts of an message.
+ // => Dispose the current buffer and retrieve a new, clean one for writing the response error.
+ if (output instanceof Uint8ArrayWriteBuffer) {
+ output.dispose();
+ }
+ const errorOutput = this.channel.getWriteBuffer();
+ this.cancellationTokenSources.delete(id);
+ this.encoder.replyErr(errorOutput, id, err);
+ errorOutput.commit();
+ }
+ }
+
+ protected async handleNotify(id: number, method: string, args: any[]): Promise {
+ this.onNotificationEmitter.fire({ method, args });
+ }
+}
diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts
new file mode 100644
index 0000000000000..59cccbf90a605
--- /dev/null
+++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts
@@ -0,0 +1,41 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+import { expect } from 'chai';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
+
+describe('array message buffer tests', () => {
+ it('basic read write test', () => {
+ const buffer = new Uint8Array(1024);
+ const writer = new Uint8ArrayWriteBuffer(buffer);
+
+ writer.writeUint8(8);
+ writer.writeUint32(10000);
+ writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
+ writer.writeString('this is a string');
+ writer.writeString('another string');
+ writer.commit();
+
+ const written = writer.getCurrentContents();
+
+ const reader = new Uint8ArrayReadBuffer(written);
+
+ expect(reader.readUint8()).equal(8);
+ expect(reader.readUint32()).equal(10000);
+ expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]));
+ expect(reader.readString()).equal('this is a string');
+ expect(reader.readString()).equal('another string');
+ });
+});
diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts
new file mode 100644
index 0000000000000..feec31dcd69cf
--- /dev/null
+++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts
@@ -0,0 +1,206 @@
+// *****************************************************************************
+// Copyright (C) 2021 Red Hat, Inc. and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+import { Disposable } from '../disposable';
+import { Emitter, Event } from '../event';
+import { ReadBuffer, WriteBuffer } from './message-buffer';
+
+/**
+ * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering.
+ * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events.
+ * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed
+ * and can no longer be used for writing data. If the writer buffer is no longer needed but the message
+ * has not been committed yet it has to be disposed manually.
+ */
+export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable {
+
+ private encoder = new TextEncoder();
+ private msg: DataView;
+ private isDisposed = false;
+ private offset: number;
+
+ constructor(private buffer: Uint8Array = new Uint8Array(1024), writePosition: number = 0) {
+ this.offset = buffer.byteOffset + writePosition;
+ this.msg = new DataView(buffer.buffer);
+ }
+
+ ensureCapacity(value: number): this {
+ let newLength = this.buffer.byteLength;
+ while (newLength < this.offset + value) {
+ newLength *= 2;
+ }
+ if (newLength !== this.buffer.byteLength) {
+ const newBuffer = new Uint8Array(newLength);
+ newBuffer.set(this.buffer);
+ this.buffer = newBuffer;
+ this.msg = new DataView(this.buffer.buffer);
+ }
+ return this;
+ }
+
+ writeLength(length: number): this {
+ if (length < 0 || (length % 1) !== 0) {
+ throw new Error(`Could not write the given length value. '${length}' is not an integer >= 0`);
+ }
+ if (length < 127) {
+ this.writeUint8(length);
+ } else {
+ this.writeUint8(128 + (length & 127));
+ this.writeLength(length >> 7);
+ }
+ return this;
+ }
+
+ writeNumber(value: number): this {
+ this.ensureCapacity(8);
+ this.msg.setFloat64(this.offset, value);
+ this.offset += 8;
+ return this;
+ }
+
+ writeUint8(value: number): this {
+ this.ensureCapacity(1);
+ this.buffer[this.offset++] = value;
+ return this;
+ }
+
+ writeUint16(value: number): this {
+ this.ensureCapacity(2);
+ this.msg.setUint16(this.offset, value);
+ this.offset += 2;
+ return this;
+ }
+
+ writeUint32(value: number): this {
+ this.ensureCapacity(4);
+ this.msg.setUint32(this.offset, value);
+ this.offset += 4;
+ return this;
+ }
+
+ writeString(value: string): this {
+ this.ensureCapacity(4 * value.length);
+ const result = this.encoder.encodeInto(value, this.buffer.subarray(this.offset + 4));
+ this.msg.setUint32(this.offset, result.written!);
+ this.offset += 4 + result.written!;
+ return this;
+ }
+
+ writeBytes(value: Uint8Array): this {
+ this.writeLength(value.byteLength);
+ this.ensureCapacity(value.length);
+ this.buffer.set(value, this.offset);
+ this.offset += value.length;
+ return this;
+ }
+
+ private onCommitEmitter = new Emitter();
+ get onCommit(): Event {
+ return this.onCommitEmitter.event;
+ }
+
+ commit(): void {
+ if (this.isDisposed) {
+ throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed.");
+ }
+ this.onCommitEmitter.fire(this.getCurrentContents());
+ this.dispose();
+ }
+
+ getCurrentContents(): Uint8Array {
+ return this.buffer.slice(this.buffer.byteOffset, this.offset);
+ }
+
+ dispose(): void {
+ if (!this.isDisposed) {
+ this.onCommitEmitter.dispose();
+ this.isDisposed = true;
+ }
+ }
+
+}
+/**
+ * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering.
+ * Is for single message read. A message can only be read once.
+ */
+export class Uint8ArrayReadBuffer implements ReadBuffer {
+ private offset: number = 0;
+ private msg: DataView;
+ private decoder = new TextDecoder();
+
+ constructor(private readonly buffer: Uint8Array, readPosition = 0) {
+ this.offset = buffer.byteOffset + readPosition;
+ this.msg = new DataView(buffer.buffer);
+ }
+
+ readUint8(): number {
+ return this.msg.getUint8(this.offset++);
+ }
+
+ readUint16(): number {
+ const result = this.msg.getUint16(this.offset);
+ this.offset += 2;
+ return result;
+ }
+
+ readUint32(): number {
+ const result = this.msg.getUint32(this.offset);
+ this.offset += 4;
+ return result;
+ }
+
+ readLength(): number {
+ let shift = 0;
+ let byte = this.readUint8();
+ let value = (byte & 127) << shift;
+ while (byte > 127) {
+ shift += 7;
+ byte = this.readUint8();
+ value = value + ((byte & 127) << shift);
+ }
+ return value;
+ }
+
+ readNumber(): number {
+ const result = this.msg.getFloat64(this.offset);
+ this.offset += 8;
+ return result;
+ }
+
+ readString(): string {
+ const len = this.readUint32();
+ const sliceOffset = this.offset - this.buffer.byteOffset;
+ const result = this.decodeString(this.buffer.slice(sliceOffset, sliceOffset + len));
+ this.offset += len;
+ return result;
+ }
+
+ private decodeString(buf: Uint8Array): string {
+ return this.decoder.decode(buf);
+ }
+
+ readBytes(): Uint8Array {
+ const length = this.readLength();
+ const sliceOffset = this.offset - this.buffer.byteOffset;
+ const result = this.buffer.slice(sliceOffset, sliceOffset + length);
+ this.offset += length;
+ return result;
+ }
+
+ sliceAtReadPosition(): ReadBuffer {
+ const sliceOffset = this.offset - this.buffer.byteOffset;
+ return new Uint8ArrayReadBuffer(this.buffer, sliceOffset);
+ }
+}
diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts
index d4c5c3bd3aaf3..d003fe86fc683 100644
--- a/packages/core/src/common/messaging/abstract-connection-provider.ts
+++ b/packages/core/src/common/messaging/abstract-connection-provider.ts
@@ -15,11 +15,10 @@
// *****************************************************************************
import { injectable, interfaces } from 'inversify';
-import { ConsoleLogger, createWebSocketConnection, Logger } from 'vscode-ws-jsonrpc';
import { Emitter, Event } from '../event';
import { ConnectionHandler } from './handler';
import { JsonRpcProxy, JsonRpcProxyFactory } from './proxy-factory';
-import { WebSocketChannel } from './web-socket-channel';
+import { Channel, ChannelMultiplexer } from '../message-rpc/channel';
/**
* Factor common logic according to `ElectronIpcConnectionProvider` and
@@ -45,9 +44,6 @@ export abstract class AbstractConnectionProvider
throw new Error('abstract');
}
- protected channelIdSeq = 0;
- protected readonly channels = new Map();
-
protected readonly onIncomingMessageActivityEmitter: Emitter = new Emitter();
get onIncomingMessageActivity(): Event {
return this.onIncomingMessageActivityEmitter.event;
@@ -75,50 +71,39 @@ export abstract class AbstractConnectionProvider
return factory.createProxy();
}
+ protected channelMultiPlexer: ChannelMultiplexer;
+
+ constructor() {
+ this.channelMultiPlexer = this.createMultiplexer();
+ }
+
+ protected createMultiplexer(): ChannelMultiplexer {
+ return new ChannelMultiplexer(this.createMainChannel());
+ }
+
/**
* Install a connection handler for the given path.
*/
listen(handler: ConnectionHandler, options?: AbstractOptions): void {
this.openChannel(handler.path, channel => {
- const connection = createWebSocketConnection(channel, this.createLogger());
- connection.onDispose(() => channel.close());
- handler.onConnection(connection);
+ handler.onConnection(channel);
}, options);
}
- openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: AbstractOptions): void {
- const id = this.channelIdSeq++;
- const channel = this.createChannel(id);
- this.channels.set(id, channel);
- channel.onClose(() => {
- if (this.channels.delete(channel.id)) {
- const { reconnecting } = { reconnecting: true, ...options };
- if (reconnecting) {
- this.openChannel(path, handler, options);
- }
- } else {
- console.error('The ws channel does not exist', channel.id);
+ async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise {
+ const newChannel = await this.channelMultiPlexer.open(path);
+ newChannel.onClose(() => {
+ const { reconnecting } = { reconnecting: true, ...options };
+ if (reconnecting) {
+ this.openChannel(path, handler, options);
}
});
- channel.onOpen(() => handler(channel));
- channel.open(path);
+ handler(newChannel);
}
- protected abstract createChannel(id: number): WebSocketChannel;
-
- protected handleIncomingRawMessage(data: string): void {
- const message: WebSocketChannel.Message = JSON.parse(data);
- const channel = this.channels.get(message.id);
- if (channel) {
- channel.handleMessage(message);
- } else {
- console.error('The ws channel does not exist', message.id);
- }
- this.onIncomingMessageActivityEmitter.fire(undefined);
- }
-
- protected createLogger(): Logger {
- return new ConsoleLogger();
- }
+ /**
+ * Create the main connection that is used for multiplexing all channels.
+ */
+ protected abstract createMainChannel(): Channel;
}
diff --git a/packages/core/src/common/messaging/connection-error-handler.ts b/packages/core/src/common/messaging/connection-error-handler.ts
index aecfe68901e24..89a27b60a50db 100644
--- a/packages/core/src/common/messaging/connection-error-handler.ts
+++ b/packages/core/src/common/messaging/connection-error-handler.ts
@@ -14,7 +14,6 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import { Message } from 'vscode-ws-jsonrpc';
import { ILogger } from '../../common';
export interface ResolvedConnectionErrorHandlerOptions {
@@ -51,7 +50,7 @@ export class ConnectionErrorHandler {
};
}
- shouldStop(error: Error, message?: Message, count?: number): boolean {
+ shouldStop(error: Error, count?: number): boolean {
return !count || count > this.options.maxErrors;
}
diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts
index ed03d9d331206..1e790d38aeec3 100644
--- a/packages/core/src/common/messaging/handler.ts
+++ b/packages/core/src/common/messaging/handler.ts
@@ -14,11 +14,11 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import { MessageConnection } from 'vscode-ws-jsonrpc';
+import { Channel } from '../message-rpc/channel';
export const ConnectionHandler = Symbol('ConnectionHandler');
export interface ConnectionHandler {
readonly path: string;
- onConnection(connection: MessageConnection): void;
+ onConnection(connection: Channel): void;
}
diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts
index 2fd0700a41034..37280e4dbfdaa 100644
--- a/packages/core/src/common/messaging/proxy-factory.spec.ts
+++ b/packages/core/src/common/messaging/proxy-factory.spec.ts
@@ -15,21 +15,11 @@
// *****************************************************************************
import * as chai from 'chai';
-import { ConsoleLogger } from '../../node/messaging/logger';
import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory';
-import { createMessageConnection } from 'vscode-jsonrpc/lib/main';
-import * as stream from 'stream';
+import { ChannelPipe } from '../message-rpc/channel.spec';
const expect = chai.expect;
-class NoTransform extends stream.Transform {
-
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- override _transform(chunk: any, encoding: string, callback: Function): void {
- callback(undefined, chunk);
- }
-}
-
class TestServer {
requests: string[] = [];
doStuff(arg: string): Promise {
@@ -102,15 +92,12 @@ function getSetup(): {
const server = new TestServer();
const serverProxyFactory = new JsonRpcProxyFactory(client);
- const client2server = new NoTransform();
- const server2client = new NoTransform();
- const serverConnection = createMessageConnection(server2client, client2server, new ConsoleLogger());
- serverProxyFactory.listen(serverConnection);
+ const pipe = new ChannelPipe();
+ serverProxyFactory.listen(pipe.right);
const serverProxy = serverProxyFactory.createProxy();
const clientProxyFactory = new JsonRpcProxyFactory(server);
- const clientConnection = createMessageConnection(client2server, server2client, new ConsoleLogger());
- clientProxyFactory.listen(clientConnection);
+ clientProxyFactory.listen(pipe.left);
const clientProxy = clientProxyFactory.createProxy();
return {
client,
diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts
index f8869449eae94..765b70dd1a472 100644
--- a/packages/core/src/common/messaging/proxy-factory.ts
+++ b/packages/core/src/common/messaging/proxy-factory.ts
@@ -16,10 +16,12 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
-import { MessageConnection, ResponseError } from 'vscode-ws-jsonrpc';
+import { ResponseError } from '../message-rpc/rpc-message-encoder';
import { ApplicationError } from '../application-error';
-import { Event, Emitter } from '../event';
import { Disposable } from '../disposable';
+import { Emitter, Event } from '../event';
+import { Channel } from '../message-rpc/channel';
+import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol';
import { ConnectionHandler } from './handler';
export type JsonRpcServer = Disposable & {
@@ -45,13 +47,19 @@ export class JsonRpcConnectionHandler implements ConnectionHan
readonly factoryConstructor: new () => JsonRpcProxyFactory = JsonRpcProxyFactory
) { }
- onConnection(connection: MessageConnection): void {
+ onConnection(connection: Channel): void {
const factory = new this.factoryConstructor();
const proxy = factory.createProxy();
factory.target = this.targetFactory(proxy);
factory.listen(connection);
}
}
+/**
+ * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}.
+ */
+export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
+
+const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
/**
* Factory for JSON-RPC proxy objects.
@@ -95,13 +103,14 @@ export class JsonRpcConnectionHandler implements ConnectionHan
*
* @param - The type of the object to expose to JSON-RPC.
*/
+
export class JsonRpcProxyFactory implements ProxyHandler {
protected readonly onDidOpenConnectionEmitter = new Emitter();
protected readonly onDidCloseConnectionEmitter = new Emitter();
- protected connectionPromiseResolve: (connection: MessageConnection) => void;
- protected connectionPromise: Promise;
+ protected connectionPromiseResolve: (connection: RpcProtocol) => void;
+ protected connectionPromise: Promise;
/**
* Build a new JsonRpcProxyFactory.
@@ -109,7 +118,7 @@ export class JsonRpcProxyFactory implements ProxyHandler {
* @param target - The object to expose to JSON-RPC methods calls. If this
* is omitted, the proxy won't be able to handle requests, only send them.
*/
- constructor(public target?: any) {
+ constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) {
this.waitForConnection();
}
@@ -118,7 +127,7 @@ export class JsonRpcProxyFactory implements ProxyHandler {
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
- connection.onClose(() =>
+ connection.channel.onClose(() =>
this.onDidCloseConnectionEmitter.fire(undefined)
);
this.onDidOpenConnectionEmitter.fire(undefined);
@@ -131,11 +140,10 @@ export class JsonRpcProxyFactory implements ProxyHandler {
* This connection will be used to send/receive JSON-RPC requests and
* response.
*/
- listen(connection: MessageConnection): void {
- connection.onRequest((prop, ...args) => this.onRequest(prop, ...args));
- connection.onNotification((prop, ...args) => this.onNotification(prop, ...args));
- connection.onDispose(() => this.waitForConnection());
- connection.listen();
+ listen(channel: Channel): void {
+ const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args));
+ connection.onNotification(event => this.onNotification(event.method, ...event.args));
+
this.connectionPromiseResolve(connection);
}
@@ -239,10 +247,10 @@ export class JsonRpcProxyFactory implements ProxyHandler {
new Promise((resolve, reject) => {
try {
if (isNotify) {
- connection.sendNotification(method, ...args);
+ connection.sendNotification(method, args);
resolve(undefined);
} else {
- const resultPromise = connection.sendRequest(method, ...args) as Promise;
+ const resultPromise = connection.sendRequest(method, args) as Promise;
resultPromise
.catch((err: any) => reject(this.deserializeError(capturedError, err)))
.then((result: any) => resolve(result));
@@ -293,3 +301,4 @@ export class JsonRpcProxyFactory implements ProxyHandler {
}
}
+
diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts
index 28dff9400068a..74f7503e0b997 100644
--- a/packages/core/src/common/messaging/web-socket-channel.ts
+++ b/packages/core/src/common/messaging/web-socket-channel.ts
@@ -16,157 +16,100 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
-import { IWebSocket } from 'vscode-ws-jsonrpc/lib/socket/socket';
-import { Disposable, DisposableCollection } from '../disposable';
-import { Emitter } from '../event';
-
-export class WebSocketChannel implements IWebSocket {
-
+import { Emitter, Event } from '../event';
+import { WriteBuffer } from '../message-rpc';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer';
+import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel';
+
+/**
+ * A channel that manages the main websocket connection between frontend and backend. All service channels
+ * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation
+ * independent of the actual websocket implementation and its execution context (backend vs. frontend).
+ */
+export class WebSocketChannel implements Channel {
static wsPath = '/services';
- protected readonly closeEmitter = new Emitter<[number, string]>();
- protected readonly toDispose = new DisposableCollection(this.closeEmitter);
-
- constructor(
- readonly id: number,
- protected readonly doSend: (content: string) => void
- ) { }
-
- dispose(): void {
- this.toDispose.dispose();
- }
-
- protected checkNotDisposed(): void {
- if (this.toDispose.disposed) {
- throw new Error('The channel has been disposed.');
- }
- }
-
- handleMessage(message: WebSocketChannel.Message): void {
- if (message.kind === 'ready') {
- this.fireOpen();
- } else if (message.kind === 'data') {
- this.fireMessage(message.content);
- } else if (message.kind === 'close') {
- this.fireClose(message.code, message.reason);
- }
- }
-
- open(path: string): void {
- this.checkNotDisposed();
- this.doSend(JSON.stringify({
- kind: 'open',
- id: this.id,
- path
- }));
- }
-
- ready(): void {
- this.checkNotDisposed();
- this.doSend(JSON.stringify({
- kind: 'ready',
- id: this.id
- }));
+ protected readonly onCloseEmitter: Emitter = new Emitter();
+ get onClose(): Event {
+ return this.onCloseEmitter.event;
}
- send(content: string): void {
- this.checkNotDisposed();
- this.doSend(JSON.stringify({
- kind: 'data',
- id: this.id,
- content
- }));
+ protected readonly onMessageEmitter: Emitter = new Emitter();
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
}
- close(code: number = 1000, reason: string = ''): void {
- if (this.closing) {
- // Do not try to close the channel if it is already closing.
- return;
- }
- this.checkNotDisposed();
- this.doSend(JSON.stringify({
- kind: 'close',
- id: this.id,
- code,
- reason
- }));
- this.fireClose(code, reason);
+ protected readonly onErrorEmitter: Emitter = new Emitter();
+ get onError(): Event {
+ return this.onErrorEmitter.event;
}
- tryClose(code: number = 1000, reason: string = ''): void {
- if (this.closing || this.toDispose.disposed) {
- // Do not try to close the channel if it is already closing or disposed.
- return;
- }
- this.doSend(JSON.stringify({
- kind: 'close',
- id: this.id,
- code,
- reason
+ constructor(protected readonly socket: IWebSocket) {
+ socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code }));
+ socket.onError(error => this.onErrorEmitter.fire(error));
+ // eslint-disable-next-line arrow-body-style
+ socket.onMessage(data => this.onMessageEmitter.fire(() => {
+ // In the browser context socketIO receives binary messages as ArrayBuffers.
+ // So we have to convert them to a Uint8Array before delegating the message to the read buffer.
+ const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data;
+ return new Uint8ArrayReadBuffer(buffer);
}));
- this.fireClose(code, reason);
}
- protected fireOpen: () => void = () => { };
- onOpen(cb: () => void): void {
- this.checkNotDisposed();
- this.fireOpen = cb;
- this.toDispose.push(Disposable.create(() => this.fireOpen = () => { }));
- }
+ getWriteBuffer(): WriteBuffer {
+ const result = new Uint8ArrayWriteBuffer();
- protected fireMessage: (data: any) => void = () => { };
- onMessage(cb: (data: any) => void): void {
- this.checkNotDisposed();
- this.fireMessage = cb;
- this.toDispose.push(Disposable.create(() => this.fireMessage = () => { }));
- }
+ result.onCommit(buffer => {
+ if (this.socket.isConnected()) {
+ this.socket.send(buffer);
+ } else {
+ console.warn('Could not send message. Websocket is not connected');
+ }
+ });
- fireError: (reason: any) => void = () => { };
- onError(cb: (reason: any) => void): void {
- this.checkNotDisposed();
- this.fireError = cb;
- this.toDispose.push(Disposable.create(() => this.fireError = () => { }));
+ return result;
}
- protected closing = false;
- protected fireClose(code: number, reason: string): void {
- if (this.closing) {
- return;
- }
- this.closing = true;
- try {
- this.closeEmitter.fire([code, reason]);
- } finally {
- this.closing = false;
- }
- this.dispose();
- }
- onClose(cb: (code: number, reason: string) => void): Disposable {
- this.checkNotDisposed();
- return this.closeEmitter.event(([code, reason]) => cb(code, reason));
+ close(): void {
+ this.socket.close();
+ this.onCloseEmitter.dispose();
+ this.onMessageEmitter.dispose();
+ this.onErrorEmitter.dispose();
}
-
}
-export namespace WebSocketChannel {
- export interface OpenMessage {
- kind: 'open'
- id: number
- path: string
- }
- export interface ReadyMessage {
- kind: 'ready'
- id: number
- }
- export interface DataMessage {
- kind: 'data'
- id: number
- content: string
- }
- export interface CloseMessage {
- kind: 'close'
- id: number
- code: number
- reason: string
- }
- export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage;
+
+/**
+ * An abstraction that enables reuse of the `{@link WebSocketChannel} class in the frontend and backend
+ * independent of the actual underlying socket implementation.
+ */
+export interface IWebSocket {
+ /**
+ * Sends the given message over the web socket in binary format.
+ * @param message The binary message.
+ */
+ send(message: Uint8Array): void;
+ /**
+ * Closes the websocket from the local side.
+ */
+ close(): void;
+ /**
+ * The connection state of the web socket.
+ */
+ isConnected(): boolean;
+ /**
+ * Listener callback to handle incoming messages.
+ * @param cb The callback.
+ */
+ onMessage(cb: (message: Uint8Array) => void): void;
+ /**
+ * Listener callback to handle socket errors.
+ * @param cb The callback.
+ */
+ onError(cb: (reason: any) => void): void;
+ /**
+ * Listener callback to handle close events (Remote side).
+ * @param cb The callback.
+ */
+ onClose(cb: (reason: string, code?: number) => void): void;
}
+
diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts
index b3d8ce8ab9415..6aa06861d2e93 100644
--- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts
+++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts
@@ -17,9 +17,11 @@
import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron';
import { injectable, interfaces } from 'inversify';
import { JsonRpcProxy } from '../../common/messaging';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler';
+import { Emitter, Event } from '../../common';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer';
+import { Channel, MessageProvider } from '../../common/message-rpc/channel';
export interface ElectronIpcOptions {
}
@@ -34,17 +36,25 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(path, arg);
}
- constructor() {
- super();
- ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: ElectronEvent, data: string) => {
- this.handleIncomingRawMessage(data);
- });
- }
-
- protected createChannel(id: number): WebSocketChannel {
- return new WebSocketChannel(id, content => {
- ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content);
+ protected createMainChannel(): Channel {
+ const onMessageEmitter = new Emitter();
+ ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => {
+ onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data));
});
+ return {
+ close: () => Event.None,
+ getWriteBuffer: () => {
+ const writer = new Uint8ArrayWriteBuffer();
+ writer.onCommit(buffer =>
+ // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array.
+ ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer)
+ );
+ return writer;
+ },
+ onClose: Event.None,
+ onError: Event.None,
+ onMessage: onMessageEmitter.event
+ };
}
}
diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts
index 6f75ea31d0dae..ac99ee85bbab8 100644
--- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts
+++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts
@@ -15,9 +15,9 @@
// *****************************************************************************
import { injectable } from 'inversify';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider';
import { FrontendApplicationContribution } from '../../browser/frontend-application';
+import { Channel } from '../../common';
/**
* Customized connection provider between the frontend and the backend in electron environment.
@@ -34,16 +34,13 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv
onStop(): void {
this.stopping = true;
- // Close the websocket connection `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay.
+ // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay.
// https://github.com/eclipse-theia/theia/issues/6499
- for (const channel of [...this.channels.values()]) {
- // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page.
- // But we cannot use `1001`: https://github.com/TypeFox/vscode-ws-jsonrpc/issues/15
- channel.close(1000, 'The frontend is "going away"...');
- }
+ // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page.
+ this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 });
}
- override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
+ override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise {
if (!this.stopping) {
super.openChannel(path, handler, options);
}
diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts
index 071796c5cf0ca..dfd8f6115c8ce 100644
--- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts
+++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts
@@ -16,24 +16,23 @@
import { IpcMainEvent, ipcMain, WebContents } from '@theia/electron/shared/electron';
import { inject, injectable, named, postConstruct } from 'inversify';
-import { MessageConnection } from 'vscode-ws-jsonrpc';
-import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection';
import { ContributionProvider } from '../../common/contribution-provider';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { MessagingContribution } from '../../node/messaging/messaging-contribution';
-import { ConsoleLogger } from '../../node/messaging/logger';
import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler';
import { ElectronMainApplicationContribution } from '../electron-main-application';
import { ElectronMessagingService } from './electron-messaging-service';
+import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel';
+import { Emitter, Event, WriteBuffer } from '../../common';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer';
/**
* This component replicates the role filled by `MessagingContribution` but for Electron.
* Unlike the WebSocket based implementation, we do not expect to receive
* connection events. Instead, we'll create channels based on incoming `open`
* events on the `ipcMain` channel.
- *
* This component allows communication between renderer process (frontend) and electron main process.
*/
+
@injectable()
export class ElectronMessagingContribution implements ElectronMainApplicationContribution, ElectronMessagingService {
@@ -43,89 +42,112 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon
@inject(ContributionProvider) @named(ElectronConnectionHandler)
protected readonly connectionHandlers: ContributionProvider;
- protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers();
- protected readonly windowChannels = new Map>();
+ protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers();
+ /**
+ * Each electron window has a main chanel and its own multiplexer to route multiple client messages the same IPC connection.
+ */
+ protected readonly windowChannelMultiplexer = new Map();
@postConstruct()
protected init(): void {
- ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: string) => {
- this.handleIpcMessage(event, data);
+ ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => {
+ this.handleIpcEvent(event, data);
});
}
+ protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void {
+ const sender = event.sender;
+ // Get the multiplexer for a given window id
+ try {
+ const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender);
+ windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data));
+ } catch (error) {
+ console.error('IPC: Failed to handle message', { error, data });
+ }
+ }
+
+ // Creates a new multiplexer for a given sender/window
+ protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } {
+ const mainChannel = this.createWindowMainChannel(sender);
+ const multiPlexer = new ChannelMultiplexer(mainChannel);
+ multiPlexer.onDidOpenChannel(openEvent => {
+ const { channel, id } = openEvent;
+ if (this.channelHandlers.route(id, channel)) {
+ console.debug(`Opening channel for service path '${id}'.`);
+ channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`));
+ }
+ });
+
+ sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window.
+ sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window.
+ const data = { channel: mainChannel, multiPlexer };
+ this.windowChannelMultiplexer.set(sender.id, data);
+ return data;
+ }
+
+ /**
+ * Creates the main channel to a window.
+ * @param sender The window that the channel should be established to.
+ */
+ protected createWindowMainChannel(sender: WebContents): ElectronWebContentChannel {
+ return new ElectronWebContentChannel(sender);
+ }
+
onStart(): void {
for (const contribution of this.messagingContributions.getContributions()) {
contribution.configure(this);
}
for (const connectionHandler of this.connectionHandlers.getContributions()) {
this.channelHandlers.push(connectionHandler.path, (params, channel) => {
- const connection = createWebSocketConnection(channel, new ConsoleLogger());
- connectionHandler.onConnection(connection);
+ connectionHandler.onConnection(channel);
});
}
}
- listen(spec: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void {
- this.ipcChannel(spec, (params, channel) => {
- const connection = createWebSocketConnection(channel, new ConsoleLogger());
- callback(params, connection);
- });
- }
-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- ipcChannel(spec: string, callback: (params: any, channel: WebSocketChannel) => void): void {
+ ipcChannel(spec: string, callback: (params: any, channel: Channel) => void): void {
this.channelHandlers.push(spec, callback);
}
+}
+
+/**
+ * Used to establish a connection between the ipcMain and the Electron frontend (window).
+ * Messages a transferred via electron IPC.
+ */
+export class ElectronWebContentChannel implements Channel {
+ protected readonly onCloseEmitter: Emitter = new Emitter();
+ get onClose(): Event {
+ return this.onCloseEmitter.event;
+ }
- protected handleIpcMessage(event: IpcMainEvent, data: string): void {
- const sender = event.sender;
- try {
- // Get the channel map for a given window id
- let channels = this.windowChannels.get(sender.id)!;
- if (!channels) {
- this.windowChannels.set(sender.id, channels = new Map());
- }
- // Start parsing the message to extract the channel id and route
- const message: WebSocketChannel.Message = JSON.parse(data.toString());
- // Someone wants to open a logical channel
- if (message.kind === 'open') {
- const { id, path } = message;
- const channel = this.createChannel(id, sender);
- if (this.channelHandlers.route(path, channel)) {
- channel.ready();
- channels.set(id, channel);
- channel.onClose(() => channels.delete(id));
- } else {
- console.error('Cannot find a service for the path: ' + path);
- }
- } else {
- const { id } = message;
- const channel = channels.get(id);
- if (channel) {
- channel.handleMessage(message);
- } else {
- console.error('The ipc channel does not exist', id);
- }
- }
- const close = () => {
- for (const channel of Array.from(channels.values())) {
- channel.close(undefined, 'webContent destroyed');
- }
- channels.clear();
- };
- sender.once('did-navigate', close); // When refreshing the browser window.
- sender.once('destroyed', close); // When closing the browser window.
- } catch (error) {
- console.error('IPC: Failed to handle message', { error, data });
- }
+ // Make the message emitter public so that we can easily forward messages received from the ipcMain.
+ readonly onMessageEmitter: Emitter = new Emitter();
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
+ }
+
+ protected readonly onErrorEmitter: Emitter = new Emitter();
+ get onError(): Event {
+ return this.onErrorEmitter.event;
+ }
+
+ constructor(protected readonly sender: Electron.WebContents) {
}
- protected createChannel(id: number, sender: WebContents): WebSocketChannel {
- return new WebSocketChannel(id, content => {
- if (!sender.isDestroyed()) {
- sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, content);
+ getWriteBuffer(): WriteBuffer {
+ const writer = new Uint8ArrayWriteBuffer();
+
+ writer.onCommit(buffer => {
+ if (!this.sender.isDestroyed()) {
+ this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer);
}
});
- }
+ return writer;
+ }
+ close(): void {
+ this.onCloseEmitter.dispose();
+ this.onMessageEmitter.dispose();
+ this.onErrorEmitter.dispose();
+ }
}
diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts
index dde3fdde1d181..874d51237b4fd 100644
--- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts
+++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts
@@ -14,20 +14,14 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import type { MessageConnection } from 'vscode-jsonrpc';
-import type { WebSocketChannel } from '../../common/messaging/web-socket-channel';
+import { Channel } from '../../common/message-rpc/channel';
export interface ElectronMessagingService {
- /**
- * Accept a JSON-RPC connection on the given path.
- * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
- */
- listen(path: string, callback: (params: ElectronMessagingService.PathParams, connection: MessageConnection) => void): void;
/**
* Accept an ipc channel on the given path.
* A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
*/
- ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: WebSocketChannel) => void): void;
+ ipcChannel(path: string, callback: (params: ElectronMessagingService.PathParams, socket: Channel) => void): void;
}
export namespace ElectronMessagingService {
export interface PathParams {
diff --git a/packages/core/src/node/messaging/binary-message-pipe.ts b/packages/core/src/node/messaging/binary-message-pipe.ts
new file mode 100644
index 0000000000000..1143aef9cf4cc
--- /dev/null
+++ b/packages/core/src/node/messaging/binary-message-pipe.ts
@@ -0,0 +1,168 @@
+// *****************************************************************************
+// Copyright (C) 2022 STMicroelectronics and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+import { Duplex } from 'stream';
+import { Disposable, Emitter, Event } from '../../common';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer';
+
+/**
+ * A `BinaryMessagePipe` is capable of sending and retrieving binary messages i.e. {@link Uint8Array}s over
+ * and underlying streamed process pipe/fd. The message length of individual messages is encoding at the beginning of
+ * a new message. This makes it possible to extract messages from the streamed data.
+ */
+export class BinaryMessagePipe implements Disposable {
+ static readonly MESSAGE_START_IDENTIFIER = '';
+ protected dataHandler = (chunk: Uint8Array) => this.handleChunk(chunk);
+
+ protected onMessageEmitter = new Emitter();
+ protected cachedMessageData: StreamedMessageData = {
+ chunks: [],
+ missingBytes: 0
+ };
+
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
+ }
+
+ constructor(protected readonly underlyingPipe: Duplex) {
+ underlyingPipe.on('data', this.dataHandler);
+ }
+
+ send(message: Uint8Array): void {
+ this.underlyingPipe.write(this.encodeMessageStart(message));
+ this.underlyingPipe.write(message);
+ }
+
+ protected handleChunk(chunk: Uint8Array): void {
+ if (this.cachedMessageData.missingBytes === 0) {
+ // There is no currently streamed message => We expect that the beginning of the chunk is the message start for a new message
+ this.handleNewMessage(chunk);
+ } else {
+ // The chunk contains message data intended for the currently cached message
+ this.handleMessageContentChunk(chunk);
+ }
+ }
+
+ protected handleNewMessage(chunk: Uint8Array): void {
+ if (chunk.byteLength < this.messageStartByteLength) {
+ // The chunk only contains a part of the encoded message start
+ this.cachedMessageData.partialMessageStart = chunk;
+ return;
+ }
+
+ const messageLength = this.readMessageStart(chunk);
+
+ if (chunk.length - this.messageStartByteLength > messageLength) {
+ // The initial chunk contains more than one binary message => Fire `onMessage` for first message and handle remaining content
+ const firstMessage = chunk.slice(this.messageStartByteLength, this.messageStartByteLength + messageLength);
+ this.onMessageEmitter.fire(firstMessage);
+ this.handleNewMessage(chunk.slice(this.messageStartByteLength + messageLength));
+
+ } else if (chunk.length - this.messageStartByteLength === messageLength) {
+ // The initial chunk contains exactly one complete message. => Directly fire the `onMessage` event.
+ this.onMessageEmitter.fire(chunk.slice(this.messageStartByteLength));
+ } else {
+ // The initial chunk contains only part of the message content => Cache message data
+ this.cachedMessageData.chunks = [chunk.slice(this.messageStartByteLength)];
+ this.cachedMessageData.missingBytes = messageLength - chunk.byteLength + this.messageStartByteLength;
+ }
+ }
+
+ protected handleMessageContentChunk(chunk: Uint8Array): void {
+ if (this.cachedMessageData) {
+ if (chunk.byteLength < this.cachedMessageData.missingBytes) {
+ // The chunk only contains parts of the missing bytes for the cached message.
+ this.cachedMessageData.chunks.push(chunk);
+ this.cachedMessageData.missingBytes -= chunk.byteLength;
+ } else if (chunk.byteLength === this.cachedMessageData.missingBytes) {
+ // Chunk contains exactly the missing data for the cached message
+ this.cachedMessageData.chunks.push(chunk);
+ this.emitCachedMessage();
+ } else {
+ // Chunk contains missing data for the cached message + data for the next message
+ const messageEnd = this.cachedMessageData.missingBytes;
+ const missingData = chunk.slice(0, messageEnd);
+ this.cachedMessageData.chunks.push(missingData);
+ this.emitCachedMessage();
+ this.handleNewMessage(chunk.slice(messageEnd));
+ }
+
+ }
+ }
+
+ protected emitCachedMessage(): void {
+ const message = Buffer.concat(this.cachedMessageData.chunks);
+ this.onMessageEmitter.fire(message);
+ this.cachedMessageData.chunks = [];
+ this.cachedMessageData.missingBytes = 0;
+ }
+
+ /**
+ * Encodes the start of a new message into a {@link Uint8Array}.
+ * The message start consists of a identifier string and the length of the following message.
+ * @returns the buffer contains the encoded message start
+ */
+ protected encodeMessageStart(message: Uint8Array): Uint8Array {
+ const writer = new Uint8ArrayWriteBuffer()
+ .writeString(BinaryMessagePipe.MESSAGE_START_IDENTIFIER)
+ .writeUint32(message.length);
+ const messageStart = writer.getCurrentContents();
+ writer.dispose();
+ return messageStart;
+ }
+
+ protected get messageStartByteLength(): number {
+ // 4 bytes for length of id + id string length + 4 bytes for length of message
+ return 4 + BinaryMessagePipe.MESSAGE_START_IDENTIFIER.length + 4;
+ }
+
+ /**
+ * Reads the start of a new message from a stream chunk (or cached message) received from the underlying pipe.
+ * The message start is expected to consist of an identifier string and the length of the message.
+ * @param chunk The stream chunk.
+ * @returns The length of the message content to read.
+ * @throws An error if the message start can not be read successfully.
+ */
+ protected readMessageStart(chunk: Uint8Array): number {
+ const messageData = this.cachedMessageData.partialMessageStart ? Buffer.concat([this.cachedMessageData.partialMessageStart, chunk]) : chunk;
+ this.cachedMessageData.partialMessageStart = undefined;
+
+ const reader = new Uint8ArrayReadBuffer(messageData);
+ const identifier = reader.readString();
+
+ if (identifier !== BinaryMessagePipe.MESSAGE_START_IDENTIFIER) {
+ throw new Error(`Could not read message start. The start identifier should be '${BinaryMessagePipe.MESSAGE_START_IDENTIFIER}' but was '${identifier}`);
+ }
+ const length = reader.readUint32();
+ return length;
+ }
+
+ dispose(): void {
+ this.underlyingPipe.removeListener('data', this.dataHandler);
+ this.underlyingPipe.end();
+ this.onMessageEmitter.dispose();
+ this.cachedMessageData = {
+ chunks: [],
+ missingBytes: 0
+ };
+ }
+}
+
+interface StreamedMessageData {
+ chunks: Uint8Array[];
+ missingBytes: number;
+ partialMessageStart?: Uint8Array;
+}
diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts
index 0bac13bb163b8..d46d63efa9071 100644
--- a/packages/core/src/node/messaging/ipc-bootstrap.ts
+++ b/packages/core/src/node/messaging/ipc-bootstrap.ts
@@ -16,20 +16,12 @@
import 'reflect-metadata';
import { dynamicRequire } from '../dynamic-require';
-import { ConsoleLogger } from 'vscode-ws-jsonrpc/lib/logger';
-import { createMessageConnection, IPCMessageReader, IPCMessageWriter, Trace } from 'vscode-ws-jsonrpc';
+import { IPCChannel } from './ipc-channel';
import { checkParentAlive, IPCEntryPoint } from './ipc-protocol';
checkParentAlive();
const entryPoint = IPCEntryPoint.getScriptFromEnv();
-const reader = new IPCMessageReader(process);
-const writer = new IPCMessageWriter(process);
-const logger = new ConsoleLogger();
-const connection = createMessageConnection(reader, writer, logger);
-connection.trace(Trace.Off, {
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- log: (message: any, data?: string) => console.log(message, data)
-});
-dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(connection);
+dynamicRequire<{ default: IPCEntryPoint }>(entryPoint).default(new IPCChannel());
+
diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts
new file mode 100644
index 0000000000000..71fc65d774f08
--- /dev/null
+++ b/packages/core/src/node/messaging/ipc-channel.ts
@@ -0,0 +1,98 @@
+// *****************************************************************************
+// Copyright (C) 2022 STMicroelectronics and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+/* eslint-disable @typescript-eslint/no-explicit-any */
+
+import * as cp from 'child_process';
+import { Socket } from 'net';
+import { Duplex } from 'stream';
+import { Channel, ChannelCloseEvent, Disposable, DisposableCollection, Emitter, Event, MessageProvider, WriteBuffer } from '../../common';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer';
+import { BinaryMessagePipe } from './binary-message-pipe';
+
+/**
+ * A {@link Channel} to send messages between two processes using a dedicated pipe/fd for binary messages.
+ * This fd is opened as 5th channel in addition to the default stdios (stdin,stdout,stderr, ipc). This means the default channels
+ * are not blocked and can be used by the respective process for additional custom message handling.
+ */
+export class IPCChannel implements Channel {
+
+ protected readonly onCloseEmitter: Emitter = new Emitter();
+ get onClose(): Event {
+ return this.onCloseEmitter.event;
+ }
+
+ protected readonly onMessageEmitter: Emitter = new Emitter();
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
+ }
+
+ protected readonly onErrorEmitter: Emitter = new Emitter();
+ get onError(): Event {
+ return this.onErrorEmitter.event;
+ }
+
+ protected messagePipe: BinaryMessagePipe;
+ protected toDispose = new DisposableCollection();
+
+ protected ipcErrorListener: (error: Error) => void = error => this.onErrorEmitter.fire(error);
+
+ constructor(childProcess?: cp.ChildProcess) {
+ if (childProcess) {
+ this.setupChildProcess(childProcess);
+ } else {
+ this.setupProcess();
+ }
+ this.messagePipe.onMessage(message => {
+ console.log(`IPChannel: fire on message with ${message.length}`);
+ this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message));
+ });
+ this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]);
+ }
+
+ protected setupChildProcess(childProcess: cp.ChildProcess): void {
+ childProcess.once('exit', code => this.onCloseEmitter.fire({ reason: 'Child process has been terminated', code: code ?? undefined }));
+ this.messagePipe = new BinaryMessagePipe(childProcess.stdio[4] as Duplex);
+ childProcess.on('error', this.ipcErrorListener);
+ this.toDispose.push(Disposable.create(() => {
+ childProcess.removeListener('error', this.ipcErrorListener);
+ this.messagePipe.dispose();
+ }));
+ }
+
+ protected setupProcess(): void {
+ process.once('beforeExit', code => this.onCloseEmitter.fire({ reason: 'Process is about to be terminated', code }));
+ this.messagePipe = new BinaryMessagePipe(new Socket({ fd: 4 }));
+ process.on('uncaughtException', this.ipcErrorListener);
+ this.toDispose.push(Disposable.create(() => {
+ (process as NodeJS.EventEmitter).removeListener('uncaughtException', this.ipcErrorListener);
+ this.messagePipe.dispose();
+ }));
+ }
+
+ getWriteBuffer(): WriteBuffer {
+ const result = new Uint8ArrayWriteBuffer();
+ result.onCommit(buffer => {
+ this.messagePipe.send(buffer);
+ });
+
+ return result;
+ }
+
+ close(): void {
+ this.toDispose.dispose();
+ }
+
+}
diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts
index 84c256997257a..06f57836df6c9 100644
--- a/packages/core/src/node/messaging/ipc-connection-provider.ts
+++ b/packages/core/src/node/messaging/ipc-connection-provider.ts
@@ -15,10 +15,11 @@
// *****************************************************************************
import * as cp from 'child_process';
+import { inject, injectable } from 'inversify';
import * as path from 'path';
-import { injectable, inject } from 'inversify';
-import { Trace, Tracer, IPCMessageReader, IPCMessageWriter, createMessageConnection, MessageConnection, Message } from 'vscode-ws-jsonrpc';
-import { ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '../../common';
+import { createInterface } from 'readline';
+import { Channel, ConnectionErrorHandler, Disposable, DisposableCollection, ILogger } from '../../common';
+import { IPCChannel } from './ipc-channel';
import { createIpcEnv } from './ipc-protocol';
export interface ResolvedIPCConnectionOptions {
@@ -40,7 +41,7 @@ export class IPCConnectionProvider {
@inject(ILogger)
protected readonly logger: ILogger;
- listen(options: IPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable {
+ listen(options: IPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable {
return this.doListen({
logger: this.logger,
args: [],
@@ -48,19 +49,21 @@ export class IPCConnectionProvider {
}, acceptor);
}
- protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: MessageConnection) => void): Disposable {
+ protected doListen(options: ResolvedIPCConnectionOptions, acceptor: (connection: Channel) => void): Disposable {
const childProcess = this.fork(options);
- const connection = this.createConnection(childProcess, options);
+ const channel = new IPCChannel(childProcess);
const toStop = new DisposableCollection();
const toCancelStop = toStop.push(Disposable.create(() => childProcess.kill()));
const errorHandler = options.errorHandler;
if (errorHandler) {
- connection.onError((e: [Error, Message | undefined, number | undefined]) => {
- if (errorHandler.shouldStop(e[0], e[1], e[2])) {
+ let errorCount = 0;
+ channel.onError((err: Error) => {
+ errorCount++;
+ if (errorHandler.shouldStop(err, errorCount)) {
toStop.dispose();
}
});
- connection.onClose(() => {
+ channel.onClose(() => {
if (toStop.disposed) {
return;
}
@@ -70,36 +73,15 @@ export class IPCConnectionProvider {
}
});
}
- acceptor(connection);
+ acceptor(channel);
return toStop;
}
- protected createConnection(childProcess: cp.ChildProcess, options: ResolvedIPCConnectionOptions): MessageConnection {
- const reader = new IPCMessageReader(childProcess);
- const writer = new IPCMessageWriter(childProcess);
- const connection = createMessageConnection(reader, writer, {
- error: (message: string) => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${message}`),
- warn: (message: string) => this.logger.warn(`[${options.serverName}: ${childProcess.pid}] ${message}`),
- info: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`),
- log: (message: string) => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${message}`)
- });
- const tracer: Tracer = {
- log: (message: unknown, data?: string) => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] ${message}` + (typeof data === 'string' ? ' ' + data : ''))
- };
- connection.trace(Trace.Verbose, tracer);
- this.logger.isDebug().then(isDebug => {
- if (!isDebug) {
- connection.trace(Trace.Off, tracer);
- }
- });
- return connection;
- }
-
protected fork(options: ResolvedIPCConnectionOptions): cp.ChildProcess {
const forkOptions: cp.ForkOptions = {
- silent: true,
env: createIpcEnv(options),
- execArgv: []
+ execArgv: [],
+ stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'pipe']
};
const inspectArgPrefix = `--${options.serverName}-inspect`;
const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix));
@@ -108,8 +90,9 @@ export class IPCConnectionProvider {
}
const childProcess = cp.fork(path.join(__dirname, 'ipc-bootstrap'), options.args, forkOptions);
- childProcess.stdout!.on('data', data => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`));
- childProcess.stderr!.on('data', data => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${data.toString().trim()}`));
+
+ createInterface(childProcess.stdout!).on('line', line => this.logger.info(`[${options.serverName}: ${childProcess.pid}] ${line}`));
+ createInterface(childProcess.stderr!).on('line', line => this.logger.error(`[${options.serverName}: ${childProcess.pid}] ${line}`));
this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC started`);
childProcess.once('exit', () => this.logger.debug(`[${options.serverName}: ${childProcess.pid}] IPC exited`));
diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts
index de9a77394b03e..03aa3944521c3 100644
--- a/packages/core/src/node/messaging/ipc-protocol.ts
+++ b/packages/core/src/node/messaging/ipc-protocol.ts
@@ -15,14 +15,14 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import { MessageConnection } from 'vscode-ws-jsonrpc';
+import { Channel } from '../../common/message-rpc/channel';
const THEIA_PARENT_PID = 'THEIA_PARENT_PID';
const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT';
export const ipcEntryPoint: string | undefined = process.env[THEIA_ENTRY_POINT];
-export type IPCEntryPoint = (connection: MessageConnection) => void;
+export type IPCEntryPoint = (connection: Channel) => void;
export namespace IPCEntryPoint {
/**
* Throws if `THEIA_ENTRY_POINT` is undefined or empty.
diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts
index 2ee8764854780..20dea50a483e4 100644
--- a/packages/core/src/node/messaging/messaging-contribution.ts
+++ b/packages/core/src/node/messaging/messaging-contribution.ts
@@ -18,19 +18,15 @@ import * as http from 'http';
import * as https from 'https';
import { Server, Socket } from 'socket.io';
import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify';
-import { MessageConnection } from 'vscode-ws-jsonrpc';
-import { createWebSocketConnection } from 'vscode-ws-jsonrpc/lib/socket/connection';
-import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection';
-import * as launch from 'vscode-ws-jsonrpc/lib/server/launch';
import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
+import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { BackendApplicationContribution } from '../backend-application';
-import { MessagingService, WebSocketChannelConnection } from './messaging-service';
-import { ConsoleLogger } from './logger';
+import { MessagingService } from './messaging-service';
import { ConnectionContainerModule } from './connection-container-module';
import Route = require('route-parser');
import { WsRequestValidator } from '../ws-request-validators';
import { MessagingListener } from './messaging-listeners';
+import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel';
export const MessagingContainer = Symbol('MessagingContainer');
@@ -53,7 +49,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me
protected readonly messagingListener: MessagingListener;
protected readonly wsHandlers = new MessagingContribution.ConnectionHandlers();
- protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers();
+ protected readonly channelHandlers = new MessagingContribution.ConnectionHandlers();
@postConstruct()
protected init(): void {
@@ -63,21 +59,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me
}
}
- listen(spec: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void {
- this.wsChannel(spec, (params, channel) => {
- const connection = createWebSocketConnection(channel, new ConsoleLogger());
- callback(params, connection);
- });
- }
-
- forward(spec: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void {
- this.wsChannel(spec, (params, channel) => {
- const connection = launch.createWebSocketConnection(channel);
- callback(params, WebSocketChannelConnection.create(connection, channel));
- });
- }
-
- wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: WebSocketChannel) => void): void {
+ wsChannel(spec: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void {
this.channelHandlers.push(spec, (params, channel) => callback(params, channel));
}
@@ -125,49 +107,31 @@ export class MessagingContribution implements BackendApplicationContribution, Me
}
protected handleChannels(socket: Socket): void {
+ const socketChannel = new WebSocketChannel(this.toIWebSocket(socket));
+ const mulitplexer = new ChannelMultiplexer(socketChannel);
const channelHandlers = this.getConnectionChannelHandlers(socket);
- const channels = new Map();
- socket.on('message', data => {
- try {
- const message: WebSocketChannel.Message = JSON.parse(data.toString());
- if (message.kind === 'open') {
- const { id, path } = message;
- const channel = this.createChannel(id, socket);
- if (channelHandlers.route(path, channel)) {
- channel.ready();
- console.debug(`Opening channel for service path '${path}'. [ID: ${id}]`);
- channels.set(id, channel);
- channel.onClose(() => {
- console.debug(`Closing channel on service path '${path}'. [ID: ${id}]`);
- channels.delete(id);
- });
- } else {
- console.error('Cannot find a service for the path: ' + path);
- }
- } else {
- const { id } = message;
- const channel = channels.get(id);
- if (channel) {
- channel.handleMessage(message);
- } else {
- console.error('The ws channel does not exist', id);
- }
- }
- } catch (error) {
- console.error('Failed to handle message', { error, data });
+ mulitplexer.onDidOpenChannel(event => {
+ if (channelHandlers.route(event.id, event.channel)) {
+ console.debug(`Opening channel for service path '${event.id}'.`);
+ event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`));
}
});
- socket.on('error', err => {
- for (const channel of channels.values()) {
- channel.fireError(err);
- }
- });
- socket.on('disconnect', reason => {
- for (const channel of channels.values()) {
- channel.close(undefined, reason);
- }
- channels.clear();
- });
+ }
+
+ protected toIWebSocket(socket: Socket): IWebSocket {
+ return {
+ close: () => {
+ socket.removeAllListeners('disconnect');
+ socket.removeAllListeners('error');
+ socket.removeAllListeners('message');
+ socket.disconnect();
+ },
+ isConnected: () => socket.connected,
+ onClose: cb => socket.on('disconnect', reason => cb(reason)),
+ onError: cb => socket.on('error', error => cb(error)),
+ onMessage: cb => socket.on('message', data => cb(data)),
+ send: message => socket.emit('message', message)
+ };
}
protected createSocketContainer(socket: Socket): Container {
@@ -176,7 +140,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me
return connectionContainer;
}
- protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers {
+ protected getConnectionChannelHandlers(socket: Socket): MessagingContribution.ConnectionHandlers {
const connectionContainer = this.createSocketContainer(socket);
bindContributionProvider(connectionContainer, ConnectionHandler);
connectionContainer.load(...this.connectionModules.getContributions());
@@ -184,21 +148,12 @@ export class MessagingContribution implements BackendApplicationContribution, Me
const connectionHandlers = connectionContainer.getNamed>(ContributionProvider, ConnectionHandler);
for (const connectionHandler of connectionHandlers.getContributions(true)) {
connectionChannelHandlers.push(connectionHandler.path, (_, channel) => {
- const connection = createWebSocketConnection(channel, new ConsoleLogger());
- connectionHandler.onConnection(connection);
+ connectionHandler.onConnection(channel);
});
}
return connectionChannelHandlers;
}
- protected createChannel(id: number, socket: Socket): WebSocketChannel {
- return new WebSocketChannel(id, content => {
- if (socket.connected) {
- socket.send(content);
- }
- });
- }
-
}
export namespace MessagingContribution {
diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts
index 087f6d5850def..276b58734bcff 100644
--- a/packages/core/src/node/messaging/messaging-service.ts
+++ b/packages/core/src/node/messaging/messaging-service.ts
@@ -15,33 +15,21 @@
// *****************************************************************************
import { Socket } from 'socket.io';
-import { MessageConnection } from 'vscode-ws-jsonrpc';
-import { IConnection } from 'vscode-ws-jsonrpc/lib/server/connection';
-import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
+import { Channel } from '../../common/message-rpc/channel';
export interface MessagingService {
- /**
- * Accept a JSON-RPC connection on the given path.
- * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
- */
- listen(path: string, callback: (params: MessagingService.PathParams, connection: MessageConnection) => void): void;
- /**
- * Accept a raw JSON-RPC connection on the given path.
- * A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
- */
- forward(path: string, callback: (params: MessagingService.PathParams, connection: IConnection) => void): void;
/**
* Accept a web socket channel on the given path.
* A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
*/
- wsChannel(path: string, callback: (params: MessagingService.PathParams, socket: WebSocketChannel) => void): void;
+ wsChannel(path: string, callback: (params: MessagingService.PathParams, channel: Channel) => void): void;
/**
* Accept a web socket connection on the given path.
* A path supports the route syntax: https://github.com/rcs/route-parser#what-can-i-use-in-my-routes.
*
* #### Important
- * Prefer JSON-RPC connections or web socket channels over web sockets. Clients can handle only limited amount of web sockets
- * and excessive amount can cause performance degradation. All JSON-RPC connections and web socket channels share the single web socket connection.
+ * Prefer using web socket channels over establishing new web socket connection. Clients can handle only limited amount of web sockets
+ * and excessive amount can cause performance degradation. All web socket channels share a single web socket connection.
*/
ws(path: string, callback: (params: MessagingService.PathParams, socket: Socket) => void): void;
}
@@ -56,18 +44,3 @@ export namespace MessagingService {
configure(service: MessagingService): void;
}
}
-
-export interface WebSocketChannelConnection extends IConnection {
- channel: WebSocketChannel;
-}
-export namespace WebSocketChannelConnection {
- export function is(connection: IConnection): connection is WebSocketChannelConnection {
- return (connection as WebSocketChannelConnection).channel instanceof WebSocketChannel;
- }
-
- export function create(connection: IConnection, channel: WebSocketChannel): WebSocketChannelConnection {
- const result = connection as WebSocketChannelConnection;
- result.channel = channel;
- return result;
- }
-}
diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts
index 2fbb17c9aa8ec..0ef0c50186cee 100644
--- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts
+++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts
@@ -16,32 +16,41 @@
import * as http from 'http';
import * as https from 'https';
-import { WebSocketChannel } from '../../../common/messaging/web-socket-channel';
-import { Disposable } from '../../../common/disposable';
import { AddressInfo } from 'net';
-import { io } from 'socket.io-client';
+import { io, Socket } from 'socket.io-client';
+import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel';
+import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel';
-export class TestWebSocketChannel extends WebSocketChannel {
+export class TestWebSocketChannelSetup {
+ public readonly multiplexer: ChannelMultiplexer;
+ public readonly channel: Channel;
constructor({ server, path }: {
server: http.Server | https.Server,
path: string
}) {
- super(0, content => socket.send(content));
const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`);
- socket.on('error', error =>
- this.fireError(error)
- );
- socket.on('disconnect', reason =>
- this.fireClose(0, reason)
- );
- socket.on('message', data => {
- this.handleMessage(JSON.parse(data.toString()));
+ this.channel = new WebSocketChannel(toIWebSocket(socket));
+ this.multiplexer = new ChannelMultiplexer(this.channel);
+ socket.on('connect', () => {
+ this.multiplexer.open(path);
});
- socket.on('connect', () =>
- this.open(path)
- );
- this.toDispose.push(Disposable.create(() => socket.close()));
+ socket.connect();
}
+}
+function toIWebSocket(socket: Socket): IWebSocket {
+ return {
+ close: () => {
+ socket.removeAllListeners('disconnect');
+ socket.removeAllListeners('error');
+ socket.removeAllListeners('message');
+ socket.close();
+ },
+ isConnected: () => socket.connected,
+ onClose: cb => socket.on('disconnect', reason => cb(reason)),
+ onError: cb => socket.on('error', reason => cb(reason)),
+ onMessage: cb => socket.on('message', data => cb(data)),
+ send: message => socket.emit('message', message)
+ };
}
diff --git a/packages/debug/src/browser/debug-session-connection.ts b/packages/debug/src/browser/debug-session-connection.ts
index 4ef9db7818a74..f309094ed8cbe 100644
--- a/packages/debug/src/browser/debug-session-connection.ts
+++ b/packages/debug/src/browser/debug-session-connection.ts
@@ -18,13 +18,9 @@
import { DebugProtocol } from 'vscode-debugprotocol';
import { Deferred } from '@theia/core/lib/common/promise-util';
-import { Event, Emitter, DisposableCollection, Disposable, MaybePromise } from '@theia/core';
+import { Event, Emitter, DisposableCollection, Disposable, MaybePromise, Channel } from '@theia/core';
import { OutputChannel } from '@theia/output/lib/browser/output-channel';
-import { Channel } from '../common/debug-service';
-
-export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise;
-
export interface DebugRequestTypes {
'attach': [DebugProtocol.AttachRequestArguments, DebugProtocol.AttachResponse]
'breakpointLocations': [DebugProtocol.BreakpointLocationsArguments, DebugProtocol.BreakpointLocationsResponse]
@@ -116,6 +112,8 @@ const standardDebugEvents = new Set([
'thread'
]);
+export type DebugRequestHandler = (request: DebugProtocol.Request) => MaybePromise;
+
export class DebugSessionConnection implements Disposable {
private sequence = 1;
@@ -168,7 +166,7 @@ export class DebugSessionConnection implements Disposable {
this.cancelPendingRequests();
this.onDidCloseEmitter.fire();
});
- connection.onMessage(data => this.handleMessage(data));
+ connection.onMessage(data => this.handleMessage(data().readString()));
return connection;
}
@@ -247,7 +245,7 @@ export class DebugSessionConnection implements Disposable {
const dateStr = `${now.toLocaleString(undefined, { hour12: false })}.${now.getMilliseconds()}`;
this.traceOutputChannel.appendLine(`${this.sessionId.substring(0, 8)} ${dateStr} theia -> adapter: ${JSON.stringify(message, undefined, 4)}`);
}
- connection.send(messageStr);
+ connection.getWriteBuffer().writeString(messageStr).commit();
}
protected handleMessage(data: string): void {
diff --git a/packages/debug/src/browser/debug-session-contribution.ts b/packages/debug/src/browser/debug-session-contribution.ts
index cf8b5e7424cc0..a14db324cdf81 100644
--- a/packages/debug/src/browser/debug-session-contribution.ts
+++ b/packages/debug/src/browser/debug-session-contribution.ts
@@ -26,10 +26,11 @@ import { DebugSessionOptions } from './debug-session-options';
import { OutputChannelManager, OutputChannel } from '@theia/output/lib/browser/output-channel';
import { DebugPreferences } from './debug-preferences';
import { DebugSessionConnection } from './debug-session-connection';
-import { Channel, DebugAdapterPath } from '../common/debug-service';
+import { DebugAdapterPath } from '../common/debug-service';
import { ContributionProvider } from '@theia/core/lib/common/contribution-provider';
import { FileService } from '@theia/filesystem/lib/browser/file-service';
import { DebugContribution } from './debug-contribution';
+import { Channel } from '@theia/core/lib/common/message-rpc/channel';
import { WorkspaceService } from '@theia/workspace/lib/browser';
/**
diff --git a/packages/debug/src/node/debug-adapter-session.ts b/packages/debug/src/node/debug-adapter-session.ts
index 03ff950d38a90..b47e552ae6344 100644
--- a/packages/debug/src/node/debug-adapter-session.ts
+++ b/packages/debug/src/node/debug-adapter-session.ts
@@ -26,7 +26,7 @@ import {
DebugAdapterSession
} from './debug-model';
import { DebugProtocol } from 'vscode-debugprotocol';
-import { Channel } from '../common/debug-service';
+import { Channel } from '@theia/core';
/**
* [DebugAdapterSession](#DebugAdapterSession) implementation.
@@ -53,7 +53,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession {
throw new Error('The session has already been started, id: ' + this.id);
}
this.channel = channel;
- this.channel.onMessage((message: string) => this.write(message));
+ this.channel.onMessage(message => this.write(message().readString()));
this.channel.onClose(() => this.channel = undefined);
}
@@ -80,7 +80,7 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession {
protected send(message: string): void {
if (this.channel) {
- this.channel.send(message);
+ this.channel.getWriteBuffer().writeString(message);
}
}
diff --git a/packages/debug/src/node/debug-model.ts b/packages/debug/src/node/debug-model.ts
index a39352fabbddf..3e6de9ce68fc6 100644
--- a/packages/debug/src/node/debug-model.ts
+++ b/packages/debug/src/node/debug-model.ts
@@ -25,8 +25,7 @@
import { DebugConfiguration } from '../common/debug-configuration';
import { IJSONSchema, IJSONSchemaSnippet } from '@theia/core/lib/common/json-schema';
import { MaybePromise } from '@theia/core/lib/common/types';
-import { Event } from '@theia/core/lib/common/event';
-import { Channel } from '../common/debug-service';
+import { Channel, Event } from '@theia/core';
// FIXME: break down this file to debug adapter and debug adapter contribution (see Theia file naming conventions)
diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts
index 95ee67c57d8de..63f789b9da2a3 100644
--- a/packages/filesystem/src/common/files.ts
+++ b/packages/filesystem/src/common/files.ts
@@ -846,7 +846,7 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p
*/
export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider {
/**
- * Read the contents of the given file as stream.
+ * Read the contents of the given file as stream.
* @param resource The `URI` of the file.
*
* @return The `ReadableStreamEvents` for the readable stream of the given file.
diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts
index 5edb5dbbad9e7..f67e198db75f7 100644
--- a/packages/filesystem/src/common/remote-file-system-provider.ts
+++ b/packages/filesystem/src/common/remote-file-system-provider.ts
@@ -42,11 +42,11 @@ export interface RemoteFileSystemServer extends JsonRpcServer;
open(resource: string, opts: FileOpenOptions): Promise;
close(fd: number): Promise;
- read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }>;
+ read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }>;
readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise;
- readFile(resource: string): Promise;
- write(fd: number, pos: number, data: number[], offset: number, length: number): Promise;
- writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise;
+ readFile(resource: string): Promise;
+ write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise;
+ writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise;
delete(resource: string, opts: FileDeleteOptions): Promise;
mkdir(resource: string): Promise;
readdir(resource: string): Promise<[string, FileType][]>;
@@ -70,7 +70,7 @@ export interface RemoteFileSystemClient {
notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void;
notifyFileWatchError(): void;
notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void;
- onFileStreamData(handle: number, data: number[]): void;
+ onFileStreamData(handle: number, data: Uint8Array): void;
onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void;
}
@@ -169,7 +169,7 @@ export class RemoteFileSystemProvider implements Required, D
this.onFileWatchErrorEmitter.fire();
},
notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities),
- onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]),
+ onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, data]),
onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error])
});
const onInitialized = this.server.onDidOpenConnection(() => {
@@ -224,7 +224,7 @@ export class RemoteFileSystemProvider implements Required, D
async readFile(resource: URI): Promise {
const bytes = await this.server.readFile(resource.toString());
- return Uint8Array.from(bytes);
+ return bytes;
}
readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents {
@@ -264,11 +264,11 @@ export class RemoteFileSystemProvider implements Required, D
}
write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise {
- return this.server.write(fd, pos, [...data.values()], offset, length);
+ return this.server.write(fd, pos, data, offset, length);
}
writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise {
- return this.server.writeFile(resource.toString(), [...content.values()], opts);
+ return this.server.writeFile(resource.toString(), content, opts);
}
delete(resource: URI, opts: FileDeleteOptions): Promise {
@@ -412,34 +412,33 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
throw new Error('not supported');
}
- async read(fd: number, pos: number, length: number): Promise<{ bytes: number[]; bytesRead: number; }> {
+ async read(fd: number, pos: number, length: number): Promise<{ bytes: Uint8Array; bytesRead: number; }> {
if (hasOpenReadWriteCloseCapability(this.provider)) {
const buffer = BinaryBuffer.alloc(this.BUFFER_SIZE);
const bytes = buffer.buffer;
const bytesRead = await this.provider.read(fd, pos, bytes, 0, length);
- return { bytes: [...bytes.values()], bytesRead };
+ return { bytes, bytesRead };
}
throw new Error('not supported');
}
- write(fd: number, pos: number, data: number[], offset: number, length: number): Promise {
+ write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise {
if (hasOpenReadWriteCloseCapability(this.provider)) {
- return this.provider.write(fd, pos, Uint8Array.from(data), offset, length);
+ return this.provider.write(fd, pos, data, offset, length);
}
throw new Error('not supported');
}
- async readFile(resource: string): Promise {
+ async readFile(resource: string): Promise {
if (hasReadWriteCapability(this.provider)) {
- const buffer = await this.provider.readFile(new URI(resource));
- return [...buffer.values()];
+ return this.provider.readFile(new URI(resource));
}
throw new Error('not supported');
}
- writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise {
+ writeFile(resource: string, content: Uint8Array, opts: FileWriteOptions): Promise {
if (hasReadWriteCapability(this.provider)) {
- return this.provider.writeFile(new URI(resource), Uint8Array.from(content), opts);
+ return this.provider.writeFile(new URI(resource), content, opts);
}
throw new Error('not supported');
}
@@ -497,7 +496,7 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
if (hasFileReadStreamCapability(this.provider)) {
const handle = this.readFileStreamSeq++;
const stream = this.provider.readFileStream(new URI(resource), opts, token);
- stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()]));
+ stream.on('data', data => this.client?.onFileStreamData(handle, data));
stream.on('error', error => {
const code = error instanceof FileSystemProviderError ? error.code : undefined;
const { name, message, stack } = error;
diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts
index 48ae3adb36363..ee8e43ea7c639 100644
--- a/packages/plugin-ext/src/common/connection.ts
+++ b/packages/plugin-ext/src/common/connection.ts
@@ -13,27 +13,38 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import { Channel } from '@theia/debug/lib/common/debug-service';
import { ConnectionExt, ConnectionMain } from './plugin-api-rpc';
-import { Emitter } from '@theia/core/lib/common/event';
+import { Emitter, Event } from '@theia/core/lib/common/event';
+import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
+import { WriteBuffer, Channel } from '@theia/core';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
/**
* A channel communicating with a counterpart in a plugin host.
*/
export class PluginChannel implements Channel {
- private messageEmitter: Emitter = new Emitter();
+ private messageEmitter: Emitter = new Emitter();
private errorEmitter: Emitter = new Emitter();
- private closedEmitter: Emitter = new Emitter();
+ private closedEmitter: Emitter = new Emitter();
constructor(
- protected readonly id: string,
+ readonly id: string,
protected readonly connection: ConnectionExt | ConnectionMain) { }
+ getWriteBuffer(): WriteBuffer {
+ const result = new Uint8ArrayWriteBuffer();
+ result.onCommit(buffer => {
+ this.connection.$sendMessage(this.id, new Uint8ArrayReadBuffer(buffer).readString());
+ });
+
+ return result;
+ }
+
send(content: string): void {
this.connection.$sendMessage(this.id, content);
}
- fireMessageReceived(msg: string): void {
+ fireMessageReceived(msg: MessageProvider): void {
this.messageEmitter.fire(msg);
}
@@ -42,21 +53,19 @@ export class PluginChannel implements Channel {
}
fireClosed(): void {
- this.closedEmitter.fire();
+ this.closedEmitter.fire({ reason: 'Plugin channel has been closed from the extension side' });
}
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- onMessage(cb: (data: any) => void): void {
- this.messageEmitter.event(cb);
+ get onMessage(): Event {
+ return this.messageEmitter.event;
}
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- onError(cb: (reason: any) => void): void {
- this.errorEmitter.event(cb);
+ get onError(): Event {
+ return this.errorEmitter.event;
}
- onClose(cb: (code: number, reason: string) => void): void {
- this.closedEmitter.event(() => cb(-1, 'closed'));
+ get onClose(): Event {
+ return this.closedEmitter.event;
}
close(): void {
@@ -80,7 +89,10 @@ export class ConnectionImpl implements ConnectionMain, ConnectionExt {
*/
async $sendMessage(id: string, message: string): Promise {
if (this.connections.has(id)) {
- this.connections.get(id)!.fireMessageReceived(message);
+ const writer = new Uint8ArrayWriteBuffer().writeString(message);
+ const reader = new Uint8ArrayReadBuffer(writer.getCurrentContents());
+ writer.dispose();
+ this.connections.get(id)!.fireMessageReceived(() => reader);
} else {
console.warn(`Received message for unknown connection: ${id}`);
}
diff --git a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts
index b16892dccc727..176fdb246a693 100644
--- a/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts
+++ b/packages/plugin-ext/src/main/browser/debug/plugin-debug-session-factory.ts
@@ -30,7 +30,7 @@ import { TerminalOptionsExt } from '../../../common/plugin-api-rpc';
import { FileService } from '@theia/filesystem/lib/browser/file-service';
import { DebugContribution } from '@theia/debug/lib/browser/debug-contribution';
import { ContributionProvider } from '@theia/core/lib/common/contribution-provider';
-import { Channel } from '@theia/debug/lib/common/debug-service';
+import { Channel } from '@theia/core/lib/common/message-rpc/channel';
import { WorkspaceService } from '@theia/workspace/lib/browser';
export class PluginDebugSession extends DebugSession {
diff --git a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts
index b05859750198d..b34a73ae201be 100644
--- a/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts
+++ b/packages/plugin-ext/src/plugin/node/debug/plugin-debug-adapter-session.ts
@@ -17,7 +17,7 @@
import { DebugAdapterSessionImpl } from '@theia/debug/lib/node/debug-adapter-session';
import * as theia from '@theia/plugin';
import { DebugAdapter } from '@theia/debug/lib/node/debug-model';
-import { Channel } from '@theia/debug/lib/common/debug-service';
+import { Channel } from '@theia/core/lib/common/message-rpc/channel';
/* eslint-disable @typescript-eslint/no-explicit-any */
diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts
index fbe968348d9d2..7e63ddd14927c 100644
--- a/packages/task/src/node/task-server.slow-spec.ts
+++ b/packages/task/src/node/task-server.slow-spec.ts
@@ -28,9 +28,10 @@ import { isWindows, isOSX } from '@theia/core/lib/common/os';
import { FileUri } from '@theia/core/lib/node';
import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol';
import { expectThrowsAsync } from '@theia/core/lib/common/test/expect';
-import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel';
+import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel';
import { expect } from 'chai';
import URI from '@theia/core/lib/common/uri';
+import { RpcProtocol } from '@theia/core';
// test scripts that we bundle with tasks
const commandShortRunning = './task';
@@ -106,26 +107,38 @@ describe('Task server / back-end', function (): void {
// hook-up to terminal's ws and confirm that it outputs expected tasks' output
await new Promise((resolve, reject) => {
- const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` });
- channel.onError(reject);
- channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`)));
- channel.onMessage(msg => {
- // check output of task on terminal is what we expect
- const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`;
- // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected.
- // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n`
- const currentMessage = msg.toString();
- messages.unshift(currentMessage);
- if (currentMessage.indexOf(expected) !== -1) {
- resolve();
- channel.close();
- return;
- }
- if (messages.length >= messagesToWaitFor) {
- reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`));
- channel.close();
- }
+ const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` });
+ setup.multiplexer.onDidOpenChannel(event => {
+ const channel = event.channel;
+ const connection = new RpcProtocol(channel, async (method, args) => {
+ const error = new Error(`Received unexpected request: ${method} with args: ${args} `);
+ reject(error);
+ throw error;
+ });
+ channel.onError(reject);
+ channel.onClose(() => reject(new Error('Channel has been closed')));
+ connection.onNotification(not => {
+ // check output of task on terminal is what we expect
+ const expected = `${isOSX ? 'tasking osx' : 'tasking'}... ${someString}`;
+ // Instead of waiting for one message from the terminal, we wait for several ones as the very first message can be something unexpected.
+ // For instance: `nvm is not compatible with the \"PREFIX\" environment variable: currently set to \"/usr/local\"\r\n`
+ const currentMessage = not.args[0];
+ messages.unshift(currentMessage);
+ if (currentMessage.indexOf(expected) !== -1) {
+ resolve();
+ channel.close();
+ return;
+ }
+ if (messages.length >= messagesToWaitFor) {
+ reject(new Error(`expected sub-string not found in terminal output. Expected: "${expected}" vs Actual messages: ${JSON.stringify(messages)}`));
+ channel.close();
+ }
+ });
+ channel.onMessage(reader => {
+
+ });
});
+
});
});
diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts
index 311b9122e3659..040831c26c88c 100644
--- a/packages/terminal/src/browser/terminal-widget-impl.ts
+++ b/packages/terminal/src/browser/terminal-widget-impl.ts
@@ -17,7 +17,7 @@
import { Terminal, RendererType } from 'xterm';
import { FitAddon } from 'xterm-addon-fit';
import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify';
-import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection } from '@theia/core';
+import { ContributionProvider, Disposable, Event, Emitter, ILogger, DisposableCollection, RpcProtocol, RequestHandler } from '@theia/core';
import { Widget, Message, WebSocketConnectionProvider, StatefulWidget, isFirefox, MessageLoop, KeyCode, codicon } from '@theia/core/lib/browser';
import { isOSX } from '@theia/core/lib/common';
import { WorkspaceService } from '@theia/workspace/lib/browser';
@@ -26,7 +26,6 @@ import { terminalsPath } from '../common/terminal-protocol';
import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol';
import { TerminalWatcher } from '../common/terminal-watcher';
import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions } from './base/terminal-widget';
-import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc';
import { Deferred } from '@theia/core/lib/common/promise-util';
import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences';
import { TerminalContribution } from './terminal-contribution';
@@ -58,7 +57,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget
protected searchBox: TerminalSearchWidget;
protected restored = false;
protected closeOnDispose = true;
- protected waitForConnection: Deferred | undefined;
+ protected waitForConnection: Deferred | undefined;
protected hoverMessage: HTMLDivElement;
protected lastTouchEnd: TouchEvent | undefined;
protected isAttachedCloseListener: boolean = false;
@@ -507,16 +506,23 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget
}
this.toDisposeOnConnect.dispose();
this.toDispose.push(this.toDisposeOnConnect);
- const waitForConnection = this.waitForConnection = new Deferred();
+ const waitForConnection = this.waitForConnection = new Deferred();
this.webSocketConnectionProvider.listen({
path: `${terminalsPath}/${this.terminalId}`,
onConnection: connection => {
- connection.onNotification('onData', (data: string) => this.write(data));
+ const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process');
+
+ const rpc = new RpcProtocol(connection, requestHandler);
+ rpc.onNotification(event => {
+ if (event.method === 'onData') {
+ this.write(event.args[0]);
+ }
+ });
// Excludes the device status code emitted by Xterm.js
const sendData = (data?: string) => {
if (data && !this.deviceStatusCodes.has(data) && !this.disableEnterWhenAttachCloseListener()) {
- return connection.sendRequest('write', data);
+ return rpc.sendRequest('write', [data]);
}
};
@@ -524,12 +530,10 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget
disposable.push(this.term.onData(sendData));
disposable.push(this.term.onBinary(sendData));
- connection.onDispose(() => disposable.dispose());
+ connection.onClose(() => disposable.dispose());
- this.toDisposeOnConnect.push(connection);
- connection.listen();
if (waitForConnection) {
- waitForConnection.resolve(connection);
+ waitForConnection.resolve(rpc);
}
}
}, { reconnecting: false });
@@ -579,7 +583,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget
sendText(text: string): void {
if (this.waitForConnection) {
this.waitForConnection.promise.then(connection =>
- connection.sendRequest('write', text)
+ connection.sendRequest('write', [text])
);
}
}
diff --git a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts
index 6d39ddd973f20..aa4a54e72deff 100644
--- a/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts
+++ b/packages/terminal/src/node/terminal-backend-contribution.slow-spec.ts
@@ -20,7 +20,7 @@ import { IShellTerminalServer } from '../common/shell-terminal-protocol';
import * as http from 'http';
import * as https from 'https';
import { terminalsPath } from '../common/terminal-protocol';
-import { TestWebSocketChannel } from '@theia/core/lib/node/messaging/test/test-web-socket-channel';
+import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel';
describe('Terminal Backend Contribution', function (): void {
@@ -45,13 +45,19 @@ describe('Terminal Backend Contribution', function (): void {
it('is data received from the terminal ws server', async () => {
const terminalId = await shellTerminalServer.create({});
await new Promise((resolve, reject) => {
- const channel = new TestWebSocketChannel({ server, path: `${terminalsPath}/${terminalId}` });
+ const path = `${terminalsPath}/${terminalId}`;
+ const { channel, multiplexer } = new TestWebSocketChannelSetup({ server, path });
channel.onError(reject);
- channel.onClose((code, reason) => reject(new Error(`channel is closed with '${code}' code and '${reason}' reason`)));
- channel.onOpen(() => {
- resolve();
- channel.close();
+ channel.onClose(event => reject(new Error(`channel is closed with '${event.code}' code and '${event.reason}' reason}`)));
+
+ multiplexer.onDidOpenChannel(event => {
+ if (event.id === path) {
+ resolve();
+ channel.close();
+ }
});
+
});
});
+
});
diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts
index 4675b7a32290c..dea4504e0ffea 100644
--- a/packages/terminal/src/node/terminal-backend-contribution.ts
+++ b/packages/terminal/src/node/terminal-backend-contribution.ts
@@ -15,10 +15,11 @@
// *****************************************************************************
import { injectable, inject, named } from '@theia/core/shared/inversify';
-import { ILogger } from '@theia/core/lib/common';
+import { ILogger, RequestHandler } from '@theia/core/lib/common';
import { TerminalProcess, ProcessManager } from '@theia/process/lib/node';
import { terminalsPath } from '../common/terminal-protocol';
import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service';
+import { RpcProtocol } from '@theia/core/';
@injectable()
export class TerminalBackendContribution implements MessagingService.Contribution {
@@ -30,19 +31,27 @@ export class TerminalBackendContribution implements MessagingService.Contributio
protected readonly logger: ILogger;
configure(service: MessagingService): void {
- service.listen(`${terminalsPath}/:id`, (params: { id: string }, connection) => {
+ service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => {
const id = parseInt(params.id, 10);
const termProcess = this.processManager.get(id);
if (termProcess instanceof TerminalProcess) {
const output = termProcess.createOutputStream();
- output.on('data', data => connection.sendNotification('onData', data.toString()));
- connection.onRequest('write', (data: string) => termProcess.write(data));
- connection.onClose(() => output.dispose());
- connection.listen();
- } else {
- connection.dispose();
+ // Create a RPC connection to the terminal process
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ const requestHandler: RequestHandler = async (method: string, args: any[]) => {
+ if (method === 'write' && args[0]) {
+ termProcess.write(args[0]);
+ } else {
+ this.logger.warn('Terminal process received a request with an unsupported method or argument', { method, args });
+ }
+ };
+
+ const rpc = new RpcProtocol(channel, requestHandler);
+ output.on('data', data => {
+ rpc.sendNotification('onData', [data]);
+ });
+ channel.onClose(() => output.dispose());
}
});
}
-
}
diff --git a/yarn.lock b/yarn.lock
index 90f9595550e63..6e26c479ec832 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -2184,6 +2184,13 @@
resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8"
integrity sha512-6ckxMjBBD8URvjB6J3NcnuAn5Pkl7t3TizAg+xdlzzQGSPSmBcXf8KoIH0ua/i+tio+ZRUHEXp0HEmvaR4kt0w==
+"@types/chai-spies@1.0.3":
+ version "1.0.3"
+ resolved "https://registry.yarnpkg.com/@types%2fchai-spies/-/chai-spies-1.0.3.tgz#a52dc61af3853ec9b80965040811d15dfd401542"
+ integrity sha512-RBZjhVuK7vrg4rWMt04UF5zHYwfHnpk5mIWu3nQvU3AKGDixXzSjZ6v0zke6pBcaJqMv3IBZ5ibLWPMRDL0sLw==
+ dependencies:
+ "@types/chai" "*"
+
"@types/chai-string@^1.4.0":
version "1.4.2"
resolved "https://registry.yarnpkg.com/@types/chai-string/-/chai-string-1.4.2.tgz#0f116504a666b6c6a3c42becf86634316c9a19ac"
@@ -2196,6 +2203,11 @@
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.1.tgz#e2c6e73e0bdeb2521d00756d099218e9f5d90a04"
integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ==
+"@types/chai@4.3.0":
+ version "4.3.0"
+ resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.3.0.tgz#23509ebc1fa32f1b4d50d6a66c4032d5b8eaabdc"
+ integrity sha512-/ceqdqeRraGolFTcfoXNiqjyQhZzbINDngeoAq9GoHa8PPK1yNzTaxWjA6BFWp5Ua9JpXEMSS4s5i9tS0hOJtw==
+
"@types/component-emitter@^1.2.10":
version "1.2.11"
resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506"
@@ -3907,11 +3919,28 @@ caseless@~0.12.0:
resolved "https://registry.yarnpkg.com/caseless/-/caseless-0.12.0.tgz#1b681c21ff84033c826543090689420d187151dc"
integrity sha1-G2gcIf+EAzyCZUMJBolCDRhxUdw=
+chai-spies@1.0.0:
+ version "1.0.0"
+ resolved "https://registry.yarnpkg.com/chai-spies/-/chai-spies-1.0.0.tgz#d16b39336fb316d03abf8c375feb23c0c8bb163d"
+ integrity sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg==
+
chai-string@^1.4.0:
version "1.5.0"
resolved "https://registry.yarnpkg.com/chai-string/-/chai-string-1.5.0.tgz#0bdb2d8a5f1dbe90bc78ec493c1c1c180dd4d3d2"
integrity sha512-sydDC3S3pNAQMYwJrs6dQX0oBQ6KfIPuOZ78n7rocW0eJJlsHPh2t3kwW7xfwYA/1Bf6/arGtSUo16rxR2JFlw==
+chai@4.3.4:
+ version "4.3.4"
+ resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.4.tgz#b55e655b31e1eac7099be4c08c21964fce2e6c49"
+ integrity sha512-yS5H68VYOCtN1cjfwumDSuzn/9c+yza4f3reKXlE5rUg7SFcCEy90gJvydNgOYtblyf4Zi6jIWRnXOgErta0KA==
+ dependencies:
+ assertion-error "^1.1.0"
+ check-error "^1.0.2"
+ deep-eql "^3.0.1"
+ get-func-name "^2.0.0"
+ pathval "^1.1.1"
+ type-detect "^4.0.5"
+
chai@^4.2.0:
version "4.3.6"
resolved "https://registry.yarnpkg.com/chai/-/chai-4.3.6.tgz#ffe4ba2d9fa9d6680cc0b370adae709ec9011e9c"
@@ -11606,7 +11635,7 @@ vscode-debugprotocol@^1.32.0:
resolved "https://registry.yarnpkg.com/vscode-debugprotocol/-/vscode-debugprotocol-1.51.0.tgz#c03168dac778b6c24ce17b3511cb61e89c11b2df"
integrity sha512-dzKWTMMyebIMPF1VYMuuQj7gGFq7guR8AFya0mKacu+ayptJfaRuM0mdHCqiOth4FnRP8mPhEroFPx6Ift8wHA==
-vscode-jsonrpc@^5.0.0, vscode-jsonrpc@^5.0.1:
+vscode-jsonrpc@^5.0.1:
version "5.0.1"
resolved "https://registry.yarnpkg.com/vscode-jsonrpc/-/vscode-jsonrpc-5.0.1.tgz#9bab9c330d89f43fc8c1e8702b5c36e058a01794"
integrity sha512-JvONPptw3GAQGXlVV2utDcHx0BiY34FupW/kI6mZ5x06ER5DdPG/tXWMVHjTNULF5uKPOUUD0SaXg5QaubJL0A==
@@ -11659,13 +11688,6 @@ vscode-uri@^2.1.1:
resolved "https://registry.yarnpkg.com/vscode-uri/-/vscode-uri-2.1.2.tgz#c8d40de93eb57af31f3c715dd650e2ca2c096f1c"
integrity sha512-8TEXQxlldWAuIODdukIb+TR5s+9Ds40eSJrw+1iDDA9IFORPjMELarNQE3myz5XIkWWpdprmJjm1/SxMlWOC8A==
-vscode-ws-jsonrpc@^0.2.0:
- version "0.2.0"
- resolved "https://registry.yarnpkg.com/vscode-ws-jsonrpc/-/vscode-ws-jsonrpc-0.2.0.tgz#5e9c26e10da54a1a235da7d59e74508bbcb8edd9"
- integrity sha512-NE9HNRgPjCaPyTJvIudcpyIWPImxwRDtuTX16yks7SAiZgSXigxAiZOvSvVBGmD1G/OMfrFo6BblOtjVR9DdVA==
- dependencies:
- vscode-jsonrpc "^5.0.0"
-
w3c-hr-time@^1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"