Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.lang.reflect.Field;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -107,7 +109,7 @@ public class LoadBalancerTest {

private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class);

private static final int MAX_RETRIES = 10;
private static final int MAX_RETRIES = 15;

private static final int BROKER_COUNT = 5;
private int[] brokerWebServicePorts = new int[BROKER_COUNT];
Expand Down Expand Up @@ -176,23 +178,27 @@ void shutdown() throws Exception {
bkEnsemble.stop();
}

private LeaderBroker loopUntilLeaderChanges(LeaderElectionService les, LeaderBroker oldLeader,
LeaderBroker newLeader) throws InterruptedException {
private void loopUntilLeaderChangesForAllBroker(List<PulsarService> activePulsars, LeaderBroker oldLeader)
throws InterruptedException {
int loopCount = 0;

while (loopCount < MAX_RETRIES) {
Thread.sleep(1000);
// Check if the new leader is elected. If yes, break without incrementing the loopCount
newLeader = les.getCurrentLeader().get();
if (newLeader.equals(oldLeader) == false) {
break;
}
++loopCount;
}

Awaitility.await()
.pollInterval(1, TimeUnit.SECONDS)
.atMost(MAX_RETRIES, TimeUnit.SECONDS)
.until(() -> {
boolean settled = true;
// Check if the all active pulsar see a new leader.
for (PulsarService pulsar : activePulsars) {
Optional<LeaderBroker> leader = pulsar.getLeaderElectionService().readCurrentLeader().join();
// Check leader a pulsar see is not empty and not the old leader.
if (!leader.isPresent() || leader.get().equals(oldLeader)) {
settled = false;
break;
}
}
return settled;
});
// Check if maximum retries are already done. If yes, assert.
Assert.assertNotEquals(loopCount, MAX_RETRIES, "Leader is not changed even after maximum retries.");
return newLeader;
}

/*
Expand Down Expand Up @@ -712,10 +718,10 @@ public void testLeaderElection() throws Exception {
PulsarService[] allServices = new PulsarService[pulsarServices.length];
System.arraycopy(pulsarServices, 0, allServices, 0, pulsarServices.length);
for (int i = 0; i < BROKER_COUNT - 1; i++) {
Set<PulsarService> activePulsar = new HashSet<PulsarService>();
List<PulsarService> activePulsar = new ArrayList<>();
List<PulsarService> followerPulsar = new ArrayList<>();
LeaderBroker oldLeader = null;
PulsarService leaderPulsar = null;
PulsarService followerPulsar = null;
for (int j = 0; j < BROKER_COUNT; j++) {
if (allServices[j].getState() != PulsarService.State.Closed) {
activePulsar.add(allServices[j]);
Expand All @@ -727,11 +733,11 @@ public void testLeaderElection() throws Exception {
// in order to prevent closing this PulsarService twice
pulsarServices[i] = null;
} else {
followerPulsar = allServices[j];
followerPulsar.add(allServices[j]);
}
}
}
// Make sure both brokers see the same leader
// Make sure all brokers see the same leader
log.info("Old leader is : {}", oldLeader.getServiceUrl());
for (PulsarService pulsar : activePulsar) {
log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(), pulsar.getLeaderElectionService().getCurrentLeader());
Expand All @@ -740,8 +746,8 @@ public void testLeaderElection() throws Exception {

// Do leader election by killing the leader broker
leaderPulsar.close();
LeaderBroker newLeader = oldLeader;
newLeader = loopUntilLeaderChanges(followerPulsar.getLeaderElectionService(), oldLeader, newLeader);
loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader);
LeaderBroker newLeader = followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join().get();
log.info("New leader is : {}", newLeader.getServiceUrl());
Assert.assertNotEquals(newLeader, oldLeader);
}
Expand Down