Skip to content

Commit

Permalink
[improve][txn] Allow superusers to abort transactions (#19467)
Browse files Browse the repository at this point in the history
Super users must be always allowed to abort a transaction even if they're not the original owner.

* Check that only owner or superusers are allowed to perform txn operations (end, add partition and add subscription)

(cherry picked from commit 459a7a5)
  • Loading branch information
nicoloboschi committed Feb 9, 2023
1 parent a835a84 commit d7686f1
Show file tree
Hide file tree
Showing 18 changed files with 590 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,13 @@ public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordin
}
}

public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills,
String owner) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.newTransaction(timeoutInMills);
return store.newTransaction(timeoutInMills, owner);
}

public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
Expand Down Expand Up @@ -483,6 +484,21 @@ public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
return Collections.unmodifiableMap(stores);
}

public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOwner) {
return getTxnMeta(txnID)
.thenCompose(meta -> {
// owner was null in the old versions or no auth enabled
if (meta.getOwner() == null) {
return CompletableFuture.completedFuture(true);
}
if (meta.getOwner().equals(checkOwner)) {
return CompletableFuture.completedFuture(true);
}
return CompletableFuture.completedFuture(false);
});
}


public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ private void getTransactionMetadata(TxnMeta txnMeta,
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
transactionMetadata.owner = txnMeta.getOwner();

List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>();
Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2311,7 +2311,8 @@ protected void handleNewTxn(CommandNewTxn command) {

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds())
final String owner = getPrincipal();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2355,9 +2356,15 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
.whenComplete(((v, ex) -> {
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService
.addProducedPartitionToTxn(txnID, command.getPartitionsList());
})
.whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}", requestId);
Expand All @@ -2374,7 +2381,25 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
});
}

private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
String msg = String.format(
"Client (%s) is neither the owner of the transaction %s nor a super user",
getPrincipal(), txnID
);
log.warn("[{}] {}", remoteAddress, msg);
return CompletableFuture.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
}

private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
String msg = String.format(
"TC client (%s) is not a super user, and is not allowed to operate on transaction %s",
getPrincipal(), txnID
);
log.warn("[{}] {}", remoteAddress, msg);
return CompletableFuture.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
}

@Override
Expand All @@ -2392,12 +2417,16 @@ protected void handleEndTxn(CommandEndTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService
.endTransaction(txnID, txnAction, false)
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
})
.whenComplete((v, ex) -> {
if (ex == null) {
commandSender.sendEndTxnResponse(requestId,
txnID, txnAction);
commandSender.sendEndTxnResponse(requestId, txnID, txnAction);
} else {
ex = handleTxnException(ex, BaseCommand.Type.END_TXN.name(), requestId);
commandSender.sendEndTxnErrorResponse(requestId, txnID,
Expand All @@ -2408,6 +2437,34 @@ protected void handleEndTxn(CommandEndTxn command) {
});
}

private CompletableFuture<Boolean> verifyTxnOwnershipForTCToBrokerCommands() {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
return getBrokerService()
.getAuthorizationService()
.isSuperUser(getPrincipal(), getAuthenticationData());
} else {
return CompletableFuture.completedFuture(true);
}
}

private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
final String checkOwner = getPrincipal();
return service.pulsar().getTransactionMetadataStoreService()
.verifyTxnOwnership(txnID, checkOwner)
.thenCompose(isOwner -> {
if (isOwner) {
return CompletableFuture.completedFuture(true);
}
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
return getBrokerService()
.getAuthorizationService()
.isSuperUser(checkOwner, getAuthenticationData());
} else {
return CompletableFuture.completedFuture(false);
}
});
}

@Override
protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
checkArgument(state == State.Connected);
Expand All @@ -2424,9 +2481,17 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
// we only accept super user becase this endpoint is reserved for tc to broker communication
verifyTxnOwnershipForTCToBrokerCommands()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
}
return optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark);
})
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
throwable = FutureUtil.unwrapCompletionException(throwable);
log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
Expand All @@ -2438,7 +2503,6 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});

} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
Expand Down Expand Up @@ -2508,23 +2572,28 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
return;
}

CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
completableFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed: " + e.getMessage()));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
// we only accept super user becase this endpoint is reserved for tc to broker communication
verifyTxnOwnershipForTCToBrokerCommands()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
}
return subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
}).whenComplete((ignored, e) -> {
if (e != null) {
e = FutureUtil.unwrapCompletionException(e);
log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed: " + e.getMessage()));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
Expand Down Expand Up @@ -2591,6 +2660,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
checkArgument(state == State.Connected);
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
final List<org.apache.pulsar.common.api.proto.Subscription> subscriptionsList = command.getSubscriptionsList();
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
requestId, remoteAddress, txnID);
Expand All @@ -2605,9 +2675,15 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
.whenComplete(((v, ex) -> {
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList));
})
.whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}",
Expand All @@ -2623,7 +2699,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
});
}

protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
Expand Down
Loading

0 comments on commit d7686f1

Please sign in to comment.