diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index b0e62ce611..cd0f311cfa 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -45,10 +45,10 @@ public override async Task HandleBasicConsumeOk(string consumerTag) } ///Fires the Received event. - public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory body) { - await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false); - await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false); + // No need to call base, it's empty. + return Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); } ///Fires the Shutdown event. diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs index 11bf8f8afa..83e270aba8 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs @@ -66,7 +66,7 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag) public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) { // the only case where we ignore the shutdown flag. - Schedule(new ModelShutdown(consumer, reason)); + Schedule(new ModelShutdown(consumer, reason, _model)); } private void ScheduleUnlessShuttingDown(TWork work) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs index 3a953056be..3a35fbb1e5 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using RabbitMQ.Client.Events; namespace RabbitMQ.Client.Impl { @@ -79,7 +81,7 @@ public void Enqueue(Work work) _channel.Writer.TryWrite(work); } - async Task Loop() + private async Task Loop() { while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false)) { @@ -87,21 +89,35 @@ async Task Loop() { try { - Task task = work.Execute(_model); + Task task = work.Execute(); if (!task.IsCompleted) { await task.ConfigureAwait(false); } } - catch(Exception) + catch (Exception e) { + if (!(_model is ModelBase modelBase)) + { + return; + } + var details = new Dictionary + { + { "consumer", work.Consumer }, + { "context", work.Consumer } + }; + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); + } + finally + { + work.PostExecute(); } } } } - async Task LoopWithConcurrency(CancellationToken cancellationToken) + private async Task LoopWithConcurrency(CancellationToken cancellationToken) { try { @@ -125,22 +141,33 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken) } } - static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter) + private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter) { try { - Task task = work.Execute(model); + Task task = work.Execute(); if (!task.IsCompleted) { await task.ConfigureAwait(false); } } - catch (Exception) + catch (Exception e) { - // ignored + if (!(model is ModelBase modelBase)) + { + return; + } + + var details = new Dictionary + { + { "consumer", work.Consumer }, + { "context", work.Consumer } + }; + modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } finally { + work.PostExecute(); limiter.Release(); } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicCancel.cs b/projects/RabbitMQ.Client/client/impl/BasicCancel.cs index 3917955dbc..dd77825b2f 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicCancel.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicCancel.cs @@ -1,40 +1,21 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -using RabbitMQ.Client.Events; +using System.Threading.Tasks; namespace RabbitMQ.Client.Impl { - sealed class BasicCancel : Work + internal sealed class BasicCancel : Work { - readonly string _consumerTag; + private readonly string _consumerTag; + + public override string Context => "HandleBasicCancel"; public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer) { _consumerTag = consumerTag; } - protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) + protected override Task Execute(IAsyncBasicConsumer consumer) { - try - { - await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false); - } - catch (Exception e) - { - if (!(model is ModelBase modelBase)) - { - return; - } - - var details = new Dictionary - { - {"consumer", consumer}, - {"context", "HandleBasicCancel"} - }; - modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); - } + return consumer.HandleBasicCancel(_consumerTag); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs b/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs index 1e38f3b6dc..e63ed3dd1c 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicCancelOk.cs @@ -1,40 +1,21 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -using RabbitMQ.Client.Events; +using System.Threading.Tasks; namespace RabbitMQ.Client.Impl { - sealed class BasicCancelOk : Work + internal sealed class BasicCancelOk : Work { - readonly string _consumerTag; + private readonly string _consumerTag; + + public override string Context => "HandleBasicCancelOk"; public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer) { _consumerTag = consumerTag; } - protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) + protected override Task Execute(IAsyncBasicConsumer consumer) { - try - { - await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false); - } - catch (Exception e) - { - if (!(model is ModelBase modelBase)) - { - return; - } - - var details = new Dictionary() - { - {"consumer", consumer}, - {"context", "HandleBasicCancelOk"} - }; - modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); - } + return consumer.HandleBasicCancelOk(_consumerTag); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs b/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs index 81c429db1e..ad7cf55330 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicConsumeOk.cs @@ -1,40 +1,21 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -using RabbitMQ.Client.Events; +using System.Threading.Tasks; namespace RabbitMQ.Client.Impl { - sealed class BasicConsumeOk : Work + internal sealed class BasicConsumeOk : Work { - readonly string _consumerTag; + private readonly string _consumerTag; + + public override string Context => "HandleBasicConsumeOk"; public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer) { _consumerTag = consumerTag; } - protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) + protected override Task Execute(IAsyncBasicConsumer consumer) { - try - { - await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false); - } - catch (Exception e) - { - if (!(model is ModelBase modelBase)) - { - return; - } - - var details = new Dictionary() - { - {"consumer", consumer}, - {"context", "HandleBasicConsumeOk"} - }; - modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); - } + return consumer.HandleBasicConsumeOk(_consumerTag); } } } diff --git a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs index 3df3e24b82..12d99d7c9c 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs @@ -1,22 +1,21 @@ using System; using System.Buffers; -using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading.Tasks; -using RabbitMQ.Client.Events; - namespace RabbitMQ.Client.Impl { - sealed class BasicDeliver : Work + internal sealed class BasicDeliver : Work { - readonly string _consumerTag; - readonly ulong _deliveryTag; - readonly bool _redelivered; - readonly string _exchange; - readonly string _routingKey; - readonly IBasicProperties _basicProperties; - readonly ReadOnlyMemory _body; + private readonly string _consumerTag; + private readonly ulong _deliveryTag; + private readonly bool _redelivered; + private readonly string _exchange; + private readonly string _routingKey; + private readonly IBasicProperties _basicProperties; + private readonly ReadOnlyMemory _body; + + public override string Context => "HandleBasicDeliver"; public BasicDeliver(IBasicConsumer consumer, string consumerTag, @@ -36,38 +35,22 @@ public BasicDeliver(IBasicConsumer consumer, _body = body; } - protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) + protected override Task Execute(IAsyncBasicConsumer consumer) { - try - { - await consumer.HandleBasicDeliver(_consumerTag, - _deliveryTag, - _redelivered, - _exchange, - _routingKey, - _basicProperties, - _body).ConfigureAwait(false); - } - catch (Exception e) - { - if (!(model is ModelBase modelBase)) - { - return; - } + return consumer.HandleBasicDeliver(_consumerTag, + _deliveryTag, + _redelivered, + _exchange, + _routingKey, + _basicProperties, + _body); + } - var details = new Dictionary() - { - {"consumer", consumer}, - {"context", "HandleBasicDeliver"} - }; - modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); - } - finally + public override void PostExecute() + { + if (MemoryMarshal.TryGetArray(_body, out ArraySegment segment)) { - if (MemoryMarshal.TryGetArray(_body, out ArraySegment segment)) - { - ArrayPool.Shared.Return(segment.Array); - } + ArrayPool.Shared.Return(segment.Array); } } } diff --git a/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs b/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs index aa4b27682c..b38684b74a 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelShutdown.cs @@ -1,40 +1,23 @@ -using System; -using System.Collections.Generic; using System.Threading.Tasks; -using RabbitMQ.Client.Events; - namespace RabbitMQ.Client.Impl { - sealed class ModelShutdown : Work + internal sealed class ModelShutdown : Work { - readonly ShutdownEventArgs _reason; + private readonly ShutdownEventArgs _reason; + private readonly IModel _model; + + public override string Context => "HandleModelShutdown"; - public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer) + public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason, IModel model) : base(consumer) { _reason = reason; + _model = model; } - protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer) + protected override Task Execute(IAsyncBasicConsumer consumer) { - try - { - await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false); - } - catch (Exception e) - { - if (!(model is ModelBase modelBase)) - { - return; - } - - var details = new Dictionary() - { - { "consumer", consumer }, - { "context", "HandleModelShutdown" } - }; - modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); - } + return consumer.HandleModelShutdown(_model, _reason); } } } diff --git a/projects/RabbitMQ.Client/client/impl/Work.cs b/projects/RabbitMQ.Client/client/impl/Work.cs index 325435e255..44c357c7c6 100644 --- a/projects/RabbitMQ.Client/client/impl/Work.cs +++ b/projects/RabbitMQ.Client/client/impl/Work.cs @@ -4,18 +4,24 @@ namespace RabbitMQ.Client.Impl { internal abstract class Work { - readonly IAsyncBasicConsumer _asyncConsumer; + public IAsyncBasicConsumer Consumer { get; } + + public abstract string Context { get; } protected Work(IBasicConsumer consumer) { - _asyncConsumer = (IAsyncBasicConsumer)consumer; + Consumer = (IAsyncBasicConsumer)consumer; } - public Task Execute(IModel model) + public Task Execute() { - return Execute(model, _asyncConsumer); + return Execute(Consumer); } - protected abstract Task Execute(IModel model, IAsyncBasicConsumer consumer); + protected abstract Task Execute(IAsyncBasicConsumer consumer); + + public virtual void PostExecute() + { + } } }