Skip to content

Commit

Permalink
fix: wait for all packets to be sent before closing the WebSocket con…
Browse files Browse the repository at this point in the history
…nection

This reverts commit [1], which was included in `engine.io@5.1.0` and
`socket.io@4.1.0`.

The WebSocket connection was closed before all packets were written
out, so for example when calling `socket.disconnect(true)` on the
Socket.IO server (which disconnect from all namespaces and close the
connection), the client would receive only the first disconnect packet
and kept trying to reconnect to the other namespaces.

The only difference with the previous implementation (pre 5.1.0) is
that the "drain" event gets only called once at the end, and not after
each packet.

[1]: ad5306a

Related: #648
  • Loading branch information
darrachequesne committed Jan 10, 2023
1 parent ed87609 commit a65a047
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 67 deletions.
11 changes: 10 additions & 1 deletion lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,18 @@ export class Socket extends EventEmitter {
this.readyState = "closing";

if (this.writeBuffer.length) {
this.once("drain", this.closeTransport.bind(this, discard));
debug(
"there are %d remaining packets in the buffer, waiting for the 'drain' event",
this.writeBuffer.length
);
this.once("drain", () => {
debug("all packets have been sent, closing the transport");
this.closeTransport(discard);
});
return;
}

debug("the buffer is empty, closing the transport right away", discard);
this.closeTransport(discard);
}

Expand All @@ -574,6 +582,7 @@ export class Socket extends EventEmitter {
* @api private
*/
private closeTransport(discard) {
debug("closing the transport (discard? %s)", discard);
if (discard) this.transport.discard();
this.transport.close(this.onClose.bind(this, "forced close"));
}
Expand Down
50 changes: 23 additions & 27 deletions lib/transports-uws/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,32 @@ export class WebSocket extends Transport {
* @api private
*/
send(packets) {
const packet = packets.shift();
if (typeof packet === "undefined") {
this.writable = true;
this.emit("drain");
return;
}
this.writable = false;

// always creates a new object since ws modifies it
const opts: { compress?: boolean } = {};
if (packet.options) {
opts.compress = packet.options.compress;
}
for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const isLast = i + 1 === packets.length;

const send = (data) => {
const isBinary = typeof data !== "string";
const compress =
this.perMessageDeflate &&
Buffer.byteLength(data) > this.perMessageDeflate.threshold;
const send = (data) => {
const isBinary = typeof data !== "string";
const compress =
this.perMessageDeflate &&
Buffer.byteLength(data) > this.perMessageDeflate.threshold;

debug('writing "%s"', data);
this.writable = false;
debug('writing "%s"', data);
this.socket.send(data, isBinary, compress);

this.socket.send(data, isBinary, compress);
this.send(packets);
};
if (isLast) {
this.writable = true;
this.emit("drain");
}
};

if (packet.options && typeof packet.options.wsPreEncoded === "string") {
send(packet.options.wsPreEncoded);
} else {
this.parser.encodePacket(packet, this.supportsBinary, send);
if (packet.options && typeof packet.options.wsPreEncoded === "string") {
send(packet.options.wsPreEncoded);
} else {
this.parser.encodePacket(packet, this.supportsBinary, send);
}
}
}

Expand All @@ -94,7 +90,7 @@ export class WebSocket extends Transport {
doClose(fn) {
debug("closing");
fn && fn();
// call fn first since socket.close() immediately emits a "close" event
this.socket.close();
// call fn first since socket.end() immediately emits a "close" event
this.socket.end();
}
}
73 changes: 37 additions & 36 deletions lib/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,47 +61,48 @@ export class WebSocket extends Transport {
* @api private
*/
send(packets) {
const packet = packets.shift();
if (typeof packet === "undefined") {
this.writable = true;
this.emit("drain");
return;
}
this.writable = false;

// always creates a new object since ws modifies it
const opts: { compress?: boolean } = {};
if (packet.options) {
opts.compress = packet.options.compress;
}
for (let i = 0; i < packets.length; i++) {
const packet = packets[i];
const isLast = i + 1 === packets.length;

const send = (data) => {
if (this.perMessageDeflate) {
const len =
"string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < this.perMessageDeflate.threshold) {
opts.compress = false;
}
// always creates a new object since ws modifies it
const opts: { compress?: boolean } = {};
if (packet.options) {
opts.compress = packet.options.compress;
}
debug('writing "%s"', data);
this.writable = false;

this.socket.send(data, opts, (err) => {
if (err) return this.onError("write error", err.stack);
this.send(packets);
});
};
const onSent = (err) => {
if (err) {
return this.onError("write error", err.stack);
} else if (isLast) {
this.writable = true;
this.emit("drain");
}
};

if (packet.options && typeof packet.options.wsPreEncoded === "string") {
send(packet.options.wsPreEncoded);
} else if (this._canSendPreEncodedFrame(packet)) {
// the WebSocket frame was computed with WebSocket.Sender.frame()
// see https://github.com/websockets/ws/issues/617#issuecomment-283002469
this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, (err) => {
if (err) return this.onError("write error", err.stack);
this.send(packets);
});
} else {
this.parser.encodePacket(packet, this.supportsBinary, send);
const send = (data) => {
if (this.perMessageDeflate) {
const len =
"string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < this.perMessageDeflate.threshold) {
opts.compress = false;
}
}
debug('writing "%s"', data);
this.socket.send(data, opts, onSent);
};

if (packet.options && typeof packet.options.wsPreEncoded === "string") {
send(packet.options.wsPreEncoded);
} else if (this._canSendPreEncodedFrame(packet)) {
// the WebSocket frame was computed with WebSocket.Sender.frame()
// see https://github.com/websockets/ws/issues/617#issuecomment-283002469
this.socket._sender.sendFrame(packet.options.wsPreEncodedFrame, onSent);
} else {
this.parser.encodePacket(packet, this.supportsBinary, send);
}
}
}

Expand Down
4 changes: 1 addition & 3 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -1797,9 +1797,7 @@ describe("server", () => {
conn.send("a");
conn.send("b");
conn.send("c");
setTimeout(() => {
conn.close();
}, 50);
conn.close();
});

socket.on("open", () => {
Expand Down

0 comments on commit a65a047

Please sign in to comment.