Skip to content

Commit

Permalink
[fix][client] Fix broker/Client CPU reaching 100% during retriable connection failure (apache#23251)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Sep 9, 2024
1 parent 0aaa906 commit 21e256f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.client.api;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand All @@ -34,7 +34,7 @@
@SuppressWarnings("serial")
public class PulsarClientException extends IOException {
private long sequenceId = -1;
private Collection<Throwable> previous;
private AtomicInteger previousExceptionAttempt;

/**
* Constructs an {@code PulsarClientException} with the specified detail message.
Expand Down Expand Up @@ -87,47 +87,16 @@ public PulsarClientException(String msg, Throwable t) {
super(msg, t);
}

/**
* Add a list of previous exception which occurred for the same operation
* and have been retried.
*
* @param previous A collection of throwables that triggered retries
*/
public void setPreviousExceptions(Collection<Throwable> previous) {
this.previous = previous;
}

/**
* Get the collection of previous exceptions which have caused retries
* for this operation.
*
* @return a collection of exception, ordered as they occurred
*/
public Collection<Throwable> getPreviousExceptions() {
return this.previous;
/**
* Associates this exception with a counter of earlier failed attempts for the same operation.
*
* <p>The given {@link AtomicInteger} reference is stored as-is (no copy is made), so
* {@link #toString()} reads whatever value the counter holds at the time it is called.
*
* @param previousExceptionCount counter of previous failed attempts
*/
public void setPreviousExceptionCount(AtomicInteger previousExceptionCount) {
this.previousExceptionAttempt = previousExceptionCount;
}

@Override
public String toString() {
if (previous == null || previous.isEmpty()) {
if (previousExceptionAttempt == null || previousExceptionAttempt.get() == 0) {
return super.toString();
} else {
StringBuilder sb = new StringBuilder(super.toString());
int i = 0;
boolean first = true;
sb.append("{\"previous\":[");
for (Throwable t : previous) {
if (first) {
first = false;
} else {
sb.append(',');
}
sb.append("{\"attempt\":").append(i++)
.append(",\"error\":\"").append(t.toString().replace("\"", "\\\""))
.append("\"}");
}
sb.append("]}");
return sb.toString();
return super.toString() + ", previous-attempt: " + previousExceptionAttempt;
}
}
/**
Expand Down Expand Up @@ -1156,39 +1125,9 @@ public static PulsarClientException unwrap(Throwable t) {
newException = new PulsarClientException(t);
}

Collection<Throwable> previousExceptions = getPreviousExceptions(t);
if (previousExceptions != null) {
newException.setPreviousExceptions(previousExceptions);
}
return newException;
}

/**
 * Finds the previous-exception collection recorded on {@code t} or on one of its causes.
 *
 * <p>The cause chain is followed for at most 20 links, which also bounds the walk if the
 * chain happens to be cyclic.
 *
 * @param t the throwable to start from; may be {@code null}
 * @return the first non-null collection found on a {@link PulsarClientException} in the
 *         chain, or {@code null} if there is none
 */
public static Collection<Throwable> getPreviousExceptions(Throwable t) {
    Throwable e = t;
    for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) {
        if (e instanceof PulsarClientException) {
            Collection<Throwable> previous = ((PulsarClientException) e).getPreviousExceptions();
            if (previous != null) {
                return previous;
            }
        }
        // Step from the current element. Reading t.getCause() here would pin the walk to
        // the first cause and never reach anything deeper in the chain.
        e = e.getCause();
    }
    return null;
}

/**
 * Records {@code previous} on the first {@link PulsarClientException} found in {@code t}
 * or its cause chain. Does nothing when no such exception is found.
 *
 * <p>The cause chain is followed for at most 20 links, which also bounds the walk if the
 * chain happens to be cyclic.
 *
 * @param t        the throwable to start from; may be {@code null}
 * @param previous the throwables that triggered earlier retries
 */
public static void setPreviousExceptions(Throwable t, Collection<Throwable> previous) {
    Throwable e = t;
    for (int maxDepth = 20; maxDepth > 0 && e != null; maxDepth--) {
        if (e instanceof PulsarClientException) {
            ((PulsarClientException) e).setPreviousExceptions(previous);
            return;
        }
        // Step from the current element. Reading t.getCause() here would pin the walk to
        // the first cause and never reach anything deeper in the chain.
        e = e.getCause();
    }
}


/**
 * Returns the sequence id currently stored on this exception.
 *
 * @return the value of the {@code sequenceId} field
 */
public long getSequenceId() {
    return this.sequenceId;
}
Expand Down Expand Up @@ -1222,4 +1161,12 @@ public static boolean isRetriableError(Throwable t) {
}
return true;
}

/**
 * Attaches the retry counter to the first {@link PulsarClientException} found in {@code e}
 * or its cause chain. Does nothing when no such exception is found.
 *
 * <p>Failures delivered through futures are commonly wrapped (for example in a
 * {@code CompletionException}), so checking only the outermost throwable would drop the
 * count for wrapped client exceptions. The walk is limited to 20 links so a cyclic cause
 * chain cannot loop forever.
 *
 * @param e                      the throwable to start from; may be {@code null}
 * @param previousExceptionCount counter of previous failed attempts
 */
public static void setPreviousExceptionCount(Throwable e, AtomicInteger previousExceptionCount) {
    final int maxDepth = 20;
    Throwable current = e;
    for (int depth = 0; depth < maxDepth && current != null; depth++) {
        if (current instanceof PulsarClientException) {
            ((PulsarClientException) current).setPreviousExceptionCount(previousExceptionCount);
            return;
        }
        current = current.getCause();
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -235,7 +234,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final Counter consumerDlqMessagesCounter;

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();
private volatile boolean hasSoughtByTimestamp = false;

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
Expand Down Expand Up @@ -825,7 +824,7 @@ public void negativeAcknowledge(Message<?> message) {

@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
previousExceptionCount.set(0);
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());

final State state = getState();
Expand Down Expand Up @@ -1069,7 +1068,7 @@ public void connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptions(previousExceptions);
exception.setPreviousExceptionCount(previousExceptionCount);
if (subscribeFuture.completeExceptionally(exception)) {
setState(State.Failed);
if (nonRetriableError) {
Expand All @@ -1083,7 +1082,7 @@ public void connectionFailed(PulsarClientException exception) {
client.cleanupConsumer(this);
}
} else {
previousExceptions.add(exception);
previousExceptionCount.incrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -174,7 +173,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private long lastBatchSendNanoTime;

private Optional<Long> topicEpoch = Optional.empty();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();

private boolean errorState;

Expand Down Expand Up @@ -1749,7 +1748,7 @@ public Iterator<OpSendMsg> iterator() {

@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
previousExceptionCount.set(0);
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
setChunkMaxMessageSize();

Expand Down Expand Up @@ -1955,7 +1954,7 @@ public void connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptions(previousExceptions);
exception.setPreviousExceptionCount(previousExceptionCount);
if (producerCreatedFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
log.info("[{}] Producer creation failed for producer {} with unretriableError = {}",
Expand All @@ -1968,7 +1967,7 @@ public void connectionFailed(PulsarClientException exception) {
client.cleanupProducer(this);
}
} else {
previousExceptions.add(exception);
previousExceptionCount.incrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -1152,7 +1153,8 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.create();
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture, new ArrayList<>(),
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture,
new AtomicInteger(0),
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
Expand All @@ -1164,7 +1166,7 @@ private void getPartitionedTopicMetadata(TopicName topicName,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture<PartitionedTopicMetadata> future,
List<Throwable> previousExceptions,
AtomicInteger previousExceptionCount,
boolean metadataAutoCreationEnabled,
boolean useFallbackForNonPIP344Brokers) {
long startTime = System.nanoTime();
Expand All @@ -1179,17 +1181,17 @@ private void getPartitionedTopicMetadata(TopicName topicName,
|| e.getCause() instanceof PulsarClientException.AuthenticationException
|| e.getCause() instanceof PulsarClientException.NotFoundException;
if (nextDelay <= 0 || isLookupThrottling) {
PulsarClientException.setPreviousExceptions(e, previousExceptions);
PulsarClientException.setPreviousExceptionCount(e, previousExceptionCount);
future.completeExceptionally(e);
return null;
}
previousExceptions.add(e);
previousExceptionCount.getAndIncrement();

((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
+ "Will try again in {} ms", topicName, nextDelay);
remainingTime.addAndGet(-nextDelay);
getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions,
getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptionCount,
metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@

import com.google.re2j.Pattern;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -54,7 +53,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private String topicsHash;
private final CompletableFuture<TopicListWatcher> watcherFuture;

private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();
private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>();

private final Runnable recheckTopicsChangeAfterReconnect;
Expand Down Expand Up @@ -93,21 +92,21 @@ public TopicListWatcher(PatternConsumerUpdateQueue patternConsumerUpdateQueue,
public void connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
if (nonRetriableError) {
exception.setPreviousExceptions(previousExceptions);
exception.setPreviousExceptionCount(previousExceptionCount);
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
log.info("[{}] Watcher creation failed for {} with non-retriable error {}",
topic, name, exception.getMessage());
deregisterFromClientCnx();
}
} else {
previousExceptions.add(exception);
previousExceptionCount.incrementAndGet();
}
}

@Override
public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
previousExceptions.clear();
previousExceptionCount.set(0);

State state = getState();
if (state == State.Closing || state == State.Closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand Down Expand Up @@ -90,7 +90,7 @@ public RequestTime(long creationTime, long requestId) {

private final CompletableFuture<Void> connectFuture;
private final long lookupDeadline;
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();



Expand Down Expand Up @@ -126,7 +126,7 @@ public void connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptions(previousExceptions);
exception.setPreviousExceptionCount(previousExceptionCount);
if (connectFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
Expand All @@ -138,7 +138,7 @@ public void connectionFailed(PulsarClientException exception) {
setState(State.Failed);
}
} else {
previousExceptions.add(exception);
previousExceptionCount.getAndIncrement();
}
}

Expand Down

0 comments on commit 21e256f

Please sign in to comment.