Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void runTest(RunnableWithException testMethod) throws Exception {
electionEventHandler.init(leaderElectionDriver);
testMethod.run();

electionEventHandler.close();
leaderElectionDriver.close();
leaderRetrievalDriver.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public void testLeaderElectionAndRetrieval() throws Exception {
KubernetesLeaderElectionDriver leaderElectionDriver = null;
KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null;

final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(LEADER_INFORMATION);

try {
final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
leaderElectionDriver =
new KubernetesLeaderElectionDriver(
kubernetesResource.getFlinkKubeClient(),
Expand Down Expand Up @@ -88,6 +89,7 @@ public void testLeaderElectionAndRetrieval() throws Exception {
assertThat(
retrievalEventHandler.getAddress(), is(LEADER_INFORMATION.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
public class TestingLeaderElectionEventHandler extends TestingLeaderBase
implements LeaderElectionEventHandler {

private final Object lock = new Object();

private final LeaderInformation leaderInformation;

private final OneShotLatch initializationLatch;
Expand All @@ -40,6 +42,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase

private LeaderInformation confirmedLeaderInformation = LeaderInformation.empty();

private boolean running = true;

public TestingLeaderElectionEventHandler(LeaderInformation leaderInformation) {
this.leaderInformation = leaderInformation;
this.initializationLatch = new OneShotLatch();
Expand All @@ -51,35 +55,53 @@ public void init(LeaderElectionDriver leaderElectionDriver) {
initializationLatch.trigger();
}

private void ifRunning(Runnable action) {
synchronized (lock) {
if (running) {
action.run();
}
}
}

@Override
public void onGrantLeadership() {
waitForInitialization(
leaderElectionDriver -> {
confirmedLeaderInformation = leaderInformation;
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
});
ifRunning(
() ->
waitForInitialization(
leaderElectionDriver -> {
confirmedLeaderInformation = leaderInformation;
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
}));
}

@Override
public void onRevokeLeadership() {
waitForInitialization(
(leaderElectionDriver) -> {
confirmedLeaderInformation = LeaderInformation.empty();
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
});
ifRunning(
() ->
waitForInitialization(
(leaderElectionDriver) -> {
confirmedLeaderInformation = LeaderInformation.empty();
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
}));
}

@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation) {
waitForInitialization(
leaderElectionDriver -> {
if (confirmedLeaderInformation.getLeaderSessionID() != null
&& !this.confirmedLeaderInformation.equals(leaderInformation)) {
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
}
});
ifRunning(
() ->
waitForInitialization(
leaderElectionDriver -> {
if (confirmedLeaderInformation.getLeaderSessionID() != null
&& !this.confirmedLeaderInformation.equals(
leaderInformation)) {
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
}
}));
}

private void waitForInitialization(Consumer<? super LeaderElectionDriver> operation) {
Expand All @@ -94,6 +116,14 @@ private void waitForInitialization(Consumer<? super LeaderElectionDriver> operat
}

public LeaderInformation getConfirmedLeaderInformation() {
return confirmedLeaderInformation;
synchronized (lock) {
return confirmedLeaderInformation;
}
}

public void close() {
synchronized (lock) {
running = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -401,6 +402,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -450,6 +452,7 @@ public void testExceptionForwarding() throws Exception {
.isPresent(),
is(true));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -528,6 +531,7 @@ public void testEphemeralZooKeeperNodes() throws Exception {
// that was expected
}
} finally {
electionEventHandler.close();
if (leaderRetrievalDriver != null) {
leaderRetrievalDriver.close();
}
Expand Down Expand Up @@ -577,6 +581,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down