Skip to content

Commit

Permalink
GH-3528: Improving Observability in Asynchronous Processing
Browse files Browse the repository at this point in the history
Fixes: #3528

#3528

- Improve spring-kafka observability for failures in async consumer tasks
  when listener methods return CompletableFuture<?> or Mono<?> and throw
  errors during async execution
- Refactoring code in KafkaMessageListenerContainer and MessagingMessageListenerAdapter
  around observability
- Adding tests to verify
- Add @nullable annotations to relevant methods for better null safety
  • Loading branch information
sobychacko committed Oct 15, 2024
1 parent 3d507c7 commit 9c5ee3e
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
Expand Down Expand Up @@ -948,6 +950,10 @@ else if (listener instanceof MessageListener) {
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
this.kafkaAdmin = obtainAdmin();

if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) {
rmmla.setObservationRegistry(observationRegistry);
}
}

private AckMode determineAckMode() {
Expand Down Expand Up @@ -2693,6 +2699,7 @@ private void pauseForNackSleep() {
* @throws Error an error.
*/
@Nullable
@SuppressWarnings("try")
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
Iterator<ConsumerRecord<K, V>> iterator) {

Expand All @@ -2703,42 +2710,49 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId,
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {

observation.start();
try (Observation.Scope ignored = observation.openScope()) {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) {
observation.error(e);
}
if (this.commonErrorHandler == null) {
throw e;
}
try {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
observation.error(e);
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (RecordInRetryException rire) {
this.logger.info("Record in retry and not yet recovered");
return rire;
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
catch (RecordInRetryException rire) {
this.logger.info("Record in retry and not yet recovered");
return rire;
}
return null;
});
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
finally {
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) {
observation.stop();
}
}
return null;
}

private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public boolean isAsyncReplies() {
return this.asyncReplies;
}

@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -73,6 +74,8 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -153,6 +156,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

/**
* Create an instance with the provided bean and method.
* @param bean the bean.
Expand Down Expand Up @@ -382,15 +387,34 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
final Message<?> message) {

Throwable listenerError = null;
Object result = null;
Observation currentObservation = getCurrentObservation();
try {
Object result = invokeHandler(records, acknowledgment, message, consumer);
result = invokeHandler(records, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, records, acknowledgment, consumer, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
catch (ListenerExecutionFailedException e) {
listenerError = e;
currentObservation.error(e);
handleException(records, acknowledgment, consumer, message, e);
}
catch (Error e) {
listenerError = e;
currentObservation.error(e);
}
finally {
if (listenerError != null || result == null) {
currentObservation.stop();
}
}
}

private Observation getCurrentObservation() {
Observation currentObservation = this.observationRegistry.getCurrentObservation();
return currentObservation == null ? Observation.NOOP : currentObservation;
}

/**
Expand All @@ -402,6 +426,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
* @param consumer the consumer.
* @return the result of invocation.
*/
@Nullable
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {

Expand Down Expand Up @@ -460,7 +485,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me
*/
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, @Nullable Message<?> source) {

final Observation observation = getCurrentObservation();
this.logger.debug(() -> "Listener method returned result [" + resultArg
+ "] - generating response message for it");
String replyTopic = evaluateReplyTopic(request, source, resultArg);
Expand All @@ -474,35 +499,39 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
invocationResult.messageReturnType() :
this.messageReturnType;

if (result instanceof CompletableFuture<?> completable) {
if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
result = mono.toFuture();
}
else if (!(result instanceof CompletableFuture<?>)) {
result = CompletableFuture.completedFuture(result);
}
else {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
completable.whenComplete((r, t) -> {
}

((CompletableFuture<?>) result).whenComplete((r, t) -> {
try {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
acknowledge(acknowledgment);
}
else {
asyncFailure(request, acknowledgment, consumer, t, source);
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
observation.error(cause);
asyncFailure(request, acknowledgment, consumer, cause, source);
}
});
}
else if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
mono.subscribe(
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
t -> asyncFailure(request, acknowledgment, consumer, t, source),
() -> acknowledge(acknowledgment)
);
}
else {
sendResponse(result, replyTopic, source, messageReturnType);
}
finally {
observation.stop();
}
});
}

@Nullable
Expand Down Expand Up @@ -870,6 +899,10 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,6 +88,7 @@
import io.micrometer.tracing.propagation.Propagator;
import io.micrometer.tracing.test.simple.SimpleSpan;
import io.micrometer.tracing.test.simple.SimpleTracer;
import reactor.core.publisher.Mono;

/**
* @author Gary Russell
Expand All @@ -112,7 +114,11 @@ public class ObservationTests {

public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";

public final static String OBSERVATION_ERROR = "observation.error";
public final static String OBSERVATION_ERROR = "observation.error.sync";

public final static String OBSERVATION_ERROR_COMPLETABLE_FUTURE = "observation.error.completableFuture";

public final static String OBSERVATION_ERROR_MONO = "observation.error.mono";

@Test
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
Expand Down Expand Up @@ -387,6 +393,41 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
.hasMessage("obs5 error");
}

@Test
void observationErrorExceptionWhenCompletableFutureReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {
errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS);
Deque<SimpleSpan> spans = tracer.getSpans();
await().untilAsserted(() -> assertThat(spans).hasSize(2));
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs6-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("Should report metric.");
}

@Test
void observationErrorExceptionWhenMonoReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

errorTemplate.send(OBSERVATION_ERROR_MONO, "testError").get(10, TimeUnit.SECONDS);
Deque<SimpleSpan> spans = tracer.getSpans();
await().untilAsserted(() -> assertThat(spans).hasSize(2));
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs7-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("Should report metric.");
}

@Test
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
Expand Down Expand Up @@ -590,14 +631,34 @@ public static class ExceptionListener {

@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
this.latch4.countDown();
throw new IllegalStateException("obs4 run time exception");
try {
throw new IllegalStateException("obs4 run time exception");
}
finally {
this.latch4.countDown();
}
}

@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
void listenError(ConsumerRecord<Integer, String> in) {
this.latch5.countDown();
throw new Error("obs5 error");
try {
throw new Error("obs5 error");
}
finally {
this.latch5.countDown();
}
}

@KafkaListener(id = "obs6", topics = OBSERVATION_ERROR_COMPLETABLE_FUTURE)
CompletableFuture<Void> receive(ConsumerRecord<Object, Object> record) {
return CompletableFuture.supplyAsync(() -> {
throw new Error("Should report metric.");
});
}

@KafkaListener(id = "obs7", topics = OBSERVATION_ERROR_MONO)
Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
return Mono.error(new Error("Should report metric."));
}

}
Expand Down

0 comments on commit 9c5ee3e

Please sign in to comment.