Race condition causing acking to fail from AutoRecoveringConnection after a recovery #47

Closed
tiggerite opened this issue Mar 10, 2023 · 11 comments

Comments

@tiggerite
Contributor

tiggerite commented Mar 10, 2023

This was a nasty one to track down and (I think) got introduced by Pivotal from RabbitMQ.Client 6.2.4 on.

Looking at TryPerformAutomaticRecovery and CreateModel in AutorecoveringConnection (snipped to the relevant parts), there's a big issue around locking, on _models in particular, which can lead to race conditions (especially with TopologyRecoveryEnabled, as topology recovery takes long enough to widen the window):

        private bool TryPerformAutomaticRecovery()
        {
            ESLog.Info("Performing automatic recovery");

            try
            {
                if (TryRecoverConnectionDelegate())
                {
                    lock (_recordedEntitiesLock)
                    {
                        RecoverConnectionShutdownHandlers();
                        RecoverConnectionBlockedHandlers();
                        RecoverConnectionUnblockedHandlers();

                        if (_factory.TopologyRecoveryEnabled)
                        {
                            // The recovery sequence is the following:
                            //
                            // 1. Recover exchanges
                            // 2. Recover queues
                            // 3. Recover bindings
                            // 4. Recover consumers
                            using (var recoveryModel = _delegate.CreateModel())
                            {
                                RecoverExchanges(recoveryModel);
                                RecoverQueues(recoveryModel);
                                RecoverBindings(recoveryModel);
                            }
                        }

                        RecoverModelsAndItsConsumers();
                    }

                    ESLog.Info("Connection recovery completed");
                    RunRecoveryEventHandlers();

                    return true;
                }
                else
                {
                    ESLog.Warn("Connection delegate was manually closed. Aborted recovery.");
                }
            }
...
            return false;
        }
        
        private void RecoverModelsAndItsConsumers()
        {
            lock (_models)
            {
                foreach (AutorecoveringModel m in _models)
                {
                    m.AutomaticallyRecover(this, _factory.TopologyRecoveryEnabled);
                }
            }
        }
        
        public IModel CreateModel()
        {
            EnsureIsOpen();
            AutorecoveringModel m = new AutorecoveringModel(this, CreateNonRecoveringModel());
            lock (_models)
            {
                _models.Add(m);
            }
            return m;
        }        

When RecoverModelsAndItsConsumers is hit, it calls AutomaticallyRecover in AutorecoveringModel, which sets the delivery tag offsets via InheritOffsetFrom. If this happens after CreateModel has been called to create a new channel/model (via MakeChannelAsync in ChannelHost), then the delivery tags for the ReceivedData written to the underlying Channel are not adjusted and thus never get acked:

        public void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            _connection = conn;
            RecoveryAwareModel defunctModel = _delegate;

            var newModel = conn.CreateNonRecoveringModel();
            newModel.InheritOffsetFrom(defunctModel);

            lock (_eventLock)
            {
                newModel.ModelShutdown += _recordedShutdownEventHandlers;
                newModel.BasicReturn += _recordedBasicReturnEventHandlers;
                newModel.BasicAcks += _recordedBasicAckEventHandlers;
                newModel.BasicNacks += _recordedBasicNackEventHandlers;
                newModel.CallbackException += _recordedCallbackExceptionEventHandlers;
            }

...
            /*
             * https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1140
             * If this assignment is not done before recovering consumers, there is a good
             * chance that an invalid Model will be used to handle a basic.deliver frame,
             * with the resulting basic.ack never getting sent out.
             */
            _delegate = newModel;

            if (recoverConsumers)
            {
                _connection.RecoverConsumers(this, newModel);
            }

            RunRecoveryEventHandlers();
        }
...
        private void RunRecoveryEventHandlers()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            foreach (EventHandler<EventArgs> reh in Recovery?.GetInvocationList() ?? Array.Empty<Delegate>())
            {
                try
                {
                    reh(this, EventArgs.Empty);
                }
                catch (Exception e)
                {
                    var args = new CallbackExceptionEventArgs(e);
                    args.Detail["context"] = "OnModelRecovery";
                    _delegate.OnCallbackException(args);
                }
            }
        }
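To make the offset arithmetic concrete, here is a minimal, self-contained sketch of how an inherited delivery tag offset might behave. OffsetTrackingChannel and its members are hypothetical names for illustration only; this is a simplified stand-in, not the library's RecoveryAwareModel, and the guard in Ack is an assumption about how an out-of-range tag would be handled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a recovery-aware channel.
// It only models delivery tag bookkeeping; it is not the library's class.
public sealed class OffsetTrackingChannel
{
    public ulong ActiveDeliveryTagOffset { get; private set; }
    public ulong MaxSeenDeliveryTag { get; private set; }

    // Broker-side tags that would actually go out in basic.ack frames.
    public List<ulong> AcksSentToBroker { get; } = new List<ulong>();

    // Called on the replacement channel during recovery: everything the
    // defunct channel handed out so far becomes the new offset.
    public void InheritOffsetFrom(OffsetTrackingChannel defunct)
    {
        ActiveDeliveryTagOffset = defunct.ActiveDeliveryTagOffset + defunct.MaxSeenDeliveryTag;
        MaxSeenDeliveryTag = 0;
    }

    // Broker tags restart at 1 on a new channel; the consumer sees the
    // broker tag plus the inherited offset.
    public ulong OnDeliver(ulong brokerTag)
    {
        if (brokerTag > MaxSeenDeliveryTag)
        {
            MaxSeenDeliveryTag = brokerTag;
        }
        return brokerTag + ActiveDeliveryTagOffset;
    }

    // Translates the consumer-visible tag back to a broker tag.
    // Returns false when the ack is silently dropped (assumed behaviour).
    public bool Ack(ulong consumerVisibleTag)
    {
        // Wraps around to a huge value if the tag is smaller than the offset.
        ulong realTag = unchecked(consumerVisibleTag - ActiveDeliveryTagOffset);
        if (realTag > 0 && realTag <= consumerVisibleTag)
        {
            AcksSentToBroker.Add(realTag);
            return true;
        }
        return false;
    }
}
```

With a defunct channel that has seen five deliveries, the replacement inherits an offset of 5. A delivery that went through the replacement surfaces as tag 6, and acking it sends broker tag 1. A delivery whose tag was never adjusted surfaces as tag 1; acking that through the replacement wraps around in the subtraction and is dropped, which is one way a mismatch like the one described here could leave messages unacked.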

There is another, separate issue around TopologyRecoveryEnabled: the same flag is also passed to RecoverConsumers (?!), which then recreates the old EventingBasicConsumers along with their events. These recreated consumers pick up the messages instead of the newly created ones from ChannelHost, so the Consumer classes never receive the events and nothing ends up in the Channels.

The best way around this I could find was to implement a RecoverChannelAsync via a RecoveryAwareChannelHost. It attempts to use what the library gives us back, hooking into the relevant events (as long as the IConnection and IModel are IAutorecoveringConnection and IRecoverable respectively) and Recording (or DeleteRecording when basic.cancel is handled) so that it can hook into the ConsumerTagChangeAfterRecovery event from the connection. It's not perfect, as the RecoveryAwareChannelHost has to hook into events from the IAutorecoveringConnection, but the handlers are all added/removed, and the events protected, under locks (via protected functions in the base ChannelHost class, which also now has protected Channel and ConnHost, settable only by the base class).

I also noticed a small oversight in ChannelHost: Close checks !Closed || !Channel.IsOpen, which feels like it should be !Closed && Channel.IsOpen, so I changed that. I also added a TransientChannelPool as a base class for ChannelPool, for when you really just want transient channels, e.g. a separate channel pool for consumers (which has no need for creating ackable and non-ackable collections of channels). It helped me track a lot of this stuff down, as there was considerable noise from the publisher channels when using a shared channel pool.
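For reference, the two guard conditions can be compared across all four states. This is only a sketch: CloseGuardComparison is a hypothetical helper, and it assumes the condition decides whether Close goes on to close the underlying channel, which the snippet in this issue does not show.

```csharp
using System;

// Hypothetical helper comparing the two guard conditions mentioned above.
public static class CloseGuardComparison
{
    // Condition as it was written in Close.
    public static bool Original(bool closed, bool channelIsOpen) => !closed || !channelIsOpen;

    // Condition proposed in this issue.
    public static bool Proposed(bool closed, bool channelIsOpen) => !closed && channelIsOpen;

    // Prints both results for every combination of the two flags.
    public static void PrintTruthTable()
    {
        foreach (bool closed in new[] { false, true })
        {
            foreach (bool open in new[] { false, true })
            {
                Console.WriteLine(
                    $"Closed={closed}, IsOpen={open}: " +
                    $"original={Original(closed, open)}, proposed={Proposed(closed, open)}");
            }
        }
    }
}
```

The two conditions agree only when the channel is open; whenever Channel.IsOpen is false, the original evaluates to true and the proposed one to false.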

Anyway, there is now a PR in, which I have battle-tested locally; it works both with topology recovery enabled and without. I would appreciate your feedback on it @houseofcat, as this is a blocker to releasing this to production (thankfully, this issue was spotted in lower environments before it really had an impact there!)

@tiggerite tiggerite changed the title Race condition causing acking to fail from AutoRecoveringConnection Race condition causing acking to fail from AutoRecoveringConnection after a recovery Mar 15, 2023
@houseofcat
Owner

I will review this, but it might be time to start the connection pool design over based off of current functionality being implemented.

@tiggerite
Contributor Author

tiggerite commented Mar 15, 2023

The one thing I do really dislike is that Pivotal implemented the consumer tag recovery event on the connection, not the channel. Makes no sense to me as it should be the channel or you could argue even the consumer itself.

I didn't handle the event in ConnectionHost for this reason (it would need a concurrent dictionary, and then ChannelHost would have to call some method on ConnectionHost to check for a recovered tag, which ConnectionHost would then need to remove). Ugly and an anti-pattern.

@houseofcat
Owner

houseofcat commented Mar 16, 2023

I will start working on a reliable test - all my chaos engineering I used for its original design has fallen to the dust or I no longer work at those companies.

I will need to start over, configure a RabbitMQ server, start closing connections, restarting servers... I unfortunately will not have multiple nodes to retest on. Once I see the behaviors at play, I can start planning a new design around that.

@tiggerite
Contributor Author

If it helps I can try to get a test up which is similar to the one I used to find the issue. It uses EasyNetQ package but I haven't yet updated that to the latest 2.0.0 (has lots of breaking changes over 1.x.x).

@tiggerite
Contributor Author

Just to let you know I have begun work on this. I've made a Dockerfile and docker-compose to build and run the tests, plus a separate compose for RabbitMQ which exposes 5672 and 15672. This is then spun up on a network and has a Makefile for ease of use (both local and CI/CD).

It's still a bit of a work in progress but I managed to make a good chunk of the "skipped" Facts no longer so (there's a check in RabbitFixture on connectivity to the host on the 5672 port; if it fails the tests just return).
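A connectivity check of that kind could look roughly like the sketch below. BrokerProbe and IsReachableAsync are hypothetical names, not the code in RabbitFixture; the sketch only tests that a TCP connection to the port can be opened within a timeout, and says nothing about whether a healthy broker is behind it.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical probe: checks whether a TCP port accepts connections.
public static class BrokerProbe
{
    public static async Task<bool> IsReachableAsync(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                Task connect = client.ConnectAsync(host, port);
                Task finished = await Task.WhenAny(connect, Task.Delay(timeout)).ConfigureAwait(false);
                if (finished != connect)
                {
                    // Timed out. Observe any later fault so it does not surface
                    // as an unobserved task exception once the client is disposed.
                    _ = connect.ContinueWith(
                        t => { _ = t.Exception; },
                        TaskContinuationOptions.OnlyOnFaulted);
                    return false;
                }

                // Rethrows a SocketException if the connection was refused.
                await connect.ConfigureAwait(false);
                return client.Connected;
            }
            catch (SocketException)
            {
                return false;
            }
        }
    }
}
```

A fixture could call something like BrokerProbe.IsReachableAsync("localhost", 5672, TimeSpan.FromSeconds(2)) once and have the broker-dependent tests return early when it yields false.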

@houseofcat
Owner

Great minds think alike. I was thinking of just fully integrating this type of setup for the test projects.

@tiggerite
Contributor Author

tiggerite commented Mar 23, 2023

Sorry for the delay with this @houseofcat . It's now in #48 - minimal changes made to actual code (only ones necessary to override/access particular methods/properties to prove that using the built-in recovery fixes the issues). You can run it locally with make tests or via your IDE of choice normally (but run make rmq first to spin up rabbit). All assuming you have Docker installed, of course!

The tests are now passing on your automated checks, albeit the new ones will be skipped there of course as there's no rabbit connection.

@tiggerite
Contributor Author

tiggerite commented Mar 25, 2023

Now the 6.5.0 client is out there may be a tidier way by hooking into rabbitmq/rabbitmq-dotnet-client#1304 - will check it out later (have a plan anyway).

Edit: Yep, the new event made things quite a lot neater and fewer changes overall to base classes; I also made a new library RabbitMQ.Recoverable for everything (decided to move away from the default/current classes having Arguments with an Id they didn't use etc; seemed wrong).

@tiggerite
Contributor Author

tiggerite commented Mar 27, 2023

With the latest RabbitMQ.Client 6.5.0, if we always call _channel.Close (wrapped in a try/catch) in the Close() of ChannelHost, the issue of duplicate consumers has gone (thanks to rabbitmq/rabbitmq-dotnet-client#1317). Messages still remain unacked, however, without the recovery events/new classes.

Edit: Actually, they sometimes ack the messages now, I guess that is the race condition though as more often than not they don't.

@tiggerite
Contributor Author

tiggerite commented Mar 28, 2023

Hmmm. So I added an extra bit to the tests whereby it publishes and consumes an initial message (just as a sanity check), that of course works for everything; then it recovers (closes and waits for them to reconnect) the connections, pauses processing, publishes half the prefetch and recovers them again (once that half are unacknowledged). It then publishes the remainder (plus an extra 10), once the unacknowledged reaches prefetch it resumes processing and waits for all messages to be processed.

Now, this works fine (all passes) for the new recoverable channel host/pool in the tests; but without them the connections are never reconnected the second time, which I didn't expect. Because of this it doesn't even get as far as the failure to acknowledge the messages (because there's not even a connection to publish to, let alone consume from). Any ideas why that might be happening? I'm perplexed.

RabbitMQ listening on localhost:5672
0 messages on TestRabbitServiceQueue in 1.1257s
1 active connection(s) in 2.9952s
1 consumer(s) on TestRabbitServiceQueue in 1.3761s
1 unacknowledged on TestRabbitServiceQueue in 4.9332s
0 messages on TestRabbitServiceQueue in 9.9379s
0 active connection(s) in 0.0231s
0 consumer(s) on TestRabbitServiceQueue in 4.8504s
1 active connection(s) in 9.9767s
1 consumer(s) on TestRabbitServiceQueue in 4.8978s
64 unacknowledged on TestRabbitServiceQueue in 4.9373s
0 active connection(s) in 0.0387s
0 consumer(s) on TestRabbitServiceQueue in 9.8839s
0 active connection(s) (expected 1) after 15.0000s

as opposed to

RabbitMQ listening on localhost:5672
0 messages on TestRabbitServiceQueue in 3.0078s
1 active connection(s) in 0.8080s
1 consumer(s) on TestRabbitServiceQueue in 0.0080s
1 unacknowledged on TestRabbitServiceQueue in 9.0593s
0 messages on TestRabbitServiceQueue in 9.8961s
0 active connection(s) in 0.0039s
0 consumer(s) on TestRabbitServiceQueue in 5.1399s
1 active connection(s) in 9.7283s
1 consumer(s) on TestRabbitServiceQueue in 5.1218s
64 unacknowledged on TestRabbitServiceQueue in 9.7531s
0 active connection(s) in 0.0205s
0 consumer(s) on TestRabbitServiceQueue in 5.0273s
1 active connection(s) in 9.7987s
1 consumer(s) on TestRabbitServiceQueue in 5.1215s
128 unacknowledged on TestRabbitServiceQueue in 9.7202s
0 messages on TestRabbitServiceQueue in 9.9526s

@houseofcat
Owner

I believe this is coincidentally solved by #50
