Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit critical time and processing time meters #6953

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;

class TestingMetricListener : IDisposable
{
readonly MeterListener meterListener;
public static TestingMetricListener SetupNServiceBusMetricsListener() =>
SetupMetricsListener("NServiceBus.Core");

public static TestingMetricListener SetupMetricsListener(string sourceName)
{
var testingMetricListener = new TestingMetricListener(sourceName);
return testingMetricListener;
}

public TestingMetricListener(string sourceName)
{
Expand All @@ -25,61 +32,44 @@ public TestingMetricListener(string sourceName)
}
};

meterListener.SetMeasurementEventCallback((Instrument instrument,
long measurement,
ReadOnlySpan<KeyValuePair<string, object>> t,
object _) =>
{
TestContext.WriteLine($"{instrument.Meter.Name}\\{instrument.Name}:{measurement}");
meterListener.SetMeasurementEventCallback<long>(TrackMeasurement);
meterListener.SetMeasurementEventCallback<double>(TrackMeasurement);

var tags = t.ToArray();
ReportedMeters.AddOrUpdate(instrument.Name, measurement, (_, val) => val + measurement);
Tags.AddOrUpdate(instrument.Name, _ => tags, (_, _) => tags);
});
meterListener.Start();
}

public static TestingMetricListener SetupNServiceBusMetricsListener() =>
SetupMetricsListener("NServiceBus.Core");

public static TestingMetricListener SetupMetricsListener(string sourceName)
void TrackMeasurement<T>(Instrument instrument,
T value,
ReadOnlySpan<KeyValuePair<string, object>> tags,
object _) where T : struct
{
var testingMetricListener = new TestingMetricListener(sourceName);
return testingMetricListener;
TestContext.WriteLine($"{instrument.Meter.Name}\\{instrument.Name}:{value}");

var measurement = new Measurement<T>(value, tags);

ReportedMeters.AddOrUpdate(instrument.Name, (_) => new object[] { measurement }, (_, val) => val.Append(measurement).ToArray());
}

public void Dispose() => meterListener?.Dispose();

public ConcurrentDictionary<string, long> ReportedMeters { get; } = new();
public ConcurrentDictionary<string, KeyValuePair<string, object>[]> Tags { get; } = new();

public void AssertMetric(string metricName, long expected)
public IEnumerable<Measurement<T>> GetReportedMeasurements<T>(string metricName) where T : struct
{
if (expected == 0)
if (!ReportedMeters.TryGetValue(metricName, out var measurements))
{
Assert.False(ReportedMeters.ContainsKey(metricName), $"Should not have '{metricName}' metric reported.");
yield break;
}
else

foreach (var measurement in measurements)
{
Assert.True(ReportedMeters.ContainsKey(metricName), $"'{metricName}' metric was not reported.");
Assert.AreEqual(expected, ReportedMeters[metricName]);
yield return (Measurement<T>)measurement;
}
}

public object AssertTagKeyExists(string metricName, string tagKey)
public void AssertMetricNotReported(string metricName)
{
if (!Tags.ContainsKey(metricName))
{
Assert.Fail($"'{metricName}' metric was not reported");
}

var emptyTag = default(KeyValuePair<string, object>);
var meterTag = Tags[metricName].FirstOrDefault(t => t.Key == tagKey);
if (meterTag.Equals(emptyTag))
{
Assert.Fail($"'{tagKey}' tag was not found.");
}

return meterTag.Value;
Assert.False(ReportedMeters.ContainsKey(metricName), $"Should not have '{metricName}' metric reported.");
}

ConcurrentDictionary<string, object[]> ReportedMeters { get; } = new();
readonly MeterListener meterListener;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
Expand All @@ -26,19 +27,25 @@ public async Task Should_report_successful_message_metric()
.Done(c => c.OutgoingMessagesReceived == 5)
.Run();

metricsListener.AssertMetric("nservicebus.messaging.successes", 5);
metricsListener.AssertMetric("nservicebus.messaging.fetches", 5);
metricsListener.AssertMetric("nservicebus.messaging.failures", 0);
metricsListener.AssertMetricNotReported("nservicebus.messaging.failures");

var successEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue");
var successType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type");
var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue");
var fetchedType = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.message_type").ToString();
//metricsListener.AssertMetric("nservicebus.messaging.fetches", 5);
var successMeasurements = metricsListener.GetReportedMeasurements<long>("nservicebus.messaging.successes");

Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), successEndpoint);
Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), fetchedEndpoint);
Assert.AreEqual(5, successMeasurements.Sum(m => m.Value));

var successMeasurement = successMeasurements.First();

