Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3528: Improving Observability in Asynchronous Processing #3558

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
Expand Down Expand Up @@ -948,6 +949,10 @@ else if (listener instanceof MessageListener) {
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
this.kafkaAdmin = obtainAdmin();

if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) {
rmmla.setObservationRegistry(observationRegistry);
}
}

private AckMode determineAckMode() {
Expand Down Expand Up @@ -2693,6 +2698,7 @@ private void pauseForNackSleep() {
* @throws Error an error.
*/
@Nullable
@SuppressWarnings("try")
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
Iterator<ConsumerRecord<K, V>> iterator) {

Expand All @@ -2703,42 +2709,49 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId,
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {

observation.start();
try (Observation.Scope ignored = observation.openScope()) {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
observation.error(e);
}
if (this.commonErrorHandler == null) {
throw e;
}
try {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
observation.error(e);
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (RecordInRetryException rire) {
this.logger.info("Record in retry and not yet recovered");
return rire;
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
catch (RecordInRetryException rire) {
this.logger.info("Record in retry and not yet recovered");
return rire;
}
return null;
});
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
finally {
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
observation.stop();
}
}
return null;
}

private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public boolean isAsyncReplies() {
return this.asyncReplies;
}

@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -73,6 +74,8 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -153,6 +156,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

/**
* Create an instance with the provided bean and method.
* @param bean the bean.
Expand Down Expand Up @@ -247,6 +252,15 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
this.handlerMethod = handlerMethod;
}

/**
* Set the {@link ObservationRegistry} to handle observability.
* @param observationRegistry {@link ObservationRegistry} instance.
* @since 3.3.0
*/
public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

public boolean isAsyncReplies() {
return this.handlerMethod != null && this.handlerMethod.isAsyncReplies();
}
Expand Down Expand Up @@ -382,15 +396,34 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
final Message<?> message) {

Throwable listenerError = null;
Object result = null;
Observation currentObservation = getCurrentObservation();
try {
Object result = invokeHandler(records, acknowledgment, message, consumer);
result = invokeHandler(records, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, records, acknowledgment, consumer, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
catch (ListenerExecutionFailedException e) {
listenerError = e;
currentObservation.error(e);
handleException(records, acknowledgment, consumer, message, e);
}
catch (Error e) {
listenerError = e;
currentObservation.error(e);
}
finally {
if (listenerError != null || result == null) {
currentObservation.stop();
}
}
}

private Observation getCurrentObservation() {
Observation currentObservation = this.observationRegistry.getCurrentObservation();
return currentObservation == null ? Observation.NOOP : currentObservation;
}

/**
Expand All @@ -402,6 +435,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
* @param consumer the consumer.
* @return the result of invocation.
*/
@Nullable
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
Consumer<?, ?> consumer) {

Expand Down Expand Up @@ -460,7 +494,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me
*/
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, @Nullable Message<?> source) {

final Observation observation = getCurrentObservation();
this.logger.debug(() -> "Listener method returned result [" + resultArg
+ "] - generating response message for it");
String replyTopic = evaluateReplyTopic(request, source, resultArg);
Expand All @@ -474,35 +508,42 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
invocationResult.messageReturnType() :
this.messageReturnType;

if (result instanceof CompletableFuture<?> completable) {
CompletableFuture<?> completableFutureResult;

if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
completableFutureResult = mono.toFuture();
}
else if (!(result instanceof CompletableFuture<?>)) {
completableFutureResult = CompletableFuture.completedFuture(result);
}
else {
completableFutureResult = (CompletableFuture<?>) result;
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
completable.whenComplete((r, t) -> {
}

completableFutureResult.whenComplete((r, t) -> {
try {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
acknowledge(acknowledgment);
}
else {
asyncFailure(request, acknowledgment, consumer, t, source);
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
observation.error(cause);
asyncFailure(request, acknowledgment, consumer, cause, source);
}
});
}
else if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
}
mono.subscribe(
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
t -> asyncFailure(request, acknowledgment, consumer, t, source),
() -> acknowledge(acknowledgment)
);
}
else {
sendResponse(result, replyTopic, source, messageReturnType);
}
finally {
observation.stop();
}
});
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,6 +88,7 @@
import io.micrometer.tracing.propagation.Propagator;
import io.micrometer.tracing.test.simple.SimpleSpan;
import io.micrometer.tracing.test.simple.SimpleTracer;
import reactor.core.publisher.Mono;

/**
* @author Gary Russell
Expand All @@ -112,7 +114,11 @@ public class ObservationTests {

public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";

public final static String OBSERVATION_ERROR = "observation.error";
public final static String OBSERVATION_ERROR = "observation.error.sync";

public final static String OBSERVATION_ERROR_COMPLETABLE_FUTURE = "observation.error.completableFuture";

public final static String OBSERVATION_ERROR_MONO = "observation.error.mono";

@Test
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
Expand Down Expand Up @@ -387,6 +393,42 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
.hasMessage("obs5 error");
}

@Test
void observationErrorExceptionWhenCompletableFutureReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {
artembilan marked this conversation as resolved.
Show resolved Hide resolved

errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS);
Deque<SimpleSpan> spans = tracer.getSpans();
await().untilAsserted(() -> assertThat(spans).hasSize(2));
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs6-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("Should report metric.");
}

@Test
void observationErrorExceptionWhenMonoReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

errorTemplate.send(OBSERVATION_ERROR_MONO, "testError").get(10, TimeUnit.SECONDS);
Deque<SimpleSpan> spans = tracer.getSpans();
await().untilAsserted(() -> assertThat(spans).hasSize(2));
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs7-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("Should report metric.");
}

@Test
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
Expand Down Expand Up @@ -590,14 +632,34 @@ public static class ExceptionListener {

@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
this.latch4.countDown();
throw new IllegalStateException("obs4 run time exception");
try {
throw new IllegalStateException("obs4 run time exception");
}
finally {
this.latch4.countDown();
}
}

@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
void listenError(ConsumerRecord<Integer, String> in) {
this.latch5.countDown();
throw new Error("obs5 error");
try {
throw new Error("obs5 error");
}
finally {
this.latch5.countDown();
}
}

@KafkaListener(id = "obs6", topics = OBSERVATION_ERROR_COMPLETABLE_FUTURE)
CompletableFuture<Void> receive(ConsumerRecord<Object, Object> record) {
return CompletableFuture.supplyAsync(() -> {
throw new Error("Should report metric.");
});
}

@KafkaListener(id = "obs7", topics = OBSERVATION_ERROR_MONO)
Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
return Mono.error(new Error("Should report metric."));
}

}
Expand Down