diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 699bd90cd5f1f..abb86e60fd53f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -209,6 +210,11 @@ public class NettyShuffleEnvironmentOptions {
 			.withDeprecatedKeys("taskmanager.net.request-backoff.max")
 			.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
 
+	@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
+	public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+		key("taskmanager.network.partition.force-release-on-consumption")
+			.defaultValue(true);
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 064c9bdff173f..7afe6aad0c3c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,6 +48,9 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
 	private final boolean sendScheduleOrUpdateConsumersMessage;
 
+	/** Whether the result partition is released on consumption. */
+	private final boolean releasedOnConsumption;
+
 	public ResultPartitionDeploymentDescriptor(
 			PartitionDescriptor partitionDescriptor,
 			ShuffleDescriptor shuffleDescriptor,
@@ -58,6 +61,7 @@ public ResultPartitionDeploymentDescriptor(
 		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		this.maxParallelism = maxParallelism;
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
+		this.releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -88,6 +92,10 @@ public boolean sendScheduleOrUpdateConsumersMessage() {
 		return sendScheduleOrUpdateConsumersMessage;
 	}
 
+	public boolean isReleasedOnConsumption() {
+		return releasedOnConsumption;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
index 9cf86cb6d4ac2..a1ee77489d8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
@@ -155,7 +155,8 @@ public static NettyShuffleEnvironment create(
 			ioManager,
 			networkBufferPool,
 			config.networkBuffersPerChannel(),
-			config.floatingNetworkBuffersPerGate());
+			config.floatingNetworkBuffersPerGate(),
+			config.isForcePartitionReleaseOnConsumption());
 
 		SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
 			taskExecutorLocation,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
new file mode 100644
index 0000000000000..5f2d3907a10ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * ResultPartition that releases itself once all subpartitions have been consumed.
+ */
+public class ReleaseOnConsumptionResultPartition extends ResultPartition {
+
+	/**
+	 * The total number of references to subpartitions of this result. The result partition can be
+	 * safely released, iff the reference count is zero. A reference count of -1 denotes that the
+	 * result partition has been released.
+	 */
+	private final AtomicInteger pendingReferences = new AtomicInteger();
+
+	ReleaseOnConsumptionResultPartition(
+		String owningTaskName,
+		ResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		ResultSubpartition[] subpartitions,
+		int numTargetKeyGroups,
+		ResultPartitionManager partitionManager,
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+		super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
+	}
+
+	/**
+	 * The partition can only be released after each subpartition has been consumed once per pin operation.
+	 */
+	@Override
+	void pin() {
+		while (true) {
+			int refCnt = pendingReferences.get();
+
+			if (refCnt >= 0) {
+				if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
+					break;
+				}
+			}
+			else {
+				throw new IllegalStateException("Released.");
+			}
+		}
+	}
+
+	@Override
+	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
+		int refCnt = pendingReferences.get();
+
+		checkState(refCnt != -1, "Partition released.");
+		checkState(refCnt > 0, "Partition not pinned.");
+
+		return super.createSubpartitionView(index, availabilityListener);
+	}
+
+	@Override
+	void onConsumedSubpartition(int subpartitionIndex) {
+		if (isReleased()) {
+			return;
+		}
+
+		int refCnt = pendingReferences.decrementAndGet();
+
+		if (refCnt == 0) {
+			partitionManager.onConsumedPartition(this);
+		} else if (refCnt < 0) {
+			throw new IllegalStateException("All references released.");
+		}
+
+		LOG.debug("{}: Received release notification for subpartition {}.",
+			this, subpartitionIndex);
+	}
+
+	@Override
+	public String toString() {
+		return "ReleaseOnConsumptionResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+			+ subpartitions.length + " subpartitions, "
+			+ pendingReferences + " pending references]";
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index d46727600bb94..a303421aa0b88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -38,7 +38,6 @@
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkElementIndex;
@@ -71,19 +70,19 @@
  */
 public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
 
 	private final String owningTaskName;
 
-	private final ResultPartitionID partitionId;
+	protected final ResultPartitionID partitionId;
 
 	/** Type of this partition. Defines the concrete subpartition implementation to use. */
-	private final ResultPartitionType partitionType;
+	protected final ResultPartitionType partitionType;
 
 	/** The subpartitions of this partition. At least one. */
-	private final ResultSubpartition[] subpartitions;
+	protected final ResultSubpartition[] subpartitions;
 
-	private final ResultPartitionManager partitionManager;
+	protected final ResultPartitionManager partitionManager;
 
 	public final int numTargetKeyGroups;
 
@@ -91,13 +90,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 
 	private final AtomicBoolean isReleased = new AtomicBoolean();
 
-	/**
-	 * The total number of references to subpartitions of this result. The result partition can be
-	 * safely released, iff the reference count is zero. A reference count of -1 denotes that the
-	 * result partition has been released.
-	 */
-	private final AtomicInteger pendingReferences = new AtomicInteger();
-
 	private BufferPool bufferPool;
 
 	private boolean isFinished;
@@ -281,11 +273,6 @@ public void fail(@Nullable Throwable throwable) {
 	 * Returns the requested subpartition.
 	 */
 	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		int refCnt = pendingReferences.get();
-
-		checkState(refCnt != -1, "Partition released.");
-		checkState(refCnt > 0, "Partition not pinned.");
-
 		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
 
 		ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
@@ -337,31 +324,15 @@ public boolean isReleased() {
 	@Override
 	public String toString() {
 		return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
-			+ subpartitions.length + " subpartitions, "
-			+ pendingReferences + " pending references]";
+			+ subpartitions.length + " subpartitions]";
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Pins the result partition.
-	 *
-	 * <p>The partition can only be released after each subpartition has been consumed once per pin
-	 * operation.
 	 */
 	void pin() {
-		while (true) {
-			int refCnt = pendingReferences.get();
-
-			if (refCnt >= 0) {
-				if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
-					break;
-				}
-			}
-			else {
-				throw new IllegalStateException("Released.");
-			}
-		}
 	}
 
 	/**
@@ -373,17 +344,8 @@ void onConsumedSubpartition(int subpartitionIndex) {
 			return;
 		}
 
-		int refCnt = pendingReferences.decrementAndGet();
-
-		if (refCnt == 0) {
-			partitionManager.onConsumedPartition(this);
-		}
-		else if (refCnt < 0) {
-			throw new IllegalStateException("All references released.");
-		}
-
-		LOG.debug("{}: Received release notification for subpartition {} (reference count now at: {}).",
-			this, subpartitionIndex, pendingReferences);
+		LOG.debug("{}: Received release notification for subpartition {}.",
+			this, subpartitionIndex);
 	}
 
 	public ResultSubpartition[] getAllPartitions() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 1fccf1d2ef277..67ad2c597917f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -57,18 +57,22 @@ public class ResultPartitionFactory {
 
 	private final int floatingNetworkBuffersPerGate;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	public ResultPartitionFactory(
 		@Nonnull ResultPartitionManager partitionManager,
 		@Nonnull IOManager ioManager,
 		@Nonnull BufferPoolFactory bufferPoolFactory,
 		int networkBuffersPerChannel,
-		int floatingNetworkBuffersPerGate) {
+		int floatingNetworkBuffersPerGate,
+		boolean forcePartitionReleaseOnConsumption) {
 
 		this.partitionManager = partitionManager;
 		this.ioManager = ioManager;
 		this.networkBuffersPerChannel = networkBuffersPerChannel;
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.bufferPoolFactory = bufferPoolFactory;
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 	}
 
 	public ResultPartition create(
@@ -82,6 +86,7 @@ public ResultPartition create(
 			desc.getPartitionType(),
 			desc.getNumberOfSubpartitions(),
 			desc.getMaxParallelism(),
+			desc.isReleasedOnConsumption() || forcePartitionReleaseOnConsumption,
 			createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
 	}
 
@@ -92,18 +97,28 @@ public ResultPartition create(
 		@Nonnull ResultPartitionType type,
 		int numberOfSubpartitions,
 		int maxParallelism,
+		boolean releasePartitionOnConsumption,
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
 
 		ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
 
-		ResultPartition partition = new ResultPartition(
-			taskNameWithSubtaskAndId,
-			id,
-			type,
-			subpartitions,
-			maxParallelism,
-			partitionManager,
-			bufferPoolFactory);
+		ResultPartition partition = releasePartitionOnConsumption
+			? new ReleaseOnConsumptionResultPartition(
+				taskNameWithSubtaskAndId,
+				id,
+				type,
+				subpartitions,
+				maxParallelism,
+				partitionManager,
+				bufferPoolFactory)
+			: new ResultPartition(
+				taskNameWithSubtaskAndId,
+				id,
+				type,
+				subpartitions,
+				maxParallelism,
+				partitionManager,
+				bufferPoolFactory);
 
 		createSubpartitions(partition, type, subpartitions);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index a6ff17bb25af9..b439f0803c281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -60,6 +60,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final boolean isNetworkDetailedMetrics;
 
+	private final boolean forcePartitionReleaseOnConsumption;
+
 	private final NettyConfig nettyConfig;
 
 	public NettyShuffleEnvironmentConfiguration(
@@ -71,6 +73,7 @@ public NettyShuffleEnvironmentConfiguration(
 			int floatingNetworkBuffersPerGate,
 			boolean isCreditBased,
 			boolean isNetworkDetailedMetrics,
+			boolean forcePartitionReleaseOnConsumption,
 			@Nullable NettyConfig nettyConfig) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
@@ -81,6 +84,7 @@ public NettyShuffleEnvironmentConfiguration(
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.isCreditBased = isCreditBased;
 		this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
+		this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
 		this.nettyConfig = nettyConfig;
 	}
 
@@ -122,6 +126,10 @@ public boolean isNetworkDetailedMetrics() {
 		return isNetworkDetailedMetrics;
 	}
 
+	public boolean isForcePartitionReleaseOnConsumption() {
+		return forcePartitionReleaseOnConsumption;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -158,6 +166,9 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
 
 		boolean isNetworkDetailedMetrics = configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
+		boolean forcePartitionReleaseOnConsumption = configuration.getBoolean(
+			NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -167,6 +178,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
 			extraBuffersPerGate,
 			isCreditBased,
 			isNetworkDetailedMetrics,
+			forcePartitionReleaseOnConsumption,
 			nettyConfig);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 37b4a7f7adbf6..dd8b24f329bc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -104,6 +104,24 @@ public void testSerializationWithNettyShuffleDescriptor() throws Exception {
 		assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
 	}
 
+	@Test
+	public void testReleasedOnConsumptionFlag() {
+		for (ResultPartitionType partitionType : ResultPartitionType.values()) {
+			ResultPartitionDeploymentDescriptor partitionDescriptor = new ResultPartitionDeploymentDescriptor(
+				new PartitionDescriptor(resultId, partitionId, partitionType, numberOfSubpartitions, connectionIndex),
+				ResultPartitionID::new,
+				1,
+				true
+			);
+
+			if (partitionType == ResultPartitionType.BLOCKING) {
+				assertThat(partitionDescriptor.isReleasedOnConsumption(), is(false));
+			} else {
+				assertThat(partitionDescriptor.isReleasedOnConsumption(), is(true));
+			}
+		}
+	}
+
 	private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPartitionDeploymentDescriptor(
 			ShuffleDescriptor shuffleDescriptor) throws IOException {
 		ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 9238ddad9d8c6..2269646a7c3c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -128,6 +128,7 @@ public NettyShuffleEnvironment build() {
 				floatingNetworkBuffersPerGate,
 				isCreditBased,
 				isNetworkDetailedMetrics,
+				true,
 				nettyConfig),
 			taskManagerLocation,
 			taskEventDispatcher,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 45b25c0645872..f1ba7fbe6eb25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -55,6 +55,8 @@ public class ResultPartitionBuilder {
 	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 	private Optional<FunctionWithException<BufferPoolOwner, BufferPool, IOException>> bufferPoolFactory = Optional.empty();
 
+	private boolean releasedOnConsumption;
+
 	public ResultPartitionBuilder setResultPartitionId(ResultPartitionID partitionId) {
 		this.partitionId = partitionId;
 		return this;
@@ -112,13 +114,19 @@ public ResultPartitionBuilder setBufferPoolFactory(
 		return this;
 	}
 
+	public ResultPartitionBuilder isReleasedOnConsumption(boolean releasedOnConsumption) {
+		this.releasedOnConsumption = releasedOnConsumption;
+		return this;
+	}
+
 	public ResultPartition build() {
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			partitionManager,
 			ioManager,
 			networkBufferPool,
 			networkBuffersPerChannel,
-			floatingNetworkBuffersPerGate);
+			floatingNetworkBuffersPerGate,
+			true);
 
 		FunctionWithException<BufferPoolOwner, BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() ->
 			resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType));
@@ -129,6 +137,7 @@ public ResultPartition build() {
 			partitionType,
 			numberOfSubpartitions,
 			numTargetKeyGroups,
+			releasedOnConsumption,
 			factory);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
new file mode 100644
index 0000000000000..95979702bf154
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link ResultPartitionFactory}.
+ */
+public class ResultPartitionFactoryTest extends TestLogger {
+
+	@Test
+	public void testForceConsumptionOnReleaseEnabled() {
+		testForceConsumptionOnRelease(true);
+	}
+
+	@Test
+	public void testForceConsumptionOnReleaseDisabled() {
+		testForceConsumptionOnRelease(false);
+	}
+
+	private static void testForceConsumptionOnRelease(boolean forceConsumptionOnRelease) {
+		ResultPartitionFactory factory = new ResultPartitionFactory(
+			new ResultPartitionManager(),
+			new NoOpIOManager(),
+			new NetworkBufferPool(1, 64, 1),
+			1,
+			1,
+			forceConsumptionOnRelease
+		);
+
+		final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(
+			new PartitionDescriptor(
+				new IntermediateDataSetID(),
+				new IntermediateResultPartitionID(),
+				ResultPartitionType.BLOCKING,
+				1,
+				0),
+			ResultPartitionID::new,
+			1,
+			true
+		);
+
+		final ResultPartition test = factory.create("test", new ExecutionAttemptID(), descriptor);
+
+		if (forceConsumptionOnRelease) {
+			assertThat(test, instanceOf(ReleaseOnConsumptionResultPartition.class));
+		} else {
+			assertThat(test, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 75459880a60a8..63503abf9e04d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -37,10 +37,13 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -100,6 +103,32 @@ public void testAddOnFinishedBlockingPartition() throws Exception {
 		testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
 	}
 
+	@Test
+	public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
+		ResultPartitionManager manager = new ResultPartitionManager();
+
+		final ResultPartition partition = new ResultPartitionBuilder()
+			.isReleasedOnConsumption(false)
+			.setResultPartitionManager(manager)
+			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.build();
+
+		manager.registerResultPartition(partition);
+		partition.finish();
+
+		assertThat(manager.getUnreleasedPartitions(), contains(partition.getPartitionId()));
+
+		// a blocking partition that is not released on consumption should be consumable multiple times
+		for (int x = 0; x < 2; x++) {
+			ResultSubpartitionView subpartitionView1 = partition.createSubpartitionView(0, () -> {});
+			subpartitionView1.notifySubpartitionConsumed();
+
+			// partition should not be released on consumption
+			assertThat(manager.getUnreleasedPartitions(), contains(partition.getPartitionId()));
+			assertFalse(partition.isReleased());
+		}
+	}
+
 	/**
 	 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
 	 *