@@ -24,13 +24,15 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.io.Serializable;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
@@ -51,20 +53,49 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;

/** Whether the result partition is released on consumption. */
private final boolean releasedOnConsumption;
private final ReleaseType releaseType;

public ResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
int maxParallelism,
boolean sendScheduleOrUpdateConsumersMessage) {
this(
checkNotNull(partitionDescriptor),
shuffleDescriptor,
maxParallelism,
sendScheduleOrUpdateConsumersMessage,
// Later we might have to make the scheduling adjust automatically
// if certain release type is not supported by shuffle service implementation at hand
partitionDescriptor.getPartitionType() == ResultPartitionType.BLOCKING ? ReleaseType.MANUAL : ReleaseType.AUTO);
}

public ResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
int maxParallelism,
boolean sendScheduleOrUpdateConsumersMessage,
ReleaseType releaseType) {
checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, releaseType);
this.partitionDescriptor = checkNotNull(partitionDescriptor);
this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
this.maxParallelism = maxParallelism;
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
this.releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING;
this.releaseType = releaseType;
}

private static void checkReleaseOnConsumptionIsSupportedForPartition(
ShuffleDescriptor shuffleDescriptor,
ReleaseType releaseType) {
checkNotNull(shuffleDescriptor);
checkArgument(
shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
"Release type %s is not supported by the shuffle service for this partition" +
"(id: %s), supported release types: %s",
releaseType,
shuffleDescriptor.getResultPartitionID(),
shuffleDescriptor.getSupportedReleaseTypes());
}

public IntermediateDataSetID getResultId() {
@@ -103,10 +134,15 @@ public boolean sendScheduleOrUpdateConsumersMessage() {
* by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
* and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
*
* <p>The partition has to support the corresponding {@link ReleaseType} in

Contributor: I think it might be better to indicate which value (true/false) corresponds to which release type; otherwise the mapping between this value and the release type is not clear.

* {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
* {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to return {@code true} and
* {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to return {@code false}.
*
* @return whether to release the partition after having been fully consumed once.
*/
public boolean isReleasedOnConsumption() {
return releasedOnConsumption;
return releaseType == ReleaseType.AUTO;
}

@Override
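To make the constructor defaults concrete, here is a minimal sketch based on the test utilities in this PR (NettyShuffleDescriptorBuilder is a test helper; the Flink runtime and test classes are assumed to be on the classpath). The four-argument constructor picks ReleaseType.MANUAL for BLOCKING partitions, so isReleasedOnConsumption() returns false:

```java
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;

public class ReleaseTypeDefaultExample {

	public static void main(String[] args) {
		ResultPartitionType partitionType = ResultPartitionType.BLOCKING;

		// Shuffle descriptor advertising release types that match the partition type.
		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder()
			.setBlocking(partitionType.isBlocking())
			.buildLocal();

		// The four-argument constructor derives the release type from the partition type:
		// BLOCKING -> ReleaseType.MANUAL, everything else -> ReleaseType.AUTO.
		ResultPartitionDeploymentDescriptor deploymentDescriptor =
			new ResultPartitionDeploymentDescriptor(
				new PartitionDescriptor(
					new IntermediateDataSetID(),
					new IntermediateResultPartitionID(),
					partitionType,
					1,  // number of subpartitions
					0), // connection index
				shuffleDescriptor,
				1,    // max parallelism
				true);

		// MANUAL release means the partition is not auto-released on consumption.
		System.out.println(deploymentDescriptor.isReleasedOnConsumption()); // prints: false
	}
}
```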
@@ -25,6 +25,7 @@

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.Optional;

/**
@@ -34,19 +35,29 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {

private static final long serialVersionUID = 852181945034989215L;

private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);

private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
EnumSet.of(ReleaseType.AUTO);

private final ResourceID producerLocation;

private final PartitionConnectionInfo partitionConnectionInfo;

private final ResultPartitionID resultPartitionID;

private final boolean isBlocking;

public NettyShuffleDescriptor(
ResourceID producerLocation,
PartitionConnectionInfo partitionConnectionInfo,
ResultPartitionID resultPartitionID) {
ResultPartitionID resultPartitionID,
boolean isBlocking) {
this.producerLocation = producerLocation;
this.partitionConnectionInfo = partitionConnectionInfo;
this.resultPartitionID = resultPartitionID;
this.isBlocking = isBlocking;
}

public ConnectionID getConnectionId() {
@@ -63,6 +74,12 @@ public Optional<ResourceID> storesLocalResourcesOn() {
return Optional.of(producerLocation);
}

@Override
public EnumSet<ReleaseType> getSupportedReleaseTypes() {
return isBlocking ?
SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
}

public boolean isLocalTo(ResourceID consumerLocation) {
return producerLocation.equals(consumerLocation);
}
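For illustration, a small sketch of what the new getSupportedReleaseTypes() returns for blocking vs. non-blocking NettyShuffleDescriptor instances, built with the test builder from this PR (default producer location and partition id are assumed):

```java
import java.util.EnumSet;

import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;

public class NettySupportedReleaseTypesExample {

	public static void main(String[] args) {
		NettyShuffleDescriptor blocking = NettyShuffleDescriptorBuilder.newBuilder()
			.setBlocking(true)
			.buildLocal();
		NettyShuffleDescriptor pipelined = NettyShuffleDescriptorBuilder.newBuilder()
			.setBlocking(false)
			.buildLocal();

		// Blocking partitions can be released automatically or manually ...
		EnumSet<ReleaseType> blockingTypes = blocking.getSupportedReleaseTypes();
		System.out.println(blockingTypes);  // prints: [AUTO, MANUAL]

		// ... while non-blocking (pipelined) partitions only support auto-release.
		EnumSet<ReleaseType> pipelinedTypes = pipelined.getSupportedReleaseTypes();
		System.out.println(pipelinedTypes); // prints: [AUTO]
	}
}
```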
@@ -42,7 +42,8 @@ public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor(
producerDescriptor.getProducerLocation(),
createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()),
resultPartitionID);
resultPartitionID,
partitionDescriptor.getPartitionType().isBlocking());

return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
}
@@ -24,6 +24,7 @@

import java.io.Serializable;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Optional;

/**
@@ -67,4 +68,32 @@ default boolean isUnknown() {
* @return the resource id of the producing task executor if the partition occupies local resources there
*/
Optional<ResourceID> storesLocalResourcesOn();

/**
* Return release types supported by Shuffle Service for this partition.

Contributor: Remove "for this partition"; say "by this shuffle service" instead.

Contributor Author: Why? If we leave "by this shuffle service", it might sound like it applies to all partitions, but this method returns the supported release types for a particular partition.

Contributor: There might be two options:

  • One ShuffleService implementation might provide different ShuffleDescriptor implementations for different partitions, with different release types. Then the previous comment makes sense.

  • One ShuffleService implementation might have only one kind of ShuffleDescriptor and provide one set of release types for all partitions. At the moment we only have the NettyShuffleDescriptor implementation, which provides the same release types for all partitions, so my suggestion above was based on that: the release types being shuffle-service global and suitable for all partitions.

zentol (Contributor, Jun 25, 2019): The NettyShuffleDescriptor does not have the same release types for all partitions. For non-blocking partitions we do not support ReleaseType.MANUAL.

Contributor: Yes, I have no other concerns here now.

*/
EnumSet<ReleaseType> getSupportedReleaseTypes();

/**
* Partition release type.
*/
enum ReleaseType {
/**
* Auto-release the partition after having been fully consumed once.
*
* <p>No additional actions required, like {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
* or {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}
*/
AUTO,

/**
* Manually release the partition, the partition has to support consumption multiple times.
*
* <p>The partition requires manual release once all consumption is done:
* {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
* if the partition occupies producer local resources ({@link #storesLocalResourcesOn()}) then also
* {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
*/
MANUAL
}
}
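As a hedged illustration of the point debated above — a shuffle service is free to advertise different release types per partition — here is a hypothetical descriptor (the class name and scenario are invented, not part of this PR) that only supports manual release because its data lives in an external system:

```java
import java.util.EnumSet;
import java.util.Optional;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

/**
 * Hypothetical descriptor for a shuffle service that stores partition data in an
 * external system, so the data can outlive the producer and only manual release applies.
 */
public class ExternalOnlyShuffleDescriptor implements ShuffleDescriptor {

	private static final long serialVersionUID = 1L;

	private final ResultPartitionID resultPartitionID;

	public ExternalOnlyShuffleDescriptor(ResultPartitionID resultPartitionID) {
		this.resultPartitionID = resultPartitionID;
	}

	@Override
	public ResultPartitionID getResultPartitionID() {
		return resultPartitionID;
	}

	@Override
	public Optional<ResourceID> storesLocalResourcesOn() {
		// Data lives outside the producer, so no producer-local resources are occupied.
		return Optional.empty();
	}

	@Override
	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
		// Only explicit release via ShuffleMaster#releasePartitionExternally is supported.
		return EnumSet.of(ReleaseType.MANUAL);
	}
}
```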
@@ -28,6 +28,7 @@
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;

import java.io.IOException;
import java.util.Collection;
@@ -63,12 +64,14 @@
* <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} and
* {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called when the production is done.
* The actual release can take some time depending on implementation details,
* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.</li>
* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.
* The partition has to support the {@link ReleaseType#AUTO} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
* <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} and
* {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
* if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()}),
* are called outside of the producer thread, e.g. to manage the lifecycle of BLOCKING result partitions
* which can outlive their producers.</li>
* which can outlive their producers. The partition has to support the {@link ReleaseType#MANUAL} in
* {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
* </ol>
* The partitions, which currently still occupy local resources, can be queried with
* {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -130,6 +133,7 @@ Collection<P> createResultPartitionWriters(
* <p>This is called for partitions which occupy resources locally
* (can be checked by {@link ShuffleDescriptor#storesLocalResourcesOn()}).
* This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
* The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
*
* @param partitionIds identifying the partitions to be released
*/
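A rough sketch of the manual-release path described in the javadoc above. The helper below is illustrative only; the generic signatures of ShuffleMaster/ShuffleEnvironment and the element type accepted by releasePartitionsLocally are assumed from context:

```java
import java.util.Collections;

import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleMaster;

public class ManualReleaseSketch {

	/**
	 * Releases a partition that was deployed with ReleaseType.MANUAL
	 * (isReleasedOnConsumption() == false) once all its consumers are done.
	 */
	static void releaseManually(
			ResultPartitionDeploymentDescriptor deploymentDescriptor,
			ShuffleMaster<?> shuffleMaster,
			ShuffleEnvironment<?, ?> shuffleEnvironment) {

		ShuffleDescriptor shuffleDescriptor = deploymentDescriptor.getShuffleDescriptor();

		// Release producer-local resources only if the partition occupies any.
		if (shuffleDescriptor.storesLocalResourcesOn().isPresent()) {
			shuffleEnvironment.releasePartitionsLocally(
				Collections.singleton(shuffleDescriptor.getResultPartitionID()));
		}

		// Release any resources held outside of the producer executor.
		shuffleMaster.releasePartitionExternally(shuffleDescriptor);
	}
}
```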
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.shuffle;

import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -51,6 +52,7 @@ CompletableFuture<T> registerPartitionWithProducer(
* <p>This call triggers release of any resources which are occupied by the given partition in the external systems
* outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions.
* This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
* The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.

Contributor: In theory, different ShuffleManager implementations could choose which release strategy is supported; we should not require all implementations to support one type. Otherwise we should make this limitation explicit in the interface methods.

Contributor Author: The goal of this PR is to let shuffle services decide what to support via ShuffleDescriptor#getSupportedReleaseTypes. This doc comment line is about when manual release is possible (releasePartitionsLocally/releasePartitionExternally).

Or do you mean that we currently assume, implicitly, that ReleaseType#MANUAL always has to be supported for blocking partitions? For that we have the assertion ResultPartitionDeploymentDescriptor#checkReleaseOnConsumptionIsSupportedForPartition, which fails the job fast. It is true that we will have to make the scheduling/release strategies adjust automatically later if something is not supported, as written in the comment next to the hardcoded releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING.

Contributor: Sorry, I had not noticed that this javadoc change was for the releasePartitionExternally method; in that case I agree it makes sense that this method is for MANUAL release. Before, I thought the new info was added to the class javadoc, which would mean every ShuffleMaster implementation must support the MANUAL release type, so I raised that concern.

* The producer local resources are managed by {@link ShuffleDescriptor#storesLocalResourcesOn()} and
* {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
*
@@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

import java.util.EnumSet;
import java.util.Optional;

/**
@@ -56,4 +57,9 @@ public boolean isUnknown() {
public Optional<ResourceID> storesLocalResourcesOn() {
return Optional.empty();
}

@Override
public EnumSet<ReleaseType> getSupportedReleaseTypes() {
return EnumSet.noneOf(ReleaseType.class);
}
}
@@ -27,8 +27,10 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;
@@ -72,13 +74,9 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
* Tests simple de/serialization with {@link UnknownShuffleDescriptor}.
*/
@Test
public void testSerializationWithUnknownShuffleDescriptor() throws Exception {
public void testSerializationOfUnknownShuffleDescriptor() throws IOException {
ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID);

ResultPartitionDeploymentDescriptor copy =
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);

ShuffleDescriptor shuffleDescriptorCopy = copy.getShuffleDescriptor();
ShuffleDescriptor shuffleDescriptorCopy = CommonTestUtils.createCopySerializable(shuffleDescriptor);

Contributor: This change should be a separate hotfix.

assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class));
assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
assertThat(shuffleDescriptorCopy.isUnknown(), is(true));
@@ -88,11 +86,12 @@ public void testSerializationWithUnknownShuffleDescriptor() throws Exception {
* Tests simple de/serialization with {@link NettyShuffleDescriptor}.
*/
@Test
public void testSerializationWithNettyShuffleDescriptor() throws Exception {
public void testSerializationWithNettyShuffleDescriptor() throws IOException {
ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor(
producerLocation,
new NettyShuffleDescriptor.NetworkPartitionConnectionInfo(connectionID),
resultPartitionID);
new NetworkPartitionConnectionInfo(connectionID),
resultPartitionID,
false);

ResultPartitionDeploymentDescriptor copy =
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -110,7 +109,7 @@ public void testReleasedOnConsumptionFlag() {
for (ResultPartitionType partitionType : ResultPartitionType.values()) {
ResultPartitionDeploymentDescriptor partitionDescriptor = new ResultPartitionDeploymentDescriptor(
new PartitionDescriptor(resultId, partitionId, partitionType, numberOfSubpartitions, connectionIndex),
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
1,
true
);
@@ -123,6 +122,16 @@
}
}

@Test(expected = IllegalArgumentException.class)
public void testIncompatibleReleaseTypeManual() {
new ResultPartitionDeploymentDescriptor(
partitionDescriptor,
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
1,
true,
ReleaseType.MANUAL);
}

private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPartitionDeploymentDescriptor(
ShuffleDescriptor shuffleDescriptor) throws IOException {
ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor(
@@ -71,7 +71,7 @@ static void verifyCreateSubpartitionViewThrowsException(
}

public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
new IntermediateDataSetID(),
shuffleDescriptor.getResultPartitionID().getPartitionId(),
@@ -58,14 +58,15 @@ private static void testForceConsumptionOnRelease(boolean forceConsumptionOnRelease
forceConsumptionOnRelease
);

ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(
new PartitionDescriptor(
new IntermediateDataSetID(),
new IntermediateResultPartitionID(),
ResultPartitionType.BLOCKING,
partitionType,
1,
0),
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
1,
true
);
@@ -40,6 +40,7 @@ public class NettyShuffleDescriptorBuilder {
private InetAddress address = InetAddress.getLoopbackAddress();
private int dataPort = 0;
private int connectionIndex = 0;
private boolean isBlocking;

public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
this.producerLocation = producerLocation;
@@ -73,19 +74,26 @@ public NettyShuffleDescriptorBuilder setConnectionIndex(int connectionIndex) {
return this;
}

public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
this.isBlocking = isBlocking;
return this;
}

public NettyShuffleDescriptor buildRemote() {
ConnectionID connectionID = new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
return new NettyShuffleDescriptor(
producerLocation,
new NetworkPartitionConnectionInfo(connectionID),
id);
id,
isBlocking);
}

public NettyShuffleDescriptor buildLocal() {
return new NettyShuffleDescriptor(
producerLocation,
LocalExecutionPartitionConnectionInfo.INSTANCE,
id);
id,
isBlocking);
}

public static NettyShuffleDescriptorBuilder newBuilder() {