Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker communication optimization (aka removing netstring dependency) #644

Merged
merged 12 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/Channel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export declare class Channel extends EnhancedEventEmitter {
private readonly _consumerSocket;
private _nextId;
private readonly _sents;
private _recvBuffer?;
private _recvBuffer;
/**
* @private
*/
Expand Down
2 changes: 1 addition & 1 deletion lib/Channel.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 38 additions & 38 deletions lib/Channel.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
// @ts-ignore
const netstring = require("netstring");
const Logger_1 = require("./Logger");
const EnhancedEventEmitter_1 = require("./EnhancedEventEmitter");
const errors_1 = require("./errors");
const os = require("os");
const littleEndian = os.endianness() == 'LE';
const logger = new Logger_1.Logger('Channel');
// netstring length for a 4194304 bytes payload.
const NS_MESSAGE_MAX_LEN = 4194313;
const NS_PAYLOAD_MAX_LEN = 4194304;
// Binary length for a 4194304 bytes payload.
const MESSAGE_MAX_LEN = 4194308;
const PAYLOAD_MAX_LEN = 4194304;
class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
/**
* @private
Expand All @@ -21,77 +21,76 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._nextId = 0;
// Map of pending sent requests.
this._sents = new Map();
// Buffer for reading messages from the worker.
this._recvBuffer = Buffer.alloc(0);
logger.debug('constructor()');
this._producerSocket = producerSocket;
this._consumerSocket = consumerSocket;
// Read Channel responses/notifications from the worker.
this._consumerSocket.on('data', (buffer) => {
if (!this._recvBuffer) {
if (!this._recvBuffer.length) {
this._recvBuffer = buffer;
}
else {
this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
}
if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data into it');
if (this._recvBuffer.length > PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data in it');
// Reset the buffer and exit.
this._recvBuffer = undefined;
this._recvBuffer = Buffer.alloc(0);
return;
}
let msgStart = 0;
while (true) // eslint-disable-line no-constant-condition
{
let nsPayload;
try {
nsPayload = netstring.nsPayload(this._recvBuffer);
const readLen = this._recvBuffer.length - msgStart;
if (readLen < 4) {
// Incomplete data.
break;
}
catch (error) {
logger.error('invalid netstring data received from the worker process: %s', String(error));
// Reset the buffer and exit.
this._recvBuffer = undefined;
return;
const dataView = new DataView(this._recvBuffer.buffer, this._recvBuffer.byteOffset + msgStart);
const msgLen = dataView.getUint32(0, littleEndian);
if (readLen < 4 + msgLen) {
// Incomplete data.
break;
}
// Incomplete netstring message.
if (nsPayload === -1)
return;
const payload = this._recvBuffer.subarray(msgStart + 4, msgStart + 4 + msgLen);
msgStart += 4 + msgLen;
try {
// We can receive JSON messages (Channel messages) or log strings.
switch (nsPayload[0]) {
// 123 = '{' (a Channel JSON messsage).
switch (payload[0]) {
// 123 = '{' (a Channel JSON message).
case 123:
this._processMessage(JSON.parse(nsPayload.toString('utf8')));
this._processMessage(JSON.parse(payload.toString('utf8')));
break;
// 68 = 'D' (a debug log).
case 68:
logger.debug(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
logger.debug(`[pid:${pid}] ${payload.toString('utf8', 1)}`);
break;
// 87 = 'W' (a warn log).
case 87:
logger.warn(`[pid:${pid}] ${nsPayload.toString('utf8', 1)}`);
logger.warn(`[pid:${pid}] ${payload.toString('utf8', 1)}`);
break;
// 69 = 'E' (an error log).
case 69:
logger.error(`[pid:${pid} ${nsPayload.toString('utf8', 1)}`);
logger.error(`[pid:${pid} ${payload.toString('utf8', 1)}`);
break;
// 88 = 'X' (a dump log).
case 88:
// eslint-disable-next-line no-console
console.log(nsPayload.toString('utf8', 1));
console.log(payload.toString('utf8', 1));
break;
default:
// eslint-disable-next-line no-console
console.warn(`worker[pid:${pid}] unexpected data: %s`, nsPayload.toString('utf8', 1));
console.warn(`worker[pid:${pid}] unexpected data: %s`, payload.toString('utf8', 1));
}
}
catch (error) {
logger.error('received invalid message from the worker process: %s', String(error));
}
// Remove the read payload from the buffer.
this._recvBuffer =
this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
if (!this._recvBuffer.length) {
this._recvBuffer = undefined;
return;
}
}
if (msgStart != 0) {
this._recvBuffer = this._recvBuffer.slice(msgStart);
}
});
this._consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
Expand Down Expand Up @@ -141,11 +140,12 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
if (this._closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const ns = netstring.nsWrite(JSON.stringify(request));
if (Buffer.byteLength(ns) > NS_MESSAGE_MAX_LEN)
const payload = JSON.stringify(request);
if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
// This may throw if closed or remote side ended.
this._producerSocket.write(ns);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
return new Promise((pResolve, pReject) => {
const timeout = 1000 * (15 + (0.1 * this._sents.size));
const sent = {
Expand Down
2 changes: 1 addition & 1 deletion lib/PayloadChannel.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export declare class PayloadChannel extends EnhancedEventEmitter {
private readonly _consumerSocket;
private _nextId;
private readonly _sents;
private _recvBuffer?;
private _recvBuffer;
private _ongoingNotification?;
/**
* @private
Expand Down
2 changes: 1 addition & 1 deletion lib/PayloadChannel.d.ts.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 41 additions & 42 deletions lib/PayloadChannel.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
// @ts-ignore
const netstring = require("netstring");
const Logger_1 = require("./Logger");
const EnhancedEventEmitter_1 = require("./EnhancedEventEmitter");
const errors_1 = require("./errors");
const os = require("os");
const littleEndian = os.endianness() == 'LE';
const logger = new Logger_1.Logger('PayloadChannel');
// netstring length for a 4194304 bytes payload.
const NS_MESSAGE_MAX_LEN = 4194313;
const NS_PAYLOAD_MAX_LEN = 4194304;
// binary length for a 4194304 bytes payload.
const MESSAGE_MAX_LEN = 4194308;
const PAYLOAD_MAX_LEN = 4194304;
class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
/**
* @private
Expand All @@ -21,46 +21,45 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
this._nextId = 0;
// Map of pending sent requests.
this._sents = new Map();
// Buffer for reading messages from the worker.
this._recvBuffer = Buffer.alloc(0);
logger.debug('constructor()');
this._producerSocket = producerSocket;
this._consumerSocket = consumerSocket;
// Read PayloadChannel notifications from the worker.
this._consumerSocket.on('data', (buffer) => {
if (!this._recvBuffer) {
if (!this._recvBuffer.length) {
this._recvBuffer = buffer;
}
else {
this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);
}
if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data into it');
if (this._recvBuffer.length > PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data in it');
// Reset the buffer and exit.
this._recvBuffer = undefined;
this._recvBuffer = Buffer.alloc(0);
return;
}
let msgStart = 0;
while (true) // eslint-disable-line no-constant-condition
{
let nsPayload;
try {
nsPayload = netstring.nsPayload(this._recvBuffer);
const readLen = this._recvBuffer.length - msgStart;
if (readLen < 4) {
// Incomplete data.
break;
}
catch (error) {
logger.error('invalid netstring data received from the worker process: %s', String(error));
// Reset the buffer and exit.
this._recvBuffer = undefined;
return;
}
// Incomplete netstring message.
if (nsPayload === -1)
return;
this._processData(nsPayload);
// Remove the read payload from the buffer.
this._recvBuffer =
this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));
if (!this._recvBuffer.length) {
this._recvBuffer = undefined;
return;
const dataView = new DataView(this._recvBuffer.buffer, this._recvBuffer.byteOffset + msgStart);
const msgLen = dataView.getUint32(0, littleEndian);
if (readLen < 4 + msgLen) {
// Incomplete data.
break;
}
const payload = this._recvBuffer.subarray(msgStart + 4, msgStart + 4 + msgLen);
msgStart += 4 + msgLen;
this._processData(payload);
}
if (msgStart != 0) {
this._recvBuffer = this._recvBuffer.slice(msgStart);
}
});
this._consumerSocket.on('end', () => (logger.debug('Consumer PayloadChannel ended by the worker process')));
Expand Down Expand Up @@ -103,24 +102,24 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('notify() [event:%s]', event);
if (this._closed)
throw new errors_1.InvalidStateError('PayloadChannel closed');
const notification = { event, internal, data };
const ns1 = netstring.nsWrite(JSON.stringify(notification));
const ns2 = netstring.nsWrite(payload);
if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN)
const notification = JSON.stringify({ event, internal, data });
if (Buffer.byteLength(notification) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel notification too big');
else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN)
else if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel payload too big');
try {
// This may throw if closed or remote side ended.
this._producerSocket.write(ns1);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(notification)).buffer));
this._producerSocket.write(notification);
}
catch (error) {
logger.warn('notify() | sending notification failed: %s', String(error));
return;
}
try {
// This may throw if closed or remote side ended.
this._producerSocket.write(ns2);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
}
catch (error) {
logger.warn('notify() | sending payload failed: %s', String(error));
Expand All @@ -136,16 +135,16 @@ class PayloadChannel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
logger.debug('request() [method:%s, id:%s]', method, id);
if (this._closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const ns1 = netstring.nsWrite(JSON.stringify(request));
const ns2 = netstring.nsWrite(payload);
if (Buffer.byteLength(ns1) > NS_MESSAGE_MAX_LEN)
const request = JSON.stringify({ id, method, internal, data });
if (Buffer.byteLength(request) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
else if (Buffer.byteLength(ns2) > NS_MESSAGE_MAX_LEN)
else if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('PayloadChannel payload too big');
// This may throw if closed or remote side ended.
this._producerSocket.write(ns1);
this._producerSocket.write(ns2);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(request)).buffer));
this._producerSocket.write(request);
this._producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this._producerSocket.write(payload);
return new Promise((pResolve, pReject) => {
const timeout = 1000 * (15 + (0.1 * this._sents.size));
const sent = {
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
"awaitqueue": "^2.3.3",
"debug": "^4.3.2",
"h264-profile-level-id": "^1.0.1",
"netstring": "^0.3.0",
"random-number": "^0.0.9",
"supports-color": "^9.0.2",
"uuid": "^8.3.2"
Expand Down
4 changes: 4 additions & 0 deletions rust/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

# Unreleased

* Fix for receiving data over payload channel

# 0.8.5

* Fix types for `round_trip_time` and `bitrate_by_layer` fields `ProducerStat` and `ConsumerStat`
Expand Down
8 changes: 8 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ actix = "0.12.0"
actix-web = "4.0.0-beta.8"
actix-web-actors = "4.0.0-beta.6"
env_logger = "0.9.0"

[dev-dependencies.criterion]
version = "0.3.5"
features = ["async_futures"]

[[bench]]
name = "direct_data"
harness = false
Loading