Feature request: connection automatic reconnect #33
Comments
A second feature request built on top of automatic reconnect is failover. A user could send a list of Address objects to a Connection constructor instead of a single Address. The library would try to connect to each address in turn. |
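The "try each address in turn" idea above could be sketched roughly as follows. This is only an illustration, not part of the library: `FailoverConnector`, `ConnectToFirstAvailable` and the `connect` delegate are hypothetical names, and the delegate stands in for whatever call actually opens a connection in the application.

```csharp
using System;
using System.Collections.Generic;

public static class FailoverConnector
{
    // Hypothetical helper: tries each address in order and returns the first
    // connection that the supplied delegate manages to open.
    public static TConnection ConnectToFirstAvailable<TConnection>(
        IReadOnlyList<string> addresses,
        Func<string, TConnection> connect)
    {
        if (addresses == null || addresses.Count == 0)
        {
            throw new ArgumentException("At least one address is required.", nameof(addresses));
        }

        var failures = new List<Exception>();
        foreach (string address in addresses)
        {
            try
            {
                return connect(address);
            }
            catch (Exception ex)
            {
                // Remember the failure and move on to the next address.
                failures.Add(ex);
            }
        }

        // Every address failed; report all failures together.
        throw new AggregateException("All addresses failed.", failures);
    }
}
```

The caller supplies the connect step, so the same loop works for any connection type; a real implementation would probably also want timeouts and a delay between rounds.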
When connection reconnect is enabled, does it just re-establish the transport? How about sessions/links? I think it makes sense to build these features in a layer on top of the protocol stack. For example, the Service Bus .NET SDK has MessagingFactory/Sender/Receiver objects. They auto-create the underlying connection/session/link when needed. The Proton Messenger might do something similar. |
We would very much like this feature as well. Currently we have built our own reconnection logic which recreates the session/link/connection objects when certain AmqpExceptions are caught in the client. It works, though I am not sure if it would make more sense to handle reconnection at a lower level. If our approach is reasonable, from my understanding there would be no need to call Close() on the existing link/session/connection objects before creating new ones, because closing the ContainerHost on the server or terminating the server process actually triggers closing/aborting logic in the client. Is this a reasonable assumption? |
Thanks for the feedback. We will consider enabling this feature in a future release (though we cannot promise when it will be ready). Applications would still need to recreate the session/link in case of communication errors. |
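Since applications have to recreate the session/link themselves after communication errors, a retry loop with a growing delay is one common way to do it. The following is a minimal sketch under that assumption; `RetryLoop.Run` and its parameters are hypothetical, and the `attempt` delegate stands in for the application's own "recreate connection/session/link and do the work" step.

```csharp
using System;
using System.Threading;

public static class RetryLoop
{
    // Hypothetical helper: runs the attempt until it succeeds or maxAttempts
    // is reached, doubling the delay after every failure.
    public static TResult Run<TResult>(Func<TResult> attempt, int maxAttempts, TimeSpan initialDelay)
    {
        if (attempt == null) throw new ArgumentNullException(nameof(attempt));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        TimeSpan delay = initialDelay;
        for (int i = 1; ; i++)
        {
            try
            {
                return attempt();
            }
            catch (Exception) when (i < maxAttempts)
            {
                // Not the last attempt: wait, then try again with a longer delay.
                Thread.Sleep(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
            // On the last attempt the filter is false and the exception propagates.
        }
    }
}
```

In real code the catch would likely be narrowed to the exceptions that indicate a broken connection rather than catching everything.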
@mcmellawatt would you be prepared to share your reconnection wrapper/logic? I am having trouble with keeping connections open over long periods of time.... |
@mcmellawatt I have the same question. Would you share the retry logic? |
Similarly so, my listener client just stops working at some point - I seem not even to receive a |
I created my own reconnection logic, also for the |
Completed the write-up, in case anyone stumbles on the same problem: https://medium.com/@duizendnegen/running-worker-roles-with-docker-in-net-core-543b8d1c4ae7 |
I added an article that describes common reconnect approaches. There is also a test project that demonstrates the usage. I have been able to run the test against a Service Bus queue for weeks and transfer tens of millions of messages between the senders and the receivers. |
Thanks for the write-up @xinchen10, seems like a holistic approach to solving this problem. |
@duizendnegen Consumer supports three modes for receiving messages: sync, async and callback. The base class Role.SyncTest/AsyncTest has a while loop that drives the test execution by doing EnsureConnection, EnsureLink and Execute repeatedly. The sync and async modes are straightforward. The callback mode is a bit confusing because it does not need a while loop to drive execution, but I used the loop in the base class to run auto-reconnect. The interesting class is Consumer.MyCallbackTest. The CreateLink method subscribes to the Closed event of all Connection, Session and ReceiverLink objects and the Execute method blocks the execution until the manual reset event is signaled, which is done when any of the AMQP objects is closed. In real code this logic may be just inside OnChannelClosed event handler. The while loop is single threaded so I do not need to worry about concurrency issue. If the reconnect code could be invoked concurrently, the application must have some synchronization to ensure an AMQP object is recreated only once. You are right that the Service Bus queue closes idle receivers after 10 minutes but if a receiver has outstanding link credits it is considered active and will not be closed by the idle timer (but it could be closed for other reasons). To test the Closed event of the receiver, I tried the following code:
Note that I did not call Start or Receive so the receiver link does not send any credits to the service. After 10 minutes it is closed and the event handler is fired. Did you subscribe to the Closed event of Connection and Session in your code? |
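For the case mentioned above where reconnect code can be invoked concurrently (for example, Closed handlers of the connection, session and link firing at nearly the same time), one possible way to make sure an object is recreated only once is to compare the closed instance against the current one under a lock. This is a sketch, not code from the test project; `RecreateOnce<T>` and its members are hypothetical names, and the factory delegate stands in for the code that builds the AMQP object.

```csharp
using System;

public sealed class RecreateOnce<T> where T : class
{
    private readonly object gate = new object();
    private readonly Func<T> factory;
    private T current;

    public RecreateOnce(Func<T> factory)
    {
        this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
        this.current = factory();
    }

    // The instance that callers should currently use.
    public T Current
    {
        get { lock (this.gate) { return this.current; } }
    }

    // Intended to be called from a Closed handler with the instance that closed.
    // Only the first caller for a given instance triggers a recreation;
    // later callers simply get the replacement that already exists.
    public T RecreateIfCurrent(T closedInstance)
    {
        lock (this.gate)
        {
            if (ReferenceEquals(this.current, closedInstance))
            {
                this.current = this.factory();
            }

            return this.current;
        }
    }
}
```

The factory runs while the lock is held, so other callers wait until the new object exists. That keeps the sketch simple, at the cost of blocking handlers for the duration of a reconnect.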
@xinchen10 you are right. I adapted the example to also test that new messages can still be received (i.e. with outstanding credits), and that works perfectly. I'm not sure why I originally hadn't been receiving the [...]. Just for completeness, the code:

```csharp
using System;
using System.Threading;
using Amqp;
using Amqp.Framing;

class Program
{
    private static Address a;

    static void Main(string[] args)
    {
        a = new Address("queue-experiment.servicebus.windows.net",
            5671,
            "SendListen",
            "W/w7Kf2g33nf0nDbe9gvLN68lCkGb8ZhQPk2WPB4fzI=");
        Thread publisher = new Thread(PublisherThread);
        Thread consumer = new Thread(ConsumerThread);
        publisher.Start();
        consumer.Start();
        publisher.Join();
        consumer.Join();
    }

    private static void PublisherThread()
    {
        {
            var c = Connection.Factory.CreateAsync(a).Result;
            var s = new Session(c);
            var p = new SenderLink(s, "sender1", "experiment");
            p.Send(new Message("Send 1"));
            Console.WriteLine("Send 1 sent");
        }

        // sleep 60 minutes
        Thread.Sleep(1000 * 60 * 60);

        {
            var c = Connection.Factory.CreateAsync(a).Result;
            var s = new Session(c);
            var p = new SenderLink(s, "sender2", "experiment");
            p.Send(new Message("Send 2"));
            Console.WriteLine("Send 2 sent");
        }
    }

    private static void ConsumerThread()
    {
        var c = Connection.Factory.CreateAsync(a).Result;
        var s = new Session(c);
        var r = new ReceiverLink(s, "receiver1", "experiment");
        r.Closed += R_Closed;
        // Issue 10 link credits and receive messages through the callback.
        r.Start(10, R_OnMessage);
        // Block forever so the receiver stays alive; the Close call below is never reached.
        Thread.Sleep(-1);
        c.Close();
    }

    private static void R_OnMessage(ReceiverLink receiver, Message message)
    {
        Console.WriteLine(message.GetBody<string>() + " received");
        receiver.Accept(message);
    }

    private static void R_Closed(AmqpObject sender, Error error)
    {
        Console.WriteLine(sender + " closed");
    }
}
```

With this code I do not receive the Closed event and still receive a new message after 1 hour, as desired. |
@xinchen10 @duizendnegen - I am a bit confused about how the outstanding credit specified in receiver.Start works. Does Service Bus close the connection once the outstanding credit has been used up? Let's say you have specified a credit of 10, so you will receive 10 messages without any time restriction. What happens after you have received 10 messages? Will it reconnect automatically with a new credit of 10? Please clarify. |
@brvaland if the client does not issue more credits after the existing credits are used up, the Service Bus service closes the receiver link after 10 minutes (the client is considered idle in this case). Regarding link credit you can find more details here: https://github.com/Azure/amqpnetlite/blob/master/docs/articles/building_application.md#receiving-messages As long as there are outstanding credits on the service side, the link will be kept open, unless some networking error or system transient error happens (then the client needs to reconnect). |
It would be great if this could be implemented in Amqp.Net Lite, as we are aiming at a clustered setup of RabbitMQ. |
I understand the approach for auto-reconnect has been discussed here. Is there a recommendation for failover? We plan to use Red Hat AMQ (based on Apache Artemis) as the message broker. AMQ will have a failover cluster: a master is active, backed up by a slave. The master can go down for some reason, and the slave then becomes active. My question is: is there any event that the AMQP client will receive which informs the consumer that a failover has occurred, and maybe the address of the failover node? Or will this be more of a trial-and-error process: try the master a few times, fail, "suspect" that a failover has occurred, and start trying to connect to the slave? I understand the Java client supplied by Artemis does this out of the box, so any approach that would mimic such behavior, or any best practice, would help. |
+1 to @koosala's comment. It would be nice to mimic some features of the Artemis client in the .NET AMQP library. |
[...]
In general, the mechanism to detect from the client side that failover has happened is to
The 2. (and 1.) are universal and dependable (although they depend on timeouts). Anything else requires additional communication, which may not happen if the reason for the master going down is, say, a cut Ethernet cable on the machine running it. For the same reason, it is too late to communicate the address of the failover node once failover has happened. The mechanism to communicate this is the [...]. I am not aware of any mechanism where the slave broker would inform the clients previously connected to the master broker that they should now reconnect. It would be clumsy to implement anyway if a connection from the client to the slave did not exist from the start. Also, it would not speed things up much. The slave takes time to come up, which is comparable to usual heartbeat periods, so there seem to be no great time gains in this approach. There is a reconnect example in the amqpnetlite sources at https://github.com/Azure/amqpnetlite/tree/6c195bed83380eff7d730f8108f39f00b35f3356/Examples/Reconnect/ReconnectSender |
Failover is currently supported by the Apache ActiveMQ NMS.AMQP client, which is built on top of amqpnetlite. apache/activemq-nms-amqp#4 |
@xinchen10 Any update about this feature request? Do we still need to handle reconnection by ourselves? |
Hi. The Apache ActiveMQ team has now released an official higher-level NMS client (the underlying library is amqpnetlite). It includes failover support, among a number of other higher-level abstractions. It has been tested with ActiveMQ 5.x and Artemis brokers as well as Solace. Being AMQP, it should also work with any other AMQP 1.0 broker. To find more details, search for Apache.NMS.AMQP on NuGet. |
Issue #32 discussed this. It is a common pattern that many amqpnetlite users will see and it would be a great feature to consider. A model for this feature is Qpid C++ Messaging which describes reconnect behavior and option settings.