-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-12612][coordination] Track stored partition on the TaskExecutor #8687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
| * | ||
| * @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. | ||
| */ | ||
| int start() throws IOException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to make JobAwareShuffleEnvironment extends ShuffleEnvironment then some methods existing in ShuffleEnvironment do not need to be defined here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately no, then we'd introduce the risk that someone uses the wrong method, like createPartitionWriters without the JobID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, if we were to introduce a JobID parameter into ShuffleEnvironment#createResultPartitionWriters we could probably just extend it. Long-term we need releasePartition methods without a JobID parameter anyway for global partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, my previous thought was putting JobID into shuffle context which was proposed by Andrey in another PR via ShuffleEnvironment#createResultPartitionWriters(context), then in JobAwareShuffleEnvironmentImpl it could get job id from context to avoid two createResultPartitionWriters methods separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we could also pass the JobID into the context.
For the time-being I'll update the PR so that JobAwareShuffleEnvironmentImpl extends ShuffleEnvironment, add add a createResultPartitionWriters method that accepts a JobID.
Once the context is merged we can migrate to that.
| * @param buffersGroup shuffle specific group for buffer metrics | ||
| * @return collection of the task's {@link InputGate}s | ||
| */ | ||
| Collection<G> createInputGates( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: could be removed if this interface extends ShuffleEnvironment
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
|
Note that the commit about maintaining the jobmanager connection must not be merged until the release logic on the JM is in place, as without these the TM would never terminate the connection. |
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| @Override | ||
| public boolean hasPartitionsOccupyingLocalResources(JobID jobId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method has the similar function with below getPartitionsOccupyingLocalResources which might be used for reporting collections of unreleased partitionIDs in heartbeat with ResourceManager.
Is it possible to remove this method and implement the getPartitionsOccupyingLocalResources based on inProgressPartitionTable and finishedPartitionTable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd still need separate methods. This method here is about having partitions for a specific job, while getPartitionsOccupyingLocalResources() checks across jobs.
Additionally, we don't track all partitions in the JobAwareShuffleEnvironment; only those that are externally managed, so the semantics are a bit different.
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
| * @return collection of the task's {@link ResultPartitionWriter}s | ||
| */ | ||
| Collection<P> createResultPartitionWriters( | ||
| JobID jobId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #8485 , Andrey introduced shuffle context to cover taskName, executionAttemptID and MetricGroup. If this context could also cover jobId then we could reduce this method definition as now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this make the JobID part of the ShuffleService interfaces? I thought that we wanted to keep this out of what the user needs to implement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's the main argument against doing so. It'S a lose-lose situation, as we either
a) introduce a jobID into the shuffle service API, which we don't really want
b) extend the ShuffleService interface, overload createResultPartitionWriters with a variant that accepts a JobID, and have the other method fail with an exception
c) duplicate most of the interface to prevent downside of b.
| MemoryManager memManager, | ||
| IOManager ioManager, | ||
| ShuffleEnvironment<?, ?> shuffleEnvironment, | ||
| JobAwareShuffleEnvironment<?, ?> shuffleEnvironment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could still reference ShuffleEnvironment here if making JobAwareShuffleEnvironment extend ShuffleEnvironment as mentioned above.
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImplTest.java
Outdated
Show resolved
Hide resolved
|
@zentol I have finished reviewing the whole PR, thanks for the updates! |
aeb8722 to
097af6b
Compare
|
rebase on #8789. |
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @zentol. Here is the first half of comments I had. I'm posting them now because I fear to lose them if I refresh my browser.
| if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !shuffleEnvironment.hasPartitionsOccupyingLocalResources(jobId)) { | ||
| // we can remove the job from the job leader service | ||
| try { | ||
| jobLeaderService.removeJob(jobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the jobLeaderService if there are only partitions but no slots allocated to a given job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure; I'm just delayed all operations that were done before.
For the time being we could probably remove the job from the jobLeaderSerbice once the last slot was freed. After all, if a jobmaster failover occurs we remove all partitions anyway.
Long-term though, once we introduce a grace-period for reconnects and/or failovers, I would've expected that we want to continue to observe for leader changes.
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
Outdated
Show resolved
Hide resolved
| * @return collection of the task's {@link ResultPartitionWriter}s | ||
| */ | ||
| Collection<P> createResultPartitionWriters( | ||
| JobID jobId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this make the JobID part of the ShuffleService interfaces? I thought that we wanted to keep this out of what the user needs to implement.
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/NotifyingResultPartitionWriter.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished my pass through the code. I think the abstraction make sense and decouple the concerns of tracking from the ShuffleEnvironment nicely.
I think we need to fix the concurrency issues in the tests. In general I would suggest to not modify state of an RpcEndpoint from the test thread. This can lead to hard to figure out test instabilities.
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
...-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
Outdated
Show resolved
Hide resolved
...java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImplTest.java
Outdated
Show resolved
Hide resolved
...untime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
Outdated
Show resolved
Hide resolved
...untime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Releases local resources occupied with the given partitions. | ||
| */ | ||
| void releaseFinishedPartitions(JobID jobId, Collection<ResultPartitionID> partitionIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering what effort it would be to keep track of the ResultPartitionID -> JobID mapping inside of the JobAwareShuffleEnvironmentImpl? If we would do this, then we would not have to introduce this method and we could have kept TaskExecutorGateway#releasePartitions. Now, the need for the JobID when releasing partitions needs to be looped through multiple components.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh we could certainly do that, the introduction of the JobID parameter was never out of necessity, but purely so we don't have to maintain an extra mapping. The JobID argument has the nice side-effect that you don't have to check the JobID for every single partition.
f3598f1 to
ab8e404
Compare
azagrebin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for opening this PR @zentol. I left some comments to consider.
In general, it looks like after introducing the termination future, a simplified approach can be also considered. The TM partition tracking could rely on this termination future Instead of relying on listeners which might also put the need of shuffle environment wrapping under question. TaskExecutor.submitTask could register tracked partitions and their release in the callback for the termination future.
Also the distinguishing between in-progress and finished partitions does not look to be needed. see inline comments.
...rc/main/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironment.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/flink/runtime/taskexecutor/partition/JobAwareShuffleEnvironmentImpl.java
Outdated
Show resolved
Hide resolved
...runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
Outdated
Show resolved
Hide resolved
2d5e9e7 to
ab8e404
Compare
azagrebin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @zentol, LGTM 👍, I left minor comments to look into before merging.
Also we can consider a separate partition lifecycle manager component to move there all introduced here code from the TE at some point.
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
Outdated
Show resolved
Hide resolved
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
Show resolved
Hide resolved
...ntime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
Outdated
Show resolved
Hide resolved
| @ThreadSafe | ||
| public class PartitionTable { | ||
|
|
||
| private final Map<JobID, Set<ResultPartitionID>> trackedPartitionsPerJob = new ConcurrentHashMap<>(8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also consider per task ExecutionAttemptID mapping to support release by task id, then external from JM and connection loss release could be unified with scheduling the release per task the same way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be done yes. I'd revisit this once everything is in place and we can better gauge what would be the most appropriate mappings.
Introduces partition tracking on the TaskExecutor. We start tracking partitions after a Task has been started. Partitions are only tracked iff. a) they are not released on consumption (in this case the ShuffleEnvironment takes care of that automatically) b) they have local resources (as otherwise there's nothing to release) Partitions are tracked using a PartitionTable. If a task fails we stop tracking any partitions for this task. No release calls will be issued, as the task will fail their partition, which will automatically release them. If task reaches the state FINISHED, no additional action is required. Partitions will continue to be tracked until they are released at a later point. The TaskExecutor is informed about the task termination via the tasks termination future, which also contains the final execution state. Partitions for finished tasks are only released in 2 cases: a) An external call to TaskExecutor#releasePartitions is made. b) The connection to a JobManager terminates for whatever reason. For a) we stop tracking the given partitions and forward a release call to the ShuffleEnvironment. No sanity checks are made; we assume that the whoever issued the release call has a reason to do so. For b) we determine all partitions associated with the job corresponding to the terminated connection, stop tracking them and issue a release call to the ShuffleEnvironment. No additional cleanup is performed on shutdown, as the ShuffleEnvironment implicitly releases all partitions requiring it.
…ns are released Whenever a slot is freed the TaskExecutor was checking whether any other slots were allocated for a given job, and if this isn't the case disconnects from the corresponding JobMaster. It now additionally takes into account whether we still have any tracked partitions for the given job. This check is also performed whenever an partition was released due to TaskExecutor#releasePartitions being called.
Based on
#8630,#8654and#8680.What is the purpose of the change
Introduces partition tracking on the TaskExecutor.
We start tracking partitions after a Task has been started. Partitions
are only tracked iff.
a) they are not released on consumption (in this case the ShuffleEnvironment takes care of that automatically)
b) they have local resources (as otherwise there's nothing to release)
Partitions are tracked using a PartitionTable.
If a task fails we stop tracking any partitions for this task. No release calls will be issued, as
the task will fail their partition, which will automatically release them.
If task reaches the state FINISHED, no additional action is required. Partitions will continue to be tracked
until they are released at a later point.
The TaskExecutor is informed about the task termination via the tasks termination future, which also contains
the final execution state.
Partitions for finished tasks are only released in 2 cases:
a) An external call to TaskExecutor#releasePartitions is made.
b) The connection to a JobManager terminates for whatever reason.
For a) we stop tracking the given partitions and forward a release call to the ShuffleEnvironment.
No sanity checks are made; we assume that the whoever issued the release call has a reason to do so.
For b) we determine all partitions associated with the job corresponding to the terminated connection,
stop tracking them and issue a release call to the ShuffleEnvironment.
No additional cleanup is performed on shutdown, as the ShuffleEnvironment implicitly releases all
partitions requiring it.
Whenever a slot is freed the TaskExecutor was checking whether any other slots were allocated for a given job,
and if this isn't the case disconnects from the corresponding JobMaster.
It now additionally takes into account whether we still have any tracked partitions for the given job.
This check is also performed whenever an partition was released due to TaskExecutor#releasePartitions being called.