From 275fa538ec2330f750fbdf7d0b0501e433730029 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 13 Feb 2019 16:55:24 +0100 Subject: [PATCH 1/3] [hotfix] Move testing sender and receiver invokables to TestingAbstractInvokables Moves the testing Sender and Receiver class from JobExecutionITCase to TestingAbstractInvokables for easier reusability. --- .../runtime/jobmaster/JobExecutionITCase.java | 59 +------------ .../runtime/jobmaster/JobRecoveryITCase.java | 2 +- .../jobmaster/TestingAbstractInvokables.java | 85 +++++++++++++++++++ 3 files changed, 88 insertions(+), 58 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java index f6082f4e081fd..e19fe2020ddaa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java @@ -18,18 +18,13 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.network.api.reader.RecordReader; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.TestingMiniCluster; import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; -import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -76,11 +71,11 @@ public void testCoLocationConstraintJobExecution() throws Exception { private JobGraph createJobGraph(int parallelism) { final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(parallelism); - sender.setInvokableClass(Sender.class); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); final JobVertex receiver = new JobVertex("Receiver"); receiver.setParallelism(parallelism); - receiver.setInvokableClass(Receiver.class); + receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class); // In order to make testCoLocationConstraintJobExecution fail, one needs to // remove the co-location constraint and the slot sharing groups, because then @@ -98,54 +93,4 @@ private JobGraph createJobGraph(int parallelism) { return jobGraph; } - - /** - * Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream. - */ - public static class Sender extends AbstractInvokable { - - public Sender(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - final RecordWriter writer = new RecordWriter<>(getEnvironment().getWriter(0)); - - try { - writer.emit(new IntValue(42)); - writer.emit(new IntValue(1337)); - writer.flushAll(); - } finally { - writer.clearBuffers(); - } - } - } - - /** - * Basic receiver {@link AbstractInvokable} which verifies the sent elements - * from the {@link Sender}. - */ - public static class Receiver extends AbstractInvokable { - - public Receiver(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - final RecordReader reader = new RecordReader<>( - getEnvironment().getInputGate(0), - IntValue.class, - getEnvironment().getTaskManagerInfo().getTmpDirectories()); - - final IntValue i1 = reader.next(); - final IntValue i2 = reader.next(); - final IntValue i3 = reader.next(); - - if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) { - throw new Exception("Wrong data received."); - } - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java index 9a16e3229df18..9448a8213b3d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java @@ -108,7 +108,7 @@ private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException { /** * Receiver which fails once before successfully completing. */ - public static final class FailingOnceReceiver extends JobExecutionITCase.Receiver { + public static final class FailingOnceReceiver extends TestingAbstractInvokables.Receiver { private static volatile boolean failed = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java new file mode 100644 index 0000000000000..d227918b82391 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.types.IntValue; + +/** + * {@link AbstractInvokable} for testing purposes. + */ +public class TestingAbstractInvokables { + + private TestingAbstractInvokables() { + throw new UnsupportedOperationException(getClass().getSimpleName() + " should not be instantiated."); + } + + /** + * Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream. + */ + public static class Sender extends AbstractInvokable { + + public Sender(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordWriter writer = new RecordWriter<>(getEnvironment().getWriter(0)); + + try { + writer.emit(new IntValue(42)); + writer.emit(new IntValue(1337)); + writer.flushAll(); + } finally { + writer.clearBuffers(); + } + } + } + + /** + * Basic receiver {@link AbstractInvokable} which verifies the sent elements + * from the {@link Sender}. + */ + public static class Receiver extends AbstractInvokable { + + public Receiver(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordReader reader = new RecordReader<>( + getEnvironment().getInputGate(0), + IntValue.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); + + final IntValue i1 = reader.next(); + final IntValue i2 = reader.next(); + final IntValue i3 = reader.next(); + + if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) { + throw new Exception("Wrong data received."); + } + } + } +} From a585d8550d062726ee3a374460e23a93c9e06dd7 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 13 Feb 2019 17:06:46 +0100 Subject: [PATCH 2/3] [FLINK-11592][tests] Port TaskManagerFailsWithSlotSharingITCase to new code base - "handle gracefully failing task manager with slot sharing" --> TaskExecutorITCase#testJobReExecutionAfterTaskExecutorTermination - "handle hard failing task manager with slot sharing" --> TaskExecutorITCase#testJobReExecutionAfterTaskExecutorTermination This commit changes the JobGraph of the TaskExecutorITCase such that it uses now slot sharing. --- .../taskexecutor/TaskExecutorITCase.java | 29 +++- ...askManagerFailsWithSlotSharingITCase.scala | 155 ------------------ 2 files changed, 22 insertions(+), 162 deletions(-) delete mode 100644 flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 8ab55fefefa58..71c2775374e10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -23,12 +23,16 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables; import org.apache.flink.runtime.minicluster.TestingMiniCluster; import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -141,7 +145,8 @@ private CompletableFuture submitJobAndWaitUntilRunning(JobGraph jobGr } private SupplierWithException jobIsRunning(Supplier> executionGraphFutureSupplier) { - final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)); + final Predicate runningOrFinished = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED)); + final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished); return () -> { final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join(); @@ -159,19 +164,28 @@ private JobGraph createJobGraphWithRestartStrategy(int parallelism) throws IOExc } private JobGraph createJobGraph(int parallelism) { - final JobVertex vertex = new JobVertex("blocking operator"); - vertex.setParallelism(parallelism); - vertex.setInvokableClass(BlockingOperator.class); + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(parallelism); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); + final JobVertex receiver = new JobVertex("Blocking receiver"); + receiver.setParallelism(parallelism); + receiver.setInvokableClass(BlockingOperator.class); BlockingOperator.reset(); - return new JobGraph("Blocking test job", vertex); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + sender.setSlotSharingGroup(slotSharingGroup); + receiver.setSlotSharingGroup(slotSharingGroup); + + return new JobGraph("Blocking test job with slot sharing", sender, receiver); } /** * Blocking invokable which is controlled by a static field. */ - public static class BlockingOperator extends AbstractInvokable { + public static class BlockingOperator extends TestingAbstractInvokables.Receiver { private static CountDownLatch countDownLatch = new CountDownLatch(1); public BlockingOperator(Environment environment) { @@ -181,6 +195,7 @@ public BlockingOperator(Environment environment) { @Override public void invoke() throws Exception { countDownLatch.await(); + super.invoke(); } public static void unblock() { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala deleted file mode 100644 index e2702c7d2cf45..0000000000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor.{ActorSystem, Kill, PoisonPill} -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.concurrent.duration._ - -@RunWith(classOf[JUnitRunner]) -class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) - extends TestKit(_system) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with ScalaTestingUtils { - - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } - - "The JobManager" should { - "handle gracefully failing task manager with slot sharing" in { - val num_tasks = 100 - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup() - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) - val jmGateway = cluster.getLeaderGateway(1 seconds) - val taskManagers = cluster.getTaskManagers - - try{ - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - - expectMsg(AllVerticesRunning(jobID)) - - //kill task manager - taskManagers(0) ! PoisonPill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - case e => fail(s"Received wrong exception $e.") - } - } - } finally { - cluster.stop() - } - } - - "handle hard failing task manager with slot sharing" in { - val num_tasks = 20 - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup() - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) - val jmGateway = cluster.getLeaderGateway(1 seconds) - val taskManagers = cluster.getTaskManagers - - try{ - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - expectMsg(AllVerticesRunning(jobID)) - - //kill task manager - taskManagers(0) ! Kill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - - case e => fail(s"Received wrong exception $e.") - } - } - }finally{ - cluster.stop() - } - } - } - -} From 2ef1e9a523a248c91d02c22fc7a164b0f4dde463 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 13 Feb 2019 17:12:00 +0100 Subject: [PATCH 3/3] [hotfix][tests] Remove Tasks#Sender and #Receiver --- .../runtime/jobmaster/JobRecoveryITCase.java | 3 +- .../minicluster/MiniClusterITCase.java | 4 +- .../runtime/taskmanager/TaskManagerTest.java | 11 ++-- .../flink/runtime/jobmanager/Tasks.scala | 56 ------------------- 4 files changed, 9 insertions(+), 65 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java index 9448a8213b3d0..d003912eae353 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.MiniClusterResource; @@ -81,7 +80,7 @@ private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws Exceptio private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException { final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(PARALLELISM); - sender.setInvokableClass(Tasks.Sender.class); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); final JobVertex receiver = new JobVertex("Receiver"); receiver.setParallelism(PARALLELISM); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index cd541053e8e05..3744f11618415 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -38,11 +38,11 @@ import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender; import org.apache.flink.runtime.jobmanager.Tasks.Forwarder; import org.apache.flink.runtime.jobmanager.Tasks.InstantiationErrorSender; -import org.apache.flink.runtime.jobmanager.Tasks.Receiver; -import org.apache.flink.runtime.jobmanager.Tasks.Sender; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Receiver; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Sender; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index a397b6ca915a2..79809e32c79c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -585,7 +586,7 @@ public void testGateChannelEdgeMismatch() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -594,7 +595,7 @@ public void testGateChannelEdgeMismatch() { jid, "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, - new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -694,7 +695,7 @@ public void testRunJobWithForwardChannel() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -702,7 +703,7 @@ public void testRunJobWithForwardChannel() { jid, "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, - new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), new ArrayList<>(), Collections.emptyList(), 0); @@ -843,7 +844,7 @@ public void testCancellingDependentAndStateUpdateFails() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 029f5ef5112bc..472d15c8ae0e9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -26,22 +26,6 @@ import org.apache.flink.types.IntValue object Tasks { - class Sender(environment: Environment) - extends AbstractInvokable(environment) { - - override def invoke(): Unit = { - val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0)) - - try{ - writer.emit(new IntValue(42)) - writer.emit(new IntValue(1337)) - writer.flushAll() - }finally{ - writer.clearBuffers() - } - } - } - class Forwarder(environment: Environment) extends AbstractInvokable(environment) { @@ -71,46 +55,6 @@ object Tasks { } } - class Receiver(environment: Environment) - extends AbstractInvokable(environment) { - - override def invoke(): Unit = { - val reader = new RecordReader[IntValue]( - getEnvironment.getInputGate(0), - classOf[IntValue], - getEnvironment.getTaskManagerInfo.getTmpDirectories) - - val i1 = reader.next() - val i2 = reader.next() - val i3 = reader.next() - - if(i1.getValue != 42 || i2.getValue != 1337 || i3 != null){ - throw new Exception("Wrong data received.") - } - } - } - - class BlockingOnceReceiver(environment: Environment) - extends Receiver(environment) { - import BlockingOnceReceiver.blocking - - override def invoke(): Unit = { - if(blocking) { - val o = new Object - o.synchronized{ - o.wait() - } - } else { - super.invoke() - } - } - - } - - object BlockingOnceReceiver{ - var blocking = true - } - class AgnosticReceiver(environment: Environment) extends AbstractInvokable(environment) {