Skip to content

Conversation

@azagrebin
Copy link
Contributor

What is the purpose of the change

At the moment we have ShuffleEnvironment.releasePartitions which is used to release locally occupied resources of partition. JM can also use it by calling TaskExecutorGateway.releasePartitions.

To support lifecycle management of partitions (FLINK-12069, relevant mostly for batch and blocking partitions), we need to extend Shuffle API:

  • ShuffleDescriptor.hasLocalResources indicates that this partition occupies local resources on TM and requires TM running to consume the produced data (e.g. true for default NettyShuffleEnviroment and false for externally stored partitions). If a partition needs external lifecycle management and is not released after the first consumption is done (ResultPartitionDeploymentDescriptor.isReleasedOnConsumption), then RM/JM should keep TMs, which produce these partitions, running until partition still needs to be consumed. The connection to these TMs should also to be kept to issue the RPC call TaskExecutorGateway.releasePartitions once partition is not needed any more.

  • ShuffleMaster.removePartitionExternally: JM should call this whenever the partition does not need to be consumed any more. This call releases partition resources possibly occupied externally outside of TM and should not depend on ShuffleDescriptor.hasLocalResources.

Brief change log

  • Introduce ShuffleDescriptor.hasLocalResources and default netty shuffle implementation
  • Introduce ShuffleMaster.removePartitionExternally and default netty shuffle implementation

Verifying this change

Trivial shuffle interface extension.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 19, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@azagrebin
Copy link
Contributor Author

@flinkbot attention @tillrohrmann @zentol @zhijiangW

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 from my side.

}

@Override
public Optional<ResourceID> hasLocalResources() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you know, whenever I see this method I always first assume that it returns a boolean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could rename it to storesLocalResourcesOn()

Copy link
Contributor

@zhijiangW zhijiangW Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, it might bring confusing here. I have not saw the specific usages for this method yet, so I am not sure whether the return type is proper/necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, storesLocalResourcesOn fits better

* <p>Indicates whether the shuffle service should automatically release all partition resources after
* the first full consumption has been acknowledged.
*
* @return whether to release the partition after having been fully consumed once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has the same information with above Returns whether to release the partition after having been fully consumed once., so might be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep jdoc format consistent.

