diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java index 18aad5551329b..a89cb50119d3c 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java @@ -122,6 +122,7 @@ void runTest(RunnableWithException testMethod) throws Exception { electionEventHandler.init(leaderElectionDriver); testMethod.run(); + electionEventHandler.close(); leaderElectionDriver.close(); leaderRetrievalDriver.close(); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java index eb7813817f09e..a22432bb96ee4 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java @@ -52,9 +52,10 @@ public void testLeaderElectionAndRetrieval() throws Exception { KubernetesLeaderElectionDriver leaderElectionDriver = null; KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null; + final TestingLeaderElectionEventHandler electionEventHandler = + new TestingLeaderElectionEventHandler(LEADER_INFORMATION); + try { - final TestingLeaderElectionEventHandler electionEventHandler = - new TestingLeaderElectionEventHandler(LEADER_INFORMATION); leaderElectionDriver = new KubernetesLeaderElectionDriver( kubernetesResource.getFlinkKubeClient(), @@ -88,6 +89,7 @@ public void testLeaderElectionAndRetrieval() throws Exception { assertThat( retrievalEventHandler.getAddress(), is(LEADER_INFORMATION.getLeaderAddress())); } finally { + electionEventHandler.close(); if (leaderElectionDriver != null) { leaderElectionDriver.close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java index a659b19ef6100..ff91f42ccea86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java @@ -32,6 +32,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase implements LeaderElectionEventHandler { + private final Object lock = new Object(); + private final LeaderInformation leaderInformation; private final OneShotLatch initializationLatch; @@ -40,6 +42,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase private LeaderInformation confirmedLeaderInformation = LeaderInformation.empty(); + private boolean running = true; + public TestingLeaderElectionEventHandler(LeaderInformation leaderInformation) { this.leaderInformation = leaderInformation; this.initializationLatch = new OneShotLatch(); @@ -51,35 +55,53 @@ public void init(LeaderElectionDriver leaderElectionDriver) { initializationLatch.trigger(); } + private void ifRunning(Runnable action) { + synchronized (lock) { + if (running) { + action.run(); + } + } + } + @Override public void onGrantLeadership() { - waitForInitialization( - leaderElectionDriver -> { - confirmedLeaderInformation = leaderInformation; - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation); - leaderEventQueue.offer(confirmedLeaderInformation); - }); + ifRunning( + () -> + waitForInitialization( + leaderElectionDriver -> { + confirmedLeaderInformation = leaderInformation; + leaderElectionDriver.writeLeaderInformation( + confirmedLeaderInformation); + leaderEventQueue.offer(confirmedLeaderInformation); + })); } @Override public void onRevokeLeadership() { - waitForInitialization( - (leaderElectionDriver) -> { - confirmedLeaderInformation = LeaderInformation.empty(); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation); - leaderEventQueue.offer(confirmedLeaderInformation); - }); + ifRunning( + () -> + waitForInitialization( + (leaderElectionDriver) -> { + confirmedLeaderInformation = LeaderInformation.empty(); + leaderElectionDriver.writeLeaderInformation( + confirmedLeaderInformation); + leaderEventQueue.offer(confirmedLeaderInformation); + })); } @Override public void onLeaderInformationChange(LeaderInformation leaderInformation) { - waitForInitialization( - leaderElectionDriver -> { - if (confirmedLeaderInformation.getLeaderSessionID() != null - && !this.confirmedLeaderInformation.equals(leaderInformation)) { - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation); - } - }); + ifRunning( + () -> + waitForInitialization( + leaderElectionDriver -> { + if (confirmedLeaderInformation.getLeaderSessionID() != null + && !this.confirmedLeaderInformation.equals( + leaderInformation)) { + leaderElectionDriver.writeLeaderInformation( + confirmedLeaderInformation); + } + })); } private void waitForInitialization(Consumer operation) { @@ -94,6 +116,14 @@ private void waitForInitialization(Consumer operat } public LeaderInformation getConfirmedLeaderInformation() { - return confirmedLeaderInformation; + synchronized (lock) { + return confirmedLeaderInformation; + } + } + + public void close() { + synchronized (lock) { + running = false; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 0c26ad55dde7d..d8acb0ee5b25a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -154,6 +154,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception { is(TEST_LEADER.getLeaderSessionID())); assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress())); } finally { + electionEventHandler.close(); if (leaderElectionDriver != null) { leaderElectionDriver.close(); } @@ -401,6 +402,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { is(TEST_LEADER.getLeaderSessionID())); assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress())); } finally { + electionEventHandler.close(); if (leaderElectionDriver != null) { leaderElectionDriver.close(); } @@ -450,6 +452,7 @@ public void testExceptionForwarding() throws Exception { .isPresent(), is(true)); } finally { + electionEventHandler.close(); if (leaderElectionDriver != null) { leaderElectionDriver.close(); } @@ -528,6 +531,7 @@ public void testEphemeralZooKeeperNodes() throws Exception { // that was expected } } finally { + electionEventHandler.close(); if (leaderRetrievalDriver != null) { leaderRetrievalDriver.close(); } @@ -577,6 +581,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception is(TEST_LEADER.getLeaderSessionID())); assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress())); } finally { + electionEventHandler.close(); if (leaderElectionDriver != null) { leaderElectionDriver.close(); }