Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -76,11 +71,11 @@ public void testCoLocationConstraintJobExecution() throws Exception {
private JobGraph createJobGraph(int parallelism) {
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(parallelism);
sender.setInvokableClass(Sender.class);
sender.setInvokableClass(TestingAbstractInvokables.Sender.class);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setParallelism(parallelism);
receiver.setInvokableClass(Receiver.class);
receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class);

// In order to make testCoLocationConstraintJobExecution fail, one needs to
// remove the co-location constraint and the slot sharing groups, because then
Expand All @@ -98,54 +93,4 @@ private JobGraph createJobGraph(int parallelism) {

return jobGraph;
}

/**
* Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream.
*/
public static class Sender extends AbstractInvokable {

public Sender(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
final RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));

try {
writer.emit(new IntValue(42));
writer.emit(new IntValue(1337));
writer.flushAll();
} finally {
writer.clearBuffers();
}
}
}

/**
* Basic receiver {@link AbstractInvokable} which verifies the sent elements
* from the {@link Sender}.
*/
public static class Receiver extends AbstractInvokable {

public Receiver(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
final RecordReader<IntValue> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
IntValue.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());

final IntValue i1 = reader.next();
final IntValue i2 = reader.next();
final IntValue i3 = reader.next();

if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
throw new Exception("Wrong data received.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
Expand Down Expand Up @@ -81,7 +80,7 @@ private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws Exceptio
private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException {
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(PARALLELISM);
sender.setInvokableClass(Tasks.Sender.class);
sender.setInvokableClass(TestingAbstractInvokables.Sender.class);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setParallelism(PARALLELISM);
Expand All @@ -108,7 +107,7 @@ private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException {
/**
* Receiver which fails once before successfully completing.
*/
public static final class FailingOnceReceiver extends JobExecutionITCase.Receiver {
public static final class FailingOnceReceiver extends TestingAbstractInvokables.Receiver {

private static volatile boolean failed = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.types.IntValue;

/**
* {@link AbstractInvokable} for testing purposes.
*/
public class TestingAbstractInvokables {

private TestingAbstractInvokables() {
throw new UnsupportedOperationException(getClass().getSimpleName() + " should not be instantiated.");
}

/**
* Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream.
*/
public static class Sender extends AbstractInvokable {

public Sender(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
final RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));

try {
writer.emit(new IntValue(42));
writer.emit(new IntValue(1337));
writer.flushAll();
} finally {
writer.clearBuffers();
}
}
}

/**
* Basic receiver {@link AbstractInvokable} which verifies the sent elements
* from the {@link Sender}.
*/
public static class Receiver extends AbstractInvokable {

public Receiver(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
final RecordReader<IntValue> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
IntValue.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());

final IntValue i1 = reader.next();
final IntValue i2 = reader.next();
final IntValue i3 = reader.next();

if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
throw new Exception("Wrong data received.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender;
import org.apache.flink.runtime.jobmanager.Tasks.Forwarder;
import org.apache.flink.runtime.jobmanager.Tasks.InstantiationErrorSender;
import org.apache.flink.runtime.jobmanager.Tasks.Receiver;
import org.apache.flink.runtime.jobmanager.Tasks.Sender;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Receiver;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Sender;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
Expand Down Expand Up @@ -141,7 +145,8 @@ private CompletableFuture<JobResult> submitJobAndWaitUntilRunning(JobGraph jobGr
}

private SupplierWithException<Boolean, Exception> jobIsRunning(Supplier<CompletableFuture<? extends AccessExecutionGraph>> executionGraphFutureSupplier) {
final Predicate<AccessExecutionGraph> allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING));
final Predicate<AccessExecution> runningOrFinished = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED));
final Predicate<AccessExecutionGraph> allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished);

return () -> {
final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join();
Expand All @@ -159,19 +164,28 @@ private JobGraph createJobGraphWithRestartStrategy(int parallelism) throws IOExc
}

private JobGraph createJobGraph(int parallelism) {
final JobVertex vertex = new JobVertex("blocking operator");
vertex.setParallelism(parallelism);
vertex.setInvokableClass(BlockingOperator.class);
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(parallelism);
sender.setInvokableClass(TestingAbstractInvokables.Sender.class);

final JobVertex receiver = new JobVertex("Blocking receiver");
receiver.setParallelism(parallelism);
receiver.setInvokableClass(BlockingOperator.class);
BlockingOperator.reset();

return new JobGraph("Blocking test job", vertex);
receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setSlotSharingGroup(slotSharingGroup);

return new JobGraph("Blocking test job with slot sharing", sender, receiver);
}

/**
* Blocking invokable which is controlled by a static field.
*/
public static class BlockingOperator extends AbstractInvokable {
public static class BlockingOperator extends TestingAbstractInvokables.Receiver {
private static CountDownLatch countDownLatch = new CountDownLatch(1);

public BlockingOperator(Environment environment) {
Expand All @@ -181,6 +195,7 @@ public BlockingOperator(Environment environment) {
@Override
public void invoke() throws Exception {
countDownLatch.await();
super.invoke();
}

public static void unblock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
Expand Down Expand Up @@ -585,7 +586,7 @@ public void testGateChannelEdgeMismatch() {
jid, "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<>(), Collections.emptyList(), 0);
Expand All @@ -594,7 +595,7 @@ public void testGateChannelEdgeMismatch() {
jid, "TestJob", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 7, 2, 7, 0,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<>(), Collections.emptyList(), 0);
Expand Down Expand Up @@ -694,15 +695,15 @@ public void testRunJobWithForwardChannel() {
jid, "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(),
irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<>(),
Collections.emptyList(), 0);

final TaskDeploymentDescriptor tdd2 = createTaskDeploymentDescriptor(
jid, "TestJob", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 7, 2, 7, 0,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.singletonList(ircdd),
new ArrayList<>(), Collections.emptyList(), 0);
Expand Down Expand Up @@ -843,7 +844,7 @@ public void testCancellingDependentAndStateUpdateFails() {
jid, "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(),
irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<>(), Collections.emptyList(), 0);

Expand Down
Loading