Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Integration tests for job scheduling.
*/
public class JobExecutionITCase extends TestLogger {

/**
 * Runs a sender/receiver job whose vertices share a slot sharing group and are
 * strictly co-located, on a mini cluster that only permits local communication,
 * and expects the job to succeed.
 *
 * <p>Besides the co-location constraint this also covers the case that consumers
 * are scheduled with respect to their input location when the co-location
 * constraint is deactivated.
 */
@Test
public void testCoLocationConstraintJobExecution() throws Exception {
	final int taskExecutorCount = 3;
	final int slotsPerTaskExecutor = 1;

	// One subtask per available slot.
	final JobGraph coLocatedJob = createJobGraph(taskExecutorCount * slotsPerTaskExecutor);

	final TestingMiniClusterConfiguration clusterConfiguration =
		new TestingMiniClusterConfiguration.Builder()
			.setNumTaskManagers(taskExecutorCount)
			.setNumSlotsPerTaskManager(slotsPerTaskExecutor)
			.setLocalCommunication(true)
			.build();

	try (TestingMiniCluster cluster = new TestingMiniCluster(clusterConfiguration)) {
		cluster.start();

		// Block until the submission has been acknowledged.
		cluster.submitJob(coLocatedJob).get();

		final CompletableFuture<JobResult> resultFuture = cluster.requestJobResult(coLocatedJob.getJobID());
		final JobResult result = resultFuture.get();

		assertThat(result.isSuccess(), is(true));
	}
}

private JobGraph createJobGraph(int parallelism) {
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(parallelism);
sender.setInvokableClass(Sender.class);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setParallelism(parallelism);
receiver.setInvokableClass(Receiver.class);

// In order to make testCoLocationConstraintJobExecution fail, one needs to
// remove the co-location constraint and the slot sharing groups, because then
// the receivers will have to wait for the senders to finish and the slot
// assignment order to the receivers is non-deterministic (depending on the
// order in which the senders finish).
final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
receiver.setSlotSharingGroup(slotSharingGroup);
sender.setSlotSharingGroup(slotSharingGroup);
receiver.setStrictlyCoLocatedWith(sender);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even without L82-86 the test can pass. The reason is that some of the parallel Sender/Receiver instances start and finish quickly. We could make sure that no Sender exits until all Receivers are running, e.g. by using a CountDownLatch as in #6883.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true. We don't have true assertions making sure that tasks are being co-located. The CountDownLatch would enforce that both tasks are online at the same time. I think this is not what we want to guarantee here. Instead we should test that the tasks are deployed in the same slot and, thus, use local channels for communication. Maybe a non-serializable record could do the trick here. I'll try it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this doesn't work because we always serialize into a buffer independent of the channel type. The only difference is whether it goes through Netty or not I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be the solution. What we can do is to start the MiniCluster with only local communication enabled. That way we won't start netty and the communication needs to happen strictly locally :-).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tillrohrmann The test still succeeds even if local communication is set to false.

Copy link
Contributor Author

@tillrohrmann tillrohrmann Feb 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's expected that the test succeeds if localCommunication is set to false because it's the less restricted case. If localCommunication is true TMs cannot speak with each other.

What you should try is to comment the colocation constraint out to see that the test fails, because that's what we are testing here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course, that makes more sense 🤦‍♂️ . Unfortunately the test still runs successfully if the colocation constraint is removed. Based on the logs the sender tasks are finishing before the receivers are even started, so we never run out of slots, which as I understand is the failure condition here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @zentol and I discussed offline, the test actually tests not only the co-location constraints but also the input preferences of normal scheduling. Thus, one needs to remove the slot sharing as well in order to make this test fail.


receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final JobGraph jobGraph = new JobGraph(getClass().getSimpleName(), sender, receiver);

return jobGraph;
}

/**
 * Minimal {@link AbstractInvokable} that emits the two values 42 and 1337 on its
 * first output and flushes them downstream.
 */
public static class Sender extends AbstractInvokable {

	public Sender(Environment environment) {
		super(environment);
	}

