Skip to content

Commit

Permalink
HDDS-11498. Improve SCM deletion efficiency. (apache#7249)
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Oct 17, 2024
1 parent 3fb2cf0 commit 4670a5e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,11 @@ private void processCmd(DeleteCmdInfo cmd) {
DeletedContainerBlocksSummary summary =
DeletedContainerBlocksSummary.getFrom(containerBlocks);
LOG.info("Summary of deleting container blocks, numOfTransactions={}, "
+ "numOfContainers={}, numOfBlocks={}",
+ "numOfContainers={}, numOfBlocks={}, commandId={}.",
summary.getNumOfTxs(),
summary.getNumOfContainers(),
summary.getNumOfBlocks());
summary.getNumOfBlocks(),
cmd.getCmd().getId());
if (LOG.isDebugEnabled()) {
LOG.debug("Start to delete container blocks, TXIDs={}",
summary.getTxIDSummary());
Expand All @@ -389,7 +390,8 @@ private void processCmd(DeleteCmdInfo cmd) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result : blockDeletionACK
.getResultsList()) {
LOG.debug("{} : {}", result.getTxID(), result.getSuccess());
LOG.debug("TxId = {} : ContainerId = {} : {}",
result.getTxID(), result.getContainerID(), result.getSuccess());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
blocksDeleted += tx.getLocalIDCount();
if (SCMBlockDeletingService.LOG.isDebugEnabled()) {
SCMBlockDeletingService.LOG
.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
.debug("Transaction added: {} <- TX({}), DN {} <- blocksDeleted Add {}.",
dnID, tx.getTxID(), dnID, tx.getLocalIDCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
Expand Down Expand Up @@ -200,20 +199,6 @@ private DeletedBlocksTransaction constructNewTransaction(
.build();
}

/**
 * Reports whether a single block-deletion ACK entry marks its transaction
 * as failed.
 *
 * @param result the per-transaction result taken from a deletion ACK
 * @return {@code true} when the result is not successful,
 *         {@code false} otherwise
 */
private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
  final long txId = result.getTxID();
  final boolean succeeded = result.getSuccess();

  // Every ACK entry is traced at debug level, successful or not.
  if (LOG.isDebugEnabled()) {
    LOG.debug(
        "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
        txId, succeeded);
  }

  if (succeeded) {
    return false;
  }

  // Failed entries are surfaced at warn level before being reported.
  LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
      + "TX in next interval", txId);
  return true;
}

@Override
public int getNumOfValidTransactions() throws IOException {
lock.lock();
Expand Down Expand Up @@ -300,26 +285,46 @@ private void getTransaction(DeletedBlocksTransaction tx,
.setCount(transactionStatusManager.getOrDefaultRetryCount(
tx.getTxID(), 0))
.build();

for (ContainerReplica replica : replicas) {
DatanodeDetails details = replica.getDatanodeDetails();
if (!dnList.contains(details)) {
continue;
}
if (!transactionStatusManager.isDuplication(
details, updatedTxn.getTxID(), commandStatus)) {
transactions.addTransactionToDN(details.getUuid(), updatedTxn);
metrics.incrProcessedTransaction();
}
}
}

private Boolean checkInadequateReplica(Set<ContainerReplica> replicas,
DeletedBlocksTransaction txn) throws ContainerNotFoundException {
DeletedBlocksTransaction txn,
Set<DatanodeDetails> dnList) throws ContainerNotFoundException {
ContainerInfo containerInfo = containerManager
.getContainer(ContainerID.valueOf(txn.getContainerID()));
ReplicationManager replicationManager =
scmContext.getScm().getReplicationManager();
ContainerHealthResult result = replicationManager
.getContainerReplicationHealth(containerInfo, replicas);

// A transaction is only sent when every replica of its container is
// hosted on a datanode that is included in dnList; otherwise it is
// reported as inadequate here.
// This benefits ACK confirmation and improves deletion speed: all
// datanodes holding the container receive the delete command at the
// same time and provide feedback to SCM at roughly the same time.
// It also avoids deletion blocking, where some replicas of a container
// are deleted while others never receive the delete command.
long containerId = txn.getContainerID();
for (ContainerReplica replica : replicas) {
DatanodeDetails datanodeDetails = replica.getDatanodeDetails();
if (!dnList.contains(datanodeDetails)) {
DatanodeDetails dnDetail = replica.getDatanodeDetails();
LOG.debug("Skip Container = {}, because DN = {} is not in dnList.",
containerId, dnDetail.getUuid());
return true;
}
}

return result.getHealthState() != ContainerHealthResult.HealthState.HEALTHY;
}

Expand All @@ -345,6 +350,7 @@ public DatanodeDeletedBlockTransactions getTransactions(
.getCommandStatusByTxId(dnList.stream().
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
// SCMDeletedBlockTransactionStatusManager, while they are counted
Expand All @@ -358,23 +364,25 @@ public DatanodeDeletedBlockTransactions getTransactions(
// HDDS-7126. When container is under replicated, it is possible
// that container is deleted, but transactions are not deleted.
if (containerManager.getContainer(id).isDeleted()) {
LOG.warn("Container: " + id + " was deleted for the " +
"transaction: " + txn);
LOG.warn("Container: {} was deleted for the " +
"transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
} else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
&& !containerManager.getContainer(id).isOpen()) {
Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(
ContainerID.valueOf(txn.getContainerID()));
if (checkInadequateReplica(replicas, txn)) {
if (checkInadequateReplica(replicas, txn, dnList)) {
metrics.incrSkippedTransaction();
continue;
}
getTransaction(
txn, transactions, dnList, replicas, commandStatus);
} else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
metrics.incrSkippedTransaction();
}
} catch (ContainerNotFoundException ex) {
LOG.warn("Container: " + id + " was not found for the transaction: "
+ txn);
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ public EmptyTaskResult call() throws Exception {
}
}
LOG.info("Totally added {} blocks to be deleted for"
+ " {} datanodes, task elapsed time: {}ms",
+ " {} datanodes / {} totalnodes, task elapsed time: {}ms",
transactions.getBlocksDeleted(),
transactions.getDatanodeTransactionMap().size(),
included.size(),
Time.monotonicNow() - startTime);
deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
} catch (NotLeaderException nle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* Metrics related to Block Deleting Service running in SCM.
Expand Down Expand Up @@ -76,6 +77,15 @@ public final class ScmBlockDeletingServiceMetrics {
@Metric(about = "The number of created txs which are added into DB.")
private MutableCounterLong numBlockDeletionTransactionCreated;

@Metric(about = "The number of skipped transactions")
private MutableCounterLong numSkippedTransactions;

@Metric(about = "The number of processed transactions")
private MutableCounterLong numProcessedTransactions;

@Metric(about = "The number of dataNodes of delete transactions.")
private MutableGaugeLong numBlockDeletionTransactionDataNodes;

/**
 * Private constructor: code outside this class cannot instantiate the
 * metrics object directly.
 */
private ScmBlockDeletingServiceMetrics() {
}

Expand Down Expand Up @@ -130,6 +140,18 @@ public void incrBlockDeletionTransactionCreated(long count) {
this.numBlockDeletionTransactionCreated.incr(count);
}

/**
 * Bumps the skipped-transactions counter.
 */
public void incrSkippedTransaction() {
  numSkippedTransactions.incr();
}

/**
 * Bumps the processed-transactions counter.
 */
public void incrProcessedTransaction() {
  numProcessedTransactions.incr();
}

/**
 * Overwrites the gauge that tracks the number of datanodes of delete
 * transactions.
 *
 * @param dataNodes the new gauge value
 */
public void setNumBlockDeletionTransactionDataNodes(long dataNodes) {
  numBlockDeletionTransactionDataNodes.set(dataNodes);
}

/**
 * @return the current value of the numBlockDeletionCommandSent metric
 */
public long getNumBlockDeletionCommandSent() {
  final long commandsSent = numBlockDeletionCommandSent.value();
  return commandsSent;
}
Expand Down Expand Up @@ -162,6 +184,18 @@ public long getNumBlockDeletionTransactionCreated() {
return numBlockDeletionTransactionCreated.value();
}

/**
 * @return the current value of the skipped-transactions counter
 */
public long getNumSkippedTransactions() {
  final long skipped = numSkippedTransactions.value();
  return skipped;
}

/**
 * @return the current value of the processed-transactions counter
 */
public long getNumProcessedTransactions() {
  final long processed = numProcessedTransactions.value();
  return processed;
}

/**
 * @return the current value of the gauge tracking the number of datanodes
 *         of delete transactions
 */
public long getNumBlockDeletionTransactionDataNodes() {
  final long dataNodeCount = numBlockDeletionTransactionDataNodes.value();
  return dataNodeCount;
}

@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
Expand Down

0 comments on commit 4670a5e

Please sign in to comment.