Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction]No TransactionCoordinatorNotFound, but automatic reconnect #13135

Merged
merged 27 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3350b94
[Transaction]No TransactionCoordinatorNotFound, but automatic reconne…
liangyepianzhou Dec 4, 2021
e233fe4
1. use timer
liangyepianzhou Dec 7, 2021
cbe3367
1. use timer from client
liangyepianzhou Dec 7, 2021
85ad312
1. handle ManagedLedgerFencedException
liangyepianzhou Dec 7, 2021
63d9660
1. add handle TransactionMetadataStoreStateExceptionv and Connecting
liangyepianzhou Dec 8, 2021
e9d17fd
listener header
liangyepianzhou Dec 8, 2021
cd56997
1. add a TcOperationRetryException to represent the exceptions which …
liangyepianzhou Dec 8, 2021
1c92668
1. modify PulsarClientException, ClientCnx
liangyepianzhou Dec 8, 2021
e3cc397
1. Add the operation to a ConcurrentLongHashMap when state is connecting
liangyepianzhou Dec 9, 2021
eebd568
fix lock with try finally
liangyepianzhou Dec 9, 2021
7d1caa1
Detail optimization:
liangyepianzhou Dec 10, 2021
ebfd47c
Detail optimization:
liangyepianzhou Dec 10, 2021
56154be
RuntimeException
liangyepianzhou Dec 10, 2021
d545001
Optimize call logic
liangyepianzhou Dec 10, 2021
404d1b0
1. optimize log and naming
liangyepianzhou Dec 11, 2021
85ac0d4
delete the RetryException and relate test
liangyepianzhou Dec 12, 2021
6c8b907
update
liangyepianzhou Dec 12, 2021
22e5811
update
liangyepianzhou Dec 12, 2021
4e3916a
delete tcFenced
liangyepianzhou Dec 12, 2021
1667e77
Merge remote-tracking branch 'apache/master' into TC_not_found
liangyepianzhou Dec 12, 2021
5a18ca6
1. change TransactionMetaStoreHandler single thread
liangyepianzhou Dec 13, 2021
2d06428
1. make the connect be single thread
liangyepianzhou Dec 14, 2021
2fd3b7f
1. make the connect be single thread
liangyepianzhou Dec 14, 2021
3d2d45e
1. Add callback timeout check
liangyepianzhou Dec 14, 2021
e19e3e6
take out the operation of adding op to pendingRequest in checkStateAn…
liangyepianzhou Dec 14, 2021
4a92069
take out the operation of adding op to pendingRequest in checkStateAn…
liangyepianzhou Dec 14, 2021
4d5a792
Merge remote-tracking branch 'apache/master' into TC_not_found
liangyepianzhou Dec 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);

}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
})).exceptionally(e -> {
if (!isRetryableException(e.getCause())) {
Expand All @@ -371,7 +371,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);

}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
} else {
Expand All @@ -391,7 +391,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
} else {
Expand All @@ -409,7 +409,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout),
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
return completableFuture;
Expand Down Expand Up @@ -499,7 +499,7 @@ private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAc
return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
}

private static boolean isRetryableException(Throwable e) {
public static boolean isRetryableException(Throwable e) {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
return (e instanceof TransactionMetadataStoreStateException
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,26 @@ private boolean checkTransactionEnableAndSendError(long requestId) {
return true;
}
}
private Throwable handleTxnException(Throwable ex, String op, long requestId) {
if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null
&& ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) {
if (log.isDebugEnabled()) {
log.debug("The Coordinator was not found for the request {}", op);
}
return ex;
}
if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null
&& ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) {
if (log.isDebugEnabled()) {
log.debug("Throw a CoordinatorNotFoundException to client "
+ "with the message got from a ManagedLedgerFencedException for the request {}", op);
}
return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());

}
log.error("Send response error for {} request {}.", op, requestId, ex);
return ex;
}
@Override
protected void handleNewTxn(CommandNewTxn command) {
final long requestId = command.getRequestId();
Expand All @@ -2022,9 +2041,7 @@ protected void handleNewTxn(CommandNewTxn command) {
ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request {}", requestId, ex);
}
ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);

ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
Expand Down Expand Up @@ -2060,19 +2077,11 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}", requestId,
ex);
}
ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
Expand All @@ -2099,16 +2108,10 @@ protected void handleEndTxn(CommandEndTxn command) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
log.error("Send response error for end txn request.", ex);
ex = handleTxnException(ex, BaseCommand.Type.END_TXN.name(), requestId);
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
});
Expand Down Expand Up @@ -2319,20 +2322,11 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
log.info("handle add partition to txn finish.");
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}",
requestId, ex);
}
ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
Expand Down
Loading