diff --git a/.gitignore b/.gitignore index b5f51373f7..842267178f 100644 --- a/.gitignore +++ b/.gitignore @@ -51,7 +51,8 @@ build/ BenchmarkDotNet.Artifacts/* -APIApproval.Approve.received.txt +projects/Unit/APIApproval.Approve.received.txt +projects/Unit/APIApproval.Approve.*.received.txt # Visual Studio 2015 cache/options directory .vs/ diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 7069b02e1a..0612337a56 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -97,6 +97,14 @@ public interface IChannel : IDisposable /// ulong NextPublishSeqNo { get; } + /// + /// The name of the last queue declared on this channel. + /// + /// + /// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name + /// + string CurrentQueue { get; } + /// /// Signalled when a Basic.Ack command arrives from the broker. /// diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 8495e56131..a465facccc 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -123,23 +123,76 @@ public event EventHandler Recovery remove { InnerChannel.Recovery -= value; } } - public IEnumerable ConsumerTags => _recordedConsumerTags; + public IEnumerable ConsumerTags + { + get + { + ThrowIfDisposed(); + return _recordedConsumerTags; + } + } - public int ChannelNumber => InnerChannel.ChannelNumber; + public int ChannelNumber + { + get + { + ThrowIfDisposed(); + return InnerChannel.ChannelNumber; + } + } - public ShutdownEventArgs CloseReason => InnerChannel.CloseReason; + public ShutdownEventArgs CloseReason + { + get + { + ThrowIfDisposed(); + return InnerChannel.CloseReason; + } + } public IBasicConsumer DefaultConsumer { - get => InnerChannel.DefaultConsumer; - set => InnerChannel.DefaultConsumer = value; + get + { + ThrowIfDisposed(); + return InnerChannel.DefaultConsumer; + } + + set + { + ThrowIfDisposed(); + InnerChannel.DefaultConsumer = value; + } } public bool IsClosed => !IsOpen; - public bool IsOpen => _innerChannel != null && _innerChannel.IsOpen; + public bool IsOpen + { + get + { + ThrowIfDisposed(); + return _innerChannel != null && _innerChannel.IsOpen; + } + } + + public ulong NextPublishSeqNo + { + get + { + ThrowIfDisposed(); + return InnerChannel.NextPublishSeqNo; + } + } - public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo; + public string CurrentQueue + { + get + { + ThrowIfDisposed(); + return InnerChannel.CurrentQueue; + } + } internal void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index 50be199746..56d5027be8 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -152,7 +152,7 @@ private void UpdateConsumerQueue(string oldName, string newName) { if (consumer.Queue == oldName) { - _recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueNameTag(newName, consumer); + _recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueName(newName, consumer); } } } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index dfd88782af..6ca55f84df 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -175,6 +175,8 @@ public IBasicConsumer DefaultConsumer public ulong NextPublishSeqNo { get; private set; } + public string CurrentQueue { get; private set; } + public ISession Session { get; private set; } protected void TakeOver(ChannelBase other) @@ -1169,7 +1171,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo _Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments); k.GetReply(ContinuationTimeout); } - return k.m_result; + QueueDeclareOk result = k.m_result; + CurrentQueue = result.QueueName; + return result; } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs index 2e0a51b054..a68202091e 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs @@ -59,11 +59,21 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, } _consumer = consumer; - if (string.IsNullOrEmpty(queue)) + if (queue is null) { throw new ArgumentNullException(nameof(queue)); } - _queue = queue; + else + { + if (queue == string.Empty) + { + _queue = _channel.CurrentQueue; + } + else + { + _queue = queue; + } + } if (string.IsNullOrEmpty(consumerTag)) { @@ -89,7 +99,7 @@ public static RecordedConsumer WithNewConsumerTag(string newTag, in RecordedCons return new RecordedConsumer(old.Channel, old.Consumer, newTag, old.Queue, old.AutoAck, old.Exclusive, old.Arguments); } - public static RecordedConsumer WithNewQueueNameTag(string newQueueName, in RecordedConsumer old) + public static RecordedConsumer WithNewQueueName(string newQueueName, in RecordedConsumer old) { return new RecordedConsumer(old.Channel, old.Consumer, old.ConsumerTag, newQueueName, old.AutoAck, old.Exclusive, old.Arguments); } diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 724067cf09..a9280b67f2 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -380,6 +380,7 @@ namespace RabbitMQ.Client int ChannelNumber { get; } RabbitMQ.Client.ShutdownEventArgs CloseReason { get; } System.TimeSpan ContinuationTimeout { get; set; } + string CurrentQueue { get; } RabbitMQ.Client.IBasicConsumer DefaultConsumer { get; set; } bool IsClosed { get; } bool IsOpen { get; } diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index b84c68eee7..e6f770a71b 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -410,6 +410,39 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() } } + [Fact] + public void TestConsumerRecoveryWithServerNamedQueue() + { + // https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238 + using (AutorecoveringConnection c = CreateAutorecoveringConnection()) + { + IChannel ch = c.CreateChannel(); + QueueDeclareOk queueDeclareResult = ch.QueueDeclare(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null); + string qname = queueDeclareResult.QueueName; + Assert.False(string.IsNullOrEmpty(qname)); + + var cons = new EventingBasicConsumer(ch); + ch.BasicConsume(string.Empty, true, cons); + AssertConsumerCount(ch, qname, 1); + + bool queueNameBeforeIsEqual = false; + bool queueNameChangeAfterRecoveryCalled = false; + string qnameAfterRecovery = null; + c.QueueNameChangeAfterRecovery += (source, ea) => + { + queueNameChangeAfterRecoveryCalled = true; + queueNameBeforeIsEqual = qname.Equals(ea.NameBefore); + qnameAfterRecovery = ea.NameAfter; + }; + + CloseAndWaitForRecovery(c); + + AssertConsumerCount(ch, qnameAfterRecovery, 1); + Assert.True(queueNameChangeAfterRecoveryCalled); + Assert.True(queueNameBeforeIsEqual); + } + } + [Fact] public void TestConsumerRecoveryWithManyConsumers() {