3838import  io .micrometer .observation .Observation ;
3939import  io .micrometer .observation .ObservationHandler ;
4040import  io .micrometer .observation .ObservationRegistry ;
41- import  io .micrometer .observation .tck .TestObservationRegistry ;
4241import  io .micrometer .tracing .Span ;
4342import  io .micrometer .tracing .TraceContext ;
4443import  io .micrometer .tracing .Tracer ;
4544import  io .micrometer .tracing .handler .DefaultTracingObservationHandler ;
4645import  io .micrometer .tracing .handler .PropagatingReceiverTracingObservationHandler ;
4746import  io .micrometer .tracing .handler .PropagatingSenderTracingObservationHandler ;
47+ import  io .micrometer .tracing .handler .TracingAwareMeterObservationHandler ;
4848import  io .micrometer .tracing .propagation .Propagator ;
4949import  io .micrometer .tracing .test .simple .SimpleSpan ;
50+ import  io .micrometer .tracing .test .simple .SimpleTraceContext ;
5051import  io .micrometer .tracing .test .simple .SimpleTracer ;
5152import  org .apache .kafka .clients .admin .AdminClientConfig ;
5253import  org .apache .kafka .clients .consumer .Consumer ;
7071import  org .springframework .context .annotation .Configuration ;
7172import  org .springframework .context .annotation .Primary ;
7273import  org .springframework .kafka .KafkaException ;
74+ import  org .springframework .kafka .annotation .DltHandler ;
7375import  org .springframework .kafka .annotation .EnableKafka ;
7476import  org .springframework .kafka .annotation .KafkaListener ;
77+ import  org .springframework .kafka .annotation .RetryableTopic ;
7578import  org .springframework .kafka .config .ConcurrentKafkaListenerContainerFactory ;
7679import  org .springframework .kafka .config .KafkaListenerEndpointRegistry ;
7780import  org .springframework .kafka .core .ConsumerFactory ;
8083import  org .springframework .kafka .core .KafkaAdmin ;
8184import  org .springframework .kafka .core .KafkaTemplate ;
8285import  org .springframework .kafka .core .ProducerFactory ;
86+ import  org .springframework .kafka .listener .ContainerProperties ;
8387import  org .springframework .kafka .listener .MessageListenerContainer ;
8488import  org .springframework .kafka .listener .RecordInterceptor ;
8589import  org .springframework .kafka .requestreply .ReplyingKafkaTemplate ;
9094import  org .springframework .kafka .test .context .EmbeddedKafka ;
9195import  org .springframework .kafka .test .utils .KafkaTestUtils ;
9296import  org .springframework .messaging .handler .annotation .SendTo ;
97+ import  org .springframework .retry .annotation .Backoff ;
98+ import  org .springframework .scheduling .TaskScheduler ;
99+ import  org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
93100import  org .springframework .test .annotation .DirtiesContext ;
94101import  org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
95102import  org .springframework .util .StringUtils ;
113120@ EmbeddedKafka (topics  = {ObservationTests .OBSERVATION_TEST_1 , ObservationTests .OBSERVATION_TEST_2 ,
114121		ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_TEST_4 , ObservationTests .OBSERVATION_REPLY ,
115122		ObservationTests .OBSERVATION_RUNTIME_EXCEPTION , ObservationTests .OBSERVATION_ERROR ,
116- 		ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions  = 1 )
123+ 		ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE , ObservationTests .OBSERVATION_ASYNC_FAILURE_TEST ,
124+ 		ObservationTests .OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST }, partitions  = 1 )
117125@ DirtiesContext 
118126public  class  ObservationTests  {
119127
@@ -137,6 +145,55 @@ public class ObservationTests {
137145
138146	public  final  static  String  OBSERVATION_TRACEPARENT_DUPLICATE  = "observation.traceparent.duplicate" ;
139147
148+ 	public  final  static  String  OBSERVATION_ASYNC_FAILURE_TEST  = "observation.async.failure.test" ;
149+ 
150+ 	public  final  static  String  OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST  = "observation.async.failure.retry.test" ;
151+ 
152+ 	@ Test 
153+ 	void  asyncRetryScopePropagation (@ Autowired  AsyncFailureListener  asyncFailureListener ,
154+ 			@ Autowired  KafkaTemplate <Integer , String > template ,
155+ 			@ Autowired  SimpleTracer  tracer ,
156+ 			@ Autowired  ObservationRegistry  observationRegistry ) throws  InterruptedException  {
157+ 
158+ 		// Clear any previous spans 
159+ 		tracer .getSpans ().clear ();
160+ 
161+ 		// Create an observation scope to ensure we have a proper trace context 
162+ 		var  testObservation  = Observation .createNotStarted ("test.message.send" , observationRegistry );
163+ 
164+ 		// Send a message within the observation scope to ensure trace context is propagated 
165+ 		testObservation .observe (() -> {
166+ 			try  {
167+ 				template .send (OBSERVATION_ASYNC_FAILURE_TEST , "trigger-async-failure" ).get (5 , TimeUnit .SECONDS );
168+ 			}
169+ 			catch  (Exception  e ) {
170+ 				throw  new  RuntimeException ("Failed to send message" , e );
171+ 			}
172+ 		});
173+ 
174+ 		// Wait for the listener to process the message (initial + retry + DLT = 3 invocations) 
175+ 		assertThat (asyncFailureListener .asyncFailureLatch .await (100000 , TimeUnit .SECONDS )).isTrue ();
176+ 
177+ 		// Verify that the captured spans from the listener contexts are all part of the same trace 
178+ 		// This demonstrates that the tracing context propagates correctly through the retry mechanism 
179+ 		Deque <SimpleSpan > spans  = tracer .getSpans ();
180+ 		assertThat (spans ).hasSizeGreaterThanOrEqualTo (4 ); // template + listener + retry + DLT spans 
181+ 
182+ 		// Verify that spans were captured for each phase and belong to the same trace 
183+ 		assertThat (asyncFailureListener .capturedSpanInListener ).isNotNull ();
184+ 		assertThat (asyncFailureListener .capturedSpanInRetry ).isNotNull ();
185+ 		assertThat (asyncFailureListener .capturedSpanInDlt ).isNotNull ();
186+ 
187+ 		// All spans should have the same trace ID, demonstrating trace continuity 
188+ 		var  originalTraceId  = asyncFailureListener .capturedSpanInListener .getTraceId ();
189+ 		assertThat (originalTraceId ).isNotBlank ();
190+ 		assertThat (asyncFailureListener .capturedSpanInRetry .getTraceId ()).isEqualTo (originalTraceId );
191+ 		assertThat (asyncFailureListener .capturedSpanInDlt .getTraceId ()).isEqualTo (originalTraceId );
192+ 
193+ 		// Clear any previous spans 
194+ 		tracer .getSpans ().clear ();
195+ 	}
196+ 
140197	@ Test 
141198	void  endToEnd (@ Autowired  Listener  listener , @ Autowired  KafkaTemplate <Integer , String > template ,
142199			@ Autowired  SimpleTracer  tracer , @ Autowired  KafkaListenerEndpointRegistry  rler ,
@@ -628,6 +685,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
628685				if  (container .getListenerId ().equals ("obs3" )) {
629686					container .setKafkaAdmin (this .mockAdmin );
630687				}
688+ 				if  (container .getListenerId ().contains ("asyncFailure" )) {
689+ 					// Enable async acks to trigger async failure handling 
690+ 					container .getContainerProperties ().setAsyncAcks (true );
691+ 					container .getContainerProperties ().setAckMode (ContainerProperties .AckMode .MANUAL );
692+ 				}
631693				if  (container .getListenerId ().equals ("obs4" )) {
632694					container .setRecordInterceptor (new  RecordInterceptor <>() {
633695
@@ -662,17 +724,17 @@ MeterRegistry meterRegistry() {
662724
663725		@ Bean 
664726		ObservationRegistry  observationRegistry (Tracer  tracer , Propagator  propagator , MeterRegistry  meterRegistry ) {
665- 			TestObservationRegistry  observationRegistry  = TestObservationRegistry .create ();
727+ 			var  observationRegistry  = ObservationRegistry .create ();
666728			observationRegistry .observationConfig ().observationHandler (
667729							// Composite will pick the first matching handler 
668730							new  ObservationHandler .FirstMatchingCompositeObservationHandler (
669- 									// This is responsible for creating a child span on the sender side 
670- 									new  PropagatingSenderTracingObservationHandler <>(tracer , propagator ),
671731									// This is responsible for creating a span on the receiver side 
672732									new  PropagatingReceiverTracingObservationHandler <>(tracer , propagator ),
733+ 									// This is responsible for creating a child span on the sender side 
734+ 									new  PropagatingSenderTracingObservationHandler <>(tracer , propagator ),
673735									// This is responsible for creating a default span 
674736									new  DefaultTracingObservationHandler (tracer )))
675- 					.observationHandler (new  DefaultMeterObservationHandler (meterRegistry ));
737+ 					.observationHandler (new  TracingAwareMeterObservationHandler <>( new   DefaultMeterObservationHandler (meterRegistry ),  tracer ));
676738			return  observationRegistry ;
677739		}
678740
@@ -683,29 +745,41 @@ Propagator propagator(Tracer tracer) {
683745				// List of headers required for tracing propagation 
684746				@ Override 
685747				public  List <String > fields () {
686- 					return  Arrays .asList ("foo" , "bar" );
748+ 					return  Arrays .asList ("traceId"  ,  "spanId" ,  " foo" , "bar" );
687749				}
688750
689751				// This is called on the producer side when the message is being sent 
690- 				// Normally we would pass information from tracing context - for tests we don't need to 
691752				@ Override 
692753				public  <C > void  inject (TraceContext  context , @ Nullable  C  carrier , Setter <C > setter ) {
693754					setter .set (carrier , "foo" , "some foo value" );
694755					setter .set (carrier , "bar" , "some bar value" );
695756
757+ 					setter .set (carrier , "traceId" , context .traceId ());
758+ 					setter .set (carrier , "spanId" , context .spanId ());
759+ 
696760					// Add a traceparent header to simulate W3C trace context 
697761					setter .set (carrier , "traceparent" , "traceparent-from-propagator" );
698762				}
699763
700764				// This is called on the consumer side when the message is consumed 
701- 				// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span 
702765				@ Override 
703766				public  <C > Span .Builder  extract (C  carrier , Getter <C > getter ) {
704767					String  foo  = getter .get (carrier , "foo" );
705768					String  bar  = getter .get (carrier , "bar" );
706- 					return  tracer .spanBuilder ()
769+ 
770+ 					var  traceId  = getter .get (carrier , "traceId" );
771+ 					var  spanId  = getter .get (carrier , "spanId" );
772+ 
773+ 					Span .Builder  spanBuilder  = tracer .spanBuilder ()
707774							.tag ("foo" , foo )
708775							.tag ("bar" , bar );
776+ 
777+ 					var  traceContext  = new  SimpleTraceContext ();
778+ 					traceContext .setTraceId (traceId );
779+ 					traceContext .setSpanId (spanId );
780+ 					spanBuilder  = spanBuilder .setParent (traceContext );
781+ 
782+ 					return  spanBuilder ;
709783				}
710784			};
711785		}
@@ -720,6 +794,15 @@ ExceptionListener exceptionListener() {
720794			return  new  ExceptionListener ();
721795		}
722796
797+ 		@ Bean 
798+ 		AsyncFailureListener  asyncFailureListener (SimpleTracer  tracer ) {
799+ 			return  new  AsyncFailureListener (tracer );
800+ 		}
801+ 
802+ 		@ Bean 
803+ 		public  TaskScheduler  taskExecutor () {
804+ 			return  new  ThreadPoolTaskScheduler ();
805+ 		}
723806	}
724807
725808	public  static  class  Listener  {
@@ -801,4 +884,54 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
801884
802885	}
803886
887+ 	public  static  class  AsyncFailureListener  {
888+ 
889+ 		final  CountDownLatch  asyncFailureLatch  = new  CountDownLatch (3 );
890+ 
891+ 		volatile  @ Nullable  SimpleSpan  capturedSpanInListener ;
892+ 
893+ 		volatile  @ Nullable  SimpleSpan  capturedSpanInRetry ;
894+ 
895+ 		volatile  @ Nullable  SimpleSpan  capturedSpanInDlt ;
896+ 
897+ 		private  final  SimpleTracer  tracer ;
898+ 
899+ 		public  AsyncFailureListener (SimpleTracer  tracer ) {
900+ 			this .tracer  = tracer ;
901+ 		}
902+ 
903+ 		@ RetryableTopic (
904+ 				attempts  = "2" ,
905+ 				backoff  = @ Backoff (delay  = 1000 )
906+ 		)
907+ 		@ KafkaListener (id  = "asyncFailure" , topics  = OBSERVATION_ASYNC_FAILURE_TEST )
908+ 		CompletableFuture <Void > handleAsync (ConsumerRecord <Integer , String > record ) {
909+ 
910+ 			// Use topic name to distinguish between original and retry calls 
911+ 			String  topicName  = record .topic ();
912+ 
913+ 			if  (topicName .equals (OBSERVATION_ASYNC_FAILURE_TEST )) {
914+ 				// This is the original call 
915+ 				this .capturedSpanInListener  = this .tracer .currentSpan ();
916+ 			}
917+ 			else  {
918+ 				// This is a retry call (topic name will be different for retry topics) 
919+ 				this .capturedSpanInRetry  = this .tracer .currentSpan ();
920+ 			}
921+ 
922+ 			this .asyncFailureLatch .countDown ();
923+ 
924+ 			// Return a failed CompletableFuture to trigger async failure handling 
925+ 			return  CompletableFuture .supplyAsync (() -> {
926+ 				throw  new  RuntimeException ("Async failure for observation test" );
927+ 			});
928+ 		}
929+ 
930+ 		@ DltHandler 
931+ 		void  handleDlt (ConsumerRecord <Integer , String > record , Exception  exception ) {
932+ 			this .capturedSpanInDlt  = this .tracer .currentSpan ();
933+ 			this .asyncFailureLatch .countDown ();
934+ 		}
935+ 	}
936+ 
804937}
0 commit comments