Skip to content

Commit

Permalink
Calculate timeSinceEnqueued for consumer spans (#36625)
Browse files Browse the repository at this point in the history
* calculate timesinceenqueued

* refactor

* Update sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/Internals/TraceHelper.cs

Co-authored-by: Rajkumar Rangaraj <rajrang@microsoft.com>

* address pr comments

* address pr comments

* align

* Address pr comments

* spelling

---------

Co-authored-by: Rajkumar Rangaraj <rajrang@microsoft.com>
  • Loading branch information
vishweshbankwar and rajkumar-rangaraj authored May 31, 2023
1 parent 9036b06 commit eacaffc
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,15 @@ public RequestData(int version, Activity activity, ref ActivityTagsProcessor act
Properties = new ChangeTrackingDictionary<string, string>();
Measurements = new ChangeTrackingDictionary<string, double>();

TraceHelper.AddActivityLinksToProperties(activity, ref activityTagsProcessor.UnMappedTags);
if (activity.Kind == ActivityKind.Consumer)
{
TraceHelper.AddEnqueuedTimeToMeasurementsAndLinksToProperties(activity, Measurements, ref activityTagsProcessor.UnMappedTags);
}
else
{
TraceHelper.AddActivityLinksToProperties(activity, ref activityTagsProcessor.UnMappedTags);
}

TraceHelper.AddPropertiesToTelemetry(Properties, ref activityTagsProcessor.UnMappedTags);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,7 @@ internal static void AddActivityLinksToProperties(Activity activity, ref AzMonLi
linksJson.Append('[');
foreach (ref readonly var link in activity.EnumerateLinks())
{
linksJson
.Append('{')
.Append("\"operation_Id\":")
.Append('\"')
.Append(link.Context.TraceId.ToHexString())
.Append('\"')
.Append(',');
linksJson
.Append("\"id\":")
.Append('\"')
.Append(link.Context.SpanId.ToHexString())
.Append('\"');
linksJson.Append("},");

AddContextToMSLinks(linksJson, link);
maxLinks--;
if (maxLinks == 0)
{
Expand Down Expand Up @@ -300,5 +287,107 @@ private static void AddTelemetryFromActivityEvents(Activity activity, TelemetryI
BaseData = exceptionData,
};
}

internal static void AddEnqueuedTimeToMeasurementsAndLinksToProperties(Activity activity, IDictionary<string, double> measurements, ref AzMonList UnMappedTags)
{
if (activity.Links != null && activity.Links.Any())
{
if (TryGetAverageQueueTimeWithLinks(activity, ref UnMappedTags, out long enqueuedTime))
{
measurements["timeSinceEnqueued"] = enqueuedTime;
}
}
}

private static bool TryGetAverageQueueTimeWithLinks(Activity activity, ref AzMonList UnMappedTags, out long avgTimeInQueue)
{
avgTimeInQueue = 0;
var linksCount = 0;
DateTimeOffset startTime = activity.StartTimeUtc;
long startEpochTime = startTime.ToUnixTimeMilliseconds();
bool isEnqueuedTimeCalculated = true;

string msLinks = "_MS.links";
var linksJson = new StringBuilder();
linksJson.Append('[');
foreach (ref readonly var link in activity.EnumerateLinks())
{
long msgEnqueuedTime = 0;
if (isEnqueuedTimeCalculated && !TryGetEnqueuedTime(link, out msgEnqueuedTime))
{
// instrumentation does not consistently report enqueued time, ignoring whole span
isEnqueuedTimeCalculated = false;
}
if (isEnqueuedTimeCalculated)
{
avgTimeInQueue += Math.Max(startEpochTime - msgEnqueuedTime, 0);
}

linksCount++;

if (linksCount <= MaxlinksAllowed)
{
AddContextToMSLinks(linksJson, link);
}
}

if (linksJson.Length > 0)
{
// trim trailing comma - json does not support it
linksJson.Remove(linksJson.Length - 1, 1);
}
linksJson.Append(']');
AzMonList.Add(ref UnMappedTags, new KeyValuePair<string, object?>(msLinks, linksJson.ToString()));
if (MaxlinksAllowed < linksCount)
{
AzureMonitorExporterEventSource.Log.WriteInformational("ActivityLinksIgnored", $"Max count of {MaxlinksAllowed} has reached.");
}

if (isEnqueuedTimeCalculated)
{
avgTimeInQueue /= linksCount;
}
else
{
avgTimeInQueue = 0;
return false;
}

return true;
}

private static bool TryGetEnqueuedTime(ActivityLink link, out long enqueuedTime)
{
enqueuedTime = 0;

foreach (ref readonly var attribute in link.EnumerateTagObjects())
{
if (attribute.Key == "enqueuedTime")
{
return long.TryParse(attribute.Value?.ToString(), out enqueuedTime);
}
}

return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void AddContextToMSLinks(StringBuilder linksJson, ActivityLink link)
{
linksJson
.Append('{')
.Append("\"operation_Id\":")
.Append('\"')
.Append(link.Context.TraceId.ToHexString())
.Append('\"')
.Append(',');
linksJson
.Append("\"id\":")
.Append('\"')
.Append(link.Context.SpanId.ToHexString())
.Append('\"');
linksJson
.Append("},");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;

using Azure.Monitor.OpenTelemetry.Exporter.Internals;
using Azure.Monitor.OpenTelemetry.Exporter.Models;

Expand Down Expand Up @@ -133,5 +133,100 @@ public void RequestDataContainsAzureNamespace()
Assert.True(activityTagsProcessor.HasAzureNamespace);
Assert.Equal("DemoAzureResource", requestData.Properties["az.namespace"]);
}

[Fact]
public void RequestDataContainsTimeSinceEnqueuedForConsumerSpans()
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
List<ActivityLink>? links = new List<ActivityLink>();
long enqueued0 = DateTimeOffset.UtcNow.AddMilliseconds(-100).ToUnixTimeMilliseconds();
long enqueued1 = DateTimeOffset.UtcNow.AddMilliseconds(-200).ToUnixTimeMilliseconds();
long enqueued2 = DateTimeOffset.UtcNow.AddMilliseconds(-300).ToUnixTimeMilliseconds();

links.Add(AddActivityLink(enqueued0));
links.Add(AddActivityLink(enqueued1));
links.Add(AddActivityLink(enqueued2));

using var activity = activitySource.StartActivity("Activity", ActivityKind.Consumer, null, null, links);
Assert.NotNull(activity);
var activityTagsProcessor = TraceHelper.EnumerateActivityTags(activity);

var requestData = new RequestData(2, activity, ref activityTagsProcessor);

DateTimeOffset startTime = activity.StartTimeUtc;
var startTimeEpoch = startTime.ToUnixTimeMilliseconds();

long expectedTimeInQueue = ((startTimeEpoch - enqueued0) +
(startTimeEpoch - enqueued1) +
(startTimeEpoch - enqueued2)) / 3; // avg diff with request start time across links

Assert.True(requestData.Measurements.TryGetValue("timeSinceEnqueued", out var timeInQueue));

Assert.Equal(expectedTimeInQueue, timeInQueue);
}

[Fact]
public void RequestDataTimeSinceEnqueuedNegative()
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
List<ActivityLink>? links = new List<ActivityLink>();
long enqueued0 = DateTimeOffset.UtcNow.AddMilliseconds(-100).ToUnixTimeMilliseconds();
long enqueued1 = DateTimeOffset.UtcNow.AddMilliseconds(-200).ToUnixTimeMilliseconds();
long enqueued2 = DateTimeOffset.UtcNow.AddMilliseconds(300).ToUnixTimeMilliseconds(); // ignored

links.Add(AddActivityLink(enqueued0));
links.Add(AddActivityLink(enqueued1));
links.Add(AddActivityLink(enqueued2));

using var activity = activitySource.StartActivity("Activity", ActivityKind.Consumer, null, null, links);
Assert.NotNull(activity);
var activityTagsProcessor = TraceHelper.EnumerateActivityTags(activity);

var requestData = new RequestData(2, activity, ref activityTagsProcessor);

DateTimeOffset startTime = activity.StartTimeUtc;
var startTimeEpoch = startTime.ToUnixTimeMilliseconds();

long expectedTimeInQueue = ((startTimeEpoch - enqueued0) +
(startTimeEpoch - enqueued1)) / 3; // avg diff with request start time across links

Assert.True(requestData.Measurements.TryGetValue("timeSinceEnqueued", out var timeInQueue));

Assert.Equal(expectedTimeInQueue, timeInQueue);
}

[Fact]
public void RequestDataTimeSinceEnqueuedInvalidEmqueuedTime()
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
List<ActivityLink>? links = new List<ActivityLink>();

ActivityTagsCollection tags = new ActivityTagsCollection();
tags.Add("enqueuedTime", "Invalid");
var link = new ActivityLink(new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None, null), tags);
links.Add(link);

using var activity = activitySource.StartActivity("Activity", ActivityKind.Consumer, null, null, links);
Assert.NotNull(activity);
var activityTagsProcessor = TraceHelper.EnumerateActivityTags(activity);

var requestData = new RequestData(2, activity, ref activityTagsProcessor);

DateTimeOffset startTime = activity.StartTimeUtc;
var startTimeEpoch = startTime.ToUnixTimeMilliseconds();

Assert.False(requestData.Measurements.TryGetValue("timeSinceEnqueued", out var timeInQueue));
}

private ActivityLink AddActivityLink(long enqueuedTime)
{
ActivityTagsCollection tags = new ActivityTagsCollection
{
{ "enqueuedTime", enqueuedTime.ToString() }
};
var link = new ActivityLink(new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None, null), tags);

return link;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ static TraceHelperTests()
}

[Theory]
[InlineData("RequestData")]
[InlineData("RemoteDependencyData")]
public void PropertiesDoesNotContainMSLinksWhenActivityHasNoLinks(string telemetryType)
[InlineData("RequestData", ActivityKind.Server)]
[InlineData("RequestData", ActivityKind.Consumer)]
[InlineData("RemoteDependencyData", ActivityKind.Client)]
public void PropertiesDoesNotContainMSLinksWhenActivityHasNoLinks(string telemetryType, ActivityKind kind)
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
using var activity = activitySource.StartActivity(
ActivityName,
ActivityKind.Client,
kind,
parentContext: default,
startTime: DateTime.UtcNow);

Expand All @@ -67,9 +68,10 @@ public void PropertiesDoesNotContainMSLinksWhenActivityHasNoLinks(string telemet
}

[Theory]
[InlineData("RequestData")]
[InlineData("RemoteDependencyData")]
public void PropertiesContainMSLinksWhenActivityHasLinks(string telemetryType)
[InlineData("RequestData", ActivityKind.Server)]
[InlineData("RequestData", ActivityKind.Consumer)]
[InlineData("RemoteDependencyData", ActivityKind.Client)]
public void PropertiesContainMSLinksWhenActivityHasLinks(string telemetryType, ActivityKind kind)
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
ActivityLink activityLink = new ActivityLink(new ActivityContext(
Expand All @@ -82,7 +84,7 @@ public void PropertiesContainMSLinksWhenActivityHasLinks(string telemetryType)

using var activity = activitySource.StartActivity(
ActivityName,
ActivityKind.Client,
kind,
parentContext: default,
null,
links,
Expand All @@ -109,9 +111,10 @@ public void PropertiesContainMSLinksWhenActivityHasLinks(string telemetryType)
}

[Theory]
[InlineData("RequestData")]
[InlineData("RemoteDependencyData")]
public void LinksAreTruncatedWhenCannotFitInMaxLength(string telemetryType)
[InlineData("RequestData", ActivityKind.Server)]
[InlineData("RequestData", ActivityKind.Consumer)]
[InlineData("RemoteDependencyData", ActivityKind.Client)]
public void LinksAreTruncatedWhenCannotFitInMaxLength(string telemetryType, ActivityKind kind)
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
List<ActivityLink> links = new List<ActivityLink>();
Expand All @@ -133,7 +136,7 @@ public void LinksAreTruncatedWhenCannotFitInMaxLength(string telemetryType)

using var activity = activitySource.StartActivity(
ActivityName,
ActivityKind.Client,
kind,
parentContext: default,
null,
links,
Expand Down Expand Up @@ -168,9 +171,10 @@ public void LinksAreTruncatedWhenCannotFitInMaxLength(string telemetryType)
}

[Theory]
[InlineData("RequestData")]
[InlineData("RemoteDependencyData")]
public void LinksAreNotTruncatedWhenCanBeFitInMaxLength(string telemetryType)
[InlineData("RequestData", ActivityKind.Server)]
[InlineData("RequestData", ActivityKind.Consumer)]
[InlineData("RemoteDependencyData", ActivityKind.Client)]
public void LinksAreNotTruncatedWhenCanBeFitInMaxLength(string telemetryType, ActivityKind kind)
{
using ActivitySource activitySource = new ActivitySource(ActivitySourceName);
List<ActivityLink> links = new List<ActivityLink>();
Expand All @@ -186,7 +190,7 @@ public void LinksAreNotTruncatedWhenCanBeFitInMaxLength(string telemetryType)

using var activity = activitySource.StartActivity(
ActivityName,
ActivityKind.Client,
kind,
parentContext: default,
null,
links,
Expand Down

0 comments on commit eacaffc

Please sign in to comment.