[FLINK-12615][coordination] Track partitions on JM #8778
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community track the review progress. Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. The @flinkbot bot also supports a number of commands.
```java
IntermediateResultPartition[] allPartitions = finishedPartition
	.getIntermediateResult().getPartitions();

PartitionTable<ResourceID> partitionTable = vertex.getExecutionGraph().getPartitionTable();
```
We could consider introducing a separate component, something like a PartitionTracker or manager, which would also encapsulate the ShuffleMaster (like JobAwareShuffleEnvironmentImpl). It could be responsible for doing what we have in this PR, and maybe even register with the ShuffleMaster and track producedPartitions by execution id as well.
Yes, we could do that. I am not sure whether we should do this now or defer it to a later point; currently we'd just end up wrapping the partition table and mirroring its API.
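For illustration, such a wrapper might initially be little more than delegation. This is a hypothetical sketch, not code from this PR; the PartitionTable method names follow the ones appearing elsewhere in this discussion:

```java
import java.util.Collection;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;

// Hypothetical PartitionTracker that wraps the PartitionTable and mirrors its
// API, giving us a place to later fold in ShuffleMaster registration and
// per-execution tracking.
public class PartitionTracker {

	private final PartitionTable<ResourceID> partitionTable = new PartitionTable<>();

	public void startTrackingPartitions(ResourceID taskExecutorId, Collection<ResultPartitionID> partitionIds) {
		partitionTable.startTrackingPartitions(taskExecutorId, partitionIds);
	}

	public Collection<ResultPartitionID> stopTrackingPartitions(ResourceID taskExecutorId) {
		return partitionTable.stopTrackingPartitions(taskExecutorId);
	}

	public boolean hasTrackedPartitions(ResourceID taskExecutorId) {
		return partitionTable.hasTrackedPartitions(taskExecutorId);
	}
}
```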
Resolved review threads (outdated):
- flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
- flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
zhijiangW left a comment:
Thanks for opening this PR @zentol! I have finished reviewing the core code and left some comments. I will check the tests later.
@zhijiangW I don't see any comment of yours, did something go wrong when submitting the review?

I am not sure why they are not shown outside, but inside the code view we can see these comments. I will double-check and submit them again if necessary.

I can only see Andrey's comments :(
Resolved review threads (outdated):
- flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
- flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
- flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java (2 threads)
```diff
 final Optional<ResourceID> resourceIdOptional = slotPool.failAllocation(allocationId, cause);
-resourceIdOptional.ifPresent(this::releaseEmptyTaskManager);
+resourceIdOptional.ifPresent(taskManagerId -> {
+	if (!partitionTable.hasTrackedPartitions(taskManagerId)) {
```
nit: if we put the if (!partitionTable.hasTrackedPartitions(taskManagerId)) check inside releaseEmptyTaskManager, we could reuse it should the release be needed in other places in the future (see the sketch below).
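A minimal sketch of that refactoring, assuming JobMaster fields and a FlinkException message that are not shown in this PR:

```java
// Move the partition check into releaseEmptyTaskManager itself, so any future
// caller gets the partition-aware behaviour for free. Surrounding fields
// (partitionTable, disconnectTaskManager) are assumed from context.
private void releaseEmptyTaskManager(ResourceID taskManagerId) {
	// only disconnect once no partitions are hosted on this task executor anymore
	if (!partitionTable.hasTrackedPartitions(taskManagerId)) {
		disconnectTaskManager(
			taskManagerId,
			new FlinkException(String.format("No more slots or partitions registered at TaskExecutor %s.", taskManagerId)));
	}
}
```

The call site would then shrink back to `resourceIdOptional.ifPresent(this::releaseEmptyTaskManager);`.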
I entered the commit ids in the brief changelog above and left my inline comments on those commits, so the comments were only visible inside them. I have now re-submitted them in the usual way.
Rebased on #8789.
azagrebin left a comment:
Thanks for the PR @zentol, I only had minor comments atm.
Resolved review threads (outdated):
- flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java (2 threads)
- flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
- flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
- flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java (2 threads)
Force-pushed ab52653 to 5b237fe.
Resolved review threads (outdated):
- flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java
- flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
- flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
```java
if (newJobStatus.isGloballyTerminalState()) {
	// other terminal job states are handled by the executions
	if (newJobStatus == JobStatus.FINISHED) {
```
From my understanding, we can only consider FINISHED because when transitioning to CANCELED or FAILED, all execution vertices will be canceled and thus the result partitions will be released via sendReleaseIntermediateResultPartitionsRpcCall.
However, with region failover, an execution may need to re-run after it has turned FINISHED. This happens in cases like A -> C, B -> C: A and B have both finished, but C fails to read data from A, so B also needs to re-execute. When the execution vertex is resetForNewExecution, the result partition of the previous execution is not released. Then, when we cancel or failGlobal on the ExecutionGraph, only the latest execution is considered, so the result partitions of the previous executions may not get released explicitly.
Failover scenarios are not covered by this PR; it only adds a bare-bones implementation for cleaning up after the job terminates.
You are correct that we have to release partitions for reset vertices; this will be added in a follow-up.
Force-pushed b53dad9 to 8494e4f.
I think that forcePartitionReleaseOnConsumption should actually go into ResultPartitionDeploymentDescriptor, not ResultPartitionFactory:

```java
this.releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING || forcePartitionReleaseOnConsumption;
```

Otherwise we always activate releasedOnConsumption for blocking partitions. What is the point of forcePartitionReleaseOnConsumption in ResultPartitionFactory then? We would basically track/release them for nothing in the JM by default after FLINK-12615.
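A sketch of what that suggestion could look like; the constructor shape and the accessor are assumptions, only the releasedOnConsumption expression comes from the comment above:

```java
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;

// Compute releasedOnConsumption when the deployment descriptor is built:
// pipelined partitions are always released on consumption, blocking partitions
// only when release is explicitly forced.
public class ResultPartitionDeploymentDescriptor {

	private final PartitionDescriptor partitionDescriptor;
	private final boolean releasedOnConsumption;

	public ResultPartitionDeploymentDescriptor(
			PartitionDescriptor partitionDescriptor,
			boolean forcePartitionReleaseOnConsumption) {
		this.partitionDescriptor = partitionDescriptor;
		this.releasedOnConsumption =
			partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING
				|| forcePartitionReleaseOnConsumption;
	}

	public boolean isReleasedOnConsumption() {
		return releasedOnConsumption;
	}
}
```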
```java
}

private void startTrackingUnreleasedPartitions(final Collection<IntermediateResultPartition> partitions) {
	PartitionTable<ResourceID> partitionTable = vertex.getExecutionGraph().getPartitionTable();
```
inconsistent use of `final`
```java
for (Map.Entry<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> entry : registeredTaskManagers.entrySet()) {
	Collection<ResultPartitionID> storedPartitions = partitionTable.stopTrackingPartitions(entry.getKey());
	// if this call fails TaskExecutors will cleanup partitions regardless once we close the connections
	entry.getValue().f1.releasePartitions(jobGraph.getJobID(), storedPartitions);
```
What about the ShuffleMaster.releasePartitionExternally plans here? As discussed, we will need it in general.
Ah yes, I got stuck here since the ShuffleMaster requires a ShuffleDescriptor, but so far we've only been tracking ResultPartitionIDs.
I have updated the PR to include a PartitionTracker which keeps track of shuffle descriptors, result partition ids etc., and acts as a central point for issuing release calls.
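As a rough sketch, the central release call could then cover both sides. The method name and surrounding fields are assumptions; the gateway call matches the releasePartitions(JobID, Collection<ResultPartitionID>) usage shown earlier in this thread:

```java
// Release a set of tracked partitions both on the task executor that hosts
// them and externally via the shuffle master. jobId, shuffleMaster and
// taskExecutorGatewayLookup are assumed tracker fields.
private void internalReleasePartitions(
		ResourceID taskExecutorId,
		Collection<ResultPartitionDeploymentDescriptor> partitions) {

	Collection<ResultPartitionID> partitionIds = new ArrayList<>();
	for (ResultPartitionDeploymentDescriptor partition : partitions) {
		ShuffleDescriptor shuffleDescriptor = partition.getShuffleDescriptor();
		partitionIds.add(shuffleDescriptor.getResultPartitionID());
		// release resources the shuffle implementation holds outside the TE
		shuffleMaster.releasePartitionExternally(shuffleDescriptor);
	}

	// release local resources (e.g. files) on the task executor, if still reachable
	taskExecutorGatewayLookup.lookup(taskExecutorId)
		.ifPresent(gateway -> gateway.releasePartitions(jobId, partitionIds));
}
```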
Force-pushed 0be4d1d to 67417d3.
```java
/**
 * Utility for tracking partitions and issuing release calls to task executors and shuffle masters.
 */
public class PartitionTracker {
```
We should introduce an interface for this (see the sketch below).
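A rough sketch of that extraction; the exact method set is an assumption based on the calls visible in this PR:

```java
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

// Interface so that JobMaster and Execution depend on the abstraction rather
// than the concrete tracker implementation.
public interface PartitionTracker {

	/** Starts tracking the given partition, produced on the given task executor. */
	void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor partition);

	/** Stops tracking and releases all partitions of the given producer. */
	void stopTrackingAndReleasePartitions(ExecutionAttemptID producerId);

	/** Stops tracking and releases all partitions on the given task executor. */
	void stopTrackingAndReleaseAllPartitions(ResourceID taskExecutorId);

	/** Returns whether any partitions on the given task executor are still tracked. */
	boolean isTrackingPartitionsFor(ResourceID taskExecutorId);
}
```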
Force-pushed 10416a0 to cf7c5a9.
```diff
 		IntermediateResultPartitionID id) {
-	return Optional.ofNullable(producedPartitions.get(id));
+	Collection<ResultPartitionDeploymentDescriptor> trackedDeploymentDescriptors = vertex.getExecutionGraph().getPartitionTracker().getTrackedDeploymentDescriptors(getAttemptId());
+	for (ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor : trackedDeploymentDescriptors) {
```
I hate this bit, but I really don't want to introduce an additional map just to support this lookup. Any ideas?
I do not really see a problem atm with having it in InternalPartitionInfo, with a similar add-partition method and a Collection -> Map change.
Anyway, I think the method rather belongs on the tracker, with the exec id added to the signature (see the sketch below).
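Something along these lines; a hypothetical signature, not code from this PR:

```java
// Addition to the PartitionTracker: the lookup moves out of Execution, with
// the producing attempt id made explicit in the signature.
Optional<ResultPartitionDeploymentDescriptor> getTrackedDeploymentDescriptor(
	ExecutionAttemptID producerId,
	IntermediateResultPartitionID partitionId);
```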
azagrebin left a comment:
Thanks @zentol! The tracker looks good. I left some comments to resolve before merging.
```java
/**
 * Lookup function for {@link TaskExecutorGateway}.
 */
interface TaskExecutorGatewayLookup {
```
`@FunctionalInterface`
```java
/**
 * Factory for {@link PartitionTracker}.
 */
public interface PartitionTrackerFactory {
```
```java
private final JobID jobId;

private final Map<ExecutionAttemptID, InternalPartitionInfo> partitionInfoByProducer = new HashMap<>();
```
I would prefer all initialisations in the constructor
```java
private final ShuffleMaster<?> shuffleMaster;

private PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;
```
This field should be `final`. Also, is there any value in the explicit prefixing over a static import of PartitionTrackerFactory?
```java
	final ProducerDescriptor producerDescriptor,
	final ResultPartitionDeploymentDescriptorFactory resultPartitionDeploymentDescriptorFactory) {

CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture = shuffleMaster.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
```
nit: any value in this local var?
```java
 *
 * @param producerId producer id to determine relevant partitions
 */
void stopTrackingAndReleasePartitions(final ExecutionAttemptID producerId);
```
Atm it stops tracking everything for the execution id and releases what needs releasing. I think the comment needs adjustment and the function could be called stopTrackingAndReleasePartitionsForExecution.
Same for the next method, which stops tracking everything for the task executor and releases what needs releasing: stopTrackingAndReleasePartitionsOnTaskExecutor ("release all" is a bit confusing).
```java
 * @param taskExecutorId id of the task executor to look up.
 * @return optional task executor gateway
 */
Optional<TaskExecutorGateway> lookup(ResourceID taskExecutorId);
```
I would prefer a partition releaser:

```java
interface TaskExecutorPartitionReleaser {
	void releasePartitions(ResourceID taskExecutorId, JobID jobId, Collection<ResultPartitionID> partitionIds);
}
```

It could also return a boolean for whether the TE was found, although that is not used anyway. Do we need the full TaskExecutorGateway for anything?
```diff
 		Logger log,
-		ShuffleMaster<?> shuffleMaster) throws JobExecutionException, JobException {
+		PartitionTracker partitionTracker)
+			throws JobExecutionException, JobException {
```
keep formatting
```java
	}
}
final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
partitionTracker.stopTrackingAndReleasePartitions(getAttemptId());
```
Looks like we are still doing it in cancel and suspend.
```java
// other terminal job states are handled by the executions
if (newJobStatus == JobStatus.FINISHED) {
	runAsync(() -> {
		for (Map.Entry<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> entry : registeredTaskManagers.entrySet()) {
```
```java
registeredTaskManagers.keySet().forEach(partitionTracker::stopTrackingAndReleaseAllPartitions);
```
Based on #8687. When reviewing this PR, please ignore all commits preceding "[FLINK-12615][coordination] Support generic key in PartitionTable".
What is the purpose of the change
With this PR we track on the JobMaster side which partitions are still on task executors, maintain the connection to a task executor until all partitions on it have been released, and issue a release call for all partitions when a job terminates.
Attention: The release logic in this PR only serves as an MVP. It ignores the shuffle master (and thus will not work with other shuffle service implementations) and is not viable for high-volume jobs (because it requires all blocking partitions to be persisted on disk until the job has finished).
Brief changelog
6009f00 modifies the PartitionTable to support arbitrary keys; so far it was hard-wired to use JobIDs as keys for use on the TE side, but now we want to organize partitions by ResourceID (== TE ID) instead (see the sketch after this list).
c7193dc introduces a PartitionTable into the JM, Scheduler and EG. This commit only modifies constructor-related methods and does not contain any actual logic; it only exists to make reviewing easier.
ea418da contains the tracking logic. Partitions are added to the table, grouped by task executor, by an Execution when it reaches the FINISHED state, and removed again by the JM when disassociating from a TE.
3087792 introduces a call to TE#releasePartitions for when a job is finished.
d5a9f3c finally modifies the check for empty task managers to take partitions into account.
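For reference, a minimal sketch of the generified PartitionTable from 6009f00; the method names follow those appearing in this PR, while the exact implementation is an assumption:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

// PartitionTable keyed by an arbitrary type K: the TE keys it by JobID, the
// JM keys it by the task executor's ResourceID.
public class PartitionTable<K> {

	private final Map<K, Set<ResultPartitionID>> trackedPartitions = new ConcurrentHashMap<>(8);

	/** Returns whether any partition is tracked for the given key. */
	public boolean hasTrackedPartitions(K key) {
		return trackedPartitions.containsKey(key);
	}

	/** Starts tracking the given partitions under the given key. */
	public void startTrackingPartitions(K key, Collection<ResultPartitionID> newPartitionIds) {
		trackedPartitions.compute(key, (ignored, partitionIds) -> {
			if (partitionIds == null) {
				partitionIds = new HashSet<>(8);
			}
			partitionIds.addAll(newPartitionIds);
			return partitionIds;
		});
	}

	/** Stops tracking all partitions for the given key and returns them. */
	public Collection<ResultPartitionID> stopTrackingPartitions(K key) {
		Set<ResultPartitionID> storedPartitions = trackedPartitions.remove(key);
		return storedPartitions == null ? Collections.emptyList() : storedPartitions;
	}
}
```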