diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c0333049618f3..4939e611329f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1335,7 +1335,7 @@ private void startTrackingPartitions(final ResourceID taskExecutorId, final Coll
 		}
 	}
 
-	private void stopTrackingAndReleasePartitions() {
+	void stopTrackingAndReleasePartitions() {
 		LOG.info("Discarding the results produced by task execution {}.", attemptId);
 		if (producedPartitions != null && producedPartitions.size() > 0) {
 			final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0d01e25995390..a12d198b98212 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -591,6 +591,11 @@ public Execution resetForNewExecution(final long timestamp, final long originati
 			final ExecutionState oldState = oldExecution.getState();
 
 			if (oldState.isTerminal()) {
+				if (oldState == FINISHED) {
+					// discard the results produced by the finished attempt before it is archived and replaced
+					oldExecution.stopTrackingAndReleasePartitions();
+				}
+
 				priorExecutions.add(oldExecution.archive());
 
 				final Execution newExecution = new Execution(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index f6147950af38a..8623eda901c30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -34,6 +38,7 @@
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,12 +46,13 @@
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +63,7 @@
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -397,23 +404,14 @@ public static ExecutionGraph createExecutionGraph(
 		checkNotNull(vertices);
 		checkNotNull(timeout);
 
-		return ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(jid, "test job", vertices),
-			new Configuration(),
-			executor,
-			executor,
-			slotProvider,
-			ExecutionGraphTestUtils.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			timeout,
-			restartStrategy,
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			timeout,
-			TEST_LOGGER,
-			NettyShuffleMaster.INSTANCE,
-			NoOpPartitionTracker.INSTANCE);
+		return new TestingExecutionGraphBuilder(jid, vertices)
+			.setFutureExecutor(executor)
+			.setIoExecutor(executor)
+			.setSlotProvider(slotProvider)
+			.setAllocationTimeout(timeout)
+			.setRpcTimeout(timeout)
+			.setRestartStrategy(restartStrategy)
+			.build();
 	}
 
 	public static JobVertex createNoOpVertex(int parallelism) {
@@ -445,16 +443,10 @@ public static ExecutionJobVertex getExecutionVertex(
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(AbstractInvokable.class);
 
-		ExecutionGraph graph = new ExecutionGraph(
-			executor,
-			executor,
-			new JobID(),
-			"test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+		ExecutionGraph graph = new TestingExecutionGraphBuilder(ajv)
+			.setIoExecutor(executor)
+			.setFutureExecutor(executor)
+			.build();
 
 		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
 
@@ -537,4 +529,134 @@ public static void verifyGeneratedExecutionJobVertex(
 			subtaskIndex++;
 		}
 	}
+
+	/**
+	 * Builder for {@link ExecutionGraph} instances used in tests.
+	 *
+	 * <p>Every component has a default, so a test only needs to override the
+	 * pieces it cares about before calling {@link #build()}.
+	 */
+	public static class TestingExecutionGraphBuilder {
+
+		private ShuffleMaster shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private Time allocationTimeout = Time.seconds(10L);
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private MetricGroup metricGroup = new UnregisteredMetricsGroup();
+		private RestartStrategy restartStrategy = new NoRestartStrategy();
+		private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private ClassLoader classLoader = getClass().getClassLoader();
+		private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot()));
+		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private Configuration jobMasterConfig = new Configuration();
+		private final JobGraph jobGraph;
+		private PartitionTracker partitionTracker = NoOpPartitionTracker.INSTANCE;
+
+		public TestingExecutionGraphBuilder(final JobVertex ... jobVertices) {
+			this(new JobID(), "test job", jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final JobVertex ... jobVertices) {
+			this(jobId, "test job", jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final String jobName, final JobVertex ... jobVertices) {
+			this(new JobID(), jobName, jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final String jobName, final JobVertex ... jobVertices) {
+			this(new JobGraph(jobId, jobName, jobVertices));
+		}
+
+		public TestingExecutionGraphBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+		}
+
+		public TestingExecutionGraphBuilder setJobMasterConfig(final Configuration jobMasterConfig) {
+			this.jobMasterConfig = jobMasterConfig;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setSlotProvider(final SlotProvider slotProvider) {
+			this.slotProvider = slotProvider;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setClassLoader(final ClassLoader classLoader) {
+			this.classLoader = classLoader;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRestartStrategy(final RestartStrategy restartStrategy) {
+			this.restartStrategy = restartStrategy;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setMetricGroup(final MetricGroup metricGroup) {
+			this.metricGroup = metricGroup;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setAllocationTimeout(final Time allocationTimeout) {
+			this.allocationTimeout = allocationTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setShuffleMaster(final ShuffleMaster shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setPartitionTracker(final PartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public ExecutionGraph build() throws JobException, JobExecutionException {
+			return ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				jobMasterConfig,
+				futureExecutor,
+				ioExecutor,
+				slotProvider,
+				classLoader,
+				checkpointRecoveryFactory,
+				rpcTimeout,
+				restartStrategy,
+				metricGroup,
+				blobWriter,
+				allocationTimeout,
+				TEST_LOGGER,
+				shuffleMaster,
+				partitionTracker
+			);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
new file mode 100644
index 0000000000000..c9f3ef0c13b44
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link ExecutionVertex}.
+ */
+public class ExecutionVertexTest extends TestLogger {
+
+	/**
+	 * Verifies that resetting a vertex whose execution has finished releases the
+	 * partitions produced by that execution, and that finishing alone does not.
+	 */
+	@Test
+	public void testResetForNewExecutionReleasesPartitions() throws Exception {
+		final JobVertex producerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+		final JobVertex consumerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+
+		consumerJobVertex.connectNewDataSetAsInput(producerJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
+
+		final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(producerJobVertex, consumerJobVertex)
+			.setPartitionTracker(partitionTracker)
+			.build();
+
+		executionGraph.scheduleForExecution();
+
+		final ExecutionJobVertex producerExecutionJobVertex = executionGraph.getJobVertex(producerJobVertex.getID());
+
+		Execution execution = producerExecutionJobVertex
+			.getTaskVertices()[0]
+			.getCurrentExecutionAttempt();
+
+		assertFalse(releasePartitionsFuture.isDone());
+
+		execution.markFinished();
+
+		assertFalse(releasePartitionsFuture.isDone());
+
+		producerExecutionJobVertex.resetForNewExecution(1L, 1L);
+
+		final IntermediateResultPartitionID intermediateResultPartitionID = producerExecutionJobVertex
+			.getProducedDataSets()[0]
+			.getPartitions()[0]
+			.getPartitionId();
+		final ResultPartitionID resultPartitionID = execution
+			.getResultPartitionDeploymentDescriptor(intermediateResultPartitionID)
+			.get()
+			.getShuffleDescriptor()
+			.getResultPartitionID();
+
+		assertThat(releasePartitionsFuture.get(), contains(resultPartitionID));
+	}
+}