Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenTelemetry support via ActivitySource #1261

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.orig
*.log
_site/

Expand Down Expand Up @@ -28,6 +29,11 @@ test.sh
test-output.log
InternalTrace*
nunit-agent*
#################
## JetBrains Rider
#################
.idea/

#################
## Visual Studio
#################
Expand Down
1 change: 1 addition & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
.gitignore = .gitignore
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}"
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events
RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck) -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.BasicGetResult>
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
Expand Down Expand Up @@ -658,6 +658,7 @@ RabbitMQ.Client.PublicationAddress
RabbitMQ.Client.PublicationAddress.PublicationAddress(string exchangeType, string exchangeName, string routingKey) -> void
RabbitMQ.Client.QueueDeclareOk
RabbitMQ.Client.QueueDeclareOk.QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) -> void
RabbitMQ.Client.RabbitMQActivitySource
RabbitMQ.Client.ReadOnlyBasicProperties
RabbitMQ.Client.ReadOnlyBasicProperties.AppId.get -> string
RabbitMQ.Client.ReadOnlyBasicProperties.ClusterId.get -> string
Expand Down Expand Up @@ -851,6 +852,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Cli
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.get -> bool
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.set -> void
static RabbitMQ.Client.TcpClientAdapter.GetMatchingHost(System.Collections.Generic.IReadOnlyCollection<System.Net.IPAddress> addresses, System.Net.Sockets.AddressFamily addressFamily) -> System.Net.IPAddress
static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource.Log.get -> RabbitMQ.Client.TimerBasedCredentialRefresherEventSource
static readonly RabbitMQ.Client.CachedString.Empty -> RabbitMQ.Client.CachedString
Expand Down Expand Up @@ -881,6 +884,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.Dispose(bool disposing) -> void
virtual RabbitMQ.Client.TcpClientAdapter.GetStream() -> System.Net.Sockets.NetworkStream
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string
~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string
~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="7.0.2" />
Copy link

@lmolkova lmolkova Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any reason to require 7.0? Would it be possible to use 6.0?
The context: some environments like Azure Functions (and I believe AWS Lambda) could have a problem with newer versions of DiagnsoticSource. E.g. Azure/azure-functions-host#7135.
Of course, there would be no OTel story there, but at least some custom solutions might work.

So keeping the dependency version as low as possible would be nice.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpenTelemetry .NET took a stand to use the latest stable always, irrespective of the fact that environments like AzureFunctions might see issues. It does not mean that this repo should follow them! If there is a need to use newer features (Like Metrics), then it must to use newest, otherwise, lowest is good enough in this repo.

OTel .NET:
https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/Directory.Packages.props#L46-L54

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The aim is to start pushing metrics through OTel as well through another PR once tracing is in place.

</ItemGroup>
</Project>
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public interface IChannel : IDisposable
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
stebet marked this conversation as resolved.
Show resolved Hide resolved
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
Expand All @@ -203,7 +203,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

#nullable disable
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, in basicProperties, body);
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -78,8 +79,9 @@ await base.HandleBasicConsumeOk(consumerTag)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
// No need to call base, it's empty.
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
return BasicDeliverWrapper(deliverEventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand All @@ -93,5 +95,13 @@ await _shutdownWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
}
}

private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
}
}
}
11 changes: 7 additions & 4 deletions projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
Expand Down Expand Up @@ -88,10 +89,12 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(
this,
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
stebet marked this conversation as resolved.
Show resolved Hide resolved
using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(this, eventArgs);
}
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
_tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false);
}

internal DateTime StartTime { get; } = DateTime.UtcNow;

public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
{
return _tcsConfiguredTaskAwaitable.GetAwaiter();
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,13 @@ public ValueTask<BasicGetResult> BasicGetAsync(string queue, bool autoAck)
public ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
=> InnerChannel.BasicNackAsync(deliveryTag, multiple, requeue);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global)
{
Expand Down
Loading
Loading