Skip to content

Commit

Permalink
Run used-defined function in provided scheduler rather than in pc-poo…
Browse files Browse the repository at this point in the history
…l thread (#798)

* Run used-defined function in provided scheduler rather than in pc-pool thread (#797)

Co-authored-by: Yevhenii Semenov <zhe.semenov@gmail.com>

* update readme / changelog

* updated license headers

---------

Co-authored-by: Yevhenii Semenov <zhe.semenov@gmail.com>
  • Loading branch information
rkolesnev and yevheniisemenov authored Jun 11, 2024
1 parent 80128ee commit d4cfcb7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ endif::[]

=== Fixes

* fix: ReactorProcessor - run used-defined function in provided scheduler rather than in pc-pool thread (#798 / #794), fixes (#793)
* fix: fix issue for cannot close and exit properly when re-balancing storm (#787)
* fix: Support for PCRetriableException in ReactorProcessor (#733)
* fix: NullPointerException on partitions revoked (#757)
Expand Down
3 changes: 3 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,8 @@ endif::[]

=== Fixes

* fix: ReactorProcessor - run used-defined function in provided scheduler rather than in pc-pool thread (#798 / #794), fixes (#793)
* fix: fix issue for cannot close and exit properly when re-balancing storm (#787)
* fix: Support for PCRetriableException in ReactorProcessor (#733)
* fix: NullPointerException on partitions revoked (#757)
* fix: remove lingeringOnCommitWouldBeBeneficial and unused imports (#732)
Expand All @@ -1546,6 +1548,7 @@ endif::[]

=== Improvements

* improvement: stale containers exclusion and handling improvement (#779)
* improvement: add multiple caches for accelerating available container count calculation (#667)
* improvement: RecordContext now exposes lastFailureReason (#725)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.reactivestreams.Publisher;
import pl.tlinkowski.unij.api.UniLists;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -95,16 +95,9 @@ public void react(Function<PollContext<K, V>, Publisher<?>> reactorFunction) {
pollContext.streamWorkContainers()
.forEach(x -> x.setWorkType(REACTOR_TYPE));

Publisher<?> publisher = carefullyRun(reactorFunction, pollContext.getPollContext());

Disposable flux = Flux.from(publisher)
// using #subscribeOn so this should be redundant, but testing has shown otherwise
// note this will not cause user's function to run in pool - without successful use of subscribeOn,
// it will run in the controller thread, unless user themselves uses either publishOn or successful
// subscribeOn
.publishOn(getScheduler())
Disposable flux = Mono.fromCallable(() -> carefullyRun(reactorFunction, pollContext.getPollContext()))
.flatMapMany(it -> it)
.doOnNext(signal -> log.trace("doOnNext {}", signal))
// cause users Publisher to run a thread pool, if it hasn't already - this is a crucial magical part
.subscribeOn(getScheduler())
.subscribe(ignore -> onComplete(pollContext), throwable -> onError(pollContext, throwable));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.reactor;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.LatchTestUtils;
Expand Down Expand Up @@ -44,13 +44,13 @@ void kickTires() {
primeFirstRecord();

ConcurrentLinkedQueue<Object> msgs = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<String> threads = new ConcurrentLinkedQueue<>();

reactorPC.react((rec) -> {
log.info("Reactor user poll function: {}", rec);
msgs.add(rec);
Mono<String> result = Mono.just(StringUtils.msg("result: {}:{}", rec.offset(), rec.value()));
// Flux<String> stringFlux = fromPath("/tmp/out.html");
return result;
threads.add(Thread.currentThread().getName());
return Mono.just(StringUtils.msg("result: {}:{}", rec.offset(), rec.value()));
});

await()
Expand All @@ -63,6 +63,10 @@ void kickTires() {
assertThat(consumerSpy)
.hasCommittedToPartition(topicPartition)
.atLeastOffset(4);

assertWithMessage("The user-defined function should be executed by the scheduler")
.that(threads.stream().allMatch(thread -> thread.startsWith("boundedElastic")))
.isTrue();
});
}

Expand Down

0 comments on commit d4cfcb7

Please sign in to comment.