2020using System . Diagnostics ;
2121using System . Linq ;
2222using Apache . Arrow . Adbc . Tracing ;
23+ using Apache . Arrow . Ipc ;
2324using OpenTelemetry ;
2425using OpenTelemetry . Trace ;
2526using Xunit ;
@@ -29,6 +30,10 @@ namespace Apache.Arrow.Adbc.Tests.Tracing
2930{
3031 public class TracingTests ( ITestOutputHelper ? outputHelper ) : IDisposable
3132 {
33+ private const string SourceTagName = "sourceTagName" ;
34+ private const string SourceTagValue = "sourceTagValue" ;
35+ private const string TraceParent = "00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01" ;
36+
3237 private readonly ITestOutputHelper ? _outputHelper = outputHelper ;
3338 private bool _disposed ;
3439
@@ -143,12 +148,10 @@ internal void CanAddTraceParent()
143148 testClass . MethodWithActivity ( eventNameWithoutParent ) ;
144149 Assert . True ( exportedActivities . Count ( ) > 0 ) ;
145150
146- const string traceParent = "00-3236da27af79882bd317c4d1c3776982-a3cc9bd52ccd58e6-01" ;
147-
148151 const int withParentCountExpected = 10 ;
149152 for ( int i = 0 ; i < withParentCountExpected ; i ++ )
150153 {
151- testClass . MethodWithActivity ( eventNameWithParent , traceParent ) ;
154+ testClass . MethodWithActivity ( eventNameWithParent , TraceParent ) ;
152155 }
153156
154157 testClass . MethodWithActivity ( eventNameWithoutParent ) ;
@@ -169,13 +172,61 @@ internal void CanAddTraceParent()
169172 else if ( exportedActivity . OperationName . Contains ( eventNameWithParent ) )
170173 {
171174 withParentCount ++ ;
172- Assert . Equal ( traceParent , exportedActivity . ParentId ) ;
175+ Assert . Equal ( TraceParent , exportedActivity . ParentId ) ;
173176 }
174177 }
175178 Assert . Equal ( 2 , withoutParentCount ) ;
176179 Assert . Equal ( withParentCountExpected , withParentCount ) ;
177180 }
178181
182+ [ Fact ]
183+ internal void CanListenAndFilterActivitySourceTagsUsingActivityTrace ( )
184+ {
185+ string activitySourceName = NewName ( ) ;
186+ Queue < Activity > exportedActivities = new ( ) ;
187+ using ( ActivityListener activityListener = new ( )
188+ {
189+ ShouldListenTo = source =>
190+ {
191+ return source . Name == activitySourceName
192+ && source . Tags ? . Any ( t => t . Key == SourceTagName && t . Value ? . Equals ( SourceTagValue ) == true ) == true ;
193+ } ,
194+ Sample = ( ref ActivityCreationOptions < ActivityContext > options ) => ActivitySamplingResult . AllDataAndRecorded ,
195+ ActivityStopped = activity => exportedActivities . Enqueue ( activity )
196+ } )
197+ {
198+ ActivitySource . AddActivityListener ( activityListener ) ;
199+
200+ var testClass = new TraceProducer ( activitySourceName ) ;
201+ testClass . MethodWithActivity ( ) ;
202+ }
203+ Assert . Single ( exportedActivities ) ;
204+ }
205+
206+ [ Fact ]
207+ internal void CanListenAndFilterActivitySourceTagsUsingTracingConnection ( )
208+ {
209+ string activitySourceName = NewName ( ) ;
210+ Queue < Activity > exportedActivities = new ( ) ;
211+ var testClass = new MyTracingConnection ( new Dictionary < string , string > ( ) , activitySourceName ) ;
212+ using ( ActivityListener activityListener = new ( )
213+ {
214+ ShouldListenTo = source =>
215+ {
216+ return source . Name == testClass . ActivitySourceName
217+ && source . Tags ? . Any ( t => t . Key == SourceTagName && t . Value ? . Equals ( SourceTagValue ) == true ) == true ;
218+ } ,
219+ Sample = ( ref ActivityCreationOptions < ActivityContext > options ) => ActivitySamplingResult . AllDataAndRecorded ,
220+ ActivityStopped = activity => exportedActivities . Enqueue ( activity )
221+ } )
222+ {
223+ ActivitySource . AddActivityListener ( activityListener ) ;
224+
225+ testClass . MethodWithActivity ( ) ;
226+ }
227+ Assert . Single ( exportedActivities ) ;
228+ }
229+
179230 internal static string NewName ( ) => Guid . NewGuid ( ) . ToString ( ) . Replace ( "-" , "" ) . ToLower ( ) ;
180231
181232 protected virtual void Dispose ( bool disposing )
@@ -203,7 +254,8 @@ private class TraceProducer : IDisposable
203254
204255 internal TraceProducer ( string ? activitySourceName = default , string ? traceParent = default )
205256 {
206- _trace = new ActivityTrace ( activitySourceName , traceParent : traceParent ) ;
257+ IEnumerable < KeyValuePair < string , object ? > > ? tags = [ new ( SourceTagName , SourceTagValue ) ] ;
258+ _trace = new ActivityTrace ( activitySourceName , traceParent : traceParent , tags : tags ) ;
207259 }
208260
209261 internal void MethodWithNoInstrumentation ( )
@@ -273,6 +325,33 @@ public void Dispose()
273325 }
274326 }
275327
328+ private class MyTracingConnection ( IReadOnlyDictionary < string , string > properties , string assemblyName ) : TracingConnection ( properties )
329+ {
330+ public override string AssemblyVersion => "1.0.0" ;
331+ public override string AssemblyName { get ; } = assemblyName ;
332+
333+ public override IEnumerable < KeyValuePair < string , object ? > > ? GetActivitySourceTags ( IReadOnlyDictionary < string , string > properties )
334+ {
335+ return [ new KeyValuePair < string , object ? > ( SourceTagName , SourceTagValue ) ] ;
336+ }
337+
338+ public void MethodWithActivity ( )
339+ {
340+ this . TraceActivity ( activity =>
341+ {
342+ activity ? . AddTag ( "exampleTag" , "exampleValue" )
343+ . AddBaggage ( "exampleBaggage" , "exampleBaggageValue" )
344+ . AddEvent ( "exampleEvent" , [ new KeyValuePair < string , object ? > ( "eventTag" , "eventValue" ) ] )
345+ . AddLink ( TraceParent , [ new KeyValuePair < string , object ? > ( "linkTag" , "linkValue" ) ] ) ;
346+ } ) ;
347+ }
348+
349+ public override AdbcStatement CreateStatement ( ) => throw new NotImplementedException ( ) ;
350+ public override IArrowArrayStream GetObjects ( GetObjectsDepth depth , string ? catalogPattern , string ? dbSchemaPattern , string ? tableNamePattern , IReadOnlyList < string > ? tableTypes , string ? columnNamePattern ) => throw new NotImplementedException ( ) ;
351+ public override Schema GetTableSchema ( string ? catalog , string ? dbSchema , string tableName ) => throw new NotImplementedException ( ) ;
352+ public override IArrowArrayStream GetTableTypes ( ) => throw new NotImplementedException ( ) ;
353+ }
354+
276355 internal class ActivityQueueExporter ( Queue < Activity > exportedActivities ) : BaseExporter < Activity >
277356 {
278357 private Queue < Activity > ExportedActivities { get ; } = exportedActivities ;
0 commit comments