Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,12 @@ public Message<?> messageReturn(String in) {
}
----

=== Original Record Key in Reply
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you still think that this short paragraph deserves its own title?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a problem with a separate section for this, but I am neutral either way.


Starting with version 3.3, the Kafka record key from the incoming request (if it exists) will be preserved in the reply record.
This is only applicable for single record request/reply scenario.
When the listener is batch or when the return type is a collection, it is up to the application to specify which keys to use by wrapping the reply record in a `Message` type.

[[aggregating-request-reply]]
== Aggregating Multiple Replies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ For more details, see xref:kafka/receiving-messages/filtering.adoc[Message recei

The `ConcurentContainerMessageListenerContainer` emits now a `ConcurrentContainerStoppedEvent` when all of its child containers are stopped.
For more details, see xref:kafka/events.adoc[Application Events] and `ConcurrentContainerStoppedEvent` Javadocs.

[[x33-original-record-key-in-reply]]
=== Original Record Key in Reply

When using `ReplyingKafkaTemplate`, if the original record from the request contains a key, then that same key will be part of the reply as well.
For more details, see xref:kafka/sending-messages.adoc[Sending Messages] section of the reference docs.
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
builder.setHeader(this.correlationHeaderName, correlationId);
}
setPartition(builder, source);
setKey(builder, source);
this.replyTemplate.send(builder.build());
}

Expand All @@ -662,7 +663,6 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) {

protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Throwable t, Message<?> source) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why haven't you reverted this?

try {
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
Expand All @@ -676,7 +676,6 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO, please.

if (this.errorHandler != null) {
try {
if (NULL_MESSAGE.equals(message)) {
Expand Down Expand Up @@ -719,6 +718,14 @@ private void setPartition(MessageBuilder<?> builder, Message<?> source) {
}
}

private void setKey(MessageBuilder<?> builder, Message<?> source) {
Object key = source.getHeaders().get(KafkaHeaders.RECEIVED_KEY);
// Set the reply record key only for non-batch requests
if (key != null && !(key instanceof List)) {
builder.setHeader(KafkaHeaders.KEY, key);
}
}

@Nullable
private byte[] getReplyPartition(Message<?> source) {
return source.getHeaders().get(KafkaHeaders.REPLY_PARTITION, byte[].class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,6 +101,7 @@
/**
* @author Gary Russell
* @author Nathan Xu
* @author Soby Chacko
* @since 2.1.3
*
*/
Expand Down Expand Up @@ -196,11 +197,12 @@ public void testGood() throws Exception {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
headers.add("baz", "buz".getBytes());
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, null, "foo", headers);
ProducerRecord<Integer, String> record = new ProducerRecord<>(A_REQUEST, null, null, 1, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
assertThat(consumerRecord.key()).isEqualTo(1);
Map<String, Object> receivedHeaders = new HashMap<>();
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
assertThat(receivedHeaders).containsKey("baz");
Expand Down