diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 63fa09c3467a5..edb20b77e1162 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -209,6 +209,13 @@ public HighAvailabilityServices getHighAvailabilityServices() {
 		}
 	}
 
+	TaskExecutor[] getTaskManagers() {
+		synchronized (lock) {
+			checkState(running, "MiniCluster is not yet running.");
+			return taskManagers;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  life cycle
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index e0b53612f7bc5..6068da863ffc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -26,14 +26,19 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks.AgnosticBinaryReceiver;
 import org.apache.flink.runtime.jobmanager.Tasks.AgnosticReceiver;
 import org.apache.flink.runtime.jobmanager.Tasks.AgnosticTertiaryReceiver;
+import org.apache.flink.runtime.jobmanager.Tasks.BlockingReceiver;
 import org.apache.flink.runtime.jobmanager.Tasks.ExceptionReceiver;
 import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender;
 import org.apache.flink.runtime.jobmanager.Tasks.Forwarder;
@@ -43,19 +48,24 @@
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -337,6 +347,262 @@ public void testSchedulingAllAtOnce() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSlotSharingForForwardJobWithCoLocationConstraint() throws Exception {
+		testSlotSharingForForwardJob(true);
+	}
+
+	@Test
+	public void testSlotSharingForForwardJobWithoutCoLocationConstraint() throws Exception {
+		testSlotSharingForForwardJob(false);
+	}
+
+	/**
+	 * This job runs in N slots with N senders and N receivers.
+	 * Unless slot sharing is used, it cannot complete.
+	 * It should make no difference whether or not
+	 * the co-location constraint is used.
+	 */
+	private void testSlotSharingForForwardJob(boolean withCoLocationConstraint) throws Exception {
+		final int parallelism = 11;
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(parallelism)
+			.setConfiguration(getDefaultConfiguration())
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender = new JobVertex("Sender");
+			sender.setInvokableClass(CountDownLatchedSender.class);
+			sender.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("Receiver");
+			receiver.setInvokableClass(CountDownLatchedReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+
+			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+			CountDownLatchedSender.setLatch(countDownLatch);
+			CountDownLatchedReceiver.setLatch(countDownLatch);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			if (withCoLocationConstraint) {
+				receiver.setStrictlyCoLocatedWith(sender);
+			}
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			miniCluster.executeJobBlocking(jobGraph);
+		}
+	}
+
+	/**
+	 * A sender that does not exit until all receivers are running.
+	 */
+	public static class CountDownLatchedSender extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedSender.latch = latch;
+		}
+
+		public CountDownLatchedSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));
+
+			try {
+				writer.emit(new IntValue(42));
+				writer.emit(new IntValue(1337));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers();
+				latch.await();
+			}
+		}
+	}
+
+	/**
+	 * A receiver that counts down the latch once it starts running.
+	 */
+	public static class CountDownLatchedReceiver extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedReceiver.latch = latch;
+		}
+
+		public CountDownLatchedReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+
+			RecordReader<IntValue> reader = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			IntValue i1 = reader.next();
+			IntValue i2 = reader.next();
+			IntValue i3 = reader.next();
+
+			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
+				throw new Exception("Wrong data received.");
+			}
+		}
+	}
+
+	/**
+	 * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
+	 * it cannot complete.
+	 */
+	@Test
+	public void testSlotSharingForTwoInputsJob() throws Exception {
+		final int parallelism = 11;
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(parallelism)
+			.setConfiguration(getDefaultConfiguration())
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender1 = new JobVertex("Sender1");
+			sender1.setInvokableClass(CountDownLatchedSender.class);
+			sender1.setParallelism(parallelism);
+
+			final JobVertex sender2 = new JobVertex("Sender2");
+			sender2.setInvokableClass(CountDownLatchedSender.class);
+			sender2.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("Receiver");
+			receiver.setInvokableClass(CountDownLatchedAgnosticBinaryReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+			receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
+				ResultPartitionType.PIPELINED);
+
+			final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
+			CountDownLatchedSender.setLatch(countDownLatch);
+			CountDownLatchedAgnosticBinaryReceiver.setLatch(countDownLatch);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
+			sender1.setSlotSharingGroup(sharingGroup);
+			sender2.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			final JobGraph jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver);
+
+			miniCluster.executeJobBlocking(jobGraph);
+		}
+	}
+
+	/**
+	 * A receiver that counts down the latch once it starts running.
+	 */
+	public static class CountDownLatchedAgnosticBinaryReceiver extends AbstractInvokable {
+
+		private static CountDownLatch latch;
+
+		static void setLatch(CountDownLatch latch) {
+			CountDownLatchedAgnosticBinaryReceiver.latch = latch;
+		}
+
+		public CountDownLatchedAgnosticBinaryReceiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+
+			RecordReader<IntValue> reader1 = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			RecordReader<IntValue> reader2 = new RecordReader<>(
+				getEnvironment().getInputGate(1),
+				IntValue.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			while (reader1.next() != null) { }
+			while (reader2.next() != null) { }
+		}
+	}
+
+	@Test
+	public void testSlotSharingForForwardJobWithFailedTaskManager() throws Exception {
+		final int parallelism = 20;
+		final int numOfTaskManagers = 2;
+
+		final Configuration configuration = getDefaultConfiguration();
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
+
+		final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
+			.setNumTaskManagers(numOfTaskManagers)
+			.setNumSlotsPerTaskManager(parallelism / numOfTaskManagers)
+			.setConfiguration(configuration)
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
+			miniCluster.start();
+
+			final JobVertex sender = new JobVertex("Sender");
+			sender.setInvokableClass(Sender.class);
+			sender.setParallelism(parallelism);
+
+			final JobVertex receiver = new JobVertex("BlockingReceiver");
+			receiver.setInvokableClass(BlockingReceiver.class);
+			receiver.setParallelism(parallelism);
+
+			receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
+				ResultPartitionType.PIPELINED);
+
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+
+			miniCluster.submitJob(jobGraph).get();
+
+			TaskExecutor taskExecutor = miniCluster.getTaskManagers()[0];
+			taskExecutor.closeAsync();
+			taskExecutor.getTerminationFuture().get();
+
+			JobResult result = miniCluster.requestJobResult(jobGraph.getJobID()).get();
+
+			try {
+				result.toJobExecutionResult(getClass().getClassLoader());
+
+				fail("Job should fail.");
+			} catch (JobExecutionException e) {
+				assertThat(e.getJobID(), is(jobGraph.getJobID()));
+			}
+		}
+	}
+
 	@Test
 	public void testJobWithAFailingSenderVertex() throws Exception {
 		final int parallelism = 11;
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
deleted file mode 100644
index d4b4cbf927518..0000000000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{Receiver, Sender}
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class CoLocationConstraintITCase(_system: ActorSystem)
-  extends TestKit(_system)
-  with ImplicitSender
-  with WordSpecLike
-  with Matchers
-  with BeforeAndAfterAll
-  with ScalaTestingUtils {
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  /**
-   * This job runs in N slots with N senders and N receivers. Unless slot sharing is used,
-   * it cannot complete.
-   */
-  "The JobManager actor" must {
-    "support colocation constraints and slot sharing" in {
-      val num_tasks = 31
-
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
-      sender.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      receiver.setStrictlyCoLocatedWith(sender)
-
-      val jobGraph = new JobGraph("Pointwise job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          expectMsgType[JobResultSuccess]
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
-  }
-
-}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
deleted file mode 100644
index 1c26901096dda..0000000000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver, Sender}
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, JobSubmitSuccess, SubmitJob}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class SlotSharingITCase(_system: ActorSystem)
-  extends TestKit(_system)
-  with ImplicitSender
-  with WordSpecLike
-  with Matchers
-  with BeforeAndAfterAll
-  with ScalaTestingUtils {
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The JobManager actor" must {
-    "support slot sharing for forward job" in {
-      val num_tasks = 31
-
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[Receiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val sharingGroup = new SlotSharingGroup(sender.getID, receiver.getID)
-      sender.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultSuccess]
-
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    /**
-     * This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
-     * it cannot complete.
-     */
-    "support jobs with two inputs and slot sharing" in {
-      val num_tasks = 11
-
-      val sender1 = new JobVertex("Sender1")
-      val sender2 = new JobVertex("Sender2")
-      val receiver = new JobVertex("Receiver")
-
-      sender1.setInvokableClass(classOf[Sender])
-      sender2.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[AgnosticBinaryReceiver])
-
-      sender1.setParallelism(num_tasks)
-      sender2.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-
-      val sharingGroup = new SlotSharingGroup(sender1.getID, sender2.getID, receiver.getID)
-      sender1.setSlotSharingGroup(sharingGroup)
-      sender2.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
-        ResultPartitionType.PIPELINED)
-
-      val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-          expectMsgType[JobResultSuccess]
-        }
-      } finally {
-        cluster.stop()
-      }
-
-    }
-  }
-}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
deleted file mode 100644
index e2702c7d2cf45..0000000000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import akka.actor.{ActorSystem, Kill, PoisonPill}
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob}
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration._
-
-@RunWith(classOf[JUnitRunner])
-class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem)
-  extends TestKit(_system)
-  with ImplicitSender
-  with WordSpecLike
-  with Matchers
-  with BeforeAndAfterAll
-  with ScalaTestingUtils {
-
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The JobManager" should {
-    "handle gracefully failing task manager with slot sharing" in {
-      val num_tasks = 100
-
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[BlockingReceiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val sharingGroup = new SlotSharingGroup()
-      sender.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-      val jobID = jobGraph.getJobID
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-      val taskManagers = cluster.getTaskManagers
-
-      try{
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
-
-          expectMsg(AllVerticesRunning(jobID))
-
-          //kill task manager
-          taskManagers(0) ! PoisonPill
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
-
-    "handle hard failing task manager with slot sharing" in {
-      val num_tasks = 20
-
-      val sender = new JobVertex("Sender")
-      val receiver = new JobVertex("Receiver")
-
-      sender.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[BlockingReceiver])
-
-      sender.setParallelism(num_tasks)
-      receiver.setParallelism(num_tasks)
-      receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
-        ResultPartitionType.PIPELINED)
-
-      val sharingGroup = new SlotSharingGroup()
-      sender.setSlotSharingGroup(sharingGroup)
-      receiver.setSlotSharingGroup(sharingGroup)
-
-      val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
-      val jobID = jobGraph.getJobID
-
-      val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2)
-      val jmGateway = cluster.getLeaderGateway(1 seconds)
-      val taskManagers = cluster.getTaskManagers
-
-      try{
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
-          expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
-          jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self)
-          expectMsg(AllVerticesRunning(jobID))
-
-          //kill task manager
-          taskManagers(0) ! Kill
-
-          val failure = expectMsgType[JobResultFailure]
-          val exception = failure.cause.deserializeError(getClass.getClassLoader())
-          exception match {
-            case e: JobExecutionException =>
-              jobGraph.getJobID should equal(e.getJobID)
-
-            case e => fail(s"Received wrong exception $e.")
-          }
-        }
-      }finally{
-        cluster.stop()
-      }
-    }
-  }
-
-}