	@Override
	public void invoke() throws Exception {
		final RecordWriter<IntValue> output = new RecordWriter<>(getEnvironment().getWriter(0));

		try {
			for (int value : new int[] {42, 1337}) {
				output.emit(new IntValue(value));
			}
			output.flushAll();
		} finally {
			// Always clear the writer's buffers, even if emitting or flushing failed.
			output.clearBuffers();
		}
	}
}

/**
 * Basic receiver {@link AbstractInvokable} which verifies the elements sent by
 * the {@link Sender}: exactly the values 42 and 1337, in that order, followed by
 * the end of the input.
 */
public static class Receiver extends AbstractInvokable {

	public Receiver(Environment environment) {
		super(environment);
	}

	/**
	 * Reads three records from the first input gate and checks them against the
	 * expected sequence.
	 *
	 * @throws Exception if the received data differs from the expected sequence
	 *     (including when the input ends early), or if reading fails
	 */
	@Override
	public void invoke() throws Exception {
		final RecordReader<IntValue> reader = new RecordReader<>(
			getEnvironment().getInputGate(0),
			IntValue.class,
			getEnvironment().getTaskManagerInfo().getTmpDirectories());

		final IntValue i1 = reader.next();
		final IntValue i2 = reader.next();
		// The third read is expected to yield null, i.e. no further record.
		final IntValue i3 = reader.next();

		// Check for missing records first so that a short input is reported as
		// wrong data rather than surfacing as a NullPointerException.
		if (i1 == null || i2 == null
				|| i1.getValue() != 42 || i2.getValue() != 1337
				|| i3 != null) {
			throw new Exception("Wrong data received.");
		}
	}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
 * Integration tests verifying that a job recovers from a single task failure.
 */
public class JobRecoveryITCase extends TestLogger {

	private static final int NUM_TMS = 1;
	private static final int SLOTS_PER_TM = 11;
	private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;

	@ClassRule
	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
		new MiniClusterResourceConfiguration.Builder()
			.setNumberTaskManagers(NUM_TMS)
			.setNumberSlotsPerTaskManager(SLOTS_PER_TM)
			.build());

	/** Recovery from a task failure without a shared slot sharing group. */
	@Test
	public void testTaskFailureRecovery() throws Exception {
		final JobGraph jobWithoutSlotSharing = createJobGraph(false);
		runTaskFailureRecoveryTest(jobWithoutSlotSharing);
	}

	/** Recovery from a task failure with sender and receiver in one slot sharing group. */
	@Test
	public void testTaskFailureWithSlotSharingRecovery() throws Exception {
		final JobGraph jobWithSlotSharing = createJobGraph(true);
		runTaskFailureRecoveryTest(jobWithSlotSharing);
	}

	/**
	 * Submits the given job to the shared mini cluster and asserts that its
	 * final result is a success.
	 */
	private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws Exception {
		final MiniCluster cluster = MINI_CLUSTER_RESOURCE.getMiniCluster();

		// Block until the submission has been acknowledged.
		cluster.submitJob(jobGraph).get();

		final CompletableFuture<JobResult> resultFuture = cluster.requestJobResult(jobGraph.getJobID());
		final JobResult result = resultFuture.get();

		assertThat(result.isSuccess(), is(true));
	}

	/**
	 * Builds a pointwise, pipelined sender/receiver job whose receiver fails once,
	 * configured with a fixed-delay restart strategy (1 attempt, 0 ms delay).
	 *
	 * @param slotSharingEnabled whether both vertices are put into a common slot sharing group
	 */
	private JobGraph createJobGraph(boolean slotSharingEnabled) throws IOException {
		// Re-arm the one-time failure for the job being created.
		FailingOnceReceiver.reset();

		final JobVertex senderVertex = new JobVertex("Sender");
		senderVertex.setInvokableClass(Tasks.Sender.class);
		senderVertex.setParallelism(PARALLELISM);

		final JobVertex receiverVertex = new JobVertex("Receiver");
		receiverVertex.setInvokableClass(FailingOnceReceiver.class);
		receiverVertex.setParallelism(PARALLELISM);

		if (slotSharingEnabled) {
			final SlotSharingGroup sharedGroup = new SlotSharingGroup();
			receiverVertex.setSlotSharingGroup(sharedGroup);
			senderVertex.setSlotSharingGroup(sharedGroup);
		}

		receiverVertex.connectNewDataSetAsInput(
			senderVertex,
			DistributionPattern.POINTWISE,
			ResultPartitionType.PIPELINED);

		final ExecutionConfig restartingConfig = new ExecutionConfig();
		restartingConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

		final JobGraph recoveryJob = new JobGraph(getClass().getSimpleName(), senderVertex, receiverVertex);
		recoveryJob.setExecutionConfig(restartingConfig);
		return recoveryJob;
	}

	/**
	 * Receiver whose subtask 0 throws on its first invocation and behaves like a
	 * regular {@link JobExecutionITCase.Receiver} afterwards.
	 */
	public static final class FailingOnceReceiver extends JobExecutionITCase.Receiver {

		private static volatile boolean failed = false;

		public FailingOnceReceiver(Environment environment) {
			super(environment);
		}

		@Override
		public void invoke() throws Exception {
			// Every invocation other than the first one of subtask 0 runs the regular receiver.
			if (failed || getEnvironment().getTaskInfo().getIndexOfThisSubtask() != 0) {
				super.invoke();
				return;
			}

			failed = true;
			throw new FlinkRuntimeException(getClass().getSimpleName());
		}

		private static void reset() {
			failed = false;
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,22 @@ public void testReelectionOfJobMaster() throws Exception {

/**
 * Verifies that all task executors are connected again after the
 * ResourceManager leadership has been revoked and re-granted.
 */
@Test
public void testTaskExecutorsReconnectToClusterWithLeadershipChange() throws Exception {
	// A single deadline bounds both waits of this test.
	final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT);

	// Wait instead of asserting immediately: the task executors may not have
	// connected yet when the test starts.
	waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline);

	highAvailabilityServices.revokeResourceManagerLeadership().get();
	highAvailabilityServices.grantResourceManagerLeadership();

	// wait for the ResourceManager to confirm the leadership
	assertThat(LeaderRetrievalUtils.retrieveLeaderConnectionInfo(highAvailabilityServices.getResourceManagerLeaderRetriever(), Time.minutes(TESTING_TIMEOUT.toMinutes())).getLeaderSessionID(), is(notNullValue()));

	waitUntilTaskExecutorsHaveConnected(NUM_TMS, deadline);
}

/**
 * Polls the cluster overview until the number of connected task managers
 * equals the expected number.
 *
 * @param numTaskExecutors expected number of connected task executors
 * @param deadline deadline handed to the polling utility
 */
private void waitUntilTaskExecutorsHaveConnected(int numTaskExecutors, Deadline deadline) throws Exception {
	CommonTestUtils.waitUntilCondition(
		() -> {
			final int connectedTaskExecutors = miniCluster.requestClusterOverview().get().getNumTaskManagersConnected();
			return connectedTaskExecutors == numTaskExecutors;
		},
		deadline,
		10L);
}

private JobGraph createJobGraph(int parallelism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@
*/
public class TestingMiniCluster extends MiniCluster {

/** Number of dispatcher/resource manager components, taken from the configuration. */
private final int numberDispatcherResourceManagerComponents;

/** Value returned by {@code useLocalCommunication()}, taken from the configuration. */
private final boolean localCommunication;

/** Optional supplier of high availability services; may be {@code null}. */
@Nullable
private final Supplier<HighAvailabilityServices> highAvailabilityServicesSupplier;

/**
 * Creates a testing mini cluster from the given configuration.
 *
 * @param miniClusterConfiguration configuration from which the number of
 *     dispatcher/resource manager components and the local communication flag are read
 * @param highAvailabilityServicesSupplier optional supplier of high availability
 *     services, may be {@code null}
 */
public TestingMiniCluster(
		TestingMiniClusterConfiguration miniClusterConfiguration,
		@Nullable Supplier<HighAvailabilityServices> highAvailabilityServicesSupplier) {
	super(miniClusterConfiguration);

	this.highAvailabilityServicesSupplier = highAvailabilityServicesSupplier;

	// Settings derived from the testing configuration.
	this.localCommunication = miniClusterConfiguration.isLocalCommunication();
	this.numberDispatcherResourceManagerComponents =
		miniClusterConfiguration.getNumberDispatcherResourceManagerComponents();
}

public TestingMiniCluster(TestingMiniClusterConfiguration miniClusterConfiguration) {
Expand All @@ -80,7 +83,7 @@ public void startTaskExecutor() throws Exception {

/**
 * Returns the local communication flag that was read from the
 * {@link TestingMiniClusterConfiguration} at construction time.
 */
@Override
protected boolean useLocalCommunication() {
	return localCommunication;
}

@Override
Expand Down
Loading