Skip to content

Commit

Permalink
Experimental OpenTelemetry Core (#437)
Browse files Browse the repository at this point in the history
* Experimental OpenTelemetry Core

* Moved OTEL tests

* format

* Span naming

* Fixed test
  • Loading branch information
mtmk authored Mar 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 1c11db3 commit a371623
Showing 20 changed files with 790 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -66,6 +66,9 @@ jobs:
- name: Test Services
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.Services.Tests/NATS.Client.Services.Tests.csproj

- name: Test OpenTelemetry
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Net.OpenTelemetry.Tests/NATS.Net.OpenTelemetry.Tests.csproj

- name: Check Native AOT
run: |
cd tests/NATS.Client.CheckNativeAot
14 changes: 14 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
@@ -101,6 +101,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.DocsExamples", "te
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Hosting.Tests", "tests\NATS.Client.Hosting.Tests\NATS.Client.Hosting.Tests.csproj", "{766C2486-34C3-4DD1-B31C-540C17C044B0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.OpenTelemetry", "sandbox\Example.OpenTelemetry\Example.OpenTelemetry.csproj", "{474BA453-9CFF-41C2-B2E7-ADD92CC93E86}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Tests", "tests\NATS.Net.OpenTelemetry.Tests\NATS.Net.OpenTelemetry.Tests.csproj", "{B8554582-DE19-41A2-9784-9B27C9F22429}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -267,6 +271,14 @@ Global
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{766C2486-34C3-4DD1-B31C-540C17C044B0}.Release|Any CPU.Build.0 = Release|Any CPU
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86}.Debug|Any CPU.Build.0 = Debug|Any CPU
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86}.Release|Any CPU.ActiveCfg = Release|Any CPU
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86}.Release|Any CPU.Build.0 = Release|Any CPU
{B8554582-DE19-41A2-9784-9B27C9F22429}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8554582-DE19-41A2-9784-9B27C9F22429}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8554582-DE19-41A2-9784-9B27C9F22429}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8554582-DE19-41A2-9784-9B27C9F22429}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -313,6 +325,8 @@ Global
{6A7B9B9F-BFA4-4A6D-9006-0AAF597FC6DD} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{389C05EB-A0B3-4097-8C1F-4D55818438CC} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{766C2486-34C3-4DD1-B31C-540C17C044B0} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
37 changes: 37 additions & 0 deletions sandbox/Example.OpenTelemetry/ClientApp.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System.Diagnostics;
using NATS.Client.Core;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace Example.OpenTelemetry;

public static class ClientApp
{
public static async Task Run()
{
var serviceName = "ClientApp";
var serviceVersion = "1.0.0";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddOtlpExporter()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName: serviceName, serviceVersion: serviceVersion))
.AddSource("NATS.Net")
.AddSource("MyClientSource")
.Build();

ActivitySource activitySource = new("MyClientSource");

Console.WriteLine("Client App is starting...");

await using var nats = new NatsConnection();

using (var activity = activitySource.StartActivity("SayHi"))
{
await nats.PublishAsync("greet.presence.client.app", "ClientApp is here!");

var response = await nats.RequestAsync<string, string>("greet.hi", "Hi, telemetry!");
Console.WriteLine($"Response: {response}");
}
}
}
20 changes: 20 additions & 0 deletions sandbox/Example.OpenTelemetry/Example.OpenTelemetry.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="1.7.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.7.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
</ItemGroup>

</Project>
53 changes: 53 additions & 0 deletions sandbox/Example.OpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/****************************************************************************
OpenTelemetry Example
(1) Run Jaeger locally and then run the client and server apps.
https://www.jaegertracing.io/download/
https://medium.com/jaegertracing/introducing-native-support-for-opentelemetry-in-jaeger-eb661be8183c
```powershell
> $env:COLLECTOR_OTLP_ENABLED=true
> jaeger-all-in-one.exe
```
or
```bash
$ COLLECTOR_OTLP_ENABLED=true jaeger-all-in-one
```
(2) Jaeger UI default URL http://localhost:16686/search
(3) In different terminals run:
```
nats-server
```
```
dotnet run -- service
```
```
dotnet run -- client
```
****************************************************************************/

