Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug topic header is missing when ReplyingKafkaTemplate's listener's method returns Iterable<Message<?>> #2907

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Venil Noronha
* @author Nathan Xu
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {

Expand Down Expand Up @@ -470,8 +471,8 @@ protected void sendResponse(Object result, String topic, @Nullable Object source
if (!returnTypeMessage && topic == null) {
this.logger.debug(() -> "No replyTopic to handle the reply: " + result);
}
else if (result instanceof Message) {
Message<?> reply = checkHeaders(result, topic, source);
else if (result instanceof Message<?> mResult) {
Message<?> reply = checkHeaders(mResult, topic, source);
this.replyTemplate.send(reply);
}
else {
Expand All @@ -483,8 +484,8 @@ else if (result instanceof Message) {
}
if (iterableOfMessages || this.splitIterables) {
((Iterable<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
if (v instanceof Message<?> mv) {
this.replyTemplate.send(checkHeaders(mv, topic, source));
}
else {
this.replyTemplate.send(topic, v);
Expand All @@ -501,24 +502,23 @@ else if (result instanceof Message) {
}
}

private Message<?> checkHeaders(Object result, String topic, @Nullable Object source) { // NOSONAR (complexity)
Message<?> reply = (Message<?>) result;
private Message<?> checkHeaders(Message<?> reply, String topic, @Nullable Object source) { // NOSONAR (complexity)
MessageHeaders headers = reply.getHeaders();
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
boolean sourceIsMessage = source instanceof Message;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage
&& getCorrelationId((Message<?>) source) != null;
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
&& getReplyPartition((Message<?>) source) != null;
if (needsTopic || needsCorrelation || needsPartition) {
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
if (needsTopic) {
builder.setHeader(KafkaHeaders.TOPIC, topic);
}
if (needsCorrelation && sourceIsMessage) {
builder.setHeader(this.correlationHeaderName,
((Message<?>) source).getHeaders().get(this.correlationHeaderName));
if (needsCorrelation) {
setCorrelationId(builder, (Message<?>) source);
}
if (sourceIsMessage && reply.getHeaders().get(KafkaHeaders.REPLY_PARTITION) == null) {
if (needsPartition) {
setPartition(builder, (Message<?>) source);
}
reply = builder.build();
Expand Down Expand Up @@ -571,6 +571,30 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
this.replyTemplate.send(builder.build());
}

private void setTopic(MessageBuilder<?> builder, Message<?> source) {
byte[] topicBytes = getReplyTopic(source);
if (topicBytes != null) {
builder.setHeader(KafkaHeaders.TOPIC, new String(topicBytes, StandardCharsets.UTF_8));
}
}

@Nullable
private byte[] getReplyTopic(Message<?> source) {
return source.getHeaders().get(KafkaHeaders.REPLY_TOPIC, byte[].class);
}

private void setCorrelationId(MessageBuilder<?> builder, Message<?> source) {
byte[] correlationIdBytes = getCorrelationId(source);
if (correlationIdBytes != null) {
builder.setHeader(this.correlationHeaderName, correlationIdBytes);
}
}

@Nullable
private byte[] getCorrelationId(Message<?> source) {
return source.getHeaders().get(this.correlationHeaderName, byte[].class);
}

private void setPartition(MessageBuilder<?> builder, Message<?> source) {
byte[] partitionBytes = getReplyPartition(source);
if (partitionBytes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@

/**
* @author Gary Russell
* @author Nathan Xu
* @since 2.1.3
*
*/
Expand All @@ -116,7 +117,9 @@
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST })
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST
})
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -167,6 +170,10 @@ public class ReplyingKafkaTemplateTests {

public static final String L_REQUEST = "lRequest";

public static final String M_REQUEST = "mRequest";

public static final String M_REPLY = "mReply";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -845,6 +852,24 @@ void requestTimeoutWithMessage() throws Exception {
}
}

@Test
void testMessageIterableReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(M_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
ProducerRecord<Integer, String> record = new ProducerRecord<>(M_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -1011,6 +1036,14 @@ public Message<String> handleL(String in) throws InterruptedException {
.build();
}

@KafkaListener(id = M_REQUEST, topics = M_REQUEST)
@SendTo // default REPLY_TOPIC header
public List<Message<String>> handleM(String in) throws InterruptedException {
return Collections.singletonList(MessageBuilder.withPayload(in.toUpperCase())
.setHeader("serverSentAnError", "user error")
.build());
}

}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
Expand Down
Loading