diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 63f312580117d..e413619bbd511 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; @@ -688,7 +689,7 @@ else if (current == RUNNING || current == DEPLOYING) { else if (current == FINISHED || current == FAILED) { // nothing to do any more. finished failed before it could be cancelled. // in any case, the task is removed from the TaskManager already - sendFailIntermediateResultPartitionsRpcCall(); + sendReleaseIntermediateResultPartitionsRpcCall(); return; } @@ -721,7 +722,7 @@ public CompletableFuture suspend() { break; case FINISHED: case FAILED: - sendFailIntermediateResultPartitionsRpcCall(); + sendReleaseIntermediateResultPartitionsRpcCall(); break; case CANCELED: break; @@ -1202,14 +1203,23 @@ private void sendCancelRpcCall(int numberRetries) { } } - private void sendFailIntermediateResultPartitionsRpcCall() { + private void sendReleaseIntermediateResultPartitionsRpcCall() { + LOG.info("Discarding the results produced by task execution {}.", attemptId); final LogicalSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); - // TODO For some tests this could be a problem when querying too early if all resources were released - taskManagerGateway.failPartition(attemptId); + Collection partitions = vertex.getProducedPartitions().values(); + Collection partitionIds = new ArrayList<>(partitions.size()); + for (IntermediateResultPartition partition : partitions) { + partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId)); + } + + if (!partitionIds.isEmpty()) { + // TODO For some tests this could be a problem when querying too early if all resources were released + taskManagerGateway.releasePartitions(partitionIds); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 98c61a4d91e17..0ee85954723cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.taskexecutor.TaskExecutor; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -207,6 +209,17 @@ public void setupInputGate(SingleInputGate gate) throws IOException { } } + /** + * Batch release intermediate result partitions. + * + * @param partitionIds partition ids to release + */ + public void releasePartitions(Collection partitionIds) { + for (ResultPartitionID partitionId : partitionIds) { + resultPartitionManager.releasePartition(partitionId, null); + } + } + public void start() throws IOException { synchronized (lock) { Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index e0e78298f0936..fb73a70fccdce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -334,7 +334,7 @@ public void close() { } public void fail(@Nullable Throwable throwable) { - partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), throwable); + partitionManager.releasePartition(partitionId, throwable); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java index 09a62edb5963e..8d96e2bd1986f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java @@ -18,17 +18,11 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; - -import org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable; -import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; -import org.apache.flink.shaded.guava18.com.google.common.collect.Table; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import static org.apache.flink.util.Preconditions.checkState; @@ -41,19 +35,15 @@ public class ResultPartitionManager implements ResultPartitionProvider { private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class); - public final Table - registeredPartitions = HashBasedTable.create(); + private final Map registeredPartitions = new HashMap<>(16); private boolean isShutdown; - public void registerResultPartition(ResultPartition partition) throws IOException { + public void registerResultPartition(ResultPartition partition) { synchronized (registeredPartitions) { checkState(!isShutdown, "Result partition manager already shut down."); - ResultPartitionID partitionId = partition.getPartitionId(); - - ResultPartition previous = registeredPartitions.put( - partitionId.getProducerId(), partitionId.getPartitionId(), partition); + ResultPartition previous = registeredPartitions.put(partition.getPartitionId(), partition); if (previous != null) { throw new IllegalStateException("Result partition already registered."); @@ -70,8 +60,7 @@ public ResultSubpartitionView createSubpartitionView( BufferAvailabilityListener availabilityListener) throws IOException { synchronized (registeredPartitions) { - final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(), - partitionId.getPartitionId()); + final ResultPartition partition = registeredPartitions.get(partitionId); if (partition == null) { throw new PartitionNotFoundException(partitionId); @@ -83,26 +72,14 @@ public ResultSubpartitionView createSubpartitionView( } } - public void releasePartitionsProducedBy(ExecutionAttemptID executionId) { - releasePartitionsProducedBy(executionId, null); - } - - public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) { + public void releasePartition(ResultPartitionID partitionId, Throwable cause) { synchronized (registeredPartitions) { - final Map partitions = - registeredPartitions.row(executionId); - - for (ResultPartition partition : partitions.values()) { - partition.release(cause); + ResultPartition resultPartition = registeredPartitions.remove(partitionId); + if (resultPartition != null) { + resultPartition.release(cause); + LOG.debug("Released partition {} produced by {}.", + partitionId.getPartitionId(), partitionId.getProducerId()); } - - for (IntermediateResultPartitionID partitionId : ImmutableList - .copyOf(partitions.keySet())) { - - registeredPartitions.remove(executionId, partitionId); - } - - LOG.debug("Released all partitions produced by {}.", executionId); } } @@ -134,10 +111,7 @@ void onConsumedPartition(ResultPartition partition) { LOG.debug("Received consume notification from {}.", partition); synchronized (registeredPartitions) { - ResultPartitionID partitionId = partition.getPartitionId(); - - previous = registeredPartitions.remove(partitionId.getProducerId(), - partitionId.getPartitionId()); + previous = registeredPartitions.remove(partition.getPartitionId()); } // Release the partition if it was successfully removed diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index 6922b05d684b9..593a853c757a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -25,10 +25,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.rpc.RpcTimeout; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -98,11 +100,11 @@ CompletableFuture updatePartitions( Time timeout); /** - * Fail all intermediate result partitions of the given task. + * Batch release intermediate result partitions. * - * @param executionAttemptID identifying the task + * @param partitionIds partition ids to release */ - void failPartition(ExecutionAttemptID executionAttemptID); + void releasePartitions(Collection partitionIds); /** * Notify the given task about a completed checkpoint. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 064eef56143de..1fb2d4943d8be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -25,12 +25,14 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -86,8 +88,8 @@ public CompletableFuture updatePartitions(ExecutionAttemptID execut } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { - taskExecutorGateway.failPartition(executionAttemptID); + public void releasePartitions(Collection partitionIds) { + taskExecutorGateway.releasePartitions(partitionIds); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 9b9ad5b5be5a4..b35d65e926c76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -649,11 +650,9 @@ public CompletableFuture updatePartitions( } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { - log.info("Discarding the results produced by task execution {}.", executionAttemptID); - + public void releasePartitions(Collection partitionIds) { try { - networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID); + networkEnvironment.releasePartitions(partitionIds); } catch (Throwable t) { // TODO: Do we still need this catch branch? onFatalError(t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 728087af63f98..8a653df828787 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; @@ -38,6 +39,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.types.SerializableOptional; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -99,11 +101,11 @@ CompletableFuture updatePartitions( @RpcTimeout Time timeout); /** - * Fail all intermediate result partitions of the given task. + * Batch release intermediate result partitions. * - * @param executionAttemptID identifying the task + * @param partitionIds partition ids to release */ - void failPartition(ExecutionAttemptID executionAttemptID); + void releasePartitions(Collection partitionIds); /** * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 22d5df0610d06..dba0e7d631cd4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -26,10 +26,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; @@ -95,7 +97,8 @@ public CompletableFuture updatePartitions(ExecutionAttemptID execut } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) {} + public void releasePartitions(Collection partitionIds) { + } @Override public void notifyCheckpointComplete( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 789956f2bb6fb..aca0bb2925fae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.StackTraceSampleResponse; @@ -37,6 +38,7 @@ import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.Preconditions; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -123,7 +125,7 @@ public CompletableFuture updatePartitions(ExecutionAttemptID execut } @Override - public void failPartition(ExecutionAttemptID executionAttemptID) { + public void releasePartitions(Collection partitionIds) { // noop }