From 5ae00385bde1eae35ef9f3a670c9746a4c4b7dfc Mon Sep 17 00:00:00 2001 From: Martyn Ye Date: Wed, 5 Jun 2024 11:50:16 +0900 Subject: [PATCH 1/3] fix issue #787 for cannot close and exit properly when rebalancing storm --- .../internal/AbstractParallelEoSStreamProcessor.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 0dbeaa39d..a4ffc5ff3 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -183,6 +183,12 @@ public static ControllerEventMessage of(WorkContainer work) { */ private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean(); + /** + * used for mark for interruption exclusion + */ + private final AtomicBoolean awaitingSubmittedTaskComplete = new AtomicBoolean(); + + private final OffsetCommitter committer; /** @@ -639,6 +645,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti } log.debug("Awaiting worker pool termination..."); + awaitingSubmittedTaskComplete.getAndSet(true); boolean awaitingInflightCompletion = true; while (awaitingInflightCompletion) { log.debug("Still awaiting completion of inflight work"); @@ -657,6 +664,8 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti awaitingInflightCompletion = true; } } + awaitingSubmittedTaskComplete.getAndSet(false); + if (workerThreadPool.get().getActiveCount() > 0) { log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", workerThreadPool.get().getActiveCount()); } @@ -1401,7 +1410,8 @@ public void registerWork(EpochAndRecordsMap polledRecords) { */ public void notifySomethingToDo() { boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false); - if (noTransactionInProgress) { + // not interrupt when workerThreadPool draining submitted tasks + if (noTransactionInProgress && !awaitingSubmittedTaskComplete.get()) { log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!"); interruptControlThread(); } else { From dd59f8dff6ee79c8e8c49488835a38ec7d4ea8ab Mon Sep 17 00:00:00 2001 From: Martyn Ye Date: Wed, 5 Jun 2024 11:52:44 +0900 Subject: [PATCH 2/3] update changelog --- CHANGELOG.adoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 443a6b21e..ac6219def 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -18,6 +18,7 @@ endif::[] === Fixes +* fix: fix issue for cannot close and exit properly when re-balancing storm (#787) * fix: Support for PCRetriableException in ReactorProcessor (#733) * fix: NullPointerException on partitions revoked (#757) * fix: remove lingeringOnCommitWouldBeBeneficial and unused imports (#732) From 8b18d9aae7fe2772d3c5e962166e6bb7ee004a9b Mon Sep 17 00:00:00 2001 From: Martyn Ye Date: Wed, 5 Jun 2024 19:47:40 +0900 Subject: [PATCH 3/3] address comments --- .../AbstractParallelEoSStreamProcessor.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index a4ffc5ff3..5594f841e 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -184,10 +184,10 @@ public static ControllerEventMessage of(WorkContainer work) { private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean(); /** - * used for mark for interruption exclusion + * Indicates state of waiting while in-flight messages complete processing on shutdown. + * Used to prevent control thread interrupt due to wakeup logic on rebalances */ - private final AtomicBoolean awaitingSubmittedTaskComplete = new AtomicBoolean(); - + private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean(); private final OffsetCommitter committer; @@ -645,7 +645,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti } log.debug("Awaiting worker pool termination..."); - awaitingSubmittedTaskComplete.getAndSet(true); + awaitingInflightProcessingCompletionOnShutdown.getAndSet(true); boolean awaitingInflightCompletion = true; while (awaitingInflightCompletion) { log.debug("Still awaiting completion of inflight work"); @@ -664,7 +664,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti awaitingInflightCompletion = true; } } - awaitingSubmittedTaskComplete.getAndSet(false); + awaitingInflightProcessingCompletionOnShutdown.getAndSet(false); if (workerThreadPool.get().getActiveCount() > 0) { log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", workerThreadPool.get().getActiveCount()); @@ -1410,8 +1410,8 @@ public void registerWork(EpochAndRecordsMap polledRecords) { */ public void notifySomethingToDo() { boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false); - // not interrupt when workerThreadPool draining submitted tasks - if (noTransactionInProgress && !awaitingSubmittedTaskComplete.get()) { + // do not interrupt while workerThreadPool is draining submitted / inflight tasks + if (noTransactionInProgress && !awaitingInflightProcessingCompletionOnShutdown.get()) { log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!"); interruptControlThread(); } else {