Skip to content

Commit

Permalink
fix: fix websocket and webtransport send callbacks (#699)
Browse files Browse the repository at this point in the history
With the `websocket` transport, the callbacks which indicate that the
packets are actually written were not properly called.

Example:

```js
socket.send("hello", () => {
  // the message has been written to the underlying transport
});
```

The bug was caused by the `websocket` transport (and `webtransport` as
well) having its `supportsFraming` property set to `true`, despite
having been changed in [1] to emit a single `drain` event for each
batch of messages written to the transport like the `polling` transport
always did. Note that although [1] is partially reverted in [2], the
new `drain` event behavior is preserved as called out in that commit's
message.

The `supportsFraming` attribute was introduced in [3] (amended by [4])
as a way to distinguish transports that emit one `drain` per message
from those that emit one `drain` per batch. Since the delivery of
`send` callbacks depends on matching `drain` events with
`transport.send` calls, that distinction is vital to correct behavior.

However, now that all transports have converged to "one `drain` per
batch" behavior, this `supportsFraming` property can be retired (and
the code for calling callbacks simplified).

[1]: #618
[2]: a65a047
[3]: #130
[4]: #132

Related: #698
  • Loading branch information
jonathanperret authored Jun 13, 2024
1 parent 79ea52d commit fc21c4a
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 61 deletions.
38 changes: 14 additions & 24 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface SendOptions {

type ReadyState = "opening" | "open" | "closing" | "closed";

type SendCallback = (transport: Transport) => void;

export class Socket extends EventEmitter {
public readonly protocol: number;
// TODO for the next major release: do not keep the reference to the first HTTP request, as it stays in memory
Expand All @@ -27,8 +29,8 @@ export class Socket extends EventEmitter {
private upgrading = false;
private upgraded = false;
private writeBuffer: Packet[] = [];
private packetsFn: Array<() => void> = [];
private sentCallbackFn: any[] = [];
private packetsFn: SendCallback[] = [];
private sentCallbackFn: SendCallback[][] = [];
private cleanupFn: any[] = [];
private pingTimeoutTimer;
private pingIntervalTimer;
Expand Down Expand Up @@ -395,19 +397,11 @@ export class Socket extends EventEmitter {
// the message was sent successfully, execute the callback
const onDrain = () => {
if (this.sentCallbackFn.length > 0) {
const seqFn = this.sentCallbackFn.splice(0, 1)[0];
if ("function" === typeof seqFn) {
debug("executing send callback");
seqFn(this.transport);
} else if (Array.isArray(seqFn)) {
debug("executing batch send callback");
const l = seqFn.length;
let i = 0;
for (; i < l; i++) {
if ("function" === typeof seqFn[i]) {
seqFn[i](this.transport);
}
}
debug("executing batch send callback");
const seqFn = this.sentCallbackFn.shift();
const l = seqFn.length;
for (let i = 0; i < l; i++) {
seqFn[i](this.transport);
}
}
};
Expand All @@ -428,7 +422,7 @@ export class Socket extends EventEmitter {
* @return {Socket} for chaining
* @api public
*/
public send(data: RawData, options?: SendOptions, callback?: () => void) {
public send(data: RawData, options?: SendOptions, callback?: SendCallback) {
this.sendPacket("message", data, options, callback);
return this;
}
Expand All @@ -440,7 +434,7 @@ export class Socket extends EventEmitter {
* @param options
* @param callback
*/
public write(data: RawData, options?: SendOptions, callback?: () => void) {
public write(data: RawData, options?: SendOptions, callback?: SendCallback) {
this.sendPacket("message", data, options, callback);
return this;
}
Expand All @@ -459,7 +453,7 @@ export class Socket extends EventEmitter {
type: PacketType,
data?: RawData,
options: SendOptions = {},
callback?: () => void
callback?: SendCallback
) {
if ("function" === typeof options) {
callback = options;
Expand All @@ -485,7 +479,7 @@ export class Socket extends EventEmitter {
this.writeBuffer.push(packet);

// add send callback to object, if defined
if (callback) this.packetsFn.push(callback);
if ("function" === typeof callback) this.packetsFn.push(callback);

this.flush();
}
Expand All @@ -507,11 +501,7 @@ export class Socket extends EventEmitter {
this.server.emit("flush", this, this.writeBuffer);
const wbuf = this.writeBuffer;
this.writeBuffer = [];
if (!this.transport.supportsFraming) {
this.sentCallbackFn.push(this.packetsFn);
} else {
this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
}
this.sentCallbackFn.push(this.packetsFn);
this.packetsFn = [];
this.transport.send(wbuf);
this.emit("drain");
Expand Down
5 changes: 0 additions & 5 deletions lib/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ export abstract class Transport extends EventEmitter {
this.emit("close");
}

/**
* Advertise framing support.
*/
abstract get supportsFraming();

/**
* The name of the transport.
*/
Expand Down
4 changes: 0 additions & 4 deletions lib/transports-uws/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ export class Polling extends Transport {
return "polling";
}

get supportsFraming() {
return false;
}

/**
* Overrides onRequest.
*
Expand Down
9 changes: 0 additions & 9 deletions lib/transports-uws/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ export class WebSocket extends Transport {
return true;
}

/**
* Advertise framing support.
*
* @api public
*/
get supportsFraming() {
return true;
}

/**
* Writes a packet payload.
*
Expand Down
4 changes: 0 additions & 4 deletions lib/transports/polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ export class Polling extends Transport {
return "polling";
}

get supportsFraming() {
return false;
}

/**
* Overrides onRequest.
*
Expand Down
9 changes: 0 additions & 9 deletions lib/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ export class WebSocket extends Transport {
return true;
}

/**
* Advertise framing support.
*
* @api public
*/
get supportsFraming() {
return true;
}

/**
* Writes a packet payload.
*
Expand Down
4 changes: 0 additions & 4 deletions lib/transports/webtransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ export class WebTransport extends Transport {
return "webtransport";
}

get supportsFraming() {
return true;
}

async send(packets) {
this.writable = false;

Expand Down
14 changes: 12 additions & 2 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -2759,13 +2759,23 @@ describe("server", () => {
});
});

it("should execute in multipart packet", (done) => {
it("should execute in multipart packet (websocket)", (done) => {
const engine = listen((port) => {
const socket = new ClientSocket(`ws://localhost:${port}`);
const socket = new ClientSocket(`ws://localhost:${port}`, {
transports: ["websocket"],
});
let i = 0;
let j = 0;

engine.on("connection", (conn) => {
conn.send("d", (transport) => {
i++;
});

conn.send("c", (transport) => {
i++;
});

conn.send("b", (transport) => {
i++;
});
Expand Down
15 changes: 15 additions & 0 deletions test/webtransport.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,21 @@ describe("WebTransport", () => {
});
});

it("should invoke send callbacks (server to client)", (done) => {
setup({}, async ({ engine, h3Server, socket, reader }) => {
const messageCount = 4;
let receivedCallbacks = 0;

for (let i = 0; i < messageCount; i++) {
socket.send("hello", () => {
if (++receivedCallbacks === messageCount) {
success(engine, h3Server, done);
}
});
}
});
});

it("should send some binary data (client to server)", (done) => {
setup({}, async ({ engine, h3Server, socket, writer }) => {
socket.on("data", (data) => {
Expand Down

0 comments on commit fc21c4a

Please sign in to comment.