From aff6bcafa4f183b90e52992ebc647c3910ace9f1 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 14:11:17 +0800 Subject: [PATCH 1/8] [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption ### Motivation The Scheduled Executor doesn't work very efficiently because each task will add to a DelayedQueue(A priority queue) first even if using the `.execute()` method with any schedule delay. Running a performance test for single topic max message read rate test: ``` bin/pulsar-perf consume test -q 1000000 -p 100000000 bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2 ``` Without this PR (2.10.1): Profiling started 2022-06-27T13:44:01,183+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851 msg/s --- 2.027 Mbit/s --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057 2022-06-27T13:44:11,196+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125 msg/s --- 2.112 Mbit/s --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042 2022-06-27T13:44:21,216+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861 msg/s --- 1.598 Mbit/s --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548 2022-06-27T13:44:31,233+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932 msg/s --- 1.878 Mbit/s --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847 2022-06-27T13:44:41,247+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313 msg/s --- 1.791 Mbit/s --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286 With this PR: Profiling started 2022-06-27T13:56:20,426+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516 msg/s --- 8.235 Mbit/s --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524 2022-06-27T13:56:30,438+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852 msg/s --- 7.634 Mbit/s --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55 2022-06-27T13:56:40,450+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040 msg/s --- 7.629 Mbit/s --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177 2022-06-27T13:56:50,462+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458 msg/s --- 7.624 Mbit/s --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52 2022-06-27T13:57:00,475+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584 msg/s --- 7.635 Mbit/s --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66 ### Modification - Change internal executor and external executor to normal executor service - Added a new ScheduledExecutorProvider to handle the scheduled tasks. --- .../pulsar/client/impl/ConsumerBase.java | 9 +++-- .../pulsar/client/impl/ConsumerImpl.java | 12 ++++--- .../client/impl/MultiTopicsConsumerImpl.java | 4 ++- .../pulsar/client/impl/PulsarClientImpl.java | 12 ++++++- .../pulsar/client/util/ExecutorProvider.java | 10 +++--- .../util/ScheduledExecutorProvider.java | 36 +++++++++++++++++++ 6 files changed, 67 insertions(+), 16 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index a129091d609cb..0da17def09757 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -79,8 +78,8 @@ public abstract class ConsumerBase extends HandlerState implements Consumer listener; protected final ConsumerEventListener consumerEventListener; protected final ExecutorProvider executorProvider; - protected final ScheduledExecutorService externalPinnedExecutor; - protected final ScheduledExecutorService internalPinnedExecutor; + protected final ExecutorService externalPinnedExecutor; + protected final ExecutorService internalPinnedExecutor; final BlockingQueue> incomingMessages; protected ConcurrentOpenHashMap unAckedChunkedMessageIdSequenceMap; protected final ConcurrentLinkedQueue>> pendingReceives; @@ -128,8 +127,8 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.unAckedChunkedMessageIdSequenceMap = ConcurrentOpenHashMap.newBuilder().build(); this.executorProvider = executorProvider; - this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor(); - this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService(); + this.externalPinnedExecutor = executorProvider.getExecutor(); + this.internalPinnedExecutor = client.getInternalExecutorService(); this.pendingReceives = Queues.newConcurrentLinkedQueue(); this.pendingBatchReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ffd2f68d7602c..7de1c7b723900 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { - internalPinnedExecutor - .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages), - expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, - TimeUnit.MILLISECONDS); + ((ScheduledExecutorService) client.getScheduledExecutorProvider()).scheduleAtFixedRate( + catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages), + expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, + TimeUnit.MILLISECONDS + ); expireChunkMessageTaskScheduled = true; } @@ -2387,7 +2389,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, return; } - internalPinnedExecutor.schedule(() -> { + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> { log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6cd02f2698f9d..d40fe0b0e43c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -276,7 +277,8 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { return null; } log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex); - internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); + ((ScheduledExecutorService) client.getScheduledExecutorProvider()) + .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); return null; }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 8a7d50f0d4b87..2460f4c53f5a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -77,6 +77,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl; import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -108,6 +109,8 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + + private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; private final boolean createdCnxPool; @@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(), + "pulsar-client-scheduled"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { @@ -1020,7 +1025,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, } previousExceptions.add(e); - ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> { + ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> { log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- " + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -1142,6 +1147,11 @@ protected CompletableFuture> preProcessSchemaBeforeSubscribe(Pulsa public ExecutorService getInternalExecutorService() { return internalExecutorProvider.getExecutor(); } + + public ScheduledExecutorProvider getScheduledExecutorProvider() { + return scheduledExecutorProvider; + } + // // Transaction related API // diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index b5fb3543b820f..67606af63a770 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -25,7 +25,6 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; @@ -41,7 +40,7 @@ public class ExecutorProvider { private final String poolName; private volatile boolean isShutdown; - private static class ExtendedThreadFactory extends DefaultThreadFactory { + protected static class ExtendedThreadFactory extends DefaultThreadFactory { @Getter private Thread thread; @@ -56,7 +55,6 @@ public Thread newThread(Runnable r) { } } - public ExecutorProvider(int numThreads, String poolName) { checkArgument(numThreads > 0); this.numThreads = numThreads; @@ -65,13 +63,17 @@ public ExecutorProvider(int numThreads, String poolName) { for (int i = 0; i < numThreads; i++) { ExtendedThreadFactory threadFactory = new ExtendedThreadFactory( poolName, Thread.currentThread().isDaemon()); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory); + ExecutorService executor = createExecutor(threadFactory); executors.add(Pair.of(executor, threadFactory)); } isShutdown = false; this.poolName = poolName; } + protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) { + return Executors.newSingleThreadExecutor(threadFactory); + } + public ExecutorService getExecutor() { return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java new file mode 100644 index 0000000000000..887ae3bb7fff4 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ScheduledExecutorProvider extends ExecutorProvider { + + public ScheduledExecutorProvider(int numThreads, String poolName) { + super(numThreads, poolName); + } + + @Override + protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } +} From fd6ecf3192a0d750ff2353c9e0b9aea7347de004 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 15:13:34 +0800 Subject: [PATCH 2/8] Fix test. --- .../pulsar/broker/transaction/pendingack/PendingAckStore.java | 4 ++-- .../transaction/pendingack/impl/InMemoryPendingAckStore.java | 4 ++-- .../broker/transaction/pendingack/impl/MLPendingAckStore.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java index 3da676eb827d0..2f85d2430dbbd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; @@ -38,7 +38,7 @@ public interface PendingAckStore { * @param pendingAckHandle the handle of pending ack * @param executorService the replay executor service */ - void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService); + void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService); /** * Close the transaction pending ack store. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java index d882c80c47863..44c9fbe039b0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; @@ -33,7 +33,7 @@ public class InMemoryPendingAckStore implements PendingAckStore { @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) { pendingAckHandle.changeToReadyState(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index 0828340162ec7..dd58fe774a8fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -26,7 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -107,7 +107,7 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, } @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) { transactionReplayExecutor .execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle))); } From a0f935731ec097b1e02cbec3fc09a74bbff27c98 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 15:16:03 +0800 Subject: [PATCH 3/8] Fix test. --- .../transaction/pendingack/impl/PendingAckHandleImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 6376634761fbc..b85db54dce521 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -157,8 +157,7 @@ private void initPendingAckStore() { this.pendingAckStoreFuture = pendingAckStoreProvider.newPendingAckStore(persistentSubscription); this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - pendingAckStore.replayAsync(this, - (ScheduledExecutorService) internalPinnedExecutor); + pendingAckStore.replayAsync(this, internalPinnedExecutor); }).exceptionally(e -> { acceptQueue.clear(); changeToErrorState(); From 35faa1d176ab963864dbf98977741b5a03d0fd47 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 15:35:14 +0800 Subject: [PATCH 4/8] Fix test. --- .../broker/service/persistent/PersistentSubscriptionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 2ef88c51b5aa2..2d1e72d45c80e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -40,7 +40,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -129,7 +129,7 @@ public void setup() throws Exception { public CompletableFuture newPendingAckStore(PersistentSubscription subscription) { return CompletableFuture.completedFuture(new PendingAckStore() { @Override - public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) { + public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) { try { Field field = PendingAckHandleState.class.getDeclaredField("state"); field.setAccessible(true); From e6dee1e159c4cb5b84b3bf36e91f0e617e190181 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 15:51:41 +0800 Subject: [PATCH 5/8] Fix checkstyle --- .../broker/transaction/pendingack/impl/PendingAckHandleImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index b85db54dce521..f69908827017a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import lombok.Getter; import lombok.extern.slf4j.Slf4j; From 418de4ceb4a4c37039ce9a9aee3181af9c968cd5 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 17:54:36 +0800 Subject: [PATCH 6/8] Fix typo --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 7de1c7b723900..692b2d2868fdb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1379,7 +1379,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { - ((ScheduledExecutorService) client.getScheduledExecutorProvider()).scheduleAtFixedRate( + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate( catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages), expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS From b1addfde6093c82cbce2d2a25a48ae875c728036 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 27 Jun 2022 20:36:11 +0800 Subject: [PATCH 7/8] Apply comment. --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 692b2d2868fdb..5fd09b4b3530f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1380,7 +1380,8 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m // Lazy task scheduling to expire incomplete chunk message if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) { ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate( - catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages), + () -> internalPinnedExecutor + .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)), expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis, TimeUnit.MILLISECONDS ); From 21c712d2e0e71d79cf1855bb7186f06ba827048b Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 28 Jun 2022 08:47:44 +0800 Subject: [PATCH 8/8] Fix test. --- .../org/apache/pulsar/client/api/MultiTopicsConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index f325091579ce8..14d818ce3bebb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -72,7 +72,7 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws return new PulsarClientImpl(conf) { { ScheduledExecutorService internalExecutorService = - (ScheduledExecutorService) super.getInternalExecutorService(); + (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor(); internalExecutorServiceDelegate = mock(ScheduledExecutorService.class, // a spy isn't used since that doesn't work for private classes, instead // the mock delegatesTo an existing instance. A delegate is sufficient for verifying