From 58266fcb3d736d02a6018c50640db724daf60e3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Mon, 10 Oct 2022 11:47:05 +0000 Subject: [PATCH] First implementation of adding ActivitySource to common operations to enable OpenTelemetry scenarios. Linking existing context for publish if one exists dotnet format Adding standard tags without allocating. Updating code after comments Moving ActivitySource tests to Integration Making sure TaskCompletionSources execute asynchronously Moving activity source tests to sequential integration --- .gitignore | 6 + RabbitMQDotNetClient.sln | 1 + .../RabbitMQ.Client/PublicAPI.Unshipped.txt | 9 +- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 3 + .../RabbitMQ.Client/client/api/IChannel.cs | 4 +- .../client/api/IChannelExtensions.cs | 6 +- .../events/AsyncEventingBasicConsumer.cs | 12 +- .../client/events/EventingBasicConsumer.cs | 11 +- .../client/impl/AsyncRpcContinuations.cs | 2 + .../client/impl/AutorecoveringChannel.cs | 8 +- .../client/impl/ChannelBase.cs | 218 ++++++++++++- .../RabbitMQ.Client/client/impl/Connection.cs | 5 + .../client/impl/RabbitMQActivitySource.cs | 290 ++++++++++++++++++ .../client/impl/RpcContinuations.cs | 1 + .../client/impl/SessionBase.cs | 21 +- projects/Test/Common/ProcessUtil.cs | 6 +- .../Test/Integration/TestChannelShutdown.cs | 2 +- ...estConcurrentAccessWithSharedConnection.cs | 2 +- .../Test/Integration/TestConfirmSelect.cs | 6 +- .../Integration/TestConnectionShutdown.cs | 10 +- projects/Test/Integration/TestConsumer.cs | 2 +- .../Integration/TestConsumerCancelNotify.cs | 2 +- .../Integration/TestConsumerExceptions.cs | 2 +- .../TestConsumerOperationDispatch.cs | 6 +- .../Test/Integration/TestEventingConsumer.cs | 8 +- projects/Test/Integration/TestInvalidAck.cs | 2 +- projects/Test/Integration/TestMainLoop.cs | 2 +- .../TestActivitySource.cs | 247 +++++++++++++++ .../TestConnectionBlocked.cs | 4 +- .../TestConnectionRecovery.cs | 24 +- .../TestConnectionRecoveryBase.cs | 6 +- .../TestConnectionRecoveryWithoutSetup.cs | 10 +- .../TestConnectionTopologyRecovery.cs | 22 +- .../Unit/TestTimerBasedCredentialRefresher.cs | 4 +- 34 files changed, 876 insertions(+), 88 deletions(-) create mode 100644 projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs create mode 100644 projects/Test/SequentialIntegration/TestActivitySource.cs diff --git a/.gitignore b/.gitignore index ff820f3350..ec34c77b6b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.orig *.log _site/ @@ -28,6 +29,11 @@ test.sh test-output.log InternalTrace* nunit-agent* +################# +## JetBrains Rider +################# +.idea/ + ################# ## Visual Studio ################# diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index efdd0fcf97..8c37f5f06c 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -5,6 +5,7 @@ MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}" ProjectSection(SolutionItems) = preProject .editorconfig = .editorconfig + .gitignore = .gitignore EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}" diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 3cda2e436f..d470b54eb7 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -481,8 +481,8 @@ RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory body = default(System.ReadOnlyMemory), bool mandatory = false) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler RabbitMQ.Client.IChannel.ChannelNumber.get -> int @@ -658,6 +658,7 @@ RabbitMQ.Client.PublicationAddress RabbitMQ.Client.PublicationAddress.PublicationAddress(string exchangeType, string exchangeName, string routingKey) -> void RabbitMQ.Client.QueueDeclareOk RabbitMQ.Client.QueueDeclareOk.QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) -> void +RabbitMQ.Client.RabbitMQActivitySource RabbitMQ.Client.ReadOnlyBasicProperties RabbitMQ.Client.ReadOnlyBasicProperties.AppId.get -> string RabbitMQ.Client.ReadOnlyBasicProperties.ClusterId.get -> string @@ -851,6 +852,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Cli static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string +static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.get -> bool +static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.set -> void static RabbitMQ.Client.TcpClientAdapter.GetMatchingHost(System.Collections.Generic.IReadOnlyCollection addresses, System.Net.Sockets.AddressFamily addressFamily) -> System.Net.IPAddress static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource.Log.get -> RabbitMQ.Client.TimerBasedCredentialRefresherEventSource static readonly RabbitMQ.Client.CachedString.Empty -> RabbitMQ.Client.CachedString @@ -881,6 +884,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.Dispose(bool disposing) -> void virtual RabbitMQ.Client.TcpClientAdapter.GetStream() -> System.Net.Sockets.NetworkStream virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void +~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string +~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string ~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index e06dd89034..a539ea28fb 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -80,4 +80,7 @@ + + + diff --git a/projects/RabbitMQ.Client/client/api/IChannel.cs b/projects/RabbitMQ.Client/client/api/IChannel.cs index 5a98e65837..e69dc6079c 100644 --- a/projects/RabbitMQ.Client/client/api/IChannel.cs +++ b/projects/RabbitMQ.Client/client/api/IChannel.cs @@ -192,7 +192,7 @@ public interface IChannel : IDisposable /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; /// @@ -203,7 +203,7 @@ ValueTask BasicPublishAsync(string exchange, string routingKey, in /// Routing key must be shorter than 255 bytes. /// /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory body = default, bool mandatory = false) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; #nullable disable diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 862035fa9b..f48be542fa 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -89,14 +89,14 @@ public static Task BasicConsumeAsync(this IChannel channel, string queue public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory body) where T : IReadOnlyBasicProperties, IAmqpHeader { - return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, in basicProperties, body); + return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body); } public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body = default, bool mandatory = false) - => channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory); + => channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory); #nullable disable diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 7264f7e507..59520ef124 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading.Tasks; using RabbitMQ.Client.Impl; @@ -78,8 +79,9 @@ await base.HandleBasicConsumeOk(consumerTag) public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory body) { + var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); // No need to call base, it's empty. - return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); + return BasicDeliverWrapper(deliverEventArgs); } ///Fires the Shutdown event. @@ -93,5 +95,13 @@ await _shutdownWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); } } + + private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs) + { + using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs)) + { + await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false); + } + } } } diff --git a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs index d69d72e23a..153ff7bb8b 100644 --- a/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Diagnostics; using System.Threading.Tasks; namespace RabbitMQ.Client.Events @@ -88,10 +89,12 @@ public override void HandleBasicConsumeOk(string consumerTag) public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, ReadOnlyBasicProperties properties, ReadOnlyMemory body) { - await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - Received?.Invoke( - this, - new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)); + BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default) + { + await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + Received?.Invoke(this, eventArgs); + } } ///Fires the Shutdown event. diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index f673e00091..b9e2d0274d 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -72,6 +72,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout) _tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false); } + internal DateTime StartTime { get; } = DateTime.UtcNow; + public ConfiguredTaskAwaitable.ConfiguredTaskAwaiter GetAwaiter() { return _tcsConfiguredTaskAwaitable.GetAwaiter(); diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 3f6cdba60f..7832dfdde4 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -290,13 +290,13 @@ public ValueTask BasicGetAsync(string queue, bool autoAck) public ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) => InnerChannel.BasicNackAsync(deliveryTag, multiple, requeue); - public ValueTask BasicPublishAsync(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader - => InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory); + => InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory); public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) { diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index cadec4ea49..1e893cf2b2 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -37,7 +37,9 @@ using System.Text; using System.Threading; using System.Threading.Tasks; + using RabbitMQ.Client.client.framing; +using RabbitMQ.Client.client.impl; using RabbitMQ.Client.ConsumerDispatching; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -80,11 +82,13 @@ protected ChannelBase(ConnectionConfig config, ISession session) ConsumerDispatcher = new ConsumerDispatcher(this, config.DispatchConsumerConcurrency); } - Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + Action onException = (exception, context) => + OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); _basicNacksWrapper = new EventingWrapper("OnBasicNack", onException); _basicReturnWrapper = new EventingWrapper("OnBasicReturn", onException); - _callbackExceptionWrapper = new EventingWrapper(string.Empty, (exception, context) => { }); + _callbackExceptionWrapper = + new EventingWrapper(string.Empty, (exception, context) => { }); _flowControlWrapper = new EventingWrapper("OnFlowControl", onException); _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); @@ -101,6 +105,7 @@ public event EventHandler BasicAcks add => _basicAcksWrapper.AddHandler(value); remove => _basicAcksWrapper.RemoveHandler(value); } + private EventingWrapper _basicAcksWrapper; public event EventHandler BasicNacks @@ -108,6 +113,7 @@ public event EventHandler BasicNacks add => _basicNacksWrapper.AddHandler(value); remove => _basicNacksWrapper.RemoveHandler(value); } + private EventingWrapper _basicNacksWrapper; public event EventHandler BasicReturn @@ -115,6 +121,7 @@ public event EventHandler BasicReturn add => _basicReturnWrapper.AddHandler(value); remove => _basicReturnWrapper.RemoveHandler(value); } + private EventingWrapper _basicReturnWrapper; public event EventHandler CallbackException @@ -122,6 +129,7 @@ public event EventHandler CallbackException add => _callbackExceptionWrapper.AddHandler(value); remove => _callbackExceptionWrapper.RemoveHandler(value); } + private EventingWrapper _callbackExceptionWrapper; public event EventHandler FlowControl @@ -129,6 +137,7 @@ public event EventHandler FlowControl add => _flowControlWrapper.AddHandler(value); remove => _flowControlWrapper.RemoveHandler(value); } + private EventingWrapper _flowControlWrapper; public event EventHandler ChannelShutdown @@ -146,6 +155,7 @@ public event EventHandler ChannelShutdown } remove => _channelShutdownWrapper.RemoveHandler(value); } + private EventingWrapper _channelShutdownWrapper; public event EventHandler Recovery @@ -153,6 +163,7 @@ public event EventHandler Recovery add => _recoveryWrapper.AddHandler(value); remove => _recoveryWrapper.RemoveHandler(value); } + private EventingWrapper _recoveryWrapper; internal void RunRecoveryEventHandlers(object sender) @@ -344,7 +355,8 @@ await ModelSendAsync(method) } } - internal async ValueTask ConnectionStartOkAsync(IDictionary clientProperties, string mechanism, byte[] response, + internal async ValueTask ConnectionStartOkAsync( + IDictionary clientProperties, string mechanism, byte[] response, string locale) { await _rpcSemaphore.WaitAsync() @@ -510,11 +522,13 @@ protected void ChannelSend(in TMethod method, in THeader heade { _flowControlBlock.Wait(); } + Session.Transmit(in method, in header, body); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected ValueTask ModelSendAsync(in TMethod method, in THeader header, ReadOnlyMemory body) + protected ValueTask ModelSendAsync(in TMethod method, in THeader header, + ReadOnlyMemory body) where TMethod : struct, IOutgoingAmqpMethod where THeader : IAmqpHeader { @@ -556,9 +570,11 @@ private void OnChannelShutdown(ShutdownEventArgs reason) { confirmsTaskCompletionSource.TrySetException(exception); } + _confirmsTaskCompletionSources.Clear(); } } + _flowControlBlock.Set(); } @@ -680,6 +696,7 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) { confirmsTaskCompletionSource.TrySetResult(_onlyAcksReceived); } + _confirmsTaskCompletionSources.Clear(); _onlyAcksReceived = true; } @@ -1134,7 +1151,17 @@ await _rpcSemaphore.WaitAsync() await ModelSendAsync(method) .ConfigureAwait(false); - return await k; + BasicGetResult result = await k; + + using Activity activity = result != null + ? RabbitMQActivitySource.Receive(result.RoutingKey, + result.Exchange, + result.DeliveryTag, result.BasicProperties, result.Body.Length) + : RabbitMQActivitySource.ReceiveEmpty(queue); + + activity?.SetStartTime(k.StartTime); + + return result; } finally { @@ -1158,6 +1185,17 @@ public ValueTask BasicPublishAsync(string exchange, string routingK try { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + return ModelSendAsync(in cmd, in props, body); + } + return ModelSendAsync(in cmd, in basicProperties, body); } catch @@ -1175,7 +1213,22 @@ public ValueTask BasicPublishAsync(string exchange, string routingK } } - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value) + { + if (!(propsObj is Dictionary headers)) + { + return; + } + + // Only propagate headers if they haven't already been set + if (!headers.ContainsKey(key)) + { + headers[key] = value; + } + } + + public void BasicPublish(CachedString exchange, CachedString routingKey, + in TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { if (NextPublishSeqNo > 0) @@ -1189,7 +1242,63 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr try { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); - return ModelSendAsync(in cmd, in basicProperties, body); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + ChannelSend(in cmd, in props, body); + return; + } + + ChannelSend(in cmd, in basicProperties, body); + } + catch + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + NextPublishSeqNo--; + _pendingDeliveryTags.RemoveLast(); + } + } + + throw; + } + } + + public async ValueTask BasicPublishAsync(string exchange, string routingKey, + TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + } + } + + try + { + var cmd = new BasicPublish(exchange, routingKey, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body); + return; + } + + await ModelSendAsync(in cmd, in basicProperties, body); } catch { @@ -1206,6 +1315,70 @@ public ValueTask BasicPublishAsync(CachedString exchange, CachedStr } } + public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + TProperties basicProperties, ReadOnlyMemory body, bool mandatory) + where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + _pendingDeliveryTags.AddLast(NextPublishSeqNo++); + } + } + + try + { + var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); + RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext); + using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners + ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext) + : default; + + if (sendActivity != null) + { + BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity); + await ModelSendAsync(in cmd, in props, body); + return; + } + + await ModelSendAsync(in cmd, in basicProperties, body); + } + catch + { + if (NextPublishSeqNo > 0) + { + lock (_confirmLock) + { + NextPublishSeqNo--; + _pendingDeliveryTags.RemoveLast(); + } + } + + throw; + } + } + +#if FOO + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); + props.Headers = headers; + return props; + } +#endif + public async Task UpdateSecretAsync(string newSecret, string reason) { if (newSecret is null) @@ -1710,6 +1883,7 @@ public Task WaitForConfirmsAsync(CancellationToken token = default) _onlyAcksReceived = true; return Task.FromResult(false); } + return Task.FromResult(true); } @@ -1780,5 +1954,35 @@ await CloseAsync(ea, false) throw ex; } } + + private static BasicProperties PopulateActivityAndPropagateTraceId(TProperties basicProperties, + Activity sendActivity) where TProperties : IReadOnlyBasicProperties, IAmqpHeader + { + // This activity is marked as recorded, so let's propagate the trace and span ids. + if (sendActivity.IsAllDataRequested) + { + if (!string.IsNullOrEmpty(basicProperties.CorrelationId)) + { + sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, basicProperties.CorrelationId); + } + } + + BasicProperties props = default; + if (basicProperties is BasicProperties properties) + { + props = properties; + } + else if (basicProperties is EmptyBasicProperty) + { + props = new BasicProperties(); + } + + var headers = props.Headers ?? new Dictionary(); + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties); + props.Headers = headers; + return props; + } } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 0645850342..056be859ce 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -31,7 +31,10 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; +using System.Net; +using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -445,6 +448,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args) internal void Write(RentedMemory frames) { + Activity.Current.SetNetworkTags(_frameHandler); ValueTask task = _frameHandler.WriteAsync(frames); if (!task.IsCompletedSuccessfully) { @@ -454,6 +458,7 @@ internal void Write(RentedMemory frames) internal ValueTask WriteAsync(RentedMemory frames) { + Activity.Current.SetNetworkTags(_frameHandler); return _frameHandler.WriteAsync(frames); } diff --git a/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs new file mode 100644 index 0000000000..f68d4e1596 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/RabbitMQActivitySource.cs @@ -0,0 +1,290 @@ +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Reflection; +using System.Text; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Impl; + +namespace RabbitMQ.Client +{ + public static class RabbitMQActivitySource + { + // These constants are defined in the OpenTelemetry specification: + // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes + internal const string MessageId = "messaging.message.id"; + internal const string MessageConversationId = "messaging.message.conversation_id"; + internal const string MessagingOperation = "messaging.operation"; + internal const string MessagingSystem = "messaging.system"; + internal const string MessagingDestination = "messaging.destination.name"; + internal const string MessagingDestinationRoutingKey = "messaging.rabbitmq.destination.routing_key"; + internal const string MessagingBodySize = "messaging.message.body.size"; + internal const string MessagingEnvelopeSize = "messaging.message.envelope.size"; + internal const string ProtocolName = "network.protocol.name"; + internal const string ProtocolVersion = "network.protocol.version"; + internal const string RabbitMQDeliveryTag = "messaging.rabbitmq.delivery_tag"; + + private static readonly string AssemblyVersion = typeof(RabbitMQActivitySource).Assembly + .GetCustomAttribute() + ?.InformationalVersion ?? ""; + + private static readonly ActivitySource s_publisherSource = new ActivitySource(PublisherSourceName, AssemblyVersion); + private static readonly ActivitySource s_subscriberSource = new ActivitySource(SubscriberSourceName, AssemblyVersion); + + public const string PublisherSourceName = "RabbitMQ.Client.Publisher"; + public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber"; + + public static bool UseRoutingKeyAsOperationName { get; set; } = true; + internal static bool PublisherHasListeners => s_publisherSource.HasListeners(); + internal static bool SubscriberHasListeners => s_subscriberSource.HasListeners(); + + internal static readonly IEnumerable> CreationTags = new[] + { + new KeyValuePair(MessagingSystem, "rabbitmq"), + new KeyValuePair(ProtocolName, "amqp"), + new KeyValuePair(ProtocolVersion, "0.9.1") + }; + + internal static Activity Send(string routingKey, string exchange, int bodySize, + ActivityContext linkedContext = default) + { + if (s_publisherSource.HasListeners()) + { + Activity activity = linkedContext == default + ? s_publisherSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + ActivityKind.Producer) + : s_publisherSource.StartLinkedRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{routingKey} publish" : "publish", + ActivityKind.Producer, linkedContext); + if (activity?.IsAllDataRequested == true) + { + PopulateMessagingTags("publish", routingKey, exchange, 0, bodySize, activity); + } + + return activity; + } + + return null; + } + + internal static Activity ReceiveEmpty(string queue) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + Activity activity = s_subscriberSource.StartRabbitMQActivity(UseRoutingKeyAsOperationName ? $"{queue} receive" : "receive", + ActivityKind.Consumer); + if (activity.IsAllDataRequested) + { + activity + .SetTag(MessagingOperation, "receive") + .SetTag(MessagingDestination, "amq.default"); + } + + return activity; + } + + internal static Activity Receive(string routingKey, string exchange, ulong deliveryTag, + in ReadOnlyBasicProperties readOnlyBasicProperties, int bodySize) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + // Extract the PropagationContext of the upstream parent from the message headers. + DistributedContextPropagator.Current.ExtractTraceIdAndState(readOnlyBasicProperties.Headers, + ExtractTraceIdAndState, out string traceParent, out string traceState); + ActivityContext.TryParse(traceParent, traceState, out ActivityContext linkedContext); + Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{routingKey} receive" : "receive", ActivityKind.Consumer, + linkedContext); + if (activity.IsAllDataRequested) + { + PopulateMessagingTags("receive", routingKey, exchange, deliveryTag, readOnlyBasicProperties, + bodySize, activity); + } + + return activity; + } + + internal static Activity Deliver(BasicDeliverEventArgs deliverEventArgs) + { + if (!s_subscriberSource.HasListeners()) + { + return null; + } + + // Extract the PropagationContext of the upstream parent from the message headers. + DistributedContextPropagator.Current.ExtractTraceIdAndState(deliverEventArgs.BasicProperties.Headers, + ExtractTraceIdAndState, out string traceparent, out string traceState); + ActivityContext.TryParse(traceparent, traceState, out ActivityContext parentContext); + Activity activity = s_subscriberSource.StartLinkedRabbitMQActivity( + UseRoutingKeyAsOperationName ? $"{deliverEventArgs.RoutingKey} deliver" : "deliver", + ActivityKind.Consumer, parentContext); + if (activity != null && activity.IsAllDataRequested) + { + PopulateMessagingTags("deliver", deliverEventArgs.RoutingKey, deliverEventArgs.Exchange, + deliverEventArgs.DeliveryTag, deliverEventArgs.BasicProperties, deliverEventArgs.Body.Length, + activity); + } + + return activity; + + } + + private static Activity StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext parentContext = default) + { + Activity activity = source + .CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start(); + return activity; + } + + private static Activity StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, + ActivityContext linkedContext = default, ActivityContext parentContext = default) + { + Activity activity = source.CreateActivity(name, kind, parentContext: parentContext, + links: new[] { new ActivityLink(linkedContext) }, idFormat: ActivityIdFormat.W3C, + tags: CreationTags) + ?.Start(); + return activity; + } + + private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + ulong deliveryTag, in ReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity) + { + PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity); + + if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId)) + { + activity.SetTag(MessageConversationId, readOnlyBasicProperties.CorrelationId); + } + + if (!string.IsNullOrEmpty(readOnlyBasicProperties.MessageId)) + { + activity.SetTag(MessageId, readOnlyBasicProperties.MessageId); + } + } + + private static void PopulateMessagingTags(string operation, string routingKey, string exchange, + ulong deliveryTag, int bodySize, Activity activity) + { + activity + .SetTag(MessagingOperation, operation) + .SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange) + .SetTag(MessagingDestinationRoutingKey, routingKey) + .SetTag(MessagingBodySize, bodySize); + + if (deliveryTag > 0) + { + activity.SetTag(RabbitMQDeliveryTag, deliveryTag); + } + } + + internal static void PopulateMessageEnvelopeSize(Activity activity, int size) + { + if (activity != null && activity.IsAllDataRequested && PublisherHasListeners) + { + activity.SetTag(MessagingEnvelopeSize, size); + } + } + + internal static bool TryGetExistingContext(T props, out ActivityContext context) + where T : IReadOnlyBasicProperties + { + if (props.Headers == null) + { + context = default; + return false; + } + + bool hasHeaders = false; + foreach (string header in DistributedContextPropagator.Current.Fields) + { + if (props.Headers.ContainsKey(header)) + { + hasHeaders = true; + break; + } + } + + if (hasHeaders) + { + DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, ExtractTraceIdAndState, + out string traceParent, out string traceState); + return ActivityContext.TryParse(traceParent, traceState, out context); + } + + context = default; + return false; + } + + private static void ExtractTraceIdAndState(object props, string name, out string value, + out IEnumerable values) + { + if (props is Dictionary headers && headers.TryGetValue(name, out object propsVal) && + propsVal is byte[] bytes) + { + value = Encoding.UTF8.GetString(bytes); + values = default; + } + else + { + value = default; + values = default; + } + } + + internal static void SetNetworkTags(this Activity activity, IFrameHandler frameHandler) + { + if (PublisherHasListeners && activity != null && activity.IsAllDataRequested) + { + switch (frameHandler.RemoteEndPoint.AddressFamily) + { + case AddressFamily.InterNetworkV6: + activity.SetTag("network.type", "ipv6"); + break; + case AddressFamily.InterNetwork: + activity.SetTag("network.type", "ipv4"); + break; + } + + if (!string.IsNullOrEmpty(frameHandler.Endpoint.HostName)) + { + activity + .SetTag("server.address", frameHandler.Endpoint.HostName); + } + + activity + .SetTag("server.port", frameHandler.Endpoint.Port); + + if (frameHandler.RemoteEndPoint is IPEndPoint ipEndpoint) + { + string remoteAddress = ipEndpoint.Address.ToString(); + if (activity.GetTagItem("server.address") == null) + { + activity + .SetTag("server.address", remoteAddress); + } + + activity + .SetTag("network.peer.address", remoteAddress) + .SetTag("network.peer.port", ipEndpoint.Port); + } + + if (frameHandler.LocalEndPoint is IPEndPoint localEndpoint) + { + string localAddress = localEndpoint.Address.ToString(); + activity + .SetTag("client.address", localAddress) + .SetTag("client.port", localEndpoint.Port) + .SetTag("network.local.address", localAddress) + .SetTag("network.local.port", localEndpoint.Port); + } + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs index b84687ca45..2b2dbad895 100644 --- a/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/RpcContinuations.cs @@ -38,6 +38,7 @@ namespace RabbitMQ.Client.Impl internal class SimpleBlockingRpcContinuation : IRpcContinuation { private readonly BlockingCell> m_cell = new BlockingCell>(); + internal DateTime StartTime { get; } = DateTime.UtcNow; public void GetReply(TimeSpan timeout) { diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index a433626878..d2c8d3d545 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -122,7 +123,7 @@ public void Notify() { // Ensure that we notify only when session is already closed // If not, throw exception, since this is a serious bug in the library - var reason = CloseReason; + ShutdownEventArgs reason = CloseReason; if (reason is null) { throw new Exception("Internal Error in Session.Close"); @@ -138,7 +139,9 @@ public virtual void Transmit(in T cmd) where T : struct, IOutgoingAmqpMethod ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + Connection.Write(bytes); } public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingAmqpMethod @@ -148,7 +151,9 @@ public virtual ValueTask TransmitAsync(in T cmd) where T : struct, IOutgoingA ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ChannelNumber); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + return Connection.WriteAsync(bytes); } public void Transmit(in TMethod cmd, in THeader header, ReadOnlyMemory body) @@ -160,7 +165,10 @@ public void Transmit(in TMethod cmd, in THeader header, ReadOn ThrowAlreadyClosedException(); } - Connection.Write(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, + ChannelNumber, Connection.MaxPayloadSize); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + Connection.Write(bytes); } public ValueTask TransmitAsync(in TMethod cmd, in THeader header, ReadOnlyMemory body) @@ -172,7 +180,10 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head ThrowAlreadyClosedException(); } - return Connection.WriteAsync(Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, Connection.MaxPayloadSize)); + RentedMemory bytes = Framing.SerializeToFrames(ref Unsafe.AsRef(cmd), ref Unsafe.AsRef(header), body, ChannelNumber, + Connection.MaxPayloadSize); + RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size); + return Connection.WriteAsync(bytes); } private void ThrowAlreadyClosedException() diff --git a/projects/Test/Common/ProcessUtil.cs b/projects/Test/Common/ProcessUtil.cs index 7eb9196f68..eb262e451c 100644 --- a/projects/Test/Common/ProcessUtil.cs +++ b/projects/Test/Common/ProcessUtil.cs @@ -32,7 +32,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) var processTasks = new List(); // === EXITED Event handling === - var processExitEvent = new TaskCompletionSource(); + var processExitEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.Exited += (sender, args) => { @@ -46,7 +46,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) if (process.StartInfo.RedirectStandardOutput) { - var stdOutCloseEvent = new TaskCompletionSource(); + var stdOutCloseEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.OutputDataReceived += (s, e) => { @@ -72,7 +72,7 @@ public static async Task RunAsync(ProcessStartInfo startInfo) if (process.StartInfo.RedirectStandardError) { - var stdErrCloseEvent = new TaskCompletionSource(); + var stdErrCloseEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); process.ErrorDataReceived += (s, e) => { diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 8a190a1749..f9fbd7831f 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -48,7 +48,7 @@ public TestChannelShutdown(ITestOutputHelper output) : base(output) public async Task TestConsumerDispatcherShutdown() { var autorecoveringChannel = (AutorecoveringChannel)_channel; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index db2f0a1d3b..2afb9d6d66 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -102,7 +102,7 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in { return TestConcurrentChannelOperationsAsync(async (conn) => { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // publishing on a shared channel is not supported // and would missing the point of this test anyway diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index 4adaf31d06..8f17c447fd 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -79,7 +79,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, in properties, body); + await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); try @@ -89,7 +89,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) CorrelationId = new string('o', correlationIdLength) }; // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, in properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); } catch @@ -99,7 +99,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo); - await _channel.BasicPublishAsync("sample", string.Empty, in properties, body); + await _channel.BasicPublishAsync("sample", string.Empty, properties, body); await _channel.WaitForConfirmsOrDieAsync(); // _output.WriteLine("I'm done..."); } diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index d38b3d3ee1..5959ed2b2f 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -48,7 +48,7 @@ public TestConnectionShutdown(ITestOutputHelper output) : base(output) [Fact] public async Task TestCleanClosureWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { tcs.SetResult(true); @@ -64,7 +64,7 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand() [Fact] public async Task TestAbortWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { tcs.SetResult(true); @@ -82,7 +82,7 @@ public async Task TestAbortWithSocketClosedOutOfBand() [Fact] public async Task TestDisposedWithSocketClosedOutOfBand() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { @@ -103,7 +103,7 @@ public async Task TestDisposedWithSocketClosedOutOfBand() [Fact] public async Task TestShutdownSignalPropagationToChannels() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { @@ -119,7 +119,7 @@ public async Task TestShutdownSignalPropagationToChannels() public async Task TestConsumerDispatcherShutdown() { var m = (AutorecoveringChannel)_channel; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channel.ChannelShutdown += (channel, args) => { diff --git a/projects/Test/Integration/TestConsumer.cs b/projects/Test/Integration/TestConsumer.cs index 681969bc0a..e2175a404f 100644 --- a/projects/Test/Integration/TestConsumer.cs +++ b/projects/Test/Integration/TestConsumer.cs @@ -133,7 +133,7 @@ public async Task ConcurrentEventingTestForReceived() countdownEvent.Wait(); // Add last receiver - var lastConsumerReceivedTcs = new TaskCompletionSource(); + var lastConsumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); consumer.Received += (o, a) => { lastConsumerReceivedTcs.SetResult(true); diff --git a/projects/Test/Integration/TestConsumerCancelNotify.cs b/projects/Test/Integration/TestConsumerCancelNotify.cs index 68a64bf1f6..99983c6075 100644 --- a/projects/Test/Integration/TestConsumerCancelNotify.cs +++ b/projects/Test/Integration/TestConsumerCancelNotify.cs @@ -40,7 +40,7 @@ namespace Test.Integration { public class TestConsumerCancelNotify : IntegrationFixture { - private readonly TaskCompletionSource _tcs = new TaskCompletionSource(); + private readonly TaskCompletionSource _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private string _consumerTag; public TestConsumerCancelNotify(ITestOutputHelper output) : base(output) diff --git a/projects/Test/Integration/TestConsumerExceptions.cs b/projects/Test/Integration/TestConsumerExceptions.cs index d119eba2c4..edf2224531 100644 --- a/projects/Test/Integration/TestConsumerExceptions.cs +++ b/projects/Test/Integration/TestConsumerExceptions.cs @@ -108,7 +108,7 @@ public override void HandleBasicCancelOk(string consumerTag) protected async Task TestExceptionHandlingWithAsync(IBasicConsumer consumer, Func action) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool notified = false; string q = await _channel.QueueDeclareAsync(); diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestConsumerOperationDispatch.cs index c90f2e9871..2340ca60e0 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestConsumerOperationDispatch.cs @@ -164,7 +164,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() string q2 = (await ch2.QueueDeclareAsync()).QueueName; await ch2.QueueBindAsync(queue: q2, exchange: _x, routingKey: ""); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await ch1.BasicConsumeAsync(q1, true, new EventingBasicConsumer(ch1)); var c2 = new EventingBasicConsumer(ch2); c2.Received += (object sender, BasicDeliverEventArgs e) => @@ -183,8 +183,8 @@ private class ShutdownLatchConsumer : DefaultBasicConsumer { public ShutdownLatchConsumer() { - Latch = new TaskCompletionSource(); - DuplicateLatch = new TaskCompletionSource(); + Latch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + DuplicateLatch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } public readonly TaskCompletionSource Latch; diff --git a/projects/Test/Integration/TestEventingConsumer.cs b/projects/Test/Integration/TestEventingConsumer.cs index 4ee805dd2f..1721d5cb08 100644 --- a/projects/Test/Integration/TestEventingConsumer.cs +++ b/projects/Test/Integration/TestEventingConsumer.cs @@ -48,10 +48,10 @@ public async Task TestEventingConsumerRegistrationEvents() { string q = await _channel.QueueDeclareAsync(); - var registeredTcs = new TaskCompletionSource(); + var registeredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); object registeredSender = null; - var unregisteredTcs = new TaskCompletionSource(); + var unregisteredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); object unregisteredSender = null; EventingBasicConsumer ec = new EventingBasicConsumer(_channel); @@ -85,7 +85,7 @@ public async Task TestEventingConsumerRegistrationEvents() [Fact] public async Task TestEventingConsumerDeliveryEvents() { - var tcs0 = new TaskCompletionSource(); + var tcs0 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); string q = await _channel.QueueDeclareAsync(); bool receivedInvoked = false; @@ -103,7 +103,7 @@ public async Task TestEventingConsumerDeliveryEvents() await _channel.BasicPublishAsync("", q, _encoding.GetBytes("msg")); await WaitAsync(tcs0, "received event"); - var tcs1 = new TaskCompletionSource(); + var tcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Assert.True(receivedInvoked); Assert.NotNull(receivedSender); diff --git a/projects/Test/Integration/TestInvalidAck.cs b/projects/Test/Integration/TestInvalidAck.cs index 1eb5d735a8..4a349e2916 100644 --- a/projects/Test/Integration/TestInvalidAck.cs +++ b/projects/Test/Integration/TestInvalidAck.cs @@ -45,7 +45,7 @@ public TestInvalidAck(ITestOutputHelper output) : base(output) [Fact] public async Task TestAckWithUnknownConsumerTagAndMultipleFalse() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool shutdownFired = false; ShutdownEventArgs shutdownArgs = null; _channel.ChannelShutdown += (s, args) => diff --git a/projects/Test/Integration/TestMainLoop.cs b/projects/Test/Integration/TestMainLoop.cs index eda581fc37..646e0ce50a 100644 --- a/projects/Test/Integration/TestMainLoop.cs +++ b/projects/Test/Integration/TestMainLoop.cs @@ -63,7 +63,7 @@ public override Task HandleBasicDeliverAsync(string consumerTag, [Fact] public async Task TestCloseWithFaultyConsumer() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, false); CallbackExceptionEventArgs ea = null; diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs new file mode 100644 index 0000000000..2be177bec3 --- /dev/null +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -0,0 +1,247 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.SequentialIntegration +{ + public class TestActivitySource : SequentialIntegrationFixture + { + public TestActivitySource(ITestOutputHelper output) : base(output) + { + } + + void AssertStringTagEquals(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.Equal(expected, tag); + } + + void AssertStringTagStartsWith(Activity activity, string name, string expected) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.StartsWith(expected, tag); + } + + void AssertStringTagNotNullOrEmpty(Activity activity, string name) + { + string tag = activity.GetTagItem(name) as string; + Assert.NotNull(tag); + Assert.False(string.IsNullOrEmpty(tag)); + } + + void AssertIntTagGreaterThanZero(Activity activity, string name) + { + Assert.True(activity.GetTagItem(name) is int result && result > 0); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var _activities = new List(); + using (ActivityListener activityListener = StartActivityListener(_activities)) + { + await Task.Delay(500); + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, _activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + + string queueName = $"{Guid.NewGuid()}"; + QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName); + byte[] sendBody = Encoding.UTF8.GetBytes("hi"); + byte[] consumeBody = null; + var consumer = new EventingBasicConsumer(_channel); + var consumerReceivedTcs = + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + consumer.Received += (o, a) => + { + consumeBody = a.Body.ToArray(); + consumerReceivedTcs.SetResult(true); + }; + + string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); + await _channel.BasicPublishAsync("", q.QueueName, sendBody, mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + + await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.True(await consumerReceivedTcs.Task); + + await _channel.BasicCancelAsync(consumerTag); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queueName, activities, true); + } + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOperationName) + { + await _channel.ConfirmSelectAsync(); + RabbitMQActivitySource.UseRoutingKeyAsOperationName = useRoutingKeyAsOperationName; + var activities = new List(); + using (ActivityListener activityListener = StartActivityListener(activities)) + { + await Task.Delay(500); + string queue = $"queue-{Guid.NewGuid()}"; + const string msg = "for basic.get"; + + try + { + await _channel.QueueDeclareAsync(queue, false, false, false, null); + await _channel.BasicPublishAsync("", queue, Encoding.UTF8.GetBytes(msg), mandatory: true); + await _channel.WaitForConfirmsOrDieAsync(); + QueueDeclareOk ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(1u, ok.MessageCount); + BasicGetResult res = await _channel.BasicGetAsync(queue, true); + Assert.Equal(msg, Encoding.UTF8.GetString(res.Body.ToArray())); + ok = await _channel.QueueDeclarePassiveAsync(queue); + Assert.Equal(0u, ok.MessageCount); + await Task.Delay(500); + AssertActivityData(useRoutingKeyAsOperationName, queue, activities, false); + } + finally + { + await _channel.QueueDeleteAsync(queue); + } + } + } + + private static ActivityListener StartActivityListener(List activities) + { + ActivityListener activityListener = new ActivityListener(); + activityListener.Sample = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded; + activityListener.SampleUsingParentId = (ref ActivityCreationOptions _) => + ActivitySamplingResult.AllDataAndRecorded; + activityListener.ShouldListenTo = + activitySource => activitySource.Name.StartsWith("RabbitMQ.Client."); + activityListener.ActivityStarted = activities.Add; + ActivitySource.AddActivityListener(activityListener); + return activityListener; + } + + private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName, + List activityList, bool isDeliver = false) + { + string childName = isDeliver ? "deliver" : "receive"; + Activity[] activities = activityList.ToArray(); + Assert.NotEmpty(activities); + foreach (var item in activities) + { + _output.WriteLine( + $"{item.Context.TraceId}: {item.OperationName}"); + _output.WriteLine($" Tags: {string.Join(", ", item.Tags.Select(x => $"{x.Key}: {x.Value}"))}"); + _output.WriteLine($" Links: {string.Join(", ", item.Links.Select(x => $"{x.Context.TraceId}"))}"); + } + + Activity sendActivity = activities.First(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} publish" : "publish") && + x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag && + routingKeyTag == $"{queueName}"); + Activity receiveActivity = activities.Single(x => + x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") && + x.Links.First().Context.TraceId == sendActivity.TraceId); + Assert.Equal(ActivityKind.Producer, sendActivity.Kind); + Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind); + Assert.Null(receiveActivity.ParentId); + AssertStringTagNotNullOrEmpty(sendActivity, "network.peer.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "network.local.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "server.address"); + AssertStringTagNotNullOrEmpty(sendActivity, "client.address"); + AssertIntTagGreaterThanZero(sendActivity, "network.peer.port"); + AssertIntTagGreaterThanZero(sendActivity, "network.local.port"); + AssertIntTagGreaterThanZero(sendActivity, "server.port"); + AssertIntTagGreaterThanZero(sendActivity, "client.port"); + AssertStringTagStartsWith(sendActivity, "network.type", "ipv"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingSystem, "rabbitmq"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolName, "amqp"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.ProtocolVersion, "0.9.1"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestination, "amq.default"); + AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingDestinationRoutingKey, queueName); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize); + AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize); + AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize); + } + } +} diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs index f9e3eaf2d4..bfba03f359 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs @@ -53,7 +53,7 @@ public override async Task DisposeAsync() [Fact] public async Task TestConnectionBlockedNotification() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) => { UnblockAsync(); @@ -72,7 +72,7 @@ public async Task TestConnectionBlockedNotification() [Fact] public async Task TestDisposeOnBlockedConnectionDoesNotHang() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await BlockAsync(_channel); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index a31ac8603d..2deed73598 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -73,7 +73,7 @@ public override async Task DisposeAsync() [Fact] public async Task TestBasicAckAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new AckingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); @@ -96,7 +96,7 @@ public async Task TestBasicAckAfterChannelRecovery() [Fact] public async Task TestBasicNackAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new NackingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); @@ -119,7 +119,7 @@ public async Task TestBasicNackAfterChannelRecovery() [Fact] public async Task TestBasicRejectAfterChannelRecovery() { - var allMessagesSeenTcs = new TaskCompletionSource(); + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var cons = new RejectingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName; @@ -160,7 +160,7 @@ public async Task TestBasicAckAfterBasicGetAndChannelRecovery() public async Task TestBasicAckEventHandlerRecovery() { await _channel.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); ((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); @@ -212,7 +212,7 @@ public async Task TestBasicChannelRecoveryOnServerRestart() [Fact] public async Task TestChannelAfterDispose_GH1086() { - TaskCompletionSource sawChannelShutdownTcs = new TaskCompletionSource(); + TaskCompletionSource sawChannelShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); void _channel_ChannelShutdown(object sender, ShutdownEventArgs e) { @@ -269,7 +269,7 @@ public async Task TestBlockedListenersRecovery() { try { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionBlocked += (c, reason) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); @@ -329,7 +329,7 @@ public async Task TestConsumerRecoveryWithManyConsumers() await _channel.BasicConsumeAsync(q, true, cons); } - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); @@ -522,7 +522,7 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() string q = (await _channel.QueueDeclareAsync(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null)).QueueName; string nameBefore = q; string nameAfter = null; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); ((AutorecoveringConnection)_conn).QueueNameChangedAfterRecovery += (source, ea) => { @@ -634,7 +634,7 @@ public async Task TestServerNamedQueueRecovery() string nameBefore = q; string nameAfter = null; - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var connection = (AutorecoveringConnection)_conn; connection.RecoverySucceeded += (source, ea) => tcs.SetResult(true); connection.QueueNameChangedAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; @@ -730,7 +730,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await CloseAndWaitForRecoveryAsync(); await AssertConsumerCountAsync(_channel, q, 1); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await _channel.BasicPublishAsync("", q, _messageBody); @@ -752,7 +752,7 @@ public async Task TestPublishRpcRightAfterReconnect() properties.ReplyTo = "amq.rabbitmq.reply-to"; TimeSpan doneSpan = TimeSpan.FromMilliseconds(100); - var doneTcs = new TaskCompletionSource(); + var doneTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); Task closeTask = Task.Run(async () => { try @@ -913,7 +913,7 @@ public async Task TestThatDeletedQueuesDontReappearOnRecovery() [Fact] public async Task TestUnblockedListenersRecovery() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _conn.ConnectionUnblocked += (source, ea) => tcs.SetResult(true); await CloseAndWaitForRecoveryAsync(); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs b/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs index 5d7d495136..d76298f02c 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs @@ -229,7 +229,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) protected static TaskCompletionSource PrepareForShutdown(IConnection conn) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; aconn.ConnectionShutdown += (c, args) => tcs.SetResult(true); @@ -239,7 +239,7 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) protected static TaskCompletionSource PrepareForRecovery(IConnection conn) { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true); @@ -392,7 +392,7 @@ protected static async Task SendAndConsumeMessageAsync(IConnection conn, s { await ch.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer = new AckingBasicConsumer(ch, 1, tcs); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs index 57690db17b..a9c088dff0 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs @@ -132,7 +132,7 @@ public async Task TestConsumerWorkServiceRecovery() await CloseAndWaitForRecoveryAsync(c); Assert.True(ch.IsOpen); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")); @@ -177,7 +177,7 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() await AssertConsumerCountAsync(ch, q1, 1); Assert.False(queueNameChangeAfterRecoveryCalled); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await ch.BasicPublishAsync("", q1, _encoding.GetBytes("msg")); @@ -265,7 +265,7 @@ public async Task TestTopologyRecoveryConsumerFilter() ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered") }; - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter)) { @@ -289,12 +289,12 @@ public async Task TestTopologyRecoveryConsumerFilter() await ch.QueuePurgeAsync(queueWithRecoveredConsumer); await ch.QueuePurgeAsync(queueWithIgnoredConsumer); - var consumerRecoveryTcs = new TaskCompletionSource(); + var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToRecover = new EventingBasicConsumer(ch); consumerToRecover.Received += (source, ea) => consumerRecoveryTcs.SetResult(true); await ch.BasicConsumeAsync(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); - var ignoredTcs = new TaskCompletionSource(); + var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToIgnore = new EventingBasicConsumer(ch); consumerToIgnore.Received += (source, ea) => ignoredTcs.SetResult(true); await ch.BasicConsumeAsync(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); diff --git a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs index f58b1d9ecd..5c6d3f0725 100644 --- a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs @@ -70,7 +70,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await CloseAndWaitForRecoveryAsync(); await AssertConsumerCountAsync(_channel, q, 1); - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); cons.Received += (s, args) => tcs.SetResult(true); await _channel.BasicPublishAsync("", q, _messageBody); @@ -84,7 +84,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() [Fact] public async Task TestTopologyRecoveryQueueFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -130,7 +130,7 @@ public async Task TestTopologyRecoveryQueueFilter() [Fact] public async Task TestTopologyRecoveryExchangeFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -174,7 +174,7 @@ public async Task TestTopologyRecoveryExchangeFilter() [Fact] public async Task TestTopologyRecoveryBindingFilter() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter { @@ -222,7 +222,7 @@ public async Task TestTopologyRecoveryBindingFilter() [Fact] public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var filter = new TopologyRecoveryFilter(); AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter); conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true); @@ -245,12 +245,12 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() await ch.QueuePurgeAsync(queue1); await ch.QueuePurgeAsync(queue2); - var consumerReceivedTcs1 = new TaskCompletionSource(); + var consumerReceivedTcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer1 = new EventingBasicConsumer(ch); consumer1.Received += (source, ea) => consumerReceivedTcs1.SetResult(true); await ch.BasicConsumeAsync(queue1, true, "recovered.consumer", consumer1); - var consumerReceivedTcs2 = new TaskCompletionSource(); + var consumerReceivedTcs2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer2 = new EventingBasicConsumer(ch); consumer2.Received += (source, ea) => consumerReceivedTcs2.SetResult(true); await ch.BasicConsumeAsync(queue2, true, "filtered.consumer", consumer2); @@ -290,7 +290,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() [Fact] public async Task TestTopologyRecoveryQueueExceptionHandler() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var changedQueueArguments = new Dictionary { @@ -352,7 +352,7 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals [Fact] public async Task TestTopologyRecoveryExchangeExceptionHandler() { - var tcs = new TaskCompletionSource(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var exceptionHandler = new TopologyRecoveryExceptionHandler { @@ -408,7 +408,7 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler() [Fact] public async Task TestTopologyRecoveryBindingExceptionHandler() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); const string exchange = "topology.recovery.exchange"; const string queueWithExceptionBinding = "recovery.exception.queue"; @@ -469,7 +469,7 @@ public async Task TestTopologyRecoveryBindingExceptionHandler() [Fact] public async Task TestTopologyRecoveryConsumerExceptionHandler() { - var connectionRecoveryTcs = new TaskCompletionSource(); + var connectionRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); string queueWithExceptionConsumer = "recovery.exception.queue"; diff --git a/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs b/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs index adbc1a0802..87622e98e1 100644 --- a/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs +++ b/projects/Test/Unit/TestTimerBasedCredentialRefresher.cs @@ -129,7 +129,7 @@ public void TestDoNotRegisterWhenHasNoExpiry() [Fact] public async Task TestRefreshToken() { - var cbtcs = new TaskCompletionSource(); + var cbtcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool? callbackArg = null; var credentialsProvider = new MockCredentialsProvider(_testOutputHelper, TimeSpan.FromSeconds(1)); Task cb(bool arg) @@ -158,7 +158,7 @@ Task cb(bool arg) [Fact] public async Task TestRefreshTokenFailed() { - var cbtcs = new TaskCompletionSource(); + var cbtcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); bool? callbackArg = null; var credentialsProvider = new MockCredentialsProvider(_testOutputHelper, TimeSpan.FromSeconds(1)); Task cb(bool arg)