diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index ccf7376e45be1..6a637b20a831d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -38,6 +38,7 @@ import java.lang.reflect.Field; import java.net.URL; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -86,6 +87,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -107,7 +109,7 @@ public class LoadBalancerTest { private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class); - private static final int MAX_RETRIES = 10; + private static final int MAX_RETRIES = 15; private static final int BROKER_COUNT = 5; private int[] brokerWebServicePorts = new int[BROKER_COUNT]; @@ -176,23 +178,27 @@ void shutdown() throws Exception { bkEnsemble.stop(); } - private LeaderBroker loopUntilLeaderChanges(LeaderElectionService les, LeaderBroker oldLeader, - LeaderBroker newLeader) throws InterruptedException { + private void loopUntilLeaderChangesForAllBroker(List activePulsars, LeaderBroker oldLeader) + throws InterruptedException { int loopCount = 0; - - while (loopCount < MAX_RETRIES) { - Thread.sleep(1000); - // Check if the new leader is elected. If yes, break without incrementing the loopCount - newLeader = les.getCurrentLeader().get(); - if (newLeader.equals(oldLeader) == false) { - break; - } - ++loopCount; - } - + Awaitility.await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(MAX_RETRIES, TimeUnit.SECONDS) + .until(() -> { + boolean settled = true; + // Check if the all active pulsar see a new leader. + for (PulsarService pulsar : activePulsars) { + Optional leader = pulsar.getLeaderElectionService().readCurrentLeader().join(); + // Check leader a pulsar see is not empty and not the old leader. + if (!leader.isPresent() || leader.get().equals(oldLeader)) { + settled = false; + break; + } + } + return settled; + }); // Check if maximum retries are already done. If yes, assert. Assert.assertNotEquals(loopCount, MAX_RETRIES, "Leader is not changed even after maximum retries."); - return newLeader; } /* @@ -712,10 +718,10 @@ public void testLeaderElection() throws Exception { PulsarService[] allServices = new PulsarService[pulsarServices.length]; System.arraycopy(pulsarServices, 0, allServices, 0, pulsarServices.length); for (int i = 0; i < BROKER_COUNT - 1; i++) { - Set activePulsar = new HashSet(); + List activePulsar = new ArrayList<>(); + List followerPulsar = new ArrayList<>(); LeaderBroker oldLeader = null; PulsarService leaderPulsar = null; - PulsarService followerPulsar = null; for (int j = 0; j < BROKER_COUNT; j++) { if (allServices[j].getState() != PulsarService.State.Closed) { activePulsar.add(allServices[j]); @@ -727,11 +733,11 @@ public void testLeaderElection() throws Exception { // in order to prevent closing this PulsarService twice pulsarServices[i] = null; } else { - followerPulsar = allServices[j]; + followerPulsar.add(allServices[j]); } } } - // Make sure both brokers see the same leader + // Make sure all brokers see the same leader log.info("Old leader is : {}", oldLeader.getServiceUrl()); for (PulsarService pulsar : activePulsar) { log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(), pulsar.getLeaderElectionService().getCurrentLeader()); @@ -740,8 +746,8 @@ public void testLeaderElection() throws Exception { // Do leader election by killing the leader broker leaderPulsar.close(); - LeaderBroker newLeader = oldLeader; - newLeader = loopUntilLeaderChanges(followerPulsar.getLeaderElectionService(), oldLeader, newLeader); + loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader); + LeaderBroker newLeader = followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join().get(); log.info("New leader is : {}", newLeader.getServiceUrl()); Assert.assertNotEquals(newLeader, oldLeader); }