From 8b204570a94979bbec307f23ca078f30f5cf07b0 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 31 Mar 2022 07:36:41 +0200 Subject: [PATCH] feat: broadcast and expect multiple acks Syntax: ```js io.timeout(1000).emit("some-event", (err, responses) => { // ... }); ``` The adapter exposes two additional methods: - `broadcastWithAck(packets, opts, clientCountCallback, ack)` Similar to `broadcast(packets, opts)`, but: * `clientCountCallback()` is called with the number of clients that received the packet (can be called several times in a cluster) * `ack()` is called for each client response - `serverCount()` It returns the number of Socket.IO servers in the cluster (1 for the in-memory adapter). Those two methods will be implemented in the other adapters (Redis, Postgres, MongoDB, ...). Related: - https://github.com/socketio/socket.io/issues/1811 - https://github.com/socketio/socket.io/issues/4163 - https://github.com/socketio/socket.io-redis-adapter/issues/445 --- lib/broadcast-operator.ts | 86 +++++++++++++++++++++++++++-- lib/index.ts | 17 ++++++ lib/namespace.ts | 17 ++++++ lib/socket.ts | 2 +- package-lock.json | 15 ++--- package.json | 2 +- test/socket.io.ts | 113 ++++++++++++++++++++++++++++++++++++++ test/support/util.ts | 2 +- 8 files changed, 238 insertions(+), 16 deletions(-) diff --git a/lib/broadcast-operator.ts b/lib/broadcast-operator.ts index 42b8f1545c..6339aaf1ae 100644 --- a/lib/broadcast-operator.ts +++ b/lib/broadcast-operator.ts @@ -129,6 +129,29 @@ export class BroadcastOperator ); } + /** + * Adds a timeout in milliseconds for the next operation + * + *

