Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;
import org.apache.flink.annotation.docs.Documentation;

import static org.apache.flink.configuration.ConfigOptions.key;

Expand Down Expand Up @@ -209,6 +210,11 @@ public class NettyShuffleEnvironmentOptions {
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
key("taskmanager.network.partition.force-release-on-consumption")
.defaultValue(true);

// ------------------------------------------------------------------------

/** Not intended to be instantiated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;

/** Whether the result partition is released on consumption. */
private final boolean releasedOnConsumption;

public ResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
Expand All @@ -58,6 +61,7 @@ public ResultPartitionDeploymentDescriptor(
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
this.maxParallelism = maxParallelism;
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
this.releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING;
}

public IntermediateDataSetID getResultId() {
Expand Down Expand Up @@ -88,6 +92,10 @@ public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}

public boolean isReleasedOnConsumption() {
return releasedOnConsumption;
}

@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [PartitionDescriptor: %s, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public static NettyShuffleEnvironment create(
ioManager,
networkBufferPool,
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate());
config.floatingNetworkBuffersPerGate(),
config.isForcePartitionReleaseOnConsumption());

SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
taskExecutorLocation,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.util.function.FunctionWithException;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkState;

/**
* ResultPartition that releases itself once all subpartitions have been consumed.
*/
public class ReleaseOnConsumptionResultPartition extends ResultPartition {

/**
* The total number of references to subpartitions of this result. The result partition can be
* safely released, iff the reference count is zero. A reference count of -1 denotes that the
* result partition has been released.
*/
private final AtomicInteger pendingReferences = new AtomicInteger();

ReleaseOnConsumptionResultPartition(
String owningTaskName,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
ResultSubpartition[] subpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
}

/**
* The partition can only be released after each subpartition has been consumed once per pin operation.
*/
@Override
void pin() {
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am not sure why it needs while loop here before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow Zhijiang's thought, I think we can now remove the pin method from the ResultPartition, since it is only the implementation detail of the ReleaseOnConsumptionResultPartiton. Then we can move the initialization of the pending reference to the Constructor to ensure no concurrent access to the variable, then we can use pendingReferences.set(subpartitions.length) to initialize the variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created FLINK-12842 and FLINK-12843 for the ref-counter issues which could be solved separately after this PR merged, because these issues already exist before and are not in the scope of this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we can resolve these in follow-ups.

int refCnt = pendingReferences.get();

if (refCnt >= 0) {
if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
break;
}
}
else {
throw new IllegalStateException("Released.");
}
}
}

@Override
public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
int refCnt = pendingReferences.get();

checkState(refCnt != -1, "Partition released.");
checkState(refCnt > 0, "Partition not pinned.");

return super.createSubpartitionView(index, availabilityListener);
}

@Override
void onConsumedSubpartition(int subpartitionIndex) {
if (isReleased()) {
return;
}

int refCnt = pendingReferences.decrementAndGet();

if (refCnt == 0) {
partitionManager.onConsumedPartition(this);
} else if (refCnt < 0) {
throw new IllegalStateException("All references released.");
}

LOG.debug("{}: Received release notification for subpartition {}.",
this, subpartitionIndex);
}

@Override
public String toString() {
return "ReleaseOnConsumptionResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+ subpartitions.length + " subpartitions, "
+ pendingReferences + " pending references]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkElementIndex;
Expand Down Expand Up @@ -71,33 +70,26 @@
*/
public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
protected static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);

private final String owningTaskName;

private final ResultPartitionID partitionId;
protected final ResultPartitionID partitionId;

/** Type of this partition. Defines the concrete subpartition implementation to use. */
private final ResultPartitionType partitionType;
protected final ResultPartitionType partitionType;

/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
protected final ResultSubpartition[] subpartitions;

private final ResultPartitionManager partitionManager;
protected final ResultPartitionManager partitionManager;

public final int numTargetKeyGroups;

// - Runtime state --------------------------------------------------------

private final AtomicBoolean isReleased = new AtomicBoolean();

/**
* The total number of references to subpartitions of this result. The result partition can be
* safely released, iff the reference count is zero. A reference count of -1 denotes that the
* result partition has been released.
*/
private final AtomicInteger pendingReferences = new AtomicInteger();

private BufferPool bufferPool;

