/*
- * Copyright 2022-2024 the original author or authors.
+ * Copyright 2022-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
import io.micrometer.tracing.propagation.Propagator;
import io.micrometer.tracing.test.simple.SimpleSpan;
+import io.micrometer.tracing.test.simple.SimpleTraceContext;
import io.micrometer.tracing.test.simple.SimpleTracer;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.context.Context;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.kafka.annotation.DltHandler;
+import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
+import org.springframework.kafka.annotation.RetryableTopic;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
+
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.StringUtils;

+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
		ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
		ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
-		ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1)
+		ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE, ObservationTests.OBSERVATION_ASYNC_FAILURE_TEST,
+		ObservationTests.OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST }, partitions = 1)
@DirtiesContext
public class ObservationTests {

@@ -136,6 +157,51 @@ public class ObservationTests {

	public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";

+	public final static String OBSERVATION_ASYNC_FAILURE_TEST = "observation.async.failure.test";
+
+	public final static String OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST = "observation.async.failure.retry.test";
+
+	@Test
+	void asyncRetryScopePropagation(@Autowired AsyncFailureListener asyncFailureListener,
+			@Autowired KafkaTemplate<Integer, String> template,
+			@Autowired SimpleTracer tracer,
+			@Autowired ObservationRegistry observationRegistry) throws InterruptedException {
+
+		// Clear any previous spans
+		tracer.getSpans().clear();
+
+		// Create an observation scope to ensure we have a proper trace context
+		var testObservation = Observation.createNotStarted("test.message.send", observationRegistry);
+
+		// Send a message within the observation scope to ensure the trace context is propagated
+		testObservation.observe(() -> {
+			try {
+				template.send(OBSERVATION_ASYNC_FAILURE_TEST, "trigger-async-failure").get(5, TimeUnit.SECONDS);
+			} catch (Exception e) {
+				throw new RuntimeException("Failed to send message", e);
+			}
+		});
+
+		// Wait for the listener to process the message (initial + retry + DLT = 3 invocations)
+		assertThat(asyncFailureListener.asyncFailureLatch.await(15, TimeUnit.SECONDS)).isTrue();
+
+		// Verify that the captured spans from the listener contexts are all part of the same trace.
+		// This demonstrates that the tracing context propagates correctly through the retry mechanism.
+		Deque<SimpleSpan> spans = tracer.getSpans();
+		assertThat(spans).hasSizeGreaterThanOrEqualTo(4); // template + listener + retry + DLT spans
+
+		// Verify that spans were captured for each phase
+		assertThat(asyncFailureListener.capturedSpanInListener).isNotNull();
+		assertThat(asyncFailureListener.capturedSpanInRetry).isNotNull();
+		assertThat(asyncFailureListener.capturedSpanInDlt).isNotNull();
+
+		// All spans should have the same trace ID, demonstrating trace continuity
+		var originalTraceId = asyncFailureListener.capturedSpanInListener.getTraceId();
+		assertThat(originalTraceId).isNotBlank();
+		assertThat(asyncFailureListener.capturedSpanInRetry.getTraceId()).isEqualTo(originalTraceId);
+		assertThat(asyncFailureListener.capturedSpanInDlt.getTraceId()).isEqualTo(originalTraceId);
+	}
+
	@Test
	void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
			@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -628,6 +694,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
			if (container.getListenerId().equals("obs3")) {
				container.setKafkaAdmin(this.mockAdmin);
			}
+			if (container.getListenerId().contains("asyncFailure")) {
+				// Enable async acks (together with MANUAL ack mode) to exercise the async failure handling path
+				container.getContainerProperties().setAsyncAcks(true);
+				container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+			}
			if (container.getListenerId().equals("obs4")) {
				container.setRecordInterceptor(new RecordInterceptor<>() {

@@ -683,29 +754,45 @@ Propagator propagator(Tracer tracer) {
				// List of headers required for tracing propagation
				@Override
				public List<String> fields() {
-					return Arrays.asList("foo", "bar");
+					return Arrays.asList("traceId", "foo", "bar");
				}

				// This is called on the producer side when the message is being sent
-				// Normally we would pass information from tracing context - for tests we don't need to
				@Override
				public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
					setter.set(carrier, "foo", "some foo value");
					setter.set(carrier, "bar", "some bar value");

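+					// Propagate the actual trace context so the consumer-side extract() can restore the parent span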
+					if (StringUtils.hasText(context.traceId())) {
+						setter.set(carrier, "traceId", context.traceId());
+						setter.set(carrier, "spanId", context.spanId());
+					}
+
					// Add a traceparent header to simulate W3C trace context
					setter.set(carrier, "traceparent", "traceparent-from-propagator");
				}

				// This is called on the consumer side when the message is consumed
-				// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span
				@Override
				public <C> Span.Builder extract(C carrier, Getter<C> getter) {
					String foo = getter.get(carrier, "foo");
					String bar = getter.get(carrier, "bar");
-					return tracer.spanBuilder()
+
+					var traceId = getter.get(carrier, "traceId");
+					var spanId = getter.get(carrier, "spanId");
+
+					Span.Builder spanBuilder = tracer.spanBuilder()
							.tag("foo", foo)
							.tag("bar", bar);
+					// If the headers carried a trace context, set it as the parent so the consumer span joins the same trace
+					if (traceId != null) {
+						var traceContext = new SimpleTraceContext();
+						traceContext.setTraceId(traceId);
+						traceContext.setSpanId(spanId);
+						spanBuilder = spanBuilder.setParent(traceContext);
+					}
+
+					return spanBuilder;
				}
			};
		}
@@ -720,6 +807,15 @@ ExceptionListener exceptionListener() {
			return new ExceptionListener();
		}

+		@Bean
+		AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
+			return new AsyncFailureListener(tracer);
+		}
+
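+		// A TaskScheduler is registered so the retry-topic infrastructure has a scheduler for its delayed
+		// redeliveries (an assumption about why this test defines the bean explicitly).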
+		@Bean
+		public TaskScheduler taskExecutor() {
+			return new ThreadPoolTaskScheduler();
+		}
	}

	public static class Listener {
@@ -801,4 +897,52 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {

	}

+	public static class AsyncFailureListener {
+
+		final CountDownLatch asyncFailureLatch = new CountDownLatch(3);
+
+		volatile SimpleSpan capturedSpanInListener;
+		volatile SimpleSpan capturedSpanInRetry;
+		volatile SimpleSpan capturedSpanInDlt;
+
+		private final SimpleTracer tracer;
+
+		public AsyncFailureListener(SimpleTracer tracer) {
+			this.tracer = tracer;
+		}
+
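+		// attempts = "2" means one delivery on the main topic plus one on the retry topic;
+		// the DLT handler then fires, which matches the CountDownLatch count of 3.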
+		@RetryableTopic(
+				attempts = "2",
+				backoff = @Backoff(delay = 1000)
+		)
+		@KafkaListener(id = "asyncFailure", topics = OBSERVATION_ASYNC_FAILURE_TEST)
+		CompletableFuture<Void> handleAsync(ConsumerRecord<Integer, String> record) {
+			// Use the topic name to distinguish between original and retry calls
+			String topicName = record.topic();
+
+			if (topicName.equals(OBSERVATION_ASYNC_FAILURE_TEST)) {
+				// This is the original call
+				this.capturedSpanInListener = this.tracer.currentSpan();
+			}
+			else {
+				// This is a retry call (the topic name will be different for retry topics)
+				this.capturedSpanInRetry = this.tracer.currentSpan();
+			}
+
+			this.asyncFailureLatch.countDown();
+
+			// Return a failed CompletableFuture to trigger async failure handling
+			return supplyAsync(() -> {
+				throw new RuntimeException("Async failure for observation test");
+			});
+		}
+
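+		// Invoked once the retries are exhausted; by default the record arrives on the "<main topic>-dlt" topic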
+		@DltHandler
+		void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
+			this.capturedSpanInDlt = this.tracer.currentSpan();
+			this.asyncFailureLatch.countDown();
+		}
+	}
+
}