/**
* Release local resources occupied by the given partitions.
*
* <p>Relevant for partitions which have {@link ShuffleDescriptor#hasLocalResources()} returning {@code true}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasLocalResources not return boolean value.

*
* @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
*/
void releasePartitionExternally(T shuffleDescriptor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused about the keyword externally.

IMO, we provide different ways for releasing partitions. One is based on consumption once which already exists. The other is based on JM's decision when to release a partition in batch job/blocking partition, so the ShuffleMater provides an interface for JM calling, then ShuffleMater communicates with relevant ShuffleEnvironment which already provides releasePartition to do so.

No matter which way we provide, the target is to release partition, the resources would be naturally freed as a result of partition release. For blocking partition case, it might also free some local resources in TaskExecutor, so I think we do not need to distinguish the external/internal resources here. Might only call ShuffleMaster#releasePartition and ShuffleEnvironment#releasePartition separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already explicitly assume that there can be some local TM resources to release for which we have to keep TM connection and special bookkeeping in TM (#8778 and JobAwareShuffleEnvironment in #8687). The way we are solving currently tracking of partitions to be released from JM in TM, we have to do the RPC local release anyways to update the JobAwareShuffleEnvironment. Then there is no need for extra internal communication between ShuffleMaster and ShuffleEnvironment for this purpose atm (this would also require a lot of effort w/o having TaskManagerGateway).

And we still have to be able to do the external release in future optimisation where we do not keep TM connection (no need for local release) in case of external shuffle service.

In this regard, I think it might be even better to rename ShuffleEnvironment#releasePartitions to ShuffleEnvironment#releasePartitionsLocally because this way it reflects how we intend to use it in JM/TM (final users of shuffle service). Of course, eventually shuffle service can do anything internally including extra communication between ShuffleMater and ShuffleEnvironment or doing local/external cleanup at the same time.

Copy link
Contributor

@zhijiangW zhijiangW Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation!
If I understood correctly, the local release is for RPC JM->TM and external release is for RPC ShuffleMaster->ShuffleEnvironment. ATM we only have one internal shuffle implementation, then the ShuffleEnvironment#releasePartitionsLocally is actually used and ShuffleMaster#releasePartitionsExternally might never be used. But for extending external shuffle implementation future, I am not sure how these two release methods would be used then.

My only concern is wondering this abstraction might bring confusing for future extending implementations, because we seem give some specific tags/limitations in general interface. We only need to provide the semantic/ability for the function, no need to reflect the backend detail implementations (TaskManagerGateway, TM connection).

We already provide one simple implementation via JM/TM for release and also define the interface between ShuffleMaster/ShuffleEnvironment for release. The general ShuffleMaster#releasePartition is only for indicating that ShuffleMaster should release given partitions, internal/external shuffle implementations could both rely on it or not.

E.g. the current releaseOnConsumption could also be implemented like this: when JM receives the notification of finished consumer task, it notifies ShuffleMaster to release corresponding producer's partitions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current idea is that (both for not RPD.releaseOnConsumption):

  • the local release is for RPC JM->TM (only if SD.storesLocalResourcesOn): cleans up JobAwareShuffleEnvironment wrapper and calls ShuffleEnvironment#releasePartitionsLocally.

  • the external release is for RPC ShuffleMaster -> external system when the consumption is done, e.g. auxiliary service on yarn node. It is not for ShuffleEnvironment because TM yarn container can be already released after the production (but not consumption) is done for external shuffle service (does not SD.storesLocalResourcesOn). It is not needed for Netty implementation (empty) but has to be called for external shuffle service w/o JM/TM RPC (no TM/ShuffleEnvironment connection).

So in both cases we call the methods for certain semantic purpose.

We need the semantic difference in these methods because we already assume that there are TM local and external systems, and JM/TM treat the cases differently. It would be nice to have only one call, e.g. ShuffleMaster.releasePartitions and let it decide what to call/release but then it would have to implement RPC internally and also we need to do the JM/TM RPC anyways to manage JobAwareShuffleEnvironment wrapper. Having declared the intentions in method names should actually simplify the shuffle implementations.

releaseOnConsumption currently means that shuffle service internally monitors end of consumption (like Netty implementation always did) and takes care about release automatically without any action required by JM/TM. True, in future we could change it that it is managed only by JM/TM directly, I think we keep it for safety until the fine-grained release is stable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SD.storesLocalResourcesOn TM -> Yes No
PD.releaseOnConsumption \|/ - -
True Auto internal release, no action required Auto internal release, no action required
False Track partitions in JM/TM, keep JM/TM connection until consumption is done, call JM/TM RPC for TM local and ShuffleMaster for external release when consumption is done Track partitions in JM, close JM/TM connection when production is done, call only ShuffleMaster to release when consumption is done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the above detail explanations and I could understand the functions/semantics we want to provide. I think we provide three ways for releasing partitions atm as below table:

Partition release Interface dependencies
1.release once consumed  PD#releaseOnConsumption
2.TM internal shuffle RPC-based  JM->TM RPC, ShuffleEnvironment#releasePartitionsLocally, SD#storesLocalResourcesOn
3.external shuffle RPC-base ShuffleMaster#releasePartitionsExternally, SD#storesLocalResourcesOn

My previous thought was considering whether the interface and proposed methods are clear enough for the third-party implementation of ShuffleManager. We could see from above table that there are four interface methods involved in partition release (PD, SD, ShuffleMaster, ShuffleEnvironment) and some methods are coupled with each other.

E.g. we assume if SD#storesLocalResourcesOn is true, ShuffleEnvironment#releasePartitionsLocally should be used for releasing partition, otherwise ShuffleMaster#releasePartitionsExternally should be used. Actually we both agree that for the above case 2 in table, we could also rely on ShuffleMaster#releasePartitionsExternally to release partition if we want, and only we rely on JM/TM RPC as shortcut default implementation.

But if somebody wants to implement another ShuffleManager to replace the current one for case 2, he might be confused of why ShuffleMaster#releasePartitionsExternally was not suggested using and what is Externally indicating for. If the interface was only defined as ShuffleMaster#releasePartitions, it does not need to explain the differences between externally and locally which might bring confusing sometimes. It only indicates there triggers a release call from ShuffleMaster, the third-party could decide whether to implement it or empty via other ways.

Also for ShuffleEnvironment#releasePartition it only indicates to trigger a partition release call from ShuffleEnvironment, no need to limit/emphasize it for a certain specific implementation, although the external case 3 might never call it in implementation.

Actually from above table we could see we need two boolean tag infos atm. One is for describing the partition release mechanism, and the other is for indicating TM internal shuffle or external shuffle. I think these two infos might be both tagged in ShuffleDescriptor in future. Because we assume the pipelined partition could be consumed only once limited by current implementation, so we tag it in PD#releaseOnConsumption. But whether a partition could be consumed multiple times is also up to the implementation of ShuffleEnvironment#createResultPartitionWriters, we could also implement a pipelined partition which could be consumed multi times. So the SD could consider the partition type with internal implementation to give a final tag.

Copy link
Contributor Author

@azagrebin azagrebin Jun 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for more explanation! I am still not sure that I fully understand the suggested final way to define this part of interface.

If we define both methods as ShuffleMaster#releasePartitions and ShuffleEnvironment#releasePartitions without stating the purpose, it can look for an implementer like any of them can be used for full release and it has to be implemented this way which is currently not true, e.g. for the default netty implementation. Basically, the existing netty implementation will not comply with the interface or we will have to make an implicit assumption in JM/TM about how to use shuffle service just for Netty which does a partial release in case of each method call. True, the methods are coupled at the moment which is not ideal but to make them more flexible, all implementations have to be flexible.

PD#releaseOnConsumption is an information from JM (user of shuffle service) about how the partition is intended to be used, I think we should actually move it to the PartitionDescriptor (#8857), not ShuffleDescriptor. If releaseOnConsumption is not supported by shuffle service for certain partition type, it should throw an exception already in ShuffleMaster#registerPartitionWithProducer.

Do I understand correctly your suggestion or it should be defined differently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my previous suggestion was defined the methods as ShuffleMaster#releasePartitions and ShuffleEnvironment#releasePartitions. In default netty implementation we actually do not use this path for releasing partitions.

For other extending new implementations(no matter external or other internal), users could decide whether to use this path or not. If use, it should adjust to let JM call ShuffleMaster#releasePartition, not communicate directly with TM as now.

I am not strongly insisting on this suggestion, just pointed out some personal concerns from first impression. And I think this summary-up could help deeply understand the full stack via discussion. :)

