Skip to content

Commit

Permalink
[cherry-pick][branch-2.10] Allow superusers to abort transactions (ap…
Browse files Browse the repository at this point in the history
…ache#19467) (apache#19473)

Co-authored-by: Nicolò Boschi <boschi1997@gmail.com>
  • Loading branch information
liangyepianzhou and nicoloboschi authored Feb 9, 2023
1 parent 5dd13ec commit cb91c4a
Show file tree
Hide file tree
Showing 17 changed files with 570 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,13 @@ public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordin
}
}

public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills,
String owner) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
}
return store.newTransaction(timeoutInMills);
return store.newTransaction(timeoutInMills, owner);
}

public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
Expand Down Expand Up @@ -524,7 +525,22 @@ public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
return Collections.unmodifiableMap(stores);
}

public synchronized void close () {
public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOwner) {
return getTxnMeta(txnID)
.thenCompose(meta -> {
// owner was null in the old versions or no auth enabled
if (meta.getOwner() == null) {
return CompletableFuture.completedFuture(true);
}
if (meta.getOwner().equals(checkOwner)) {
return CompletableFuture.completedFuture(true);
}
return CompletableFuture.completedFuture(false);
});
}


public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
metadataStore.closeAsync().whenComplete((v, ex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private void getTransactionMetadata(TxnMeta txnMeta,
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
transactionMetadata.owner = txnMeta.getOwner();

List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>();
Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -2226,7 +2227,8 @@ protected void handleNewTxn(CommandNewTxn command) {

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds())
final String owner = getPrincipal();
transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner)
.whenComplete(((txnID, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -2261,9 +2263,15 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {

TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
.whenComplete(((v, ex) -> {
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService
.addProducedPartitionToTxn(txnID, command.getPartitionsList());
})
.whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}", requestId);
Expand All @@ -2278,7 +2286,25 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
});
}

private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
String msg = String.format(
"Client (%s) is neither the owner of the transaction %s nor a super user",
getPrincipal(), txnID
);
log.warn("[{}] {}", remoteAddress, msg);
return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
}

private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
String msg = String.format(
"TC client (%s) is not a super user, and is not allowed to operate on transaction %s",
getPrincipal(), txnID
);
log.warn("[{}] {}", remoteAddress, msg);
return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
}

@Override
Expand All @@ -2295,8 +2321,13 @@ protected void handleEndTxn(CommandEndTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService
.endTransaction(txnID, txnAction, false)
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
})
.whenComplete((v, ex) -> {
if (ex == null) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
Expand All @@ -2311,6 +2342,34 @@ protected void handleEndTxn(CommandEndTxn command) {
});
}

private CompletableFuture<Boolean> verifyTxnOwnershipForTCToBrokerCommands() {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
return getBrokerService()
.getAuthorizationService()
.isSuperUser(getPrincipal(), getAuthenticationData());
} else {
return CompletableFuture.completedFuture(true);
}
}

private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
final String checkOwner = getPrincipal();
return service.pulsar().getTransactionMetadataStoreService()
.verifyTxnOwnership(txnID, checkOwner)
.thenCompose(isOwner -> {
if (isOwner) {
return CompletableFuture.completedFuture(true);
}
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
return getBrokerService()
.getAuthorizationService()
.isSuperUser(checkOwner, getAuthenticationData());
} else {
return CompletableFuture.completedFuture(false);
}
});
}

@Override
protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final long requestId = command.getRequestId();
Expand All @@ -2326,9 +2385,17 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
// we only accept super user becase this endpoint is reserved for tc to broker communication
verifyTxnOwnershipForTCToBrokerCommands()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
}
return optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark);
})
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
throwable = FutureUtil.unwrapCompletionException(throwable);
log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
Expand All @@ -2340,7 +2407,6 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});

} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
Expand Down Expand Up @@ -2409,23 +2475,28 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
return;
}

CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
completableFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed."));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
// we only accept super user becase this endpoint is reserved for tc to broker communication
verifyTxnOwnershipForTCToBrokerCommands()
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnTcNotAllowed(txnID);
}
return subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
}).whenComplete((ignored, e) -> {
if (e != null) {
e = FutureUtil.unwrapCompletionException(e);
log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed: " + e.getMessage()));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
Expand Down Expand Up @@ -2490,6 +2561,7 @@ private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData sc
protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
final List<org.apache.pulsar.common.api.proto.Subscription> subscriptionsList = command.getSubscriptionsList();
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
requestId, remoteAddress, txnID);
Expand All @@ -2504,9 +2576,15 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();

transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
.whenComplete(((v, ex) -> {
verifyTxnOwnership(txnID)
.thenCompose(isOwner -> {
if (!isOwner) {
return failedFutureTxnNotOwned(txnID);
}
return transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList));
})
.whenComplete((v, ex) -> {
if (ex == null) {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published partition to txn request {}",
Expand All @@ -2522,7 +2600,7 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
});
}

@Override
Expand Down
Loading

0 comments on commit cb91c4a

Please sign in to comment.