var successQueueName = successMeasurement.Tags.ToArray().First(kvp => kvp.Key == "nservicebus.queue").Value;
var successType = successMeasurement.Tags.ToArray().First(kvp => kvp.Key == "nservicebus.message_type").Value;

//var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue");
//var fetchedType = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.message_type").ToString();
var enpointName = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics));
Assert.AreEqual(enpointName, successQueueName);
//Assert.AreEqual(Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), fetchedEndpoint);
Assert.AreEqual(successType, typeof(OutgoingMessage).AssemblyQualifiedName);
Assert.AreEqual(fetchedType, typeof(OutgoingMessage).AssemblyQualifiedName);
//Assert.AreEqual(fetchedType, typeof(OutgoingMessage).AssemblyQualifiedName);
}

class Context : ScenarioContext
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry;

using System.Linq;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;

public class When_processing_completes : OpenTelemetryAcceptanceTest
{
[Test]
public async Task Should_report_processing_and_critical_time()
{
using var metricsListener = TestingMetricListener.SetupNServiceBusMetricsListener();
_ = await Scenario.Define<Context>()
.WithEndpoint<MyEndpoint>(e => e
.When(s => s.SendLocal(new MyMessage())))
.Done(c => c.HandlerInvoked)
.Run();

var processingTime = metricsListener.GetReportedMeasurements<double>("nservicebus.messaging.processingtime").Single();
var criticalTime = metricsListener.GetReportedMeasurements<double>("nservicebus.messaging.criticaltime").Single();

Assert.Greater(processingTime.Value, 50.0);
Assert.Greater(criticalTime.Value, processingTime.Value);
}

class Context : ScenarioContext
{
public bool HandlerInvoked { get; set; }
}

class MyEndpoint : EndpointConfigurationBuilder
{
public MyEndpoint() => EndpointSetup<OpenTelemetryEnabledEndpoint>();

class MyMessageHandler : IHandleMessages<MyMessage>
{
Context textContext;

public MyMessageHandler(Context textContext) => this.textContext = textContext;

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
await Task.Delay(50);

textContext.HandlerInvoked = true;
}
}
}

public class MyMessage : IMessage
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ public async Task Should_report_failing_message_metrics()
.Done(c => c.HandlerInvoked)
.Run();

metricsListener.AssertMetric("nservicebus.messaging.fetches", 1);
metricsListener.AssertMetric("nservicebus.messaging.failures", 1);
metricsListener.AssertMetric("nservicebus.messaging.successes", 0);
Assert.AreEqual(metricsListener.GetReportedMeasurements<long>("nservicebus.messaging.fetches").Sum(m => m.Value), 1);
Assert.AreEqual(metricsListener.GetReportedMeasurements<long>("nservicebus.messaging.failures").Sum(m => m.Value), 1);
Assert.AreEqual(metricsListener.GetReportedMeasurements<long>("nservicebus.messaging.successes").Sum(m => m.Value), 0);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
Expand Down Expand Up @@ -26,6 +26,7 @@

<ItemGroup>
<RemoveSourceFileFromPackage Include="Core\**\*.cs" />
<RemoveSourceFileFromPackage Remove="Core\OpenTelemetry\When_processing_completes.cs" />
<RemoveSourceFileFromPackage Include="AssemblyInfo.cs" />
</ItemGroup>

Expand Down
29 changes: 29 additions & 0 deletions src/NServiceBus.Core/OpenTelemetry/Metrics/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace NServiceBus;
using System;

