diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
index f5ff366710251..b7d35f89b6763 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
@@ -142,6 +142,11 @@ public void internalCleanup() throws Exception {
                 .get();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return getLeaderName(RESOURCE_MANAGER_NAME);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
index d6b69c32c04eb..cd78491551058 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 
 import org.junit.Test;
@@ -76,4 +79,35 @@ public void testInternalCleanupShouldCleanupConfigMaps() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final KubernetesHaServices kubernetesHaServices =
+                                    new KubernetesHaServices(
+                                            flinkKubeClient,
+                                            executorService,
+                                            configuration,
+                                            new VoidBlobStore());
+                            JobID jobID = new JobID();
+                            String configMapName =
+                                    kubernetesHaServices.getLeaderNameForJobManager(jobID);
+                            final KubernetesConfigMap configMap =
+                                    new TestingFlinkKubeClient.MockKubernetesConfigMap(
+                                            configMapName);
+                            flinkKubeClient.createConfigMap(configMap);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(true));
+                            kubernetesHaServices.internalCleanupJobData(jobID);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(false));
+                        });
+            }
+        };
+    }
 }
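On Kubernetes, all of a job's HA metadata hangs off the job's leader ConfigMap, so `internalCleanupJobData` only needs to delete that one ConfigMap. Note the `.get()` call: `deleteConfigMap` returns a future, and blocking on it turns the asynchronous delete into a synchronous cleanup whose failure surfaces as an exception to the caller. A minimal, self-contained sketch of that contract; the `deleteConfigMap` stand-in and the ConfigMap name format below are assumptions for illustration, not Flink's `FlinkKubeClient` API:

```java
import java.util.concurrent.CompletableFuture;

public class SyncDeleteSketch {
    // Hypothetical stand-in for FlinkKubeClient#deleteConfigMap; the real client
    // talks to the Kubernetes API server instead of printing.
    static CompletableFuture<Void> deleteConfigMap(String name) {
        return CompletableFuture.runAsync(() -> System.out.println("deleted " + name));
    }

    // Mirrors internalCleanupJobData: block on the future so that a failed delete
    // is thrown to the caller instead of being lost on an async thread.
    static void internalCleanupJobData(String leaderConfigMapName) throws Exception {
        deleteConfigMap(leaderConfigMapName).get();
    }

    public static void main(String[] args) throws Exception {
        // Name format is assumed: <clusterId>-<jobId>-jobmanager-leader.
        internalCleanupJobData("my-cluster-a1b2c3d4-jobmanager-leader");
    }
}
```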
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 7e2ff220f9492..70bf6aa55cfbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -748,13 +748,14 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
     private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
         jobManagerMetricGroup.removeJob(jobId);
 
-        boolean cleanupHABlobs = false;
+        boolean jobGraphRemoved = false;
         if (cleanupHA) {
             try {
                 jobGraphWriter.removeJobGraph(jobId);
 
-                // only clean up the HA blobs if we could remove the job from HA storage
-                cleanupHABlobs = true;
+                // only clean up the HA blobs and HA service data for the particular job
+                // if we could remove the job from HA storage
+                jobGraphRemoved = true;
             } catch (Exception e) {
                 log.warn(
                         "Could not properly remove job {} from submitted job graph store.",
@@ -770,6 +771,17 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
                         jobId,
                         e);
             }
+
+            if (jobGraphRemoved) {
+                try {
+                    highAvailabilityServices.cleanupJobData(jobId);
+                } catch (Exception e) {
+                    log.warn(
+                            "Could not properly clean up data for job {} stored by HA services.",
+                            jobId,
+                            e);
+                }
+            }
         } else {
             try {
                 jobGraphWriter.releaseJobGraph(jobId);
@@ -781,7 +793,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
             }
         }
 
-        blobServer.cleanupJob(jobId, cleanupHABlobs);
+        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     /** Terminate all currently running {@link JobManagerRunner}s. */
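The rename from `cleanupHABlobs` to `jobGraphRemoved` reflects the invariant this change introduces: both the HA blobs and the per-job HA service data may only be deleted once the job graph has been successfully removed from HA storage, since otherwise a recovering dispatcher would find the job graph but none of the data needed to recover the job. A compact sketch of that ordering, with plain `Consumer`s standing in for `JobGraphWriter#removeJobGraph` and `HighAvailabilityServices#cleanupJobData` (the unchecked exceptions and `String` job IDs are simplifications of mine):

```java
import java.util.function.Consumer;

public class CleanupOrderSketch {
    static void cleanUpJobData(
            String jobId, Consumer<String> removeJobGraph, Consumer<String> cleanupHaJobData) {
        boolean jobGraphRemoved = false;
        try {
            removeJobGraph.accept(jobId);
            // only reached if the job graph is really gone from HA storage
            jobGraphRemoved = true;
        } catch (RuntimeException e) {
            System.err.println("Could not remove job graph for " + jobId + ": " + e);
        }
        // If the job graph is still in HA storage, a recovering dispatcher will
        // need the per-job HA data, so it must not be deleted yet.
        if (jobGraphRemoved) {
            cleanupHaJobData.accept(jobId);
        }
    }

    public static void main(String[] args) {
        cleanUpJobData("job-1", id -> {}, id -> System.out.println("HA data cleaned: " + id));
        cleanUpJobData(
                "job-2",
                id -> {
                    throw new RuntimeException("ZooKeeper unavailable");
                },
                id -> System.out.println("should not be printed for " + id));
    }
}
```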
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 9a322a8d92487..4c4dfe610a26c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -205,6 +205,13 @@ public void closeAndCleanupAllData() throws Exception {
         logger.info("Finished cleaning up the high availability data.");
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        logger.info("Clean up the high availability data for job {}.", jobID);
+        internalCleanupJobData(jobID);
+        logger.info("Finished cleaning up the high availability data for job {}.", jobID);
+    }
+
     /**
      * Create leader election service with specified leaderName.
      *
@@ -260,6 +267,15 @@ public void closeAndCleanupAllData() throws Exception {
      */
     protected abstract void internalCleanup() throws Exception;
 
+    /**
+     * Clean up the meta data in the distributed system (e.g. ZooKeeper, Kubernetes ConfigMaps)
+     * for the specified job.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception if the cleanup operation on the external storage fails.
+     */
+    protected abstract void internalCleanupJobData(JobID jobID) throws Exception;
+
     /**
      * Get the leader name for ResourceManager.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index fdb3fa7dcb535..fdf031e34e19f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -237,4 +237,12 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
      * up data stored by them.
      */
     void closeAndCleanupAllData() throws Exception;
+
+    /**
+     * Deletes all data for the specified job stored by these services in external stores.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception Thrown, if an exception occurred while cleaning up data stored by them.
+     */
+    void cleanupJobData(JobID jobID) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 188bdbbfd1c42..dfc53dca61ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.highavailability.nonha;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -111,6 +112,9 @@ public void closeAndCleanupAllData() throws Exception {
         close();
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {}
+
     // ----------------------------------------------------------------------
     // Helper methods
     // ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index f6ab9e859d630..802162b142846 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,10 +34,14 @@
 import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -138,6 +142,22 @@ public void internalCleanup() throws Exception {
         cleanupZooKeeperPaths();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(parent -> parent + "/" + jobID)
+                        .collect(Collectors.toList());
+        for (String path : paths) {
+            deleteZNode(path);
+        }
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return RESOURCE_MANAGER_LEADER_PATH;
@@ -168,6 +188,10 @@ private void cleanupZooKeeperPaths() throws Exception {
     }
 
     private void deleteOwnedZNode() throws Exception {
+        deleteZNode("/");
+    }
+
+    private void deleteZNode(String path) throws Exception {
         // delete the HA_CLUSTER_ID znode which is owned by this cluster
 
         // Since we are using Curator version 2.12 there is a bug in deleting the children
@@ -176,13 +200,18 @@ private void deleteOwnedZNode() throws Exception {
         // if there is a concurrent delete operation. Therefore we need to add this retry
         // logic. See https://issues.apache.org/jira/browse/CURATOR-430 for more information.
         // The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
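In ZooKeeper, the per-job state is spread over four configured parent paths (checkpoints, checkpoint counter, leader latch, leader), each holding a child znode named after the `JobID`, so cleanup amounts to deleting `<parent>/<jobID>` under each parent. A sketch of the path assembly; the default path values below mirror Flink's documented defaults for these options, but treat them as illustrative assumptions:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JobZNodePaths {
    // Builds the per-job znode paths the cleanup walks over.
    public static List<String> forJob(String jobId) {
        return Stream.of(
                        "/checkpoints", // high-availability.zookeeper.path.checkpoints
                        "/checkpoint-counter", // ...path.checkpoint-counter
                        "/leaderlatch", // ...path.latch
                        "/leader") // ...path.leader
                .map(parent -> parent + "/" + jobId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        forJob("2f4bc854b0e2a24c6e3f9bbb75e51c1b").forEach(System.out::println);
    }
}
```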
         boolean zNodeDeleted = false;
         while (!zNodeDeleted) {
+            Stat stat = client.checkExists().forPath(path);
+            if (stat == null) {
+                logger.debug("ZNode {} does not exist; nothing to delete.", path);
+                return;
+            }
             try {
-                client.delete().deletingChildrenIfNeeded().forPath("/");
+                client.delete().deletingChildrenIfNeeded().forPath(path);
                 zNodeDeleted = true;
             } catch (KeeperException.NoNodeException ignored) {
                 // concurrent delete operation. Try again.
                 logger.debug(
-                        "Retrying to delete owned znode because of other concurrent delete operation.");
+                        "Retrying to delete znode because of other concurrent delete operation.");
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index eb04d9d747f4b..dd80410cb867a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -131,6 +131,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
     private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
     @BeforeClass
@@ -151,6 +152,8 @@ public void setup() throws Exception {
         clearedJobLatch = new OneShotLatch();
         runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
         highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+        cleanupJobHADataFuture = new CompletableFuture<>();
+        highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);
 
         storedHABlobFuture = new CompletableFuture<>();
         deleteAllHABlobsFuture = new CompletableFuture<>();
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
+            fail("The HA data for the job should not have been deleted.");
+        } catch (TimeoutException ignored) {
+            // expected
+        }
+        assertThat(cleanupJobHADataFuture.isDone(), is(false));
+    }
+
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
                 new ExecutionGraphInfo(
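Both new dispatcher tests use a `CompletableFuture<JobID>` as an invocation probe: the testing HA services complete it when `cleanupJobData` is called, so the positive test blocks on it with a generous timeout, while the negative test asserts a `TimeoutException` after a short wait. A JDK-only illustration of the pattern (the names here are mine, not the test's):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureProbeSketch {
    public static void main(String[] args) throws Exception {
        // Positive case: the hook under test completes the probe, get() returns.
        CompletableFuture<String> probe = new CompletableFuture<>();
        probe.complete("job-1");
        System.out.println("cleaned: " + probe.get(2, TimeUnit.SECONDS));

        // Negative case: a probe that is never completed times out quickly; this is
        // how testHaDataCleanupWhenJobNotFinished asserts the hook was NOT called.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            neverCompleted.get(10, TimeUnit.MILLISECONDS);
            throw new AssertionError("cleanup hook should not have been invoked");
        } catch (TimeoutException expected) {
            System.out.println("no cleanup observed, as expected");
        }
    }
}
```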
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -37,9 +37,12 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -63,7 +66,8 @@ public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws
                         Executors.directExecutor(),
                         testingBlobStoreService,
                         closeOperations,
-                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP),
+                        ignored -> {});
 
         haServices.closeAndCleanupAllData();
 
@@ -94,7 +98,8 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
                         closeOperations,
                         () -> {
                             throw new FlinkException("test exception");
-                        });
+                        },
+                        ignored -> {});
 
         try {
             haServices.closeAndCleanupAllData();
@@ -106,6 +111,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get();
+        assertThat(jobIDCleaned, is(jobID));
+    }
+
     private enum CloseOperations {
         HA_CLEANUP,
         HA_CLOSE,
@@ -156,16 +184,19 @@ private static final class TestingHaServices extends AbstractHaServices {
 
         private final Queue<CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
+        private final Consumer<JobID> internalJobCleanupConsumer;
 
         private TestingHaServices(
                 Configuration config,
                 Executor ioExecutor,
                 BlobStoreService blobStoreService,
                 Queue<CloseOperations> closeOperations,
-                RunnableWithException internalCleanupRunnable) {
+                RunnableWithException internalCleanupRunnable,
+                Consumer<JobID> internalJobCleanupConsumer) {
             super(config, ioExecutor, blobStoreService);
             this.closeOperations = closeOperations;
             this.internalCleanupRunnable = internalCleanupRunnable;
+            this.internalJobCleanupConsumer = internalJobCleanupConsumer;
         }
 
         @Override
@@ -203,6 +234,11 @@ protected void internalCleanup() throws Exception {
             internalCleanupRunnable.run();
         }
 
+        @Override
+        protected void internalCleanupJobData(JobID jobID) throws Exception {
+            internalJobCleanupConsumer.accept(jobID);
+        }
+
         @Override
         protected String getLeaderNameForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this test implementation.");
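`TestingHaServices` now takes a `Consumer<JobID>` alongside the existing cleanup `Runnable`, which lets each test inject its own observation hook into the template method `internalCleanupJobData`. A stripped-down sketch of that seam; the class names and `String` job IDs below are illustrative simplifications, not Flink's types:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Template method in the abstract service: public entry point delegates to a
// protected hook that subclasses (or test doubles) implement.
abstract class HaServicesSketch {
    void cleanupJobData(String jobId) throws Exception {
        internalCleanupJobData(jobId);
    }

    protected abstract void internalCleanupJobData(String jobId) throws Exception;
}

final class TestingServices extends HaServicesSketch {
    private final Consumer<String> hook;

    TestingServices(Consumer<String> hook) {
        this.hook = hook;
    }

    @Override
    protected void internalCleanupJobData(String jobId) {
        hook.accept(jobId); // the test observes the call through this hook
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cleaned = new CompletableFuture<>();
        new TestingServices(cleaned::complete).cleanupJobData("job-42");
        System.out.println("cleaned: " + cleaned.get());
    }
}
```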
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -72,6 +73,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
     private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
 
+    private volatile CompletableFuture<JobID> jobCleanupFuture;
+
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
     // ------------------------------------------------------------------------
@@ -145,6 +148,10 @@ public void setCloseAndCleanupAllDataFuture(
         this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
     }
 
+    public void setCleanupJobDataFuture(CompletableFuture<JobID> jobCleanupFuture) {
+        this.jobCleanupFuture = jobCleanupFuture;
+    }
+
     // ------------------------------------------------------------------------
     //  HA Services Methods
     // ------------------------------------------------------------------------
@@ -277,4 +284,9 @@ public void close() throws Exception {
     public void closeAndCleanupAllData() throws Exception {
         closeAndCleanupAllDataFuture.complete(null);
     }
+
+    @Override
+    public void cleanupJobData(JobID jobID) {
+        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 6cbb92c8cc3fc..912b48f9538e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -136,6 +136,11 @@ public void closeAndCleanupAllData() throws Exception {
         // nothing to do
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        // nothing to do
+    }
+
     public void grantLeadership(JobID jobId, int index, UUID leaderId) {
         ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
 
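`TestingHighAvailabilityServices.cleanupJobData` guards the optional probe with `Optional.ofNullable(...).ifPresent(...)`, because most tests never call `setCleanupJobDataFuture` and the field stays null. A toy version of that guard (the class here is a standalone illustration, not the test double itself):

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class OptionalHookSketch {
    private volatile CompletableFuture<String> jobCleanupFuture; // may stay null

    void setCleanupJobDataFuture(CompletableFuture<String> future) {
        this.jobCleanupFuture = future;
    }

    void cleanupJobData(String jobId) {
        // No-op when no test registered interest; completes the probe otherwise.
        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobId));
    }

    public static void main(String[] args) {
        OptionalHookSketch services = new OptionalHookSketch();
        services.cleanupJobData("ignored-job"); // safe: hook not set
        CompletableFuture<String> probe = new CompletableFuture<>();
        services.setCleanupJobDataFuture(probe);
        services.cleanupJobData("job-7");
        System.out.println(probe.join()); // prints job-7
    }
}
```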
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 81dc6b450db15..32601b9532d47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -49,9 +49,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -148,6 +151,41 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that ZooKeeperHaServices cleans up the job-specific paths. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(path -> rootPath + namespace + path)
+                        .collect(Collectors.toList());
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, hasItem(jobID.toString()));
+                    }
+                    haServices.cleanupJobData(jobID);
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, not(hasItem(jobID.toString())));
+                    }
+                });
+    }
+
     private static CuratorFramework startCuratorFramework() {
         return CuratorFrameworkFactory.builder()
                 .connectString(ZOO_KEEPER_RESOURCE.getConnectString())
@@ -170,6 +208,16 @@ private void runCleanupTest(
             TestingBlobStoreService blobStoreService,
             ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
             throws Exception {
+        runCleanupTestWithJob(
+                configuration, blobStoreService, new JobID(), zooKeeperHaServicesConsumer);
+    }
+
+    private void runCleanupTestWithJob(
+            Configuration configuration,
+            TestingBlobStoreService blobStoreService,
+            JobID jobId,
+            ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
+            throws Exception {
         try (ZooKeeperHaServices zooKeeperHaServices =
                 new ZooKeeperHaServices(
                         ZooKeeperUtils.startCuratorFramework(configuration),
@@ -190,13 +238,23 @@ private void runCleanupTest(
             resourceManagerLeaderRetriever.start(listener);
             resourceManagerLeaderElectionService.start(
                     new TestingContender("foobar", resourceManagerLeaderElectionService));
-            final JobID jobId = new JobID();
+            LeaderElectionService jobManagerLeaderElectionService =
+                    zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
+            jobManagerLeaderElectionService.start(
+                    new TestingContender("", jobManagerLeaderElectionService));
+            LeaderRetrievalService jobManagerLeaderRetriever =
+                    zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
+            jobManagerLeaderRetriever.start(
+                    new LeaderRetrievalUtils.LeaderConnectionInfoListener());
+
             runningJobsRegistry.setJobRunning(jobId);
             listener.getLeaderConnectionInfoFuture().join();
 
             resourceManagerLeaderRetriever.stop();
             resourceManagerLeaderElectionService.stop();
+            jobManagerLeaderRetriever.stop();
+            jobManagerLeaderElectionService.stop();
             runningJobsRegistry.clearJob(jobId);
 
             zooKeeperHaServicesConsumer.accept(zooKeeperHaServices);
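The new ZooKeeper test first starts a JobManager leader election and retrieval service for the job, so the per-job children actually exist under the latch and leader paths, then asserts they are present before `cleanupJobData` and absent afterwards. A toy model of that before/after assertion, with a plain map standing in for ZooKeeper (all names and paths here are illustrative):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CleanupAssertionSketch {
    public static void main(String[] args) {
        Map<String, Set<String>> store = new HashMap<>(); // parent path -> child znodes
        String jobId = "job-1";
        List<String> parents = Arrays.asList("/leaderlatch", "/leader");

        // Setup: register the job under each parent path, as the leader services would.
        parents.forEach(p -> store.computeIfAbsent(p, k -> new HashSet<>()).add(jobId));
        parents.forEach(p -> assertContains(store.get(p), jobId, true));

        // cleanupJobData(jobId): delete <parent>/<jobId> under every parent path.
        parents.forEach(p -> store.get(p).remove(jobId));
        parents.forEach(p -> assertContains(store.get(p), jobId, false));
        System.out.println("cleanup verified");
    }

    static void assertContains(Set<String> children, String child, boolean expected) {
        if (children.contains(child) != expected) {
            throw new AssertionError("unexpected state for " + child);
        }
    }
}
```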