@@ -209,6 +209,13 @@ public HighAvailabilityServices getHighAvailabilityServices() {
}
}

TaskExecutor[] getTaskManagers() {
synchronized (lock) {
checkState(running, "MiniCluster is not yet running.");
return taskManagers;
}
}
Contributor:
I would like to do this differently. Please take a look at #7676.

// ------------------------------------------------------------------------
// life cycle
// ------------------------------------------------------------------------
@@ -26,14 +26,19 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks.AgnosticBinaryReceiver;
import org.apache.flink.runtime.jobmanager.Tasks.AgnosticReceiver;
import org.apache.flink.runtime.jobmanager.Tasks.AgnosticTertiaryReceiver;
import org.apache.flink.runtime.jobmanager.Tasks.BlockingReceiver;
import org.apache.flink.runtime.jobmanager.Tasks.ExceptionReceiver;
import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender;
import org.apache.flink.runtime.jobmanager.Tasks.Forwarder;
@@ -43,19 +48,24 @@
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@@ -337,6 +347,262 @@ public void testSchedulingAllAtOnce() throws Exception {
}
}

@Test
public void testSlotSharingForForwardJobWithCoLocationConstraint() throws Exception {
testSlotSharingForForwardJob(true);
}

@Test
public void testSlotSharingForForwardJobWithoutCoLocationConstraint() throws Exception {
testSlotSharingForForwardJob(false);
}

/**
 * This job runs in N slots with N senders and N receivers,
 * i.e. 2 * N tasks in total, so it cannot complete unless
 * slot sharing is used. Running with or without the
 * co-location constraint should make no difference.
 */
private void testSlotSharingForForwardJob(boolean withCoLocationConstraint) throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(parallelism)
.setConfiguration(getDefaultConfiguration())
.build();

try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();

final JobVertex sender = new JobVertex("Sender");
sender.setInvokableClass(CountDownLatchedSender.class);
sender.setParallelism(parallelism);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setInvokableClass(CountDownLatchedReceiver.class);
receiver.setParallelism(parallelism);

receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED);

final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
CountDownLatchedSender.setLatch(countDownLatch);
CountDownLatchedReceiver.setLatch(countDownLatch);

final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender.getID(), receiver.getID());
sender.setSlotSharingGroup(sharingGroup);
receiver.setSlotSharingGroup(sharingGroup);

if (withCoLocationConstraint) {
receiver.setStrictlyCoLocatedWith(sender);
}

final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);

miniCluster.executeJobBlocking(jobGraph);
}
}

/**
 * A sender that does not exit until all receivers are running,
 * which forces all senders and receivers to occupy their slots
 * at the same time.
 */
public static class CountDownLatchedSender extends AbstractInvokable {

private static CountDownLatch latch;

static void setLatch(CountDownLatch latch) {
CountDownLatchedSender.latch = latch;
}

public CountDownLatchedSender(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
RecordWriter<IntValue> writer = new RecordWriter<>(getEnvironment().getWriter(0));

try {
writer.emit(new IntValue(42));
writer.emit(new IntValue(1337));
writer.flushAll();
} finally {
writer.clearBuffers();
latch.await();
}
}
}

/**
 * A receiver that counts down the latch as soon as it starts running.
 */
public static class CountDownLatchedReceiver extends AbstractInvokable {

private static CountDownLatch latch;

static void setLatch(CountDownLatch latch) {
CountDownLatchedReceiver.latch = latch;
}

public CountDownLatchedReceiver(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
latch.countDown();

RecordReader<IntValue> reader = new RecordReader<>(
getEnvironment().getInputGate(0),
IntValue.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());

IntValue i1 = reader.next();
IntValue i2 = reader.next();
IntValue i3 = reader.next();

if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
throw new Exception("Wrong data received.");
}
}
}

/**
* This job runs in N slots with 2 * N senders and N receivers. Unless slot sharing is used,
* it cannot complete.
*/
@Test
public void testSlotSharingForTwoInputsJob() throws Exception {
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(parallelism)
.setConfiguration(getDefaultConfiguration())
.build();

try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();

final JobVertex sender1 = new JobVertex("Sender1");
sender1.setInvokableClass(CountDownLatchedSender.class);
sender1.setParallelism(parallelism);

final JobVertex sender2 = new JobVertex("Sender2");
sender2.setInvokableClass(CountDownLatchedSender.class);
sender2.setParallelism(parallelism);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setInvokableClass(CountDownLatchedAgnosticBinaryReceiver.class);
receiver.setParallelism(parallelism);

receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED);
receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED);

final CountDownLatch countDownLatch = new CountDownLatch(parallelism);
CountDownLatchedSender.setLatch(countDownLatch);
Contributor:
This test must also not be run in parallel to testSlotSharingForForwardJobWithCoLocationConstraint.

Member Author:
Thanks! I added a ReentrantLock to guard the setting of these CountDownLatches.

Contributor:
Actually, the synchronization can be removed, since multiple tests are not run in parallel.

Contributor:
FYI: if the test had failed, another test could have been blocked indefinitely, since you aren't calling unlock() in a finally block.

Member Author:
Hmm, thanks for the explanation, and sorry for being so inexperienced. I will remove the synchronization.

Member Author:
Does it make sense to move unlock() into a finally block in case we do parallelize the tests? Or should we defer the synchronization until the tests are actually run in parallel?

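For reference, the lock-release pattern discussed here, with unlock() in a finally block, would look like the following sketch; the helper name and lock field are hypothetical:

	import java.util.concurrent.locks.ReentrantLock;

	private static final ReentrantLock latchSetupLock = new ReentrantLock();

	static void setLatchesGuarded(CountDownLatch latch) {
		latchSetupLock.lock();
		try {
			CountDownLatchedSender.setLatch(latch);
			CountDownLatchedAgnosticBinaryReceiver.setLatch(latch);
		} finally {
			// Always release the lock, even if a setter throws; otherwise a
			// failing test would block every other test on this lock forever.
			latchSetupLock.unlock();
		}
	}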
CountDownLatchedAgnosticBinaryReceiver.setLatch(countDownLatch);

final SlotSharingGroup sharingGroup = new SlotSharingGroup(sender1.getID(), sender2.getID(), receiver.getID());
sender1.setSlotSharingGroup(sharingGroup);
sender2.setSlotSharingGroup(sharingGroup);
receiver.setSlotSharingGroup(sharingGroup);

final JobGraph jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver);

miniCluster.executeJobBlocking(jobGraph);
}
}

/**
 * A two-input receiver that counts down the latch as soon as it starts running.
 */
public static class CountDownLatchedAgnosticBinaryReceiver extends AbstractInvokable {

private static CountDownLatch latch;

static void setLatch(CountDownLatch latch) {
CountDownLatchedAgnosticBinaryReceiver.latch = latch;
}

public CountDownLatchedAgnosticBinaryReceiver(Environment environment) {
super(environment);
}

@Override
public void invoke() throws Exception {
latch.countDown();

RecordReader<IntValue> reader1 = new RecordReader<>(
getEnvironment().getInputGate(0),
IntValue.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());

RecordReader<IntValue> reader2 = new RecordReader<>(
getEnvironment().getInputGate(1),
IntValue.class,
getEnvironment().getTaskManagerInfo().getTmpDirectories());

while (reader1.next() != null) { }
while (reader2.next() != null) { }
}
}

@Test
public void testSlotSharingForForwardJobWithFailedTaskManager() throws Exception {
final int parallelism = 20;
final int numOfTaskManagers = 2;

final Configuration configuration = getDefaultConfiguration();
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(numOfTaskManagers)
.setNumSlotsPerTaskManager(parallelism / numOfTaskManagers)
.setConfiguration(configuration)
.build();

try (final MiniCluster miniCluster = new MiniCluster(cfg)) {
Contributor:
I think it would be a good idea to separate tests which modify the MiniCluster from those which don't. For the latter, the MiniCluster could be started once for the test class instead of for every test. This will speed up the execution.
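A minimal sketch of that suggestion, assuming JUnit 4 as used in this test class (the class name and slot counts are illustrative, not from this PR):

	import org.junit.AfterClass;
	import org.junit.BeforeClass;

	public class MiniClusterReadOnlyITCase extends TestLogger {

		private static MiniCluster miniCluster;

		@BeforeClass
		public static void startCluster() throws Exception {
			// Start one shared MiniCluster for all tests that do not mutate it.
			final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
				.setNumTaskManagers(1)
				.setNumSlotsPerTaskManager(4)
				.build();
			miniCluster = new MiniCluster(cfg);
			miniCluster.start();
		}

		@AfterClass
		public static void stopCluster() throws Exception {
			// Shut the shared cluster down once, after the last test.
			if (miniCluster != null) {
				miniCluster.close();
			}
		}
	}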

miniCluster.start();

final JobVertex sender = new JobVertex("Sender");
sender.setInvokableClass(Sender.class);
sender.setParallelism(parallelism);

final JobVertex receiver = new JobVertex("BlockingReceiver");
receiver.setInvokableClass(BlockingReceiver.class);
receiver.setParallelism(parallelism);

receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED);

final SlotSharingGroup sharingGroup = new SlotSharingGroup();
sender.setSlotSharingGroup(sharingGroup);
receiver.setSlotSharingGroup(sharingGroup);

final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);

miniCluster.submitJob(jobGraph).get();

TaskExecutor taskExecutor = miniCluster.getTaskManagers()[0];
taskExecutor.closeAsync();
taskExecutor.getTerminationFuture().get();

JobResult result = miniCluster.requestJobResult(jobGraph.getJobID()).get();

try {
result.toJobExecutionResult(getClass().getClassLoader());

fail("Job should fail.");
} catch (JobExecutionException e) {
assertThat(e.getJobID(), is(jobGraph.getJobID()));
}
}
}

@Test
public void testJobWithAFailingSenderVertex() throws Exception {
final int parallelism = 11;
Expand Down
Loading