Skip to content

Commit

Permalink
HDDS-9645. Recon should exclude out-of-service nodes when checking for healthy containers (apache#5651)
Browse files Browse the repository at this point in the history
  • Loading branch information
xBis7 authored Jan 27, 2024
1 parent 79d3c87 commit 397f62f
Show file tree
Hide file tree
Showing 13 changed files with 1,138 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort;
import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachHealthState;
import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState;
import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachPersistedOpState;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -209,7 +213,7 @@ public void testNodeWithOpenPipelineCanBeDecommissionedAndRecommissioned()
scmClient.decommissionNodes(Arrays.asList(
getDNHostAndPort(toDecommission)));

waitForDnToReachOpState(toDecommission, DECOMMISSIONED);
waitForDnToReachOpState(nm, toDecommission, DECOMMISSIONED);
// Ensure one node transitioned to DECOMMISSIONING
List<DatanodeDetails> decomNodes = nm.getNodes(
DECOMMISSIONED,
Expand All @@ -225,7 +229,7 @@ public void testNodeWithOpenPipelineCanBeDecommissionedAndRecommissioned()
// Stop the decommissioned DN
int dnIndex = cluster.getHddsDatanodeIndex(toDecommission);
cluster.shutdownHddsDatanode(toDecommission);
waitForDnToReachHealthState(toDecommission, DEAD);
waitForDnToReachHealthState(nm, toDecommission, DEAD);

// Now the decommissioned node is dead, we should have
// 3 replicas for the tracked container.
Expand All @@ -236,7 +240,7 @@ public void testNodeWithOpenPipelineCanBeDecommissionedAndRecommissioned()
cluster.restartHddsDatanode(dnIndex, true);
scmClient.recommissionNodes(Arrays.asList(
getDNHostAndPort(toDecommission)));
waitForDnToReachOpState(toDecommission, IN_SERVICE);
waitForDnToReachOpState(nm, toDecommission, IN_SERVICE);
waitForDnToReachPersistedOpState(toDecommission, IN_SERVICE);
}

Expand Down Expand Up @@ -272,7 +276,7 @@ public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
// After the SCM restart, the DN should report as DECOMMISSIONING, then
// it should re-enter the decommission workflow and move to DECOMMISSIONED
DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString());
waitForDnToReachOpState(newDn, DECOMMISSIONED);
waitForDnToReachOpState(nm, newDn, DECOMMISSIONED);
waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);

// Now the node is decommissioned, so restart SCM again
Expand All @@ -282,7 +286,7 @@ public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()

// On initial registration, the DN should report its operational state
// and if it is decommissioned, that should be updated in the NodeStatus
waitForDnToReachOpState(newDn, DECOMMISSIONED);
waitForDnToReachOpState(nm, newDn, DECOMMISSIONED);
// Also confirm the datanodeDetails correctly reflect the operational
// state.
waitForDnToReachPersistedOpState(newDn, DECOMMISSIONED);
Expand All @@ -291,7 +295,7 @@ public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
// reflect the state of in SCM, in IN_SERVICE.
int dnIndex = cluster.getHddsDatanodeIndex(dn);
cluster.shutdownHddsDatanode(dnIndex);
waitForDnToReachHealthState(dn, DEAD);
waitForDnToReachHealthState(nm, dn, DEAD);
// Datanode is shutdown and dead. Now recommission it in SCM
scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
// Now restart it and ensure it remains IN_SERVICE
Expand All @@ -301,8 +305,8 @@ public void testDecommissioningNodesCompleteDecommissionOnSCMRestart()
// As this is not an initial registration since SCM was started, the DN
// should report its operational state and if it differs from what SCM
// has, then the SCM state should be used and the DN state updated.
waitForDnToReachHealthState(newDn, HEALTHY);
waitForDnToReachOpState(newDn, IN_SERVICE);
waitForDnToReachHealthState(nm, newDn, HEALTHY);
waitForDnToReachOpState(nm, newDn, IN_SERVICE);
waitForDnToReachPersistedOpState(newDn, IN_SERVICE);
}

Expand Down Expand Up @@ -342,7 +346,7 @@ public void testSingleNodeWithOpenPipelineCanGotoMaintenance()
scmClient.startMaintenanceNodes(Arrays.asList(
getDNHostAndPort(dn)), 0);