Also agree with your issue of PD#releaseOnConsumption, the partition release strategy could be abstracted to pass into PartitionDescriptor and ShuffleMaster could check whether the current implementation supports it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the problem is not easy and the resulting tables in our discussion should really help the understanding :)

We could also define that the shuffle user (JM/TM) has to guarantee to
always call both methods ShuffleMaster#releasePartitions and ShuffleEnvironment#releasePartitions then an implementer could decide which cleanup to do in which method but this is again not true because once the TM is gone, ShuffleEnvironment#releasePartitions cannot be called.

ShuffleMaster can already communicate directly to ShuffleEnvironment. It just does not have to for only ShuffleEnvironment#releasePartitionsLocally and the naming states it clearly. If SD#storesLocalResourcesOn then the shuffle user (JM/TM) guarantees doing the RPC.

Copy link
Contributor

@zhijiangW zhijiangW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for opening this PR @azagrebin .

I am not sure how to use the proposed hasLocalResources in detail process, whether the ResourceID is actually used, so we could further see it in following PRs.

In addition, I have a bit naming concern for ShuffleMaster#releasePartitionExternally.

*
* @return the resource id of the producing task executor if the partition occupies local resources there
*/
Optional<ResourceID> hasLocalResources();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are local resources, does this imply that there aren't any external resources? (Basically, I want to double-check when we have to issue release calls to the ShuffleMaster)

