Skip to content

Commit

Permalink
improve the getAntiAffinityNamespaceOwnedBrokers check exclude the cu…
Browse files Browse the repository at this point in the history
…rrent namespace cause count add.
  • Loading branch information
nicklixinyang committed Dec 8, 2022
1 parent c45f16c commit 83e5536
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOw
.thenAccept(nsPolicies -> {
if (nsPolicies.isPresent()
&& antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().namespaceAntiAffinityGroup)) {
brokerToAntiAffinityNamespaceCount.compute(broker,
if (!ns.equals(namespaceName)) {
brokerToAntiAffinityNamespaceCount.compute(broker,
(brokerName, count) -> count == null ? 1 : count + 1);
}
}
future.complete(null);
}).exceptionally(ex -> {
Expand Down Expand Up @@ -500,14 +502,17 @@ public static boolean shouldAntiAffinityNamespaceUnload(
int nsCount = brokerNamespaceCount.getOrDefault(broker, 0);
if (currentBroker.equals(broker)) {
currentBrokerNsCount = nsCount;
}
if (leastNsCount > nsCount) {
} else if (leastNsCount > nsCount) {
// Avoid currentBrokerNsCount cover the leastNsCount ,because we should
// check currentBrokerNsCount = leastNsCount case, in this case can trigger the bundle unload.
leastNsCount = nsCount;
}
}
// check if there is any other broker has less number of ns
if (leastNsCount == 0 || currentBrokerNsCount > leastNsCount) {
return true;
} else if (currentBrokerNsCount < leastNsCount) {
return false;
}
// check if all the brokers having same number of ns-count then broker can't unload
int leastNsOwnerBrokers = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 4);

// check another namespace-1 bundle can get correct broker
// for namespace-1 bundle only domain-1 brokers are available and broker-3 already owns namespace-3 bundle
candidate.addAll(brokers);
assignedNamespace = namespace + "1" + bundle;
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 1);
candidate.forEach(broker -> assertEquals(brokerToDomainMap.get(broker), "domain-1"));
}

/**
Expand Down Expand Up @@ -503,6 +512,20 @@ public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
pulsar1, brokerToNamespaceToBundleRange, candidate);
assertFalse(shouldUnload);

// check another namespace-0 bundle can get another broker to unload
// test1: only one broker satisfy namespace-0, should not unload.
shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
pulsar1, brokerToNamespaceToBundleRange, candidate);
assertFalse(shouldUnload);

//test2: broker-0 and broker-3 owned bundle are namespace-0, the bundle can unload to broker-0.
brokers.add("broker-3");
candidate.add("broker-3");
// add ns-0 to broker-3
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-3", namespace + "0", assignedNamespace);
shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
pulsar1, brokerToNamespaceToBundleRange, candidate);
assertTrue(shouldUnload);
}

/**
Expand Down

0 comments on commit 83e5536

Please sign in to comment.