Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11712. Process other DeletedBlocksTransaction before retrying failed one #7532

Merged
merged 7 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();

private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
private long lastProcessedTransactionId = -1;

public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
Expand Down Expand Up @@ -344,6 +345,34 @@ public DatanodeDeletedBlockTransactions getTransactions(
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (lastProcessedTransactionId != -1) {
iter.seek(lastProcessedTransactionId);
/*
* We should start from (lastProcessedTransactionId + 1) transaction.
* Now the iterator (iter.next call) is pointing at
* lastProcessedTransactionId, read the current value to move
* the cursor.
*/
if (iter.hasNext()) {
/*
* There is a possibility that the lastProcessedTransactionId got
* deleted from the table, in that case we have to set
* lastProcessedTransactionId to next available transaction in the table.
*
* By doing this there is a chance that we will skip processing the new
* lastProcessedTransactionId, that should be ok. We can get to it in the
* next run.
*/
lastProcessedTransactionId = iter.next().getKey();
}

// If we have reached the end, go to beginning.
if (!iter.hasNext()) {
iter.seekToFirst();
lastProcessedTransactionId = -1;
}
}

// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
Expand All @@ -352,13 +381,14 @@ public DatanodeDeletedBlockTransactions getTransactions(
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
// SCMDeletedBlockTransactionStatusManager, while they are counted
// in the threshold.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
keyValue = iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
final ContainerID id = ContainerID.valueOf(txn.getContainerID());
try {
Expand Down Expand Up @@ -386,7 +416,24 @@ public DatanodeDeletedBlockTransactions getTransactions(
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}

if (lastProcessedTransactionId == keyValue.getKey()) {
// We have circled back to the last transaction.
break;
}

if (!iter.hasNext() && lastProcessedTransactionId != -1) {
/*
* We started from in-between and reached end of the table,
* now we should go to the start of the table and process
* the transactions.
*/
iter.seekToFirst();
}
}

lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;

if (!txIDs.isEmpty()) {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public void close() throws IOException {

/**
 * Repositions this iterator at the beginning of the underlying table.
 *
 * <p>Delegates to the wrapped iterator's {@code seekToFirst()} and then
 * calls {@code findNext()} so that this wrapper's own "next element"
 * state is recomputed from the new position.
 * NOTE(review): presumably {@code findNext()} advances to the first entry
 * this wrapper is willing to return -- confirm against its definition.
 */
@Override
public void seekToFirst() {
  iter.seekToFirst();
  findNext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,75 @@ public void testResetCount() throws Exception {
assertEquals(30 * THREE, blocks.size());
}


/**
 * Verifies that successive {@code getTransactions} calls make forward
 * progress through the deleted-block log instead of restarting from the
 * first transaction every time.
 *
 * <p>CASE 1: all transactions are eligible; each call returns the next two
 * transaction IDs in order.
 *
 * <p>CASE 2: one transaction (ID 11) is pushed past the max retry count and
 * is therefore skipped. After its count is reset it must only be returned
 * once the iterator has reached the end of the table and wrapped around.
 */
@Test
public void testSCMDelIteratorProgress() throws Exception {
  final int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
  // Block budget that lets exactly two transactions through per call.
  final int twoTxnBlockLimit = 2 * BLOCKS_PER_TXN * THREE;

  // CASE1: When all transactions are valid and available.
  // Create 8 TXs in the log.
  final int noOfTransactions = 8;
  addTransactions(generateData(noOfTransactions), true);
  mockContainerHealthResult(true);

  int expectedTxId = 1;
  while (expectedTxId < noOfTransactions) {
    // Each iteration reads two transactions; the API returns them in order.
    // 1st iteration: {1, 2}
    // 2nd iteration: {3, 4}
    // 3rd iteration: {5, 6}
    // 4th iteration: {7, 8}
    List<DeletedBlocksTransaction> blocks = getTransactions(twoTxnBlockLimit);
    assertEquals(expectedTxId++, blocks.get(0).getTxID());
    assertEquals(expectedTxId++, blocks.get(1).getTxID());
  }

  // CASE2: When some transactions are not available for delete in the
  // current iteration, either due to max retry reached or some other issue.
  // New transaction IDs are {9, 10, 11, 12, 13, 14, 15, 16}.
  addTransactions(generateData(noOfTransactions), true);
  mockContainerHealthResult(true);

  // Push transaction 11 past the max retry count so that it is ignored
  // by the SCM deleting service while fetching transactions for delete.
  final int ignoreTransactionId = 11;
  List<Long> txIDs = new ArrayList<>();
  txIDs.add((long) ignoreTransactionId);
  for (int retry = 0; retry < maxRetry; retry++) {
    incrementCount(txIDs);
  }
  // One more increment to exceed the limit.
  incrementCount(txIDs);

  expectedTxId = 9;
  while (true) {
    // Each iteration reads two transactions. A transaction that is not
    // available for delete is skipped and only re-checked after the whole
    // table has been read.
    // 1st iteration: {9, 10}
    // 2nd iteration: {12, 13} Transaction 11 is ignored here.
    // 3rd iteration: {14, 15} Transaction 11 is available again, but it is
    //                read only once the iterator has reached the end.
    // 4th iteration: {16, 11} The iterator hits the end of the table after
    //                16, wraps to the beginning and returns 11 as well.
    List<DeletedBlocksTransaction> blocks = getTransactions(twoTxnBlockLimit);
    if (expectedTxId == ignoreTransactionId) {
      expectedTxId++;
    }
    assertEquals(expectedTxId++, blocks.get(0).getTxID());
    if (expectedTxId == 17) {
      // Wrapped around: the previously skipped transaction comes second.
      assertEquals(ignoreTransactionId, blocks.get(1).getTxID());
      break;
    }
    assertEquals(expectedTxId++, blocks.get(1).getTxID());

    if (expectedTxId == 14) {
      // Reset transaction 11 so that it becomes available to the SCM
      // deleting service in the subsequent iterations.
      resetCount(txIDs);
    }
  }
}

@Test
public void testCommitTransactions() throws Exception {
deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
Expand Down
Loading