Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target specific client on clustered socket.io server and using acknowledgment callback #1811

Closed
peteruithoven opened this issue Oct 2, 2014 · 22 comments
Labels
enhancement New feature or request
Milestone

Comments

@peteruithoven
Copy link

I've clustered my socket.io server and I'm looking for a way to emit to a specific client and receiving a acknowledgment callback. The problem that the server doesn't have a reference to a instance of the target client, because this client can be connected to another socket.io server instance (because it's clustered).
I know I can emit to a room named after the the socket.id or a custom room where I put the client in. But the problem is that this is considered broadcasting and then I can't use a acknowledgment callback.
I don't want to add complexity to the target client. The target client should just be able to call the callback.

I see two possible solutions:

A fake socket instance wouldn't actually have a connection to a client, but I hope to use it's emit function, have it talk to it's adapter (socket.io-redis) and have it receive the acknowledgment callback.

Any tips on these solutions, maybe something I should also consider.

@peteruithoven
Copy link
Author

@FREEZX has been implementing the functionality to receive clients in rooms (even over multiple node instances, see:
#1630
socketio/socket.io-adapter#5
socketio/socket.io-redis-adapter#15
But even then I need a way to emit to a specific client and get my callback called.

Guille: Update: we're making a minor release 1.1.1 with bugfixes, and .clients(fn) should be in the next one 1.2.0

@FREEZX
Copy link

FREEZX commented Oct 2, 2014

Every client joins a room with his socket id as a key. You could emit to that room if you know the id of the client's socket (you do if you're using my redis adapter with the clients function).
However be careful with it because it doesn't clean up the data at exit.

@peteruithoven
Copy link
Author

True, I understand this, but my main issue is that the callback mechanism won't work when you emit in rooms.

@peteruithoven
Copy link
Author

One, quite complex workaround:

  1. Server B which has the target socket, is also a client itself and joined the target room
  2. Server A is also a client
  3. Server A broadcasts event over target room and defines callback function
  4. Server A starts listening to {event}-ack
  5. Server B receives event as client
  6. Server B emit’s event to target socket
  7. Server B receives callback
  8. Server B broadcasts {event}-ack
  9. Server A receives {event}-ack as client
  10. Server A stops listening to {event}-ack
  11. Server A calls callback

@peteruithoven
Copy link
Author

One thing I should clarify, I don't want to add complexity to the target client. The target client should just be able to call the callback.

@FREEZX
Copy link

FREEZX commented Oct 2, 2014

You could implement a redis pubsub to do IPC between nodes, so whichever node receives the acknowledgement can publish a message on the IPC channel and the correct node could handle it as an acknowledgement callback

@FREEZX
Copy link

FREEZX commented Oct 2, 2014

This avoids cluttering the sockets, and IPC will probably also be needed in the future.

@peteruithoven
Copy link
Author

True, but I'm not sure if that's easier. That would work something like this I think:

  1. All servers subscribe to a “targetedevent” pub/sub message.
  2. Server A publishes a "targetedevent" with the event type, data, channel and a unique event id
  3. Server A temporarily also subsribes to the “targetedevent-ack” pub/sub message.
  4. All servers receive the "targetedevent"
  5. if one has the target client they emit it and they wait for a callback
  6. when the callback is called they publish a “targetedevent-ack” with the response data and the unique id
  7. Server A receives the “targetedevent-ack”
  8. Server A checks the unique id
  9. It calls the original callback
  10. It unsubsribes from “targetedevent-ack”

When subscribing, unsubscribing is inefficient I could also store the callbacks in an object under the unique event id.

@FREEZX
Copy link

FREEZX commented Oct 2, 2014

You don't have to temporarily subscribe/unsubscribe. Just handle the events if the client is in an array or something

@peteruithoven
Copy link
Author

Alright, that would work something like:

  1. All servers subscribe to “targetedevent” and “targetedevent-ack” pub/sub messages.
  2. Server A publishes a targetedevent with the event type, data, channel and a unique message id
  3. Server A stores a callback in a object under the unique message id (this could be done per channel)
  4. All servers receive the “targetedevent”
  5. If one server has the target client they emit it to this client and they wait for a callback
  6. When a callback is called they publish “targetedevent-ack” with the response data and the unique message id
  7. All servers receive the “targetedevent-ack”
  8. Server A finds the callback under the unique id
  9. It calls the original callback
  10. It removes the callback from memory

Of course, one day I hope to simply be able to request the clients from a room and emit to a specific client (possibly on another node.js instance) (using a callback). @FREEZX has already made step one possible.

@FREEZX
Copy link

FREEZX commented Oct 3, 2014