waitForDnToReachOpState(dn, IN_MAINTENANCE);
waitForDnToReachOpState(nm, dn, IN_MAINTENANCE);
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);

// Should still be 3 replicas online as no replication should happen for
Expand All @@ -356,7 +360,7 @@ public void testSingleNodeWithOpenPipelineCanGotoMaintenance()

// Stop the maintenance DN
cluster.shutdownHddsDatanode(dn);
waitForDnToReachHealthState(dn, DEAD);
waitForDnToReachHealthState(nm, dn, DEAD);

// Now the maintenance node is dead, we should still have
// 3 replicas as we don't purge the replicas for a dead maintenance node
Expand All @@ -368,13 +372,13 @@ public void testSingleNodeWithOpenPipelineCanGotoMaintenance()
// Restart the DN and it should keep the IN_MAINTENANCE state
cluster.restartHddsDatanode(dn, true);
DatanodeDetails newDN = nm.getNodeByUuid(dn.getUuid().toString());
waitForDnToReachHealthState(newDN, HEALTHY);
waitForDnToReachHealthState(nm, newDN, HEALTHY);
waitForDnToReachPersistedOpState(newDN, IN_MAINTENANCE);

// Stop the DN and wait for it to go dead.
int dnIndex = cluster.getHddsDatanodeIndex(dn);
cluster.shutdownHddsDatanode(dnIndex);
waitForDnToReachHealthState(dn, DEAD);
waitForDnToReachHealthState(nm, dn, DEAD);

// Datanode is shutdown and dead. Now recommission it in SCM
scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn)));
Expand All @@ -386,8 +390,8 @@ public void testSingleNodeWithOpenPipelineCanGotoMaintenance()
// As this is not an initial registration since SCM was started, the DN
// should report its operational state and if it differs from what SCM
// has, then the SCM state should be used and the DN state updated.
waitForDnToReachHealthState(newDn, HEALTHY);
waitForDnToReachOpState(newDn, IN_SERVICE);
waitForDnToReachHealthState(nm, newDn, HEALTHY);
waitForDnToReachOpState(nm, newDn, IN_SERVICE);
waitForDnToReachPersistedOpState(dn, IN_SERVICE);
}

Expand All @@ -410,7 +414,7 @@ public void testContainerIsReplicatedWhenAllNodesGotoMaintenance()
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));

scmClient.startMaintenanceNodes(forMaintenance.stream()
.map(this::getDNHostAndPort)
.map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);

// Ensure all 3 DNs go to maintenance
Expand All @@ -429,7 +433,7 @@ public void testContainerIsReplicatedWhenAllNodesGotoMaintenance()

// Ensure all 3 DNs go to maintenance
for (DatanodeDetails dn : forMaintenance) {
waitForDnToReachOpState(dn, IN_SERVICE);
waitForDnToReachOpState(nm, dn, IN_SERVICE);
}
waitForContainerReplicas(container, 3);

Expand All @@ -444,18 +448,18 @@ public void testContainerIsReplicatedWhenAllNodesGotoMaintenance()
.limit(2)
.collect(Collectors.toList());
scmClient.startMaintenanceNodes(ecMaintenance.stream()
.map(this::getDNHostAndPort)
.map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);
for (DatanodeDetails dn : ecMaintenance) {
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
}
assertThat(cm.getContainerReplicas(ecContainer.containerID()).size()).isGreaterThanOrEqualTo(6);
scmClient.recommissionNodes(ecMaintenance.stream()
.map(this::getDNHostAndPort)
.map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()));
// Ensure the 2 DNs go to IN_SERVICE
for (DatanodeDetails dn : ecMaintenance) {
waitForDnToReachOpState(dn, IN_SERVICE);
waitForDnToReachOpState(nm, dn, IN_SERVICE);
}
waitForContainerReplicas(ecContainer, 5);
}
Expand All @@ -478,7 +482,7 @@ public void testEnteringMaintenanceNodeCompletesAfterSCMRestart()
replicas.forEach(r -> forMaintenance.add(r.getDatanodeDetails()));

scmClient.startMaintenanceNodes(forMaintenance.stream()
.map(this::getDNHostAndPort)
.map(TestNodeUtil::getDNHostAndPort)
.collect(Collectors.toList()), 0);

// Ensure all 3 DNs go to entering_maintenance
Expand All @@ -495,7 +499,7 @@ public void testEnteringMaintenanceNodeCompletesAfterSCMRestart()

// Ensure all 3 DNs go to maintenance
for (DatanodeDetails dn : newDns) {
waitForDnToReachOpState(dn, IN_MAINTENANCE);
waitForDnToReachOpState(nm, dn, IN_MAINTENANCE);
}

// There should now be 5-6 replicas of the container we are tracking
Expand Down Expand Up @@ -525,19 +529,19 @@ public void testMaintenanceEndsAutomaticallyAtTimeout()
// decommission interface only allows us to specify hours from now as the
// end time, that is not really suitable for a test like this.
nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime);
waitForDnToReachOpState(dn, IN_SERVICE);
waitForDnToReachOpState(nm, dn, IN_SERVICE);
waitForDnToReachPersistedOpState(dn, IN_SERVICE);

// Put the node back into maintenance and then stop it and wait for it to
// go dead
scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0);
waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE);
cluster.shutdownHddsDatanode(dn);
waitForDnToReachHealthState(dn, DEAD);
waitForDnToReachHealthState(nm, dn, DEAD);

newEndTime = System.currentTimeMillis() / 1000 + 5;
nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime);
waitForDnToReachOpState(dn, IN_SERVICE);
waitForDnToReachOpState(nm, dn, IN_SERVICE);
// Ensure there are 3 replicas not including the dead node, indicating a new
// replica was created
GenericTestUtils.waitFor(() -> getContainerReplicas(container)
Expand Down Expand Up @@ -584,7 +588,7 @@ public void testSCMHandlesRestartForMaintenanceNode()
// Now let the node go dead and repeat the test. This time ensure a new
// replica is created.
cluster.shutdownHddsDatanode(dn);
waitForDnToReachHealthState(dn, DEAD);
waitForDnToReachHealthState(nm, dn, DEAD);

cluster.restartStorageContainerManager(false);
setManagers();
Expand Down Expand Up @@ -631,18 +635,6 @@ private void generateData(int keyCount, String keyPrefix,
}
}

/**
 * Retrieves the NodeStatus for the given DN or fails the test if the
 * Node cannot be found. This is a helper method to allow the nodeStatus to be
 * checked in lambda expressions.
 * @param dn Datanode for which to retrieve the NodeStatus.
 * @return the NodeStatus the NodeManager reports for the given DN.
 */
private NodeStatus getNodeStatus(DatanodeDetails dn) {
  return assertDoesNotThrow(() -> nm.getNodeStatus(dn),
      "Unexpected exception getting the nodeState");
}

/**
* Retrieves the containerReplica set for a given container or fails the test
* if the container cannot be found. This is a helper method to allow the
Expand All @@ -668,61 +660,6 @@ private DatanodeDetails getOneDNHostingReplica(
return c.getDatanodeDetails();
}

/**
 * Given a Datanode, return a string consisting of the hostname and the
 * first of its registered ports, in the form host:port.
 * @param dn Datanode for which to retrieve the host:port string.
 * @return host:port for the given DN.
 */
private String getDNHostAndPort(DatanodeDetails dn) {
  return dn.getHostName() + ":" + dn.getPorts().get(0).getValue();
}

/**
 * Wait for the given datanode to reach the given operational state,
 * polling the NodeManager every 200ms for up to 30 seconds.
 * @param dn Datanode for which to check the state
 * @param state The state to wait for.
 * @throws TimeoutException if the state is not reached within 30 seconds.
 * @throws InterruptedException if the wait is interrupted.
 */
private void waitForDnToReachOpState(DatanodeDetails dn,
    HddsProtos.NodeOperationalState state)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(
      () -> getNodeStatus(dn).getOperationalState().equals(state),
      200, 30000);
}

/**
 * Wait for the given datanode to reach the given Health state,
 * polling the NodeManager every 200ms for up to 30 seconds.
 * @param dn Datanode for which to check the state
 * @param state The state to wait for.
 * @throws TimeoutException if the state is not reached within 30 seconds.
 * @throws InterruptedException if the wait is interrupted.
 */
