Skip to content

Commit

Permalink
websocket: fix write back-pressure
Browse files Browse the repository at this point in the history
Websocket transport is eagerly writing to underlaying websocket
without respecting back-pressure.
When an event is emitted to multiple clients, socket.io adapter
sends the same packet object to all socket clients.
These packet objects are shared for all clients inside room.

Once the packet is sent to transport,
transport prepares buffer with transport headers and packet data
and the sharing among clients is lost.

This change significantly reduces memory usage when
many packets are emitted to many clients in a burst.

    This change causes that buffered data is sent to clients
    more evenly packet by packet.
  • Loading branch information
Branislav Katreniak committed Apr 30, 2021
1 parent 4c0aa73 commit 1b32011
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
46 changes: 24 additions & 22 deletions lib/transports/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,35 @@ class WebSocket extends Transport {
* @api private
*/
send(packets) {
for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const packet = packets.shift();
if (typeof packet === 'undefined') {
this.writable = true;
this.emit("drain");
return;
}

// always creates a new object since ws modifies it
const opts = {};
if (packet.options) {
opts.compress = packet.options.compress;
}
// always creates a new object since ws modifies it
const opts = {};
if (packet.options) {
opts.compress = packet.options.compress;
}

this.parser.encodePacket(packet, this.supportsBinary, data => {
if (this.perMessageDeflate) {
const len =
"string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < this.perMessageDeflate.threshold) {
opts.compress = false;
}
this.parser.encodePacket(packet, this.supportsBinary, data => {
if (this.perMessageDeflate) {
const len =
"string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < this.perMessageDeflate.threshold) {
opts.compress = false;
}
debug('writing "%s"', data);
this.writable = false;
}
debug('writing "%s"', data);
this.writable = false;

this.socket.send(data, opts, err => {
if (err) return this.onError("write error", err.stack);
this.writable = true;
this.emit("drain");
});
this.socket.send(data, opts, err => {
if (err) return this.onError("write error", err.stack);
this.send(packets);
});
}
});
}

/**
Expand Down
4 changes: 3 additions & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,9 @@ describe("server", () => {
conn.send("a");
conn.send("b");
conn.send("c");
conn.close();
setTimeout(() => {
conn.close();
}, 50);
});

socket.on("open", () => {
Expand Down

0 comments on commit 1b32011

Please sign in to comment.