diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java index 2cab3985d5ea..89cd94f68506 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -135,20 +136,16 @@ public void testGet() } @Test(expectedExceptions = ServiceUnavailableException.class) - public void testRejectHandler() - throws InterruptedException { + public void testRejectHandler() { BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics); - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService(); + ExecutorService threadPoolExecutor = provider.getExecutorService(); // test the rejection policy - AtomicInteger counter = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(10); - for (int i = 0; i < 10; i++) { - threadPoolExecutor.execute(() -> { - counter.incrementAndGet(); - latch.countDown(); - }); + int taskCount = 3; + Phaser phaser = new Phaser(taskCount); + for (int i = 0; i < taskCount; i++) { + threadPoolExecutor.execute(phaser::arriveAndAwaitAdvance); } - latch.await(); + phaser.arriveAndDeregister(); } }