Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][java-client] Replace ScheduledExecutor to improve performance of message consumption #16236

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
Expand All @@ -38,7 +38,7 @@ public interface PendingAckStore {
* @param pendingAckHandle the handle of pending ack
* @param executorService the replay executor service
*/
void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService);
void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService);

/**
* Close the transaction pending ack store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
Expand All @@ -33,7 +33,7 @@
public class InMemoryPendingAckStore implements PendingAckStore {

@Override
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) {
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) {
pendingAckHandle.changeToReadyState();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -107,7 +107,7 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
}

@Override
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) {
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
transactionReplayExecutor
.execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -157,8 +156,7 @@ private void initPendingAckStore() {
this.pendingAckStoreFuture =
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
pendingAckStore.replayAsync(this,
(ScheduledExecutorService) internalPinnedExecutor);
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
changeToErrorState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -129,7 +129,7 @@ public void setup() throws Exception {
public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
return CompletableFuture.completedFuture(new PendingAckStore() {
@Override
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
try {
Field field = PendingAckHandleState.class.getDeclaredField("state");
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -79,8 +78,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
protected final ScheduledExecutorService externalPinnedExecutor;
protected final ScheduledExecutorService internalPinnedExecutor;
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
Expand Down Expand Up @@ -128,8 +127,8 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
this.externalPinnedExecutor = executorProvider.getExecutor();
this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1378,10 +1379,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m

// Lazy task scheduling to expire incomplete chunk message
if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
internalPinnedExecutor
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS);
((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS
);
expireChunkMessageTaskScheduled = true;
}

Expand Down Expand Up @@ -2387,7 +2389,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
return;
}

internalPinnedExecutor.schedule(() -> {
((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -276,7 +277,8 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
return null;
}
log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
((ScheduledExecutorService) client.getScheduledExecutorProvider())
.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -108,6 +109,8 @@ public class PulsarClientImpl implements PulsarClient {
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;

private final ScheduledExecutorProvider scheduledExecutorProvider;
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;

Expand Down Expand Up @@ -193,6 +196,8 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
"pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, this.eventLoopGroup);
} else {
Expand Down Expand Up @@ -1020,7 +1025,7 @@ private void getPartitionedTopicMetadata(TopicName topicName,
}
previousExceptions.add(e);

((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> {
((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
Expand Down Expand Up @@ -1142,6 +1147,11 @@ protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(Pulsa
public ExecutorService getInternalExecutorService() {
return internalExecutorProvider.getExecutor();
}

public ScheduledExecutorProvider getScheduledExecutorProvider() {
return scheduledExecutorProvider;
}

//
// Transaction related API
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
Expand All @@ -41,7 +40,7 @@ public class ExecutorProvider {
private final String poolName;
private volatile boolean isShutdown;

private static class ExtendedThreadFactory extends DefaultThreadFactory {
protected static class ExtendedThreadFactory extends DefaultThreadFactory {

@Getter
private Thread thread;
Expand All @@ -56,7 +55,6 @@ public Thread newThread(Runnable r) {
}
}


public ExecutorProvider(int numThreads, String poolName) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
Expand All @@ -65,13 +63,17 @@ public ExecutorProvider(int numThreads, String poolName) {
for (int i = 0; i < numThreads; i++) {
ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
poolName, Thread.currentThread().isDaemon());
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
ExecutorService executor = createExecutor(threadFactory);
executors.add(Pair.of(executor, threadFactory));
}
isShutdown = false;
this.poolName = poolName;
}

protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}

public ExecutorService getExecutor() {
return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.util;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ScheduledExecutorProvider extends ExecutorProvider {

public ScheduledExecutorProvider(int numThreads, String poolName) {
super(numThreads, poolName);
}

@Override
protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}
}