Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: classify mux #731

Merged
merged 1 commit into from
Jun 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 96 additions & 99 deletions lib/mux.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,117 +13,114 @@ var assert = require('assert');
var schedule = (typeof setImmediate === 'function') ?
setImmediate : process.nextTick;

function Mux(downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;

this.out = downstream;
var self = this;
downstream.on('drain', function() {
self.blocked = false;
self._readIncoming();
});
}

// There are 2 states we can be in:

// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,

// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event

// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
class Mux {
constructor (downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;

this.out = downstream;
var self = this;
downstream.on('drain', function () {
self.blocked = false;
self._readIncoming();
});
}

Mux.prototype._readIncoming = function() {
// There are 2 states we can be in:
// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,
// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
_readIncoming () {

// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;

var accepting = true;
var out = this.out;

// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin (streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
}
}

// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;
roundrobin(this.newStreams);

// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.
if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
}

var accepting = true;
var out = this.out;
_scheduleRead () {
var self = this;

// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin(streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
if (!self.scheduledRead) {
schedule(function () {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
}

roundrobin(this.newStreams);
pipeFrom (readable) {
var self = this;

// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.

if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
};

Mux.prototype._scheduleRead = function() {
var self = this;

if (!self.scheduledRead) {
schedule(function() {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
};
function enqueue () {
self.newStreams.push(readable);
self._scheduleRead();
}

Mux.prototype.pipeFrom = function(readable) {
var self = this;
function cleanup () {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
}
function cleanupIfMe (dest) {
if (dest === self) cleanup();
}

function enqueue() {
self.newStreams.push(readable);
self._scheduleRead();
readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
}

function cleanup() {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
}
function cleanupIfMe(dest) {
if (dest === self) cleanup();
unpipeFrom (readable) {
readable.emit('unpipeFrom', this);
}

readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
};

Mux.prototype.unpipeFrom = function(readable) {
readable.emit('unpipeFrom', this);
};
}

module.exports.Mux = Mux;