Skip to content

Commit

Permalink
GH-1189: @SendTo for @KafkaHandler after error is handled
Browse files Browse the repository at this point in the history
Sending the result from a `KafkaListenerErrorHandler` was broken
for `@KafkaHandler` because the send to expression
was lost.
  • Loading branch information
Zhiyang.Wang1 committed Jan 14, 2024
1 parent aee58bd commit e07d4a5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -337,6 +337,17 @@ public boolean hasDefaultHandler() {
return this.defaultHandler != null;
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {

InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
if (handler != null) {
return new InvocationResult(result, this.handlerSendTo.get(handler),
this.handlerReturnsMessage.get(handler));
}
return null;
}

private static final class PayloadValidator extends PayloadMethodArgumentResolver {

PayloadValidator(Validator validator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.listener.adapter;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

Expand Down Expand Up @@ -98,4 +99,13 @@ public Object getBean() {
}
}

@Nullable
public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) {

if (this.delegatingHandler != null && inboundPayload != null) {
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
}
return null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -462,16 +463,14 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
String replyTopic = evaluateReplyTopic(request, source, resultArg);
Assert.state(replyTopic == null || this.replyTemplate != null,
"a KafkaTemplate is required to support replies");
Object result;
boolean messageReturnType;
if (resultArg instanceof InvocationResult invocationResult) {
result = invocationResult.getResult();
messageReturnType = invocationResult.isMessageReturnType();
}
else {
result = resultArg;
messageReturnType = this.messageReturnType;
}

Object result = resultArg instanceof InvocationResult invocationResult ?
invocationResult.getResult() :
resultArg;
boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ?
invocationResult.isMessageReturnType() :
this.messageReturnType;

if (result instanceof CompletableFuture<?> completable) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
Expand Down Expand Up @@ -677,9 +676,11 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle
if (NULL_MESSAGE.equals(message)) {
message = new GenericMessage<>(records);
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, acknowledgment, consumer, message);
Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (errorResult != null && !(errorResult instanceof InvocationResult)) {
Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload());
handleResult(Objects.requireNonNullElse(result, errorResult),
records, acknowledgment, consumer, message);
}
}
catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -273,7 +273,6 @@ public void testAnonymous() {
container.stop();
}

@SuppressWarnings("deprecation")
@Test
public void manyTests() throws Exception {
this.recordFilter.called = false;
Expand Down Expand Up @@ -323,7 +322,7 @@ public void manyTests() throws Exception {
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
assertThat(this.listener.ack).isNotNull();
assertThat(this.listener.eventLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.event.getListenerId().startsWith("qux-"));
assertThat(this.listener.event.getListenerId().startsWith("qux-")).isTrue();
MessageListenerContainer manualContainer = this.registry.getListenerContainer("qux");
assertThat(KafkaTestUtils.getPropertyValue(manualContainer, "containerProperties.messageListener"))
.isInstanceOf(FilteringMessageListenerAdapter.class);
Expand All @@ -350,7 +349,7 @@ public void manyTests() throws Exception {

template.send("annotated4", 0, "foo");
assertThat(this.listener.noLongerIdleEventLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.noLongerIdleEvent.getListenerId().startsWith("qux-"));
assertThat(this.listener.noLongerIdleEvent.getListenerId().startsWith("qux-")).isTrue();

template.send("annotated5", 0, 0, "foo");
template.send("annotated5", 1, 0, "bar");
Expand Down Expand Up @@ -472,10 +471,12 @@ public void testMulti() throws Exception {
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
assertThat(reply.value()).isEqualTo("OK");
consumer.close();

template.send("annotated8", 0, 1, "junk");
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
ConsumerRecord<Integer, String> reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
consumer.close();
assertThat(reply2.value()).isEqualTo("JUNK intentional");
assertThat(this.multiListener.meta).isNotNull();
}

Expand Down Expand Up @@ -1130,7 +1131,6 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
}

@Bean
@SuppressWarnings("deprecation")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
factoryWithBadConverter() {

Expand Down Expand Up @@ -1758,7 +1758,8 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
return (m, e) -> {
listener.errorLatch.countDown();
return null;
String payload = m.getPayload().toString().toUpperCase();
return payload + " " + e.getCause().getMessage();
};
}

Expand Down Expand Up @@ -2475,6 +2476,7 @@ static class MultiListenerBean {
volatile ConsumerRecordMetadata meta;

@KafkaHandler
@SendTo("annotated8reply")
public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
if ("junk".equals(bar)) {
throw new RuntimeException("intentional");
Expand Down Expand Up @@ -2652,14 +2654,11 @@ public boolean equals(Object obj) {
}
Foo other = (Foo) obj;
if (this.bar == null) {
if (other.bar != null) {
return false;
}
return other.bar == null;
}
else if (!this.bar.equals(other.bar)) {
return false;
else {
return this.bar.equals(other.bar);
}
return true;
}

}
Expand Down Expand Up @@ -2797,16 +2796,7 @@ interface ProjectionSample {

}

static class CustomMethodArgument {

final String body;

final String topic;

CustomMethodArgument(String body, String topic) {
this.body = body;
this.topic = topic;
}
record CustomMethodArgument(String body, String topic) {

}

Expand Down

0 comments on commit e07d4a5

Please sign in to comment.