You can avoid targetedevent IPC calls, because you could emit that from any process (to the room with the socket's ID as a name). Socket.io automatically decides which server has the socket. So that's one less IPC event to listen for.

@peteruithoven
Copy link
Author

I was curious how difficult it would be to emit to a specific client (possibly on another node.js instance) using a callback.
Looking through the code socket.io, when you call emit on a specific client it's not passed to the adapter, it's passed straight into engine.io instance. The adapter also has no way to receive a acknowledgment event/packet, basically the adapter only get's input by his pub/sub channel and when it's broadcast method is called.

So for this to happen adapters should also handle general packets (not just broadcasts). The socket module's packet method would need to call something like a adapter's packet method, the adapter should pub/sub this to all instances, and the adapters in the other instances would need to call the onpacket of the socket module. This might allow multiple node.js instances to create socket objects that communicate to the same client. Servers also need another way to create socket objects, not only by receiving them through a new connection, but also manually with a socket id.

@peteruithoven
Copy link
Author

@FREEZX, but then there is no way to add a callback? Or am I forgetting something?

@FREEZX
Copy link

FREEZX commented Oct 3, 2014

Add the callback to an array of your choice, where the key is the client id and then broadcast to the client's room. The client responds back to a process of your cluster that emits the ack message via redis IPC. The node that originally sent the message will have the callback stored in the callbacks array and will call it.

@peteruithoven
Copy link
Author

Your help is greatly appreciated but I don't follow the "The client responds back to a process of your cluster" part.

I think I got the basic idea working, using the following utility. It's a pub/sub channel wrapper with a callback feature.

// Redis pub/sub channel wrapper
// Messages are serialized to JSON by default, so you can send regular objects across the wire. 
var redis = require('redis');
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var debug = require('debug')('redis-channel');
var EVENT_MSG = "event";
var ACK_MSG = "ack";
var callbackIDCounter = 0;
var callbacks = {};
module.exports = Channel;

function Channel(name,port,host) {
  if (!(this instanceof Channel)) return new Channel(name,port,host);

  var _self = this;
  var pub = redis.createClient(port,host);
  var sub = redis.createClient(port,host);
  pub.on('error',onRedisError);
  sub.on('error',onRedisError);
  function onRedisError(err){
    _self.emit("error",err);
  }

  sub.subscribe(name,function(err){
    debug("subsribed to channel");
    if(err !== null) _self.emit("error",err);
  });

  sub.on("message", function (channel, packet) {
    if(channel !== name) return;
    packet = JSON.parse(packet);
    debug("received packet: ",packet);
    var data = packet.data;
    var callbackID = packet.id;
    switch(packet.type){
      case EVENT_MSG:
        // ToDo: emit all arguments from data
        data.unshift("message"); // add event type in front
        data.push(eventCallback); // add callback to end
        _self.emit.apply(_self,data);
        //_self.emit("message",message,eventCallback);

        function eventCallback() {
          if(callbackID === undefined) {
            return _self.emit("error","No callback defined");
          }
          var args = Array.prototype.slice.call(arguments);
          var ackPacket = {type:ACK_MSG,
                          data:args,
                          id: callbackID};
          debug("publishing ack packet: ",ackPacket);
          pub.publish(name,JSON.stringify(ackPacket),function(err){
            if(err !== null) _self.emit("error",err);
          });
        }


        break;
      case ACK_MSG:
        if(typeof callbacks[callbackID] == 'function'){
          //callbacks[callbackID](message);
          callbacks[callbackID].apply(this, packet.data);
          delete callbacks[callbackID];
        }
        break;
    }
  });

  this.publish = function() {
    var args = Array.prototype.slice.call(arguments);
    var packet = {type:EVENT_MSG,
                  data:args};
    // is there a callback function?
    if(typeof args[args.length - 1] == 'function') {
      packet.id = callbackIDCounter++;
      callbacks[packet.id] = packet.data.pop();
    }
    debug("publishing packet: ",packet);
    pub.publish(name,JSON.stringify(packet),function(err){
      if(err !== null) _self.emit("error",err);
    });
  };
}
util.inherits(Channel, EventEmitter);

Experiment server:

'use strict';

var debug = require('debug')('clustering:server');

var express = require('express');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var socketIORedis = require('socket.io-redis');

var redisHost = 'localhost';
var redisPort = 6379;

var redisChannel = require('./util/redis-channel')('pub/sub-ack experiment',redisPort,redisHost);

var PORT = process.env.PORT ? process.env.PORT : 7000;

io.adapter(socketIORedis({ key: 'socket.io-redis-experiment'}));

http.listen(PORT, function(){
  debug('server listening on *:' + PORT);
});

var nsp = io.of("/");
var targetSocket = null;
nsp.on('connection', function(socket){
  var query = socket.handshake.query;
  debug('new connection: '+socket.id,query.target||'');

  if(query.type === "target") { 
    targetSocket = socket;
    redisChannel.on("message", function (message,callback) {
      debug("received hello pub/sub message: ",message,callback);
      debug("emitting hello to target");
      targetSocket.emit(message,callback);
    });
  } else {
    socket.on('hello', function(data,callback){
      debug('received hello event: ',data,callback);
      // can't emit to targetSocket directly so 
      // we use a redis pub/sub channel
      debug("publishing hello pub/sub message");
      redisChannel.publish("hello",callback);
    });
  }

  socket.on('disconnect', function(){
    debug('disconnect: '+socket.id);
  });
});

Experiment client:

'use strict';

var debug = require('debug')('clustering:client');
var socketClient = require('socket.io-client');

var PORT = process.env.PORT ? process.env.PORT : 7000;
var TYPE = process.env.TYPE ? process.env.TYPE : '';

var nspName = "/";
var nspURL = "http://localhost:"+PORT+nspName+"?type="+TYPE;
debug("connecting to: "+nspName);
var nsp = socketClient(nspURL);
nsp.once('connect', function(){
  debug("connected to: "+nspName);

  if(TYPE !== "target") {
    nsp.emit("hello",{},function(data) {
      debug("received hello response: ",data);
    });
  }

  nsp.on("hello",function(callback) {
    debug("received hello! ",callback);
    callback("client on port "+PORT);
  });
});
nsp.on('error', function (err){
  debug("error connecting to: "+nspName+": "+err);
  //callback(err,nsp);
});

@peteruithoven
Copy link
Author

I published my pub/sub wrapper as an npm package.
https://www.npmjs.org/package/redis-pubsubber

@FREEZX
Copy link

FREEZX commented Oct 6, 2014

What i meant with "The client responds back to a process of your cluster":
The client can be connected to any cluster worker, and only sends messages to that one worker.

@menklab
Copy link

menklab commented Oct 19, 2014

@FREEZX your solution with the array makes a ton of sense! Im wondering if this is the mechanism you were going to use behind the scenes in 1.2? Im only asking because im litterally migrating my site as we speek to a more scalable solution and need to be able to implement the callback. Would this also work with the socket.io-emitter package if the server registered to the same redis channel?

Do you have an idea of timeframe? I want to determine if I should wait for your release or move forward with the above solution.

Thanks!

@FREEZX
Copy link

FREEZX commented Oct 20, 2014

IPC has to be done manually at the moment, with arrays as i said. I am not an official maintainer so i cannot say whether or not IPC will get added and if so, when.
The given solution is not too complicated for implementing, so you should probably do it.
As for the socket.io-emitter question, i do not know, you should try it out and see if it works as you think.

@sgiachero
Copy link

Has this scenario been solved using socket.io, socket.io-redis, and socket.io-emitter?

@darrachequesne darrachequesne added the enhancement New feature or request label Feb 17, 2021
darrachequesne added a commit to socketio/socket.io-adapter that referenced this issue Mar 29, 2022
Tests will be added in the parent repository.

Related:

- socketio/socket.io#1811
- socketio/socket.io#4163
darrachequesne added a commit that referenced this issue Mar 31, 2022
Syntax:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

The adapter exposes two additional methods:

- `broadcastWithAck(packets, opts, clientCountCallback, ack)`

Similar to `broadcast(packets, opts)`, but:

* `clientCountCallback()` is called with the number of clients that
  received the packet (can be called several times in a cluster)
* `ack()` is called for each client response

- `serverCount()`

It returns the number of Socket.IO servers in the cluster (1 for the
in-memory adapter).

Those two methods will be implemented in the other adapters (Redis,
Postgres, MongoDB, ...).

Related:

- #1811
- #4163
- socketio/socket.io-redis-adapter#445
@darrachequesne
Copy link
Member

This was eventually implemented in version 4.5.0 (8b20457):

io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});

