Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apache/helix] -- Added detail in the Exception message for WAGED rebalance (hard constraint) failures. #2829

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
Expand Down Expand Up @@ -136,11 +138,16 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}).collect(Collectors.toList());

if (candidateNodes.isEmpty()) {
LOG.info("Found no eligible candidate nodes. Enabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
enableFullLoggingForCluster();
optimalAssignment.recordAssignmentFailure(replica,
Maps.transformValues(hardConstraintFailures, this::convertFailureReasons));
return Optional.empty();
}

LOG.info("Disabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
removeFullLoggingForCluster();

return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
getAssignmentNormalizedScore(node, replica, clusterContext)))
.max((nodeEntry1, nodeEntry2) -> {
Expand Down Expand Up @@ -179,6 +186,24 @@ private List<String> convertFailureReasons(List<HardConstraint> hardConstraints)
.collect(Collectors.toList());
}

/**
 * Switches every configured hard constraint into verbose failure logging so the
 * next round of validation failures is reported with full detail.
 */
private void enableFullLoggingForCluster() {
  _hardConstraints.forEach(constraint -> constraint.setEnableLogging(true));
}

/**
 * Switches every configured hard constraint back to quiet mode; a no-op if
 * verbose logging was never enabled.
 */
private void removeFullLoggingForCluster() {
  _hardConstraints.forEach(constraint -> constraint.setEnableLogging(false));
}

private static class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
private final AssignableReplica _replica;
private float _score = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
clusterContext.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone());

if (partitionsForResourceAndFaultZone.contains(replica.getPartitionName())) {
LOG.debug("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
if (enableLogging) {
LOG.info("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
abstract class HardConstraint {

protected boolean enableLogging = false;

/**
* Check if the replica could be assigned to the node
* @return True if the proposed assignment is valid; False otherwise
Expand All @@ -44,4 +46,12 @@ abstract boolean isAssignmentValid(AssignableNode node, AssignableReplica replic
/**
 * Human-readable identifier for this constraint, used when reporting failures.
 * Defaults to the fully qualified class name; subclasses may override to
 * provide a friendlier description.
 */
String getDescription() {
return getClass().getName();
}

/**
 * Toggles per-failure logging for this constraint.
 *
 * @param enableLogging true to log each failed assignment check in detail;
 *                      false to keep failed checks quiet
 */
public void setEnableLogging(boolean enableLogging) {
this.enableLogging = enableLogging;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
for (String key : replicaCapacity.keySet()) {
if (nodeCapacity.containsKey(key)) {
if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
LOG.debug("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
if (enableLogging) {
LOG.info("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
}
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
|| assignedPartitionsByResourceSize < resourceMaxPartitionsPerInstance;

if (!exceedResourceMaxPartitionLimit) {
LOG.debug("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
if (enableLogging) {
LOG.info("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
LOG.debug("Cannot assign the inactive replica: {}", replica.getPartitionName());
if (enableLogging) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
Set<String> assignedPartitionsByResource = node.getAssignedPartitionsByResource(replica.getResourceName());

if (assignedPartitionsByResource.contains(replica.getPartitionName())) {
LOG.debug("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
if (enableLogging) {
LOG.info("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
}

if (!node.getInstanceTags().contains(replica.getResourceInstanceGroupTag())) {
LOG.debug("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
if (enableLogging) {
LOG.info("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
}
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public Map<String, Integer> getClusterCapacityMap() {
return _clusterCapacityMap;
}

/**
 * @return the cluster name carried by this context
 */
public String getClusterName() {
return _clusterName;
}

public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
* under the License.
*/

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
Expand All @@ -33,14 +34,20 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class TestConstraintBasedAlgorithm {
@Test(expectedExceptions = HelixRebalanceException.class)
public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceException {

@Test
public void testCalculateNoValidAssignment() throws IOException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
Expand All @@ -49,7 +56,40 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

verify(mockHardConstraint, times(1)).setEnableLogging(eq(true));
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());
}

@Test
public void testCalculateNoValidAssignmentFirstAndThenRecovery() throws IOException, HelixRebalanceException {
  HardConstraint mockHardConstraint = mock(HardConstraint.class);
  SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
  // Consecutive stubbing: the first validity check fails (forcing an empty
  // candidate list), every later check passes.
  when(mockHardConstraint.isAssignmentValid(any(), any(), any()))
      .thenReturn(false) // hard constraint fails
      .thenReturn(true); // hard constraint recovers
  when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0);
  ConstraintBasedAlgorithm algorithm =
      new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
          ImmutableMap.of(mockSoftConstraint, 1f));
  ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
  try {
    algorithm.calculate(clusterModel);
    // BUGFIX: without this fail() the test passes vacuously when calculate()
    // does not throw, silently losing the coverage the old
    // @Test(expectedExceptions = ...) annotation provided.
    Assert.fail("Expected HelixRebalanceException when no candidate node is valid.");
  } catch (HelixRebalanceException ex) {
    Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
  }

  // The failed pass must have turned detailed constraint logging on.
  verify(mockHardConstraint, times(1)).setEnableLogging(eq(true));
  verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());

  // calling again for recovery (no exception); a successful pass must switch
  // detailed logging back off.
  algorithm.calculate(clusterModel);
  verify(mockHardConstraint, atLeastOnce()).setEnableLogging(eq(false));
}

@Test
Expand Down
Loading