AutorecoveringConnection - Connection reestablished after a dispose() or an abort() #290

@alessnet

Hello,

We have software that stops its processing when the model or a consumer is cancelled. After a while, however, we started to see orphan connections without any channel. After investigation, it seems that disposing the IConnection does not always release the underlying connection.

We often see this issue after a network disconnection, or when we manually disconnect the client from the RabbitMQ Management website.

In AutorecoveringConnection.cs, the field "manuallyClosed" is neither synchronized nor volatile, yet it is read by the recovery thread (recoveryTaskFactory), which could therefore see a stale value (false).
Even if access to this field is synchronized, "Dispose" can still be called while an auto-recovery is already in progress. The expected behaviour in that case is that the connection is closed automatically as soon as the recovery ends.
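
To illustrate the behaviour I would expect, here is a minimal sketch. It is not the RabbitMQ.Client implementation: the class `RecoveringConnectionSketch` and its methods `SimulateNetworkFailure` and `CompleteRecovery` are hypothetical names, and the only assumption is that the recovery thread and "Dispose" share one lock, so that the recovery checks the flag at the moment it would reopen the connection.

```csharp
using System;

// Hypothetical stand-in for an auto-recovering connection.
// It only models the "manuallyClosed" flag and the open/closed state.
public sealed class RecoveringConnectionSketch : IDisposable
{
    private readonly object gate = new object();
    private bool manuallyClosed;   // guarded by gate
    private bool open = true;      // guarded by gate

    public bool IsOpen
    {
        get { lock (gate) { return open; } }
    }

    // Simulates the transport dropping (network error, forced close).
    public void SimulateNetworkFailure()
    {
        lock (gate) { open = false; }
    }

    // Called by the recovery thread once a new transport is established.
    // Returns true if the recovered connection is kept, false if it is
    // discarded because Dispose() was called in the meantime.
    public bool CompleteRecovery()
    {
        lock (gate)
        {
            if (manuallyClosed)
            {
                // Dispose() won the race: drop the fresh transport.
                open = false;
                return false;
            }
            open = true;
            return true;
        }
    }

    public void Dispose()
    {
        lock (gate)
        {
            manuallyClosed = true;
            open = false;
        }
    }
}
```

With this shape, a "Dispose" that happens while the recovery is still running makes `CompleteRecovery` return false, and `IsOpen` stays false afterwards. Taking the same lock on both sides also removes the visibility problem, since the flag is never read outside the lock.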

Furthermore, the "Dispose" method can throw an exception with the message "The AMQP operation was interrupted". In C# we generally do not expect Dispose to throw (what should the caller do with it?). In this case the reason (the operation was interrupted) does not seem to be a valid reason to throw: if the connection cannot send the "abort" packet, Dispose should not throw any exception, in my opinion.
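
In the meantime, a caller can protect itself with something like the sketch below. The helper `DisposeHelper.TryDispose` is a hypothetical name, not part of RabbitMQ.Client, and it catches every exception only to keep the example self-contained; in real code it would probably be narrowed to OperationInterruptedException.

```csharp
using System;

// Hypothetical helper: disposes a resource without letting the exception
// escape, and hands it back so the caller can still log it.
public static class DisposeHelper
{
    // Returns null when Dispose() succeeded, otherwise the exception thrown.
    public static Exception TryDispose(IDisposable resource)
    {
        if (resource == null)
        {
            return null;
        }

        try
        {
            resource.Dispose();
            return null;
        }
        catch (Exception ex)
        {
            // Assumption: a failure to close cleanly is not fatal for the
            // caller, so it is reported instead of rethrown.
            return ex;
        }
    }
}
```

This only hides the exception from the caller. It does not stop the auto-recovery from reopening the connection afterwards, which is the actual problem reported here.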

See the following test case (the "IsOpen" property becomes "true" again after the dispose).

Thank you

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using System.Threading.Tasks;
using System.Net.Sockets;
using System.Collections.Generic;
using System.Threading;
using RabbitMQ.Client.Exceptions;

namespace testbugrabbitmq
{
	[TestClass]
	public class UnitTest1
	{
		[TestMethod]
		public async Task TestMethod1()
		{
			ConnectionFactory factory = new ConnectionFactory();
			factory.AutomaticRecoveryEnabled = true;
			factory.RequestedConnectionTimeout = 5000;
			factory.TopologyRecoveryEnabled = true;
			factory.SocketReadTimeout = 10000;
			factory.SocketWriteTimeout = 10000;
			factory.ContinuationTimeout = TimeSpan.FromSeconds(10);

			// used to keep track of opened connection and simulate a network error
			factory.SocketFactory = addr => new TestTcpClientAdapter(this, new Socket(addr, SocketType.Stream, ProtocolType.Tcp) { NoDelay = true });
			factory.UserName = "testcase";
			factory.Password = "testcase";
			factory.Uri = "amqp://myserver/testcase";

			IConnection connection = factory.CreateConnection();
			Assert.IsTrue(connection.IsOpen);

			bool wasDisconnected = false;
			Exception disposeException = null;
			connection.ConnectionShutdown += (s, e) =>
			{
				Task.Run(() =>
				{
					// the dispose may throw because the connection is closed (which is a weird behaviour)
					connection.Dispose();
				})
				.ContinueWith(t =>
				{
					/*
					 * even if "Dispose" threw an exception, you can see with the
					 * debugger that manuallyClosed is set to true.
					 * So we do not expect the auto-recovery to reconnect.
					 */
					Volatile.Write(ref wasDisconnected, connection.IsOpen == false);
					Volatile.Write(ref disposeException, t.Exception?.InnerException);
				});
			};

			// simulate a network error
			this.CloseAllConnections();

			// wait to see if the "IsOpen" changes
			await Task.Delay(TimeSpan.FromSeconds(11));

			Assert.IsTrue(Volatile.Read(ref wasDisconnected));
			Exception test = Volatile.Read(ref disposeException);
			Assert.IsTrue(test == null || (test is OperationInterruptedException && test.Message == "The AMQP operation was interrupted"));

			/* Bug: the connection was restored even though it was disposed */
			Assert.IsTrue(connection.IsOpen);
		}

		private void CloseAllConnections()
		{
			List<TcpClientAdapter> clonedList;
			lock (this.connections)
			{
				clonedList = new List<TcpClientAdapter>(this.connections);
				this.connections.Clear();
			}

			foreach (var connection in clonedList)
				connection.Close();
		}

		private List<TcpClientAdapter> connections = new List<TcpClientAdapter>();
		private class TestTcpClientAdapter : TcpClientAdapter
		{
			private readonly UnitTest1 parent;
			public TestTcpClientAdapter(UnitTest1 p, Socket socket) : base(socket)
			{
				this.parent = p;
				lock (this.parent.connections)
				{
					this.parent.connections.Add(this);
				}
			}

			public override void Close()
			{
				try
				{
					base.Close();
				}
				finally
				{
					lock (this.parent.connections)
					{
						this.parent.connections.Remove(this);
					}
				}

			}
		}
	}
}
