From f66de114a4581b692da759015def0373c619aab7 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Mon, 29 Nov 2021 23:54:46 +0100 Subject: [PATCH] feat: send response to the requesting node only Previously, a request would be sent to all listening nodes, on channel `${key}-request#${nsp}#` (e.g. "socket.io-request#/#"), and the other nodes would respond on a common channel `${key}-response#${nsp}#`, so every node get the response, instead of only the requesting node. This commit adds a new option: "publishOnSpecificResponseChannel". If it's set to true, then the other nodes will now respond on `${key}-response#${nsp}#${uid}#`, which is the channel specific to the requesting node, thus reducing the noise for the other nodes. To upgrade an existing deployment, users will need to upgrade all nodes to the latest version with publishOnSpecificResponseChannel = false, and then toggle the option on each node. Note: the option will default to true in the next major release Related: https://github.com/socketio/socket.io-redis-adapter/issues/407 --- lib/index.ts | 54 +++++++++++++++++++++++++++++++++++++++++++-------- package.json | 3 ++- test/index.ts | 7 ++++++- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index d60a11d..efa71c4 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -42,6 +42,18 @@ export interface RedisAdapterOptions { * @default 5000 */ requestsTimeout: number; + /** + * Whether to publish a response to the channel specific to the requesting node. + * + * - if true, the response will be published to `${key}-request#${nsp}#${uid}#` + * - if false, the response will be published to `${key}-request#${nsp}#` + * + * This option currently defaults to false for backward compatibility, but will be set to true in the next major + * release. + * + * @default false + */ + publishOnSpecificResponseChannel: boolean; } /** @@ -66,6 +78,7 @@ export function createAdapter( export class RedisAdapter extends Adapter { public readonly uid; public readonly requestsTimeout: number; + public readonly publishOnSpecificResponseChannel: boolean; private readonly channel: string; private readonly requestChannel: string; @@ -92,12 +105,14 @@ export class RedisAdapter extends Adapter { this.uid = uid2(6); this.requestsTimeout = opts.requestsTimeout || 5000; + this.publishOnSpecificResponseChannel = !!opts.publishOnSpecificResponseChannel; const prefix = opts.key || "socket.io"; this.channel = prefix + "#" + nsp.name + "#"; this.requestChannel = prefix + "-request#" + this.nsp.name + "#"; this.responseChannel = prefix + "-response#" + this.nsp.name + "#"; + const specificResponseChannel = this.responseChannel + this.uid + "#"; const onError = (err) => { if (err) { @@ -115,7 +130,7 @@ export class RedisAdapter extends Adapter { true ); this.subClient.subscribe( - [this.requestChannel, this.responseChannel], + [this.requestChannel, this.responseChannel, specificResponseChannel], (msg, channel) => { this.onrequest(channel, msg); } @@ -125,7 +140,7 @@ export class RedisAdapter extends Adapter { this.subClient.on("pmessageBuffer", this.onmessage.bind(this)); this.subClient.subscribe( - [this.requestChannel, this.responseChannel], + [this.requestChannel, this.responseChannel, specificResponseChannel], onError ); this.subClient.on("messageBuffer", this.onrequest.bind(this)); @@ -217,7 +232,7 @@ export class RedisAdapter extends Adapter { sockets: [...sockets], }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.ALL_ROOMS: @@ -230,7 +245,7 @@ export class RedisAdapter extends Adapter { rooms: [...this.rooms.keys()], }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.REMOTE_JOIN: @@ -253,7 +268,7 @@ export class RedisAdapter extends Adapter { requestId: request.requestId, }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.REMOTE_LEAVE: @@ -276,7 +291,7 @@ export class RedisAdapter extends Adapter { requestId: request.requestId, }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.REMOTE_DISCONNECT: @@ -299,7 +314,7 @@ export class RedisAdapter extends Adapter { requestId: request.requestId, }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.REMOTE_FETCH: @@ -327,7 +342,7 @@ export class RedisAdapter extends Adapter { }), }); - this.pubClient.publish(this.responseChannel, response); + this.publishResponse(request, response); break; case RequestType.SERVER_SIDE_EMIT: @@ -366,6 +381,20 @@ export class RedisAdapter extends Adapter { } } + /** + * Send the response to the requesting node + * @param request + * @param response + * @private + */ + private publishResponse(request, response) { + const responseChannel = this.publishOnSpecificResponseChannel + ? `${this.responseChannel}${request.uid}#` + : this.responseChannel; + debug("publishing response to channel %s", responseChannel); + this.pubClient.publish(responseChannel, response); + } + /** * Called on response from another node * @@ -510,6 +539,7 @@ export class RedisAdapter extends Adapter { const requestId = uid2(6); const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.SOCKETS, rooms: [...rooms], @@ -554,6 +584,7 @@ export class RedisAdapter extends Adapter { const requestId = uid2(6); const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.ALL_ROOMS, }); @@ -598,6 +629,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.REMOTE_JOIN, sid: id, @@ -641,6 +673,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.REMOTE_LEAVE, sid: id, @@ -684,6 +717,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.REMOTE_DISCONNECT, sid: id, @@ -729,6 +763,7 @@ export class RedisAdapter extends Adapter { const requestId = uid2(6); const request = JSON.stringify({ + uid: this.uid, requestId, type: RequestType.REMOTE_FETCH, opts: { @@ -766,6 +801,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, type: RequestType.REMOTE_JOIN, opts: { rooms: [...opts.rooms], @@ -783,6 +819,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, type: RequestType.REMOTE_LEAVE, opts: { rooms: [...opts.rooms], @@ -800,6 +837,7 @@ export class RedisAdapter extends Adapter { } const request = JSON.stringify({ + uid: this.uid, type: RequestType.REMOTE_DISCONNECT, opts: { rooms: [...opts.rooms], diff --git a/package.json b/package.json index bc82a97..5410bc9 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,9 @@ "main": "./dist/index.js", "types": "./dist/index.d.ts", "scripts": { - "test": "npm run format:check && tsc && npm run test:redis-v4 && npm run test:redis-v3 && npm run test:ioredis", + "test": "npm run format:check && tsc && npm run test:redis-v4 && npm run test:redis-v4-specific-channel && npm run test:redis-v3 && npm run test:ioredis", "test:redis-v4": "nyc mocha --bail --require ts-node/register test/index.ts", + "test:redis-v4-specific-channel": "SPECIFIC_CHANNEL=1 nyc mocha --bail --require ts-node/register test/index.ts", "test:redis-v3": "REDIS_CLIENT=redis-v3 nyc mocha --bail --require ts-node/register test/index.ts", "test:ioredis": "REDIS_CLIENT=ioredis nyc mocha --bail --require ts-node/register test/index.ts", "format:check": "prettier --parser typescript --check 'lib/**/*.ts' 'test/**/*.ts'", diff --git a/test/index.ts b/test/index.ts index 6a0896e..1351c20 100644 --- a/test/index.ts +++ b/test/index.ts @@ -403,7 +403,12 @@ function _create() { return async (nsp, fn?) => { const httpServer = createServer(); const sio = new Server(httpServer); - sio.adapter(createAdapter(await createClient(), await createClient())); + sio.adapter( + createAdapter(await createClient(), await createClient(), { + publishOnSpecificResponseChannel: + process.env.SPECIFIC_CHANNEL !== undefined, + }) + ); httpServer.listen((err) => { if (err) throw err; // abort tests if ("function" == typeof nsp) {