-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3528: Improving Observability in Asynchronous Processing #3558
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,6 +106,8 @@ | |
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption; | ||
import org.springframework.kafka.listener.ContainerProperties.EOSMode; | ||
import org.springframework.kafka.listener.adapter.AsyncRepliesAware; | ||
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; | ||
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; | ||
import org.springframework.kafka.support.Acknowledgment; | ||
import org.springframework.kafka.support.KafkaHeaders; | ||
import org.springframework.kafka.support.KafkaUtils; | ||
|
@@ -948,6 +950,10 @@ else if (listener instanceof MessageListener) { | |
this.lastAlertPartition = new HashMap<>(); | ||
this.wasIdlePartition = new HashMap<>(); | ||
this.kafkaAdmin = obtainAdmin(); | ||
|
||
if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) { | ||
rmmla.setObservationRegistry(observationRegistry); | ||
} | ||
} | ||
|
||
private AckMode determineAckMode() { | ||
|
@@ -2693,6 +2699,7 @@ private void pauseForNackSleep() { | |
* @throws Error an error. | ||
*/ | ||
@Nullable | ||
@SuppressWarnings("try") | ||
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR | ||
Iterator<ConsumerRecord<K, V>> iterator) { | ||
|
||
|
@@ -2703,42 +2710,49 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco | |
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId, | ||
this::clusterId), | ||
this.observationRegistry); | ||
return observation.observe(() -> { | ||
|
||
observation.start(); | ||
try (Observation.Scope ignored = observation.openScope()) { | ||
invokeOnMessage(cRecord); | ||
successTimer(sample, cRecord); | ||
recordInterceptAfter(cRecord, null); | ||
} | ||
catch (RuntimeException e) { | ||
failureTimer(sample, cRecord); | ||
recordInterceptAfter(cRecord, e); | ||
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) { | ||
observation.error(e); | ||
} | ||
if (this.commonErrorHandler == null) { | ||
throw e; | ||
} | ||
try { | ||
invokeOnMessage(cRecord); | ||
successTimer(sample, cRecord); | ||
recordInterceptAfter(cRecord, null); | ||
invokeErrorHandler(cRecord, iterator, e); | ||
commitOffsetsIfNeededAfterHandlingError(cRecord); | ||
} | ||
catch (RuntimeException e) { | ||
failureTimer(sample, cRecord); | ||
recordInterceptAfter(cRecord, e); | ||
if (this.commonErrorHandler == null) { | ||
throw e; | ||
} | ||
observation.error(e); | ||
try { | ||
invokeErrorHandler(cRecord, iterator, e); | ||
commitOffsetsIfNeededAfterHandlingError(cRecord); | ||
} | ||
catch (RecordInRetryException rire) { | ||
this.logger.info("Record in retry and not yet recovered"); | ||
return rire; | ||
} | ||
catch (KafkaException ke) { | ||
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); | ||
return ke; | ||
} | ||
catch (RuntimeException ee) { | ||
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); | ||
return ee; | ||
} | ||
catch (Error er) { // NOSONAR | ||
this.logger.error(er, "Error handler threw an error"); | ||
throw er; | ||
} | ||
catch (RecordInRetryException rire) { | ||
this.logger.info("Record in retry and not yet recovered"); | ||
return rire; | ||
} | ||
return null; | ||
}); | ||
catch (KafkaException ke) { | ||
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); | ||
return ke; | ||
} | ||
catch (RuntimeException ee) { | ||
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION); | ||
return ee; | ||
} | ||
catch (Error er) { // NOSONAR | ||
this.logger.error(er, "Error handler threw an error"); | ||
throw er; | ||
} | ||
} | ||
finally { | ||
if (!(this.listener instanceof MessagingMessageListenerAdapter<?, ?>)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DITTO |
||
observation.stop(); | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
import java.util.stream.Collectors; | ||
|
||
import org.apache.commons.logging.LogFactory; | ||
|
@@ -73,6 +74,8 @@ | |
import org.springframework.util.ObjectUtils; | ||
import org.springframework.util.StringUtils; | ||
|
||
import io.micrometer.observation.Observation; | ||
import io.micrometer.observation.ObservationRegistry; | ||
import reactor.core.publisher.Mono; | ||
|
||
/** | ||
|
@@ -153,6 +156,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS | |
|
||
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID; | ||
|
||
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; | ||
|
||
/** | ||
* Create an instance with the provided bean and method. | ||
* @param bean the bean. | ||
|
@@ -382,15 +387,34 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable | |
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, | ||
final Message<?> message) { | ||
|
||
Throwable listenerError = null; | ||
Object result = null; | ||
Observation currentObservation = getCurrentObservation(); | ||
try { | ||
Object result = invokeHandler(records, acknowledgment, message, consumer); | ||
result = invokeHandler(records, acknowledgment, message, consumer); | ||
if (result != null) { | ||
handleResult(result, records, acknowledgment, consumer, message); | ||
} | ||
} | ||
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control | ||
catch (ListenerExecutionFailedException e) { | ||
listenerError = e; | ||
currentObservation.error(e); | ||
handleException(records, acknowledgment, consumer, message, e); | ||
} | ||
catch (Error e) { | ||
listenerError = e; | ||
currentObservation.error(e); | ||
} | ||
finally { | ||
if (listenerError != null || result == null) { | ||
currentObservation.stop(); | ||
} | ||
} | ||
} | ||
|
||
private Observation getCurrentObservation() { | ||
Observation currentObservation = this.observationRegistry.getCurrentObservation(); | ||
return currentObservation == null ? Observation.NOOP : currentObservation; | ||
} | ||
|
||
/** | ||
|
@@ -402,6 +426,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C | |
* @param consumer the consumer. | ||
* @return the result of invocation. | ||
*/ | ||
@Nullable | ||
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message, | ||
Consumer<?, ?> consumer) { | ||
|
||
|
@@ -460,7 +485,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me | |
*/ | ||
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, | ||
Consumer<?, ?> consumer, @Nullable Message<?> source) { | ||
|
||
final Observation observation = getCurrentObservation(); | ||
this.logger.debug(() -> "Listener method returned result [" + resultArg | ||
+ "] - generating response message for it"); | ||
String replyTopic = evaluateReplyTopic(request, source, resultArg); | ||
|
@@ -474,35 +499,39 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle | |
invocationResult.messageReturnType() : | ||
this.messageReturnType; | ||
|
||
if (result instanceof CompletableFuture<?> completable) { | ||
if (monoPresent && result instanceof Mono<?> mono) { | ||
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { | ||
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " + | ||
"(or Kotlin suspend function); otherwise the container will ack the message immediately"); | ||
} | ||
result = mono.toFuture(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not OK to re-assign method argument. |
||
} | ||
else if (!(result instanceof CompletableFuture<?>)) { | ||
result = CompletableFuture.completedFuture(result); | ||
} | ||
else { | ||
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { | ||
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; " | ||
+ "otherwise the container will ack the message immediately"); | ||
} | ||
completable.whenComplete((r, t) -> { | ||
} | ||
|
||
((CompletableFuture<?>) result).whenComplete((r, t) -> { | ||
try { | ||
if (t == null) { | ||
asyncSuccess(r, replyTopic, source, messageReturnType); | ||
acknowledge(acknowledgment); | ||
} | ||
else { | ||
asyncFailure(request, acknowledgment, consumer, t, source); | ||
Throwable cause = t instanceof CompletionException ? t.getCause() : t; | ||
observation.error(cause); | ||
asyncFailure(request, acknowledgment, consumer, cause, source); | ||
} | ||
}); | ||
} | ||
else if (monoPresent && result instanceof Mono<?> mono) { | ||
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) { | ||
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " + | ||
"(or Kotlin suspend function); otherwise the container will ack the message immediately"); | ||
} | ||
mono.subscribe( | ||
r -> asyncSuccess(r, replyTopic, source, messageReturnType), | ||
t -> asyncFailure(request, acknowledgment, consumer, t, source), | ||
() -> acknowledge(acknowledgment) | ||
); | ||
} | ||
else { | ||
sendResponse(result, replyTopic, source, messageReturnType); | ||
} | ||
finally { | ||
observation.stop(); | ||
} | ||
}); | ||
} | ||
|
||
@Nullable | ||
|
@@ -870,6 +899,10 @@ private boolean rawByParameterIsType(Type parameterType, Type type) { | |
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); | ||
} | ||
|
||
public void setObservationRegistry(ObservationRegistry observationRegistry) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The setter is in wrong place. |
||
this.observationRegistry = observationRegistry; | ||
} | ||
|
||
/** | ||
* Root object for reply expression evaluation. | ||
* @param request the request. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if for consistency this condition has also to be against
RecordMessagingMessageListenerAdapter
...