Skip to content

Commit

Permalink
[FLINK-25430][review #2] Makes Dispatcher.recoveredJobs consider the …
Browse files Browse the repository at this point in the history
…dirty JobResults

We don't want to retrigger jobs that finished already based on the JobResultStore.
  • Loading branch information
XComp committed Jan 21, 2022
1 parent 02a4c05 commit e6eb3d5
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -125,8 +126,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher

private final Collection<JobGraph> recoveredJobs;

private final Collection<JobResult> recoveredDirtyJobs;

private final DispatcherBootstrapFactory dispatcherBootstrapFactory;

private final ExecutionGraphInfoStore executionGraphInfoStore;
Expand Down Expand Up @@ -197,12 +196,10 @@ public Dispatcher(

this.dispatcherBootstrapFactory = checkNotNull(dispatcherBootstrapFactory);

this.recoveredJobs = new HashSet<>(recoveredJobs);
this.recoveredJobs = getRecoveredUnfinishedJobs(recoveredJobs, recoveredDirtyJobs);
this.blobServer.retainJobs(
recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()));

this.recoveredDirtyJobs = new HashSet<>(recoveredDirtyJobs);

this.dispatcherCachedOperationsHandler =
new DispatcherCachedOperationsHandler(
dispatcherServices.getOperationCaches(),
Expand Down Expand Up @@ -250,6 +247,20 @@ private void startDispatcherServices() throws Exception {
}
}

private Collection<JobGraph> getRecoveredUnfinishedJobs(
Collection<JobGraph> recoveredJobs, Collection<JobResult> recoveredDirtyJobResults) {
final Set<JobID> jobIdsOfFinishedJobs =
recoveredDirtyJobResults.stream()
.map(JobResult::getJobId)
.collect(Collectors.toSet());

return recoveredJobs.stream()
.filter(
recoveredJobGraph ->
!jobIdsOfFinishedJobs.contains(recoveredJobGraph.getJobID()))
.collect(Collectors.toSet());
}

private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
runRecoveredJob(recoveredJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class TestingDispatcherBuilder {

private Collection<JobResult> dirtyJobResults = Collections.emptyList();

private final DispatcherBootstrapFactory dispatcherBootstrapFactory =
private DispatcherBootstrapFactory dispatcherBootstrapFactory =
(dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();

private HeartbeatServices heartbeatServices = AbstractDispatcherTest.this.heartbeatServices;
Expand Down Expand Up @@ -176,6 +176,12 @@ TestingDispatcherBuilder setDirtyJobResults(Collection<JobResult> dirtyJobResult
return this;
}

TestingDispatcherBuilder setDispatcherBootstrapFactory(
DispatcherBootstrapFactory dispatcherBootstrapFactory) {
this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
return this;
}

TestingDispatcherBuilder setJobManagerRunnerFactory(
JobManagerRunnerFactory jobManagerRunnerFactory) {
this.jobManagerRunnerFactory = jobManagerRunnerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,35 @@ public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
fatalErrorHandler.clearError();
}

@Test
public void testThatDirtilyFinishedJobsNotBeingRetriggered() throws Exception {
jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
final JobResult jobResult =
new JobResult.Builder().jobId(jobGraph.getJobID()).netRuntime(1).build();
final TestingJobManagerRunnerFactory jobManagerRunnerFactory =
new TestingJobManagerRunnerFactory();
final OneShotLatch bootstrapInstantiationLatch = new OneShotLatch();
dispatcher =
new TestingDispatcherBuilder()
.setJobManagerRunnerFactory(jobManagerRunnerFactory)
.setInitialJobGraphs(Collections.singleton(jobGraph))
.setDirtyJobResults(Collections.singleton(jobResult))
.setDispatcherBootstrapFactory(
(dispatcher, scheduledExecutor, errorHandler) -> {
bootstrapInstantiationLatch.trigger();
return new NoOpDispatcherBootstrap();
})
.build();

dispatcher.start();

bootstrapInstantiationLatch.await();

assertThat(jobManagerRunnerFactory.getQueueSize(), is(0));
}

/** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */
@Test
public void testFailingJobManagerRunnerCleanup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ private TestingJobManagerRunner createTestingJobManagerRunner(JobGraph jobGraph)
public TestingJobManagerRunner takeCreatedJobManagerRunner() throws InterruptedException {
return createdJobManagerRunner.take();
}

public int getQueueSize() {
return createdJobManagerRunner.size();
}
}

0 comments on commit e6eb3d5

Please sign in to comment.