This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

How to find out if message could not be delivered? (PUB/SUB, PUSH/PULL) #585

Open
patrickjane opened this issue Jan 24, 2017 · 5 comments

Comments

@patrickjane

patrickjane commented Jan 24, 2017

I am using pub/sub sockets in my app, but also tried out push/pull. The issue I am having is that I seem to have no way of detecting that delivering a message did not succeed. With the pub socket, the send will immediately fire the callback without any error (which might be working as intended, since no subscribers are connected).

The push socket, however, will just never call the callback when the pull socket is disconnected, and thus the message delivery seems to hang. Good thing is, however, as soon as the pull-socket is connected, all pending messages will get delivered.
However, this will probably turn out pretty unstable/unreliable in a high-availability/high throughput scenario, when a lot of messages cannot be delivered (memory problems??)

What I am basically trying to achieve is:

  1. Connect two processes (one-to-one)
  2. Publish a message
  3. If an error occurs, try to re-deliver the message as soon as the peer is ready/reconnected again
  4. Drop messages after a given time/number of accumulated messages
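Steps 3 and 4 could be approximated in the application with a small bounded outbox that holds messages while the peer is away and discards them by age or count. This is only a sketch: `Outbox` and all of its members are hypothetical names, not part of the zmq module, and it does not limit whatever the socket itself buffers once a message has been handed to `send`.

```javascript
// Hypothetical application-level outbox; none of these names come from the zmq module.
class Outbox {
  constructor({ maxMessages = 1000, maxAgeMs = 5000, now = Date.now } = {}) {
    this.maxMessages = maxMessages;
    this.maxAgeMs = maxAgeMs;
    this.now = now;     // injectable clock, handy for tests
    this.queue = [];    // entries: { payload, enqueuedAt }
    this.dropped = 0;   // number of messages discarded by either limit
  }

  // Queue a message; when the count limit is exceeded, the oldest entry is dropped.
  push(payload) {
    this.queue.push({ payload, enqueuedAt: this.now() });
    while (this.queue.length > this.maxMessages) {
      this.queue.shift();
      this.dropped += 1;
    }
  }

  // Discard entries older than maxAgeMs.
  expire() {
    const cutoff = this.now() - this.maxAgeMs;
    while (this.queue.length > 0 && this.queue[0].enqueuedAt < cutoff) {
      this.queue.shift();
      this.dropped += 1;
    }
  }

  // Hand every still-valid message to `send` and empty the queue.
  // Intended to be called once the peer is known to be reachable again.
  flush(send) {
    this.expire();
    const pending = this.queue;
    this.queue = [];
    for (const entry of pending) send(entry.payload);
    return pending.length;
  }
}
```

In use, the application would call `push()` instead of sending directly while the peer is down, and `flush((m) => socket.send(m))` after a reconnect, where `socket` stands for whatever socket object the application holds.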

What would be the advised approach here? I know that 3) is handled by zmq itself, at least for the push/pull pattern, but I don't trust this: if the receiver is missing for a longer period of time while thousands of messages keep coming, this will most likely result in memory issues.
So should I watch out for connect/disconnect events and block sending if I find I am currently not connected?
Or is there some way to catch errors when sending? I tried publisher.setsockopt(zmq.ZMQ_SNDTIMEO, 1000) but it did not have any effect.
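The "watch connect/disconnect events and block sending" idea could look roughly like the sketch below. `PeerGate` is a hypothetical helper, and the event names `accept` and `disconnect` are assumptions modelled on socket monitor events; they would need to be checked against what the binding actually emits (and monitoring may have to be enabled on the socket first).

```javascript
const { EventEmitter } = require('events');

// Tracks how many peers are attached, based on events emitted by `source`.
// The default event names are assumptions; adjust them to your binding.
class PeerGate {
  constructor(source, { upEvent = 'accept', downEvent = 'disconnect' } = {}) {
    this.peers = 0;
    source.on(upEvent, () => { this.peers += 1; });
    source.on(downEvent, () => { this.peers = Math.max(0, this.peers - 1); });
  }

  get connected() {
    return this.peers > 0;
  }

  // Calls send(message) only while a peer is attached; otherwise hands the
  // message to onUnsent so the application can decide (queue, drop, log).
  trySend(message, send, onUnsent) {
    if (this.connected) {
      send(message);
      return true;
    }
    onUnsent(message);
    return false;
  }
}

// Stand-in for a real socket, just to show the flow.
const fakeSocket = new EventEmitter();
const gate = new PeerGate(fakeSocket);
gate.trySend('early', () => {}, (m) => console.log('not connected, holding: ' + m));
```

Note that this only reflects the last event seen; a peer can still vanish between the check and the send, so it narrows the window rather than guaranteeing delivery.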

The examples unfortunately don't cover anything regarding error handling, and I can't seem to find a full/helpful JS API description, so I am somewhat lost.

Code:

let publisher = zmq.socket('push');
publisher.setsockopt(zmq.ZMQ_SNDTIMEO, 1);
publisher.bindSync('tcp://*:5556');     
   
...       

publisher.send(message, null, (err) => {
	if (err) {
		console.log('Failed to send message (' + err + ')');
	} else {
		console.log('Message successfully sent via ZMQ.');
	}
});
@ronkorving
Collaborator

One thing you really need to understand about ZMQ is that it takes care of connections. Either it guarantees delivery (eventually), or it drops the message (based on socket types, and sometimes options). You will usually not receive an error in your callback saying "the message was dropped, feel free to retry".

The PUB socket simply publishes to whoever is listening (subscribers), and if nobody is listening, it just drops the message. This is by design.

The way you describe PUSH is also by design. If you don't trust the socket, my advice would be: don't use ZMQ. If you have a demonstrable issue with PUSH sockets that show they are broken (vis-à-vis their designed behavior), I'm sure the nice people at ZMQ would love to know, in which case do share!

Now there are some options you can use to tweak behaviors. For example, you can have a look at ZMQ_IMMEDIATE. Read about all socket options here: http://api.zeromq.org/4-0:zmq-setsockopt

I hope this may help out somewhat. Good luck!

@patrickjane
Author

I will agree for the PUB/SUB pattern, no subscriber means no message shall be delivered.

I disagree however for the PUSH/PULL pattern, as in I have currently no control over buffering/memory, and thus cannot handle failure situations as needed by my requirements.

The most simple demonstrable issue will be if you just plain shut down your PULL socket, while the PUSH socket is still trying to send. What will happen is that messages are buffered in memory until the PULL socket is available again.

When it comes to a lot of traffic, this will - please correct me if I am wrong - result in noticeable memory growth, ultimately until the point of failure/crash. Depending on the timespan of the outage, this will have several issues:

  • Some requests may be so old, that they shall not be redelivered (because the client got its error-reply already)
  • Re-delivering all messages might lead to delays as the receiver side will have to process all of those (probably already obsolete) messages
  • The sending process might run out of memory and just crash/lose everything
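The first two points could be softened on the receiving side by stamping each message when it is created and skipping anything too old on arrival. A minimal sketch, assuming JSON payloads; `wrap`, `unwrapIfFresh`, and the field names `sentAt` and `body` are hypothetical, and sender and receiver clocks are assumed to be roughly in sync.

```javascript
// Hypothetical envelope helpers; the field names are illustrative only.
function wrap(body, now = Date.now()) {
  return JSON.stringify({ sentAt: now, body });
}

// Returns the body if the message is still fresh, or null if it should be skipped.
// `raw` may be a string or a Buffer, as delivered by a 'message' handler.
function unwrapIfFresh(raw, maxAgeMs, now = Date.now()) {
  const { sentAt, body } = JSON.parse(String(raw));
  return now - sentAt <= maxAgeMs ? body : null;
}
```

This does nothing about memory growth on the sender; it only keeps the receiver from processing requests that are already obsolete.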

I am not saying anything in the ZMQ lib is broken, however my situation will require appropriate handling of error situations which is currently not possible (and I am wondering if I am the only one in this regard? Are you saying ZMQ should not be used in production grade/enterprise environments?)
Of course, only the application can tell what to do with undeliverable messages, because only the application has that knowledge. (Are those fire-and-forget messages where I don't care about failed deliveries? Messages which must be guaranteed to be delivered exactly once? Messages which should be dropped after xxx unsuccessful deliveries? ...) So it is nice that ZMQ does a lot in the background, but the ultimate decision about what to do with failed deliveries must be made within the application, not in the transport-layer library (in my humble opinion, that is).

As I see in the docs, the plain C library has the ZMQ_NOBLOCK flag, which would allow the PUSH socket to receive errors when the message cannot be delivered. However, the node.js module will not allow me to use the flag, since it is already blocking in zmq_getsockopt (binding.cc:1281).

As for the ZMQ_IMMEDIATE flag (I have yet to check if it would help), it does not seem to be among the constants defined in the node.js lib. Should I find out its integer value in the C library headers and just supply the number value to setsockopt(), or is this flag unsupported?
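Looking up the constant with a numeric fallback might be sketched as follows. `resolveOption` and the fallback table are hypothetical; the value 39 is what libzmq 4.x's zmq.h defines for ZMQ_IMMEDIATE, but it should be verified against the installed headers, and whether the binding's setsockopt() accepts an option it does not know about is a separate, untested assumption.

```javascript
// Numeric IDs taken from libzmq's zmq.h (4.x); verify against your installed headers.
const FALLBACK_OPTION_IDS = { ZMQ_IMMEDIATE: 39 };

// Prefer the constant exported by the module; fall back to the header value.
function resolveOption(zmqModule, name) {
  if (typeof zmqModule[name] === 'number') return zmqModule[name];
  if (name in FALLBACK_OPTION_IDS) return FALLBACK_OPTION_IDS[name];
  throw new Error('Unknown socket option: ' + name);
}
```

With a real module this would be used as `socket.setsockopt(resolveOption(zmq, 'ZMQ_IMMEDIATE'), 1)`, wrapped in a try/catch in case the binding rejects the option.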

As a sidenote: I am not bound to use the PUB/SUB or PUSH/PULL pattern. In fact I will try out REQ/REP and see how it goes when the receiver is unavailable. I am just trying to find the correct usage of the ZMQ lib for my requirements/scenario as described in the first post.

@lokhandeomkar

@patrickjane How did you solve this issue? I am trying to use ZMQ from my node application for business data logging and need some accounting / error handling for lost messages. Any help will be appreciated. Thanks.

@patrickjane
Author

As advised by @ronkorving I dropped zeromq in favour of STOMP via rabbitmq/hornetq, since I was not really able to resolve the issues.

@bajaj689

bajaj689 commented Mar 7, 2018

@patrickjane @lokhandeomkar @ronkorving How can I connect to multiple publishers at the same time to receive messages from both? In my code I am only able to receive the messages from the first publisher to which I connect.
Code:

#include <zmq.hpp>
#include <iostream>
#include <string>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);
    zmq::socket_t subscriber (context, ZMQ_SUB);

    // Connect the same SUB socket to both publishers
    subscriber.connect("tcp://localhost:5556");
    subscriber.connect("tcp://localhost:5557");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); // subscribe to all messages

    // Process 10 updates
    for (int update_nbr = 0; update_nbr < 10; update_nbr++) {
        zmq::message_t update;
        subscriber.recv (&update);

        // Prints only the data from publisher bound to port 5556
        std::string updt = std::string(static_cast<char*>(update.data()), update.size());
        std::cout << "Received Update/Messages/TaskList " << update_nbr << " : " << updt << std::endl;
    }
    return 0;
}
