diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index ab5c9cac10..74edd17fb2 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -208,7 +208,7 @@ public static void ExchangeDeclare(this IChannel channel, string exchange, strin public static ValueTask ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, IDictionary arguments = null) { - return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments); + return channel.ExchangeDeclareAsync(exchange, type, false, durable, autoDelete, arguments); } /// diff --git a/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs b/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs index 396d05a227..423d18fecd 100644 --- a/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs +++ b/projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs @@ -30,7 +30,7 @@ //--------------------------------------------------------------------------- using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Diagnostics.Tracing; namespace RabbitMQ.Client { @@ -57,11 +57,13 @@ public class TimerBasedCredentialRefresherEventSource : EventSource public void TriggeredTimer(string name) => WriteEvent(4, "TriggeredTimer", name); [Event(5)] public void RefreshedCredentials(string name, bool succesfully) => WriteEvent(5, "RefreshedCredentials", name, succesfully); + [Event(6)] + public void AlreadyRegistered(string name) => WriteEvent(6, "AlreadyRegistered", name); } public class TimerBasedCredentialRefresher : ICredentialsRefresher { - private Dictionary _registrations = new Dictionary(); + private readonly ConcurrentDictionary _registrations = new(); public ICredentialsProvider Register(ICredentialsProvider provider, ICredentialsRefresher.NotifyCredentialRefreshed callback) { @@ -70,25 +72,31 @@ public ICredentialsProvider Register(ICredentialsProvider provider, ICredentials return provider; } - _registrations.Add(provider, scheduleTimer(provider, callback)); - TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name); + if (_registrations.TryAdd(provider, scheduleTimer(provider, callback))) + { + TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name); + } + else + { + TimerBasedCredentialRefresherEventSource.Log.AlreadyRegistered(provider.Name); + } + return provider; } public bool Unregister(ICredentialsProvider provider) { - if (!_registrations.ContainsKey(provider)) + if (_registrations.TryRemove(provider, out System.Timers.Timer timer)) { - return false; - } - - var timer = _registrations[provider]; - if (timer != null) - { - TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name); - timer.Stop(); - _registrations.Remove(provider); - timer.Dispose(); + try + { + TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name); + timer.Stop(); + } + finally + { + timer.Dispose(); + } return true; } else diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index ba9db4a41c..83e6eccc1f 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -58,13 +58,14 @@ public OAuth2Options(Mode mode) public int TokenExpiresInSeconds => 60; } - public class TestOAuth2 + public class TestOAuth2 : IAsyncLifetime { private const string Exchange = "test_direct"; private readonly AutoResetEvent _doneEvent = new AutoResetEvent(false); private readonly ITestOutputHelper _testOutputHelper; - private readonly IConnection _connection; + private readonly IConnectionFactory _connectionFactory; + private IConnection _connection; private readonly int _tokenExpiresInSeconds; public TestOAuth2(ITestOutputHelper testOutputHelper) @@ -75,61 +76,76 @@ public TestOAuth2(ITestOutputHelper testOutputHelper) Mode mode = (Mode)Enum.Parse(typeof(Mode), modeStr.ToLowerInvariant()); var options = new OAuth2Options(mode); - var connectionFactory = new ConnectionFactory + _connectionFactory = new ConnectionFactory { AutomaticRecoveryEnabled = true, + DispatchConsumersAsync = true, CredentialsProvider = GetCredentialsProvider(options), CredentialsRefresher = GetCredentialsRefresher(), ClientProvidedName = nameof(TestOAuth2) }; - _connection = connectionFactory.CreateConnection(); _tokenExpiresInSeconds = options.TokenExpiresInSeconds; } + public async Task InitializeAsync() + { + _connection = await _connectionFactory.CreateConnectionAsync(); + } + + public async Task DisposeAsync() + { + await _connection.CloseAsync(); + _connection.Dispose(); + } + [Fact] public async void IntegrationTest() { - using (_connection) + using (IChannel publishChannel = await DeclarePublisherAsync()) + using (IChannel consumeChannel = await DeclareConsumerAsync()) { - using (IChannel publisher = declarePublisher()) - using (IChannel subscriber = await declareConsumer()) - { - await Publish(publisher); - Consume(subscriber); + await PublishAsync(publishChannel); + Consume(consumeChannel); - if (_tokenExpiresInSeconds > 0) + if (_tokenExpiresInSeconds > 0) + { + for (int i = 0; i < 4; i++) { - for (int i = 0; i < 4; i++) - { - _testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1)); + _testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1)); - await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10)); - _testOutputHelper.WriteLine("Resuming .."); + await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10)); + _testOutputHelper.WriteLine("Resuming .."); - await Publish(publisher); - _doneEvent.Reset(); + await PublishAsync(publishChannel); + _doneEvent.Reset(); - Consume(subscriber); - } - } - else - { - throw new InvalidOperationException(); + Consume(consumeChannel); } } + else + { + Assert.Fail("_tokenExpiresInSeconds is NOT greater than 0"); + } } } - private IChannel declarePublisher() + [Fact] + public async void SecondConnectionCrashes_GH1429() + { + // https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429 + using IConnection secondConnection = await _connectionFactory.CreateConnectionAsync(); + } + + private async Task DeclarePublisherAsync() { - IChannel publisher = _connection.CreateChannel(); - publisher.ConfirmSelect(); - publisher.ExchangeDeclare("test_direct", ExchangeType.Direct, true, false); + IChannel publisher = await _connection.CreateChannelAsync(); + await publisher.ConfirmSelectAsync(); + await publisher.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false); return publisher; } - private async Task Publish(IChannel publisher) + private async Task PublishAsync(IChannel publisher) { const string message = "Hello World!"; @@ -146,7 +162,7 @@ private async Task Publish(IChannel publisher) _testOutputHelper.WriteLine("Confirmed Sent message"); } - private async ValueTask declareConsumer() + private async ValueTask DeclareConsumerAsync() { IChannel subscriber = _connection.CreateChannel(); await subscriber.QueueDeclareAsync(queue: "testqueue", passive: false, true, false, false, arguments: null); diff --git a/projects/Test/Unit/APIApproval.Approve.verified.txt b/projects/Test/Unit/APIApproval.Approve.verified.txt index c2f83298d3..1d9773af41 100644 --- a/projects/Test/Unit/APIApproval.Approve.verified.txt +++ b/projects/Test/Unit/APIApproval.Approve.verified.txt @@ -850,6 +850,8 @@ namespace RabbitMQ.Client { public TimerBasedCredentialRefresherEventSource() { } public static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource Log { get; } + [System.Diagnostics.Tracing.Event(6)] + public void AlreadyRegistered(string name) { } [System.Diagnostics.Tracing.Event(5)] public void RefreshedCredentials(string name, bool succesfully) { } [System.Diagnostics.Tracing.Event(1)]