-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQNET-589: Failover implementation #4
AMQNET-589: Failover implementation #4
Conversation
2be0f8b
to
a13a86d
Compare
I have found an issue with the current refactor of the code with creating a connection. Right now the clientId can not be explicitly set by an application after a connection is created as the provider Connect operation establishes a an AmqpConnection. This could breaks applications that use Durable Subscriptions. Looking at Qpid jms the Provider Connect operation the provider establishes the underlying transport only during Provider Connect and establishes the amqp Connection later as a part of the JmsConnection resource create. Since AmqpNetLite creates transports as a part of new amqp Connection it will be difficult (perhaps impossible) to separate the operations of Provider Connect (transport connect) and Amqp Connection (Amqp Connection resource create). The initial implementation used the IProviderTransportContext interface to configure transport properties. When the CreateConnectionBuilder function was used an immutable transport property Connection Create delegate was returned allowing the connection to have mutable amqp connection properties like the ClientId. The CreateConnectionBuilder Function was also used to validate transport properties as much as possible. This to me seems to be the closest a "provider Connect" operation. @HavretGC have any ideas about this issue? |
937dac7
to
6b4dae2
Compare
@cjwmorgan-sol Excellent catch. I hope I've addressed this issue in 6b4dae2 Basically, my idea was to postpone the creation of the underlying amqp lite connection to the point when the client tries to perform any actions on the connection object. Thank for your feedback. If you spot any other issues, please let me know. |
b8373ff
to
1bc97fe
Compare
I appreciate this is a work in progress, so perhaps you already have plans for what follows. From what I see so far, here are some consideration that may effect your plans. Using QPID JMS as a model for the provider interface is excellent. It was my inspiration as well when separating the functional layers. The NMS 'cloak' and JMS 'facade' are pretty much a one to one mapping. You may find a lot of useful code here to save retyping in the same classes and methods. There are a couple of fundamental differences between Proton AMQP and AmqpNetLite that need to considered. There is the fundamental lack of control of the transport session. This forced me to defer creating the connection until some action, like start, was taken just to allow the application to set the connection ID. I see you have reworked this since yesterday along the same lines. A bigger problem is that Proton provides a JMS friendly threading model. AmqpNetLite does not, and all the callbacks occur on the transport thread. JMS/NMS requires callbacks occur on known/dedicated I'll try to keep up to date on the changes as they come. |
@cjwmorgan-sol im not sure a dedicated threads per session are required. I know of few java land jms clients that dont do this and pass tck. Can you point me to the spec on this. |
@michaelandrepearce Session indeed do not require a dedicated thread per session. They do require serialized execution of message delivery callbacks that are independent from other sessions. See JMS 1.0 sections 4.4.16 and 4.4.17. AmqpNetLite transport thread is used to give amqpnetlite callbacks for message delivery that transport thread is used for all resources under the amqp connection so the current WIP implementation satisfies section 4.4.16 for Serialized Client Code execution but not section 4.4.17 for Concurrent Message Delivery. As blocking the transport thread would block all other sessions message delivery callbacks. I thought that a dotnet threadpool or task scheduling approach might be better instead of a dedicated thread approach to satisfy sections 4.4.16 and 4.4.17. |
The sections @cjwmorgan-sol mentions are Serial Execution of Client Code and Concurrent Message Delivery. In JMS1.1 these are 4.4.14 and 4.4.15 while in 2.0 they are 6.2.13 and 6.2.14. The combined effect, is that a particular message listener should not block receiving messages on other sessions. As the AMQPnetLite transport handles all sessions in one thread, the received messages need to be queued a dispatched to each session Message Listener on a separate thread. Not necessarily a thread dedicated per session, it could use .NET tasks or a thread-pool. The existing NMS.AMQP took the simple approach of one thread per session. The key is the mutli-threading of the consumers should be built into the architecture though. |
@RagnarPaulson and @cjwmorgan-sol thanks for the clarification in behaviour you were trying to describe. |
3e617b7
to
011f2e5
Compare
@cjwmorgan-sol I've just pushed changes with concurrent dispatch. I would really appreciate if you take a look at it. |
10370d0
to
2afed4f
Compare
@HavretGC I'm looking through the commit it looks pretty good so far as the ActionBlock provides parallel execution environment it seems. However I noticed that orderly shutdown and pausing message delivery is not there. I imagine this is coming later? As NmsConnection Stop is looking a little empty and the session dispatcher is never "shutdown". |
6ac55a3
to
1aef201
Compare
@cjwmorgan-sol I finally have scenario with connection start / stop covered in 1aef201 |
@HavretGC It is not clear to me if the code for pausing message delivery is blocking from NmsConnection Stop. Connection Stop must be blocking on the return of all message delivery callbacks, see section 6.1.5 jms 2.0. If neither are blocking on the action block to complete there is at least one race condition I can see in NmsMessageConsumer DeliverNextPending where the Listener could be nulled by the application thread after NmsConnection Stop. Although I imagine that would be very difficult for that to happen. Also orderly shutdown (Connection, Session, MessageConsumer Close) follows the same blocking idea so for now I think its possible to receive message deliveries on a closed connection, session, and Message Consumer. See Section 6.1.8 for the connection close. |
@cjwmorgan-sol You made an excellent point. Cancel wasn't blocking and as a result the whole connection stop wasn't blocking. Below you can find simple snipped with proposed implementation. https://gist.github.com/HavretGC/e99f39b72345fda65434e0685eade64e Basically if you cancel ActionBlock it will process the last message to the end, then it will change its state to cancelled. This operation can be "awaited" on ActionBlock Completion task. As a result the spec requirement can be fulfilled. [Update] |
d03c840
to
fb57193
Compare
if (envelope == null) | ||
return; | ||
|
||
lock (SyncRoot) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition here when the envelope is taken from the message and either the consumer is stopped or closed. That lets the messages listener be invoked after the consumer is closed or the connection is stopped. I was able to see this by modifying TestConsumerCloseWaitsForAsyncDeliveryToComplete test to send another message after the "latch.WaitOne(TimeSpan.FromMilliseconds(30000)" line and setting some break points (with freezing threads).
The qpid design that this is based off of avoids this by having the messageQueue maintain stop/start state and synchronizing the message delivery dispatch and the message queue start/stop state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we really want to maintain messageQueue state as this information is already represented by consumer's started
field. The reason why this race condition might happen is lack of double check inside the lock statement. This is what Qpid jms is doing under the hood when you try to dequeue next element. In fact, as I'm looking at this implementation, there is still a risk of having null pointer exception on message listener there. As dequeueNoWait do not check if message listener is a null, sb may clear messageListener field between the check on top of the method and taking the lock.
The suggested change is to add the double check and this should solve the problem.
private void DeliverNextPending()
{
if (Session.IsStarted && started && Listener != null)
{
lock (SyncRoot)
{
if (started && Listener != null)
{
var envelope = messageQueue.DequeueNoWait();
if (envelope == null)
return;
// the rest of the method
}
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds a sensible solution to be @HavretGC as pointed out even in qpid there is some small edge cases. I think if you impl your suggested solution. We can always improve it in future.
Lets be clear i think once we get a client fairly good shape, its more important to release it, than try to be too perfectionist after all bugs will always be there, best thing is to get it out there, and iterate improve as needed
You're right thanks for that. I found that issue while running TestSendWhenLinkCreditIsZeroAndTimeout (modified for sendTimeout -1). The issue I saw was the waitOne threw an exception "Thread Abort" when the test would timeout after 2000ms while debugging. Send Timeout works fine as intended. |
src/NMS.AMQP/NmsSession.cs
Outdated
|
||
if (closed.CompareAndSet(false, true)) | ||
{ | ||
foreach (NmsMessageConsumer consumer in consumers.Values.ToArray()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that shutdown is not called from here so the SessionDispatcher is still running after the NmsSession is closed also the parent connection still has the closed session as a member.
Is detaching the session from the connection handled elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. I totally missed that one. I made changes on my private branch, and I will cherry pick them here tomorrow morning.
src/NMS.AMQP/NmsMessageConsumer.cs
Outdated
|
||
try | ||
{ | ||
Listener.Invoke(envelope.Message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Messages delivered to the application can be modified passing a reference is not sufficient as the message contents could be modified changing what delivered after session recover.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I copy the message before handing it over to the application?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whats is qpid doing here. We should just do the same. If it is by ref, then leave as is, if it is by copy then by copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its a copy in the qpid implementation, see the JmsMessageConsumer deliveryNextPending method. Qpid also copies the message in the send case, right before handing the message off to the provider, as well however I believe this is because qpid jms support async send paths so application that reuse messages for sending won't modify the message before it gets written to the wire.
I'm not sure that's the case with the nms amqp provider as the send operation are either presettled (non-persistent) or synchronous (persistent).
However, it might be worth looking into the amqpnetlite code to understand how the given Message object is stored and used and make sure the Message reference can not be modified before amqpnetlite encodes and writes out to the socket. Also another pointer where a message copy might be useful is on failover where the send is wrapped in a failover request that I think could be re-executed after failover (I'm still coming up to speed with that part of the code).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what you can see in amqp lite code it seems that message is being encoded long before control is returned to caller even for async call.
Message member of Delivery object is only being used by sender link as a parameter to async callback, thus I think there is no need to copy the message for send operation.
To support full immutability of received messages for session recovery scenario I don't see any other way, unfortunately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On send there is no need to make a copy.i agree.
On receive it seems a copy is needed.
DateTime NMSTimestamp { get; set; } | ||
string NMSType { get; set; } | ||
DateTime Expiration { get; set; } | ||
double JmsMsgType { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the type double for JmsMsgType? The qpid implementation uses a signed byte. I found using Helloworld the JMS msg type annotation is of type is double. This breaks the JMS to amqp bind mapping restriction Section 3.2.4.1 for sending messages where the msg type annotation must be of type byte (signed byte if you want to inter-operable with qpid jms though).
I would suggest changing the type of the JmsMsgType to sbyte (and everywhere else). Unless there is a very good reason not to.
double JmsMsgType { get; } | |
sbyte JmsMsgType { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are absolutely right. I will make necessary adjustments.
Are you sure that using byte would break interoperability with qpid jms?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make a .net to java check tomorrow on a test platform. Simplest way to be sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before I added the comment. I took a look at the qpid code which does a cast a to signed "byte" type on the annotation value and I verified the UnsignedByte class from proton can not be cast to "byte". However I have not tried to receive a message sent from the amqp provider to a qpid jms app yet.
I'm definitely in favour for a .net to java check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirmed. Using bytes instead of sbytes breaks interoperability.
|
||
[Test, Timeout(2000)] | ||
public void TestRemotelyCloseConnectionDuringSessionCreation() | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this test fails for me unless I increase the timeout value from 2000 to about 2800. I also found adding connection.Start before the the testAmqpPeer.Close() has the test run in about 0.5 secs.
@HavretGC any ideas thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to say anything here because these timeouts are arbitrary. All tests were passing on my work machine, but apparently I have the same problem as you on my home box. It's a shame that we don't have any CI configured to catch this kind of issues automatically.
I would suggest to adjust these timeouts accordingly.
@michaelandrepearce What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a classical testing issue we have in artemis. Typically we tune the client or broker (by params) for such tests so they should timeout much faster thus ensuring test completes fast regardless. Also test timeouts tend to be 60secs thus avoiding issue of someone having a super fast, or super slow machine
@cjwmorgan-sol Off topic question. It was a conscious decision not to use "var" keyword in your original implementation? |
No I believe not, I remember an internal discussion about coding style and I do not recall var ever coming up, maybe @RagnarPaulson knows better? I think it was more of a personal coding style. There are some good reasons for and against using the keyword 'var', see blog. At the time however I think the idea was to write "good descriptive code" for opensource and at the time, at least to me, using the keyword 'var' did not register to me as "good descriptive code". Although now I think the old original implementation could have had a few benefits from using var based on the aforementioned blog with maybe a code style section in the ReadMe.md to explain the desired uses for it. |
@HavretGC I've found that I can not build the projects without some changes to the lang version. It might be a good idea to add |
if (typeValue == null) | ||
annotations.Map.Remove(key); | ||
else | ||
annotations.Map.Add(key, typeValue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This throws an exception when a message object is reused to send multiple messages as adding a pre-existing key to the annotation map throws an exception.
annotations.Map.Add(key, typeValue); | |
annotations.Map.[key] = typeValue; |
Or
annotations.Map.Add(key, typeValue); | |
else if (!annotations.Map.ContainsKey(key)) | |
annotations.Map.Add(key, typeValue); | |
else | |
annotations.Map.[key] = typeValue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
I've found the Helloworld sample project doe snot compile as the current version of the commandLineParser (2.1.X) dependency does not support netcoreapp2.2 that was added in 2.4 I believe there is a 2.5. I've found the Helloworld sample project does not work against an amqp broker. I get an Exception when trying to send the second message, see comment #4 (comment). Even after I modified the code to work around the issue described in #4 (comment) I still get an exception where the consumer link is dropped by the broker.
@HavretGC does Helloworld work for you? Also does the other sample project StructuredMessage work? |
It's really strange because it works for me just fine. I am using Windows 10 Box with Apache ActiveMQ Artemis 2.8.1 broker. Here's the trace log from HelloWorld netcore2.2 and net462 runs:
The same thing for StructuredMessage:
The only problem I see here (apart from from the blunder with adding the same key to annotation map) is the fact that the clean connection close is sending the exception as the same callback is being invoked whether or not the connection close was clean. |
I find I only hit the case when I publish and receive more then 30 messages, I tried with 50 messages. Also I think the broker I'm using hits the issue in amqpnetlite Azure/amqpnetlite#351 where sync sends take 1 sec per message. This makes the first 30 messages expired. Turns out my broker does not support the modified outcome for deliveries. This causes the exception to occur since the broker returns an outcome list without the modified outcome symbol, in the attach response so when the client sends an modified disposition the broker terminates the link. Would it be possible to have the AmqpConsumer check the attach response for support outcomes and do the following:
|
So the broker should handle that. Eg thats same with qpid jms. Im against making a workaround for a specific broker. And behaviour aligned with qpid jms. The whole point of the spec is that its irrelevant. Broker should be supporting spec feature. |
Just tested seems fine against qpid broker, activemq5 and artemis. Really looks like issue with broker youre using. And it should be supported in the broker if its declaring amqp 1.0 compatibility |
As far as I can tell the broker is not breaking spec as the only outcome a broker MUST implement is Accepted. Links negotiate the Supported outcomes as a part of link establishment. The only requirement for modified outcome support seems come from qpid jms implementation as that appears to be how qpid broker handle an expired message over the wire, which is not a standard. I tried to find something to indicate a standard way of handling expired from jms over amqp but I could not find anything as its not in the amqp to jms binding doc. Handling the outcome of the for expired messages seems to be somewhat configurable (RedeliveryPolicy, etc) in the qpid jms client. The brokers: qpid broker, activemq5 and artemis, are likely to have consistent concepts for message expiry and match the qpid client's default behaviour. To have the amqp provider be a little more robust (and like the qpid jms client) it might be worth while thinking about a future enhancement to give a little more control to the application. Based on what I've found I do not think this PR should be blocked by a future enhancement. So I think the current behaviour is fine. Is there a plan to support an IRedeliveryPolicy? I noticed some ToDos for it. |
I also noticed the amqpnetlite logging system (ITrace) does not plugin into the NMS Tracer logging system. It might be worth while implementing that as it was in the original implementation before the refactor. |
So i think first state is to get functionality inline with Qpid JMS, glad we're aligned there. I think a disucssion on if the client should handle the other case and how it should, probably should start over with Qpid JMS where we are taking our design and impl detail from, as their lib has been production tested and run, also i know they have some people who actually contribute to AMQP spec in their PMC, so will be much better placed to discuss correct details and where that responsibility should be (e.g. clieint should handle, or it really is a broker pre req) . Maybe we could start a thread there, and possible implmentation pr in java first there? Im not against it, i just want really the two to be aligned, and at this stage i think Qpid JMS kinda is the lead, whilst we are still very much not even released. |
@cjwmorgan-sol amqp logging system is plugged into nms tracer again |
@cjwmorgan-sol barring the fact i need @HavretGC to squash the commits to merge, are we in a place we're happy to merge this, and follow up anything else in seperate PR's or other dicussions |
Agreed. I think the PR is good to merge. |
a8bd8f0
to
896e51c
Compare
896e51c
to
e9d8fd2
Compare
…ion_close_when_broker_is_down AMQNET-623: Error logged during connection close when broker is down
This PR introduces failover feature. It was implemented the same way as in QpidJMS. It involved some major rework as previous design was not suited to introduce this feature. It will require some thorough review and testing.