Copy link
Contributor Author

@azagrebin azagrebin Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would define it in a way that in general every shuffle service can potentially have some external resources if not then this is just an empty implementation. I would always call the external release in JM when the partition is not needed unless releasedOnConsumption is set in ResultPartitionDeploymentDescriptor.

*
* @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
*/
void releasePartitionExternally(T shuffleDescriptor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this signature will work due to generics.

For this to work one has to maintain the generic shuffle descriptor type from registerPartitionWithProducer until releasePartitionExternally. As it stands this isn't possible. The shuffle master in the EG is currently defined as ShuffleMaster<?> shuffleMaster.

Copy link
Contributor

@zentol zentol Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative could be to simply accept a ResultPartitionID instead. This would force some book-keeping into the implementations, but i don't see a another solution right now.

Copy link
Contributor Author

@azagrebin azagrebin Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True because we do not keep static type information about concrete ShuffleDescriptor in JM, but we can switch it to ShuffleDescriptor type, it should also work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having generified ShuffleMaster is under question then, we can consider removing generics later..

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that the signature of releasePartitionExternally is usable at the moment. It works in isolation, but will not work within the ExecutionGraph since we don't capture the generic type of the ShuffleMaster anywhere.

Simply put, and ShuffleDescriptor retrieved from the ResultPartitionDeploymentDescriptor can never be passed to a ShuffleMastter.

@azagrebin
Copy link
Contributor Author

Thanks for the reviews @zhijiangW @zentol ! I addressed comments. We can further discuss releasePartitionsXXX naming.

…ocalResources

ShuffleDescriptor.hasLocalResources() indicates that this partition occupies local resources on TM and requires TM running to consume the produced data (e.g. true for default NettyShuffleEnviroment and false for externally stored partitions). If a partition needs external lifecycle management and is not released after the first consumption is done (ResultPartitionDeploymentDescriptor.isReleasedOnConsumption()), then RM/JM should keep TMs, which produce these partitions, running until partition still needs to be consumed. The connection to these TMs should also to be kept to issue the RPC call TaskExecutorGateway.releasePartitions once partition is not needed any more, the RPC call triggers ShuffleEnvironment.releasePartitions.
ResultPartitionDeploymentDescriptor partitionDescriptor = new ResultPartitionDeploymentDescriptor(
new PartitionDescriptor(resultId, partitionId, partitionType, numberOfSubpartitions, connectionIndex),
ResultPartitionID::new,
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be a separate hotfix commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target change is not possible without this small change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I misunderstood before.

1,
0),
ResultPartitionID::new,
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@zentol
Copy link
Contributor

zentol commented Jun 24, 2019

As far as I can tell the only remaining discussion here is about the exact naming of releasePartition methods.

Given that the existence of these methods is required for other PRs (#8687, #8778) I would still like to go ahead with the merge, and defer further discussions to a later date.

Overall it looks to me like we need better documentation about how shuffle services can expect Flink to behave, and vice-versa.

…rtitionExternally

JM should call this whenever the partition does not need to be consumed any more.
This call releases partition resources possibly occupied externally outside of TM and does not depend on ShuffleDescriptor.hasLocalResources.
@azagrebin
Copy link
Contributor Author

I added a bit more details to docs

@zentol zentol merged commit e9642f5 into apache:master Jun 25, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants