1717package org .springframework .kafka .support .micrometer ;
1818
1919import java .nio .charset .StandardCharsets ;
20+ import java .time .Duration ;
2021import java .util .Arrays ;
2122import java .util .Deque ;
2223import java .util .List ;
7677import org .springframework .kafka .core .KafkaTemplate ;
7778import org .springframework .kafka .core .ProducerFactory ;
7879import org .springframework .kafka .listener .MessageListenerContainer ;
80+ import org .springframework .kafka .requestreply .ReplyingKafkaTemplate ;
7981import org .springframework .kafka .support .ProducerListener ;
8082import org .springframework .kafka .support .micrometer .KafkaListenerObservation .DefaultKafkaListenerObservationConvention ;
8183import org .springframework .kafka .support .micrometer .KafkaTemplateObservation .DefaultKafkaTemplateObservationConvention ;
8284import org .springframework .kafka .test .EmbeddedKafkaBroker ;
8385import org .springframework .kafka .test .context .EmbeddedKafka ;
8486import org .springframework .kafka .test .utils .KafkaTestUtils ;
85- import org .springframework .lang . Nullable ;
87+ import org .springframework .messaging . handler . annotation . SendTo ;
8688import org .springframework .test .annotation .DirtiesContext ;
8789import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
8890import org .springframework .util .StringUtils ;
98100 * @author Wang Zhiyang
99101 * @author Christian Mergenthaler
100102 * @author Soby Chacko
103+ * @author Francois Rosiere
101104 *
102105 * @since 3.0
103106 */
104107@ SpringJUnitConfig
105108@ EmbeddedKafka (topics = {ObservationTests .OBSERVATION_TEST_1 , ObservationTests .OBSERVATION_TEST_2 ,
106- ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_RUNTIME_EXCEPTION ,
107- ObservationTests .OBSERVATION_ERROR , ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
109+ ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_TEST_4 , ObservationTests .OBSERVATION_REPLY ,
110+ ObservationTests .OBSERVATION_RUNTIME_EXCEPTION , ObservationTests .OBSERVATION_ERROR ,
111+ ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
108112@ DirtiesContext
109113public class ObservationTests {
110114
@@ -114,6 +118,10 @@ public class ObservationTests {
114118
115119 public final static String OBSERVATION_TEST_3 = "observation.testT3" ;
116120
121+ public final static String OBSERVATION_TEST_4 = "observation.testT4" ;
122+
123+ public final static String OBSERVATION_REPLY = "observation.reply" ;
124+
117125 public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception" ;
118126
119127 public final static String OBSERVATION_ERROR = "observation.error" ;
@@ -356,7 +364,7 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
356364 void observationRuntimeException (@ Autowired ExceptionListener listener , @ Autowired SimpleTracer tracer ,
357365 @ Autowired @ Qualifier ("throwableTemplate" ) KafkaTemplate <Integer , String > runtimeExceptionTemplate ,
358366 @ Autowired KafkaListenerEndpointRegistry endpointRegistry )
359- throws ExecutionException , InterruptedException , TimeoutException {
367+ throws ExecutionException , InterruptedException , TimeoutException {
360368
361369 runtimeExceptionTemplate .send (OBSERVATION_RUNTIME_EXCEPTION , "testRuntimeException" ).get (10 , TimeUnit .SECONDS );
362370 assertThat (listener .latch4 .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -459,6 +467,19 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
459467 tracer .getSpans ().clear ();
460468 }
461469
470+ @ Test
471+ void testReplyingKafkaTemplateObservation (
472+ @ Autowired ReplyingKafkaTemplate <Integer , String , String > template ,
473+ @ Autowired ObservationRegistry observationRegistry ) {
474+ assertThat (template .sendAndReceive (new ProducerRecord <>(OBSERVATION_TEST_4 , "test" ))
475+ // the current observation must be retrieved from the consumer thread of the reply
476+ .thenApply (replyRecord -> observationRegistry .getCurrentObservation ().getContext ()))
477+ .succeedsWithin (Duration .ofSeconds (30 ))
478+ .isInstanceOf (KafkaRecordReceiverContext .class )
479+ .extracting ("name" )
480+ .isEqualTo ("spring.kafka.listener" );
481+ }
482+
462483 @ Configuration
463484 @ EnableKafka
464485 public static class Config {
@@ -530,13 +551,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
530551 return template ;
531552 }
532553
554+ @ Bean
555+ ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (ProducerFactory <Integer , String > pf , ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
556+ ReplyingKafkaTemplate <Integer , String , String > kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
557+ kafkaTemplate .setObservationEnabled (true );
558+ return kafkaTemplate ;
559+ }
560+
533561 @ Bean
534562 ConcurrentKafkaListenerContainerFactory <Integer , String > kafkaListenerContainerFactory (
535- ConsumerFactory <Integer , String > cf ) {
563+ ConsumerFactory <Integer , String > cf , ObservationRegistry observationRegistry ,
564+ KafkaTemplate <Integer , String > kafkaTemplate ) {
536565
537566 ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
538567 new ConcurrentKafkaListenerContainerFactory <>();
539568 factory .setConsumerFactory (cf );
569+ factory .setReplyTemplate (kafkaTemplate );
540570 factory .getContainerProperties ().setObservationEnabled (true );
541571 factory .setContainerCustomizer (container -> {
542572 if (container .getListenerId ().equals ("obs3" )) {
@@ -585,7 +615,7 @@ public List<String> fields() {
585615 // This is called on the producer side when the message is being sent
586616 // Normally we would pass information from tracing context - for tests we don't need to
587617 @ Override
588- public <C > void inject (TraceContext context , @ Nullable C carrier , Setter <C > setter ) {
618+ public <C > void inject (TraceContext context , C carrier , Setter <C > setter ) {
589619 setter .set (carrier , "foo" , "some foo value" );
590620 setter .set (carrier , "bar" , "some bar value" );
591621
@@ -649,6 +679,12 @@ void listen2(ConsumerRecord<?, ?> in) {
649679 void listen3 (ConsumerRecord <Integer , String > in ) {
650680 }
651681
682+ @ KafkaListener (id = "obsReply" , topics = OBSERVATION_TEST_4 )
683+ @ SendTo // default REPLY_TOPIC header
684+ public String replyListener (ConsumerRecord <Integer , String > in ) {
685+ return in .value ().toUpperCase ();
686+ }
687+
652688 }
653689
654690 public static class ExceptionListener {
0 commit comments