-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-12960] Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes #8857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
|
@flinkbot attention @zentol @zhijiangW |
zentol
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments.
| boolean isReleasedOnConsumption, | ||
| boolean isBlockingPartition) { | ||
| Preconditions.checkArgument( | ||
| !isReleasedOnConsumption || isBlockingPartition, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long-term we probably want to change this a bit; this kind of setup forces users of the shuffle service (i.e. the ones creating a PartitionDescriptor) to be fully aware of it's capabilities.
Basically, both the partition and shuffle descriptor describe an is-state; this partition must be released on consumption, and we expect the shuffle service to fail if it can't fulfill that.
I'd like to see this changed at some point so that the partition descriptor (or some other form of input) describes a wish instead, to facilitate something like:
Scheduler: "hey shuffle service, if you could persist this partition, that would be great."
ShuffleService: "Sorry kiddo, this one must be released on consumption."
Scheduler: "oh alright then, I'll just adjust my scheduling."
The usefulness of this can be easily shown with a simple though experiment: assume there exists a shuffle service implementation that supports pipelined partitions that are consumable multiple times.
Currently, it would be impossible for the runtime to ever make use of this flag, so long as we don't resort to type checks.
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleUtils.java
Outdated
Show resolved
Hide resolved
3281582 to
99fe0ab
Compare
| releasedOnConsumption ? | ||
| shuffleDescriptor.getSupportedReleaseTypes().contains(ReleaseType.AUTO) : | ||
| shuffleDescriptor.getSupportedReleaseTypes().contains(ReleaseType.MANUAL), | ||
| "Release on consumption <%s> is not supported by the shuffle service for this partition, " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's include the result partition ID as well
| * Manually release the partition, the partition has to support consumption multiple times. | ||
| * | ||
| * <p>The partition requires manual actions to release it once all consumption is done: | ||
| * {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is slight ambiguous as it isn't whether the local resource condition applies to just the ShuffleEnvironment call or both of them.
I'd suggest something along the lines of ShuffleMaster[...] and, if [...], ShuffleEnvironment [...]
| sendScheduleOrUpdateConsumersMessage, | ||
| // Later we might have to make the scheduling adjust automatically | ||
| // if certain release type is not supported by shuffle service implementation at hand | ||
| partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in future the releasedOnConsumption could be refactored into one ReleaseStrategy as proposed in #8804 , then it could be configured which strategy is used in practice (not only limited to ResultPartitionType) and check whether this config is consistent with shuffle implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, the PR just leaves it as it was before at the moment
| int maxParallelism, | ||
| boolean sendScheduleOrUpdateConsumersMessage, | ||
| boolean releasedOnConsumption) { | ||
| checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, releasedOnConsumption); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkNotNull(partitionDescriptor) should be put here.
| Optional<ResourceID> storesLocalResourcesOn(); | ||
|
|
||
| /** | ||
| * Return release types supported by Shuffle Service for this partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove for this partition.
by this shuffle service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? if we leave by this shuffle service then it might sound like for all partitions but this method returns supported release types for certain partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There might exist two options:
-
One
ShuffleServiceimplementation might provide differentShuffleDescriptorimplementations for different partitions with different release types. Then the previous comment makes sense. -
One
ShuffleServiceimplementation would only have one kind ofShuffleDescriptorand provide one enum of release types for all the partitions. Just like atm we only haveNettyShuffleDescriptorimplementation which provides the same release types for all partitions. So my above comment suggestion was based on this. The release type isShuffleServiceglobal level suitable for all partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NettyShuffleDescriptor does not have the same release types for all partitions. For non-blocking partitions we do not support ReleaseType.MANUAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have no other concerns here now.
| * <p>This call triggers release of any resources which are occupied by the given partition in the external systems | ||
| * outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions. | ||
| * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}. | ||
| * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory different ShuffleManager implementations could choose which release strategy is supported, we should not limit all the implementations must support one type, otherwise we could make this limitation reflected in interface methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal of PR is to let shuffle services decide what to support in ShuffleDescriptor#getSupportedReleaseTypes. This doc comment line is related to when the manual release is possible (releasePartitionLocally/releasePartitionExternally).
Or do you mean, that we implicitly assume at the moment that ReleaseType#MANUAL has to be always supported for blocking partitions? For this, we have assertion ResultPartitionDeploymentDescriptor#checkReleaseOnConsumptionIsSupportedForPartition which will fail the job fast. It is true, we have to make scheduling/release strategies automatically adjust later if something is not supported as written in the comment to the hardcoded releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I have not noticed that this javadoc change was for the method of releasePartitionsExternally, if so I agree it makes sense that this method implementation is for MANUL release.
Before I thought the new info was added on the class javadoc, that means all the ShuffleMaster implementation must support MANUL release type, so I pointed out that concern.
zhijiangW
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR @azagrebin !
I only reviewed the overall codes and left some minor comments, might further think it if possible.
|
Thanks for the review @zentol @zhijiangW ! I updated the PR. |
| createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor); | ||
|
|
||
| ShuffleDescriptor shuffleDescriptorCopy = copy.getShuffleDescriptor(); | ||
| ShuffleDescriptor shuffleDescriptorCopy = CommonTestUtils.createCopySerializable(shuffleDescriptor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change should be a separate hotfix
| 1, | ||
| 0), | ||
| NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), | ||
| NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be proper for using partitionType.isBlocking() instead of direct true
| return this; | ||
| } | ||
|
|
||
| public NettyShuffleDescriptorBuilder setBlocking(boolean blocking) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setIsBlocking(boolean isBlocking) ?
…est serialization of UnknownShuffleDescriptor without ResultPartitionDeploymentDescriptor
| * by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} | ||
| * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}. | ||
| * | ||
| * <p>The partition has to support the corresponding {@link ReleaseType} in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be better to indicate which value true/false should support which releaseType, otherwise it seems not clear the mapping between this value and release type.
|
@azagrebin I have no other concerns except some nit comments. |
…aseType and ShuffleDescriptor#getSupportedReleaseTypes `ResultPartitionDeploymentDescriptor#releasedOnConsumption` shows the intention how the partition is going to be used by the shuffle user and released. The `ShuffleDescriptor` should provide a way to query which release type is supported by shuffle service for this partition. If the requested release type is not supported by the shuffle service for a certain type of partition, the job should fail fast.
What is the purpose of the change
ResultPartitionDeploymentDescriptor#releasedOnConsumptionshows the intention how the partition is going to be used by the shuffle user and released. TheShuffleDescriptorshould provide a way to query which release type is supported by shuffle service for this partition. If the requested release type is not supported by the shuffle service for a certain type of partition, the job should fail fast.Brief change log
ShuffleDescriptor#ReleaseType.AUTO and MANUALShuffleDescriptor#getSupportedReleaseTypesResultPartitionDeploymentDescriptor#releaseTypeVerifying this change
Simple refactoring covered by existing tests, plus
ResultPartitionDeploymentDescriptorTest#testIncompatibleReleaseTypeManualDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation