@@ -801,7 +801,7 @@ else if (current == RUNNING || current == DEPLOYING) {
}

else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled.
// nothing to do any more. finished/failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already
sendReleaseIntermediateResultPartitionsRpcCall();

@@ -1329,7 +1329,7 @@ private void sendReleaseIntermediateResultPartitionsRpcCall() {

if (!partitionIds.isEmpty()) {
// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.releasePartitions(partitionIds);
taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
}
}
}
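
Taken together, the two hunks above change the job-manager side so that the explicit release of a finished execution's blocking partitions is scoped by job ID. Below is a minimal, self-contained sketch of the resulting call shape; the PartitionReleaser interface, class name, and main method are illustrative stand-ins rather than Flink code, and only JobID and ResultPartitionID are real Flink classes.

	import java.util.Arrays;
	import java.util.Collection;

	import org.apache.flink.api.common.JobID;
	import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

	public class ReleaseCallSketch {

		// illustrative stand-in for the TaskManagerGateway method changed in this PR
		interface PartitionReleaser {
			void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);
		}

		public static void main(String[] args) {
			JobID jobId = new JobID();
			Collection<ResultPartitionID> partitionIds =
				Arrays.asList(new ResultPartitionID(), new ResultPartitionID());

			PartitionReleaser releaser = (id, partitions) ->
				System.out.println("Releasing " + partitions.size() + " partitions of job " + id);

			if (!partitionIds.isEmpty()) {
				// mirrors sendReleaseIntermediateResultPartitionsRpcCall(): the job id now travels with the partition ids
				releaser.releasePartitions(jobId, partitionIds);
			}
		}
	}
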
@@ -102,9 +102,10 @@ CompletableFuture<Acknowledge> updatePartitions(
/**
* Batch release intermediate result partitions.
*
* @param jobId id of the job that the partitions belong to
* @param partitionIds partition ids to release
*/
void releasePartitions(Collection<ResultPartitionID> partitionIds);
void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);

/**
* Notify the given task about a completed checkpoint.
@@ -88,8 +88,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(partitionIds);
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(jobId, partitionIds);
}

@Override
@@ -640,7 +640,7 @@ public CompletableFuture<Acknowledge> updatePartitions(
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
Contributor: It is a bit weird that we will leave jobId unused here; is the future change that needs it going to be so big?

Author: It's not going to be big, but this part is simply already done.

try {
shuffleEnvironment.releasePartitions(partitionIds);
} catch (Throwable t) {
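
Regarding the review question above about the intentionally unused jobId: the follow-up is not part of this PR, but a purely hypothetical sketch of per-job bookkeeping shows the kind of use the extra parameter enables. The class name, map, and logic below are invented for illustration; only JobID and ResultPartitionID are real Flink classes.

	import java.util.Collection;
	import java.util.Map;
	import java.util.Set;
	import java.util.concurrent.ConcurrentHashMap;

	import org.apache.flink.api.common.JobID;
	import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

	// Hypothetical sketch, NOT part of this PR: one possible later use of jobId,
	// namely tracking which partitions belong to which job so they can all be
	// cleaned up when the job terminates.
	class PerJobPartitionBookkeepingSketch {

		private final Map<JobID, Set<ResultPartitionID>> trackedPartitionsPerJob = new ConcurrentHashMap<>();

		void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
			// drop the released partitions from the job's bookkeeping ...
			trackedPartitionsPerJob.computeIfPresent(jobId, (id, tracked) -> {
				tracked.removeAll(partitionIds);
				return tracked.isEmpty() ? null : tracked;
			});
			// ... while the actual release would still be delegated to the shuffle
			// environment, as in the surrounding TaskExecutor code
		}
	}
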
@@ -103,9 +103,10 @@ CompletableFuture<Acknowledge> updatePartitions(
/**
* Batch release intermediate result partitions.
*
* @param jobId id of the job that the partitions belong to
* @param partitionIds partition ids to release
*/
void releasePartitions(Collection<ResultPartitionID> partitionIds);
void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);

/**
* Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -19,12 +19,16 @@
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -54,6 +58,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -64,6 +69,7 @@
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

@@ -293,6 +299,82 @@ public void testSlotAllocationCancellationWhenExecutionCancelled() throws Except
assertThat(canceledSlotRequests, equalTo(slotRequests));
}

/**
* Tests that the partitions are released in case of an execution cancellation after the execution is already finished.
*/
@Test
public void testPartitionReleaseOnCancelingAfterBeingFinished() throws Exception {
testPartitionReleaseAfterFinished(Execution::cancel);
}

/**
* Tests that the partitions are released in case of an execution suspension after the execution is already finished.
*/
@Test
public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws Exception {
testPartitionReleaseAfterFinished(Execution::suspend);
}

private void testPartitionReleaseAfterFinished(Consumer<Execution> postFinishedExecutionAction) throws Exception {
final Tuple2<JobID, Collection<ResultPartitionID>> releasedPartitions = Tuple2.of(null, null);
final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields);

final JobVertex producerVertex = createNoOpJobVertex();
final JobVertex consumerVertex = createNoOpJobVertex();
consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

final SimpleSlot slot = new SimpleSlot(
Contributor: Maybe we could provide a utility for creating SimpleSlot in this class, because it would be reused by many tests.

Author: Such a utility already exists in #createProgrammedSlotProvider; I will update the test to use that instead.

new SingleSlotTestingSlotOwner(),
new LocalTaskManagerLocation(),
0,
taskManagerGateway);

final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(producerVertex.getID(), 0, CompletableFuture.completedFuture(slot));

ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
new JobID(),
slotProvider,
new NoRestartStrategy(),
producerVertex,
consumerVertex);

executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());

ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

final Execution execution = executionVertex.getCurrentExecutionAttempt();

execution.allocateResourcesForExecution(
slotProvider,
false,
LocationPreferenceConstraint.ALL,
Collections.emptySet(),
TestingUtils.infiniteTime());

execution.deploy();
zhijiangW (Contributor), Jun 6, 2019: Maybe we could further provide a utility for creating Execution, like the one below:

	private CompletableFuture<Execution> createExecution(
			TaskManagerGateway taskManagerGateway,
			JobVertex... vertices) throws Exception {

		SimpleSlot slot = new SimpleSlot(
			new SingleSlotTestingSlotOwner(),
			new LocalTaskManagerLocation(),
			0,
			taskManagerGateway);

		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
		slotProvider.addSlot(vertices[0].getID(), 0, CompletableFuture.completedFuture(slot));

		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
			new JobID(),
			slotProvider,
			new NoRestartStrategy(),
			vertices);

		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());

		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertices[0].getID());
		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
		Execution execution = executionVertex.getCurrentExecutionAttempt();

		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
			slotProvider,
			false,
			LocationPreferenceConstraint.ALL,
			Collections.emptySet(),
			TestingUtils.infiniteTime());

		return allocationFuture;
	}

Then we could get the Execution from the future, and further get the ExecutionVertex and ExecutionGraph from the Execution. This helper could then be reused by many existing tests.

Author: I agree that we could de-duplicate some code here. I'm concerned that we're baking in a few assumptions (vertex parallelism should be 1, the first vertex in the array is special), and there are a few subtle differences between tests where I don't know whether they are significant or not. (For example, #testTaskRestoreStateIsNulledAfterDeployment doesn't allocate a slot beforehand.) I don't have the time right now to really look into these things, so I'd move any larger refactoring of this class into a follow-up.


execution.switchToRunning();

// simulate a case where a cancel/suspend call is too slow and the task is already finished
// in this case we have to explicitly release the finished partition
// if the task were canceled properly the TM would release the partition automatically
execution.markFinished();
postFinishedExecutionAction.accept(execution);

assertEquals(executionGraph.getJobID(), releasedPartitions.f0);
zhijiangW (Contributor), Jun 6, 2019: Maybe use assertThat instead of assertEquals.
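
For reference, a sketch of the assertThat form suggested above, written against the names in this test and using the Hamcrest equalTo matcher that the test already imports:

	// equivalent Hamcrest-style assertion for the assertEquals call above
	assertThat(releasedPartitions.f0, equalTo(executionGraph.getJobID()));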

assertEquals(executionVertex.getProducedPartitions().size(), releasedPartitions.f1.size());
for (ResultPartitionID partitionId : releasedPartitions.f1) {
// ensure all IDs of released partitions are actually valid
IntermediateResultPartition intermediateResultPartition = executionVertex
.getProducedPartitions()
.get(partitionId.getPartitionId());
assertNotNull(intermediateResultPartition);
Contributor: This assert might not be necessary.

Author: This check is done to ensure that the IDs of all released partitions are actually valid; without it the test would pass even if completely random partitions were passed to the task executor. I'll add a comment to clarify this; I had to think for a bit myself about what we're checking here.


assertEquals(execution.getAttemptId(), partitionId.getProducerId());
}
}
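
A side note on the capture idiom used in the test above: Tuple2.of(null, null) serves as a mutable two-field holder, and the BiConsumer handed to the testing gateway writes both callback arguments into it via setFields, so the job ID and the partition IDs can be asserted on afterwards. The following self-contained sketch shows just that idiom; the class name, main method, and printed output are illustrative only, while Tuple2, JobID, and ResultPartitionID are real Flink classes.

	import java.util.Arrays;
	import java.util.Collection;
	import java.util.function.BiConsumer;

	import org.apache.flink.api.common.JobID;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

	public class CaptureIdiomSketch {

		public static void main(String[] args) {
			// mutable holder for the two arguments of the expected callback
			Tuple2<JobID, Collection<ResultPartitionID>> captured = Tuple2.of(null, null);
			BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = captured::setFields;

			// simulate the gateway invoking the registered consumer
			releasePartitionsConsumer.accept(new JobID(), Arrays.asList(new ResultPartitionID()));

			System.out.println("captured job id: " + captured.f0);
			System.out.println("captured partition ids: " + captured.f1);
		}
	}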

/**
* Tests that all preferred locations are calculated.
*/
@@ -417,7 +499,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
slotProvider,
new NoRestartStrategy(),
jobVertex);

ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);

ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
@@ -34,6 +34,7 @@
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

@@ -51,6 +52,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {

private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;

private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { };

public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> predicate) {
submitConsumer = predicate;
}
@@ -63,6 +66,10 @@ public void setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableF
this.freeSlotFunction = freeSlotFunction;
}

public void setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) {
this.releasePartitionsConsumer = releasePartitionsConsumer;
}

@Override
public String getAddress() {
return address;
@@ -97,7 +104,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
releasePartitionsConsumer.accept(jobId, partitionIds);
}

@Override
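
A minimal usage sketch of the new test hook added above. setReleasePartitionsConsumer and releasePartitions are the methods from this diff; the counting logic and assertion are invented for illustration, and the fragment assumes a Flink test classpath plus imports of java.util.Collections, java.util.concurrent.atomic.AtomicInteger, and org.junit.Assert.assertEquals.

	// hypothetical test fragment exercising the new hook
	SimpleAckingTaskManagerGateway gateway = new SimpleAckingTaskManagerGateway();
	AtomicInteger releaseCalls = new AtomicInteger();
	gateway.setReleasePartitionsConsumer((jobId, partitionIds) -> releaseCalls.incrementAndGet());

	gateway.releasePartitions(new JobID(), Collections.singletonList(new ResultPartitionID()));

	assertEquals(1, releaseCalls.get());
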
@@ -125,7 +125,7 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
// noop
}
