diff --git a/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts b/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts deleted file mode 100644 index f402244a97d03..0000000000000 --- a/packages/core/src/common/message-rpc/array-buffer-message-buffer.ts +++ /dev/null @@ -1,188 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -import { Emitter, Event } from '../event'; -import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer'; - -export class ArrayBufferWriteBuffer implements WriteBuffer { - constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) { - } - - private get msg(): DataView { - return new DataView(this.buffer); - } - - ensureCapacity(value: number): WriteBuffer { - let newLength = this.buffer.byteLength; - while (newLength < this.offset + value) { - newLength *= 2; - } - if (newLength !== this.buffer.byteLength) { - const newBuffer = new ArrayBuffer(newLength); - new Uint8Array(newBuffer).set(new Uint8Array(this.buffer)); - this.buffer = newBuffer; - } - return this; - } - - writeUint8(value: number): WriteBuffer { - this.ensureCapacity(1); - this.msg.setUint8(this.offset++, value); - return this; - } - - writeUint16(value: number): WriteBuffer { - this.ensureCapacity(2); - this.msg.setUint16(this.offset, value); - this.offset += 2; - return this; - } - - writeUint32(value: number): WriteBuffer { - this.ensureCapacity(4); - this.msg.setUint32(this.offset, value); - this.offset += 4; - return this; - } - - writeInteger(value: number): WriteBuffer { - const type = getUintType(value); - this.writeUint8(type); - switch (type) { - case UintType.Uint8: - this.writeUint8(value); - return this; - case UintType.Uint16: - this.writeUint16(value); - return this; - default: - this.writeUint32(value); - return this; - } - } - - writeString(value: string): WriteBuffer { - const encoded = this.encodeString(value); - this.writeBytes(encoded); - return this; - } - - private encodeString(value: string): Uint8Array { - return new TextEncoder().encode(value); - } - - writeBytes(value: ArrayBuffer): WriteBuffer { - this.writeInteger(value.byteLength); - this.ensureCapacity(value.byteLength); - new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset); - this.offset += value.byteLength; - return this; - } - - private onCommitEmitter = new Emitter(); - get onCommit(): Event { - return this.onCommitEmitter.event; - } - - commit(): void { - this.onCommitEmitter.fire(this.getCurrentContents()); - } - - getCurrentContents(): ArrayBuffer { - return this.buffer.slice(0, this.offset); - - } -} - -export class ArrayBufferReadBuffer implements ReadBuffer { - private offset: number = 0; - - constructor(private readonly buffer: ArrayBuffer, readPosition = 0) { - this.offset = readPosition; - } - - private get msg(): DataView { - return new DataView(this.buffer); - } - - readUint8(): number { - return this.msg.getUint8(this.offset++); - } - - readUint16(): number { - const result = this.msg.getUint16(this.offset); - this.offset += 2; - return result; - } - - readUint32(): number { - const result = this.msg.getInt32(this.offset); - this.offset += 4; - return result; - } - - readInteger(): number { - const type = this.readUint8(); - switch (type) { - case UintType.Uint8: - return this.readUint8(); - case UintType.Uint16: - return this.readUint16(); - default: - return this.readUint32(); - } - } - - readString(): string { - const len = this.readInteger(); - const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len)); - this.offset += len; - return result; - } - - private decodeString(buf: ArrayBuffer): string { - return new TextDecoder().decode(buf); - } - - readBytes(): ArrayBuffer { - const length = this.readInteger(); - const result = this.buffer.slice(this.offset, this.offset + length); - this.offset += length; - return result; - } - - sliceAtCurrentPosition(): ReadBuffer { - return new ArrayBufferReadBuffer(this.buffer, this.offset); - } -} - -/** - * Retrieve an {@link ArrayBuffer} view for the given buffer. Some {@link Uint8Array} buffer implementations e.g node's {@link Buffer} - * are using shared memory array buffers under the hood. Therefore we need to check the buffers `byteOffset` and `length` and slice - * the underlying array buffer if needed. - * @param buffer The Uint8Array or ArrayBuffer that should be converted. - * @returns a trimmed `ArrayBuffer` representation for the given buffer. - */ -export function toArrayBuffer(buffer: Uint8Array | ArrayBuffer): ArrayBuffer { - if (buffer instanceof ArrayBuffer) { - return buffer; - } - if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) { - return buffer.buffer; - } - - return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength); -} - diff --git a/packages/core/src/common/message-rpc/channel.spec.ts b/packages/core/src/common/message-rpc/channel.spec.ts index 47d7d579b9c29..2cb0d06b1be6d 100644 --- a/packages/core/src/common/message-rpc/channel.spec.ts +++ b/packages/core/src/common/message-rpc/channel.spec.ts @@ -15,7 +15,7 @@ // ***************************************************************************** import { assert, expect, spy, use } from 'chai'; import * as spies from 'chai-spies'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel'; use(spies); @@ -25,16 +25,16 @@ use(spies); */ export class ChannelPipe { readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => { - const leftWrite = new ArrayBufferWriteBuffer(); + const leftWrite = new Uint8ArrayWriteBuffer(); leftWrite.onCommit(buffer => { - this.right.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer)); + this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); }); return leftWrite; }); readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => { - const rightWrite = new ArrayBufferWriteBuffer(); + const rightWrite = new Uint8ArrayWriteBuffer(); rightWrite.onCommit(buffer => { - this.left.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer)); + this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); }); return rightWrite; }); diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index 97f70b6d6bcd6..f1bca79065e31 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -59,17 +59,12 @@ export interface ChannelCloseEvent { /** * The `MessageProvider` is emitted when a channel receives a new message. - * Listeners can read the provider to obtain a new {@link ReadBuffer} for the received message + * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message. + * This ensures that each listener has its own isolated {@link ReadBuffer} instance. + * */ export type MessageProvider = () => ReadBuffer; -export enum MessageTypes { - Open = 1, - Close = 2, - AckOpen = 3, - Data = 4 -} - /** * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. @@ -98,7 +93,7 @@ export class ForwardingChannel implements Channel { return this.writeBufferSource(); } - send(message: ArrayBuffer): void { + send(message: Uint8Array): void { const writeBuffer = this.getWriteBuffer(); writeBuffer.writeBytes(message); writeBuffer.commit(); @@ -109,6 +104,16 @@ export class ForwardingChannel implements Channel { } } +/** + * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} + */ +export enum MessageTypes { + Open = 1, + Close = 2, + AckOpen = 3, + Data = 4 +} + /** * The write buffers in this implementation immediately write to the underlying * channel, so we rely on writers to the multiplexed channels to always commit their @@ -159,7 +164,7 @@ export class ChannelMultiplexer { return this.handleClose(id); } case MessageTypes.Data: { - return this.handleData(id, buffer.sliceAtCurrentPosition()); + return this.handleData(id, buffer.sliceAtReadPosition()); } } } diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/common/message-rpc/index.ts index 8cada9981de3e..671540a31c42b 100644 --- a/packages/core/src/common/message-rpc/index.ts +++ b/packages/core/src/common/message-rpc/index.ts @@ -13,6 +13,6 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -export * from './rpc-protocol'; -export * from './channel'; -export * from './message-buffer'; +export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; +export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; +export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts index 56276e94dfdf0..46f083597d478 100644 --- a/packages/core/src/common/message-rpc/message-buffer.ts +++ b/packages/core/src/common/message-rpc/message-buffer.ts @@ -22,13 +22,8 @@ export interface WriteBuffer { writeUint16(value: number): WriteBuffer writeUint32(value: number): WriteBuffer; writeString(value: string): WriteBuffer; - writeBytes(value: ArrayBuffer): WriteBuffer; - /** - * Writes a number as integer value.The best suited encoding format(Uint8 Uint16 or Uint32) is - * computed automatically and encoded as the first byte. Mainly used to persist length values of - * strings and arrays. - */ - writeInteger(value: number): WriteBuffer + writeBytes(value: Uint8Array): WriteBuffer; + writeLength(value: number): WriteBuffer /** * Makes any writes to the buffer permanent, for example by sending the writes over a channel. * You must obtain a new write buffer after committing @@ -55,8 +50,8 @@ export class ForwardingWriteBuffer implements WriteBuffer { return this; } - writeInteger(value: number): WriteBuffer { - this.underlying.writeInteger(value); + writeLength(value: number): WriteBuffer { + this.underlying.writeLength(value); return this; } @@ -65,7 +60,7 @@ export class ForwardingWriteBuffer implements WriteBuffer { return this; } - writeBytes(value: ArrayBuffer): WriteBuffer { + writeBytes(value: Uint8Array): WriteBuffer { this.underlying.writeBytes(value); return this; } @@ -75,31 +70,6 @@ export class ForwardingWriteBuffer implements WriteBuffer { } } -export enum UintType { - Uint8 = 1, - Uint16 = 2, - Uint32 = 3 -} - -/** - * Checks wether the given number is an unsigned integer and returns the {@link UintType} - * that is needed to store it in binary format. - * @param value The number for which the UintType should be retrieved. - * @returns the corresponding UInt type. - * @throws An error if the given number is not an unsigned integer. - */ -export function getUintType(value: number): UintType { - if (value < 0 || (value % 1) !== 0) { - throw new Error(`Could not determine IntType. ${value} is not an unsigned integer`); - } - if (value <= 255) { - return UintType.Uint8; - } else if (value <= 65535) { - return UintType.Uint16; - } - return UintType.Uint32; -} - /** * A buffer maintaining a read position in a buffer containing a received message capable of * reading primitive values. @@ -109,16 +79,15 @@ export interface ReadBuffer { readUint16(): number; readUint32(): number; readString(): string; - readBytes(): ArrayBuffer; + readNumber(): number, + readLength(): number, + readBytes(): Uint8Array; /** - * Reads a number as int value. The encoding format(Uint8, Uint16, or Uint32) is expected to be - * encoded in the first byte. - */ - readInteger(): number - /** - * Returns a new read buffer whose starting read position is the current read position of this buffer. - * Can be used to read (sub) messages multiple times. + * Returns a new read buffer whose starting read position is the current read position of this buffer. + * This is useful to create read buffers sub messages. + * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part + * of the message reader that is sent to the target channel) */ - sliceAtCurrentPosition(): ReadBuffer + sliceAtReadPosition(): ReadBuffer } diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts index 32060af0c487e..0f22ab0a6551d 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts @@ -14,13 +14,13 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** import { expect } from 'chai'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; import { RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; describe('message buffer test', () => { it('encode object', () => { - const buffer = new ArrayBuffer(1024); - const writer = new ArrayBufferWriteBuffer(buffer); + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); const encoder = new RpcMessageEncoder(); const jsonMangled = JSON.parse(JSON.stringify(encoder)); @@ -29,7 +29,7 @@ describe('message buffer test', () => { const written = writer.getCurrentContents(); - const reader = new ArrayBufferReadBuffer(written); + const reader = new Uint8ArrayReadBuffer(written); const decoder = new RpcMessageDecoder(); const decoded = decoder.readTypedValue(reader); diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index 8262fbcc8628d..76bd435a0d8d6 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -13,12 +13,10 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -// partly based on https://github.com/microsoft/vscode/blob/435f8a4cae52fc9850766af92d5df3c492f59341/src/vs/workbench/services/extensions/common/rpcProtocol. /* eslint-disable @typescript-eslint/no-explicit-any */ import { ResponseError } from 'vscode-languageserver-protocol'; -import { toArrayBuffer } from './array-buffer-message-buffer'; -import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; /** * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) @@ -76,34 +74,18 @@ export interface SerializedError { readonly stack: string; } -export function transformErrorForSerialization(error: Error): SerializedError { - if (error instanceof Error) { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - return { - $isError: true, - name, - message, - stack - }; - } - - // return as is - return error; -} - /** * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s */ export enum ObjectType { - JSON = 1, - ArrayBuffer = 2, - ByteArray = 3, - UNDEFINED = 4, - ObjectArray = 5, - RESPONSE_ERROR = 6, - ERROR = 7 + Json = 1, + ByteArray = 2, + Undefined = 3, + ObjectArray = 4, + // eslint-disable-next-line @typescript-eslint/no-shadow + ResponseError = 5, + Error = 6 } @@ -151,22 +133,17 @@ export interface ValueDecoder { export class RpcMessageDecoder { protected decoders: Map = new Map(); - /** - * Declares the Uint8 type (i.e. the amount of bytes) necessary to store a decoder tag - * value in the buffer. - */ - protected tagIntType: UintType; constructor() { - this.registerDecoder(ObjectType.JSON, { + this.registerDecoder(ObjectType.Json, { read: buf => JSON.parse(buf.readString()) }); - this.registerDecoder(ObjectType.UNDEFINED, { + this.registerDecoder(ObjectType.Undefined, { read: () => undefined }); - this.registerDecoder(ObjectType.ERROR, { + this.registerDecoder(ObjectType.Error, { read: buf => { const serializedError: SerializedError = JSON.parse(buf.readString()); const error = new Error(serializedError.message); @@ -175,7 +152,7 @@ export class RpcMessageDecoder { } }); - this.registerDecoder(ObjectType.RESPONSE_ERROR, { + this.registerDecoder(ObjectType.ResponseError, { read: buf => { const error = JSON.parse(buf.readString()); return new ResponseError(error.code, error.message, error.data); @@ -183,10 +160,6 @@ export class RpcMessageDecoder { }); this.registerDecoder(ObjectType.ByteArray, { - read: buf => new Uint8Array(buf.readBytes()) - }); - - this.registerDecoder(ObjectType.ArrayBuffer, { read: buf => buf.readBytes() }); @@ -197,7 +170,7 @@ export class RpcMessageDecoder { if (!encodedSeparately) { return this.readTypedValue(buf); } - const length = buf.readInteger(); + const length = buf.readLength(); const result = new Array(length); for (let i = 0; i < length; i++) { result[i] = this.readTypedValue(buf); @@ -219,8 +192,6 @@ export class RpcMessageDecoder { throw new Error(`Decoder already registered: ${tag}`); } this.decoders.set(tag, decoder); - const maxTagId = Array.from(this.decoders.keys()).sort().reverse()[0]; - this.tagIntType = getUintType(maxTagId); } readTypedValue(buf: ReadBuffer): any { @@ -267,10 +238,7 @@ export class RpcMessageDecoder { protected parseRequest(msg: ReadBuffer): RequestMessage { const callId = msg.readUint32(); const method = msg.readString(); - let args = this.readTypedValue(msg) as any[]; - // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs - args = args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null - + const args = this.readTypedValue(msg) as any[]; return { type: RpcMessageType.Request, id: callId, @@ -325,7 +293,6 @@ export class RpcMessageEncoder { protected readonly encoders: [number, ValueEncoder][] = []; protected readonly registeredTags: Set = new Set(); - protected tagIntType: UintType; constructor() { this.registerEncoders(); @@ -333,25 +300,35 @@ export class RpcMessageEncoder { protected registerEncoders(): void { // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last - this.registerEncoder(ObjectType.JSON, { + this.registerEncoder(ObjectType.Json, { is: () => true, write: (buf, value) => { buf.writeString(JSON.stringify(value)); } }); - this.registerEncoder(ObjectType.UNDEFINED, { + this.registerEncoder(ObjectType.Undefined, { // eslint-disable-next-line no-null/no-null is: value => value == null, write: () => { } }); - this.registerEncoder(ObjectType.ERROR, { + this.registerEncoder(ObjectType.Error, { is: value => value instanceof Error, - write: (buf, value: Error) => buf.writeString(JSON.stringify(transformErrorForSerialization(value))) + write: (buf, error: Error) => { + const { name, message } = error; + const stack: string = (error).stacktrace || error.stack; + const serializedError = { + $isError: true, + name, + message, + stack + }; + buf.writeString(JSON.stringify(serializedError)); + } }); - this.registerEncoder(ObjectType.RESPONSE_ERROR, { + this.registerEncoder(ObjectType.ResponseError, { is: value => value instanceof ResponseError, write: (buf, value) => buf.writeString(JSON.stringify(value)) }); @@ -359,19 +336,9 @@ export class RpcMessageEncoder { this.registerEncoder(ObjectType.ByteArray, { is: value => value instanceof Uint8Array, write: (buf, value: Uint8Array) => { - /* When running in a nodejs context the received Uint8Array might be - a nodejs Buffer allocated from node's Buffer pool, which is not transferrable. - Therefore we use the `toArrayBuffer` utility method to retrieve the correct ArrayBuffer */ - const arrayBuffer = toArrayBuffer(value); - buf.writeBytes(arrayBuffer); + buf.writeBytes(value); } }); - - this.registerEncoder(ObjectType.ArrayBuffer, { - is: value => value instanceof ArrayBuffer, - write: (buf, value: ArrayBuffer) => buf.writeBytes(value) - }); - this.registerEncoder(ObjectType.ObjectArray, { is: value => Array.isArray(value), write: (buf, args: any[]) => { @@ -380,7 +347,7 @@ export class RpcMessageEncoder { if (!encodeSeparately) { this.writeTypedValue(buf, args, ObjectType.ObjectArray); } else { - buf.writeInteger(args.length); + buf.writeLength(args.length); for (let i = 0; i < args.length; i++) { this.writeTypedValue(buf, args[i], ObjectType.ObjectArray); } @@ -402,8 +369,6 @@ export class RpcMessageEncoder { } this.registeredTags.add(tag); this.encoders.push([tag, encoder]); - const maxTagId = this.encoders.map(value => value[0]).sort().reverse()[0]; - this.tagIntType = getUintType(maxTagId); } /** @@ -415,7 +380,7 @@ export class RpcMessageEncoder { * @returns `true` if the arguments require separate encoding, `false` otherwise. */ protected requiresSeparateEncoding(args: any[]): boolean { - return args.find(arg => arg instanceof Uint8Array || arg instanceof ArrayBuffer) !== undefined; + return args.find(arg => arg instanceof Uint8Array) !== undefined; } writeString(buf: WriteBuffer, value: string): void { diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 93bb04ea3e17d..c06bf5b687d3c 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -15,36 +15,44 @@ // ***************************************************************************** /* eslint-disable @typescript-eslint/no-explicit-any */ +import { CancellationToken, CancellationTokenSource } from '../../../shared/vscode-languageserver-protocol'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; -import { Channel, MessageProvider } from './channel'; -import { ReadBuffer } from './message-buffer'; +import { Channel } from './channel'; import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; -import { CancellationToken, CancellationTokenSource } from '../../../shared/vscode-languageserver-protocol'; /** * Handles request messages received by the {@link RpcServer}. */ export type RequestHandler = (method: string, args: any[]) => Promise; -const CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; /** - * Initialization options for {@link RpcClient}s and {@link RpcServer}s. + * Initialization options for a {@link RpcProtocol}. */ -export interface RpcInitializationOptions extends RPCConnectionOptions { +export interface RpcProtocolOptions { + /** + * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. + */ + encoder?: RpcMessageEncoder, /** - * Boolean flag to indicate whether the client/server should be used as as standalone component or is part of - * a {@link RpcConnection}. Default is `true` + * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. */ - standalone?: boolean, + decoder?: RpcMessageDecoder } - /** - * A `RpcServer` reads rcp request and notification messages and sends the reply values or - * errors from the request to the channel. - * It can either be instantiated as a standalone component or as part of a {@link RpcConnection}. + * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send + * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. + * Clients can get a promise for a remote request result that will be either resolved or + * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request + * Currently, there is no timeout handling for long running requests implemented. */ -export class RpcServer { +export class RpcProtocol { + static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token'; + + protected readonly pendingRequests: Map> = new Map(); + + protected nextMessageId: number = 0; + protected readonly encoder: RpcMessageEncoder; protected readonly decoder: RpcMessageDecoder; @@ -55,13 +63,12 @@ export class RpcServer { return this.onNotificationEmitter.event; } - constructor(protected channel: Channel, public readonly requestHandler: RequestHandler, options: RpcInitializationOptions = {}) { + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { this.encoder = options.encoder ?? new RpcMessageEncoder(); this.decoder = options.decoder ?? new RpcMessageDecoder(); - if (options.standalone ?? true) { - const registration = channel.onMessage((msg: MessageProvider) => this.handleMessage(this.decoder.parse(msg()))); - channel.onClose(() => registration.dispose()); - } + const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))); + channel.onClose(() => registration.dispose()); + } handleMessage(message: RpcMessage): void { @@ -78,72 +85,6 @@ export class RpcServer { this.handleNotify(message.id, message.method, message.args); break; } - } - } - - protected handleCancel(id: number): void { - const cancellationTokenSource = this.cancellationTokenSources.get(id); - if (cancellationTokenSource) { - this.cancellationTokenSources.delete(id); - cancellationTokenSource.cancel(); - } - } - - protected async handleRequest(id: number, method: string, args: any[]): Promise { - - const output = this.channel.getWriteBuffer(); - - const addToken = args.length && args[args.length - 1] === CANCELLATION_TOKEN_KEY ? args.pop() : false; - if (addToken) { - const tokenSource = new CancellationTokenSource(); - this.cancellationTokenSources.set(id, tokenSource); - args.push(tokenSource.token); - } - - try { - const result = await this.requestHandler(method, args); - this.cancellationTokenSources.delete(id); - this.encoder.replyOK(output, id, result); - } catch (err) { - this.cancellationTokenSources.delete(id); - this.encoder.replyErr(output, id, err); - } - output.commit(); - } - - protected async handleNotify(id: number, method: string, args: any[]): Promise { - this.onNotificationEmitter.fire({ method, args }); - } -} - -/** - * A `RpcClient` sends requests and notifications to a remote server. - * Clients can get a promise for the request result that will be either resolved or - * rejected depending on the success of the request. - * The `RpcClient` keeps track of outstanding requests and matches replies to the appropriate request - * Currently, there is no timeout handling implemented in the client. - * It can either be instantiated as a standalone component or as part of a {@link RpcConnection}. - */ -export class RpcClient { - protected readonly pendingRequests: Map> = new Map(); - - protected nextMessageId: number = 0; - - protected readonly encoder: RpcMessageEncoder; - protected readonly decoder: RpcMessageDecoder; - - constructor(public readonly channel: Channel, options: RpcInitializationOptions = {}) { - this.encoder = options.encoder ?? new RpcMessageEncoder(); - this.decoder = options.decoder ?? new RpcMessageDecoder(); - if (options.standalone ?? true) { - const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))); - channel.onClose(() => registration.dispose()); - } - } - - handleMessage(message: RpcMessage): void { - - switch (message.type) { case RpcMessageType.Reply: { this.handleReply(message.id, message.res); break; @@ -184,13 +125,15 @@ export class RpcClient { const id = this.nextMessageId++; const reply = new Deferred(); + // The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the + // args array and the `CANCELLATION_TOKEN_KEY` string instead. const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; if (cancellationToken && cancellationToken.isCancellationRequested) { return Promise.reject(this.cancelError()); } if (cancellationToken) { - args.push(CANCELLATION_TOKEN_KEY); + args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); cancellationToken.onCancellationRequested(() => { this.sendCancel(id); this.pendingRequests.get(id)?.reject(this.cancelError()); @@ -222,59 +165,40 @@ export class RpcClient { error.name = 'Cancel'; return error; } -} - -/** - * Initialization options for a {@link RpcConnection}. - */ -export interface RPCConnectionOptions { - /** - * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. - */ - encoder?: RpcMessageEncoder, - /** - * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. - */ - decoder?: RpcMessageDecoder -} -/** - * A RpcConnection can be used to to establish a bi-directional RPC connection. It is capable of - * both sending & receiving requests and notifications to/from the channel. It acts as - * a {@link RpcServer} and a {@link RpcClient} at the same time. - */ -export class RpcConnection { - protected rpcClient: RpcClient; - protected rpcServer: RpcServer; - protected decoder = new RpcMessageDecoder(); - get onNotification(): Event<{ method: string; args: any[]; }> { - return this.rpcServer.onNotification; + protected handleCancel(id: number): void { + const cancellationTokenSource = this.cancellationTokenSources.get(id); + if (cancellationTokenSource) { + this.cancellationTokenSources.delete(id); + cancellationTokenSource.cancel(); + } } - constructor(readonly channel: Channel, public readonly requestHandler: (method: string, args: any[]) => Promise, options: RPCConnectionOptions = {}) { - this.decoder = options.decoder ?? new RpcMessageDecoder(); - this.rpcClient = new RpcClient(channel, { standalone: false, ...options }); - this.rpcServer = new RpcServer(channel, requestHandler, { standalone: false, ...options }); - const registration = channel.onMessage(data => this.handleMessage(data())); - channel.onClose(() => registration.dispose()); - } + protected async handleRequest(id: number, method: string, args: any[]): Promise { - handleMessage(data: ReadBuffer): void { - const message = this.decoder.parse(data); - switch (message.type) { - case RpcMessageType.Reply: - case RpcMessageType.ReplyErr: { - this.rpcClient.handleMessage(message); - } - default: - this.rpcServer.handleMessage(message); + const output = this.channel.getWriteBuffer(); + + // Check if the last argument of the received args is the key for indicating that a cancellation token should be used + // If so remove the key from the args and create a new cancellation token. + const addToken = args.length && args[args.length - 1] === RpcProtocol.CANCELLATION_TOKEN_KEY ? args.pop() : false; + if (addToken) { + const tokenSource = new CancellationTokenSource(); + this.cancellationTokenSources.set(id, tokenSource); + args.push(tokenSource.token); } - } - sendRequest(method: string, args: any[]): Promise { - return this.rpcClient.sendRequest(method, args); + + try { + const result = await this.requestHandler(method, args); + this.cancellationTokenSources.delete(id); + this.encoder.replyOK(output, id, result); + } catch (err) { + this.cancellationTokenSources.delete(id); + this.encoder.replyErr(output, id, err); + } + output.commit(); } - sendNotification(method: string, args: any[]): void { - this.rpcClient.sendNotification(method, args); + protected async handleNotify(id: number, method: string, args: any[]): Promise { + this.onNotificationEmitter.fire({ method, args }); } } diff --git a/packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts similarity index 85% rename from packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts rename to packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts index 8b2d43a50755c..e638b3f07cb46 100644 --- a/packages/core/src/common/message-rpc/array-buffer-message-buffer.spec.ts +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts @@ -14,12 +14,12 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** import { expect } from 'chai'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; describe('array message buffer tests', () => { it('basic read write test', () => { - const buffer = new ArrayBuffer(1024); - const writer = new ArrayBufferWriteBuffer(buffer); + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); writer.writeUint8(8); writer.writeUint32(10000); @@ -30,7 +30,7 @@ describe('array message buffer tests', () => { const written = writer.getCurrentContents(); - const reader = new ArrayBufferReadBuffer(written); + const reader = new Uint8ArrayReadBuffer(written); expect(reader.readUint8()).equal(8); expect(reader.readUint32()).equal(10000); diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts new file mode 100644 index 0000000000000..b63a38c6e39fa --- /dev/null +++ b/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts @@ -0,0 +1,206 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +import { Disposable } from 'vscode-languageserver-protocol'; +import { Emitter, Event } from '../event'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events. + * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed + * and can no longer be used for writing data. If the writer buffer is no longer needed but the message + * has not been committed yet it has to be disposed manually. + */ +export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { + + private encoder = new TextEncoder(); + private msg: DataView; + private isDisposed = false; + private offset: number; + + constructor(private buffer: Uint8Array = new Uint8Array(1024 * 1024), writePosition: number = 0) { + this.offset = buffer.byteOffset + writePosition; + this.msg = new DataView(buffer.buffer); + } + + ensureCapacity(value: number): WriteBuffer { + let newLength = this.buffer.byteLength; + while (newLength < this.offset + value) { + newLength *= 2; + } + if (newLength !== this.buffer.byteLength) { + const newBuffer = new Uint8Array(newLength); + newBuffer.set(this.buffer); + this.buffer = newBuffer; + this.msg = new DataView(this.buffer.buffer); + } + return this; + } + + writeLength(length: number): WriteBuffer { + if (length < 0 || (length % 1) !== 0) { + throw new Error(`Could not write the given length value. '${length}' is not an integer >= 0`); + } + if (length < 127) { + this.writeUint8(length); + } else { + this.writeUint8(128 + (length & 127)); + this.writeLength(length >> 7); + } + return this; + } + + writeNumber(value: number): WriteBuffer { + this.ensureCapacity(8); + this.msg.setFloat64(this.offset, value); + this.offset += 8; + return this; + } + + writeUint8(value: number): WriteBuffer { + this.ensureCapacity(1); + this.buffer[this.offset++] = value; + return this; + } + + writeUint16(value: number): WriteBuffer { + this.ensureCapacity(2); + this.msg.setUint16(this.offset, value); + this.offset += 2; + return this; + } + + writeUint32(value: number): WriteBuffer { + this.ensureCapacity(4); + this.msg.setUint32(this.offset, value); + this.offset += 4; + return this; + } + + writeString(value: string): WriteBuffer { + this.ensureCapacity(4 * value.length); + const result = this.encoder.encodeInto(value, this.buffer.subarray(this.offset + 4)); + this.msg.setUint32(this.offset, result.written!); + this.offset += 4 + result.written!; + return this; + } + + writeBytes(value: Uint8Array): WriteBuffer { + this.writeLength(value.byteLength); + this.ensureCapacity(value.length); + this.buffer.set(value, this.offset); + this.offset += value.length; + return this; + } + + private onCommitEmitter = new Emitter(); + get onCommit(): Event { + return this.onCommitEmitter.event; + } + + commit(): void { + if (this.isDisposed) { + throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed."); + } + this.onCommitEmitter.fire(this.getCurrentContents()); + this.dispose(); + } + + getCurrentContents(): Uint8Array { + return this.buffer.slice(this.buffer.byteOffset, this.offset); + } + + dispose(): void { + if (!this.isDisposed) { + this.onCommitEmitter.dispose(); + this.isDisposed = true; + } + } + +} +/** + * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering. + * Is for single message read. A message can only be read once. + */ +export class Uint8ArrayReadBuffer implements ReadBuffer { + private offset: number = 0; + private msg: DataView; + private decoder = new TextDecoder(); + + constructor(private readonly buffer: Uint8Array, readPosition = 0) { + this.offset = buffer.byteOffset + readPosition; + this.msg = new DataView(buffer.buffer); + } + + readUint8(): number { + return this.msg.getUint8(this.offset++); + } + + readUint16(): number { + const result = this.msg.getUint16(this.offset); + this.offset += 2; + return result; + } + + readUint32(): number { + const result = this.msg.getInt32(this.offset); + this.offset += 4; + return result; + } + + readLength(): number { + let shift = 0; + let byte = this.readUint8(); + let value = (byte & 127) << shift; + while (byte > 127) { + shift += 7; + byte = this.readUint8(); + value = value + ((byte & 127) << shift); + } + return value; + } + + readNumber(): number { + const result = this.msg.getFloat64(this.offset); + this.offset += 8; + return result; + } + + readString(): string { + const len = this.readUint32(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.decodeString(this.buffer.slice(sliceOffset, sliceOffset + len)); + this.offset += len; + return result; + } + + private decodeString(buf: Uint8Array): string { + return this.decoder.decode(buf); + } + + readBytes(): Uint8Array { + const length = this.readLength(); + const sliceOffset = this.offset - this.buffer.byteOffset; + const result = this.buffer.slice(sliceOffset, sliceOffset + length); + this.offset += length; + return result; + } + + sliceAtReadPosition(): ReadBuffer { + const sliceOffset = this.offset - this.buffer.byteOffset; + return new Uint8ArrayReadBuffer(this.buffer, sliceOffset); + } +} diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index 11752f256d623..d09aa34b9ed15 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -21,7 +21,7 @@ import { ApplicationError } from '../application-error'; import { Disposable } from '../disposable'; import { Emitter, Event } from '../event'; import { Channel } from '../message-rpc/channel'; -import { RequestHandler, RpcConnection } from '../message-rpc/rpc-protocol'; +import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; import { ConnectionHandler } from './handler'; export type JsonRpcServer = Disposable & { @@ -57,9 +57,9 @@ export class JsonRpcConnectionHandler implements ConnectionHan /** * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcConnection; +export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcConnection(channel, requestHandler); +const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -109,8 +109,8 @@ export class JsonRpcProxyFactory implements ProxyHandler { protected readonly onDidOpenConnectionEmitter = new Emitter(); protected readonly onDidCloseConnectionEmitter = new Emitter(); - protected connectionPromiseResolve: (connection: RpcConnection) => void; - protected connectionPromise: Promise; + protected connectionPromiseResolve: (connection: RpcProtocol) => void; + protected connectionPromise: Promise; /** * Build a new JsonRpcProxyFactory. diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index ce1d7501cf3bf..f43f33e6f7644 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -18,7 +18,7 @@ import { Emitter, Event } from '../event'; import { WriteBuffer } from '../message-rpc'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; /** @@ -47,11 +47,17 @@ export class WebSocketChannel implements Channel { constructor(protected readonly socket: IWebSocket) { socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); socket.onError(error => this.onErrorEmitter.fire(error)); - socket.onMessage(buffer => this.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer))); + // eslint-disable-next-line arrow-body-style + socket.onMessage(data => this.onMessageEmitter.fire(() => { + // In the browser context socketIO receives binary messages as ArrayBuffers. + // So we have to convert them to a Uint8Array before delegating the message to the read buffer. + const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; + return new Uint8ArrayReadBuffer(buffer); + })); } getWriteBuffer(): WriteBuffer { - const result = new ArrayBufferWriteBuffer(); + const result = new Uint8ArrayWriteBuffer(); result.onCommit(buffer => { if (this.socket.isConnected()) { @@ -79,7 +85,7 @@ export interface IWebSocket { * Sends the given message over the web socket in binary format. * @param message The binary message. */ - send(message: ArrayBuffer): void; + send(message: Uint8Array): void; /** * Closes the websocket from the local side. */ @@ -92,7 +98,7 @@ export interface IWebSocket { * Listener callback to handle incoming messages. * @param cb The callback. */ - onMessage(cb: (message: ArrayBuffer) => void): void; + onMessage(cb: (message: Uint8Array) => void): void; /** * Listener callback to handle socket errors. * @param cb The callback. diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 366cfd9cb17bf..152c2677b2d33 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -17,7 +17,7 @@ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; import { Emitter, Event } from '../../common'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { Channel, MessageProvider } from '../../common/message-rpc/channel'; import { JsonRpcProxy } from '../../common/messaging'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; @@ -39,15 +39,15 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(); ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new ArrayBufferReadBuffer(data.buffer)); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); return { close: () => Event.None, getWriteBuffer: () => { - const writer = new ArrayBufferWriteBuffer(); + const writer = new Uint8ArrayWriteBuffer(); writer.onCommit(buffer => // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array. - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, new Uint8Array(buffer)) + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) ); return writer; }, diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index afb9a90a1e963..95014a673ac42 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -18,7 +18,7 @@ import { ipcMain, IpcMainEvent, WebContents } from '@theia/electron/shared/elect import { inject, injectable, named, postConstruct } from 'inversify'; import { Emitter, Event, WriteBuffer } from '../../common'; import { ContributionProvider } from '../../common/contribution-provider'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { MessagingContribution } from '../../node/messaging/messaging-contribution'; @@ -56,30 +56,34 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { const sender = event.sender; + // Get the multiplexer for a given window id try { - // Get the multiplexer for a given window id - const windowChannelData = this.windowChannelMultiplexer.get(sender.id)!; - if (!windowChannelData) { - const mainChannel = this.createWindowMainChannel(sender); - const multiPlexer = new ChannelMultiplexer(mainChannel); - multiPlexer.onDidOpenChannel(openEvent => { - const { channel, id } = openEvent; - if (this.channelHandlers.route(id, channel)) { - console.debug(`Opening channel for service path '${id}'.`); - channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); - } - }); - - sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. - sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. - this.windowChannelMultiplexer.set(sender.id, { channel: mainChannel, multiPlexer }); - } - windowChannelData.channel.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(data.buffer)); + const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); + windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); } catch (error) { console.error('IPC: Failed to handle message', { error, data }); } } + // Creates a new multiplexer for a given sender/window + protected createWindowChannelData(sender: Electron.WebContents): { channel: ElectronWebContentChannel, multiPlexer: ChannelMultiplexer } { + const mainChannel = this.createWindowMainChannel(sender); + const multiPlexer = new ChannelMultiplexer(mainChannel); + multiPlexer.onDidOpenChannel(openEvent => { + const { channel, id } = openEvent; + if (this.channelHandlers.route(id, channel)) { + console.debug(`Opening channel for service path '${id}'.`); + channel.onClose(() => console.debug(`Closing channel on service path '${id}'.`)); + } + }); + + sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. + sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. + const data = { channel: mainChannel, multiPlexer }; + this.windowChannelMultiplexer.set(sender.id, data); + return data; + } + /** * Creates the main channel to a window. * @param sender The window that the channel should be established to. @@ -130,16 +134,19 @@ export class ElectronWebContentChannel implements Channel { } getWriteBuffer(): WriteBuffer { - const writer = new ArrayBufferWriteBuffer(); + const writer = new Uint8ArrayWriteBuffer(); writer.onCommit(buffer => { if (!this.sender.isDestroyed()) { - this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, new Uint8Array(buffer)); + this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); } }); return writer; } close(): void { + this.onCloseEmitter.dispose(); + this.onMessageEmitter.dispose(); + this.onErrorEmitter.dispose(); } } diff --git a/packages/core/src/node/messaging/ipc-bootstrap.ts b/packages/core/src/node/messaging/ipc-bootstrap.ts index df7139fda4d17..c1331dbe9bf49 100644 --- a/packages/core/src/node/messaging/ipc-bootstrap.ts +++ b/packages/core/src/node/messaging/ipc-bootstrap.ts @@ -17,7 +17,7 @@ import { Socket } from 'net'; import 'reflect-metadata'; import { Emitter } from '../../common'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { Channel, ChannelCloseEvent, MessageProvider } from '../../common/message-rpc/channel'; import { dynamicRequire } from '../dynamic-require'; import { checkParentAlive, IPCEntryPoint } from './ipc-protocol'; @@ -40,7 +40,7 @@ function createChannel(): Channel { eventEmitter.on('error', error => onErrorEmitter.fire(error)); eventEmitter.on('close', () => onCloseEmitter.fire({ reason: 'Process has been closed from remote site (parent)' })); pipe.on('data', (data: Uint8Array) => { - onMessageEmitter.fire(() => new ArrayBufferReadBuffer(data.buffer)); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); return { @@ -49,9 +49,9 @@ function createChannel(): Channel { onError: onErrorEmitter.event, onMessage: onMessageEmitter.event, getWriteBuffer: () => { - const result = new ArrayBufferWriteBuffer(); + const result = new Uint8ArrayWriteBuffer(); result.onCommit(buffer => { - pipe.write(new Uint8Array(buffer)); + pipe.write(buffer); }); return result; diff --git a/packages/core/src/node/messaging/ipc-connection-provider.ts b/packages/core/src/node/messaging/ipc-connection-provider.ts index 6fc31a193d033..a758659b7dfce 100644 --- a/packages/core/src/node/messaging/ipc-connection-provider.ts +++ b/packages/core/src/node/messaging/ipc-connection-provider.ts @@ -20,7 +20,7 @@ import * as path from 'path'; import { Writable } from 'stream'; import { Message } from 'vscode-ws-jsonrpc'; import { ConnectionErrorHandler, Disposable, DisposableCollection, Emitter, ILogger } from '../../common'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '../../common/message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { Channel, ChannelCloseEvent, MessageProvider } from '../../common/message-rpc/channel'; import { createIpcEnv } from './ipc-protocol'; @@ -85,7 +85,7 @@ export class IPCConnectionProvider { const pipe = childProcess.stdio[4] as Writable; pipe.on('data', (data: Uint8Array) => { - onMessageEmitter.fire(() => new ArrayBufferReadBuffer(data.buffer)); + onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); }); childProcess.on('error', err => onErrorEmitter.fire(err)); @@ -97,9 +97,9 @@ export class IPCConnectionProvider { onError: onErrorEmitter.event, onMessage: onMessageEmitter.event, getWriteBuffer: () => { - const result = new ArrayBufferWriteBuffer(); + const result = new Uint8ArrayWriteBuffer(); result.onCommit(buffer => { - pipe.write(new Uint8Array(buffer)); + pipe.write(buffer); }); return result; diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index f5d611307dae3..02c28449e53dc 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -26,8 +26,8 @@ import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; -import { toArrayBuffer } from '../../common/message-rpc/array-buffer-message-buffer'; -import { Channel, ChannelMultiplexer } from '../../common/message-rpc'; +import { Channel } from '../../common/message-rpc'; +import { ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -149,7 +149,7 @@ function toIWebSocket(socket: Socket): IWebSocket { isConnected: () => socket.connected, onClose: cb => socket.on('disconnect', reason => cb(reason)), onError: cb => socket.on('error', error => cb(error)), - onMessage: cb => socket.on('message', data => cb(toArrayBuffer(data))), + onMessage: cb => socket.on('message', data => cb(data)), send: message => socket.emit('message', message) }; } diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 89f7071e8533f..9abd824cdc50e 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -18,8 +18,8 @@ import * as http from 'http'; import * as https from 'https'; import { AddressInfo } from 'net'; import { io, Socket } from 'socket.io-client'; -import { Channel, ChannelMultiplexer } from '../../../common'; -import { toArrayBuffer } from '../../../common/message-rpc/array-buffer-message-buffer'; +import { Channel } from '../../../common'; +import { ChannelMultiplexer } from '../../../common/message-rpc/channel'; import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; export class TestWebSocketChannelSetup { public readonly multiPlexer: ChannelMultiplexer; @@ -50,7 +50,7 @@ function toIWebSocket(socket: Socket): IWebSocket { isConnected: () => socket.connected, onClose: cb => socket.on('disconnect', reason => cb(reason)), onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(toArrayBuffer(data))), + onMessage: cb => socket.on('message', data => cb(data)), send: message => socket.emit('message', message) }; } diff --git a/packages/debug/src/browser/debug-session-connection.ts b/packages/debug/src/browser/debug-session-connection.ts index 2168697562231..748715cc78865 100644 --- a/packages/debug/src/browser/debug-session-connection.ts +++ b/packages/debug/src/browser/debug-session-connection.ts @@ -113,7 +113,6 @@ const standardDebugEvents = new Set([ 'thread' ]); -// TODO: Proper message RPC for debug session protocol export class DebugSessionConnection implements Disposable { private sequence = 1; diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts index dfc19d630b47a..4139d3314c0e9 100644 --- a/packages/plugin-ext/src/common/connection.ts +++ b/packages/plugin-ext/src/common/connection.ts @@ -17,7 +17,7 @@ import { ConnectionExt, ConnectionMain } from './plugin-api-rpc'; import { Emitter, Event } from '@theia/core/lib/common/event'; import { ChannelCloseEvent, MessageProvider } from '@theia/core/lib/common/message-rpc/channel'; import { WriteBuffer, Channel } from '@theia/core'; -import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from '@theia/core/lib/common/message-rpc/array-buffer-message-buffer'; +import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer'; /** * A channel communicating with a counterpart in a plugin host. @@ -32,9 +32,9 @@ export class PluginChannel implements Channel { protected readonly connection: ConnectionExt | ConnectionMain) { } getWriteBuffer(): WriteBuffer { - const result = new ArrayBufferWriteBuffer(); + const result = new Uint8ArrayWriteBuffer(); result.onCommit(buffer => { - this.connection.$sendMessage(this.id, new ArrayBufferReadBuffer(buffer).readString()); + this.connection.$sendMessage(this.id, new Uint8ArrayReadBuffer(buffer).readString()); }); return result; @@ -89,9 +89,9 @@ export class ConnectionImpl implements ConnectionMain, ConnectionExt { */ async $sendMessage(id: string, message: string): Promise { if (this.connections.has(id)) { - const writer = new ArrayBufferWriteBuffer(); + const writer = new Uint8ArrayWriteBuffer(); writer.writeString(message); - const reader = new ArrayBufferReadBuffer(writer.getCurrentContents()); + const reader = new Uint8ArrayReadBuffer(writer.getCurrentContents()); this.connections.get(id)!.fireMessageReceived(() => reader); } else { console.warn(`Received message for unknown connection: ${id}`); diff --git a/packages/task/src/node/task-server.slow-spec.ts b/packages/task/src/node/task-server.slow-spec.ts index cffb3aedb8dbd..d1c0a72ccc1bd 100644 --- a/packages/task/src/node/task-server.slow-spec.ts +++ b/packages/task/src/node/task-server.slow-spec.ts @@ -31,7 +31,7 @@ import { ProcessTaskConfiguration, ProcessType } from '../common/process/task-pr import { createTaskTestContainer } from './test/task-test-container'; import { TestWebSocketChannelSetup } from '@theia/core/lib/node/messaging/test/test-web-socket-channel'; import { terminalsPath } from '@theia/terminal/lib/common/terminal-protocol'; -import { RpcConnection } from '@theia/core'; +import { RpcProtocol } from '@theia/core'; // test scripts that we bundle with tasks const commandShortRunning = './task'; @@ -110,7 +110,7 @@ describe('Task server / back-end', function (): void { const setup = new TestWebSocketChannelSetup({ server, path: `${terminalsPath}/${terminalId}` }); setup.multiPlexer.onDidOpenChannel(event => { const channel = event.channel; - const connection = new RpcConnection(channel, (method, args) => { + const connection = new RpcProtocol(channel, (method, args) => { reject(`Received unexpected request: ${method} with args: ${args} `); return Promise.reject(); }); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 848aad4a4d780..04956f31654ba 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -22,7 +22,7 @@ import { nls } from '@theia/core/lib/common/nls'; import { Deferred } from '@theia/core/lib/common/promise-util'; import URI from '@theia/core/lib/common/uri'; import { inject, injectable, named, postConstruct } from '@theia/core/shared/inversify'; -import { RequestHandler, RpcConnection } from '@theia/core/lib/common/message-rpc/rpc-protocol'; +import { RequestHandler, RpcProtocol } from '@theia/core/lib/common/message-rpc/rpc-protocol'; import { CommandLineOptions, ShellCommandBuilder } from '@theia/process/lib/common/shell-command-builder'; import { WorkspaceService } from '@theia/workspace/lib/browser'; import { RendererType, Terminal } from 'xterm'; @@ -58,7 +58,7 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget protected searchBox: TerminalSearchWidget; protected restored = false; protected closeOnDispose = true; - protected waitForConnection: Deferred | undefined; + protected waitForConnection: Deferred | undefined; protected hoverMessage: HTMLDivElement; protected lastTouchEnd: TouchEvent | undefined; protected isAttachedCloseListener: boolean = false; @@ -507,13 +507,13 @@ export class TerminalWidgetImpl extends TerminalWidget implements StatefulWidget } this.toDisposeOnConnect.dispose(); this.toDispose.push(this.toDisposeOnConnect); - const waitForConnection = this.waitForConnection = new Deferred(); + const waitForConnection = this.waitForConnection = new Deferred(); this.webSocketConnectionProvider.listen({ path: `${terminalsPath}/${this.terminalId}`, onConnection: connection => { const requestHandler: RequestHandler = _method => this.logger.warn('Received an unhandled RPC request from the terminal process'); - const rpc = new RpcConnection(connection, requestHandler); + const rpc = new RpcProtocol(connection, requestHandler); rpc.onNotification(event => { if (event.method === 'onData') { this.write(event.args[0]); diff --git a/packages/terminal/src/node/terminal-backend-contribution.ts b/packages/terminal/src/node/terminal-backend-contribution.ts index 358f91dcdf472..efcee668ee41b 100644 --- a/packages/terminal/src/node/terminal-backend-contribution.ts +++ b/packages/terminal/src/node/terminal-backend-contribution.ts @@ -19,7 +19,7 @@ import { ILogger, RequestHandler } from '@theia/core/lib/common'; import { TerminalProcess, ProcessManager } from '@theia/process/lib/node'; import { terminalsPath } from '../common/terminal-protocol'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; -import { RpcConnection } from '@theia/core/'; +import { RpcProtocol } from '@theia/core/'; @injectable() export class TerminalBackendContribution implements MessagingService.Contribution { @@ -31,7 +31,7 @@ export class TerminalBackendContribution implements MessagingService.Contributio protected readonly logger: ILogger; configure(service: MessagingService): void { - service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, connection) => { + service.wsChannel(`${terminalsPath}/:id`, (params: { id: string }, channel) => { const id = parseInt(params.id, 10); const termProcess = this.processManager.get(id); if (termProcess instanceof TerminalProcess) { @@ -46,11 +46,11 @@ export class TerminalBackendContribution implements MessagingService.Contributio } }; - const rpc = new RpcConnection(connection, requestHandler); + const rpc = new RpcProtocol(channel, requestHandler); output.on('data', data => { rpc.sendNotification('onData', [data]); }); - connection.onClose(() => output.dispose()); + channel.onClose(() => output.dispose()); } }); }