private boolean isFinished;
Expand Down Expand Up @@ -281,11 +273,6 @@ public void fail(@Nullable Throwable throwable) {
* Returns the requested subpartition.
*/
public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
int refCnt = pendingReferences.get();

checkState(refCnt != -1, "Partition released.");
checkState(refCnt > 0, "Partition not pinned.");

checkElementIndex(index, subpartitions.length, "Subpartition not found.");

ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
Expand Down Expand Up @@ -337,31 +324,15 @@ public boolean isReleased() {
@Override
public String toString() {
return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+ subpartitions.length + " subpartitions, "
+ pendingReferences + " pending references]";
+ subpartitions.length + " subpartitions]";
}

// ------------------------------------------------------------------------

/**
* Pins the result partition.
*
* <p>The partition can only be released after each subpartition has been consumed once per pin
* operation.
*/
void pin() {
while (true) {
int refCnt = pendingReferences.get();

if (refCnt >= 0) {
if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
break;
}
}
else {
throw new IllegalStateException("Released.");
}
}
}

/**
Expand All @@ -373,17 +344,8 @@ void onConsumedSubpartition(int subpartitionIndex) {
return;
}

int refCnt = pendingReferences.decrementAndGet();

if (refCnt == 0) {
partitionManager.onConsumedPartition(this);
}
else if (refCnt < 0) {
throw new IllegalStateException("All references released.");
}

LOG.debug("{}: Received release notification for subpartition {} (reference count now at: {}).",
this, subpartitionIndex, pendingReferences);
LOG.debug("{}: Received release notification for subpartition {}.",
this, subpartitionIndex);
}

public ResultSubpartition[] getAllPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,22 @@ public class ResultPartitionFactory {

private final int floatingNetworkBuffersPerGate;

private final boolean forcePartitionReleaseOnConsumption;

public ResultPartitionFactory(
@Nonnull ResultPartitionManager partitionManager,
@Nonnull IOManager ioManager,
@Nonnull BufferPoolFactory bufferPoolFactory,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate) {
int floatingNetworkBuffersPerGate,
boolean forcePartitionReleaseOnConsumption) {

this.partitionManager = partitionManager;
this.ioManager = ioManager;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.bufferPoolFactory = bufferPoolFactory;
this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
}

public ResultPartition create(
Expand All @@ -82,6 +86,7 @@ public ResultPartition create(
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
desc.isReleasedOnConsumption() || forcePartitionReleaseOnConsumption,
createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
}

Expand All @@ -92,18 +97,28 @@ public ResultPartition create(
@Nonnull ResultPartitionType type,
int numberOfSubpartitions,
int maxParallelism,
boolean releasePartitionOnConsumption,
FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {

ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];

ResultPartition partition = new ResultPartition(
taskNameWithSubtaskAndId,
id,
type,
subpartitions,
maxParallelism,
partitionManager,
bufferPoolFactory);
ResultPartition partition = releasePartitionOnConsumption
? new ReleaseOnConsumptionResultPartition(
taskNameWithSubtaskAndId,
id,
type,
subpartitions,
maxParallelism,
partitionManager,
bufferPoolFactory)
: new ResultPartition(
taskNameWithSubtaskAndId,
id,
type,
subpartitions,
maxParallelism,
partitionManager,
bufferPoolFactory);

createSubpartitions(partition, type, subpartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class NettyShuffleEnvironmentConfiguration {

private final boolean isNetworkDetailedMetrics;

private final boolean forcePartitionReleaseOnConsumption;

private final NettyConfig nettyConfig;

public NettyShuffleEnvironmentConfiguration(
Expand All @@ -71,6 +73,7 @@ public NettyShuffleEnvironmentConfiguration(
int floatingNetworkBuffersPerGate,
boolean isCreditBased,
boolean isNetworkDetailedMetrics,
boolean forcePartitionReleaseOnConsumption,
@Nullable NettyConfig nettyConfig) {

this.numNetworkBuffers = numNetworkBuffers;
Expand All @@ -81,6 +84,7 @@ public NettyShuffleEnvironmentConfiguration(
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.isCreditBased = isCreditBased;
this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption;
this.nettyConfig = nettyConfig;
}

Expand Down Expand Up @@ -122,6 +126,10 @@ public boolean isNetworkDetailedMetrics() {
return isNetworkDetailedMetrics;
}

public boolean isForcePartitionReleaseOnConsumption() {
return forcePartitionReleaseOnConsumption;
}

// ------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -158,6 +166,9 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(

boolean isNetworkDetailedMetrics = configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);

boolean forcePartitionReleaseOnConsumption = configuration.getBoolean(
NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);

return new NettyShuffleEnvironmentConfiguration(
numberOfNetworkBuffers,
pageSize,
Expand All @@ -167,6 +178,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
extraBuffersPerGate,
isCreditBased,
isNetworkDetailedMetrics,
forcePartitionReleaseOnConsumption,
nettyConfig);
}

Expand Down
Loading