-
Notifications
You must be signed in to change notification settings - Fork 38.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make @Scheduled's fixedDelay work with methods returning Mono #23533
Comments
@Scheduled.fixedDelay
work with async (i.e. returning Mono<Void>) methods
@Scheduled.fixedDelay
work with async (i.e. returning Mono<Void>) methods
It does not make sense to me when someone wrote an async method then try to make it run in sync manner. Why couple that into the scheduling. |
Someone has a choice to return a Mono/Flux from the method. Returning it would denote they want Spring to consider its asynchronous completion. The user can opt not to return the mono. Furthermore I'm only proposing an API, not an implementation. The implementation could either be truly async or cause a thread to block; that is independent of my proposal. Your argument applies the same on WebFlux asynchronous REST controller methods. Do you also think those do not make sense? |
Currently investigating this. There are several hurdles to overcome which I will below. For reference here is the code that would emulate This assumes that the annotated method returns a fixedDelayAssuming we resolved a Publisher<?> p; // conceptually the publisher returned by the annotated method
Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Mono.delay(this.fixedDelay).then(iterationMono).repeat();
if (!this.initialDelay.isZero()) {
scheduledFlux = Flux.concat(
Mono.delay(this.initialDelay).then(iterationMono),
scheduledFlux
);
}
scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex) fixedRateAssuming the same but with a resolved Publisher<?> p;
Mono<Void> iterationMono = Flux.from(p).then();
Flux<Void> scheduledFlux = Flux.interval(this.initialDelay, this.fixedRate).flatMap(it -> iterationMono);
scheduledFlux.subscribe(it -> {}, ex -> this.logger.error("Unexpected error occurred in scheduled reactive task", ex) Scope of changeI would consider the following out of scope for this change:
I would also challenge the scope of the change with the following questions:
CaveatsOptional dependencies
ReactiveAdapterRegistryIdeally the support would cover not only straight
Unfortunately, the A mitigation could be to add an explicit This also plays into the next caveat. Late scheduling, tracking and cancellingIn the current arrangement, a The issue is that the registrar API only covers tasks that are Yet, the late scheduling logic could be useful. A mitigation would be to create a Note that adding this level of indirection might lead to the |
Thanks for looking into it Simon! Addressing your 3 points on scope
|
Superseded-by gh-29924 |
This commit adds support for `@Scheduled` annotation on reactive methods in fixed delay or fixed rate mode. Cron mode is not supported. Reactive methods are methods that return a Publisher or a subclass of Publisher. This is only considered if Reactor (and Reactive Streams) is present at runtime. This is implemented using Reactor operators, as a `Flux<Void>` that repeatedly flatmaps to the `Publisher` in the background, re-subscribing to it according to the `@Scheduled` configuration. The method which creates the `Publisher` is only called once. If the `Publisher` errors, the exception is logged at warn level and otherwise ignored. As a result the underlying `Flux` will continue re-subscribing to the `Publisher`. Note that if Reactor is not present, the standard processing applies and the method itself will repeatedly be scheduled for execution, ignoring the returned `Publisher` and not even subscribing to it. This effectively becomes a no-op operation unless the method has other side effects. Closes spring-projectsgh-23533
This commit adds support for `@Scheduled` annotation on reactive methods and Kotlin suspending functions. Reactive methods are methods that return a `Publisher` or a subclass of `Publisher`. The `ReactiveAdapterRegistry` is used to support many implementations, such as `Flux`, `Mono`, `Flow`, `Single`, etc. Methods should not take any argument and published values will be ignored, as they are already with synchronous support. This is implemented in `ScheduledAnnotationReactiveSupport`, which "converts" Publishers to `Runnable`. This strategy keeps track of active Subscriptions in the `ScheduledAnnotationBeanPostProcessor`, in order to cancel them all in case of shutdown. The existing scheduling support for tasks is reused, aligning the triggering behavior with the existing support: cron, fixedDelay and fixedRate are all supported strategies. If the `Publisher` errors, the exception is logged at warn level and otherwise ignored. As a result new `Runnable` instances will be created for each execution and scheduling will continue. The only difference with synchronous support is that error signals will not be thrown by those `Runnable` tasks and will not be made available to the `org.springframework.util.ErrorHandler` contract. This is due to the asynchronous and lazy nature of Publishers. Closes gh-23533 Closes gh-28515
This commit adds a note in the reference documentation stating that `ErrorHandler` infrastructure is not involved when reactive methods send an error signal: the exception is sent as a message in the pipeline and is not thrown from the task `Runnable`. See gh-23533
Context
Consider this method
The
job()
method can be counted upon to never be called until the previous run ofjob()
is finished.Now consider this method
Here,
job()
would be called 1000ms after the previous run ofjob()
returns, not when the theMono
returned bysomeAsyncMethodReturningAMono()
would terminate. If the asynchronous work takes just over a bit longer than 1000ms, it may run at the same time as previous runs. This can be mitigated by using the.block()
statement to subscribe tosomeAsyncMethodReturningAMono()
, effectively makingjob()
syncronous again, but it would be better to have a non-blocking answer to this.Proposal
To let
@Scheduled
'sfixedDelay
property, when the annotated method returns aMono
or aFlux
, start the delay count from the moment the returnedMono
orFlux
terminates instead of when the method returns theMono
, so that one can write this:This code would resonate with WebFlux users nicely.
The text was updated successfully, but these errors were encountered: