Skip to content

Commit

Permalink
HDDS-10917. Refactor more tests from TestContainerBalancerTask (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
Montura authored and devabhishekpal committed Aug 8, 2024
1 parent 9c4bf8b commit e56d551
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@
import jakarta.annotation.Nonnull;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerReplicaNotFoundException;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ozone.test.GenericTestUtils;
Expand All @@ -37,16 +45,21 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hadoop.hdds.scm.container.balancer.TestableCluster.RANDOM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -256,14 +269,11 @@ public void testBalancerWithMoveManager(@Nonnull MockedSCM mockedSCM)
mockedSCM.disableLegacyReplicationManager();
mockedSCM.startBalancerTask(config);

verify(mockedSCM.getMoveManager(), atLeastOnce())
.move(any(ContainerID.class),
any(DatanodeDetails.class),
any(DatanodeDetails.class));
verify(mockedSCM.getMoveManager(), atLeastOnce()).
move(any(ContainerID.class), any(DatanodeDetails.class), any(DatanodeDetails.class));

verify(mockedSCM.getReplicationManager(), times(0))
.move(any(ContainerID.class), any(
DatanodeDetails.class), any(DatanodeDetails.class));
.move(any(ContainerID.class), any(DatanodeDetails.class), any(DatanodeDetails.class));
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
Expand Down Expand Up @@ -325,6 +335,259 @@ public void testMetrics(@Nonnull MockedSCM mockedSCM) throws IOException, NodeNo
assertEquals(1, metrics.getNumContainerMovesFailed());
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void containerBalancerShouldSelectOnlyClosedContainers(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

Map<ContainerID, ContainerInfo> cidToInfoMap = mockedSCM.getCluster().getCidToInfoMap();
// Make all containers open, balancer should not select any of them
for (ContainerInfo containerInfo : cidToInfoMap.values()) {
containerInfo.setState(HddsProtos.LifeCycleState.OPEN);
}

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

// Balancer should have identified unbalanced nodes
assertFalse(TestContainerBalancerDatanodeNodeLimit.getUnBalancedNodes(task).isEmpty());
// No container should have been selected
assertTrue(task.getContainerToSourceMap().isEmpty());

// Iteration result should be CAN_NOT_BALANCE_ANY_MORE because no container move is generated
assertEquals(ContainerBalancerTask.IterationResult.CAN_NOT_BALANCE_ANY_MORE, task.getIterationResult());

// Now, close all containers
for (ContainerInfo containerInfo : cidToInfoMap.values()) {
containerInfo.setState(HddsProtos.LifeCycleState.CLOSED);
}
ContainerBalancerTask nextTask = mockedSCM.startBalancerTask(config);

// Check whether all selected containers are closed
for (ContainerID cid: nextTask.getContainerToSourceMap().keySet()) {
assertSame(cidToInfoMap.get(cid).getState(), HddsProtos.LifeCycleState.CLOSED);
}
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void balancerShouldNotSelectNonClosedContainerReplicas(@Nonnull MockedSCM mockedSCM)
throws ContainerNotFoundException {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

// Let's mock such that all replicas have CLOSING state
Map<ContainerID, Set<ContainerReplica>> cidToReplicasMap = mockedSCM.getCluster().getCidToReplicasMap();
when(mockedSCM.getContainerManager().getContainerReplicas(any(ContainerID.class)))
.thenAnswer(invocationOnMock -> {
ContainerID cid = (ContainerID) invocationOnMock.getArguments()[0];
Set<ContainerReplica> replicas = cidToReplicasMap.get(cid);
Set<ContainerReplica> replicasToReturn = new HashSet<>(replicas.size());
for (ContainerReplica replica : replicas) {
ContainerReplica newReplica = replica.toBuilder()
.setContainerState(StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSING)
.build();
replicasToReturn.add(newReplica);
}

return replicasToReturn;
});

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

// Balancer should have identified unbalanced nodes
assertFalse(TestContainerBalancerDatanodeNodeLimit.getUnBalancedNodes(task).isEmpty());
// No container should have moved because all replicas are CLOSING
assertTrue(task.getContainerToSourceMap().isEmpty());
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void containerBalancerShouldObeyMaxSizeToMoveLimit(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(1);
config.setMaxSizeToMovePerIteration(10 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(10 * STORAGE_UNIT);

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

// Balancer should not have moved more size than the limit
assertThat(task.getSizeScheduledForMoveInLatestIteration()).isLessThanOrEqualTo(10 * STORAGE_UNIT);

long size = task.getMetrics().getDataSizeMovedGBInLatestIteration();
assertThat(size).isGreaterThan(0);
assertThat(size).isLessThanOrEqualTo(10);
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void targetDatanodeShouldNotAlreadyContainSelectedContainer(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

Map<ContainerID, DatanodeDetails> map = task.getContainerToTargetMap();
Map<ContainerID, Set<ContainerReplica>> cidToReplicasMap = mockedSCM.getCluster().getCidToReplicasMap();
for (Map.Entry<ContainerID, DatanodeDetails> entry : map.entrySet()) {
ContainerID container = entry.getKey();
DatanodeDetails target = entry.getValue();
assertTrue(cidToReplicasMap.get(container)
.stream()
.map(ContainerReplica::getDatanodeDetails)
.noneMatch(target::equals));
}
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void containerMoveSelectionShouldFollowPlacementPolicy(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

Map<ContainerID, DatanodeDetails> containerFromSourceMap = task.getContainerToSourceMap();
Map<ContainerID, DatanodeDetails> containerToTargetMap = task.getContainerToTargetMap();

// For each move selection, check if {replicas - source + target} satisfies placement policy
for (Map.Entry<ContainerID, DatanodeDetails> entry : containerFromSourceMap.entrySet()) {
ContainerID container = entry.getKey();
DatanodeDetails source = entry.getValue();

List<DatanodeDetails> replicas = mockedSCM.getCluster().getCidToReplicasMap().get(container)
.stream()
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
// Remove source and add target
replicas.remove(source);
replicas.add(containerToTargetMap.get(container));

ContainerInfo containerInfo = mockedSCM.getCluster().getCidToInfoMap().get(container);
ContainerPlacementStatus placementStatus;
int requiredNodes = containerInfo.getReplicationConfig().getRequiredNodes();
if (containerInfo.getReplicationType() == HddsProtos.ReplicationType.RATIS) {
placementStatus = mockedSCM.getPlacementPolicy().validateContainerPlacement(replicas, requiredNodes);
} else {
placementStatus = mockedSCM.getEcPlacementPolicy().validateContainerPlacement(replicas, requiredNodes);
}
assertTrue(placementStatus.isPolicySatisfied());
}
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void targetDatanodeShouldBeInServiceHealthy(@Nonnull MockedSCM mockedSCM) throws NodeNotFoundException {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

for (DatanodeDetails target : task.getSelectedTargets()) {
NodeStatus status = mockedSCM.getNodeManager().getNodeStatus(target);
assertSame(HddsProtos.NodeOperationalState.IN_SERVICE, status.getOperationalState());
assertTrue(status.isHealthy());
}
}


@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void selectedContainerShouldNotAlreadyHaveBeenSelected(@Nonnull MockedSCM mockedSCM)
throws NodeNotFoundException, ContainerNotFoundException, TimeoutException, ContainerReplicaNotFoundException {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);

mockedSCM.enableLegacyReplicationManager();

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);
int numContainers = task.getContainerToTargetMap().size();

/* Assuming move is called exactly once for each unique container, number of calls to move should equal number of
unique containers. If number of calls to move is more than number of unique containers, at least one container
has been re-selected. It's expected that number of calls to move should equal number of unique, selected containers
(from containerToTargetMap).
*/
verify(mockedSCM.getReplicationManager(), times(numContainers))
.move(any(ContainerID.class), any(DatanodeDetails.class), any(DatanodeDetails.class));

// Try the same test by disabling LegacyReplicationManager so that MoveManager is used.
mockedSCM.disableLegacyReplicationManager();
ContainerBalancerTask nextTask = mockedSCM.startBalancerTask(config);

numContainers = nextTask.getContainerToTargetMap().size();
verify(mockedSCM.getMoveManager(), times(numContainers))
.move(any(ContainerID.class), any(DatanodeDetails.class), any(DatanodeDetails.class));
}

@ParameterizedTest(name = "MockedSCM #{index}: {0}")
@MethodSource("createMockedSCMs")
public void balancerShouldNotSelectConfiguredExcludeContainers(@Nonnull MockedSCM mockedSCM) {
ContainerBalancerConfiguration config = balancerConfigByOzoneConfig(new OzoneConfiguration());
int nodeCount = mockedSCM.getCluster().getNodeCount();
if (nodeCount < DATANODE_COUNT_LIMIT_FOR_SMALL_CLUSTER) {
config.setMaxDatanodesPercentageToInvolvePerIteration(100);
}
config.setIterations(1);
config.setThreshold(10);
config.setMaxSizeToMovePerIteration(50 * STORAGE_UNIT);
config.setMaxSizeEnteringTarget(50 * STORAGE_UNIT);
config.setExcludeContainers("1, 4, 5");

ContainerBalancerTask task = mockedSCM.startBalancerTask(config);

Set<ContainerID> excludeContainers = config.getExcludeContainers();
for (ContainerID container : task.getContainerToSourceMap().keySet()) {
assertThat(excludeContainers).doesNotContain(container);
}
}

public static List<DatanodeUsageInfo> getUnBalancedNodes(@Nonnull ContainerBalancerTask task) {
ArrayList<DatanodeUsageInfo> result = new ArrayList<>();
Expand Down
Loading

0 comments on commit e56d551

Please sign in to comment.