diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 19fde714686..ee08c9f79db 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -78,9 +78,9 @@ protected CommandStatusReportsProto getReport() {
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
-        builder.addCmdStatus(cmdStatus.getProtoBufMessage());
         map.remove(key);
       }
+      builder.addCmdStatus(cmdStatus.getProtoBufMessage());
     });
     return builder.getCmdStatusCount() > 0 ? builder.build() : null;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 9fb9c7251cc..42529cabc72 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -173,8 +173,8 @@ public void testCommandStatusPublisher() throws InterruptedException {
         .build();
     cmdStatusMap.put(obj1.getCmdId(), obj1);
     cmdStatusMap.put(obj2.getCmdId(), obj2);
-    // We are not sending the commands whose status is PENDING.
-    Assertions.assertEquals(1,
+    // We will send the commands whose status is PENDING or EXECUTED.
+    Assertions.assertEquals(2,
         ((CommandStatusReportPublisher) publisher).getReport()
             .getCmdStatusCount(),
         "Should publish report with 2 status objects");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index cb9cf603b15..45d53c0ef2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -19,12 +19,10 @@
 import java.util.Set;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -88,14 +86,33 @@ void incrementCount(List<Long> txIDs)
   int resetCount(List<Long> txIDs) throws IOException;
 
   /**
-   * Commits a transaction means to delete all footprints of a transaction
-   * from the log. This method doesn't guarantee all transactions can be
-   * successfully deleted, it tolerate failures and tries best efforts to.
-   * @param transactionResults - delete block transaction results.
-   * @param dnID - ID of datanode which acknowledges the delete block command.
+   * Records the creation of a transaction for a DataNode.
+   *
+   * @param dnId The identifier of the DataNode.
+   * @param scmCmdId The ID of the SCM command.
+   * @param dnTxSet Set of transaction IDs for the DataNode.
+   */
+  void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet);
+
+  /**
+   * Handles the cleanup process when a DataNode is reported dead. This method
+   * is responsible for updating or cleaning up the transaction records
+   * associated with the dead DataNode.
+   *
+   * @param dnId The identifier of the dead DataNode.
+   */
+  void onDatanodeDead(UUID dnId);
+
+  /**
+   * Records the event of sending a block deletion command to a DataNode. This
+   * method is called when a command is successfully dispatched to a DataNode,
+   * and it helps in tracking the status of the command.
+   *
+   * @param dnId Details of the DataNode.
+   * @param scmCommand The block deletion command sent.
    */
-  void commitTransactions(
-      List<DeleteBlockTransactionResult> transactionResults, UUID dnID);
+  void onSent(DatanodeDetails dnId, SCMCommand scmCommand);
 
   /**
    * Creates block deletion transactions for a set of containers,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 8e2d014916f..ac64f6e973e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -18,25 +18,25 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.Set;
 import java.util.Map;
-import java.util.LinkedHashSet;
 import java.util.ArrayList;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -55,11 +55,12 @@
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import com.google.common.collect.Lists;
 
-import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
 import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.DEL_TXN_ID;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,16 +83,15 @@ public class DeletedBlockLogImpl
   private final int maxRetry;
   private final ContainerManager containerManager;
   private final Lock lock;
-  // Maps txId to set of DNs which are successful in committing the transaction
-  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
-  // Maps txId to its retry counts;
-  private final Map<Long, Integer> transactionToRetryCountMap;
   // The access to DeletedBlocksTXTable is protected by
   // DeletedBlockLogStateManager.
   private final DeletedBlockLogStateManager deletedBlockLogStateManager;
   private final SCMContext scmContext;
   private final SequenceIdGenerator sequenceIdGen;
   private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMDeletedBlockTransactionStatusManager
+      transactionStatusManager;
+  private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();
 
   private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
 
@@ -109,12 +109,6 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
     this.containerManager = containerManager;
     this.lock = new ReentrantLock();
 
-    // transactionToDNsCommitMap is updated only when
-    // transaction is added to the log and when it is removed.
-
-    // maps transaction to dns which have committed it.
-    transactionToDNsCommitMap = new ConcurrentHashMap<>();
-    transactionToRetryCountMap = new ConcurrentHashMap<>();
     this.deletedBlockLogStateManager = DeletedBlockLogStateManagerImpl
         .newBuilder()
         .setConfiguration(conf)
@@ -126,6 +120,9 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
     this.scmContext = scmContext;
     this.sequenceIdGen = sequenceIdGen;
     this.metrics = metrics;
+    this.transactionStatusManager =
+        new SCMDeletedBlockTransactionStatusManager(deletedBlockLogStateManager,
+            containerManager, scmContext, metrics, scmCommandTimeoutMs);
   }
 
   @Override
@@ -170,25 +167,7 @@ public void incrementCount(List<Long> txIDs)
       throws IOException {
     lock.lock();
     try {
-      ArrayList<Long> txIDsToUpdate = new ArrayList<>();
-      for (Long txID : txIDs) {
-        int currentCount =
-            transactionToRetryCountMap.getOrDefault(txID, 0);
-        if (currentCount > maxRetry) {
-          continue;
-        } else {
-          currentCount += 1;
-          if (currentCount > maxRetry) {
-            txIDsToUpdate.add(txID);
-          }
-          transactionToRetryCountMap.put(txID, currentCount);
-        }
-      }
-
-      if (!txIDsToUpdate.isEmpty()) {
-        deletedBlockLogStateManager
-            .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
-      }
+      transactionStatusManager.incrementRetryCount(txIDs, maxRetry);
     } finally {
       lock.unlock();
     }
@@ -207,9 +186,7 @@ public int resetCount(List<Long> txIDs) throws IOException {
             .map(DeletedBlocksTransaction::getTxID)
             .collect(Collectors.toList());
       }
-      for (Long txID: txIDs) {
-        transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
-      }
+      transactionStatusManager.resetRetryCount(txIDs);
       return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
           new ArrayList<>(new HashSet<>(txIDs)));
     } finally {
@@ -227,89 +204,6 @@ private DeletedBlocksTransaction constructNewTransaction(
         .build();
   }
 
-  /**
-   * {@inheritDoc}
-   *
-   * @param transactionResults - transaction IDs.
-   * @param dnID - Id of Datanode which has acknowledged
-   *               a delete block command.
-   * @throws IOException
-   */
-  @Override
-  public void commitTransactions(
-      List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
-    lock.lock();
-    try {
-      ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
-      Set<UUID> dnsWithCommittedTxn;
-      for (DeleteBlockTransactionResult transactionResult :
-          transactionResults) {
-        if (isTransactionFailed(transactionResult)) {
-          metrics.incrBlockDeletionTransactionFailure();
-          continue;
-        }
-        try {
-          metrics.incrBlockDeletionTransactionSuccess();
-          long txID = transactionResult.getTxID();
-          // set of dns which have successfully committed transaction txId.
-          dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
-          final ContainerID containerId = ContainerID.valueOf(
-              transactionResult.getContainerID());
-          if (dnsWithCommittedTxn == null) {
-            // Mostly likely it's a retried delete command response.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Transaction txId={} commit by dnId={} for containerID={}"
-                      + " failed. Corresponding entry not found.", txID, dnID,
-                  containerId);
-            }
-            continue;
-          }
-
-          dnsWithCommittedTxn.add(dnID);
-          final ContainerInfo container =
-              containerManager.getContainer(containerId);
-          final Set<ContainerReplica> replicas =
-              containerManager.getContainerReplicas(containerId);
-          // The delete entry can be safely removed from the log if all the
-          // corresponding nodes commit the txn. It is required to check that
-          // the nodes returned in the pipeline match the replication factor.
-          if (min(replicas.size(), dnsWithCommittedTxn.size())
-              >= container.getReplicationConfig().getRequiredNodes()) {
-            List<UUID> containerDns = replicas.stream()
-                .map(ContainerReplica::getDatanodeDetails)
-                .map(DatanodeDetails::getUuid)
-                .collect(Collectors.toList());
-            if (dnsWithCommittedTxn.containsAll(containerDns)) {
-              transactionToDNsCommitMap.remove(txID);
-              transactionToRetryCountMap.remove(txID);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Purging txId={} from block deletion log", txID);
-              }
-              txIDsToBeDeleted.add(txID);
-            }
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
-                txID, containerId, dnID);
-          }
-        } catch (IOException e) {
-          LOG.warn("Could not commit delete block transaction: " +
-              transactionResult.getTxID(), e);
-        }
-      }
-      try {
-        deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
-        metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
-      } catch (IOException e) {
-        LOG.warn("Could not commit delete block transactions: "
-            + txIDsToBeDeleted, e);
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
   private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(
@@ -348,7 +242,7 @@ public int getNumOfValidTransactions() throws IOException {
   @Override
   public void reinitialize(
       Table<Long, DeletedBlocksTransaction> deletedTable) {
-    // we don't need handle transactionToDNsCommitMap and
+    // we don't need to handle SCMDeletedBlockTransactionStatusManager and
     // deletedBlockLogStateManager, since they will be cleared
     // when becoming leader.
     deletedBlockLogStateManager.reinitialize(deletedTable);
@@ -359,8 +253,7 @@ public void reinitialize(
    * leader.
    */
   public void onBecomeLeader() {
-    transactionToDNsCommitMap.clear();
-    transactionToRetryCountMap.clear();
+    transactionStatusManager.clear();
   }
 
   /**
@@ -404,23 +297,21 @@ public void close() throws IOException {
 
   private void getTransaction(DeletedBlocksTransaction tx,
       DatanodeDeletedBlockTransactions transactions,
-      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas) {
+      Set<DatanodeDetails> dnList, Set<ContainerReplica> replicas,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
     DeletedBlocksTransaction updatedTxn =
         DeletedBlocksTransaction.newBuilder(tx)
-            .setCount(transactionToRetryCountMap.getOrDefault(tx.getTxID(), 0))
+            .setCount(transactionStatusManager.getOrDefaultRetryCount(
+                tx.getTxID(), 0))
             .build();
     for (ContainerReplica replica : replicas) {
-      UUID dnID = replica.getDatanodeDetails().getUuid();
-      if (!dnList.contains(replica.getDatanodeDetails())) {
+      DatanodeDetails details = replica.getDatanodeDetails();
+      if (!dnList.contains(details)) {
         continue;
       }
-      Set<UUID> dnsWithTransactionCommitted =
-          transactionToDNsCommitMap.get(updatedTxn.getTxID());
-      if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
-          .contains(dnID)) {
-        // Transaction need not be sent to dns which have
-        // already committed it
-        transactions.addTransactionToDN(dnID, updatedTxn);
+      if (!transactionStatusManager.isDuplication(
+          details, updatedTxn.getTxID(), commandStatus)) {
+        transactions.addTransactionToDN(details.getUuid(), updatedTxn);
       }
     }
   }
@@ -442,15 +333,26 @@ public DatanodeDeletedBlockTransactions getTransactions(
       throws IOException {
     lock.lock();
     try {
+      // Clean up the timed out commands for Datanodes that no longer
+      // report heartbeats
+      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(
+          scmCommandTimeoutMs);
       DatanodeDeletedBlockTransactions transactions =
           new DatanodeDeletedBlockTransactions();
       try (TableIterator<Long,
          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
          deletedBlockLogStateManager.getReadOnlyIterator()) {
+        // Aggregate the CmdStatus by transaction ID, so that the current
+        // status of a given transaction can be looked up faster
+        Map<UUID, Map<Long, CmdStatus>> commandStatus =
+            getSCMDeletedBlockTransactionStatusManager()
+                .getCommandStatusByTxId(dnList.stream().
+                    map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
        ArrayList<Long> txIDs = new ArrayList<>();
        // Here takes block replica count as the threshold to avoid the case
        // that part of replicas committed the TXN and recorded in the
-       // transactionToDNsCommitMap, while they are counted in the threshold.
+       // SCMDeletedBlockTransactionStatusManager, while they are counted
+       // in the threshold.
        while (iter.hasNext() &&
            transactions.getBlocksDeleted() < blockDeletionLimit) {
          Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
@@ -471,9 +373,8 @@ public DatanodeDeletedBlockTransactions getTransactions(
            if (checkInadequateReplica(replicas, txn)) {
              continue;
            }
-           getTransaction(txn, transactions, dnList, replicas);
-           transactionToDNsCommitMap
-               .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
+           getTransaction(
+               txn, transactions, dnList, replicas, commandStatus);
          }
        } catch (ContainerNotFoundException ex) {
          LOG.warn("Container: " + id + " was not found for the transaction: "
@@ -492,6 +393,33 @@ public DatanodeDeletedBlockTransactions getTransactions(
     }
   }
 
+  public void setScmCommandTimeoutMs(long scmCommandTimeoutMs) {
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+  }
+
+  @VisibleForTesting
+  public SCMDeletedBlockTransactionStatusManager
+      getSCMDeletedBlockTransactionStatusManager() {
+    return transactionStatusManager;
+  }
+
+  @Override
+  public void recordTransactionCreated(UUID dnId, long scmCmdId,
+      Set<Long> dnTxSet) {
+    getSCMDeletedBlockTransactionStatusManager()
+        .recordTransactionCreated(dnId, scmCmdId, dnTxSet);
+  }
+
+  @Override
+  public void onDatanodeDead(UUID dnId) {
+    getSCMDeletedBlockTransactionStatusManager().onDatanodeDead(dnId);
+  }
+
+  @Override
+  public void onSent(DatanodeDetails dnId, SCMCommand scmCommand) {
+    getSCMDeletedBlockTransactionStatusManager().onSent(dnId, scmCommand);
+  }
+
   @Override
   public void onMessage(
       DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
@@ -500,18 +428,30 @@ public void onMessage(
       return;
     }
 
-    CommandStatus.Status status = deleteBlockStatus.getCmdStatus().getStatus();
-    if (status == CommandStatus.Status.EXECUTED) {
-      ContainerBlocksDeletionACKProto ackProto =
-          deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
-      commitTransactions(ackProto.getResultsList(),
-          UUID.fromString(ackProto.getDnId()));
-      metrics.incrBlockDeletionCommandSuccess();
-    } else if (status == CommandStatus.Status.FAILED) {
-      metrics.incrBlockDeletionCommandFailure();
-    } else {
-      LOG.error("Delete Block Command is not executed yet.");
-      return;
+    DatanodeDetails details = deleteBlockStatus.getDatanodeDetails();
+    UUID dnId = details.getUuid();
+    for (CommandStatus commandStatus : deleteBlockStatus.getCmdStatus()) {
+      CommandStatus.Status status = commandStatus.getStatus();
+      lock.lock();
+      try {
+        if (status == CommandStatus.Status.EXECUTED) {
+          ContainerBlocksDeletionACKProto ackProto =
+              commandStatus.getBlockDeletionAck();
+          getSCMDeletedBlockTransactionStatusManager()
+              .commitTransactions(ackProto.getResultsList(), dnId);
+          metrics.incrBlockDeletionCommandSuccess();
+        } else if (status == CommandStatus.Status.FAILED) {
+          metrics.incrBlockDeletionCommandFailure();
+        } else {
+          LOG.debug("Delete Block Command {} is not executed on the Datanode"
+              + " {}.", commandStatus.getCmdId(), dnId);
+        }
+
+        getSCMDeletedBlockTransactionStatusManager()
+            .commitSCMCommandStatus(deleteBlockStatus.getCmdStatus(), dnId);
+      } finally {
+        lock.unlock();
+      }
     }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index be20dbd61bd..8677baf33b8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -49,7 +49,6 @@
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -176,11 +175,14 @@ public EmptyTaskResult call() throws Exception {
           UUID dnId = entry.getKey();
           List<DeletedBlocksTransaction> dnTXs = entry.getValue();
           if (!dnTXs.isEmpty()) {
-            processedTxIDs.addAll(dnTXs.stream()
+            Set<Long> dnTxSet = dnTXs.stream()
                 .map(DeletedBlocksTransaction::getTxID)
-                .collect(Collectors.toSet()));
-            SCMCommand command = new DeleteBlocksCommand(dnTXs);
+                .collect(Collectors.toSet());
+            processedTxIDs.addAll(dnTxSet);
+            DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs);
             command.setTerm(scmContext.getTermOfLeader());
+            deletedBlockLog.recordTransactionCreated(dnId, command.getId(),
+                dnTxSet);
             eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
                 new CommandForDatanode<>(dnId, command));
             metrics.incrBlockDeletionCommandSent();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
new file mode 100644
index 00000000000..b43e91e0592
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeletedBlockTransactionStatusManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.min;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * A class that manages the status of DeletedBlockTransactions. Its main
+ * purpose is to reduce the number of duplicate DeletedBlockTransactions
+ * sent to the DNs.
+ */
+public class SCMDeletedBlockTransactionStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeletedBlockTransactionStatusManager.class);
+  // Maps txId to set of DNs which are successful in committing the transaction
+  private final Map<Long, Set<UUID>> transactionToDNsCommitMap;
+  // Maps txId to its retry counts;
+  private final Map<Long, Integer> transactionToRetryCountMap;
+  // The access to DeletedBlocksTXTable is protected by
+  // DeletedBlockLogStateManager.
+  private final DeletedBlockLogStateManager deletedBlockLogStateManager;
+  private final ContainerManager containerManager;
+  private final ScmBlockDeletingServiceMetrics metrics;
+  private final SCMContext scmContext;
+  private final long scmCommandTimeoutMs;
+
+  /**
+   * Before the DeletedBlockTransaction is executed on DN and reported to
+   * SCM, it is managed by this {@link SCMDeleteBlocksCommandStatusManager}.
+   * After the DeletedBlocksTransaction in the DeleteBlocksCommand is
+   * committed on the SCM, it is managed by
+   * {@link SCMDeletedBlockTransactionStatusManager#transactionToDNsCommitMap}
+   */
+  private final SCMDeleteBlocksCommandStatusManager
+      scmDeleteBlocksCommandStatusManager;
+
+  public SCMDeletedBlockTransactionStatusManager(
+      DeletedBlockLogStateManager deletedBlockLogStateManager,
+      ContainerManager containerManager, SCMContext scmContext,
+      ScmBlockDeletingServiceMetrics metrics, long scmCommandTimeoutMs) {
+    // maps transaction to dns which have committed it.
+    this.deletedBlockLogStateManager = deletedBlockLogStateManager;
+    this.metrics = metrics;
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+    this.scmCommandTimeoutMs = scmCommandTimeoutMs;
+    this.transactionToDNsCommitMap = new ConcurrentHashMap<>();
+    this.transactionToRetryCountMap = new ConcurrentHashMap<>();
+    this.scmDeleteBlocksCommandStatusManager =
+        new SCMDeleteBlocksCommandStatusManager();
+  }
+
+  /**
+   * A class that manages the status of a DeletedBlockTransaction based
+   * on DeleteBlocksCommand.
+   */
+  protected static class SCMDeleteBlocksCommandStatusManager {
+    public static final Logger LOG =
+        LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+    private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+    private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+    private static final Set<CmdStatus> STATUSES_REQUIRING_TIMEOUT =
+        new HashSet<>(Arrays.asList(SENT));
+
+    public SCMDeleteBlocksCommandStatusManager() {
+      this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Status of SCMDeleteBlocksCommand.
+     */
+    public enum CmdStatus {
+      // The DeleteBlocksCommand has not yet been sent.
+      // This is the initial status of the command after it's created.
+      TO_BE_SENT,
+      // If the DeleteBlocksCommand has been sent but has not been executed
+      // completely by DN, the DeleteBlocksCommand's state will be SENT.
+      // Note that the state of SENT includes the following possibilities.
+      //   - The command was sent but not received
+      //   - The command was sent and received by the DN,
+      //     and is waiting to be executed.
+      //   - The command was sent and is being executed by the DN
+      SENT
+    }
+
+    protected static final class CmdStatusData {
+      private final UUID dnId;
+      private final long scmCmdId;
+      private final Set<Long> deletedBlocksTxIds;
+      private Instant updateTime;
+      private CmdStatus status;
+
+      private CmdStatusData(
+          UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+        this.dnId = dnId;
+        this.scmCmdId = scmTxID;
+        this.deletedBlocksTxIds = deletedBlocksTxIds;
+        setStatus(DEFAULT_STATUS);
+      }
+
+      public Set<Long> getDeletedBlocksTxIds() {
+        return Collections.unmodifiableSet(deletedBlocksTxIds);
+      }
+
+      public UUID getDnId() {
+        return dnId;
+      }
+
+      public long getScmCmdId() {
+        return scmCmdId;
+      }
+
+      public CmdStatus getStatus() {
+        return status;
+      }
+
+      public void setStatus(CmdStatus status) {
+        this.updateTime = Instant.now();
+        this.status = status;
+      }
+
+      public Instant getUpdateTime() {
+        return updateTime;
+      }
+
+      @Override
+      public String toString() {
+        return "ScmTxStateMachine"
+            + "{dnId=" + dnId
+            + ", scmTxID=" + scmCmdId
+            + ", deletedBlocksTxIds=" + deletedBlocksTxIds
+            + ", updateTime=" + updateTime
+            + ", status=" + status
+            + '}';
+      }
+    }
+
+    protected static CmdStatusData createScmCmdStatusData(
+        UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+      return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+    }
+
+    protected void recordScmCommand(CmdStatusData statusData) {
+      LOG.debug("Record ScmCommand: {}", statusData);
+      scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+          new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+    }
+
+    protected void onSent(UUID dnId, long scmCmdId) {
+      updateStatus(dnId, scmCmdId, CommandStatus.Status.PENDING);
+    }
+
+    protected void onDatanodeDead(UUID dnId) {
+      LOG.info("Clean SCMCommand record for DN: {}", dnId);
+      scmCmdStatusRecord.remove(dnId);
+    }
+
+    protected void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newState) {
+      updateStatus(dnId, scmCmdId, newState);
+    }
+
+    protected void cleanAllTimeoutSCMCommand(long timeoutMs) {
+      for (UUID dnId : scmCmdStatusRecord.keySet()) {
+        for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+          removeTimeoutScmCommand(
+              dnId, getScmCommandIds(dnId, status), timeoutMs);
+        }
+      }
+    }
+
+    public void cleanTimeoutSCMCommand(UUID dnId, long timeoutMs) {
+      for (CmdStatus status : STATUSES_REQUIRING_TIMEOUT) {
+        removeTimeoutScmCommand(
+            dnId, getScmCommandIds(dnId, status), timeoutMs);
+      }
+    }
+
+    private Set<Long> getScmCommandIds(UUID dnId, CmdStatus status) {
+      Set<Long> scmCmdIds = new HashSet<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null) {
+        return scmCmdIds;
+      }
+      for (CmdStatusData statusData : record.values()) {
+        if (statusData.getStatus().equals(status)) {
+          scmCmdIds.add(statusData.getScmCmdId());
+        }
+      }
+      return scmCmdIds;
+    }
+
+    private Instant getUpdateTime(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      return record.get(scmCmdId).getUpdateTime();
+    }
+
+    private void updateStatus(UUID dnId, long scmCmdId,
+        CommandStatus.Status newStatus) {
+      Map<Long, CmdStatusData> recordForDn = scmCmdStatusRecord.get(dnId);
+      if (recordForDn == null) {
+        LOG.warn("Unknown Datanode: {} scmCmdId {} newStatus {}",
+            dnId, scmCmdId, newStatus);
+        return;
+      }
+      if (recordForDn.get(scmCmdId) == null) {
+        LOG.warn("Unknown SCM Command: {} Datanode {} newStatus {}",
+            scmCmdId, dnId, newStatus);
+        return;
+      }
+
+      boolean changed = false;
+      CmdStatusData statusData = recordForDn.get(scmCmdId);
+      CmdStatus oldStatus = statusData.getStatus();
+      switch (newStatus) {
+      case PENDING:
+        if (oldStatus == TO_BE_SENT || oldStatus == SENT) {
+          // TO_BE_SENT -> SENT: The DeleteBlocksCommand is sent by SCM,
+          // The follow-up status has not been updated by Datanode.
+
+          // SENT -> SENT: The DeleteBlocksCommand continues to wait to be
+          // executed by Datanode.
+          statusData.setStatus(SENT);
+          changed = true;
+        }
+        break;
+      case EXECUTED:
+      case FAILED:
+        if (oldStatus == SENT) {
+          // Once the DN executes DeleteBlocksCommands, regardless of whether
+          // DeleteBlocksCommands is executed successfully or not,
+          // it will be deleted from record.
+          // Successful DeleteBlocksCommands are recorded in
+          // `transactionToDNsCommitMap`.
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        if (oldStatus == TO_BE_SENT) {
+          // SCM receives a reply to an unsent transaction,
+          // which should not normally occur.
+          LOG.error("Received {} status for a command marked TO_BE_SENT. " +
+              "This indicates a potential issue in command handling. " +
+              "SCM Command ID: {}, Datanode ID: {}, Current Status: {}",
+              newStatus, scmCmdId, dnId, oldStatus);
+          removeScmCommand(dnId, scmCmdId);
+          changed = true;
+        }
+        break;
+      default:
+        LOG.error("Can not update to Unknown new Status: {}", newStatus);
+        break;
+      }
+      if (!changed) {
+        LOG.warn("Ignoring illegal status update for DN: {} ScmCommandId {} " +
            "Status From {} to {}", dnId, scmCmdId, oldStatus, newStatus);
+      } else {
+        LOG.debug("Successful update DN: {} ScmCommandId {} Status From {} to" +
+            " {}", dnId, scmCmdId, oldStatus, newStatus);
+      }
+    }
+
+    private void removeTimeoutScmCommand(UUID dnId,
+        Set<Long> scmCmdIds, long timeoutMs) {
+      Instant now = Instant.now();
+      for (Long scmCmdId : scmCmdIds) {
+        Instant updateTime = getUpdateTime(dnId, scmCmdId);
+        if (updateTime != null &&
+            Duration.between(updateTime, now).toMillis() > timeoutMs) {
+          CmdStatusData state = removeScmCommand(dnId, scmCmdId);
+          LOG.warn("Removing timed out SCM BlockDeletionCommand {} for DN {} " +
+              "after no update for {} ms", state, dnId, timeoutMs);
+        } else {
+          LOG.warn("SCM BlockDeletionCommands {} for DN {} have not yet " +
+              "timed out ({} ms timeout)", scmCmdIds, dnId, timeoutMs);
+        }
+      }
+    }
+
+    private CmdStatusData removeScmCommand(UUID dnId, long scmCmdId) {
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      if (record == null || record.get(scmCmdId) == null) {
+        return null;
+      }
+      CmdStatusData statusData = record.remove(scmCmdId);
+      LOG.debug("Remove ScmCommand {} for DN: {} ", statusData, dnId);
+      return statusData;
+    }
+
+    public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+        Set<UUID> dnIds) {
+      Map<UUID, Map<Long, CmdStatus>> result =
+          new HashMap<>(scmCmdStatusRecord.size());
+
+      for (UUID dnId : dnIds) {
+        Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+        if (record == null) {
+          continue;
+        }
+        Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+        for (CmdStatusData statusData : record.values()) {
+          CmdStatus status = statusData.getStatus();
+          for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+            dnStatusMap.put(deletedBlocksTxId, status);
+          }
+        }
+        result.put(dnId, dnStatusMap);
+      }
+
+      return result;
+    }
+
+    private void clear() {
+      scmCmdStatusRecord.clear();
+    }
+
+    @VisibleForTesting
+    Map<UUID, Map<Long, CmdStatusData>> getScmCmdStatusRecord() {
+      return scmCmdStatusRecord;
+    }
+  }
+
+  public void incrementRetryCount(List<Long> txIDs, long maxRetry)
+      throws IOException {
+    ArrayList<Long> txIDsToUpdate = new ArrayList<>();
+    for (Long txID : txIDs) {
+      int currentCount =
+          transactionToRetryCountMap.getOrDefault(txID, 0);
+      if (currentCount > maxRetry) {
+        continue;
+      } else {
+        currentCount += 1;
+        if (currentCount > maxRetry) {
+          txIDsToUpdate.add(txID);
+        }
+        transactionToRetryCountMap.put(txID, currentCount);
+      }
+    }
+
+    if (!txIDsToUpdate.isEmpty()) {
+      deletedBlockLogStateManager
+          .increaseRetryCountOfTransactionInDB(txIDsToUpdate);
+    }
+  }
+
+  public void resetRetryCount(List<Long> txIDs) throws IOException {
+    for (Long txID: txIDs) {
+      transactionToRetryCountMap.computeIfPresent(txID, (key, value) -> 0);
+    }
+  }
+
+  public int getOrDefaultRetryCount(long txID, int defaultValue) {
+    return transactionToRetryCountMap.getOrDefault(txID, defaultValue);
+  }
+
+  public void onSent(DatanodeDetails dnId, SCMCommand scmCommand) {
+    scmDeleteBlocksCommandStatusManager.onSent(
+        dnId.getUuid(), scmCommand.getId());
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    return scmDeleteBlocksCommandStatusManager.getCommandStatusByTxId(dnIds);
+  }
+
+  public void recordTransactionCreated(
+      UUID dnId, long scmCmdId, Set<Long> dnTxSet) {
+    scmDeleteBlocksCommandStatusManager.recordScmCommand(
+        SCMDeleteBlocksCommandStatusManager
+            .createScmCmdStatusData(dnId, scmCmdId, dnTxSet));
+    dnTxSet.forEach(txId -> transactionToDNsCommitMap
+        .putIfAbsent(txId, new LinkedHashSet<>()));
+  }
+
+  public void clear() {
+    transactionToRetryCountMap.clear();
+    scmDeleteBlocksCommandStatusManager.clear();
+    transactionToDNsCommitMap.clear();
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    scmDeleteBlocksCommandStatusManager.cleanAllTimeoutSCMCommand(timeoutMs);
+  }
+
+  public void onDatanodeDead(UUID dnId) {
+    scmDeleteBlocksCommandStatusManager.onDatanodeDead(dnId);
+  }
+
+  public boolean isDuplication(DatanodeDetails dnDetail, long tx,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    if (alreadyExecuted(dnDetail.getUuid(), tx)) {
+      return true;
+    }
+    return inProcessing(dnDetail.getUuid(), tx, commandStatus);
+  }
+
+  public boolean alreadyExecuted(UUID dnId, long txId) {
+    Set<UUID> dnsWithTransactionCommitted =
+        transactionToDNsCommitMap.get(txId);
+    return dnsWithTransactionCommitted != null && dnsWithTransactionCommitted
+        .contains(dnId);
+  }
+
+  /**
+   * Committing a transaction means deleting all footprints of the
+   * transaction from the log. This method doesn't guarantee that every
+   * transaction can be deleted; it tolerates failures on a best-effort basis.
+   * @param transactionResults - delete block transaction results.
+   * @param dnId - ID of datanode which acknowledges the delete block command.
+   */
+  @VisibleForTesting
+  public void commitTransactions(
+      List<DeleteBlockTransactionResult> transactionResults, UUID dnId) {
+
+    ArrayList<Long> txIDsToBeDeleted = new ArrayList<>();
+    Set<UUID> dnsWithCommittedTxn;
+    for (DeleteBlockTransactionResult transactionResult :
+        transactionResults) {
+      if (isTransactionFailed(transactionResult)) {
+        metrics.incrBlockDeletionTransactionFailure();
+        continue;
+      }
+      try {
+        metrics.incrBlockDeletionTransactionSuccess();
+        long txID = transactionResult.getTxID();
+        // set of dns which have successfully committed transaction txId.
+        dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
+        final ContainerID containerId = ContainerID.valueOf(
+            transactionResult.getContainerID());
+        if (dnsWithCommittedTxn == null) {
+          // Most likely it's a retried delete command response.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Transaction txId={} commit by dnId={} for containerID={}"
+                    + " failed. Corresponding entry not found.", txID, dnId,
+                containerId);
+          }
+          continue;
+        }
+
+        dnsWithCommittedTxn.add(dnId);
+        final ContainerInfo container =
+            containerManager.getContainer(containerId);
+        final Set<ContainerReplica> replicas =
+            containerManager.getContainerReplicas(containerId);
+        // The delete entry can be safely removed from the log if all the
+        // corresponding nodes commit the txn. It is required to check that
+        // the nodes returned in the pipeline match the replication factor.
+        if (min(replicas.size(), dnsWithCommittedTxn.size())
+            >= container.getReplicationConfig().getRequiredNodes()) {
+          List<UUID> containerDns = replicas.stream()
+              .map(ContainerReplica::getDatanodeDetails)
+              .map(DatanodeDetails::getUuid)
+              .collect(Collectors.toList());
+          if (dnsWithCommittedTxn.containsAll(containerDns)) {
+            transactionToDNsCommitMap.remove(txID);
+            transactionToRetryCountMap.remove(txID);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Purging txId={} from block deletion log", txID);
+            }
+            txIDsToBeDeleted.add(txID);
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
+              txID, containerId, dnId);
+        }
+      } catch (IOException e) {
+        LOG.warn("Could not commit delete block transaction: " +
+            transactionResult.getTxID(), e);
+      }
+    }
+    try {
+      deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
+      metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
+    } catch (IOException e) {
+      LOG.warn("Could not commit delete block transactions: "
+          + txIDsToBeDeleted, e);
+    }
+  }
+
+  @VisibleForTesting
+  public void commitSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+      UUID dnId) {
+    processSCMCommandStatus(deleteBlockStatus, dnId);
+    scmDeleteBlocksCommandStatusManager.
+        cleanTimeoutSCMCommand(dnId, scmCommandTimeoutMs);
+  }
+
+  private boolean inProcessing(UUID dnId, long deletedBlocksTxId,
+      Map<UUID, Map<Long, CmdStatus>> commandStatus) {
+    Map<Long, CmdStatus> deletedBlocksTxStatus = commandStatus.get(dnId);
+    return deletedBlocksTxStatus != null &&
+        deletedBlocksTxStatus.get(deletedBlocksTxId) != null;
+  }
+
+  private void processSCMCommandStatus(List<CommandStatus> deleteBlockStatus,
+      UUID dnID) {
+    Map<Long, CommandStatus> lastStatus = new HashMap<>();
+    Map<Long, CommandStatus.Status> summary = new HashMap<>();
+
+    // The CommandStatus is ordered in the report. So we can focus only on the
+    // last status in the command report.
+    deleteBlockStatus.forEach(cmdStatus -> {
+      lastStatus.put(cmdStatus.getCmdId(), cmdStatus);
+      summary.put(cmdStatus.getCmdId(), cmdStatus.getStatus());
+    });
+    LOG.debug("CommandStatus {} from Datanode {} ", summary, dnID);
+    for (Map.Entry<Long, CommandStatus> entry : lastStatus.entrySet()) {
+      CommandStatus.Status status = entry.getValue().getStatus();
+      scmDeleteBlocksCommandStatusManager.updateStatusByDNCommandStatus(
+          dnID, entry.getKey(), status);
+    }
+  }
+
+  private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
+          result.getTxID(), result.getSuccess());
+    }
+    if (!result.getSuccess()) {
+      LOG.warn("Got failed ACK for TXID={}, prepare to resend the " +
+          "TX in next interval", result.getTxID());
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index d43311265d7..5d737659ddc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdds.scm.command;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -31,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -54,32 +57,43 @@ public void onMessage(CommandStatusReportFromDatanode report,
     }
 
     // Route command status to its watchers.
+    List<CommandStatus> deleteBlocksCommandStatus = new ArrayList<>();
     cmdStatusList.forEach(cmdStatus -> {
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
             .getCmdId(), cmdStatus.getType());
       }
       if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
-        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
-            new DeleteBlockStatus(cmdStatus));
+        deleteBlocksCommandStatus.add(cmdStatus);
       } else {
         LOGGER.debug("CommandStatus of type:{} not handled in " +
             "CommandStatusReportHandler.", cmdStatus.getType());
       }
     });
+
+    /**
+     * The purpose of aggregating all CommandStatus objects before committing
+     * is to reduce thread switching. When the Datanode queue holds a large
+     * number of commands, the report will contain many CommandStatus entries
+     * in {@link CommandStatus.Status#PENDING} state.
+     */
+    if (!deleteBlocksCommandStatus.isEmpty()) {
+      publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus(
+          deleteBlocksCommandStatus, report.getDatanodeDetails()));
+    }
   }
 
   /**
    * Wrapper event for CommandStatus.
    */
   public static class CommandStatusEvent implements IdentifiableEventPayload {
-    private CommandStatus cmdStatus;
+    private final List<CommandStatus> cmdStatus;
 
-    CommandStatusEvent(CommandStatus cmdStatus) {
+    CommandStatusEvent(List<CommandStatus> cmdStatus) {
       this.cmdStatus = cmdStatus;
     }
 
-    public CommandStatus getCmdStatus() {
+    public List<CommandStatus> getCmdStatus() {
       return cmdStatus;
     }
 
@@ -90,7 +104,7 @@ public String toString() {
 
     @Override
     public long getId() {
-      return cmdStatus.getCmdId();
+      return HddsIdFactory.getLongId();
     }
   }
 
@@ -98,8 +112,16 @@ public long getId() {
    * Wrapper event for DeleteBlock Command.
    */
   public static class DeleteBlockStatus extends CommandStatusEvent {
-    public DeleteBlockStatus(CommandStatus cmdStatus) {
+    private final DatanodeDetails datanodeDetails;
+
+    public DeleteBlockStatus(List<CommandStatus> cmdStatus,
+        DatanodeDetails datanodeDetails) {
       super(cmdStatus);
+      this.datanodeDetails = datanodeDetails;
+    }
+
+    public DatanodeDetails getDatanodeDetails() {
+      return datanodeDetails;
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index f05eb761d91..3c40437d7f6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.container.ContainerException;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -41,6 +42,8 @@
 
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
+
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
 
 /**
@@ -51,6 +54,8 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
   private final NodeManager nodeManager;
   private final PipelineManager pipelineManager;
   private final ContainerManager containerManager;
+  @Nullable
+  private final DeletedBlockLog deletedBlockLog;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DeadNodeHandler.class);
@@ -58,9 +63,17 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
   public DeadNodeHandler(final NodeManager nodeManager,
       final PipelineManager pipelineManager,
       final ContainerManager containerManager) {
+    this(nodeManager, pipelineManager, containerManager, null);
+  }
+
+  public DeadNodeHandler(final NodeManager nodeManager,
+      final PipelineManager pipelineManager,
+      final ContainerManager containerManager,
+      @Nullable final DeletedBlockLog deletedBlockLog) {
     this.nodeManager = nodeManager;
     this.pipelineManager = pipelineManager;
     this.containerManager = containerManager;
+    this.deletedBlockLog = deletedBlockLog;
   }
 
   @Override
@@ -95,6 +108,13 @@ public void onMessage(final DatanodeDetails datanodeDetails,
       LOG.info("Clearing command queue of size {} for DN {}",
           cmdList.size(), datanodeDetails);
 
+      // remove DeleteBlocksCommand associated with the dead node unless it
+      // is IN_MAINTENANCE
+      if (deletedBlockLog != null &&
+          !nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
+        deletedBlockLog.onDatanodeDead(datanodeDetails.getUuid());
+      }
+
       //move dead datanode out of ClusterNetworkTopology
       NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
       if (nt.contains(datanodeDetails)) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 011b361d629..399a7ef952e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -47,6 +47,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.Collection;
+import java.util.function.BiConsumer;
 
 /**
  * A node manager supports a simple interface for managing a datanode.
@@ -90,6 +91,18 @@ default RegisteredCommand register(
         defaultLayoutVersionProto());
   }
 
+  /**
+   * Register a SendCommandNotify handler for a specific type of SCMCommand.
+   * @param type The type of the SCMCommand.
+   * @param scmCommand A BiConsumer that takes a DatanodeDetails and a
+   *                   SCMCommand object and performs the necessary actions.
+   *                   The handler is invoked when a command of the given
+   *                   type is sent to a datanode.
+   */
+  default void registerSendCommandNotify(SCMCommandProto.Type type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+  }
+
   /**
    * Gets all Live Datanodes that are currently communicating with SCM.
    * @param nodeStatus - Status of the node to return
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 167b25afd01..972c061d5d2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -84,6 +84,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -128,6 +129,8 @@ public class SCMNodeManager implements NodeManager {
   private final HDDSLayoutVersionManager scmLayoutVersionManager;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
+  private final Map<SCMCommandProto.Type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
 
   /**
    * Lock used to synchronize some operation in Node manager to ensure a
@@ -179,6 +182,13 @@ public SCMNodeManager(
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
     this.scmContext = scmContext;
+    this.sendCommandNotifyMap = new HashMap<>();
+  }
+
+  @Override
+  public void registerSendCommandNotify(SCMCommandProto.Type type,
+      BiConsumer<DatanodeDetails, SCMCommand<?>> scmCommand) {
+    this.sendCommandNotifyMap.put(type, scmCommand);
   }
 
   private void registerMXBean() {
@@ -521,6 +531,15 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
         commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
     List<SCMCommand> commands =
         commandQueue.getCommand(datanodeDetails.getUuid());
+
+    // Notify registered handlers that these commands are being sent,
+    // e.g. to update the status of each deleteBlocksCommand
+    for (SCMCommand command : commands) {
+      if (sendCommandNotifyMap.get(command.getType()) != null) {
+        sendCommandNotifyMap.get(command.getType())
+            .accept(datanodeDetails, command);
+      }
+    }
+
     if (queueReport != null) {
       processNodeCommandQueueReport(datanodeDetails, queueReport, summary);
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 722244d4c13..c000514ed33 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
@@ -577,6 +578,9 @@ private void initializeEventHandlers() {
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
 
+    scmNodeManager.registerSendCommandNotify(
+        SCMCommandProto.Type.deleteBlocksCommand,
+        scmBlockManager.getDeletedBlockLog()::onSent);
   }
 
   private void initializeCertificateClient() throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index e1d15146d01..987b1ddbb90 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -22,6 +22,8 @@
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -49,6 +51,9 @@
     .DeleteBlockTransactionResult;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -62,6 +67,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +77,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
@@ -255,7 +262,7 @@ private void commitTransactions(
       List<DeleteBlockTransactionResult> transactionResults,
       DatanodeDetails... dns) throws IOException {
     for (DatanodeDetails dnDetails : dns) {
-      deletedBlockLog
+      deletedBlockLog.getSCMDeletedBlockTransactionStatusManager()
           .commitTransactions(transactionResults, dnDetails.getUuid());
     }
     scmHADBTransactionBuffer.flush();
@@ -284,15 +291,6 @@ private void commitTransactions(
         .collect(Collectors.toList()));
   }
 
-  private void commitTransactions(DatanodeDeletedBlockTransactions
-      transactions) {
-    transactions.getDatanodeTransactionMap().forEach((uuid,
-        deletedBlocksTransactions) -> deletedBlockLog
-        .commitTransactions(deletedBlocksTransactions.stream()
-            .map(this::createDeleteBlockTransactionResult)
-            .collect(Collectors.toList()), uuid));
-  }
-
   private DeleteBlockTransactionResult createDeleteBlockTransactionResult(
       DeletedBlocksTransaction transaction) {
     return DeleteBlockTransactionResult.newBuilder()
@@ -315,6 +313,13 @@ private List<DeletedBlocksTransaction> getTransactions(
             transactions.getDatanodeTransactionMap().get(dn.getUuid()))
             .orElseGet(LinkedList::new));
     }
+    // Simulate the transactions being sent
+    for (Map.Entry<UUID, List<DeletedBlocksTransaction>> entry :
+        transactions.getDatanodeTransactionMap().entrySet()) {
+      DeleteBlocksCommand command = new DeleteBlocksCommand(entry.getValue());
+      recordScmCommandToStatusManager(entry.getKey(), command);
+      sendSCMDeleteBlocksCommand(entry.getKey(), command);
+    }
     return txns;
   }
@@ -431,6 +436,9 @@ public void testResetCount() throws Exception {
     }
 
     // Increment for the reset transactions.
+    // Let the SCM command for the transaction time out waiting for the DN
+    // reply, thus allowing the transaction to be resent
+    deletedBlockLog.setScmCommandTimeoutMs(-1L);
     incrementCount(txIDs);
     blocks = getAllTransactions();
     for (DeletedBlocksTransaction block : blocks) {
@@ -442,6 +450,7 @@
 
   @Test
   public void testCommitTransactions() throws Exception {
+    deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
     addTransactions(generateData(50), true);
     mockContainerHealthResult(true);
     List<DeletedBlocksTransaction> blocks =
@@ -458,6 +467,12 @@ public void testCommitTransactions() throws Exception {
         DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
             .build());
 
+    blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE);
+    // SCM will not repeat a transaction until it has timed out.
+ Assertions.assertEquals(0, blocks.size()); + // Lets the SCM delete the transaction and wait for the DN reply + // to timeout, thus allowing the transaction to resend the + deletedBlockLog.setScmCommandTimeoutMs(-1L); blocks = getTransactions(50 * BLOCKS_PER_TXN * THREE); // only uncommitted dn have transactions Assertions.assertEquals(30, blocks.size()); @@ -467,6 +482,173 @@ public void testCommitTransactions() throws Exception { Assertions.assertEquals(0, blocks.size()); } + private void recordScmCommandToStatusManager( + UUID dnId, DeleteBlocksCommand command) { + Set dnTxSet = command.blocksTobeDeleted() + .stream().map(DeletedBlocksTransaction::getTxID) + .collect(Collectors.toSet()); + deletedBlockLog.recordTransactionCreated(dnId, command.getId(), dnTxSet); + } + + private void sendSCMDeleteBlocksCommand(UUID dnId, SCMCommand scmCommand) { + deletedBlockLog.onSent( + DatanodeDetails.newBuilder().setUuid(dnId).build(), scmCommand); + } + + private void assertNoDuplicateTransactions( + DatanodeDeletedBlockTransactions transactions1, + DatanodeDeletedBlockTransactions transactions2) { + Map> map1 = + transactions1.getDatanodeTransactionMap(); + Map> map2 = + transactions2.getDatanodeTransactionMap(); + + for (Map.Entry> entry : + map1.entrySet()) { + UUID dnId = entry.getKey(); + Set txSet1 = new HashSet<>(entry.getValue()); + Set txSet2 = new HashSet<>(map2.get(dnId)); + + txSet1.retainAll(txSet2); + Assertions.assertEquals(0, txSet1.size(), + String.format("Duplicate Transactions found first transactions %s " + + "second transactions %s for Dn %s", txSet1, txSet2, dnId)); + } + } + + + private void assertContainsAllTransactions( + DatanodeDeletedBlockTransactions transactions1, + DatanodeDeletedBlockTransactions transactions2) { + Map> map1 = + transactions1.getDatanodeTransactionMap(); + Map> map2 = + transactions2.getDatanodeTransactionMap(); + + for (Map.Entry> entry : + map1.entrySet()) { + UUID dnId = entry.getKey(); + Set txSet1 = new HashSet<>(entry.getValue()); + Set txSet2 = new HashSet<>(map2.get(dnId)); + + Assertions.assertTrue(txSet1.containsAll(txSet2)); + } + } + + private void commitSCMCommandStatus(Long scmCmdId, UUID dnID, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status status) { + List deleteBlockStatus = new ArrayList<>(); + deleteBlockStatus.add(CommandStatus.CommandStatusBuilder.newBuilder() + .setCmdId(scmCmdId) + .setType(Type.deleteBlocksCommand) + .setStatus(status) + .build() + .getProtoBufMessage()); + + deletedBlockLog.getSCMDeletedBlockTransactionStatusManager() + .commitSCMCommandStatus(deleteBlockStatus, dnID); + } + + private void createDeleteBlocksCommandAndAction( + DatanodeDeletedBlockTransactions transactions, + BiConsumer afterCreate) { + for (Map.Entry> entry : + transactions.getDatanodeTransactionMap().entrySet()) { + UUID dnId = entry.getKey(); + List dnTXs = entry.getValue(); + DeleteBlocksCommand command = new DeleteBlocksCommand(dnTXs); + afterCreate.accept(dnId, command); + } + } + + @Test + public void testNoDuplicateTransactionsForInProcessingSCMCommand() + throws Exception { + // The SCM will not resend these transactions in blow case: + // - If the command has not been sent; + // - The DN does not report the status of the command via heartbeat + // After the command is sent; + // - If the DN reports the command status as PENDING; + addTransactions(generateData(10), true); + int blockLimit = 2 * BLOCKS_PER_TXN * THREE; + mockContainerHealthResult(true); + + // If the command has not been sent + 
DatanodeDeletedBlockTransactions transactions1 = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + createDeleteBlocksCommandAndAction(transactions1, + this::recordScmCommandToStatusManager); + + // - The DN has not yet reported the status of the command via + // heartbeat after the command was sent + DatanodeDeletedBlockTransactions transactions2 = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + assertNoDuplicateTransactions(transactions1, transactions2); + createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> { + recordScmCommandToStatusManager(dnId, command); + sendSCMDeleteBlocksCommand(dnId, command); + }); + + // - The DN reports the command status as PENDING + DatanodeDeletedBlockTransactions transactions3 = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + assertNoDuplicateTransactions(transactions1, transactions3); + createDeleteBlocksCommandAndAction(transactions3, (dnId, command) -> { + recordScmCommandToStatusManager(dnId, command); + sendSCMDeleteBlocksCommand(dnId, command); + commitSCMCommandStatus(command.getId(), dnId, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + }); + assertNoDuplicateTransactions(transactions3, transactions1); + assertNoDuplicateTransactions(transactions3, transactions2); + + DatanodeDeletedBlockTransactions transactions4 = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + assertNoDuplicateTransactions(transactions4, transactions1); + assertNoDuplicateTransactions(transactions4, transactions2); + assertNoDuplicateTransactions(transactions4, transactions3); + } + + @Test + public void testFailedAndTimeoutSCMCommandCanBeResend() throws Exception { + // The SCM will resend these transactions in the cases below: + // - Commands that were executed but failed; + // - Commands whose PENDING state the DN has not refreshed within the + // timeout period. + deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE); + addTransactions(generateData(10), true); + int blockLimit = 2 * BLOCKS_PER_TXN * THREE; + mockContainerHealthResult(true); + + // - The DN does not refresh the PENDING state within the timeout period + DatanodeDeletedBlockTransactions transactions = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + createDeleteBlocksCommandAndAction(transactions, (dnId, command) -> { + recordScmCommandToStatusManager(dnId, command); + sendSCMDeleteBlocksCommand(dnId, command); + commitSCMCommandStatus(command.getId(), dnId, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + }); + + // - Commands that were executed but failed + DatanodeDeletedBlockTransactions transactions2 = + deletedBlockLog.getTransactions(blockLimit, new HashSet<>(dnList)); + createDeleteBlocksCommandAndAction(transactions2, (dnId, command) -> { + recordScmCommandToStatusManager(dnId, command); + sendSCMDeleteBlocksCommand(dnId, command); + commitSCMCommandStatus(command.getId(), dnId, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED); + }); + + deletedBlockLog.setScmCommandTimeoutMs(-1L); + DatanodeDeletedBlockTransactions transactions3 = + deletedBlockLog.getTransactions(Integer.MAX_VALUE, + new HashSet<>(dnList)); + assertNoDuplicateTransactions(transactions, transactions2); + assertContainsAllTransactions(transactions3, transactions); + assertContainsAllTransactions(transactions3, transactions2); + } + @Test public void testDNOnlyOneNodeHealthy() throws Exception { Map> deletedBlocks = generateData(50); @@ -496,9 +678,7 @@ public void 
testInadequateReplicaCommit() throws Exception { // For the first 30 txn, deletedBlockLog only has the txn from dn1 and dn2 // For the rest txn, txn will be got from all dns. // Committed txn will be: 1-40. 1-40. 31-40 - commitTransactions(deletedBlockLog.getTransactions( - 30 * BLOCKS_PER_TXN * THREE, - dnList.stream().collect(Collectors.toSet()))); + commitTransactions(getTransactions(30 * BLOCKS_PER_TXN * THREE)); // The rest txn shall be: 41-50. 41-50. 41-50 List blocks = getAllTransactions(); @@ -590,6 +770,7 @@ public void testPersistence() throws Exception { @Test public void testDeletedBlockTransactions() throws IOException, TimeoutException { + deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE); mockContainerHealthResult(true); int txNum = 10; List blocks; @@ -622,9 +803,21 @@ public void testDeletedBlockTransactions() // add two transactions for same container containerID = blocks.get(0).getContainerID(); Map> deletedBlocksMap = new HashMap<>(); - deletedBlocksMap.put(containerID, new LinkedList<>()); + Random random = new Random(); + long localId = random.nextLong(); + deletedBlocksMap.put(containerID, new LinkedList<>( + Collections.singletonList(localId))); addTransactions(deletedBlocksMap, true); + blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE); + // Only the newly added blocks will be sent; previously sent + // transactions that have not yet timed out are skipped. + Assertions.assertEquals(1, blocks.size()); + Assertions.assertEquals(1, blocks.get(0).getLocalIDCount()); + Assertions.assertEquals(blocks.get(0).getLocalID(0), localId); + // Let the SCM command time out immediately (the DN reply never + // arrives), so the transactions become eligible to be resent. + deletedBlockLog.setScmCommandTimeoutMs(-1L); // get should return two transactions for the same container blocks = getTransactions(txNum * BLOCKS_PER_TXN * ONE); Assertions.assertEquals(2, blocks.size()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java new file mode 100644 index 00000000000..888cb42fd7d --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMDeleteBlocksCommandStatusManager.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager; +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT; +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT; +import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatusData; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * A test for SCMDeleteBlocksCommandStatusManager. + */ +public class TestSCMDeleteBlocksCommandStatusManager { + + private SCMDeleteBlocksCommandStatusManager manager; + private UUID dnId1; + private UUID dnId2; + private long scmCmdId1; + private long scmCmdId2; + private long scmCmdId3; + private long scmCmdId4; + private Set deletedBlocksTxIds1; + private Set deletedBlocksTxIds2; + private Set deletedBlocksTxIds3; + private Set deletedBlocksTxIds4; + + @BeforeEach + public void setup() throws Exception { + manager = new SCMDeleteBlocksCommandStatusManager(); + // Create test data + dnId1 = UUID.randomUUID(); + dnId2 = UUID.randomUUID(); + scmCmdId1 = 1L; + scmCmdId2 = 2L; + scmCmdId3 = 3L; + scmCmdId4 = 4L; + deletedBlocksTxIds1 = new HashSet<>(); + deletedBlocksTxIds1.add(100L); + deletedBlocksTxIds2 = new HashSet<>(); + deletedBlocksTxIds2.add(200L); + deletedBlocksTxIds3 = new HashSet<>(); + deletedBlocksTxIds3.add(300L); + deletedBlocksTxIds4 = new HashSet<>(); + deletedBlocksTxIds4.add(400L); + } + + @Test + public void testRecordScmCommand() { + CmdStatusData statusData = + SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData( + dnId1, scmCmdId1, deletedBlocksTxIds1); + + manager.recordScmCommand(statusData); + + assertNotNull(manager.getScmCmdStatusRecord().get(dnId1)); + assertEquals(1, manager.getScmCmdStatusRecord().get(dnId1).size()); + CmdStatusData cmdStatusData = + manager.getScmCmdStatusRecord().get(dnId1).get(scmCmdId1); + assertNotNull(cmdStatusData); + assertEquals(dnId1, statusData.getDnId()); + assertEquals(scmCmdId1, statusData.getScmCmdId()); + assertEquals(deletedBlocksTxIds1, statusData.getDeletedBlocksTxIds()); + // The default status is `CmdStatus.TO_BE_SENT` + assertEquals(TO_BE_SENT, statusData.getStatus()); + } + + @Test + public void testOnSent() { + CmdStatusData statusData = + SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData( + dnId1, scmCmdId1, deletedBlocksTxIds1); + manager.recordScmCommand(statusData); + + Map dnStatusRecord = + manager.getScmCmdStatusRecord().get(dnId1); + // After the Command is sent by SCM, the status of 
the Command + // will change from TO_BE_SENT to SENT assertEquals(TO_BE_SENT, dnStatusRecord.get(scmCmdId1).getStatus()); + manager.onSent(dnId1, scmCmdId1); + assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus()); + } + + @Test + public void testUpdateStatusByDNCommandStatus() { + // Test all status updates driven by the Datanode heartbeat report. + // SENT -> PENDING_EXECUTED: The DeleteBlocksCommand is sent and received + // by the Datanode, but the command is not executed by the Datanode, + // the command is waiting to be executed. + + // SENT -> NEED_RESEND: The DeleteBlocksCommand is sent and lost before + // it is received by the DN. + // SENT -> EXECUTED: The DeleteBlocksCommand has been sent to the + // Datanode and was executed successfully by the DN. + // + // PENDING_EXECUTED -> PENDING_EXECUTED: The DeleteBlocksCommand continues + // to wait to be executed by the Datanode. + // PENDING_EXECUTED -> NEED_RESEND: The DeleteBlocksCommand waited for a + // while and was executed, but the execution failed; or the + // DeleteBlocksCommand was lost while waiting (such as on a Datanode + // restart). + // + // PENDING_EXECUTED -> EXECUTED: The Command waits for a period of + // time on the DN and is executed successfully. + + recordAndSentCommand(manager, dnId1, + Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4), + Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2, + deletedBlocksTxIds3, deletedBlocksTxIds4)); + + Map dnStatusRecord = + manager.getScmCmdStatusRecord().get(dnId1); + assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus()); + assertEquals(SENT, dnStatusRecord.get(scmCmdId2).getStatus()); + assertEquals(SENT, dnStatusRecord.get(scmCmdId3).getStatus()); + assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus()); + + // SENT -> PENDING_EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + // SENT -> EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED); + // SENT -> NEED_RESEND + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED); + // SENT -> PENDING_EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId4, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + + assertEquals(SENT, dnStatusRecord.get(scmCmdId1).getStatus()); + assertNull(dnStatusRecord.get(scmCmdId2)); + assertNull(dnStatusRecord.get(scmCmdId3)); + assertEquals(SENT, dnStatusRecord.get(scmCmdId4).getStatus()); + } + + @Test + public void testCleanSCMCommandForDn() { + // Transactions in states EXECUTED and NEED_RESEND will be cleaned up + // directly, while transactions in states PENDING_EXECUTED and SENT + // will be cleaned up only after the timeout. + recordAndSentCommand(manager, dnId1, + Arrays.asList(scmCmdId1, scmCmdId2, scmCmdId3, scmCmdId4), + Arrays.asList(deletedBlocksTxIds1, deletedBlocksTxIds2, + deletedBlocksTxIds3, deletedBlocksTxIds4)); + + // SENT -> PENDING_EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + // SENT -> EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId2, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED); + // SENT -> NEED_RESEND + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId3, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED); + + Map dnStatusRecord = 
+ manager.getScmCmdStatusRecord().get(dnId1); + assertNotNull(dnStatusRecord.get(scmCmdId1)); + assertNull(dnStatusRecord.get(scmCmdId2)); + assertNull(dnStatusRecord.get(scmCmdId3)); + assertNotNull(dnStatusRecord.get(scmCmdId4)); + + manager.cleanTimeoutSCMCommand(dnId1, Long.MAX_VALUE); + + // scmCmdId1 is PENDING_EXECUTED and will only be cleaned up after timeout + assertNotNull(dnStatusRecord.get(scmCmdId1)); + assertNull(dnStatusRecord.get(scmCmdId3)); + assertNull(dnStatusRecord.get(scmCmdId2)); + // scmCmdId4 is SENT and will only be cleaned up after timeout + assertNotNull(dnStatusRecord.get(scmCmdId4)); + + manager.cleanTimeoutSCMCommand(dnId1, -1); + assertNull(dnStatusRecord.get(scmCmdId1)); + assertNull(dnStatusRecord.get(scmCmdId4)); + } + + @Test + public void testCleanAllTimeoutSCMCommand() { + // Test that all EXECUTED and NEED_RESEND statuses on the DNs are + // cleaned up. + + // Transactions in states EXECUTED and NEED_RESEND will be cleaned up + // directly, while transactions in states PENDING_EXECUTED and SENT + // will be cleaned up only after the timeout. + recordAndSentCommand(manager, dnId1, Arrays.asList(scmCmdId1), + Arrays.asList(deletedBlocksTxIds1)); + recordAndSentCommand(manager, dnId2, Arrays.asList(scmCmdId2), + Arrays.asList(deletedBlocksTxIds2)); + + Map dn1StatusRecord = + manager.getScmCmdStatusRecord().get(dnId1); + Map dn2StatusRecord = + manager.getScmCmdStatusRecord().get(dnId2); + + // Only scmCmdId1 receives a heartbeat report, so its status is + // updated; scmCmdId2 stays in SENT status. + // SENT -> PENDING_EXECUTED + manager.updateStatusByDNCommandStatus(dnId1, scmCmdId1, + StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING); + + manager.cleanAllTimeoutSCMCommand(Long.MAX_VALUE); + // scmCmdId1 is PENDING_EXECUTED and will only be cleaned up after timeout + assertNotNull(dn1StatusRecord.get(scmCmdId1)); + assertNotNull(dn2StatusRecord.get(scmCmdId2)); + + // scmCmdId2 is SENT and will only be cleaned up after timeout + manager.cleanAllTimeoutSCMCommand(-1); + assertNull(dn1StatusRecord.get(scmCmdId1)); + assertNull(dn2StatusRecord.get(scmCmdId2)); + } + + private void recordAndSentCommand( + SCMDeleteBlocksCommandStatusManager statusManager, + UUID dnId, List scmCmdIds, List> txIds) { + assertEquals(scmCmdIds.size(), txIds.size()); + for (int i = 0; i < scmCmdIds.size(); i++) { + long scmCmdId = scmCmdIds.get(i); + Set deletedBlocksTxIds = txIds.get(i); + CmdStatusData statusData = + SCMDeleteBlocksCommandStatusManager.createScmCmdStatusData( + dnId, scmCmdId, deletedBlocksTxIds); + statusManager.recordScmCommand(statusData); + statusManager.onSent(dnId, scmCmdId); + } + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java index 168fdd11a57..0f65cdb1087 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java @@ -47,6 +47,7 @@ .StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLog; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; @@ -90,6 +91,7 @@ public class 
TestDeadNodeHandler { private EventQueue eventQueue; private String storageDir; private SCMContext scmContext; + private DeletedBlockLog deletedBlockLog; @BeforeEach public void setup() throws IOException, AuthenticationException { @@ -117,8 +119,9 @@ public void setup() throws IOException, AuthenticationException { pipelineManager.setPipelineProvider(RATIS, mockRatisProvider); containerManager = scm.getContainerManager(); + deletedBlockLog = Mockito.mock(DeletedBlockLog.class); deadNodeHandler = new DeadNodeHandler(nodeManager, - Mockito.mock(PipelineManager.class), containerManager); + Mockito.mock(PipelineManager.class), containerManager, deletedBlockLog); healthyReadOnlyNodeHandler = new HealthyReadOnlyNodeHandler(nodeManager, pipelineManager); @@ -134,6 +137,7 @@ public void teardown() { } @Test + @SuppressWarnings("checkstyle:MethodLength") public void testOnMessage() throws Exception { //GIVEN DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails(); @@ -233,6 +237,9 @@ public void testOnMessage() throws Exception { Assertions.assertFalse( nodeManager.getClusterNetworkTopologyMap().contains(datanode1)); + Mockito.verify(deletedBlockLog, Mockito.times(0)) + .onDatanodeDead(datanode1.getUuid()); + Set container1Replicas = containerManager .getContainerReplicas(ContainerID.valueOf(container1.getContainerID())); Assertions.assertEquals(2, container1Replicas.size()); @@ -260,6 +267,9 @@ public void testOnMessage() throws Exception { Assertions.assertEquals(0, nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType())); + Mockito.verify(deletedBlockLog, Mockito.times(1)) + .onDatanodeDead(datanode1.getUuid()); + container1Replicas = containerManager .getContainerReplicas(ContainerID.valueOf(container1.getContainerID())); Assertions.assertEquals(1, container1Replicas.size()); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index e3da551c3ed..a3decb0efb5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -954,16 +954,17 @@ scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), @Test public void testProcessCommandQueueReport() - throws IOException, NodeNotFoundException { + throws IOException, NodeNotFoundException, AuthenticationException { OzoneConfiguration conf = new OzoneConfiguration(); SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class); when(scmStorageConfig.getClusterID()).thenReturn("xyz111"); EventPublisher eventPublisher = mock(EventPublisher.class); HDDSLayoutVersionManager lvm = new HDDSLayoutVersionManager(scmStorageConfig.getLayoutVersion()); + createNodeManager(getConf()); SCMNodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), - SCMContext.emptyContext(), lvm); + scmContext, lvm); LayoutVersionProto layoutInfo = toLayoutVersionProto( lvm.getMetadataLayoutVersion(), lvm.getSoftwareLayoutVersion());
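// Putting the pieces together: the minimal usage pattern these tests
// establish for SCMDeleteBlocksCommandStatusManager. All calls below
// appear verbatim in the tests above; only the wiring is condensed into
// one sketch (reviewer annotation, not part of the patch):
//
//   SCMDeleteBlocksCommandStatusManager mgr =
//       new SCMDeleteBlocksCommandStatusManager();
//   CmdStatusData data = SCMDeleteBlocksCommandStatusManager
//       .createScmCmdStatusData(dnId, scmCmdId, txIds); // TO_BE_SENT
//   mgr.recordScmCommand(data);
//   mgr.onSent(dnId, scmCmdId);                         // -> SENT
//   mgr.updateStatusByDNCommandStatus(dnId, scmCmdId,
//       CommandStatus.Status.PENDING);   // record kept, awaiting execution
//   mgr.cleanTimeoutSCMCommand(dnId, timeoutMs); // drops timed-out records
//
// As the assertions show, EXECUTED and FAILED reports remove the record
// immediately, while SENT and PENDING_EXECUTED records survive until the
// timeout-based cleanup removes them.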