Skip to content

Commit

Permalink
HDDS-8888. Consider Datanode queue capacity when sending DeleteBlocks…
Browse files Browse the repository at this point in the history
… command (apache#4939)

(cherry picked from commit 43c9565)
  • Loading branch information
xichen01 committed Jul 17, 2024
1 parent 08bf39f commit 96d7864
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ public class DatanodeConfiguration extends ReconfigurableConfig {
defaultValue = "5",
tags = {DATANODE},
description = "The maximum number of block delete commands queued on " +
" a datanode"
" a datanode, This configuration is also used by the SCM to " +
"control whether to send delete commands to the DN. If the DN" +
" has more commands waiting in the queue than this value, " +
"the SCM will not send any new block delete commands. until the " +
"DN has processed some commands and the queue length is reduced."
)
private int blockDeleteQueueLimit = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class SCMBlockDeletingService extends BackgroundService

private long safemodeExitMillis = 0;
private final long safemodeExitRunDelayMillis;
private final long deleteBlocksPendingCommandLimit;
private final Clock clock;

@SuppressWarnings("parameternumber")
Expand All @@ -110,6 +113,9 @@ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
this.deleteBlocksPendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
this.clock = clock;
this.deletedBlockLog = deletedBlockLog;
this.nodeManager = nodeManager;
Expand Down Expand Up @@ -155,13 +161,12 @@ public EmptyTaskResult call() throws Exception {
List<DatanodeDetails> datanodes =
nodeManager.getNodes(NodeStatus.inServiceHealthy());
if (datanodes != null) {
// When DN node is healthy and in-service, and previous commands
// are handled for deleteBlocks Type, then it will be considered
// in this iteration
final Set<DatanodeDetails> included = datanodes.stream().filter(
dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
try {
// When DN node is healthy and in-service, and their number of
// 'deleteBlocks' type commands is below the limit.
// These nodes will be considered for this iteration.
final Set<DatanodeDetails> included =
getDatanodesWithinCommandLimit(datanodes);
DatanodeDeletedBlockTransactions transactions =
deletedBlockLog.getTransactions(getBlockDeleteTXNum(), included);

Expand Down Expand Up @@ -205,15 +210,15 @@ public EmptyTaskResult call() throws Exception {
deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
} catch (NotLeaderException nle) {
LOG.warn("Skip current run, since not leader any more.", nle);
return EmptyTaskResult.newResult();
} catch (NodeNotFoundException e) {
LOG.error("Datanode not found in NodeManager. Should not happen", e);
} catch (IOException e) {
// We may tolerate a number of failures for sometime
// but if it continues to fail, at some point we need to raise
// an exception and probably fail the SCM ? At present, it simply
// continues to retry the scanning.
LOG.error("Failed to get block deletion transactions from delTX log",
e);
return EmptyTaskResult.newResult();
}
}

Expand Down Expand Up @@ -283,4 +288,24 @@ public void stop() {
public ScmBlockDeletingServiceMetrics getMetrics() {
return this.metrics;
}

/**
* Filters and returns a set of healthy datanodes that have not exceeded
* the deleteBlocksPendingCommandLimit.
*
* @param datanodes a list of DatanodeDetails
* @return a set of filtered DatanodeDetails
*/
@VisibleForTesting
protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
List<DatanodeDetails> datanodes) throws NodeNotFoundException {
final Set<DatanodeDetails> included = new HashSet<>();
for (DatanodeDetails dn : datanodes) {
if (nodeManager.getTotalDatanodeCommandCount(dn, Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit
&& nodeManager.getCommandQueueCount(dn.getUuid(), Type.deleteBlocksCommand) < 2) {
included.add(dn);
}
}
return included;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.block;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.time.Clock;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Test SCMBlockDeletingService.
*/
public class TestSCMBlockDeletingService {
private SCMBlockDeletingService service;
private EventPublisher eventPublisher;
private List<DatanodeDetails> datanodeDetails;
private OzoneConfiguration conf;
private NodeManager nodeManager;
private ScmBlockDeletingServiceMetrics metrics;

@BeforeEach
public void setup() throws Exception {
nodeManager = mock(NodeManager.class);
eventPublisher = mock(EventPublisher.class);
conf = new OzoneConfiguration();
metrics = ScmBlockDeletingServiceMetrics.create();
when(nodeManager.getTotalDatanodeCommandCount(any(),
any())).thenReturn(0);
SCMServiceManager scmServiceManager = mock(SCMServiceManager.class);
SCMContext scmContext = mock(SCMContext.class);

DatanodeDeletedBlockTransactions ddbt =
new DatanodeDeletedBlockTransactions();
DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails();
DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails();
datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3);
when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn(
datanodeDetails);
DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 1);
ddbt.addTransactionToDN(datanode1.getUuid(), tx1);
ddbt.addTransactionToDN(datanode2.getUuid(), tx1);
ddbt.addTransactionToDN(datanode3.getUuid(), tx1);
DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class);
when(mockDeletedBlockLog.getTransactions(
anyInt(), anySet())).thenReturn(ddbt);

service = spy(new SCMBlockDeletingService(
mockDeletedBlockLog, nodeManager, eventPublisher, scmContext,
scmServiceManager, conf, conf.getObject(ScmConfig.class), metrics, Clock.system(
ZoneOffset.UTC), mock(ReconfigurationHandler.class)));
when(service.shouldRun()).thenReturn(true);
}

@AfterEach
public void stop() {
service.stop();
ScmBlockDeletingServiceMetrics.unRegister();
}

@Test
public void testCall() throws Exception {
callDeletedBlockTransactionScanner();

ArgumentCaptor<CommandForDatanode> argumentCaptor =
ArgumentCaptor.forClass(CommandForDatanode.class);

// Three Datanode is healthy and in-service, and the task queue is empty,
// so the transaction will send to all three Datanode
verify(eventPublisher, times(3)).fireEvent(
eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture());
List<CommandForDatanode> actualCommands = argumentCaptor.getAllValues();
List<UUID> actualDnIds = actualCommands.stream()
.map(CommandForDatanode::getDatanodeId)
.collect(Collectors.toList());
Set<UUID> expectedDnIdsSet = datanodeDetails.stream()
.map(DatanodeDetails::getUuid).collect(Collectors.toSet());

assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds));
assertEquals(datanodeDetails.size(),
metrics.getNumBlockDeletionCommandSent());
// Echo Command has one Transaction
assertEquals(datanodeDetails.size() * 1,
metrics.getNumBlockDeletionTransactionSent());
}

private void callDeletedBlockTransactionScanner() throws Exception {
service.getTasks().poll().call();
}

@Test
public void testLimitCommandSending() throws Exception {
DatanodeConfiguration dnConf =
conf.getObject(DatanodeConfiguration.class);
int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit();

// The number of commands pending on all Datanodes has reached the limit.
when(nodeManager.getTotalDatanodeCommandCount(any(),
any())).thenReturn(pendingCommandLimit);
assertEquals(0,
service.getDatanodesWithinCommandLimit(datanodeDetails).size());

// The number of commands pending on all Datanodes is 0
when(nodeManager.getTotalDatanodeCommandCount(any(),
any())).thenReturn(0);
assertEquals(datanodeDetails.size(),
service.getDatanodesWithinCommandLimit(datanodeDetails).size());

// The number of commands pending on first Datanodes has reached the limit.
DatanodeDetails fullDatanode = datanodeDetails.get(0);
when(nodeManager.getTotalDatanodeCommandCount(fullDatanode,
Type.deleteBlocksCommand)).thenReturn(pendingCommandLimit);
Set<DatanodeDetails> includeNodes =
service.getDatanodesWithinCommandLimit(datanodeDetails);
assertEquals(datanodeDetails.size() - 1,
includeNodes.size());
assertFalse(includeNodes.contains(fullDatanode));
}

private DeletedBlocksTransaction createTestDeleteTxn(
long txnID, List<Long> blocks, long containerID) {
return DeletedBlocksTransaction.newBuilder().setTxID(txnID)
.setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
}
}

0 comments on commit 96d7864

Please sign in to comment.