diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 63fa09c3467a5..35b6c53922643 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.client.ClientUtils; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -55,11 +54,9 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; -import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -92,6 +89,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -119,6 +117,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private final Time rpcTimeout; + @GuardedBy("lock") + private final List taskManagers; + + private final TerminatingFatalErrorHandlerFactory taskManagerTerminatingFatalErrorHandlerFactory = new TerminatingFatalErrorHandlerFactory(); + private CompletableFuture terminationFuture; @GuardedBy("lock") @@ -145,8 +148,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { @GuardedBy("lock") private BlobCacheService blobCacheService; - private volatile TaskExecutor[] taskManagers; - @GuardedBy("lock") private LeaderRetrievalService resourceManagerLeaderRetriever; @@ -168,6 +169,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { @GuardedBy("lock") private LeaderRetriever webMonitorLeaderRetriever; + @GuardedBy("lock") + private RpcServiceFactory taskManagerRpcServiceFactory; + /** Flag marking the mini cluster as started/running. */ private volatile boolean running; @@ -186,6 +190,8 @@ public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) { this.rpcTimeout = Time.seconds(10L); this.terminationFuture = CompletableFuture.completedFuture(null); running = false; + + this.taskManagers = new ArrayList<>(miniClusterConfiguration.getNumTaskManagers()); } public CompletableFuture getRestAddress() { @@ -209,6 +215,14 @@ public HighAvailabilityServices getHighAvailabilityServices() { } } + @VisibleForTesting + @Nonnull + protected Collection> getDispatcherResourceManagerComponents() { + synchronized (lock) { + return Collections.unmodifiableCollection(dispatcherResourceManagerComponents); + } + } + // ------------------------------------------------------------------------ // life cycle // ------------------------------------------------------------------------ @@ -234,7 +248,6 @@ public void start() throws Exception { LOG.debug("Using configuration {}", miniClusterConfiguration); final Configuration configuration = miniClusterConfiguration.getConfiguration(); - final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED; try { @@ -248,7 +261,6 @@ public void start() throws Exception { AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration); - final RpcServiceFactory taskManagerRpcServiceFactory; final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory; if (useSingleRpcService) { @@ -287,16 +299,7 @@ public void start() throws Exception { configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()) ); - // bring up the TaskManager(s) for the mini cluster - LOG.info("Starting {} TaskManger(s)", numTaskManagers); - taskManagers = startTaskManagers( - configuration, - haServices, - heartbeatServices, - metricRegistry, - blobCacheService, - numTaskManagers, - taskManagerRpcServiceFactory); + startTaskManagers(); MetricQueryServiceRetriever metricQueryServiceRetriever = new AkkaQueryServiceRetriever( metricQueryServiceActorSystem, @@ -410,14 +413,7 @@ public CompletableFuture closeAsync() { final int numComponents = 2 + miniClusterConfiguration.getNumTaskManagers(); final Collection> componentTerminationFutures = new ArrayList<>(numComponents); - if (taskManagers != null) { - for (TaskExecutor tm : taskManagers) { - if (tm != null) { - componentTerminationFutures.add(tm.closeAsync()); - } - } - taskManagers = null; - } + componentTerminationFutures.addAll(terminateTaskExecutors()); componentTerminationFutures.add(shutDownResourceManagerComponents()); @@ -470,6 +466,62 @@ private CompletableFuture closeMetricSystem() { } } + @GuardedBy("lock") + private void startTaskManagers() throws Exception { + final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); + + LOG.info("Starting {} TaskManger(s)", numTaskManagers); + + for (int i = 0; i < numTaskManagers; i++) { + startTaskExecutor(); + } + } + + @VisibleForTesting + void startTaskExecutor() throws Exception { + synchronized (lock) { + final Configuration configuration = miniClusterConfiguration.getConfiguration(); + + final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager( + configuration, + new ResourceID(UUID.randomUUID().toString()), + taskManagerRpcServiceFactory.createRpcService(), + haServices, + heartbeatServices, + metricRegistry, + blobCacheService, + useLocalCommunication(), + taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size())); + + taskExecutor.start(); + taskManagers.add(taskExecutor); + } + } + + @VisibleForTesting + protected boolean useLocalCommunication() { + return miniClusterConfiguration.getNumTaskManagers() == 1; + } + + @GuardedBy("lock") + private Collection> terminateTaskExecutors() { + final Collection> terminationFutures = new ArrayList<>(taskManagers.size()); + for (int i = 0; i < taskManagers.size(); i++) { + terminationFutures.add(terminateTaskExecutor(i)); + } + + return terminationFutures; + } + + @VisibleForTesting + @Nonnull + protected CompletableFuture terminateTaskExecutor(int index) { + synchronized (lock) { + final TaskExecutor taskExecutor = taskManagers.get(index); + return taskExecutor.closeAsync(); + } + } + // ------------------------------------------------------------------------ // Accessing jobs // ------------------------------------------------------------------------ @@ -670,69 +722,6 @@ protected RpcService createRpcService( return new AkkaRpcService(actorSystem, akkaRpcServiceConfig); } - protected ResourceManagerRunner startResourceManager( - Configuration configuration, - HighAvailabilityServices haServices, - HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, - RpcService resourceManagerRpcService, - ClusterInformation clusterInformation, - JobManagerMetricGroup jobManagerMetricGroup) throws Exception { - - final ResourceManagerRunner resourceManagerRunner = new ResourceManagerRunner( - ResourceID.generate(), - FlinkResourceManager.RESOURCE_MANAGER_NAME + '_' + UUID.randomUUID(), - configuration, - resourceManagerRpcService, - haServices, - heartbeatServices, - metricRegistry, - clusterInformation, - jobManagerMetricGroup); - - resourceManagerRunner.start(); - - return resourceManagerRunner; - } - - protected TaskExecutor[] startTaskManagers( - Configuration configuration, - HighAvailabilityServices haServices, - HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, - BlobCacheService blobCacheService, - int numTaskManagers, - RpcServiceFactory rpcServiceFactory) throws Exception { - - final TaskExecutor[] taskExecutors = new TaskExecutor[numTaskManagers]; - final boolean localCommunication = numTaskManagers == 1; - - for (int i = 0; i < numTaskManagers; i++) { - taskExecutors[i] = TaskManagerRunner.startTaskManager( - configuration, - new ResourceID(UUID.randomUUID().toString()), - rpcServiceFactory.createRpcService(), - haServices, - heartbeatServices, - metricRegistry, - blobCacheService, - localCommunication, - new TerminatingFatalErrorHandler(i)); - - taskExecutors[i].start(); - } - - return taskExecutors; - } - - @VisibleForTesting - @Nonnull - protected Collection> getDispatcherResourceManagerComponents() { - synchronized (lock) { - return Collections.unmodifiableCollection(dispatcherResourceManagerComponents); - } - } - // ------------------------------------------------------------------------ // Internal methods // ------------------------------------------------------------------------ @@ -924,13 +913,8 @@ public void onFatalError(Throwable exception) { if (running) { LOG.error("TaskManager #{} failed.", index, exception); - // let's check if there are still TaskManagers because there could be a concurrent - // shut down operation taking place - TaskExecutor[] currentTaskManagers = taskManagers; - - if (currentTaskManagers != null) { - // the shutDown is asynchronous - currentTaskManagers[index].closeAsync(); + synchronized (lock) { + taskManagers.get(index).closeAsync(); } } } @@ -944,4 +928,19 @@ public void onFatalError(Throwable exception) { closeAsync(); } } + + private class TerminatingFatalErrorHandlerFactory { + + /** + * Create a new {@link TerminatingFatalErrorHandler} for the {@link TaskExecutor} with + * the given index. + * + * @param index into the {@link #taskManagers} collection to identify the correct {@link TaskExecutor}. + * @return {@link TerminatingFatalErrorHandler} for the given index + */ + @GuardedBy("lock") + private TerminatingFatalErrorHandler create(int index) { + return new TerminatingFatalErrorHandler(index); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java index c8bf632daa39b..42e5cb439f670 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java @@ -91,7 +91,7 @@ public void testConstraintsAfterRestart() throws Exception { eg.scheduleForExecution(); }); - Predicate isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING); + Predicate isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING); ExecutionGraphTestUtils.waitForAllExecutionsPredicate( eg, isDeploying, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 10e30c752c4ef..11f1058381264 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -197,23 +197,14 @@ public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex */ public static void waitForAllExecutionsPredicate( ExecutionGraph executionGraph, - Predicate executionPredicate, + Predicate executionPredicate, long maxWaitMillis) throws TimeoutException { - final Iterable allExecutionVertices = executionGraph.getAllExecutionVertices(); - + final Predicate allExecutionsPredicate = allExecutionsPredicate(executionPredicate); final Deadline deadline = Deadline.fromNow(Duration.ofMillis(maxWaitMillis)); boolean predicateResult; do { - predicateResult = true; - for (ExecutionVertex executionVertex : allExecutionVertices) { - final Execution currentExecution = executionVertex.getCurrentExecutionAttempt(); - - if (currentExecution == null || !executionPredicate.test(currentExecution)) { - predicateResult = false; - break; - } - } + predicateResult = allExecutionsPredicate.test(executionGraph); if (!predicateResult) { try { @@ -229,13 +220,29 @@ public static void waitForAllExecutionsPredicate( } } + public static Predicate allExecutionsPredicate(final Predicate executionPredicate) { + return accessExecutionGraph -> { + final Iterable allExecutionVertices = accessExecutionGraph.getAllExecutionVertices(); + + for (AccessExecutionVertex executionVertex : allExecutionVertices) { + final AccessExecution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); + + if (currentExecutionAttempt == null || !executionPredicate.test(currentExecutionAttempt)) { + return false; + } + } + + return true; + }; + } + /** * Predicate which is true if the given {@link Execution} has a resource assigned. */ static final Predicate hasResourceAssigned = (Execution execution) -> execution.getAssignedResource() != null; - static Predicate isInExecutionState(ExecutionState executionState) { - return (Execution execution) -> execution.getState() == executionState; + public static Predicate isInExecutionState(ExecutionState executionState) { + return (AccessExecution execution) -> execution.getState() == executionState; } public static void waitUntilFailoverRegionState(FailoverRegion region, JobStatus status, long maxWaitMillis) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 1c5a3484ceb9d..5d4df0bc85972 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -100,6 +100,7 @@ import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.runtime.taskexecutor.AccumulatorReport; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; @@ -151,6 +152,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -181,7 +186,7 @@ public class JobMasterTest extends TestLogger { private static final long fastHeartbeatTimeout = 5L; private static final long heartbeatInterval = 1000L; - private static final long heartbeatTimeout = 5000L; + private static final long heartbeatTimeout = 5_000_000L; private static final JobGraph jobGraph = new JobGraph(); @@ -642,12 +647,11 @@ public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception { final long start = System.nanoTime(); jobMaster.start(JobMasterId.generate()).get(); - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + final TestingResourceManagerGateway resourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(2); resourceManagerGateway.setRequestSlotConsumer(blockingQueue::offer); - rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); - rmLeaderRetrievalService.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID()); + notifyResourceManagerLeaderListeners(resourceManagerGateway); // wait for the first slot request blockingQueue.take(); @@ -707,15 +711,9 @@ public void testCloseUnestablishedResourceManagerConnection() throws Exception { try { jobMaster.start(JobMasterId.generate()).get(); - final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final String firstResourceManagerAddress = "address1"; - final String secondResourceManagerAddress = "address2"; - final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway(); - final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway(); - - rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway); - rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway); + final TestingResourceManagerGateway firstResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); + final TestingResourceManagerGateway secondResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final OneShotLatch firstJobManagerRegistration = new OneShotLatch(); final OneShotLatch secondJobManagerRegistration = new OneShotLatch(); @@ -726,13 +724,13 @@ public void testCloseUnestablishedResourceManagerConnection() throws Exception { secondResourceManagerGateway.setRegisterJobManagerConsumer( jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger()); - rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID()); + notifyResourceManagerLeaderListeners(firstResourceManagerGateway); // wait until we have seen the first registration attempt firstJobManagerRegistration.await(); // this should stop the connection attempts towards the first RM - rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID()); + notifyResourceManagerLeaderListeners(secondResourceManagerGateway); // check that we start registering at the second RM secondJobManagerRegistration.await(); @@ -760,18 +758,14 @@ public void testReconnectionAfterDisconnect() throws Exception { try { // wait for the start to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final BlockingQueue registrationsQueue = new ArrayBlockingQueue<>(1); testingResourceManagerGateway.setRegisterJobManagerConsumer( jobMasterIdResourceIDStringJobIDTuple4 -> registrationsQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0)); - rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); - final ResourceManagerId resourceManagerId = testingResourceManagerGateway.getFencingToken(); - rmLeaderRetrievalService.notifyListener( - testingResourceManagerGateway.getAddress(), - resourceManagerId.toUUID()); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); // wait for first registration attempt final JobMasterId firstRegistrationAttempt = registrationsQueue.take(); @@ -805,16 +799,13 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep // wait for the start to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final BlockingQueue registrationQueue = new ArrayBlockingQueue<>(1); testingResourceManagerGateway.setRegisterJobManagerConsumer( jobMasterIdResourceIDStringJobIDTuple4 -> registrationQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0)); - final String resourceManagerAddress = testingResourceManagerGateway.getAddress(); - rpcService.registerGateway(resourceManagerAddress, testingResourceManagerGateway); - - rmLeaderRetrievalService.notifyListener(resourceManagerAddress, testingResourceManagerGateway.getFencingToken().toUUID()); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); final JobMasterId firstRegistrationAttempt = registrationQueue.take(); @@ -1229,13 +1220,10 @@ public void testRequestPartitionState() throws Exception { // wait for the start to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); - + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); final CompletableFuture allocationIdFuture = new CompletableFuture<>(); testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); - rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); - final CompletableFuture tddFuture = new CompletableFuture<>(); final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { @@ -1247,7 +1235,7 @@ public void testRequestPartitionState() throws Exception { final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); - rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); final AllocationID allocationId = allocationIdFuture.get(); @@ -1313,6 +1301,10 @@ public void testRequestPartitionState() throws Exception { } } + private void notifyResourceManagerLeaderListeners(TestingResourceManagerGateway testingResourceManagerGateway) { + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + } + /** * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)} * is respected. @@ -1380,9 +1372,8 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception jobManagerSharedServices, heartbeatServices); - final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); - rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); - rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); final CompletableFuture allocationIdFuture = new CompletableFuture<>(); @@ -1430,6 +1421,125 @@ public void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception } } + @Nonnull + private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + return testingResourceManagerGateway; + } + + /** + * Tests that the job execution is failed if the TaskExecutor disconnects from the + * JobMaster. + */ + @Test + public void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { + runJobFailureWhenTaskExecutorTerminatesTest( + heartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> jobMasterGateway.disconnectTaskManager( + localTaskManagerLocation.getResourceID(), + new FlinkException("Test disconnectTaskManager exception.")), + (jobMasterGateway, resourceID) -> ignored -> {}); + } + + @Test + public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { + final AtomicBoolean respondToHeartbeats = new AtomicBoolean(true); + runJobFailureWhenTaskExecutorTerminatesTest( + fastHeartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> respondToHeartbeats.set(false), + (jobMasterGateway, taskManagerResourceId) -> resourceId -> { + if (respondToHeartbeats.get()) { + jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new AccumulatorReport(Collections.emptyList())); + } + } + ); + } + + private void runJobFailureWhenTaskExecutorTerminatesTest( + HeartbeatServices heartbeatServices, + BiConsumer jobReachedRunningState, + BiFunction> heartbeatConsumerFunction) throws Exception { + final JobGraph jobGraph = createSingleVertexJobGraph(); + final TestingOnCompletionActions onCompletionActions = new TestingOnCompletionActions(); + final JobMaster jobMaster = createJobMaster( + new Configuration(), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatServices, + onCompletionActions); + + final TestingResourceManagerGateway testingResourceManagerGateway = createAndRegisterTestingResourceManagerGateway(); + notifyResourceManagerLeaderListeners(testingResourceManagerGateway); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + final CompletableFuture taskDeploymentFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId()); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway, taskManagerLocation.getResourceID())) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway); + + jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + + offerSingleSlot(jobMasterGateway, taskManagerLocation); + + final ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get(); + + jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), executionAttemptId, ExecutionState.RUNNING)).get(); + + jobReachedRunningState.accept(taskManagerLocation, jobMasterGateway); + + final ArchivedExecutionGraph archivedExecutionGraph = onCompletionActions.getJobReachedGloballyTerminalStateFuture().get(); + + assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + private void offerSingleSlot(JobMasterGateway jobMasterGateway, LocalTaskManagerLocation taskManagerLocation) throws InterruptedException, ExecutionException { + final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN); + final Collection slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get(); + + assertThat(slotOffers, hasSize(1)); + } + + private static final class TestingOnCompletionActions implements OnCompletionActions { + + private final CompletableFuture jobReachedGloballyTerminalStateFuture = new CompletableFuture<>(); + private final CompletableFuture jobFinishedByOtherFuture = new CompletableFuture<>(); + private final CompletableFuture jobMasterFailedFuture = new CompletableFuture<>(); + + @Override + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { + jobReachedGloballyTerminalStateFuture.complete(executionGraph); + } + + @Override + public void jobFinishedByOther() { + jobFinishedByOtherFuture.complete(null); + } + + @Override + public void jobMasterFailed(Throwable cause) { + jobMasterFailedFuture.complete(cause); + } + + public CompletableFuture getJobReachedGloballyTerminalStateFuture() { + return jobReachedGloballyTerminalStateFuture; + } + } + private JobGraph producerConsumerJobGraph() { final JobVertex producer = new JobVertex("Producer"); producer.setInvokableClass(NoOpInvokable.class); @@ -1530,6 +1640,23 @@ private JobMaster createJobMaster( JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices) throws Exception { + return createJobMaster( + configuration, + jobGraph, + highAvailabilityServices, + jobManagerSharedServices, + heartbeatServices, + new NoOpOnCompletionActions()); + } + + @Nonnull + private JobMaster createJobMaster( + Configuration configuration, + JobGraph jobGraph, + HighAvailabilityServices highAvailabilityServices, + JobManagerSharedServices jobManagerSharedServices, + HeartbeatServices heartbeatServices, + OnCompletionActions onCompletionActions) throws Exception { final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); return new JobMaster( @@ -1542,22 +1669,28 @@ private JobMaster createJobMaster( jobManagerSharedServices, heartbeatServices, UnregisteredJobManagerJobMetricGroupFactory.INSTANCE, - new NoOpOnCompletionActions(), + onCompletionActions, testingFatalErrorHandler, JobMasterTest.class.getClassLoader()); } private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException { - final JobVertex jobVertex = new JobVertex("Test vertex"); - jobVertex.setInvokableClass(NoOpInvokable.class); + final JobGraph jobGraph = createSingleVertexJobGraph(); final ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L)); + jobGraph.setExecutionConfig(executionConfig); + + return jobGraph; + } + + @Nonnull + private JobGraph createSingleVertexJobGraph() { + final JobVertex jobVertex = new JobVertex("Test vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); final JobGraph jobGraph = new JobGraph(jobVertex); jobGraph.setAllowQueuedScheduling(true); - jobGraph.setExecutionConfig(executionConfig); - return jobGraph; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java index d43286253dac2..22d7b4ba1a38d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Supplier; @@ -66,6 +67,22 @@ public Collection> getDispatcherResourceMa return super.getDispatcherResourceManagerComponents(); } + @Nonnull + @Override + public CompletableFuture terminateTaskExecutor(int index) { + return super.terminateTaskExecutor(index); + } + + @Override + public void startTaskExecutor() throws Exception { + super.startTaskExecutor(); + } + + @Override + protected boolean useLocalCommunication() { + return false; + } + @Override protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception { if (highAvailabilityServicesSupplier != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 950a4e136748e..ed8002b33ee06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -48,6 +48,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -93,7 +94,7 @@ public TestingResourceManagerGateway() { this( ResourceManagerId.generate(), ResourceID.generate(), - "localhost", + "localhost/" + UUID.randomUUID(), "localhost"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index d2b976f9a7491..16450f9d73af2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -18,222 +18,139 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobCacheService; -import org.apache.flink.runtime.blob.VoidBlobStore; -import org.apache.flink.runtime.clusterframework.FlinkResourceManager; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.entrypoint.ClusterInformation; -import org.apache.flink.runtime.heartbeat.HeartbeatServices; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; -import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; -import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; -import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; -import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; -import org.apache.flink.runtime.taskexecutor.slot.TimerService; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SupplierWithException; -import org.hamcrest.Matchers; -import org.junit.Rule; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; - -import java.io.File; -import java.net.InetAddress; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.UUID; + +import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.RETURNS_MOCKS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +/** + * Integration tests for the {@link TaskExecutor}. + */ public class TaskExecutorITCase extends TestLogger { - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L); + private static final int NUM_TMS = 2; + private static final int SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; + + private TestingMiniCluster miniCluster; + + @Before + public void setup() throws Exception { + miniCluster = new TestingMiniCluster( + new TestingMiniClusterConfiguration.Builder() + .setNumTaskManagers(NUM_TMS) + .setNumSlotsPerTaskManager(SLOTS_PER_TM) + .build(), + null); - private final Time timeout = Time.seconds(10L); + miniCluster.start(); + } + + @After + public void teardown() throws Exception { + if (miniCluster != null) { + miniCluster.close(); + } + } + /** + * Tests that a job will be re-executed if a new TaskExecutor joins the cluster. + */ @Test - public void testSlotAllocation() throws Exception { - TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices testingHAServices = new TestingHighAvailabilityServices(); - final Configuration configuration = new Configuration(); - final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - final ResourceID taskManagerResourceId = new ResourceID("foobar"); - final UUID rmLeaderId = UUID.randomUUID(); - final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final SettableLeaderRetrievalService rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null); - final String rmAddress = "rm"; - final String jmAddress = "jm"; - final JobMasterId jobMasterId = JobMasterId.generate(); - final ResourceID rmResourceId = new ResourceID(rmAddress); - final ResourceID jmResourceId = new ResourceID(jmAddress); - final JobID jobId = new JobID(); - final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); - - testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); - testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - testingHAServices.setJobMasterLeaderRetriever(jobId, new SettableLeaderRetrievalService(jmAddress, jobMasterId.toUUID())); - - TestingRpcService rpcService = new TestingRpcService(); - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( - testingHAServices, - rpcService.getScheduledExecutor(), - Time.minutes(5L)); - MetricRegistry metricRegistry = NoOpMetricRegistry.INSTANCE; - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - - final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); - final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234); - final List resourceProfiles = Arrays.asList(resourceProfile); - final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, new TimerService(scheduledExecutorService, 100L)); - final SlotManager slotManager = new SlotManager( - rpcService.getScheduledExecutor(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime()); - - final File[] taskExecutorLocalStateRootDirs = - new File[]{ new File(tempFolder.getRoot(),"localRecovery") }; - - final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager( - false, - taskExecutorLocalStateRootDirs, - rpcService.getExecutor()); - - ResourceManager resourceManager = new StandaloneResourceManager( - rpcService, - FlinkResourceManager.RESOURCE_MANAGER_NAME, - rmResourceId, - testingHAServices, - heartbeatServices, - slotManager, - metricRegistry, - jobLeaderIdService, - new ClusterInformation("localhost", 1234), - testingFatalErrorHandler, - UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup()); - - final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() - .setTaskManagerLocation(taskManagerLocation) - .setTaskSlotTable(taskSlotTable) - .setTaskStateManager(taskStateManager) - .build(); - - TaskExecutor taskExecutor = new TaskExecutor( - rpcService, - taskManagerConfiguration, - testingHAServices, - taskManagerServices, - heartbeatServices, - UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, - new BlobCacheService( - configuration, - new VoidBlobStore(), - null), - testingFatalErrorHandler); - - JobMasterGateway jmGateway = mock(JobMasterGateway.class); - - when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(taskManagerResourceId))); - when(jmGateway.getHostname()).thenReturn(jmAddress); - when(jmGateway.offerSlots( - eq(taskManagerResourceId), - any(Collection.class), - any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); - when(jmGateway.getFencingToken()).thenReturn(jobMasterId); - - - rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class)); - rpcService.registerGateway(jmAddress, jmGateway); - rpcService.registerGateway(taskExecutor.getAddress(), taskExecutor.getSelfGateway(TaskExecutorGateway.class)); - - final AllocationID allocationId = new AllocationID(); - final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, jmAddress); - final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile); - - try { - resourceManager.start(); - taskExecutor.start(); - - final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - - // notify the RM that it is the leader - CompletableFuture isLeaderFuture = rmLeaderElectionService.isLeader(rmLeaderId); - - // wait for the completion of the leader election - assertEquals(rmLeaderId, isLeaderFuture.get()); - - // notify the TM about the new RM leader - rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId); - - CompletableFuture registrationResponseFuture = rmGateway.registerJobManager( - jobMasterId, - jmResourceId, - jmAddress, - jobId, - timeout); - - RegistrationResponse registrationResponse = registrationResponseFuture.get(); - - assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess); - - CompletableFuture slotAck = rmGateway.requestSlot(jobMasterId, slotRequest, timeout); - - slotAck.get(); - - verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( - eq(taskManagerResourceId), - (Collection)argThat(Matchers.contains(slotOffer)), - any(Time.class)); - } finally { - if (testingFatalErrorHandler.hasExceptionOccurred()) { - testingFatalErrorHandler.rethrowError(); - } - - RpcUtils.terminateRpcService(rpcService, timeout); + public void testNewTaskExecutorJoinsCluster() throws Exception { + final JobGraph jobGraph = createJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).get(); + + final CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID()); + + assertThat(jobResultFuture.isDone(), is(false)); + + CommonTestUtils.waitUntilCondition( + jobIsRunning(() -> miniCluster.getExecutionGraph(jobGraph.getJobID())), + Deadline.fromNow(TESTING_TIMEOUT), + 20L); + + // kill one TaskExecutor which should fail the job execution + miniCluster.terminateTaskExecutor(0); + + final JobResult jobResult = jobResultFuture.get(); + + assertThat(jobResult.isSuccess(), is(false)); + + miniCluster.startTaskExecutor(); + + BlockingOperator.unblock(); + + miniCluster.submitJob(jobGraph).get(); + + miniCluster.requestJobResult(jobGraph.getJobID()).get(); + } + + private SupplierWithException jobIsRunning(Supplier> executionGraphFutureSupplier) { + final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)); + + return () -> { + final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join(); + return allExecutionsRunning.test(executionGraph); + }; + } + + private JobGraph createJobGraph(int parallelism) { + final JobVertex vertex = new JobVertex("blocking operator"); + vertex.setParallelism(parallelism); + vertex.setInvokableClass(BlockingOperator.class); + + BlockingOperator.reset(); + + return new JobGraph("Blocking test job", vertex); + } + + /** + * Blocking invokable which is controlled by a static field. + */ + public static class BlockingOperator extends AbstractInvokable { + private static CountDownLatch countDownLatch = new CountDownLatch(1); + + public BlockingOperator(Environment environment) { + super(environment); } + @Override + public void invoke() throws Exception { + countDownLatch.await(); + } + + public static void unblock() { + countDownLatch.countDown(); + } + public static void reset() { + countDownLatch = new CountDownLatch(1); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 1dca276a01910..b05b0e3961515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -108,18 +108,18 @@ import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.mockito.hamcrest.MockitoHamcrest; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.slf4j.Logger; import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -130,8 +130,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.stream.Collectors; -import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -144,14 +145,12 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -160,6 +159,7 @@ */ public class TaskExecutorTest extends TestLogger { + public static final HeartbeatServices HEARTBEAT_SERVICES = new HeartbeatServices(1000L, 1000L); @Rule public final TemporaryFolder tmp = new TemporaryFolder(); @@ -468,8 +468,6 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { */ @Test public void testHeartbeatSlotReporting() throws Exception { - final long verificationTimeout = 1000L; - final long heartbeatTimeout = 10000L; final String rmAddress = "rm"; final UUID rmLeaderId = UUID.randomUUID(); @@ -499,7 +497,6 @@ public void testHeartbeatSlotReporting() throws Exception { rpc.registerGateway(rmAddress, rmGateway); - final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotReport slotReport1 = new SlotReport( @@ -513,28 +510,7 @@ public void testHeartbeatSlotReporting() throws Exception { new JobID(), new AllocationID())); - when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport1, slotReport2); - - final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); - - when(heartbeatServices.createHeartbeatManager( - eq(taskManagerLocation.getResourceID()), - any(HeartbeatListener.class), - any(ScheduledExecutor.class), - any(Logger.class))).thenAnswer( - new Answer>() { - @Override - public HeartbeatManagerImpl answer(InvocationOnMock invocation) throws Throwable { - return spy(new HeartbeatManagerImpl<>( - heartbeatTimeout, - taskManagerLocation.getResourceID(), - (HeartbeatListener) invocation.getArguments()[1], - (Executor) invocation.getArguments()[2], - (ScheduledExecutor) invocation.getArguments()[2], - (Logger) invocation.getArguments()[3])); - } - } - ); + final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2))); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( false, @@ -552,7 +528,7 @@ public HeartbeatManagerImpl answer(InvocationOnMock invocation taskManagerConfiguration, haServices, taskManagerServices, - heartbeatServices, + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -561,9 +537,6 @@ public HeartbeatManagerImpl answer(InvocationOnMock invocation try { taskManager.start(); - // wait for spied heartbeat manager instance - HeartbeatManager heartbeatManager = taskManager.getResourceManagerHeartbeatManager(); - // define a leader and see that a registration happens resourceManagerLeaderRetriever.notifyListener(rmAddress, rmLeaderId); @@ -571,8 +544,6 @@ public HeartbeatManagerImpl answer(InvocationOnMock invocation assertThat(taskExecutorRegistrationFuture.get(), equalTo(taskManagerLocation.getResourceID())); assertThat(initialSlotReportFuture.get(), equalTo(slotReport1)); - verify(heartbeatManager, timeout(verificationTimeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class)); - TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class); // trigger the heartbeat asynchronously @@ -631,7 +602,7 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -694,7 +665,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -816,7 +787,7 @@ public void testTaskSubmission() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -864,9 +835,6 @@ public void testJobLeaderDetection() throws Exception { final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration()); - final String resourceManagerAddress = "rm"; - final ResourceManagerId resourceManagerLeaderId = ResourceManagerId.generate(); - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); CompletableFuture initialSlotReportFuture = new CompletableFuture<>(); resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> { @@ -874,28 +842,20 @@ public void testJobLeaderDetection() throws Exception { return CompletableFuture.completedFuture(Acknowledge.get()); }); - final String jobManagerAddress = "jm"; - final UUID jobManagerLeaderId = UUID.randomUUID(); - final ResourceID jmResourceId = new ResourceID(jobManagerAddress); - final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + final CompletableFuture> offeredSlotsFuture = new CompletableFuture<>(); + final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() + .setOfferSlotsFunction((resourceID, slotOffers) -> { - when(jobMasterGateway.registerTaskManager( - any(String.class), - eq(taskManagerLocation), - any(Time.class) - )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId))); - when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); - when(jobMasterGateway.offerSlots( - any(ResourceID.class), - any(Collection.class), - any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); + offeredSlotsFuture.complete(new ArrayList<>(slotOffers)); + return CompletableFuture.completedFuture(slotOffers); + }) + .build(); - rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); - rpc.registerGateway(jobManagerAddress, jobMasterGateway); + rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); + rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); final AllocationID allocationId = new AllocationID(); final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0); - final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( false, @@ -915,7 +875,7 @@ public void testJobLeaderDetection() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -927,7 +887,7 @@ public void testJobLeaderDetection() throws Exception { final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class); // tell the task manager about the rm leader - resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, resourceManagerLeaderId.toUUID()); + resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID()); // wait for the initial slot report initialSlotReportFuture.get(); @@ -937,20 +897,18 @@ public void testJobLeaderDetection() throws Exception { slotId, jobId, allocationId, - jobManagerAddress, - resourceManagerLeaderId, + jobMasterGateway.getAddress(), + resourceManagerGateway.getFencingToken(), timeout); slotRequestAck.get(); // now inform the task manager about the new job leader - jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobManagerLeaderId); + jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()); - // the job leader should get the allocation id offered - verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( - any(ResourceID.class), - (Collection) MockitoHamcrest.argThat(contains(slotOffer)), - any(Time.class)); + final Collection offeredSlots = offeredSlotsFuture.get(); + final Collection allocationIds = offeredSlots.stream().map(SlotOffer::getAllocationId).collect(Collectors.toList()); + assertThat(allocationIds, containsInAnyOrder(allocationId)); } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); } @@ -1030,7 +988,7 @@ public void testSlotAcceptance() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -1124,7 +1082,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -1359,7 +1317,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception { taskManagerConfiguration, haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -1838,7 +1796,7 @@ private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) TaskManagerConfiguration.fromConfiguration(configuration), haServices, taskManagerServices, - new HeartbeatServices(1000L, 1000L), + HEARTBEAT_SERVICES, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, @@ -1941,4 +1899,18 @@ public void monitorTarget(ResourceID resourceID, HeartbeatTarget heartbeatTar monitoredTargets.offer(resourceID); } } + + private static final class TestingTaskSlotTable extends TaskSlotTable { + private final Queue slotReports; + + private TestingTaskSlotTable(Queue slotReports) { + super(Collections.singleton(ResourceProfile.UNKNOWN), new TimerService<>(TestingUtils.defaultExecutor(), 10000L)); + this.slotReports = slotReports; + } + + @Override + public SlotReport createSlotReport(ResourceID resourceId) { + return slotReports.poll(); + } + } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala deleted file mode 100644 index 69ad2d7dc640a..0000000000000 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.runtime.taskmanager - -import akka.actor.{ActorSystem, Kill, PoisonPill} -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions} -import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} -import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} -import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} -import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -@RunWith(classOf[JUnitRunner]) -class TaskManagerFailsITCase(_system: ActorSystem) - extends TestKit(_system) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with ScalaTestingUtils { - - def this() = this(ActorSystem("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } - - "The JobManager" should { - - "detect a failing task manager" in { - - val num_slots = 11 - val cluster = createDeathwatchCluster(num_slots, 2) - - cluster.start() - - val taskManagers = cluster.getTaskManagers - val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) - - jmGateway.tell(DisableDisconnect) - - try{ - within(TestingUtils.TESTING_DURATION){ - jmGateway.tell(RequestNumberRegisteredTaskManager, self) - expectMsg(2) - - jmGateway.tell(NotifyWhenTaskManagerTerminated(taskManagers(0)), self) - - taskManagers(0) ! PoisonPill - - val TaskManagerTerminated(tm) = expectMsgClass(classOf[TaskManagerTerminated]) - - jmGateway.tell(RequestNumberRegisteredTaskManager, self) - expectMsg(1) - } - } - finally { - cluster.stop() - } - } - - "handle gracefully failing task manager" in { - - val num_tasks = 31 - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks, 2) - - val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) - - try { - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID())) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - - expectMsg(AllVerticesRunning(jobID)) - - jmGateway.tell(RequestWorkingTaskManager(jobID), self) - - val gatewayOption = expectMsgType[WorkingTaskManager].gatewayOption - - gatewayOption match { - case Some(gateway) => - // kill one task manager - gateway.tell(PoisonPill) - - case None => fail("Could not retrieve a working task manager.") - } - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - - case e => fail(s"Received wrong exception $e.") - } - } - } finally { - cluster.stop() - } - } - - "handle hard failing task manager" in { - val num_tasks = 31 - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks, 2) - - val taskManagers = cluster.getTaskManagers - val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) - - try { - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID())) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - expectMsg(AllVerticesRunning(jobID)) - - // kill one task manager - taskManagers(0) ! Kill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - - case e => fail(s"Received wrong exception $e.") - } - } - } finally { - cluster.stop() - } - } - - "go into a clean state in case of a TaskManager failure" in { - val num_slots = 20 - - val sender = new JobVertex("BlockingSender") - sender.setParallelism(num_slots) - sender.setInvokableClass(classOf[BlockingNoOpInvokable]) - val jobGraph = new JobGraph("Blocking Testjob", sender) - - val noOp = new JobVertex("NoOpInvokable") - noOp.setParallelism(num_slots) - noOp.setInvokableClass(classOf[NoOpInvokable]) - val jobGraph2 = new JobGraph("NoOp Testjob", noOp) - - val cluster = createDeathwatchCluster(num_slots/2, 2) - - cluster.start() - - var tm = cluster.getTaskManagers(0) - val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) - - try{ - within(TestingUtils.TESTING_DURATION){ - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - tm ! PoisonPill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - - case e => fail(s"Received wrong exception $e.") - } - - cluster.restartTaskManager(0) - - tm = cluster.getTaskManagers(0) - - tm ! NotifyWhenRegisteredAtJobManager - - expectMsgClass(classOf[RegisteredAtJobManager]) - - jmGateway.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self) - - expectMsg(JobSubmitSuccess(jobGraph2.getJobID())) - - val result = expectMsgType[JobResultSuccess] - result.result.getJobId() should equal(jobGraph2.getJobID) - } - } finally { - cluster.stop() - } - } - } - - def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { - val config = new Configuration() - config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) - - new TestingCluster(config, singleActorSystem = false) - } -}