Skip to content

Commit

Permalink
Add metric for how often a locally chosen partition is not available …
Browse files Browse the repository at this point in the history
…for PUT in remote colo. (linkedin#2742)
  • Loading branch information
litingulfs authored Apr 17, 2024
1 parent 3afcf21 commit dfc09d0
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.helix.HelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand Down Expand Up @@ -694,6 +695,7 @@ static class PartitionSelectionHelper implements ClusterMapChangeListener {
private Collection<? extends PartitionId> allPartitions;
private Map<String, SortedMap<Integer, List<PartitionId>>> partitionIdsByClassAndLocalReplicaCount;
private Map<PartitionId, List<ReplicaId>> partitionIdToLocalReplicas;
private HelixClusterManagerMetrics clusterManagerMetrics;

/**
* @param clusterManagerQueryHelper the {@link ClusterManagerQueryHelper} to query current cluster info
Expand All @@ -702,11 +704,12 @@ static class PartitionSelectionHelper implements ClusterMapChangeListener {
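* @param defaultPartitionClass the default partition class to use if a partition class is not found
* @param clusterManagerMetrics the {@link HelixClusterManagerMetrics} used to record partition selection metrics; may be {@code null}, in which case no metrics are recorded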
*/
PartitionSelectionHelper(ClusterManagerQueryHelper<?, ?, ?, ?> clusterManagerQueryHelper, String localDatacenterName,
int minimumLocalReplicaCount, String defaultPartitionClass) {
int minimumLocalReplicaCount, String defaultPartitionClass, HelixClusterManagerMetrics clusterManagerMetrics) {
this.localDatacenterName = localDatacenterName;
this.minimumLocalReplicaCount = minimumLocalReplicaCount;
this.clusterManagerQueryHelper = clusterManagerQueryHelper;
this.defaultPartitionClass = defaultPartitionClass;
this.clusterManagerMetrics = clusterManagerMetrics;
updatePartitions(clusterManagerQueryHelper.getPartitions(), localDatacenterName);
logger.debug("Number of partitions in data center {} {}", localDatacenterName, allPartitions.size());
for (Map.Entry<String, SortedMap<Integer, List<PartitionId>>> entry : partitionIdsByClassAndLocalReplicaCount.entrySet()) {
Expand Down Expand Up @@ -943,9 +946,36 @@ private boolean areAllLocalReplicasForPartitionUp(PartitionId partitionId) {
return false;
}
}

// Generate a metric to track how often we choose a replica that is not globally available for writes.
if (!isPartitionEligibleForParanoidDurability(partitionId)) {
if (clusterManagerMetrics != null)
clusterManagerMetrics.paranoidDurabilityIneligibleReplicaCount.inc();
}
return true;
}

/**
 * Checks whether every remote data center hosting this partition has at least one replica that is eligible for puts.
 * A replica counts as eligible when it is in the LEADER or STANDBY state and is not down. A data center counts as
 * remote when it hosts at least one replica of the partition and its name differs from {@code localDatacenterName}.
 * @param partitionId the {@link PartitionId} to check.
 * @return true if the partition has at least one eligible remote replica and no remote data center hosting the
 *         partition is left without an eligible replica; false otherwise (including when the partition has no
 *         eligible remote replica at all).
 */
private boolean isPartitionEligibleForParanoidDurability(PartitionId partitionId) {
  // Replicas in a state that can take writes. The null data center argument is kept from the original call;
  // presumably it means "do not restrict by data center" -- the remote-only filter is applied below.
  Map<ReplicaState, ? extends List<? extends ReplicaId>> replicasByState =
      partitionId.getReplicaIdsByStates(EnumSet.of(ReplicaState.STANDBY, ReplicaState.LEADER), null);
  List<? extends ReplicaId> writableStateReplicas =
      replicasByState.values().stream().flatMap(List::stream).collect(Collectors.toList());

  // Names of the remote data centers that have at least one replica that is up and in the right state.
  List<String> coveredRemoteDcs = writableStateReplicas.stream()
      .filter(replica -> !replica.getDataNodeId().getDatacenterName().equals(this.localDatacenterName))
      .filter(replica -> !replica.isDown())
      .map(replica -> replica.getDataNodeId().getDatacenterName())
      .distinct()
      .collect(Collectors.toList());
  if (coveredRemoteDcs.isEmpty()) {
    // No eligible remote replica anywhere (also covers partitions that have no remote replicas).
    return false;
  }

  // Grouping only the eligible replicas can never reveal a data center with zero of them: such a data center is
  // simply absent from the grouping. Compare against every remote data center that hosts the partition instead.
  return partitionId.getReplicaIds().stream()
      .map(replica -> replica.getDataNodeId().getDatacenterName())
      .filter(dcName -> !dcName.equals(this.localDatacenterName))
      .allMatch(coveredRemoteDcs::contains);
}

/**
* Check if all replicas from given partition are in eligible states for PUT request. (That is, replica state should
* be either LEADER or STANDBY.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam
partitionSelectionHelper =
new PartitionSelectionHelper(helixClusterManagerQueryHelper, clusterMapConfig.clusterMapDatacenterName,
clusterMapConfig.clustermapWritablePartitionMinReplicaCount,
clusterMapConfig.clusterMapDefaultPartitionClass);
clusterMapConfig.clusterMapDefaultPartitionClass, helixClusterManagerMetrics);
// register partition selection helper as a listener of cluster map changes.
registerClusterMapListener(partitionSelectionHelper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class HelixClusterManagerMetrics {
public Gauge<Long> helixClusterManagerCurrentXid;
public final Timer routingTableQueryTime;
public final Counter resourceNameMismatchCount;
public final Counter paranoidDurabilityIneligibleReplicaCount;

/**
* Metrics for the {@link HelixClusterManager}
Expand Down Expand Up @@ -102,6 +103,8 @@ class HelixClusterManagerMetrics {
registry.counter(MetricRegistry.name(HelixClusterManager.class, "instanceDeleteTriggerCount"));
resourceNameMismatchCount =
registry.counter(MetricRegistry.name(HelixClusterManager.class, "resourceNameMismatchCount"));
paranoidDurabilityIneligibleReplicaCount =
registry.counter(MetricRegistry.name(HelixClusterManager.class, "ineligibleReplicaCount"));
}

void initializeInstantiationMetric(final boolean instantiated, final long instantiationExceptionCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, Clu
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(new StaticClusterManagerQueryHelper(), localDatacenterName,
clusterMapConfig.clustermapWritablePartitionMinReplicaCount,
clusterMapConfig.clusterMapDefaultPartitionClass);
clusterMapConfig.clusterMapDefaultPartitionClass, null);
}

/**
Expand All @@ -97,7 +97,7 @@ public PartitionLayout(HardwareLayout hardwareLayout, ClusterMapConfig clusterMa
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(new StaticClusterManagerQueryHelper(), localDatacenterName,
clusterMapConfig.clustermapWritablePartitionMinReplicaCount,
clusterMapConfig.clusterMapDefaultPartitionClass);
clusterMapConfig.clusterMapDefaultPartitionClass, null);
}

public HardwareLayout getHardwareLayout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void partitionSelectionHelperTest() {
doReturn(allPartitionIdsMain).when(mockClusterManagerQueryHelper).getPartitions();
ClusterMapUtils.PartitionSelectionHelper psh =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, null, minimumLocalReplicaCount,
maxReplicasAllSites);
maxReplicasAllSites, null);

String[] dcsToTry = {null, "", dc1, dc2};
for (String dc : dcsToTry) {
Expand Down Expand Up @@ -157,7 +157,7 @@ public void partitionSelectionHelperTest() {
}
// additional test: ensure getRandomWritablePartition now honors replica state for PUT request
psh = new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, dc1, minimumLocalReplicaCount,
maxReplicasAllSites);
maxReplicasAllSites, null);
ReplicaId replicaId = everywhere1.getReplicaIds()
.stream()
.filter(r -> r.getDataNodeId().getDatacenterName().equals(dc1))
Expand Down Expand Up @@ -192,7 +192,7 @@ public void partitionWithDifferentReplicaCntTest() {
int minimumLocalReplicaCount = 3;
ClusterMapUtils.PartitionSelectionHelper psh =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, dc1, minimumLocalReplicaCount,
partitionClass);
partitionClass, null);
// verify get all partitions return correct result
assertEquals("Returned partitions are not expected", allPartitions, psh.getPartitions(null));
// verify get writable partitions return partition2 and partition3 only
Expand All @@ -204,7 +204,7 @@ public void partitionWithDifferentReplicaCntTest() {
// create another partition selection helper with minimumLocalReplicaCount = 4
minimumLocalReplicaCount = 4;
psh = new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, dc1, minimumLocalReplicaCount,
partitionClass);
partitionClass, null);
assertEquals("Returned writable partitions are not expected", Arrays.asList(partition3),
psh.getWritablePartitions(partitionClass));
assertEquals("Get random writable partition should return partition3 only", partition3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public MockClusterMap(boolean enableSSLPorts, boolean enableHttp2Ports, int numN
doReturn(partitions.values()).when(mockClusterManagerQueryHelper).getPartitions();
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, localDatacenterName,
Math.min(defaultPartition.getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS);
Math.min(defaultPartition.getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS, null);
}

/**
Expand All @@ -228,7 +228,7 @@ public MockClusterMap(boolean enableSSLPorts, List<MockDataNodeId> datanodes, in
doReturn(partitions.values()).when(mockClusterManagerQueryHelper).getPartitions();
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, localDatacenterName,
Math.min(partitionIdList.get(0).getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS);
Math.min(partitionIdList.get(0).getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS, null);
Set<String> dcNames = new HashSet<>();
datanodes.forEach(node -> dcNames.add(node.getDatacenterName()));
dataCentersInClusterMap.addAll(dcNames);
Expand Down Expand Up @@ -270,7 +270,7 @@ private MockClusterMap(MockDataNodeId recoveryNode, MockDataNodeId vcrNode, Stri
doReturn(partitions.values()).when(mockClusterManagerQueryHelper).getPartitions();
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, localDatacenterName,
Math.min(mockPartitionId.getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS);
Math.min(mockPartitionId.getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS, null);
specialPartition = null;
}

Expand Down

0 comments on commit dfc09d0

Please sign in to comment.