-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asynchronous server-side processing in a request/reply scenario #1189
Comments
We already added this for However, managing the offsets for Kafka could make this tricky to implement - we wouldn't want to commit the offset until the async operation is complete. If the next async operation completes first, and its offset committed, and then the first one fails, it's too late because its offset has already been implicitly committed. Since Kafka message are not discretely acknowledged, going async with Kafka record consumption is really not recommended because of the risk of message loss and/or the complexity of managing offsets. For this reason we didn't proceed here, but we can put it on the backlog and give it some more thought. |
True, we faced the same issues building a custom request/reply mechanism on top of Spring Kafka. We eventually elected to just commit periodically (with |
OK; with that caveat; we'll try to get it into a future release. |
I think it is reasonable difficulty. But still may could be done even without a full prefect asynchronized flow. for example. when ackMode=BATCH when ackMode=TIME when ackMode=RECORD The real difficulty is timeout/retry management which make the state complex. For another example, |
Preparation for spring-projects#1189 Defer committing out of order offsets until the gaps are filled. Pause the consumer until all acks are acknowledged.
Preparation for spring-projects#1189 Defer committing out of order offsets until the gaps are filled. Pause the consumer until all acks are acknowledged.
Preparation for #1189 Defer committing out of order offsets until the gaps are filled. Pause the consumer until all acks are acknowledged.
I have added support for out-of-order manual commits, which will now make implementing this possible. |
Add note about increased duplicates possibility.
When pending acks are released due to a gap being "filled", wake the consumer if it is currently paused.
When pending acks are released due to a gap being "filled", wake the consumer if it is currently paused.
Also support Kotlin Coroutines, discussed here #2653 |
Hi, @sobychacko @artembilan |
@Wzy19930507 Certainly! |
I'm sorry. What is the plan for this issue? Thanks |
Add support for This is now possible when out of order commits are enabled. |
Cool! |
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
…handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
…handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
…handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
…handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
…handled Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
…ing-projectsGH-1189 # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java # spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java # spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java
Fixes: #1189 * Refactor `MessagingMessageListenerAdapter` * move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter` * move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter` * add `@Nullable` to `KafkaListenerErrorHandler` * GH-1189: support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc` * GH-1189: support Kotlin `suspend` * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3` * auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc * GH-1189: `@SendTo` for `@KafkaHandler` after error is handled * Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost. * add javadoc in `AdapterUtils` * move class from package `annotation` to package `adapter` * re name bar,baz in BatchMessagingMessageListenerAdapterTests * poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests` * poblish doc async-returns.adoc and nav.adoc * rename `HandlerMethodDetect` to `AsyncRepliesAware` * fix javadoc in `ContinuationHandlerMethodArgumentResolver` * After kafka client 2.4 producer uses sticky partition, its randomly chose partition and topic default partitions is 2, configure that `@EmbeddedKafka `to provide just one partition per topic. * javadoc in `AsyncRepliesAware` * fix test in EnableKafkaKotlinCoroutinesTests * polish adoc * polish `DelegatingInvocableHandler` and add javadoc * polish `HandlerAdapter` * change `InvocationResult` to record * Optimization `MessagingMessageListenerAdapter.asyncFailure` * Mention version in the doc for async return types
Related to #1189 * Some other code clean up including deprecation warning
Request/reply semantics requires that the server side listener method (annotated with
@SendTo
) return a response synchronously. There are use cases where server side request processing is asynchronous such that the listener method can't return a result immediately and must defer the response, releasing the consumer thread to handle other incoming requests. Once the result is available, an application thread would need to send it back to the client. This would be a feature similar to Servlet 3.0 Asynchronous Processing.The text was updated successfully, but these errors were encountered: