Skip to content

Commit

Permalink
HDDS-11712. Process other DeletedBlocksTransaction before retrying failed one.
Browse files Browse the repository at this point in the history
  • Loading branch information
nandakumar131 committed Dec 13, 2024
1 parent bf6f323 commit 0debb78
Showing 1 changed file with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();

private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
private long lastProcessedTransactionId = -1;

public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
Expand Down Expand Up @@ -344,6 +345,35 @@ public DatanodeDeletedBlockTransactions getTransactions(
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {

if (lastProcessedTransactionId != -1) {
iter.seek(lastProcessedTransactionId);
/*
* We should resume from the transaction that follows
* lastProcessedTransactionId. After the seek, the next iter.next() call
* returns the entry at lastProcessedTransactionId, so read that entry
* here to move the cursor past it.
*/
if (iter.hasNext()) {
/*
* It is possible that lastProcessedTransactionId has been deleted
* from the table. In that case, we set lastProcessedTransactionId
* to the next available transaction in the table.
*
* Doing this means we may skip processing the new
* lastProcessedTransactionId in this run. That is acceptable,
* because we can get to it in the next run.
*/
lastProcessedTransactionId = iter.next().getKey();
}

// If we have reached the end of the table, go back to the beginning.
if (!iter.hasNext()) {
iter.seekToFirst();
lastProcessedTransactionId = -1;
}
}

// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
Expand All @@ -352,13 +382,14 @@ public DatanodeDeletedBlockTransactions getTransactions(
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
// The block replica count is used as the threshold here, to avoid the
// case where some of the replicas have already committed the TXN and been
// recorded in the SCMDeletedBlockTransactionStatusManager, yet are still
// counted towards the threshold.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
keyValue = iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
final ContainerID id = ContainerID.valueOf(txn.getContainerID());
try {
Expand Down Expand Up @@ -386,7 +417,24 @@ public DatanodeDeletedBlockTransactions getTransactions(
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}

if (lastProcessedTransactionId == keyValue.getKey()) {
// We have circled back to the last processed transaction.
break;
}

if (!iter.hasNext() && lastProcessedTransactionId != -1) {
/*
* We started from the middle of the table and have reached its end.
* Wrap around to the start of the table and process the
* transactions there.
*/
iter.seekToFirst();
}
}

lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;

if (!txIDs.isEmpty()) {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
Expand Down

0 comments on commit 0debb78

Please sign in to comment.