From d4cfcb7f83fccc7e49eb592222f576b7aeda7e2e Mon Sep 17 00:00:00 2001 From: Roman Kolesnev Date: Tue, 11 Jun 2024 11:59:57 +0100 Subject: [PATCH] Run used-defined function in provided scheduler rather than in pc-pool thread (#798) * Run used-defined function in provided scheduler rather than in pc-pool thread (#797) Co-authored-by: Yevhenii Semenov * update readme / changelog * updated license headers --------- Co-authored-by: Yevhenii Semenov --- CHANGELOG.adoc | 1 + README.adoc | 3 +++ .../parallelconsumer/reactor/ReactorProcessor.java | 13 +++---------- .../parallelconsumer/reactor/ReactorPCTest.java | 12 ++++++++---- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index be4ff880e..4455b67cf 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -18,6 +18,7 @@ endif::[] === Fixes +* fix: ReactorProcessor - run used-defined function in provided scheduler rather than in pc-pool thread (#798 / #794), fixes (#793) * fix: fix issue for cannot close and exit properly when re-balancing storm (#787) * fix: Support for PCRetriableException in ReactorProcessor (#733) * fix: NullPointerException on partitions revoked (#757) diff --git a/README.adoc b/README.adoc index 19f349b86..c83453e16 100644 --- a/README.adoc +++ b/README.adoc @@ -1538,6 +1538,8 @@ endif::[] === Fixes +* fix: ReactorProcessor - run used-defined function in provided scheduler rather than in pc-pool thread (#798 / #794), fixes (#793) +* fix: fix issue for cannot close and exit properly when re-balancing storm (#787) * fix: Support for PCRetriableException in ReactorProcessor (#733) * fix: NullPointerException on partitions revoked (#757) * fix: remove lingeringOnCommitWouldBeBeneficial and unused imports (#732) @@ -1546,6 +1548,7 @@ endif::[] === Improvements +* improvement: stale containers exclusion and handling improvement (#779) * improvement: add multiple caches for accelerating available container count calculation (#667) * improvement: RecordContext now exposes lastFailureReason (#725) diff --git a/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java b/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java index 7cac5ca10..0ce8b6a4d 100644 --- a/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java +++ b/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java @@ -16,7 +16,7 @@ import org.reactivestreams.Publisher; import pl.tlinkowski.unij.api.UniLists; import reactor.core.Disposable; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -95,16 +95,9 @@ public void react(Function, Publisher> reactorFunction) { pollContext.streamWorkContainers() .forEach(x -> x.setWorkType(REACTOR_TYPE)); - Publisher publisher = carefullyRun(reactorFunction, pollContext.getPollContext()); - - Disposable flux = Flux.from(publisher) - // using #subscribeOn so this should be redundant, but testing has shown otherwise - // note this will not cause user's function to run in pool - without successful use of subscribeOn, - // it will run in the controller thread, unless user themselves uses either publishOn or successful - // subscribeOn - .publishOn(getScheduler()) + Disposable flux = Mono.fromCallable(() -> carefullyRun(reactorFunction, pollContext.getPollContext())) + .flatMapMany(it -> it) .doOnNext(signal -> log.trace("doOnNext {}", signal)) - // cause users Publisher to run a thread pool, if it hasn't already - this is a crucial magical part .subscribeOn(getScheduler()) .subscribe(ignore -> onComplete(pollContext), throwable -> onError(pollContext, throwable)); diff --git a/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorPCTest.java b/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorPCTest.java index 763497e31..a9dd99c5d 100644 --- a/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorPCTest.java +++ b/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorPCTest.java @@ -1,7 +1,7 @@ package io.confluent.parallelconsumer.reactor; /*- - * Copyright (C) 2020-2022 Confluent, Inc. + * Copyright (C) 2020-2024 Confluent, Inc. */ import io.confluent.csid.utils.LatchTestUtils; @@ -44,13 +44,13 @@ void kickTires() { primeFirstRecord(); ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue threads = new ConcurrentLinkedQueue<>(); reactorPC.react((rec) -> { log.info("Reactor user poll function: {}", rec); msgs.add(rec); - Mono result = Mono.just(StringUtils.msg("result: {}:{}", rec.offset(), rec.value())); -// Flux stringFlux = fromPath("/tmp/out.html"); - return result; + threads.add(Thread.currentThread().getName()); + return Mono.just(StringUtils.msg("result: {}:{}", rec.offset(), rec.value())); }); await() @@ -63,6 +63,10 @@ void kickTires() { assertThat(consumerSpy) .hasCommittedToPartition(topicPartition) .atLeastOffset(4); + + assertWithMessage("The user-defined function should be executed by the scheduler") + .that(threads.stream().allMatch(thread -> thread.startsWith("boundedElastic"))) + .isTrue(); }); }