Skip to content

Commit

Permalink
[fix][client] TransactionCoordinatorClient support retry (apache#23081)
Browse files Browse the repository at this point in the history
(cherry picked from commit e42b35c)
  • Loading branch information
chenhongSZ authored and srinath-ctds committed Aug 12, 2024
1 parent 4d19379 commit 5631b66
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -107,4 +111,24 @@ public void testTransactionCoordinatorExceptionUnwrap() {
instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
}

@Test
public void testClientStartWithRetry() throws Exception{
String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl();
String invalidBrokerServiceUrl = "localhost:0";
String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl;

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
@Cleanup
TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient);

try {
transactionCoordinatorClient.start();
}catch (TransactionCoordinatorClientException e) {
Assert.fail("Shouldn't have exception at here", e);
}

Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -88,6 +89,10 @@ public RequestTime(long creationTime, long requestId) {
private Timeout requestTimeout;

private final CompletableFuture<Void> connectFuture;
private final long lookupDeadline;
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();



public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic,
CompletableFuture<Void> connectFuture) {
Expand All @@ -109,6 +114,7 @@ public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientIm
this.connectFuture = connectFuture;
this.internalPinnedExecutor = pulsarClient.getInternalExecutorService();
this.timer = pulsarClient.timer();
this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
}

public void start() {
Expand All @@ -117,10 +123,22 @@ public void start() {

@Override
public void connectionFailed(PulsarClientException exception) {
LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
transactionCoordinatorId, exception);
if (!this.connectFuture.isDone()) {
this.connectFuture.completeExceptionally(exception);
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptions(previousExceptions);
if (connectFuture.completeExceptionally(exception)) {
if (nonRetriableError) {
LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
transactionCoordinatorId, exception);
} else {
LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after "
+ "timeout", transactionCoordinatorId, exception);
}
setState(State.Failed);
}
} else {
previousExceptions.add(exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void start() throws TransactionCoordinatorClientException {
@Override
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
return pulsarClient.getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true)
return pulsarClient.getPartitionedTopicMetadata(
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true)
.thenCompose(partitionMeta -> {
List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
if (LOG.isDebugEnabled()) {
Expand Down

0 comments on commit 5631b66

Please sign in to comment.