From 1b0e49e6aceb2a2017b1780adad4bd51ab1ae25d Mon Sep 17 00:00:00 2001 From: XiChen <32928346+xichen01@users.noreply.github.com> Date: Sun, 7 Jan 2024 05:38:30 +0800 Subject: [PATCH] HDDS-8888. Consider Datanode queue capacity when sending DeleteBlocks command (#4939) (cherry picked from commit 43c9565d9d0250af5694d10c02f7634b79206ab6) --- .../statemachine/DatanodeConfiguration.java | 6 +- .../scm/block/SCMBlockDeletingService.java | 41 +++- .../block/TestSCMBlockDeletingService.java | 177 ++++++++++++++++++ 3 files changed, 215 insertions(+), 9 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java index d7a3e391c921..a8b0d8cfa4bc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java @@ -184,7 +184,11 @@ public class DatanodeConfiguration extends ReconfigurableConfig { defaultValue = "5", tags = {DATANODE}, description = "The maximum number of block delete commands queued on " + - " a datanode" + " a datanode, This configuration is also used by the SCM to " + + "control whether to send delete commands to the DN. If the DN" + + " has more commands waiting in the queue than this value, " + + "the SCM will not send any new block delete commands. until the " + + "DN has processed some commands and the queue length is reduced." ) private int blockDeleteQueueLimit = 5; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 8677baf33b85..7271d9dcba68 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -42,11 +42,13 @@ import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.util.Time; @@ -90,6 +92,7 @@ public class SCMBlockDeletingService extends BackgroundService private long safemodeExitMillis = 0; private final long safemodeExitRunDelayMillis; + private final long deleteBlocksPendingCommandLimit; private final Clock clock; @SuppressWarnings("parameternumber") @@ -110,6 +113,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT, TimeUnit.MILLISECONDS); + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); + this.deleteBlocksPendingCommandLimit = dnConf.getBlockDeleteQueueLimit(); this.clock = clock; this.deletedBlockLog = deletedBlockLog; this.nodeManager = nodeManager; @@ -155,13 +161,12 @@ public EmptyTaskResult call() throws Exception { List datanodes = nodeManager.getNodes(NodeStatus.inServiceHealthy()); if (datanodes != null) { - // When DN node is healthy and in-service, and previous commands - // are handled for deleteBlocks Type, then it will be considered - // in this iteration - final Set included = datanodes.stream().filter( - dn -> nodeManager.getCommandQueueCount(dn.getUuid(), - Type.deleteBlocksCommand) == 0).collect(Collectors.toSet()); try { + // When DN node is healthy and in-service, and their number of + // 'deleteBlocks' type commands is below the limit. + // These nodes will be considered for this iteration. + final Set included = + getDatanodesWithinCommandLimit(datanodes); DatanodeDeletedBlockTransactions transactions = deletedBlockLog.getTransactions(getBlockDeleteTXNum(), included); @@ -205,7 +210,8 @@ public EmptyTaskResult call() throws Exception { deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs)); } catch (NotLeaderException nle) { LOG.warn("Skip current run, since not leader any more.", nle); - return EmptyTaskResult.newResult(); + } catch (NodeNotFoundException e) { + LOG.error("Datanode not found in NodeManager. Should not happen", e); } catch (IOException e) { // We may tolerate a number of failures for sometime // but if it continues to fail, at some point we need to raise @@ -213,7 +219,6 @@ public EmptyTaskResult call() throws Exception { // continues to retry the scanning. LOG.error("Failed to get block deletion transactions from delTX log", e); - return EmptyTaskResult.newResult(); } } @@ -283,4 +288,24 @@ public void stop() { public ScmBlockDeletingServiceMetrics getMetrics() { return this.metrics; } + + /** + * Filters and returns a set of healthy datanodes that have not exceeded + * the deleteBlocksPendingCommandLimit. + * + * @param datanodes a list of DatanodeDetails + * @return a set of filtered DatanodeDetails + */ + @VisibleForTesting + protected Set getDatanodesWithinCommandLimit( + List datanodes) throws NodeNotFoundException { + final Set included = new HashSet<>(); + for (DatanodeDetails dn : datanodes) { + if (nodeManager.getTotalDatanodeCommandCount(dn, Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit + && nodeManager.getCommandQueueCount(dn.getUuid(), Type.deleteBlocksCommand) < 2) { + included.add(dn); + } + } + return included; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java new file mode 100644 index 000000000000..3bd7ad00f6a8 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.ReconfigurationHandler; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.time.Clock; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test SCMBlockDeletingService. + */ +public class TestSCMBlockDeletingService { + private SCMBlockDeletingService service; + private EventPublisher eventPublisher; + private List datanodeDetails; + private OzoneConfiguration conf; + private NodeManager nodeManager; + private ScmBlockDeletingServiceMetrics metrics; + + @BeforeEach + public void setup() throws Exception { + nodeManager = mock(NodeManager.class); + eventPublisher = mock(EventPublisher.class); + conf = new OzoneConfiguration(); + metrics = ScmBlockDeletingServiceMetrics.create(); + when(nodeManager.getTotalDatanodeCommandCount(any(), + any())).thenReturn(0); + SCMServiceManager scmServiceManager = mock(SCMServiceManager.class); + SCMContext scmContext = mock(SCMContext.class); + + DatanodeDeletedBlockTransactions ddbt = + new DatanodeDeletedBlockTransactions(); + DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails(); + datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3); + when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn( + datanodeDetails); + DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 1); + ddbt.addTransactionToDN(datanode1.getUuid(), tx1); + ddbt.addTransactionToDN(datanode2.getUuid(), tx1); + ddbt.addTransactionToDN(datanode3.getUuid(), tx1); + DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class); + when(mockDeletedBlockLog.getTransactions( + anyInt(), anySet())).thenReturn(ddbt); + + service = spy(new SCMBlockDeletingService( + mockDeletedBlockLog, nodeManager, eventPublisher, scmContext, + scmServiceManager, conf, conf.getObject(ScmConfig.class), metrics, Clock.system( + ZoneOffset.UTC), mock(ReconfigurationHandler.class))); + when(service.shouldRun()).thenReturn(true); + } + + @AfterEach + public void stop() { + service.stop(); + ScmBlockDeletingServiceMetrics.unRegister(); + } + + @Test + public void testCall() throws Exception { + callDeletedBlockTransactionScanner(); + + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(CommandForDatanode.class); + + // Three Datanode is healthy and in-service, and the task queue is empty, + // so the transaction will send to all three Datanode + verify(eventPublisher, times(3)).fireEvent( + eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture()); + List actualCommands = argumentCaptor.getAllValues(); + List actualDnIds = actualCommands.stream() + .map(CommandForDatanode::getDatanodeId) + .collect(Collectors.toList()); + Set expectedDnIdsSet = datanodeDetails.stream() + .map(DatanodeDetails::getUuid).collect(Collectors.toSet()); + + assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds)); + assertEquals(datanodeDetails.size(), + metrics.getNumBlockDeletionCommandSent()); + // Echo Command has one Transaction + assertEquals(datanodeDetails.size() * 1, + metrics.getNumBlockDeletionTransactionSent()); + } + + private void callDeletedBlockTransactionScanner() throws Exception { + service.getTasks().poll().call(); + } + + @Test + public void testLimitCommandSending() throws Exception { + DatanodeConfiguration dnConf = + conf.getObject(DatanodeConfiguration.class); + int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit(); + + // The number of commands pending on all Datanodes has reached the limit. + when(nodeManager.getTotalDatanodeCommandCount(any(), + any())).thenReturn(pendingCommandLimit); + assertEquals(0, + service.getDatanodesWithinCommandLimit(datanodeDetails).size()); + + // The number of commands pending on all Datanodes is 0 + when(nodeManager.getTotalDatanodeCommandCount(any(), + any())).thenReturn(0); + assertEquals(datanodeDetails.size(), + service.getDatanodesWithinCommandLimit(datanodeDetails).size()); + + // The number of commands pending on first Datanodes has reached the limit. + DatanodeDetails fullDatanode = datanodeDetails.get(0); + when(nodeManager.getTotalDatanodeCommandCount(fullDatanode, + Type.deleteBlocksCommand)).thenReturn(pendingCommandLimit); + Set includeNodes = + service.getDatanodesWithinCommandLimit(datanodeDetails); + assertEquals(datanodeDetails.size() - 1, + includeNodes.size()); + assertFalse(includeNodes.contains(fullDatanode)); + } + + private DeletedBlocksTransaction createTestDeleteTxn( + long txnID, List blocks, long containerID) { + return DeletedBlocksTransaction.newBuilder().setTxID(txnID) + .setContainerID(containerID).addAllLocalID(blocks).setCount(0).build(); + } +}