Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/v3' into memory-optimizations
Browse files Browse the repository at this point in the history
# Conflicts:
#	worker/src/RTC/Transport.cpp
  • Loading branch information
nazar-pc committed Nov 17, 2021
2 parents 57187ed + 20d11ff commit 5833d50
Show file tree
Hide file tree
Showing 70 changed files with 11,455 additions and 1,392 deletions.
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
# Changelog


### WIP
### 3.9.2

* `pipeToRouter()`: Reuse same `PipeTransport` when possible (PR #697).
* Add `worker.died` boolean getter.
* Update TypeScript version to 4.X.X and use `target: "esnext"` so transpilation of ECMAScript private fields (`#xxxxx`) don't use `WeakMaps` tricks but use standard syntax instead.
* Use more than one core for compilation on Windows (PR #709).
* `Consumer`: Modification of bitrate allocation algorithm (PR #708).
* Update NPM deps.


### 3.9.1

* NixOS friendly build process (PR #683).
* `Worker: emit "died" event before observer "close" (PR #684).
* Transport: Hide debug message for RTX RTCP-RR packets (PR #688).
* Update `libuv to 1.42.0.
* Improve Windows support (PR #692).
* Avoid build commands when MEDIASOUP_WORKER_BIN is set (PR #695).
* Update NPM deps.


### 3.9.0

* Replaces GYP build system with fully-functional Meson build system (PR #622).
* Worker communication optimization (aka removing netstring dependency) (PR #644).
Expand Down
52 changes: 21 additions & 31 deletions node/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,27 +175,24 @@ const eslintConfig =
'strict' : 2,
'valid-typeof' : 2,
'yoda' : 2
}
},
overrides : []
};

switch (process.env.MEDIASOUP_NODE_LANGUAGE)
{
case 'typescript':
eslintConfig.overrides.push(
{
eslintConfig.parser = '@typescript-eslint/parser';
eslintConfig.plugins =
[
files : [ '*.ts' ],
parser : '@typescript-eslint/parser',
plugins : [
...eslintConfig.plugins,
'@typescript-eslint'
];
eslintConfig.extends =
[
],
extends : [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended'
];
eslintConfig.rules =
{
],
rules : {
...eslintConfig.rules,
'no-unused-vars' : 0,
'@typescript-eslint/ban-types' : 0,
Expand All @@ -219,27 +216,20 @@ switch (process.env.MEDIASOUP_NODE_LANGUAGE)
'@typescript-eslint/no-use-before-define' : [ 2, { functions: false } ],
'@typescript-eslint/no-empty-function' : 0,
'@typescript-eslint/no-non-null-assertion' : 0
};
}
});

break;
}

case 'javascript':
eslintConfig.overrides.push(
{
eslintConfig.env['jest/globals'] = true;
eslintConfig.plugins =
[
files : [ '*.js' ],
env : {
...eslintConfig.env,
'jest/globals' : true
},
plugins : [
...eslintConfig.plugins,
'jest'
];

break;
}

default:
{
throw new TypeError('wrong/missing MEDIASOUP_NODE_LANGUAGE env');
}
}
]
});

module.exports = eslintConfig;
1 change: 1 addition & 0 deletions node/lib/ActiveSpeakerObserver.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.ActiveSpeakerObserver = void 0;
const Logger_1 = require("./Logger");
const RtpObserver_1 = require("./RtpObserver");
const logger = new Logger_1.Logger('ActiveSpeakerObserver');
Expand Down
1 change: 1 addition & 0 deletions node/lib/AudioLevelObserver.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.AudioLevelObserver = void 0;
const Logger_1 = require("./Logger");
const RtpObserver_1 = require("./RtpObserver");
const logger = new Logger_1.Logger('AudioLevelObserver');
Expand Down
112 changes: 49 additions & 63 deletions node/lib/Channel.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
"use strict";
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, privateMap, value) {
if (!privateMap.has(receiver)) {
throw new TypeError("attempted to set private field on non-instance");
}
privateMap.set(receiver, value);
return value;
};
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, privateMap) {
if (!privateMap.has(receiver)) {
throw new TypeError("attempted to get private field on non-instance");
}
return privateMap.get(receiver);
};
var _closed, _producerSocket, _consumerSocket, _nextId, _sents, _recvBuffer;
Object.defineProperty(exports, "__esModule", { value: true });
exports.Channel = void 0;
const os = require("os");
const Logger_1 = require("./Logger");
const EnhancedEventEmitter_1 = require("./EnhancedEventEmitter");
Expand All @@ -24,55 +11,55 @@ const logger = new Logger_1.Logger('Channel');
const MESSAGE_MAX_LEN = 4194308;
const PAYLOAD_MAX_LEN = 4194304;
class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
// Closed flag.
#closed = false;
// Unix Socket instance for sending messages to the worker process.
#producerSocket;
// Unix Socket instance for receiving messages to the worker process.
#consumerSocket;
// Next id for messages sent to the worker process.
#nextId = 0;
// Map of pending sent requests.
#sents = new Map();
// Buffer for reading messages from the worker.
#recvBuffer = Buffer.alloc(0);
/**
* @private
*/
constructor({ producerSocket, consumerSocket, pid }) {
super();
// Closed flag.
_closed.set(this, false);
// Unix Socket instance for sending messages to the worker process.
_producerSocket.set(this, void 0);
// Unix Socket instance for receiving messages to the worker process.
_consumerSocket.set(this, void 0);
// Next id for messages sent to the worker process.
_nextId.set(this, 0);
// Map of pending sent requests.
_sents.set(this, new Map());
// Buffer for reading messages from the worker.
_recvBuffer.set(this, Buffer.alloc(0));
logger.debug('constructor()');
__classPrivateFieldSet(this, _producerSocket, producerSocket);
__classPrivateFieldSet(this, _consumerSocket, consumerSocket);
this.#producerSocket = producerSocket;
this.#consumerSocket = consumerSocket;
// Read Channel responses/notifications from the worker.
__classPrivateFieldGet(this, _consumerSocket).on('data', (buffer) => {
if (!__classPrivateFieldGet(this, _recvBuffer).length) {
__classPrivateFieldSet(this, _recvBuffer, buffer);
this.#consumerSocket.on('data', (buffer) => {
if (!this.#recvBuffer.length) {
this.#recvBuffer = buffer;
}
else {
__classPrivateFieldSet(this, _recvBuffer, Buffer.concat([__classPrivateFieldGet(this, _recvBuffer), buffer], __classPrivateFieldGet(this, _recvBuffer).length + buffer.length));
this.#recvBuffer = Buffer.concat([this.#recvBuffer, buffer], this.#recvBuffer.length + buffer.length);
}
if (__classPrivateFieldGet(this, _recvBuffer).length > PAYLOAD_MAX_LEN) {
if (this.#recvBuffer.length > PAYLOAD_MAX_LEN) {
logger.error('receiving buffer is full, discarding all data in it');
// Reset the buffer and exit.
__classPrivateFieldSet(this, _recvBuffer, Buffer.alloc(0));
this.#recvBuffer = Buffer.alloc(0);
return;
}
let msgStart = 0;
while (true) // eslint-disable-line no-constant-condition
{
const readLen = __classPrivateFieldGet(this, _recvBuffer).length - msgStart;
const readLen = this.#recvBuffer.length - msgStart;
if (readLen < 4) {
// Incomplete data.
break;
}
const dataView = new DataView(__classPrivateFieldGet(this, _recvBuffer).buffer, __classPrivateFieldGet(this, _recvBuffer).byteOffset + msgStart);
const dataView = new DataView(this.#recvBuffer.buffer, this.#recvBuffer.byteOffset + msgStart);
const msgLen = dataView.getUint32(0, littleEndian);
if (readLen < 4 + msgLen) {
// Incomplete data.
break;
}
const payload = __classPrivateFieldGet(this, _recvBuffer).subarray(msgStart + 4, msgStart + 4 + msgLen);
const payload = this.#recvBuffer.subarray(msgStart + 4, msgStart + 4 + msgLen);
msgStart += 4 + msgLen;
try {
// We can receive JSON messages (Channel messages) or log strings.
Expand Down Expand Up @@ -108,42 +95,42 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
}
}
if (msgStart != 0) {
__classPrivateFieldSet(this, _recvBuffer, __classPrivateFieldGet(this, _recvBuffer).slice(msgStart));
this.#recvBuffer = this.#recvBuffer.slice(msgStart);
}
});
__classPrivateFieldGet(this, _consumerSocket).on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
__classPrivateFieldGet(this, _consumerSocket).on('error', (error) => (logger.error('Consumer Channel error: %s', String(error))));
__classPrivateFieldGet(this, _producerSocket).on('end', () => (logger.debug('Producer Channel ended by the worker process')));
__classPrivateFieldGet(this, _producerSocket).on('error', (error) => (logger.error('Producer Channel error: %s', String(error))));
this.#consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));
this.#consumerSocket.on('error', (error) => (logger.error('Consumer Channel error: %s', String(error))));
this.#producerSocket.on('end', () => (logger.debug('Producer Channel ended by the worker process')));
this.#producerSocket.on('error', (error) => (logger.error('Producer Channel error: %s', String(error))));
}
/**
* @private
*/
close() {
if (__classPrivateFieldGet(this, _closed))
if (this.#closed)
return;
logger.debug('close()');
__classPrivateFieldSet(this, _closed, true);
this.#closed = true;
// Close every pending sent.
for (const sent of __classPrivateFieldGet(this, _sents).values()) {
for (const sent of this.#sents.values()) {
sent.close();
}
// Remove event listeners but leave a fake 'error' hander to avoid
// propagation.
__classPrivateFieldGet(this, _consumerSocket).removeAllListeners('end');
__classPrivateFieldGet(this, _consumerSocket).removeAllListeners('error');
__classPrivateFieldGet(this, _consumerSocket).on('error', () => { });
__classPrivateFieldGet(this, _producerSocket).removeAllListeners('end');
__classPrivateFieldGet(this, _producerSocket).removeAllListeners('error');
__classPrivateFieldGet(this, _producerSocket).on('error', () => { });
this.#consumerSocket.removeAllListeners('end');
this.#consumerSocket.removeAllListeners('error');
this.#consumerSocket.on('error', () => { });
this.#producerSocket.removeAllListeners('end');
this.#producerSocket.removeAllListeners('error');
this.#producerSocket.on('error', () => { });
// Destroy the socket after a while to allow pending incoming messages.
setTimeout(() => {
try {
__classPrivateFieldGet(this, _producerSocket).destroy();
this.#producerSocket.destroy();
}
catch (error) { }
try {
__classPrivateFieldGet(this, _consumerSocket).destroy();
this.#consumerSocket.destroy();
}
catch (error) { }
}, 200);
Expand All @@ -152,29 +139,29 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
* @private
*/
async request(method, internal, data) {
__classPrivateFieldGet(this, _nextId) < 4294967295 ? __classPrivateFieldSet(this, _nextId, +__classPrivateFieldGet(this, _nextId) + 1) : (__classPrivateFieldSet(this, _nextId, 1));
const id = __classPrivateFieldGet(this, _nextId);
this.#nextId < 4294967295 ? ++this.#nextId : (this.#nextId = 1);
const id = this.#nextId;
logger.debug('request() [method:%s, id:%s]', method, id);
if (__classPrivateFieldGet(this, _closed))
if (this.#closed)
throw new errors_1.InvalidStateError('Channel closed');
const request = { id, method, internal, data };
const payload = JSON.stringify(request);
if (Buffer.byteLength(payload) > MESSAGE_MAX_LEN)
throw new Error('Channel request too big');
// This may throw if closed or remote side ended.
__classPrivateFieldGet(this, _producerSocket).write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
__classPrivateFieldGet(this, _producerSocket).write(payload);
this.#producerSocket.write(Buffer.from(Uint32Array.of(Buffer.byteLength(payload)).buffer));
this.#producerSocket.write(payload);
return new Promise((pResolve, pReject) => {
const sent = {
id: id,
method: method,
resolve: (data2) => {
if (!__classPrivateFieldGet(this, _sents).delete(id))
if (!this.#sents.delete(id))
return;
pResolve(data2);
},
reject: (error) => {
if (!__classPrivateFieldGet(this, _sents).delete(id))
if (!this.#sents.delete(id))
return;
pReject(error);
},
Expand All @@ -183,13 +170,13 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
}
};
// Add sent stuff to the map.
__classPrivateFieldGet(this, _sents).set(id, sent);
this.#sents.set(id, sent);
});
}
processMessage(msg) {
// If a response, retrieve its associated request.
if (msg.id) {
const sent = __classPrivateFieldGet(this, _sents).get(msg.id);
const sent = this.#sents.get(msg.id);
if (!sent) {
logger.error('received response does not match any sent request [id:%s]', msg.id);
return;
Expand Down Expand Up @@ -229,4 +216,3 @@ class Channel extends EnhancedEventEmitter_1.EnhancedEventEmitter {
}
}
exports.Channel = Channel;
_closed = new WeakMap(), _producerSocket = new WeakMap(), _consumerSocket = new WeakMap(), _nextId = new WeakMap(), _sents = new WeakMap(), _recvBuffer = new WeakMap();
Loading

0 comments on commit 5833d50

Please sign in to comment.