Skip to content

Commit

Permalink
Address review feedback + integrate performance improvements
Browse files Browse the repository at this point in the history
Address review feedback:
- Merge and RpcClient and RpcServer in RpcProtocol
- Only export public API in index.ts
- Remove outdated comments
- Refactor error value encoder
- Fix casing in `MessageTypes

Integrate performance improvements
- Use Uint8Array instead of ArrayBuffer when feasible
- Use TextEncoder.encodeInto() instead of encode()
- Use improved encoding for length values
- Preallocate a single TextEncoder
- Consider potential buffer byteOffsets when creating new Write/Reader buffers. 

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed May 5, 2022
1 parent 208b074 commit 946929d
Show file tree
Hide file tree
Showing 23 changed files with 416 additions and 523 deletions.
188 changes: 0 additions & 188 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts

This file was deleted.

10 changes: 5 additions & 5 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************
import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel';

use(spies);
Expand All @@ -25,16 +25,16 @@ use(spies);
*/
export class ChannelPipe {
readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => {
const leftWrite = new ArrayBufferWriteBuffer();
const leftWrite = new Uint8ArrayWriteBuffer();
leftWrite.onCommit(buffer => {
this.right.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer));
this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return leftWrite;
});
readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => {
const rightWrite = new ArrayBufferWriteBuffer();
const rightWrite = new Uint8ArrayWriteBuffer();
rightWrite.onCommit(buffer => {
this.left.onMessageEmitter.fire(() => new ArrayBufferReadBuffer(buffer));
this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return rightWrite;
});
Expand Down
25 changes: 15 additions & 10 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,12 @@ export interface ChannelCloseEvent {

/**
* The `MessageProvider` is emitted when a channel receives a new message.
* Listeners can read the provider to obtain a new {@link ReadBuffer} for the received message
* Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message.
* This ensures that each listener has its own isolated {@link ReadBuffer} instance.
*
*/
export type MessageProvider = () => ReadBuffer;

export enum MessageTypes {
Open = 1,
Close = 2,
AckOpen = 3,
Data = 4
}

/**
* Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
* the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
Expand Down Expand Up @@ -98,7 +93,7 @@ export class ForwardingChannel implements Channel {
return this.writeBufferSource();
}

send(message: ArrayBuffer): void {
send(message: Uint8Array): void {
const writeBuffer = this.getWriteBuffer();
writeBuffer.writeBytes(message);
writeBuffer.commit();
Expand All @@ -109,6 +104,16 @@ export class ForwardingChannel implements Channel {
}
}

/**
* The different message types used in the messaging protocol of the {@link ChannelMultiplexer}
*/
export enum MessageTypes {
Open = 1,
Close = 2,
AckOpen = 3,
Data = 4
}

/**
* The write buffers in this implementation immediately write to the underlying
* channel, so we rely on writers to the multiplexed channels to always commit their
Expand Down Expand Up @@ -159,7 +164,7 @@ export class ChannelMultiplexer {
return this.handleClose(id);
}
case MessageTypes.Data: {
return this.handleData(id, buffer.sliceAtCurrentPosition());
return this.handleData(id, buffer.sliceAtReadPosition());
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/common/message-rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export * from './rpc-protocol';
export * from './channel';
export * from './message-buffer';
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
57 changes: 13 additions & 44 deletions packages/core/src/common/message-rpc/message-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@ export interface WriteBuffer {
writeUint16(value: number): WriteBuffer
writeUint32(value: number): WriteBuffer;
writeString(value: string): WriteBuffer;
writeBytes(value: ArrayBuffer): WriteBuffer;
/**
* Writes a number as integer value.The best suited encoding format(Uint8 Uint16 or Uint32) is
* computed automatically and encoded as the first byte. Mainly used to persist length values of
* strings and arrays.
*/
writeInteger(value: number): WriteBuffer
writeBytes(value: Uint8Array): WriteBuffer;
writeLength(value: number): WriteBuffer
/**
* Makes any writes to the buffer permanent, for example by sending the writes over a channel.
* You must obtain a new write buffer after committing
Expand All @@ -55,8 +50,8 @@ export class ForwardingWriteBuffer implements WriteBuffer {
return this;
}

writeInteger(value: number): WriteBuffer {
this.underlying.writeInteger(value);
writeLength(value: number): WriteBuffer {
this.underlying.writeLength(value);
return this;
}

Expand All @@ -65,7 +60,7 @@ export class ForwardingWriteBuffer implements WriteBuffer {
return this;
}

writeBytes(value: ArrayBuffer): WriteBuffer {
writeBytes(value: Uint8Array): WriteBuffer {
this.underlying.writeBytes(value);
return this;
}
Expand All @@ -75,31 +70,6 @@ export class ForwardingWriteBuffer implements WriteBuffer {
}
}

export enum UintType {
Uint8 = 1,
Uint16 = 2,
Uint32 = 3
}

/**
* Checks wether the given number is an unsigned integer and returns the {@link UintType}
* that is needed to store it in binary format.
* @param value The number for which the UintType should be retrieved.
* @returns the corresponding UInt type.
* @throws An error if the given number is not an unsigned integer.
*/
export function getUintType(value: number): UintType {
if (value < 0 || (value % 1) !== 0) {
throw new Error(`Could not determine IntType. ${value} is not an unsigned integer`);
}
if (value <= 255) {
return UintType.Uint8;
} else if (value <= 65535) {
return UintType.Uint16;
}
return UintType.Uint32;
}

/**
* A buffer maintaining a read position in a buffer containing a received message capable of
* reading primitive values.
Expand All @@ -109,16 +79,15 @@ export interface ReadBuffer {
readUint16(): number;
readUint32(): number;
readString(): string;
readBytes(): ArrayBuffer;
readNumber(): number,
readLength(): number,
readBytes(): Uint8Array;

/**
* Reads a number as int value. The encoding format(Uint8, Uint16, or Uint32) is expected to be
* encoded in the first byte.
*/
readInteger(): number
/**
* Returns a new read buffer whose starting read position is the current read position of this buffer.
* Can be used to read (sub) messages multiple times.
* Returns a new read buffer whose starting read position is the current read position of this buffer.
* This is useful to create read buffers sub messages.
* (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part
* of the message reader that is sent to the target channel)
*/
sliceAtCurrentPosition(): ReadBuffer
sliceAtReadPosition(): ReadBuffer
}
Loading

0 comments on commit 946929d

Please sign in to comment.