diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 2c90b8a4047a27..34e1794162fcbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -447,8 +447,10 @@ public static CompletableFuture> getAntiAffinityNamespaceOw .thenAccept(nsPolicies -> { if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().namespaceAntiAffinityGroup)) { - brokerToAntiAffinityNamespaceCount.compute(broker, + if (!ns.equals(namespaceName)) { + brokerToAntiAffinityNamespaceCount.compute(broker, (brokerName, count) -> count == null ? 1 : count + 1); + } } future.complete(null); }).exceptionally(ex -> { @@ -500,14 +502,17 @@ public static boolean shouldAntiAffinityNamespaceUnload( int nsCount = brokerNamespaceCount.getOrDefault(broker, 0); if (currentBroker.equals(broker)) { currentBrokerNsCount = nsCount; - } - if (leastNsCount > nsCount) { + } else if (leastNsCount > nsCount) { + // Avoid currentBrokerNsCount cover the leastNsCount ,because we should + // check currentBrokerNsCount = leastNsCount case, in this case can trigger the bundle unload. leastNsCount = nsCount; } } // check if there is any other broker has less number of ns if (leastNsCount == 0 || currentBrokerNsCount > leastNsCount) { return true; + } else if (currentBrokerNsCount < leastNsCount) { + return false; } // check if all the brokers having same number of ns-count then broker can't unload int leastNsOwnerBrokers = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index 4517497f154eb4..c6a48981fc82c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -288,6 +288,15 @@ public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception { LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, brokerToNamespaceToBundleRange, brokerToDomainMap); assertEquals(candidate.size(), 4); + + // check another namespace-1 bundle can get correct broker + // for namespace-1 bundle only domain-1 brokers are available and broker-3 already owns namespace-3 bundle + candidate.addAll(brokers); + assignedNamespace = namespace + "1" + bundle; + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate, + brokerToNamespaceToBundleRange, brokerToDomainMap); + assertEquals(candidate.size(), 1); + candidate.forEach(broker -> assertEquals(brokerToDomainMap.get(broker), "domain-1")); } /** @@ -503,6 +512,20 @@ public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception { pulsar1, brokerToNamespaceToBundleRange, candidate); assertFalse(shouldUnload); + // check another namespace-0 bundle can get another broker to unload + // test1: only one broker satisfy namespace-0, should not unload. + shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker, + pulsar1, brokerToNamespaceToBundleRange, candidate); + assertFalse(shouldUnload); + + //test2: broker-0 and broker-3 owned bundle are namespace-0, the bundle can unload to broker-0. + brokers.add("broker-3"); + candidate.add("broker-3"); + // add ns-0 to broker-3 + selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-3", namespace + "0", assignedNamespace); + shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker, + pulsar1, brokerToNamespaceToBundleRange, candidate); + assertTrue(shouldUnload); } /**