Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReactorProcessor executes a user-provided function in a thread from the pc-pool rather than in the provided scheduler #793

Closed
yevheniisemenov opened this issue Jun 10, 2024 · 5 comments

Comments

@yevheniisemenov
Copy link
Contributor

As far as I understand, when I use ReactorProcessor, it's expected that the user-provided function will be executed in a scheduler that I provide during initialization. However, it actually executes in a thread from the pc-pool pool.

For example, this code

ReactorProcessor<Object, Object> processor = new ReactorProcessor<>(options, Schedulers::boundedElastic);

processor.react(poolContext -> {
    System.out.println(Thread.currentThread().getName() + ": hello");
    return Mono.fromRunnable(() -> System.out.println(Thread.currentThread().getName() + ": world"));
});

prints

pc-pool-2-thread-1: hello
boundedElastic-1: world

while the expected (as for me) result should be

boundedElastic-1: hello
boundedElastic-1: world

This means that with the current implementation, it's possible for users to block threads from the pc-pool, which may not be clear or expected behavior.

Also, if we dive deeper into the react implementation, we can see that it has multiple thread switches: the first by .subscribeOn and the second by .publishOn.

Publisher<?> publisher = carefullyRun(reactorFunction, pollContext.getPollContext());
Disposable flux = Flux.from(publisher)
// using #subscribeOn so this should be redundant, but testing has shown otherwise
// note this will not cause user's function to run in pool - without successful use of subscribeOn,
// it will run in the controller thread, unless user themselves uses either publishOn or successful
// subscribeOn
.publishOn(getScheduler())
.doOnNext(signal -> log.trace("doOnNext {}", signal))
// cause users Publisher to run a thread pool, if it hasn't already - this is a crucial magical part
.subscribeOn(getScheduler())
.subscribe(ignore -> onComplete(pollContext), throwable -> onError(pollContext, throwable));

However, .subscribeOn only works for the reactive part of the user-defined function, which is why the first print from my example is executed in a pc-pool thread.

As for .publishOn, it switches threads again for the doOnNext and onComplete parts, which seems unnecessary to me - we just spent CPU on a thread switch here.

To fix this problem, we need to move carefullyRun into the reactive context and remove publishOn. This should be enough.

I can provide a PR later today if you agree with my proposal.

Thanks!

@rkolesnev
Copy link
Contributor

Hi @yevheniisemenov,

Thanks for detailed explanation of the issue and PR with a fix. I am not very well versed in usage of Reactor framework so the explanation of the problem is very helpful.

Cheers

@yevheniisemenov
Copy link
Contributor Author

@rkolesnev, thanks for merging! I had a follow-up question: when can I expect to see this commit in the next release? Thanks!

@rkolesnev
Copy link
Contributor

I am actually preparing the release - your PR got included last minute as I thought will rather included it now than wait for next release.
Should be released today or this week at the latest.

@yevheniisemenov
Copy link
Contributor Author

Sounds great, thank you!

@rkolesnev
Copy link
Contributor

Fix merged, closing the issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants