Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Tracing: Fixes traceid null exception issue #4111

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
DiagnosticScope scope = LazyOperationScopeFactory.Value.CreateScope(name: operationName,
kind: clientContext.ClientOptions.ConnectionMode == ConnectionMode.Gateway ? DiagnosticScope.ActivityKind.Internal : DiagnosticScope.ActivityKind.Client);

// Record values only when we have a valid Diagnostic Scope
// Need a parent activity id associated with the operation which is logged in diagnostics and used for tracing purpose.
// If there are listeners at operation level then scope is enabled and it tries to create activity.
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
// However, if available listeners are not subscribed to operation level event then it will lead to scope being enabled but no activity is created.
if (scope.IsEnabled)
{
scope.SetDisplayName($"{operationName} {containerName}");
Expand All @@ -63,14 +65,28 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
config: requestOptions?.CosmosThresholdOptions ?? clientContext.ClientOptions?.CosmosClientTelemetryOptions.CosmosThresholdOptions);
}
#if !INTERNAL
else if (Activity.Current is null)
// Need a parent activity which groups all network activities under it and is logged in diagnostics and used for tracing purpose.
// If there are listeners at network level then scope is enabled and it tries to create activity.
// However, if available listeners are not subscribed to network event then it will lead to scope being enabled but no activity is created.
else
{
DiagnosticScope requestScope = LazyNetworkScopeFactory.Value.CreateScope(name: operationName);
openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : openTelemetryRecorder;
}

openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
// If there are no listeners at operation level and network level and no parent activity created.
// Then create a dummy activity as there should be a parent level activity always to send a traceid to the backend services through context propagation.
// The parent activity id is logged in diagnostics and used for tracing purpose.
if (Activity.Current is null)
{
openTelemetryRecorder = OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
}
#endif
trace.AddDatum("DistributedTraceId", Activity.Current?.TraceId);
// Safety check as diagnostic logs should not break the code.
if (Activity.Current?.TraceId != null)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
trace.AddDatum("DistributedTraceId", Activity.Current.TraceId);
}
}
return openTelemetryRecorder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos
using System.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;
using System.Net.Http;
using Microsoft.Azure.Cosmos.Tests;

[VisualStudio.TestTools.UnitTesting.TestClass]
public sealed class DistributedTracingOTelTests : BaseCosmosClientHelper
Expand All @@ -33,7 +34,8 @@ public void TestInitialize()
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -43,12 +45,12 @@ public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTrace
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithConnectionModeDirect());

Container containerResponse = await this.database.CreateContainerAsync(
Expand Down Expand Up @@ -106,7 +108,8 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -128,12 +131,12 @@ public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTrac
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithHttpClientFactory(() => new HttpClient(httpClientHandlerHelper))
.WithConnectionModeGateway());

Expand All @@ -142,7 +145,6 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
partitionKeyPath: "/id",
throughput: 20000);

List<Activity> b = CustomOtelExporter.CollectedActivities.ToList();
//Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Expand All @@ -169,7 +171,7 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataRow(true, true, "random.source.name", DisplayName = "GatewayMode, DistributedFlag Off, Random/No Source:Asserts no activity creation")]
[DataRow(false, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "DirectMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
[DataRow(true, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "GatewayMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
public async Task NoSourceNoFlagEnabled_ResultsInNoOperationNetworkActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
{
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddCustomOtelExporter()
Expand All @@ -178,20 +180,20 @@ public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_Assert

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
}));
}
Expand Down Expand Up @@ -221,6 +223,49 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
Assert.AreEqual(0, CustomOtelExporter.CollectedActivities.Count());
}


[DataTestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task SuppressListenerEvents_ResultsInNoScopeActivityCreation_AssertTraceIdNotNull(bool useGateway)
{
// Initialize CustomListener with suppression
CustomListener customListener = new CustomListener($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.*", "Azure-Cosmos-Operation-Request-Diagnostics", true);

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
}));
}

ContainerResponse containerResponse = await this.database.CreateContainerAsync(
id: Guid.NewGuid().ToString(),
partitionKeyPath: "/id",
throughput: 20000);

// Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Assert.IsNotNull(objDiagnosticsCreate["data"]["DistributedTraceId"], "Distributed Trace Id has value in diagnostics i.e. " + (string)objDiagnosticsCreate["data"]["DistributedTraceId"]);

// Cleanup
customListener.Dispose();
}

[TestCleanup]
public async Task CleanUp()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public static void AreEqualAcrossListeners()
{
Assert.AreEqual(
JsonConvert.SerializeObject(CustomListener.CollectedOperationActivities.OrderBy(x => x.Id)),
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities.OrderBy(x => x.Id)));
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities
.Where(activity => activity.Source.Name == $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation")
.OrderBy(x => x.Id)));
}

private static void AssertDatabaseAndContainerName(string name, KeyValuePair<string, string> tag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ internal class CustomListener :
{
private readonly Func<string, bool> sourceNameFilter;
private readonly string eventName;

private readonly bool suppressAllEvents;

private ConcurrentBag<IDisposable> subscriptions = new();
private ConcurrentBag<ProducedDiagnosticScope> Scopes { get; } = new();

Expand All @@ -36,21 +37,22 @@ internal class CustomListener :

private static List<EventSource> EventSources { set; get; } = new();

public CustomListener(string name, string eventName)
: this(n => Regex.Match(n, name).Success, eventName)
public CustomListener(string name, string eventName, bool suppressAllEvents = false)
: this(n => Regex.Match(n, name).Success, eventName, suppressAllEvents)
{
}

public CustomListener(Func<string, bool> filter, string eventName)
public CustomListener(Func<string, bool> filter, string eventName, bool suppressAllEvents = false)
{
this.sourceNameFilter = filter;
this.eventName = eventName;
this.suppressAllEvents = suppressAllEvents;

foreach (EventSource eventSource in EventSources)
{
this.OnEventSourceCreated(eventSource);
}

DiagnosticListener.AllListeners.Subscribe(this);
}

Expand Down Expand Up @@ -149,7 +151,15 @@ public void OnNext(DiagnosticListener value)
{
lock (this.Scopes)
{
this.subscriptions?.Add(value.Subscribe(this));
IDisposable subscriber = value.Subscribe(this, isEnabled: (name) =>
{
if (this.suppressAllEvents)
{
return false;
}
return true;
});
this.subscriptions?.Add(subscriber);
}
}
}
Expand All @@ -159,7 +169,12 @@ public void OnNext(DiagnosticListener value)
/// </summary>
protected override void OnEventSourceCreated(EventSource eventSource)
{
if(this.eventName == null)
if (this.eventName == null)
{
EventSources.Add(eventSource);
}

if (this.eventName == null)
{
EventSources.Add(eventSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public override ExportResult Export(in Batch<Activity> batch)

foreach (Activity activity in batch)
{
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase))
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase)
|| string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", StringComparison.OrdinalIgnoreCase))
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
{
AssertActivity.IsValidOperationActivity(activity);

Expand Down
Loading