Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Tracing: Fixes traceid null exception issue #4111

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
DiagnosticScope scope = LazyOperationScopeFactory.Value.CreateScope(name: operationName,
kind: clientContext.ClientOptions.ConnectionMode == ConnectionMode.Gateway ? DiagnosticScope.ActivityKind.Internal : DiagnosticScope.ActivityKind.Client);

// Record values only when we have a valid Diagnostic Scope
// The scope here checks for listeners at Operation Level.
// If there are listeners at operation level then scope is enabled and activity is created.
sourabh1007 marked this conversation as resolved.
Show resolved Hide resolved
if (scope.IsEnabled)
{
scope.SetDisplayName($"{operationName} {containerName}");
Expand All @@ -63,14 +64,27 @@ public static OpenTelemetryCoreRecorder CreateRecorder(string operationName,
config: requestOptions?.CosmosThresholdOptions ?? clientContext.ClientOptions?.CosmosClientTelemetryOptions.CosmosThresholdOptions);
}
#if !INTERNAL
else if (Activity.Current is null)
// The scope here checks for listeners at Network Level.
// If there are listeners at network level and no parent activity created at operation level
// then create a network scope that creates a parent activity.
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
else
{
DiagnosticScope requestScope = LazyNetworkScopeFactory.Value.CreateScope(name: operationName);
openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : openTelemetryRecorder;
}

openTelemetryRecorder = requestScope.IsEnabled ? OpenTelemetryCoreRecorder.CreateNetworkLevelParentActivity(networkScope: requestScope) : OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
// If there are no listeners at operation level and network level and no parent activity
// then create a dummy parent activity.
if (Activity.Current is null)
{
openTelemetryRecorder = OpenTelemetryCoreRecorder.CreateParentActivity(operationName);
}
#endif
trace.AddDatum("DistributedTraceId", Activity.Current?.TraceId);
// Safety check as diagnostic logs should not break the code.
if (Activity.Current?.TraceId != null)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
trace.AddDatum("DistributedTraceId", Activity.Current?.TraceId);
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
}
}
return openTelemetryRecorder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace Microsoft.Azure.Cosmos
using System.Diagnostics;
using Microsoft.Azure.Cosmos.Tracing;
using System.Net.Http;
using System.ComponentModel;
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
using Microsoft.Azure.Cosmos.Tests;

[VisualStudio.TestTools.UnitTesting.TestClass]
public sealed class DistributedTracingOTelTests : BaseCosmosClientHelper
Expand All @@ -33,7 +35,8 @@ public void TestInitialize()
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "DirectMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_DirectMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -43,12 +46,12 @@ public async Task SourceEnabled_FlagOn_DirectMode_RecordsActivity_AssertLogTrace
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithConnectionModeDirect());

Container containerResponse = await this.database.CreateContainerAsync(
Expand Down Expand Up @@ -106,7 +109,8 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataTestMethod]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation and network level with Diagnostic TraceId being added to logs")]
[DataRow($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", null, DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at operation level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
[DataRow(null, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", DisplayName = "GatewayMode and DistributedFlag On: Asserts activity creation at network level with Diagnostic TraceId being added to logs")]
public async Task SourceEnabled_FlagOn_GatewayMode_RecordsOperationNetworkActivity_AssertLogTraceId_AssertTraceparent(string operationLevelSource, string networkLevelSource)
{
string[] sources = new string[] { operationLevelSource, networkLevelSource };
sources = sources.Where(x => x != null).ToArray();
Expand All @@ -128,12 +132,12 @@ public async Task SourceEnabled_FlagOn_GatewayMode_RecordsActivity_AssertLogTrac
.AddSource(sources)
.Build();

await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = false
})
})
.WithHttpClientFactory(() => new HttpClient(httpClientHandlerHelper))
.WithConnectionModeGateway());

Expand All @@ -142,7 +146,6 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
partitionKeyPath: "/id",
throughput: 20000);

List<Activity> b = CustomOtelExporter.CollectedActivities.ToList();
//Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Expand All @@ -169,7 +172,7 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
[DataRow(true, true, "random.source.name", DisplayName = "GatewayMode, DistributedFlag Off, Random/No Source:Asserts no activity creation")]
[DataRow(false, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "DirectMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
[DataRow(true, true, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", DisplayName = "GatewayMode, DistributedFlag Off, OperationLevel Source:Asserts no activity creation")]
public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
public async Task NoSourceNoFlagEnabled_ResultsInNoOperationNetworkActivityCreation_AssertLogTraceId(bool useGateway, bool disableDistributingTracing, string source)
{
using TracerProvider provider = Sdk.CreateTracerProviderBuilder()
.AddCustomOtelExporter()
Expand All @@ -178,20 +181,20 @@ public async Task NoSourceEnabled_ResultsInNoSourceParentActivityCreation_Assert

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
{
DisableDistributedTracing = disableDistributingTracing
}));
}
Expand Down Expand Up @@ -221,6 +224,49 @@ await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
Assert.AreEqual(0, CustomOtelExporter.CollectedActivities.Count());
}


[DataTestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task SuppressListenerEvents_ResultsInNoScopeActivityCreation_AssertTraceIdNotNull(bool useGateway)
{
// Initialize CustomListener with suppression
CustomListener customListener = new CustomListener($"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.*", "Azure-Cosmos-Operation-Request-Diagnostics", true);

if (useGateway)
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
})
.WithConnectionModeGateway());
}
else
{
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: false,
customizeClientBuilder: (builder) => builder
.WithClientTelemetryOptions(new CosmosClientTelemetryOptions()
{
DisableDistributedTracing = false
}));
}

ContainerResponse containerResponse = await this.database.CreateContainerAsync(
id: Guid.NewGuid().ToString(),
partitionKeyPath: "/id",
throughput: 20000);

// Assert traceId in Diagnostics logs
string diagnosticsCreateContainer = containerResponse.Diagnostics.ToString();
JObject objDiagnosticsCreate = JObject.Parse(diagnosticsCreateContainer);
Assert.IsNotNull(objDiagnosticsCreate["data"]["DistributedTraceId"], "Distributed Trace Id has value in diagnostics i.e. " + (string)objDiagnosticsCreate["data"]["DistributedTraceId"]);

// Cleanup
customListener.Dispose();
}

[TestCleanup]
public async Task CleanUp()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public static void AreEqualAcrossListeners()
{
Assert.AreEqual(
JsonConvert.SerializeObject(CustomListener.CollectedOperationActivities.OrderBy(x => x.Id)),
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities.OrderBy(x => x.Id)));
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
JsonConvert.SerializeObject(CustomOtelExporter.CollectedActivities
.Where(activity => activity.Source.Name == $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation")
.OrderBy(x => x.Id)));
}

private static void AssertDatabaseAndContainerName(string name, KeyValuePair<string, string> tag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ internal class CustomListener :
{
private readonly Func<string, bool> sourceNameFilter;
private readonly string eventName;

private readonly bool suppressAllEvents;

private ConcurrentBag<IDisposable> subscriptions = new();
private ConcurrentBag<ProducedDiagnosticScope> Scopes { get; } = new();

Expand All @@ -36,21 +37,22 @@ internal class CustomListener :

private static List<EventSource> EventSources { set; get; } = new();

public CustomListener(string name, string eventName)
: this(n => Regex.Match(n, name).Success, eventName)
public CustomListener(string name, string eventName, bool suppressAllEvents = false)
: this(n => Regex.Match(n, name).Success, eventName, suppressAllEvents)
{
}

public CustomListener(Func<string, bool> filter, string eventName)
public CustomListener(Func<string, bool> filter, string eventName, bool suppressAllEvents = false)
{
this.sourceNameFilter = filter;
this.eventName = eventName;
this.suppressAllEvents = suppressAllEvents;

foreach (EventSource eventSource in EventSources)
{
this.OnEventSourceCreated(eventSource);
}

DiagnosticListener.AllListeners.Subscribe(this);
}

Expand Down Expand Up @@ -149,7 +151,15 @@ public void OnNext(DiagnosticListener value)
{
lock (this.Scopes)
{
this.subscriptions?.Add(value.Subscribe(this));
IDisposable subscriber = value.Subscribe(this, isEnabled: (name) =>
{
if (this.suppressAllEvents)
{
return false;
}
return true;
});
this.subscriptions?.Add(subscriber);
}
}
}
Expand All @@ -159,7 +169,12 @@ public void OnNext(DiagnosticListener value)
/// </summary>
protected override void OnEventSourceCreated(EventSource eventSource)
{
if(this.eventName == null)
if (this.eventName == null)
{
EventSources.Add(eventSource);
}

if (this.eventName == null)
{
EventSources.Add(eventSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public override ExportResult Export(in Batch<Activity> batch)

foreach (Activity activity in batch)
{
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase))
if (string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Operation", StringComparison.OrdinalIgnoreCase)
|| string.Equals(activity.Source.Name, $"{OpenTelemetryAttributeKeys.DiagnosticNamespace}.Request", StringComparison.OrdinalIgnoreCase))
aavasthy marked this conversation as resolved.
Show resolved Hide resolved
{
AssertActivity.IsValidOperationActivity(activity);

Expand Down
Loading