From 1b320118295938068c7fc848403ab545b943ef7f Mon Sep 17 00:00:00 2001 From: Branislav Katreniak Date: Fri, 30 Apr 2021 15:28:49 +0200 Subject: [PATCH] websocket: fix write back-pressure Websocket transport is eagerly writing to underlaying websocket without respecting back-pressure. When an event is emitted to multiple clients, socket.io adapter sends the same packet object to all socket clients. These packet objects are shared for all clients inside room. Once the packet is sent to transport, transport prepares buffer with transport headers and packet data and the sharing among clients is lost. This change significantly reduces memory usage when many packets are emitted to many clients in a burst. This change causes that buffered data is sent to clients more evenly packet by packet. --- lib/transports/websocket.js | 46 +++++++++++++++++++------------------ test/server.js | 4 +++- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/lib/transports/websocket.js b/lib/transports/websocket.js index 500d7ad4..a792083b 100644 --- a/lib/transports/websocket.js +++ b/lib/transports/websocket.js @@ -63,33 +63,35 @@ class WebSocket extends Transport { * @api private */ send(packets) { - for (let i = 0; i < packets.length; i++) { - const packet = packets[i]; + const packet = packets.shift(); + if (typeof packet === 'undefined') { + this.writable = true; + this.emit("drain"); + return; + } - // always creates a new object since ws modifies it - const opts = {}; - if (packet.options) { - opts.compress = packet.options.compress; - } + // always creates a new object since ws modifies it + const opts = {}; + if (packet.options) { + opts.compress = packet.options.compress; + } - this.parser.encodePacket(packet, this.supportsBinary, data => { - if (this.perMessageDeflate) { - const len = - "string" === typeof data ? Buffer.byteLength(data) : data.length; - if (len < this.perMessageDeflate.threshold) { - opts.compress = false; - } + this.parser.encodePacket(packet, this.supportsBinary, data => { + if (this.perMessageDeflate) { + const len = + "string" === typeof data ? Buffer.byteLength(data) : data.length; + if (len < this.perMessageDeflate.threshold) { + opts.compress = false; } - debug('writing "%s"', data); - this.writable = false; + } + debug('writing "%s"', data); + this.writable = false; - this.socket.send(data, opts, err => { - if (err) return this.onError("write error", err.stack); - this.writable = true; - this.emit("drain"); - }); + this.socket.send(data, opts, err => { + if (err) return this.onError("write error", err.stack); + this.send(packets); }); - } + }); } /** diff --git a/test/server.js b/test/server.js index c72ac077..758897c4 100644 --- a/test/server.js +++ b/test/server.js @@ -1681,7 +1681,9 @@ describe("server", () => { conn.send("a"); conn.send("b"); conn.send("c"); - conn.close(); + setTimeout(() => { + conn.close(); + }, 50); }); socket.on("open", () => {