using Example.OpenTelemetry;

if (args.Length > 0 && args[0] == "client")
{
await ClientApp.Run();
}
else if (args.Length > 0 && args[0] == "service")
{
await ServiceApp.Run();
}
else
{
Console.Error.WriteLine("Usage: dotnet run -- <client|service>");
}
49 changes: 49 additions & 0 deletions sandbox/Example.OpenTelemetry/ServiceApp.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Diagnostics;
using NATS.Client.Core;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace Example.OpenTelemetry;

public static class ServiceApp
{
public static async Task Run()
{
var serviceName = "ServiceApp";
var serviceVersion = "1.0.0";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddOtlpExporter()
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName: serviceName, serviceVersion: serviceVersion))
.AddSource("NATS.Net")
.AddSource("MyServiceSource")
.Build();

ActivitySource activitySource = new("MyServiceSource");

Console.WriteLine("Service App is starting...");

await using var nats = new NatsConnection();

await foreach (var msg in nats.SubscribeAsync<string>("greet.>"))
{
using var activity = msg.StartActivity("Greetings");

if (msg.Subject.StartsWith("greet.presence"))
{
Console.WriteLine($"{msg.Data} is here!");

activity?.AddEvent(new ActivityEvent("Presence", tags: new()
{
["subject"] = msg.Subject,
["data"] = msg.Data,
}));

continue;
}

await msg.ReplyAsync($"Hi there! {msg.Data}");
}
}
}
41 changes: 41 additions & 0 deletions src/NATS.Client.Core/Internal/ActivityEndingMsgReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;

namespace NATS.Client.Core.Internal;

internal sealed class ActivityEndingMsgReader<T> : ChannelReader<NatsMsg<T>>
{
private readonly ChannelReader<NatsMsg<T>> _inner;

public ActivityEndingMsgReader(ChannelReader<NatsMsg<T>> inner) => _inner = inner;

public override bool CanCount => _inner.CanCount;

public override bool CanPeek => _inner.CanPeek;

public override int Count => _inner.Count;

public override Task Completion => _inner.Completion;

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override bool TryRead(out NatsMsg<T> item)
{
if (!_inner.TryRead(out item))
return false;

item.Headers?.Activity?.Dispose();

return true;
}

/// <inheritdoc/>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default) => _inner.WaitToReadAsync(cancellationToken);

public override ValueTask<NatsMsg<T>> ReadAsync(CancellationToken cancellationToken = default) => _inner.ReadAsync(cancellationToken);

public override bool TryPeek(out NatsMsg<T> item) => _inner.TryPeek(out item);

public override IAsyncEnumerable<NatsMsg<T>> ReadAllAsync(CancellationToken cancellationToken = default) => _inner.ReadAllAsync(cancellationToken);
}
24 changes: 24 additions & 0 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
@@ -49,6 +49,30 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
if (Telemetry.HasListeners())
{
using var activity = Telemetry.StartSendActivity($"{_connection.SpanDestinationName(sub.Subject)} {Telemetry.Constants.SubscribeActivityName}", _connection, sub.Subject, null, null);
try
{
if (IsInboxSubject(sub.Subject))
{
if (sub.QueueGroup != null)
{
throw new NatsException("Inbox subscriptions don't support queue groups");
}

return SubscribeInboxAsync(sub, cancellationToken);
}

return SubscribeInternalAsync(sub.Subject, sub.QueueGroup, sub.Opts, sub, cancellationToken);
}
catch (Exception ex)
{
Telemetry.SetException(activity, ex);
throw;
}
}

if (IsInboxSubject(sub.Subject))
{
if (sub.QueueGroup != null)
Loading

0 comments on commit a371623

Please sign in to comment.