From 9609cebcf6a8b0e4269e22752bafde02fc72a859 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Sun, 14 Jan 2024 21:48:29 +0800 Subject: [PATCH] GH-1189: `@SendTo` for `@KafkaHandler` after error is handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost. --- .../adapter/DelegatingInvocableHandler.java | 13 ++++++++- .../listener/adapter/HandlerAdapter.java | 12 +++++++- .../MessagingMessageListenerAdapter.java | 29 ++++++++++--------- .../EnableKafkaIntegrationTests.java | 8 +++-- .../EnableKafkaKotlinCoroutinesTests.kt | 8 ++--- 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java index c826f2f201..5d85dee5e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -337,6 +337,17 @@ private boolean assignPayload(MethodParameter methodParameter, Class payloadC && methodParameter.getParameterType().isAssignableFrom(payloadClass); } + @Nullable + public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) { + + InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass()); + if (handler != null) { + return new InvocationResult(result, this.handlerSendTo.get(handler), + this.handlerReturnsMessage.get(handler)); + } + return null; + } + private static final class PayloadValidator extends PayloadMethodArgumentResolver { PayloadValidator(Validator validator) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java index 7809cb2cbe..0aac47f788 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2021 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.kafka.listener.adapter; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -98,4 +99,13 @@ public Object getBean() { } } + @Nullable + public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) { + + if (this.delegatingHandler != null && inboundPayload != null) { + return this.delegatingHandler.getInvocationResultFor(result, inboundPayload); + } + return null; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 552ba51251..9b143ef277 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -462,16 +463,14 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle String replyTopic = evaluateReplyTopic(request, source, resultArg); Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies"); - Object result; - boolean messageReturnType; - if (resultArg instanceof InvocationResult invocationResult) { - result = invocationResult.getResult(); - messageReturnType = invocationResult.isMessageReturnType(); - } - else { - result = resultArg; - messageReturnType = this.messageReturnType; - } + + Object result = resultArg instanceof InvocationResult invocationResult ? + invocationResult.getResult() : + resultArg; + boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ? + invocationResult.isMessageReturnType() : + this.messageReturnType; + if (result instanceof CompletableFuture completable) { if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " @@ -677,9 +676,11 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle if (NULL_MESSAGE.equals(message)) { message = new GenericMessage<>(records); } - Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); - if (result != null) { - handleResult(result, records, acknowledgment, consumer, message); + Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment); + if (errorResult != null && !(errorResult instanceof InvocationResult)) { + Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload()); + handleResult(Objects.requireNonNullElse(result, errorResult), + records, acknowledgment, consumer, message); } } catch (Exception ex) { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index dd757944f6..27808beea3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -470,10 +470,12 @@ public void testMulti() throws Exception { assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue(); ConsumerRecord reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); assertThat(reply.value()).isEqualTo("OK"); - consumer.close(); template.send("annotated8", 0, 1, "junk"); assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue(); + ConsumerRecord reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply"); + consumer.close(); + assertThat(reply2.value()).isEqualTo("JUNK intentional"); assertThat(this.multiListener.meta).isNotNull(); } @@ -1754,7 +1756,8 @@ public Object resolveArgument(MethodParameter parameter, Message message) { public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) { return (m, e) -> { listener.errorLatch.countDown(); - return null; + String payload = m.getPayload().toString().toUpperCase(); + return payload + " " + e.getCause().getMessage(); }; } @@ -2468,6 +2471,7 @@ static class MultiListenerBean { volatile ConsumerRecordMetadata meta; @KafkaHandler + @SendTo("annotated8reply") public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) { if ("junk".equals(bar)) { throw new RuntimeException("intentional"); diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt index a312d374cc..33bc106f44 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2", - "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2"]) + "kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"]) class EnableKafkaKotlinCoroutinesTests { @Autowired @@ -96,7 +96,7 @@ class EnableKafkaKotlinCoroutinesTests { @Test fun `test checkedKh reply`() { this.template.send("kotlinAsyncTestTopic3", "foo") - val cr = this.template.receive("sendTopic1", 0, 0, Duration.ofSeconds(30)) + val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30)) assertThat(cr.value()).isEqualTo("FOO") } @@ -105,7 +105,7 @@ class EnableKafkaKotlinCoroutinesTests { class Listener { @KafkaHandler - @SendTo("sendTopic1") + @SendTo("sendTopicReply1") suspend fun handler1(value: String) : String { return value.uppercase() }