@@ -389,19 +389,22 @@ public class KRequestingApplication {
389389 }
390390
391391 @Bean
392- public ReplyingKafkaTemplate<String, String, String> kafkaTemplate (
392+ public ReplyingKafkaTemplate<String, String, String> replyingTemplate (
393393 ProducerFactory<String, String> pf,
394- KafkaMessageListenerContainer<String , String> replyContainer ) {
394+ ConcurrentMessageListenerContainer<Long , String> repliesContainer ) {
395395
396- return new ReplyingKafkaTemplate<>(pf, replyContainer );
396+ return new ReplyingKafkaTemplate<>(pf, repliesContainer );
397397 }
398398
399399 @Bean
400- public KafkaMessageListenerContainer<String, String> replyContainer(
401- ConsumerFactory<String, String> cf) {
402-
403- ContainerProperties containerProperties = new ContainerProperties("kReplies");
404- return new KafkaMessageListenerContainer<>(cf, containerProperties);
400+ public ConcurrentMessageListenerContainer<String, String> repliesContainer(
401+ ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
402+
403+ ConcurrentMessageListenerContainer<String, String> repliesContainer =
404+ containerFactory.createContainer("replies");
405+ repliesContainer.getContainerProperties().setGroupId("repliesGroup");
406+ repliesContainer.setAutoStartup(false);
407+ return repliesContainer;
405408 }
406409
407410 @Bean
@@ -417,6 +420,8 @@ public class KRequestingApplication {
417420}
418421----
419422
423+ Note that we can use Boot's auto configured container factory to create the reply container.
424+
420425The template sets a header `KafkaHeaders.CORRELATION_ID` which must be echoed back by the server side.
421426
422427In this case, simple `@KafkaListener` application responds:
0 commit comments