+   *
+   * io.timeout(1000).emit("some-event", (err, responses) => {
+   *   // ...
+   * });
+   *
+   * 
+ * + * @param timeout + */ + public timeout(timeout: number) { + const flags = Object.assign({}, this.flags, { timeout }); + return new BroadcastOperator( + this.adapter, + this.rooms, + this.exceptRooms, + flags + ); + } + /** * Emits to all clients. * @@ -149,14 +172,65 @@ export class BroadcastOperator data: data, }; - if ("function" == typeof data[data.length - 1]) { - throw new Error("Callbacks are not supported when broadcasting"); + const withAck = typeof data[data.length - 1] === "function"; + + if (!withAck) { + this.adapter.broadcast(packet, { + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }); + + return true; } - this.adapter.broadcast(packet, { - rooms: this.rooms, - except: this.exceptRooms, - flags: this.flags, + const ack = data.pop() as (...args: any[]) => void; + let timedOut = false; + let responses: any[] = []; + + const timer = setTimeout(() => { + timedOut = true; + ack.apply(this, [new Error("operation has timed out"), responses]); + }, this.flags.timeout); + + let expectedServerCount = -1; + let actualServerCount = 0; + let expectedClientCount = 0; + + const checkCompleteness = () => { + if ( + !timedOut && + expectedServerCount === actualServerCount && + responses.length === expectedClientCount + ) { + clearTimeout(timer); + ack.apply(this, [null, responses]); + } + }; + + this.adapter.broadcastWithAck( + packet, + { + rooms: this.rooms, + except: this.exceptRooms, + flags: this.flags, + }, + (clientCount) => { + // each Socket.IO server in the cluster sends the number of clients that were notified + expectedClientCount += clientCount; + actualServerCount++; + checkCompleteness(); + }, + (clientResponse) => { + // each client sends an acknowledgement + responses.push(clientResponse); + checkCompleteness(); + } + ); + + this.adapter.serverCount().then((serverCount) => { + expectedServerCount = serverCount; + checkCompleteness(); }); return true; diff --git a/lib/index.ts b/lib/index.ts index 1aee313fa1..27916f0b10 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -772,6 +772,23 @@ export class Server< return this.sockets.local; } + /** + * Adds a timeout in milliseconds for the next operation + * + *

+   *
+   * io.timeout(1000).emit("some-event", (err, responses) => {
+   *   // ...
+   * });
+   *
+   * 
+ * + * @param timeout + */ + public timeout(timeout: number) { + return this.sockets.timeout(timeout); + } + /** * Returns the matching socket instances * diff --git a/lib/namespace.ts b/lib/namespace.ts index c03d73f629..bec55c1623 100644 --- a/lib/namespace.ts +++ b/lib/namespace.ts @@ -379,6 +379,23 @@ export class Namespace< return new BroadcastOperator(this.adapter).local; } + /** + * Adds a timeout in milliseconds for the next operation + * + *

+   *
+   * io.timeout(1000).emit("some-event", (err, responses) => {
+   *   // ...
+   * });
+   *
+   * 
+ * + * @param timeout + */ + public timeout(timeout: number) { + return new BroadcastOperator(this.adapter).timeout(timeout); + } + /** * Returns the matching socket instances * diff --git a/lib/socket.ts b/lib/socket.ts index e5e43dbd72..c748d5d681 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -140,7 +140,7 @@ export class Socket< private readonly adapter: Adapter; private acks: Map void> = new Map(); private fns: Array<(event: Event, next: (err?: Error) => void) => void> = []; - private flags: BroadcastFlags & { timeout?: number } = {}; + private flags: BroadcastFlags = {}; private _anyListeners?: Array<(...args: any[]) => void>; /** diff --git a/package-lock.json b/package-lock.json index 3fda73bd87..f82c8485b0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "base64id": "~2.0.0", "debug": "~4.3.2", "engine.io": "~6.1.2", - "socket.io-adapter": "~2.3.3", + "socket.io-adapter": "~2.4.0", "socket.io-parser": "~4.0.4" }, "devDependencies": { @@ -3116,9 +3116,9 @@ } }, "node_modules/socket.io-adapter": { - "version": "2.3.3", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.3.tgz", - "integrity": "sha512-Qd/iwn3VskrpNO60BeRyCyr8ZWw9CPZyitW4AQwmRZ8zCiyDiL+znRnWX6tDHXnWn1sJrM1+b6Mn6wEDJJ4aYQ==" + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.4.0.tgz", + "integrity": "sha512-W4N+o69rkMEGVuk2D/cvca3uYsvGlMwsySWV447y99gUPghxq42BxqLNMndb+a1mm/5/7NeXVQS7RLa2XyXvYg==" }, "node_modules/socket.io-client": { "version": "4.4.1", @@ -6253,9 +6253,9 @@ "dev": true }, "socket.io-adapter": { - "version": "2.3.3", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.3.tgz", - "integrity": "sha512-Qd/iwn3VskrpNO60BeRyCyr8ZWw9CPZyitW4AQwmRZ8zCiyDiL+znRnWX6tDHXnWn1sJrM1+b6Mn6wEDJJ4aYQ==" + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.4.0.tgz", + "integrity": "sha512-W4N+o69rkMEGVuk2D/cvca3uYsvGlMwsySWV447y99gUPghxq42BxqLNMndb+a1mm/5/7NeXVQS7RLa2XyXvYg==" }, "socket.io-client": { "version": "4.4.1", @@ -6670,6 +6670,7 @@ }, "uWebSockets.js": { "version": "git+https://git@github.com/uNetworking/uWebSockets.js.git#4558ee00f9f1f686fffe1accbfc2e85b1af9c50f", + "integrity": "sha512-I+2JIZFeqYDQN9OfCRvVF0GgFqwT9x/Nue+eMGoWzp03wHbXXoFhCvmrrxOkbsNhdPT+HO3xIxtLhevurKX2yg==", "dev": true, "from": "uWebSockets.js@github:uNetworking/uWebSockets.js#v20.0.0" }, diff --git a/package.json b/package.json index 3f186c00aa..b0c4baadca 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "base64id": "~2.0.0", "debug": "~4.3.2", "engine.io": "~6.1.2", - "socket.io-adapter": "~2.3.3", + "socket.io-adapter": "~2.4.0", "socket.io-parser": "~4.0.4" }, "devDependencies": { diff --git a/test/socket.io.ts b/test/socket.io.ts index 1dd869b0d9..b3973378f5 100644 --- a/test/socket.io.ts +++ b/test/socket.io.ts @@ -2519,6 +2519,119 @@ describe("socket.io", () => { }); }); }); + + it("should broadcast and expect multiple acknowledgements", (done) => { + const srv = createServer(); + const sio = new Server(srv); + + srv.listen(async () => { + const socket1 = client(srv, { multiplex: false }); + const socket2 = client(srv, { multiplex: false }); + const socket3 = client(srv, { multiplex: false }); + + await Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + waitFor(socket3, "connect"), + ]); + + socket1.on("some event", (cb) => { + cb(1); + }); + + socket2.on("some event", (cb) => { + cb(2); + }); + + socket3.on("some event", (cb) => { + cb(3); + }); + + sio.timeout(2000).emit("some event", (err, responses) => { + expect(err).to.be(null); + expect(responses).to.have.length(3); + expect(responses).to.contain(1, 2, 3); + + done(); + }); + }); + }); + + it("should fail when a client does not acknowledge the event in the given delay", (done) => { + const srv = createServer(); + const sio = new Server(srv); + + srv.listen(async () => { + const socket1 = client(srv, { multiplex: false }); + const socket2 = client(srv, { multiplex: false }); + const socket3 = client(srv, { multiplex: false }); + + await Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + waitFor(socket3, "connect"), + ]); + + socket1.on("some event", (cb) => { + cb(1); + }); + + socket2.on("some event", (cb) => { + cb(2); + }); + + socket3.on("some event", (cb) => { + // timeout + }); + + sio.timeout(200).emit("some event", (err, responses) => { + expect(err).to.be.an(Error); + expect(responses).to.have.length(2); + expect(responses).to.contain(1, 2); + + done(); + }); + }); + }); + + it("should broadcast and return if the packet is sent to 0 client", (done) => { + const srv = createServer(); + const sio = new Server(srv); + + srv.listen(async () => { + const socket1 = client(srv, { multiplex: false }); + const socket2 = client(srv, { multiplex: false }); + const socket3 = client(srv, { multiplex: false }); + + await Promise.all([ + waitFor(socket1, "connect"), + waitFor(socket2, "connect"), + waitFor(socket3, "connect"), + ]); + + socket1.on("some event", () => { + done(new Error("should not happen")); + }); + + socket2.on("some event", () => { + done(new Error("should not happen")); + }); + + socket3.on("some event", () => { + done(new Error("should not happen")); + }); + + sio + .to("room123") + .timeout(200) + .emit("some event", (err, responses) => { + expect(err).to.be(null); + expect(responses).to.have.length(0); + + done(); + }); + }); + }); }); describe("middleware", () => { diff --git a/test/support/util.ts b/test/support/util.ts index 5de5cf825a..87672b8204 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -12,7 +12,7 @@ const i = expect.stringify; // add support for Set/Map const contain = expect.Assertion.prototype.contain; expect.Assertion.prototype.contain = function (...args) { - if (typeof this.obj === "object") { + if (this.obj instanceof Set || this.obj instanceof Map) { args.forEach((obj) => { this.assert( this.obj.has(obj),