Skip to content

Commit

Permalink
[fix][broker] Fix the bug that elected leader thinks it's a follower (#…
Browse files Browse the repository at this point in the history
…23138)

(cherry picked from commit 3560ddb)
  • Loading branch information
heesung-sn authored and lhotari committed Aug 8, 2024
1 parent 0a93eac commit 9a7625c
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ private synchronized CompletableFuture<LeaderElectionState> handleExistingLeader
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue,
path, res.getStat());
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
return CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
// Since the value was created in a different session, it might be expiring. We need to delete it
// and try the election again.
Expand Down Expand Up @@ -257,7 +260,13 @@ public synchronized CompletableFuture<Void> asyncClose() {
return CompletableFuture.completedFuture(null);
}

return store.delete(path, version);
return store.delete(path, version)
.thenAccept(__ -> {
synchronized (LeaderElectionImpl.this) {
leaderElectionState = LeaderElectionState.NoLeader;
}
}
);
}

@Override
Expand All @@ -278,8 +287,8 @@ public Optional<T> getLeaderValueIfPresent() {
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
executor.execute(SafeRunnable.safeRun(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}, event:{}", path, event);

try {
LeaderElectionState les = elect().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public void basicTest(String provider, Supplier<String> urlSupplier) throws Exce

leaderElection.close();

assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);

assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
Expand Down Expand Up @@ -180,4 +181,58 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}


/**
 * Verifies that a leader election instance ends up in the {@code Leading} state again after the
 * metadata store connection is lost and then re-established.
 *
 * <p>Scenario:
 * <ol>
 *   <li>Elect {@code le1} with the value {@code "value-1"} and check that it is leading.</li>
 *   <li>Overwrite the private {@code leaderElectionState} field with {@code NoLeader} through
 *       reflection, so the in-memory state no longer matches what was elected.</li>
 *   <li>Stop and restart the test ZooKeeper, expecting {@code ConnectionLost} followed by
 *       {@code Reconnected} session events.</li>
 *   <li>Wait until {@code le1} reports {@code Leading} again and check that the election path
 *       still holds a value.</li>
 * </ol>
 */
@Test
public void testElectAfterReconnected() throws Exception {
    // --- setup: metadata store with a short (2s) session timeout
    String connectionString = zks.getConnectionString();
    MetadataStoreConfig storeConfig = MetadataStoreConfig.builder()
            .sessionTimeoutMillis(2_000)
            .build();
    @Cleanup
    MetadataStoreExtended store = MetadataStoreExtended.create(connectionString, storeConfig);

    // Capture every session event and every election state change for later inspection.
    BlockingQueue<SessionEvent> observedSessionEvents = new LinkedBlockingQueue<>();
    store.registerSessionListener(event -> observedSessionEvents.add(event));
    BlockingQueue<LeaderElectionState> observedElectionStates = new LinkedBlockingQueue<>();
    String electionPath = newKey();

    // NOTE: declaration order matters, @Cleanup closes these in reverse order.
    @Cleanup
    CoordinationService coordinationService = new CoordinationServiceImpl(store);
    @Cleanup
    LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, electionPath,
            state -> observedElectionStates.add(state));

    // --- manual election: le1 must become the leader and notify the listener
    le1.elect("value-1").join();
    assertEquals(le1.getState(), LeaderElectionState.Leading);
    assertEquals(observedElectionStates.poll(5, TimeUnit.SECONDS), LeaderElectionState.Leading);

    // --- force the in-memory state to NoLeader
    FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);

    // --- drop the connection ...
    zks.stop();
    SessionEvent afterStop = observedSessionEvents.poll(5, TimeUnit.SECONDS);
    assertEquals(afterStop, SessionEvent.ConnectionLost);

    // --- ... and bring it back
    zks.start();
    SessionEvent afterRestart = observedSessionEvents.poll(10, TimeUnit.SECONDS);
    assertEquals(afterRestart, SessionEvent.Reconnected);

    // --- le1 must report leadership again once the reconnect has been processed
    Awaitility.await()
            .atMost(Duration.ofSeconds(15))
            .untilAsserted(() -> assertEquals(le1.getState(), LeaderElectionState.Leading));

    // The election node must still exist in the store.
    assertTrue(store.get(electionPath).join().isPresent());
}
}

0 comments on commit 9a7625c

Please sign in to comment.