private void waitForDnToReachHealthState(DatanodeDetails dn,
    HddsProtos.NodeState state)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(
      () -> getNodeStatus(dn).getHealth().equals(state),
      200, 30000);
}

/**
 * Wait for the given datanode to reach the given persisted operational
 * state. Unlike the other wait helpers, this reads the state directly from
 * the DatanodeDetails object rather than from the NodeManager.
 * @param dn Datanode for which to check the state
 * @param state The state to wait for.
 * @throws TimeoutException if the state is not reached within 30 seconds.
 * @throws InterruptedException if the wait is interrupted.
 */
private void waitForDnToReachPersistedOpState(DatanodeDetails dn,
    HddsProtos.NodeOperationalState state)
    throws TimeoutException, InterruptedException {
  GenericTestUtils.waitFor(
      () -> dn.getPersistedOpState().equals(state),
      200, 30000);
}

/**
* Get any container present in the cluster and wait to ensure 3 replicas
* have been reported before returning the container.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.node;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;

import java.util.concurrent.TimeoutException;

/**
 * Utility class with helper methods for testing node state and status.
 */
public final class TestNodeUtil {

  /** Utility class; no instances. */
  private TestNodeUtil() {
  }

  /**
   * Wait for the given datanode to reach the given operational state,
   * polling the NodeManager every 200ms for up to 30 seconds.
   * @param nodeManager NodeManager used to look up the node's status.
   * @param dn Datanode for which to check the state
   * @param state The state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the wait is interrupted.
   */
  public static void waitForDnToReachOpState(NodeManager nodeManager,
      DatanodeDetails dn, HddsProtos.NodeOperationalState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> getNodeStatus(nodeManager, dn)
            .getOperationalState().equals(state),
        200, 30000);
  }

  /**
   * Wait for the given datanode to reach the given Health state,
   * polling the NodeManager every 200ms for up to 30 seconds.
   * @param nodeManager NodeManager used to look up the node's status.
   * @param dn Datanode for which to check the state
   * @param state The state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the wait is interrupted.
   */
  public static void waitForDnToReachHealthState(NodeManager nodeManager,
      DatanodeDetails dn, HddsProtos.NodeState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> getNodeStatus(nodeManager, dn).getHealth().equals(state),
        200, 30000);
  }

  /**
   * Retrieves the NodeStatus for the given DN or fails the test if the
   * Node cannot be found. This is a helper method to allow the nodeStatus to be
   * checked in lambda expressions.
   * @param nodeManager NodeManager from which to retrieve the status.
   * @param dn Datanode for which to retrieve the NodeStatus.
   * @return the NodeStatus the NodeManager reports for the given DN.
   */
  public static NodeStatus getNodeStatus(NodeManager nodeManager,
      DatanodeDetails dn) {
    return Assertions.assertDoesNotThrow(
        () -> nodeManager.getNodeStatus(dn),
        "Unexpected exception getting the nodeState");
  }

  /**
   * Given a Datanode, return a string consisting of the hostname and the
   * first of its registered ports, in the form host:port.
   * @param dn Datanode for which to retrieve the host:port string.
   * @return host:port for the given DN.
   */
  public static String getDNHostAndPort(DatanodeDetails dn) {
    return dn.getHostName() + ":" + dn.getPorts().get(0).getValue();
  }

  /**
   * Wait for the given datanode to reach the given persisted operational
   * state. Unlike the other wait helpers, this reads the state directly from
   * the DatanodeDetails object rather than from a NodeManager.
   * @param dn Datanode for which to check the state
   * @param state The state to wait for.
   * @throws TimeoutException if the state is not reached within 30 seconds.
   * @throws InterruptedException if the wait is interrupted.
   */
  public static void waitForDnToReachPersistedOpState(DatanodeDetails dn,
      HddsProtos.NodeOperationalState state)
      throws TimeoutException, InterruptedException {
    GenericTestUtils.waitFor(
        () -> dn.getPersistedOpState().equals(state),
        200, 30000);
  }
}
Loading

0 comments on commit 397f62f

Please sign in to comment.