Feedback is welcome!

@darrachequesne darrachequesne added this to the 4.5.0 milestone May 12, 2022
dzad pushed a commit to dzad/socket.io that referenced this issue May 29, 2023
Syntax:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

The adapter exposes two additional methods:

- `broadcastWithAck(packets, opts, clientCountCallback, ack)`

Similar to `broadcast(packets, opts)`, but:

* `clientCountCallback()` is called with the number of clients that
  received the packet (can be called several times in a cluster)
* `ack()` is called for each client response

- `serverCount()`

It returns the number of Socket.IO servers in the cluster (1 for the
in-memory adapter).

Those two methods will be implemented in the other adapters (Redis,
Postgres, MongoDB, ...).

Related:

- socketio#1811
- socketio#4163
- socketio/socket.io-redis-adapter#445
@rituraj12797
Copy link

@FREEZX @peteruithoven @sgiachero @darrachequesne @maccman

recently faced the same issue i think . like i am operating on a clustered environment using NodeJS clusters , what initially i was doing is that
a. i will transmit the message with the data of the room of recipient and the socketID of recipient
but the problem was that when i was sending this message to the room via the IPC provided by cluster adapter
the message is broadcasted to al the clients on reaching the room due to the code

   `socket.to(`${room_no}1).emit("message","<some_message>)` 

b. eventually i figured out that somehow just by writing this code
socket.to(${socketId}).emit("message", {text});

it would automatically search for the client with the given socketID globally ( idk how is this working if anyone could help )
and automatically transmits the message from the sender (in room a) to the receiver (in room b) eliminating the need of finding the room.

still i would like to know if anyone could help me finding how is it able to globally search for the recipient how efficient is it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Archived in project
Development

No branches or pull requests

5 participants