static class Extensions
{
public static bool TryGetTimeSent(this ReceivePipelineCompleted completed, out DateTimeOffset timeSent)
{
var headers = completed.ProcessedMessage.Headers;
if (headers.TryGetValue(Headers.TimeSent, out var timeSentString))
{
timeSent = DateTimeOffsetHelper.ToDateTimeOffset(timeSentString);
return true;
}
timeSent = DateTimeOffset.MinValue;
return false;
}

public static bool TryGetDeliverAt(this ReceivePipelineCompleted completed, out DateTimeOffset deliverAt)
{
var headers = completed.ProcessedMessage.Headers;
if (headers.TryGetValue(Headers.DeliverAt, out var deliverAtString))
{
deliverAt = DateTimeOffsetHelper.ToDateTimeOffset(deliverAtString);
return true;
}
deliverAt = DateTimeOffset.MinValue;
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
namespace NServiceBus;

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using System;
using Features;

/// <summary>
Expand All @@ -20,5 +24,26 @@ protected internal override void Setup(FeatureConfigurationContext context)
performanceDiagnosticsBehavior,
"Provides OpenTelemetry counters for message processing"
);

context.Pipeline.OnReceivePipelineCompleted((e, _) =>
{
e.ProcessedMessage.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes);

var tags = new TagList(new KeyValuePair<string, object>[]
{
new(MeterTags.EndpointDiscriminator, discriminator ?? ""),
new(MeterTags.QueueName, queueNameBase ?? ""),
new(MeterTags.MessageType, messageTypes ?? "")
});

Meters.ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbrandt I think you're referring to this right? From what I understand is that they recommend to add a suffix to identify unit of measurement. I'm not sure if this value can be a double/float. We likely still want to report duration in floating point when reporting in seconds. The current code also passed double (result of TotalMilliseconds).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, for histograms using seconds likely wouldn't make much sense for message processing. Wouldn't you want to have millisecond granularity at the base of a histogram?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, for histograms using seconds likely wouldn't make much sense for message processing. Wouldn't you want to have millisecond granularity at the base of a histogram?

This is addressed in comments for open-telemetry/opentelemetry-dotnet#4797.
If you create a histogram with ms units, the default histogram boundaries to { 0, 5, 10, 25, 50, 75,
100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000} and if you create with seconds as the unit the default histogram boundaries are those same values, divided by 1000.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbrandt I think you're referring to this right? From what I understand is that they recommend to add a suffix to identify unit of measurement. I'm not sure if this value can be a double/float. We likely still want to report duration in floating point when reporting in seconds. The current code also passed double (result of TotalMilliseconds).

Yes. Value will be reported as a double.

My preference is using https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.stopwatch?view=net-8.0 instead of DateTime.UtcNow, but this is a minor difference.


if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
{
Meters.CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
}

return Task.CompletedTask;
});
}
}
8 changes: 7 additions & 1 deletion src/NServiceBus.Core/OpenTelemetry/Metrics/Meters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Meters
{
internal static readonly Meter NServiceBusMeter = new Meter(
"NServiceBus.Core",
"0.1.0");
"0.2.0");

internal static readonly Counter<long> TotalProcessedSuccessfully =
NServiceBusMeter.CreateCounter<long>("nservicebus.messaging.successes", description: "Total number of messages processed successfully by the endpoint.");
Expand All @@ -16,4 +16,10 @@ class Meters

internal static readonly Counter<long> TotalFailures =
NServiceBusMeter.CreateCounter<long>("nservicebus.messaging.failures", description: "Total number of messages processed unsuccessfully by the endpoint.");

internal static readonly Histogram<double> ProcessingTime =
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processingtime", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the relationship between NServiceBusMeter.CreateHistogram and OpenTelemetry?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See CreateHistogram:
https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.metrics.meter.createhistogram?view=net-8.0

NserviceBusMeter is of type Meter:
https://learn.microsoft.com/en-us/dotnet/api/system.diagnostics.metrics.meter?view=net-8.0

There is actually no direct reference to the OpenTelemetry SDK from Meter in .NET or from NServiceBus, even though there is an endpointConfiguration.EnableOpenTelemetry() method.

It was actually a great decision by the .NET team to keep system.diagnostics.metrics decoupled from OpenTelemetry, seeing as how many unified observability/telemetry tools have come before it. There's no guarantee OpenTelemetry will be the last, but there has been a great deal more buying across the industry it seems. That's all in the realm of interesting, but not important.

What is important is that Meter surfaces information in a way it can be consumed by otel and any proprietary tools or future standard tools.

Sorry about mistakes, while on my phone.


internal static readonly Histogram<double> CriticalTime =
NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.criticaltime", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this will make it into the NSB v9 release that is RTM?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbrandt, we started working on some of those metrics in #7048. The goal is to release them in 9.1. We don't have an ETA yet, but we're actively working on it.

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace NServiceBus;

class ReceiveDiagnosticsBehavior : IBehavior<IIncomingPhysicalMessageContext, IIncomingPhysicalMessageContext>
{

public ReceiveDiagnosticsBehavior(string queueNameBase, string discriminator)
{
this.queueNameBase = queueNameBase;
Expand All @@ -19,12 +18,12 @@ public async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncoming
{
context.MessageHeaders.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes);

var tags = new TagList(new KeyValuePair<string, object>[]
{
var tags = new TagList(
[
new(MeterTags.EndpointDiscriminator, discriminator ?? ""),
new(MeterTags.QueueName, queueNameBase ?? ""),
new(MeterTags.MessageType, messageTypes ?? ""),
}.AsSpan());
]);

Meters.TotalFetched.Add(1, tags);

Expand Down
Loading