From 2d9402b21ca34790373ffbe0f33afb269eb8efc4 Mon Sep 17 00:00:00 2001 From: Bruce Irschick Date: Tue, 29 Jul 2025 16:42:35 -0700 Subject: [PATCH] feat(csharp/src/Apache.Arrow.Adbc/Tracing): allow ActivitySource tags to be set from TracingConnection --- .../Tracing/ActivityTrace.cs | 11 ++- .../Tracing/TracingConnection.cs | 9 +- .../Databricks/DatabricksConnection.cs | 9 ++ .../Tracing/TracingTests.cs | 89 +++++++++++++++++-- 4 files changed, 108 insertions(+), 10 deletions(-) diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs index f474820cdb..b332590a56 100644 --- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs +++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityTrace.cs @@ -16,8 +16,8 @@ */ using System; +using System.Collections.Generic; using System.Diagnostics; -using System.Linq; using System.Runtime.CompilerServices; using System.Threading.Tasks; @@ -33,8 +33,11 @@ public sealed class ActivityTrace : IDisposable /// Constructs a new object. If is set, it provides the /// activity source name, otherwise the current assembly name is used as the activity source name. /// - /// - public ActivityTrace(string? activitySourceName = default, string? activitySourceVersion = default, string? traceParent = default) + /// The name of the ActivitySource object + /// The version of the component publishing the tracing info. + /// The trace parent context, which is used to link the activity to a distributed trace. + /// The tags associated with the activity. + public ActivityTrace(string? activitySourceName = default, string? activitySourceVersion = default, string? traceParent = default, IEnumerable>? tags = default) { activitySourceName ??= GetType().Assembly.GetName().Name!; // It's okay to have a null version. @@ -46,7 +49,7 @@ public ActivityTrace(string? activitySourceName = default, string? activitySourc TraceParent = traceParent; // This is required to be disposed - ActivitySource = new(activitySourceName, activitySourceVersion); + ActivitySource = new(activitySourceName, activitySourceVersion, tags); } /// diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs index aaa5ac6aff..cd580a6efb 100644 --- a/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs +++ b/csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs @@ -27,7 +27,7 @@ public abstract class TracingConnection : AdbcConnection, IActivityTracer protected TracingConnection(IReadOnlyDictionary properties) { properties.TryGetValue(AdbcOptions.Telemetry.TraceParent, out string? traceParent); - _trace = new ActivityTrace(this.AssemblyName, this.AssemblyVersion, traceParent); + _trace = new ActivityTrace(AssemblyName, AssemblyVersion, traceParent, GetActivitySourceTags(properties)); } string? IActivityTracer.TraceParent => _trace.TraceParent; @@ -38,6 +38,13 @@ protected TracingConnection(IReadOnlyDictionary properties) public abstract string AssemblyName { get; } + public string ActivitySourceName => _trace.ActivitySourceName; + + public virtual IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + return null; + } + protected void SetTraceParent(string? traceParent) { _trace.TraceParent = traceParent; diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs index 37263183cd..b5c11588fa 100644 --- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs +++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs @@ -76,6 +76,15 @@ public DatabricksConnection(IReadOnlyDictionary properties) : ba ValidateProperties(); } + public override IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + IEnumerable>? tags = base.GetActivitySourceTags(properties); + // TODO: Add any additional tags specific to Databricks connection + //tags ??= []; + //tags.Concat([new("key", "value")]); + return tags; + } + protected override TCLIService.IAsync CreateTCLIServiceClient(TProtocol protocol) { return new ThreadSafeClient(new TCLIService.Client(protocol)); diff --git a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs index beee438ebc..a61582f069 100644 --- a/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs +++ b/csharp/test/Apache.Arrow.Adbc.Tests/Tracing/TracingTests.cs @@ -20,6 +20,7 @@ using System.Diagnostics; using System.Linq; using Apache.Arrow.Adbc.Tracing; +using Apache.Arrow.Ipc; using OpenTelemetry; using OpenTelemetry.Trace; using Xunit; @@ -29,6 +30,10 @@ namespace Apache.Arrow.Adbc.Tests.Tracing { public class TracingTests(ITestOutputHelper? outputHelper) : IDisposable { + private const string SourceTagName = "sourceTagName"; + private const string SourceTagValue = "sourceTagValue"; + private const string TraceParent = "00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01"; + private readonly ITestOutputHelper? _outputHelper = outputHelper; private bool _disposed; @@ -143,12 +148,10 @@ internal void CanAddTraceParent() testClass.MethodWithActivity(eventNameWithoutParent); Assert.True(exportedActivities.Count() > 0); - const string traceParent = "00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01"; - const int withParentCountExpected = 10; for (int i = 0; i < withParentCountExpected; i++) { - testClass.MethodWithActivity(eventNameWithParent, traceParent); + testClass.MethodWithActivity(eventNameWithParent, TraceParent); } testClass.MethodWithActivity(eventNameWithoutParent); @@ -169,13 +172,61 @@ internal void CanAddTraceParent() else if (exportedActivity.OperationName.Contains(eventNameWithParent)) { withParentCount++; - Assert.Equal(traceParent, exportedActivity.ParentId); + Assert.Equal(TraceParent, exportedActivity.ParentId); } } Assert.Equal(2, withoutParentCount); Assert.Equal(withParentCountExpected, withParentCount); } + [Fact] + internal void CanListenAndFilterActivitySourceTagsUsingActivityTrace() + { + string activitySourceName = NewName(); + Queue exportedActivities = new(); + using (ActivityListener activityListener = new() + { + ShouldListenTo = source => + { + return source.Name == activitySourceName + && source.Tags?.Any(t => t.Key == SourceTagName && t.Value?.Equals(SourceTagValue) == true) == true; + }, + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => exportedActivities.Enqueue(activity) + }) + { + ActivitySource.AddActivityListener(activityListener); + + var testClass = new TraceProducer(activitySourceName); + testClass.MethodWithActivity(); + } + Assert.Single(exportedActivities); + } + + [Fact] + internal void CanListenAndFilterActivitySourceTagsUsingTracingConnection() + { + string activitySourceName = NewName(); + Queue exportedActivities = new(); + var testClass = new MyTracingConnection(new Dictionary(), activitySourceName); + using (ActivityListener activityListener = new() + { + ShouldListenTo = source => + { + return source.Name == testClass.ActivitySourceName + && source.Tags?.Any(t => t.Key == SourceTagName && t.Value?.Equals(SourceTagValue) == true) == true; + }, + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + ActivityStopped = activity => exportedActivities.Enqueue(activity) + }) + { + ActivitySource.AddActivityListener(activityListener); + + testClass.MethodWithActivity(); + } + Assert.Single(exportedActivities); + } + internal static string NewName() => Guid.NewGuid().ToString().Replace("-", "").ToLower(); protected virtual void Dispose(bool disposing) @@ -203,7 +254,8 @@ private class TraceProducer : IDisposable internal TraceProducer(string? activitySourceName = default, string? traceParent = default) { - _trace = new ActivityTrace(activitySourceName, traceParent: traceParent); + IEnumerable>? tags = [new(SourceTagName, SourceTagValue)]; + _trace = new ActivityTrace(activitySourceName, traceParent: traceParent, tags: tags); } internal void MethodWithNoInstrumentation() @@ -273,6 +325,33 @@ public void Dispose() } } + private class MyTracingConnection(IReadOnlyDictionary properties, string assemblyName) : TracingConnection(properties) + { + public override string AssemblyVersion => "1.0.0"; + public override string AssemblyName { get; } = assemblyName; + + public override IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + return [new KeyValuePair(SourceTagName, SourceTagValue)]; + } + + public void MethodWithActivity() + { + this.TraceActivity(activity => + { + activity?.AddTag("exampleTag", "exampleValue") + .AddBaggage("exampleBaggage", "exampleBaggageValue") + .AddEvent("exampleEvent", [new KeyValuePair("eventTag", "eventValue")]) + .AddLink(TraceParent, [new KeyValuePair("linkTag", "linkValue")]); + }); + } + + public override AdbcStatement CreateStatement() => throw new NotImplementedException(); + public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList? tableTypes, string? columnNamePattern) => throw new NotImplementedException(); + public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName) => throw new NotImplementedException(); + public override IArrowArrayStream GetTableTypes() => throw new NotImplementedException(); + } + internal class ActivityQueueExporter(Queue exportedActivities) : BaseExporter { private Queue ExportedActivities { get; } = exportedActivities;