Skip to content

Commit

Permalink
Worker communication optimization (aka removing netstring dependency) (
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc authored Oct 7, 2021
1 parent 500f917 commit af15b52
Show file tree
Hide file tree
Showing 31 changed files with 412 additions and 1,010 deletions.
2 changes: 1 addition & 1 deletion lib/Channel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export declare class Channel extends EnhancedEventEmitter {
private readonly _consumerSocket;
private _nextId;
private readonly _sents;
private _recvBuffer?;
private _recvBuffer;
/**
* @private
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/Channel.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 38 additions & 38 deletions lib/Channel.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
// @ts-ignore
const netstring = require("netstring");
const Logger_1 = require("./Logger");
const EnhancedEventEmitter_1 = require("./EnhancedEventEmitter");
const errors_1 = require("./errors");
const os = require("os");
const littleEndian = os.endianness() == 'LE';
const logger = new Logger_1.Logger('Channel');
// netstring length for a 4194304 bytes payload.
const NS_MESSAGE_MAX_LEN = 4194313;
const NS_PAYLOAD_MAX_LEN = 4194304;
// Binary length for a 4194304 bytes payload.
const MESSAGE_MAX_LEN = 4194308;
const PAYLOAD_MAX_LEN = 4194304;
class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
/**
* @private
Expand All @@ -21,77 +21,76 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._nextId = 0;
// Map of pending sent requests.
this._sents = new Map();
// Buffer for reading messages from the worker.
this._recvBuffer = Buffer.alloc(0);
logger.debug('constructor()');
this._producerSocket = producerSocket;
this._consumerSocket = consumerSocket;
// Read Channel responses/notifications from the worker.
this._consumerSocket.on('data', (buffer) => {
if (!this._recvBuffer) {
if (!this._recvBuffer.length) {
this._recvBuffer = buffer;
}
else {
this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
}
if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data into it');
if (this._recvBuffer.length > PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data in it');
// Reset the buffer and exit.
this._recvBuffer = undefined;
this._recvBuffer = Buffer.alloc(0);
return;
}
let msgStart = 0;
while (true) // eslint-disable-line no-constant-condition
{
let nsPayload;
try {
nsPayload = netstring.nsPayload(this._recvBuffer);
const readLen = this._recvBuffer.length - msgStart;
if (readLen < 4) {
// Incomplete data.
break;
}
catch (error) {
logger.error('invalid netstring data received from the worker process: %s', String(error));
// Reset the buffer and exit.
this._recvBuffer = undefined;
return;
const dataView = new DataView(this._recvBuffer.buffer, this._recvBuffer.byteOffset + msgStart);
const msgLen = dataView.getUint32(0, littleEndian);
if (readLen < 4 + msgLen) {
// Incomplete data.
break;
}
// Incomplete netstring message.
if (nsPayload === -1)
return;
const payload = this._recvBuffer.subarray(msgStart + 4, msgStart + 4 + msgLen);
msgStart += 4 + msgLen;
try {
// We can receive JSON messages (Channel messages) or log strings.
switch (nsPayload[0]) {
// 123 = '{' (a Channel JSON messsage).
switch (payload[0]) {
// 123 = '{' (a Channel JSON message).
case 123:
this._processMessage(JSON.parse(nsPayload.toString('utf8')));
this._processMessage(JSON.parse(payload.toString('utf8')));
break;
// 68 = 'D' (a debug log).
case 68:
logger.debug(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
logger.debug(`[pid:${pid}] ${payload.toString('utf8', 1)}`);
break;
// 87 = 'W' (a warn log).
case 87:
logger.warn(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
logger.warn(`[pid:${pid}] ${payload.toString('utf8', 1)}`);
break;
// 69 = 'E' (an error log).
case 69:
logger.error(`[pid:${pid} ${nsPayload.toString('utf8', 1)}`);
logger.error(`[pid:${pid} ${payload.toString('utf8', 1)}`);
break;
// 88 = 'X' (a dump log).
case 88:
// eslint-disable-next-line no-console
console.log(nsPayload.toString('utf8', 1));
console.log(payload.toString('utf8', 1));
break;
default:
// eslint-disable-next-line no-console
console.warn(`worker[pid:${pid}] unexpected data: %s`, nsPayload.toString('utf8', 1));
console.warn(`worker[pid:${pid}] unexpected data: %s`, payload.toString('utf8', 1));
}
}
catch (error) {
logger.error('received invalid message from the worker process: %s', String(error));
}
// Remove the read payload from the buffer.
this._recvBuffer =
this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
if (!this._recvBuffer.length) {
this._recvBuffer = undefined;
return;
}
}
if (msgStart != 0) {
this._recvBuffer = this._recvBuffer.slice(msgStart);
}
});
this._consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
Expand Down Expand Up @@ -141,11 +140,12 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
if (this._closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const ns = netstring.nsWrite(JSON.stringify(request));
if (Buffer.byteLength(ns) > NS_MESSAGE_MAX_LEN)
const payload = JSON.stringify(request);
if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
// This may throw if closed or remote side ended.
this._producerSocket.write(ns);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
return new Promise((pResolve, pReject) => {
const timeout = 1000 * (15 + (0.1 * this._sents.size));
const sent = {
Expand Down
2 changes: 1 addition & 1 deletion lib/PayloadChannel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export declare class PayloadChannel extends EnhancedEventEmitter {
private readonly _consumerSocket;
private _nextId;
private readonly _sents;
private _recvBuffer?;
private _recvBuffer;
private _ongoingNotification?;
/**
* @private
Expand Down
2 changes: 1 addition & 1 deletion lib/PayloadChannel.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 41 additions & 42 deletions lib/PayloadChannel.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
// @ts-ignore
const netstring = require("netstring");
const Logger_1 = require("./Logger");
const EnhancedEventEmitter_1 = require("./EnhancedEventEmitter");
const errors_1 = require("./errors");
const os = require("os");
const littleEndian = os.endianness() == 'LE';
const logger = new Logger_1.Logger('PayloadChannel');
// netstring length for a 4194304 bytes payload.
const NS_MESSAGE_MAX_LEN = 4194313;
const NS_PAYLOAD_MAX_LEN = 4194304;
// binary length for a 4194304 bytes payload.
const MESSAGE_MAX_LEN = 4194308;
const PAYLOAD_MAX_LEN = 4194304;
class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
/**
* @private
Expand All @@ -21,46 +21,45 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._nextId = 0;
// Map of pending sent requests.
this._sents = new Map();
// Buffer for reading messages from the worker.
this._recvBuffer = Buffer.alloc(0);
logger.debug('constructor()');
this._producerSocket = producerSocket;
this._consumerSocket = consumerSocket;
// Read PayloadChannel notifications from the worker.
this._consumerSocket.on('data', (buffer) => {
if (!this._recvBuffer) {
if (!this._recvBuffer.length) {
this._recvBuffer = buffer;
}
else {
this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
}
if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data into it');
if (this._recvBuffer.length > PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data in it');
// Reset the buffer and exit.
this._recvBuffer = undefined;
this._recvBuffer = Buffer.alloc(0);
return;
}
let msgStart = 0;
while (true) // eslint-disable-line no-constant-condition
{
let nsPayload;
try {
nsPayload = netstring.nsPayload(this._recvBuffer);
const readLen = this._recvBuffer.length - msgStart;
if (readLen < 4) {
// Incomplete data.
break;
}
catch (error) {
logger.error('invalid netstring data received from the worker process: %s', String(error));
// Reset the buffer and exit.
this._recvBuffer = undefined;
return;
}
// Incomplete netstring message.
if (nsPayload === -1)
return;
this._processData(nsPayload);
// Remove the read payload from the buffer.
this._recvBuffer =
this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
if (!this._recvBuffer.length) {
this._recvBuffer = undefined;
return;
const dataView = new DataView(this._recvBuffer.buffer, this._recvBuffer.byteOffset + msgStart);
const msgLen = dataView.getUint32(0, littleEndian);
if (readLen < 4 + msgLen) {
// Incomplete data.
break;
}
const payload = this._recvBuffer.subarray(msgStart + 4, msgStart + 4 + msgLen);
msgStart += 4 + msgLen;
this._processData(payload);
}
if (msgStart != 0) {
this._recvBuffer = this._recvBuffer.slice(msgStart);
}
});
this._consumerSocket.on('end', () => (logger.debug('Consumer PayloadChannel ended by the worker process')));
Expand Down Expand Up @@ -103,24 +102,24 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('notify() [event:%s]', event);
if (this._closed)
throw new errors_1.InvalidStateError('PayloadChannel closed');
const notification = { event, internal, data };
const ns1 = netstring.nsWrite(JSON.stringify(notification));
const ns2 = netstring.nsWrite(payload);
if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN)
const notification = JSON.stringify({ event, internal, data });
if (Buffer.byteLength(notification) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel notification too big');
else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN)
else if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel payload too big');
try {
// This may throw if closed or remote side ended.
this._producerSocket.write(ns1);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(notification)).buffer));
this._producerSocket.write(notification);
}
catch (error) {
logger.warn('notify() | sending notification failed: %s', String(error));
return;
}
try {
// This may throw if closed or remote side ended.
this._producerSocket.write(ns2);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
}
catch (error) {
logger.warn('notify() | sending payload failed: %s', String(error));
Expand All @@ -136,16 +135,16 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('request() [method:%s, id:%s]', method, id);
if (this._closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const ns1 = netstring.nsWrite(JSON.stringify(request));
const ns2 = netstring.nsWrite(payload);
if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN)
const request = JSON.stringify({ id, method, internal, data });
if (Buffer.byteLength(request) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN)
else if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel payload too big');
// This may throw if closed or remote side ended.
this._producerSocket.write(ns1);
this._producerSocket.write(ns2);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(request)).buffer));
this._producerSocket.write(request);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
return new Promise((pResolve, pReject) => {
const timeout = 1000 * (15 + (0.1 * this._sents.size));
const sent = {
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
"awaitqueue": "^2.3.3",
"debug": "^4.3.2",
"h264-profile-level-id": "^1.0.1",
"netstring": "^0.3.0",
"random-number": "^0.0.9",
"supports-color": "^9.0.2",
"uuid": "^8.3.2"
Expand Down
4 changes: 4 additions & 0 deletions rust/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

# Unreleased

* Fix for receiving data over payload channel

# 0.8.5

* Fix types for `round_trip_time` and `bitrate_by_layer` fields `ProducerStat` and `ConsumerStat`
Expand Down
8 changes: 8 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ actix = "0.12.0"
actix-web = "4.0.0-beta.8"
actix-web-actors = "4.0.0-beta.6"
env_logger = "0.9.0"

[dev-dependencies.criterion]
version = "0.3.5"
features = ["async_futures"]

[[bench]]
name = "direct_data"
harness = false
Loading

0 comments on commit af15b52

Please sign in to comment.