Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test to monitor the distribution of vnode to physical node #18147

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -48,6 +51,45 @@ public void uninitializedThrowsException() {
assertThrows(IllegalStateException.class, () -> provider.get(OBJECT_KEY, 0));
}

/**
 * Checks how evenly the hash key space is shared between physical nodes.
 *
 * For every physical node this sums the lengths of the key ranges that end at one of
 * its virtual nodes, then computes the standard deviation over mean of those per-node
 * totals. The result is arbitrarily bounded at 0.25, but ideally this number should get
 * smaller over time as we improve the hashing algorithm and use better ways to assign
 * virtual nodes to physical nodes.
 *
 * This uses 2000 virtual nodes and 50 physical nodes; if these parameters change,
 * the bound is likely going to change.
 */
@Test
public void virtualNodeDistribution() {
  ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS);
  List<BlockWorkerInfo> workerList = generateRandomWorkerList(50);
  // set initial state
  provider.refresh(workerList, 2000);
  NavigableMap<Integer, BlockWorkerInfo> map = provider.getActiveNodesMap();
  Map<BlockWorkerInfo, Long> count = new HashMap<>();
  // Held as a long so that (key - last) is computed in 64 bits and cannot overflow,
  // even for the first range, which starts at Integer.MIN_VALUE.
  long last = Integer.MIN_VALUE;
  // A NavigableMap iterates in ascending key order, so each step covers the range
  // between the previous virtual node and the current one.
  for (Map.Entry<Integer, BlockWorkerInfo> entry : map.entrySet()) {
    count.merge(entry.getValue(), entry.getKey() - last, Long::sum);
    last = entry.getKey();
  }
  // NOTE(review): the range from the largest key up to Integer.MAX_VALUE is not
  // attributed to any node here. If the provider wraps such keys around to the first
  // virtual node, that range should presumably be added to the first entry's node --
  // confirm against ConsistentHashProvider's lookup logic.
  assertTrue(calcSDoverMean(count.values()) < 0.25);
}

/**
 * Computes the population standard deviation of the given values divided by their mean.
 *
 * If the mean is zero the result is not finite (NaN or infinite), because the final
 * division is by the mean.
 *
 * @param list the values to evaluate; must not be empty
 * @return the standard deviation over mean of the values
 * @throws IllegalArgumentException if {@code list} is empty
 */
private double calcSDoverMean(Collection<Long> list) {
  // Without this guard an empty input divides by zero and silently yields NaN.
  if (list.isEmpty()) {
    throw new IllegalArgumentException(
        "Cannot compute standard deviation over mean of an empty collection");
  }
  long sum = 0L;
  for (long num : list) {
    sum += num;
  }
  double avg = (double) sum / list.size();
  double squaredDeviations = 0;
  for (long num : list) {
    double deviation = num - avg;
    squaredDeviations += deviation * deviation;
  }
  // Population variance: divide by the number of values, not by (n - 1).
  return Math.sqrt(squaredDeviations / list.size()) / avg;
}

@Test
public void concurrentInitialization() {
ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS);
Expand Down
Loading