From 16838a33359e8426974d3166001a78ef40617987 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 2 Aug 2024 17:09:27 -0400 Subject: [PATCH 1/2] GH-3395: Use Original Key in Reply Record Fixes: #3395 When using `ReplyingKafkaTemplate,` include the original key from the request record if such a key exists. --- .../ROOT/pages/kafka/sending-messages.adoc | 7 +++++++ .../antora/modules/ROOT/pages/whats-new.adoc | 6 ++++++ .../MessagingMessageListenerAdapter.java | 17 ++++++++++++++--- .../ReplyingKafkaTemplateTests.java | 6 ++++-- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc index e64ddbfd2a..82ab2e8281 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc @@ -650,6 +650,13 @@ public Message messageReturn(String in) { } ---- +[[record-key-in-reply]] +== Preserving Request Record Key in Reply + +Starting with version 3.3, the Kafka record key from the incoming request (if it exists) will be preserved in the reply record. +This is only applicable for single record request/reply scenario. +When the listener is batch or when the return type is a collection, it is up to the application to specify which keys to use by wrapping the reply record in a `Message` type. + [[aggregating-request-reply]] == Aggregating Multiple Replies diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 225f647697..288a07b3be 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -34,3 +34,9 @@ For more details, see xref:kafka/receiving-messages/filtering.adoc[Message recei The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped. For more details, see xref:kafka/events.adoc[Application Events] and `ConcurrentContainerStoppedEvent` Javadocs. + +[[x33-record-key-in-reply]] +=== Preserving Request Record Key in Reply + +When using `ReplyingKafkaTemplate`, if the original record from the request contains a key, then that same key will be part of the reply as well. +For more details, see xref:kafka/sending-messages.adoc#record-key-in-reply[Record Key in Reply] section of the reference docs. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 483c389472..a69f30b133 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -638,12 +638,12 @@ private void sendReplyForMessageSource(Object result, String topic, Message s builder.setHeader(this.correlationHeaderName, correlationId); } setPartition(builder, source); + setKey(builder, source); this.replyTemplate.send(builder.build()); } protected void asyncSuccess(@Nullable Object result, String replyTopic, Message source, boolean returnTypeMessage) { - if (result == null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Async result is null, ignoring"); @@ -662,7 +662,6 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) { protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, Throwable t, Message source) { - try { handleException(request, acknowledgment, consumer, source, new ListenerExecutionFailedException(createMessagingErrorMessage( @@ -676,7 +675,6 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Message message, ListenerExecutionFailedException e) { - if (this.errorHandler != null) { try { if (NULL_MESSAGE.equals(message)) { @@ -719,11 +717,24 @@ private void setPartition(MessageBuilder builder, Message source) { } } + private void setKey(MessageBuilder builder, Message source) { + Object key = getReplyKeyFromRequest(source); + // Set the reply record key only for non-batch requests + if (key != null && !(key instanceof List)) { + builder.setHeader(KafkaHeaders.KEY, key); + } + } + @Nullable private byte[] getReplyPartition(Message source) { return source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class); } + @Nullable + private Object getReplyKeyFromRequest(Message source) { + return source.getHeaders().get(KafkaHeaders.RECEIVED_KEY); + } + protected final String createMessagingErrorMessage(String description, Object payload) { return description + "\n" + "Endpoint handler details:\n" diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index dd7e406701..9dcd2af69c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -101,6 +101,7 @@ /** * @author Gary Russell * @author Nathan Xu + * @author Soby Chacko * @since 2.1.3 * */ @@ -196,11 +197,12 @@ public void testGood() throws Exception { template.setDefaultReplyTimeout(Duration.ofSeconds(30)); Headers headers = new RecordHeaders(); headers.add("baz", "buz".getBytes()); - ProducerRecord record = new ProducerRecord<>(A_REQUEST, null, null, null, "foo", headers); + ProducerRecord record = new ProducerRecord<>(A_REQUEST, null, null, 1, "foo", headers); RequestReplyFuture future = template.sendAndReceive(record); future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok ConsumerRecord consumerRecord = future.get(30, TimeUnit.SECONDS); assertThat(consumerRecord.value()).isEqualTo("FOO"); + assertThat(consumerRecord.key()).isEqualTo(1); Map receivedHeaders = new HashMap<>(); new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders); assertThat(receivedHeaders).containsKey("baz"); From 70b72d06cd50a59236c6b11aa492992d7fd613e9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 5 Aug 2024 10:50:58 -0400 Subject: [PATCH 2/2] Addressing PR review --- .../antora/modules/ROOT/pages/kafka/sending-messages.adoc | 3 +-- .../src/main/antora/modules/ROOT/pages/whats-new.adoc | 6 +++--- .../listener/adapter/MessagingMessageListenerAdapter.java | 8 ++------ 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc index 82ab2e8281..cae598b0f4 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc @@ -650,8 +650,7 @@ public Message messageReturn(String in) { } ---- -[[record-key-in-reply]] -== Preserving Request Record Key in Reply +=== Original Record Key in Reply Starting with version 3.3, the Kafka record key from the incoming request (if it exists) will be preserved in the reply record. This is only applicable for single record request/reply scenario. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 288a07b3be..9575e4eeea 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -35,8 +35,8 @@ For more details, see xref:kafka/receiving-messages/filtering.adoc[Message recei The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped. For more details, see xref:kafka/events.adoc[Application Events] and `ConcurrentContainerStoppedEvent` Javadocs. -[[x33-record-key-in-reply]] -=== Preserving Request Record Key in Reply +[[x33-original-record-key-in-reply]] +=== Original Record Key in Reply When using `ReplyingKafkaTemplate`, if the original record from the request contains a key, then that same key will be part of the reply as well. -For more details, see xref:kafka/sending-messages.adoc#record-key-in-reply[Record Key in Reply] section of the reference docs. \ No newline at end of file +For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section of the reference docs. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index a69f30b133..fa2bc3fef6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -644,6 +644,7 @@ private void sendReplyForMessageSource(Object result, String topic, Message s protected void asyncSuccess(@Nullable Object result, String replyTopic, Message source, boolean returnTypeMessage) { + if (result == null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Async result is null, ignoring"); @@ -718,7 +719,7 @@ private void setPartition(MessageBuilder builder, Message source) { } private void setKey(MessageBuilder builder, Message source) { - Object key = getReplyKeyFromRequest(source); + Object key = source.getHeaders().get(KafkaHeaders.RECEIVED_KEY); // Set the reply record key only for non-batch requests if (key != null && !(key instanceof List)) { builder.setHeader(KafkaHeaders.KEY, key); @@ -730,11 +731,6 @@ private byte[] getReplyPartition(Message source) { return source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class); } - @Nullable - private Object getReplyKeyFromRequest(Message source) { - return source.getHeaders().get(KafkaHeaders.RECEIVED_KEY); - } - protected final String createMessagingErrorMessage(String description, Object payload) { return description + "\n" + "Endpoint handler details:\n"