diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java new file mode 100644 index 0000000000000..f6082f4e081fd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.types.IntValue; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Integration tests for job scheduling. + */ +public class JobExecutionITCase extends TestLogger { + + /** + * Tests that tasks with a co-location constraint are scheduled in the same + * slots. In fact it also tests that consumers are scheduled wrt their input + * location if the co-location constraint is deactivated. 
+	 */
+	@Test
+	public void testCoLocationConstraintJobExecution() throws Exception {
+		final int numSlotsPerTaskExecutor = 1;
+		final int numTaskExecutors = 3;
+		final int parallelism = numTaskExecutors * numSlotsPerTaskExecutor; // one subtask of each vertex per available slot
+		final JobGraph jobGraph = createJobGraph(parallelism);
+
+		final TestingMiniClusterConfiguration miniClusterConfiguration = new TestingMiniClusterConfiguration.Builder()
+			.setNumSlotsPerTaskManager(numSlotsPerTaskExecutor)
+			.setNumTaskManagers(numTaskExecutors)
+			.setLocalCommunication(true)
+			.build();
+
+		try (TestingMiniCluster miniCluster = new TestingMiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			miniCluster.submitJob(jobGraph).get(); // block until the submission future completes
+
+			final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
+
+			assertThat(jobResultFuture.get().isSuccess(), is(true)); // typed future: get() yields a JobResult, not Object
+		}
+	}
+
+	private JobGraph createJobGraph(int parallelism) {
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(parallelism);
+		sender.setInvokableClass(Sender.class);
+
+		final JobVertex receiver = new JobVertex("Receiver");
+		receiver.setParallelism(parallelism);
+		receiver.setInvokableClass(Receiver.class);
+
+		// In order to make testCoLocationConstraintJobExecution fail, one needs to
+		// remove the co-location constraint and the slot sharing groups, because then
+		// the receivers will have to wait for the senders to finish and the slot
+		// assignment order to the receivers is non-deterministic (depending on the
+		// order in which the senders finish).
+		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+		receiver.setSlotSharingGroup(slotSharingGroup);
+		sender.setSlotSharingGroup(slotSharingGroup);
+		receiver.setStrictlyCoLocatedWith(sender);
+
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobGraph jobGraph = new JobGraph(getClass().getSimpleName(), sender, receiver);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Basic sender {@link AbstractInvokable} which sends 42 and 1337 downstream.
+	 */
+	public static class Sender extends AbstractInvokable {
+
+		public Sender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+
+			try {
+				writer.emit(new IntValue(42));
+				writer.emit(new IntValue(1337));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers(); // runs even when emitting or flushing throws
+			}
+		}
+	}
+
+	/**
+	 * Basic receiver {@link AbstractInvokable} which verifies the sent elements
+	 * from the {@link Sender}.
+	 */
+	public static class Receiver extends AbstractInvokable {
+
+		public Receiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordReader<IntValue> reader = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			final IntValue i1 = reader.next();
+			final IntValue i2 = reader.next();
+			final IntValue i3 = reader.next(); // must be null: exactly two records are expected
+
+			if (i1 == null || i2 == null || i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) { // null checks first, so missing records fail with the message below instead of an NPE
+				throw new Exception("Wrong data received.");
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java
new file mode 100644
index 0000000000000..9a16e3229df18
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the recovery of task failures. 
+ */
+public class JobRecoveryITCase extends TestLogger {
+
+	private static final int NUM_TMS = 1;
+	private static final int SLOTS_PER_TM = 11;
+	private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; // one subtask per vertex for every slot in the cluster
+
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(NUM_TMS)
+			.setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+			.build());
+
+	@Test
+	public void testTaskFailureRecovery() throws Exception {
+		runTaskFailureRecoveryTest(createJobGraph(false));
+	}
+
+	@Test
+	public void testTaskFailureWithSlotSharingRecovery() throws Exception {
+		runTaskFailureRecoveryTest(createJobGraph(true));
+	}
+
+	private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws Exception {
+		final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+
+		miniCluster.submitJob(jobGraph).get(); // block until the submission future completes
+
+		final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
+
+		assertThat(jobResultFuture.get().isSuccess(), is(true)); // typed future: get() yields a JobResult, not Object
+	}
+
+	private JobGraph createJobGraph(boolean slotSharingEnabled) throws IOException {
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(PARALLELISM);
+		sender.setInvokableClass(Tasks.Sender.class);
+
+		final JobVertex receiver = new JobVertex("Receiver");
+		receiver.setParallelism(PARALLELISM);
+		receiver.setInvokableClass(FailingOnceReceiver.class);
+		FailingOnceReceiver.reset(); // clear the static failure flag so each job graph triggers the failure again
+
+		if (slotSharingEnabled) {
+			final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+			receiver.setSlotSharingGroup(slotSharingGroup);
+			sender.setSlotSharingGroup(slotSharingGroup);
+		}
+
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); // a single restart attempt with zero delay
+
+		final JobGraph jobGraph = new
JobGraph(getClass().getSimpleName(), sender, receiver); + jobGraph.setExecutionConfig(executionConfig); + + return jobGraph; + } + + /** + * Receiver which fails once before successfully completing. + */ + public static final class FailingOnceReceiver extends JobExecutionITCase.Receiver { + + private static volatile boolean failed = false; + + public FailingOnceReceiver(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + if (!failed && getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) { + failed = true; + throw new FlinkRuntimeException(getClass().getSimpleName()); + } else { + super.invoke(); + } + } + + private static void reset() { + failed = false; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java index f82b1a6b556b4..1d62b19f5f380 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java @@ -151,14 +151,22 @@ public void testReelectionOfJobMaster() throws Exception { @Test public void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception { - assertThat(miniCluster.requestClusterOverview().get().getNumTaskManagersConnected(), is(NUM_TMS)); + final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT); + waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline); highAvailabilityServices.revokeResourceManagerLeadership().get(); highAvailabilityServices.grantResourceManagerLeadership(); // wait for the ResourceManager to confirm the leadership assertThat(LeaderRetrievalUtils.retrieveLeaderConnectionInfo(highAvailabilityServices.getResourceManagerLeaderRetriever(), 
Time.minutes(TESTING_TIMEOUT.toMinutes())).getLeaderSessionID(), is(notNullValue())); - CommonTestUtils.waitUntilCondition(() -> miniCluster.requestClusterOverview().get().getNumTaskManagersConnected() == NUM_TMS, Deadline.fromNow(TESTING_TIMEOUT), 10L); + waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline); + } + + private void waitUntilTaskExecutorsHaveConnected(int numTaskExecutors, Deadline deadline) throws Exception { + CommonTestUtils.waitUntilCondition( + () -> miniCluster.requestClusterOverview().get().getNumTaskManagersConnected() == numTaskExecutors, + deadline, + 10L); } private JobGraph createJobGraph(int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java index 22d7b4ba1a38d..4169ff8358e0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java @@ -44,17 +44,20 @@ */ public class TestingMiniCluster extends MiniCluster { + private final int numberDispatcherResourceManagerComponents; + + private final boolean localCommunication; + @Nullable private final Supplier highAvailabilityServicesSupplier; - private final int numberDispatcherResourceManagerComponents; - public TestingMiniCluster( TestingMiniClusterConfiguration miniClusterConfiguration, @Nullable Supplier highAvailabilityServicesSupplier) { super(miniClusterConfiguration); this.numberDispatcherResourceManagerComponents = miniClusterConfiguration.getNumberDispatcherResourceManagerComponents(); this.highAvailabilityServicesSupplier = highAvailabilityServicesSupplier; + this.localCommunication = miniClusterConfiguration.isLocalCommunication(); } public TestingMiniCluster(TestingMiniClusterConfiguration miniClusterConfiguration) { @@ -80,7 +83,7 @@ public void startTaskExecutor() throws Exception { 
@Override protected boolean useLocalCommunication() { - return false; + return localCommunication; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java index 49e571b470121..98a4e48f2d428 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java @@ -34,20 +34,28 @@ public class TestingMiniClusterConfiguration extends MiniClusterConfiguration { private final int numberDispatcherResourceManagerComponents; + private final boolean localCommunication; + public TestingMiniClusterConfiguration( Configuration configuration, int numTaskManagers, RpcServiceSharing rpcServiceSharing, @Nullable String commonBindAddress, - int numberDispatcherResourceManagerComponents) { + int numberDispatcherResourceManagerComponents, + boolean localCommunication) { super(configuration, numTaskManagers, rpcServiceSharing, commonBindAddress); this.numberDispatcherResourceManagerComponents = numberDispatcherResourceManagerComponents; + this.localCommunication = localCommunication; } public int getNumberDispatcherResourceManagerComponents() { return numberDispatcherResourceManagerComponents; } + public boolean isLocalCommunication() { + return localCommunication; + } + /** * Builder for the {@link TestingMiniClusterConfiguration}. 
*/ @@ -57,6 +65,7 @@ public static class Builder { private int numSlotsPerTaskManager = 1; private RpcServiceSharing rpcServiceSharing = SHARED; private int numberDispatcherResourceManagerComponents = 1; + private boolean localCommunication = false; @Nullable private String commonBindAddress = null; @@ -91,6 +100,11 @@ public Builder setNumberDispatcherResourceManagerComponents(int numberDispatcher return this; } + public Builder setLocalCommunication(boolean localCommunication) { + this.localCommunication = localCommunication; + return this; + } + public TestingMiniClusterConfiguration build() { final Configuration modifiedConfiguration = new Configuration(configuration); modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager); @@ -106,7 +120,8 @@ public TestingMiniClusterConfiguration build() { numTaskManagers, rpcServiceSharing, commonBindAddress, - numberDispatcherResourceManagerComponents); + numberDispatcherResourceManagerComponents, + localCommunication); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 16450f9d73af2..8ab55fefefa58 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; @@ -37,6 +39,7 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CountDownLatch; @@ -78,22 +81,14 @@ public void teardown() throws Exception { } /** - * Tests that a job will be re-executed if a new TaskExecutor joins the cluster. + * Tests that a job can be re-executed after the job has failed due + * to a TaskExecutor termination. */ @Test - public void testNewTaskExecutorJoinsCluster() throws Exception { + public void testJobReExecutionAfterTaskExecutorTermination() throws Exception { final JobGraph jobGraph = createJobGraph(PARALLELISM); - miniCluster.submitJob(jobGraph).get(); - - final CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID()); - - assertThat(jobResultFuture.isDone(), is(false)); - - CommonTestUtils.waitUntilCondition( - jobIsRunning(() -> miniCluster.getExecutionGraph(jobGraph.getJobID())), - Deadline.fromNow(TESTING_TIMEOUT), - 20L); + final CompletableFuture jobResultFuture = submitJobAndWaitUntilRunning(jobGraph); // kill one TaskExecutor which should fail the job execution miniCluster.terminateTaskExecutor(0); @@ -111,6 +106,40 @@ public void testNewTaskExecutorJoinsCluster() throws Exception { miniCluster.requestJobResult(jobGraph.getJobID()).get(); } + /** + * Tests that the job can recover from a failing {@link TaskExecutor}. 
+ */ + @Test + public void testJobRecoveryWithFailingTaskExecutor() throws Exception { + final JobGraph jobGraph = createJobGraphWithRestartStrategy(PARALLELISM); + + final CompletableFuture jobResultFuture = submitJobAndWaitUntilRunning(jobGraph); + + // start an additional TaskExecutor + miniCluster.startTaskExecutor(); + + miniCluster.terminateTaskExecutor(0).get(); // this should fail the job + + BlockingOperator.unblock(); + + assertThat(jobResultFuture.get().isSuccess(), is(true)); + } + + private CompletableFuture submitJobAndWaitUntilRunning(JobGraph jobGraph) throws Exception { + miniCluster.submitJob(jobGraph).get(); + + final CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID()); + + assertThat(jobResultFuture.isDone(), is(false)); + + CommonTestUtils.waitUntilCondition( + jobIsRunning(() -> miniCluster.getExecutionGraph(jobGraph.getJobID())), + Deadline.fromNow(TESTING_TIMEOUT), + 50L); + + return jobResultFuture; + } + private SupplierWithException jobIsRunning(Supplier> executionGraphFutureSupplier) { final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)); @@ -120,6 +149,15 @@ private SupplierWithException jobIsRunning(Supplier public static void waitUntilCondition(SupplierWithException condition, Deadline timeout, long retryIntervalMillis) throws Exception { while (timeout.hasTimeLeft() && !condition.get()) { - Thread.sleep(Math.min(RETRY_INTERVAL, timeout.timeLeft().toMillis())); + Thread.sleep(Math.min(retryIntervalMillis, timeout.timeLeft().toMillis())); } if (!timeout.hasTimeLeft()) { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala deleted file mode 100644 index d4b4cbf927518..0000000000000 --- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor.ActorSystem -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender} -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.concurrent.duration._ - -@RunWith(classOf[JUnitRunner]) -class CoLocationConstraintITCase(_system: ActorSystem) - extends TestKit(_system) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - 
with ScalaTestingUtils { - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - /** - * This job runs in N slots with N senders and N receivers. Unless slot sharing is used, - * it cannot complete. - */ - "The JobManager actor" must { - "support colocation constraints and slot sharing" in { - val num_tasks = 31 - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[Receiver]) - - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID) - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - receiver.setStrictlyCoLocatedWith(sender) - - val jobGraph = new JobGraph("Pointwise job", sender, receiver) - - val cluster = TestingUtils.startTestingCluster(num_tasks) - val jmGateway = cluster.getLeaderGateway(1 seconds) - - try { - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - expectMsgType[JobResultSuccess] - } - } finally { - cluster.stop() - } - } - } - -} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala deleted file mode 100644 index a2dffc8efda66..0000000000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor.{ActorSystem, PoisonPill} -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions} -import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver} -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} -import org.junit.runner.RunWith -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import org.scalatest.junit.JUnitRunner - -import scala.concurrent.duration._ -import language.postfixOps - -@RunWith(classOf[JUnitRunner]) -class RecoveryITCase(_system: ActorSystem) - extends TestKit(_system) - with ImplicitSender - with WordSpecLike - with Matchers 
- with BeforeAndAfterAll - with ScalaTestingUtils { - - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } - - def createTestClusterWithHeartbeatTimeout( - numSlots: Int, - numTaskManagers: Int, - heartbeatTimeout: String) - : TestingCluster = { - val config = new Configuration() - config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers) - config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout) - new TestingCluster(config) - } - - val NUM_TASKS = 31 - - "The recovery" must { - "recover once failing forward job" in { - FailingOnceReceiver.failed = false - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Tasks.Sender]) - receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) - - sender.setParallelism(NUM_TASKS) - receiver.setParallelism(NUM_TASKS) - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val executionConfig = new ExecutionConfig() - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)) - - val jobGraph = new JobGraph("Pointwise job", sender, receiver) - jobGraph.setExecutionConfig(executionConfig) - - val cluster = createTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "100 ms") - cluster.start() - - val jmGateway = cluster.getLeaderGateway(1 seconds) - - try { - within(TestingUtils.TESTING_DURATION){ - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - val result = expectMsgType[JobResultSuccess] - - result.result.getJobId() should equal(jobGraph.getJobID) - } - } catch { - case t: Throwable => - t.printStackTrace() - fail(t.getMessage) - } finally{ - cluster.stop() - } - } - - "recover once failing forward 
job with slot sharing" in { - FailingOnceReceiver.failed = false - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Tasks.Sender]) - receiver.setInvokableClass(classOf[Tasks.FailingOnceReceiver]) - - sender.setParallelism(NUM_TASKS) - receiver.setParallelism(NUM_TASKS) - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val executionConfig = new ExecutionConfig() - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)) - - val jobGraph = new JobGraph("Pointwise job", sender, receiver) - jobGraph.setExecutionConfig(executionConfig) - - val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "100 ms") - cluster.start() - - val jmGateway = cluster.getLeaderGateway(1 seconds) - - try { - within(TestingUtils.TESTING_DURATION){ - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - val result = expectMsgType[JobResultSuccess] - - result.result.getJobId() should equal(jobGraph.getJobID) - } - } catch { - case t: Throwable => - t.printStackTrace() - fail(t.getMessage) - } finally{ - cluster.stop() - } - } - - "recover a task manager failure" in { - BlockingOnceReceiver.blocking = true - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Tasks.Sender]) - receiver.setInvokableClass(classOf[Tasks.BlockingOnceReceiver]) - - sender.setParallelism(NUM_TASKS) - receiver.setParallelism(NUM_TASKS) - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val 
executionConfig = new ExecutionConfig() - executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)) - - val jobGraph = new JobGraph("Pointwise job", sender, receiver) - jobGraph.setExecutionConfig(executionConfig) - - val cluster = createTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "100 ms") - cluster.start() - - val jmGateway = cluster.getLeaderGateway(1 seconds) - - try { - within(TestingUtils.TESTING_DURATION){ - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID), self) - - expectMsg(AllVerticesRunning(jobGraph.getJobID)) - - BlockingOnceReceiver.blocking = false - jmGateway.tell(NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING), self) - jmGateway.tell(RequestWorkingTaskManager(jobGraph.getJobID), self) - - val WorkingTaskManager(gatewayOption) = expectMsgType[WorkingTaskManager] - - gatewayOption match { - case None => fail("There has to be at least one task manager on which" + - "the tasks are running.") - case Some(gateway) => gateway.tell(PoisonPill) - } - - expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING)) - - val result = expectMsgType[JobResultSuccess] - - result.result.getJobId() should equal(jobGraph.getJobID) - } - } catch { - case t: Throwable => - t.printStackTrace() - fail(t.getMessage) - } finally{ - cluster.stop() - } - } - } -} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 02458b83c9f82..029f5ef5112bc 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -90,24 +90,6 @@ object Tasks { } } - class FailingOnceReceiver(environment: Environment) - extends Receiver(environment) { - import 
FailingOnceReceiver.failed - - override def invoke(): Unit = { - if(!failed && getEnvironment.getTaskInfo.getIndexOfThisSubtask == 0){ - failed = true - throw new Exception("Test exception.") - }else{ - super.invoke() - } - } - } - - object FailingOnceReceiver{ - var failed = false - } - class BlockingOnceReceiver(environment: Environment) extends Receiver(environment) { import BlockingOnceReceiver.blocking diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 31ce1f3ee8d15..a3dfb710531b5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -137,7 +137,7 @@ public void testJobExecutionOnClusterWithLeaderChange() throws Exception { final Dispatcher dispatcher = leadingDispatcherResourceManagerComponent.getDispatcher(); - CommonTestUtils.waitUntilCondition(() -> dispatcher.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING, timeout, 10L); + CommonTestUtils.waitUntilCondition(() -> dispatcher.requestJobStatus(jobGraph.getJobID(), RPC_TIMEOUT).get() == JobStatus.RUNNING, timeout, 50L); leadingDispatcherResourceManagerComponent.closeAsync(); }