Skip to content

Commit

Permalink
[FLINK-25920] Only process complete batches of committables
Browse files Browse the repository at this point in the history
The committer is supposed to commit all committables at once for a given subtask (so that it can potentially optimize committables on the fly). With UCs, we could potentially see notifyCheckpointCompleted before receiving all committables. The CommittableSummary was built and is used to detect that.

So far, we enforced completeness only for the most current committables belonging the respective checkpoint being completed. However, we should also enforce it to all subsumed committables. In fact, we probably implicitly do it but we have the extra code path which allows subsumed committables to be incomplete. This commit simplifies the code a bit by always enforcing completeness.
  • Loading branch information
AHeise committed Sep 17, 2024
1 parent ad01d71 commit 1d32f1b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ private void commit(long checkpointId) throws IOException, InterruptedException
sinkV1State = globalCommitter.commit(sinkV1State);
}
for (CheckpointCommittableManager<CommT> committable : getCommittables(checkpointId)) {
boolean fullyReceived = committable.getCheckpointId() == lastCompletedCheckpointId;
committable.commit(fullyReceived, committer);
committable.commit(committer);
}
}

Expand All @@ -186,7 +185,7 @@ public void endInput() throws Exception {
committableCollector.getEndOfInputCommittable();
if (endOfInputCommittable != null) {
do {
endOfInputCommittable.commit(false, committer);
endOfInputCommittable.commit(committer);
} while (!committableCollector.isFinished());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,23 @@ public void endInput() throws Exception {
endInput = true;
if (!isCheckpointingEnabled || isBatchMode) {
// There will be no final checkpoint, all committables should be committed here
notifyCheckpointComplete(EOI);
commitAndEmitCheckpoints();
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
if (endInput) {
// This is the final checkpoint, all committables should be committed
lastCompletedCheckpointId = Long.MAX_VALUE;
} else {
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
}
lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
commitAndEmitCheckpoints();
}

private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
do {
for (CheckpointCommittableManager<CommT> manager :
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
// wait for all committables of the current manager before submission
boolean fullyReceived =
!endInput && manager.getCheckpointId() == lastCompletedCheckpointId;
commitAndEmit(manager, fullyReceived);
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
commitAndEmit(manager);
}
// !committableCollector.isFinished() indicates that we should retry
// Retry should be done here if this is a final checkpoint (indicated by endInput)
Expand All @@ -185,10 +178,9 @@ private void commitAndEmitCheckpoints() throws IOException, InterruptedException
}
}

private void commitAndEmit(CommittableManager<CommT> committableManager, boolean fullyReceived)
private void commitAndEmit(CommittableManager<CommT> committableManager)
throws IOException, InterruptedException {
Collection<CommittableWithLineage<CommT>> committed =
committableManager.commit(fullyReceived, committer);
Collection<CommittableWithLineage<CommT>> committed = committableManager.commit(committer);
if (emitDownstream && !committed.isEmpty()) {
output.collect(new StreamRecord<>(committableManager.getSummary()));
for (CommittableWithLineage<CommT> committable : committed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ boolean isFinished() {
}

@Override
public Collection<CommittableWithLineage<CommT>> commit(
boolean fullyReceived, Committer<CommT> committer)
public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer)
throws IOException, InterruptedException {
Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(fullyReceived);
Collection<CommitRequestImpl<CommT>> requests = getPendingRequests(true);
requests.forEach(CommitRequestImpl::setSelected);
committer.commit(new ArrayList<>(requests));
requests.forEach(CommitRequestImpl::setCommittedIfNoError);
Expand All @@ -132,9 +131,9 @@ public Collection<CommittableWithLineage<CommT>> commit(
return committed;
}

Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean fullyReceived) {
Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
return subtasksCommittableManagers.values().stream()
.filter(subtask -> !fullyReceived || subtask.hasReceivedAll())
.filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
.flatMap(SubtaskCommittableManager::getPendingRequests)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ public interface CommittableManager<CommT> {
CommittableSummary<CommT> getSummary();

/**
* Commits all due committables.
* Commits all due committables if all respective committables of the specific subtask and
* checkpoint have been received.
*
* @param fullyReceived only commit committables if all committables of this checkpoint for a
* subtask are received
* @param committer used to commit to the external system
* @return successfully committed committables with meta information
* @throws IOException
* @throws InterruptedException
*/
Collection<CommittableWithLineage<CommT>> commit(
boolean fullyReceived, Committer<CommT> committer)
Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committer)
throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,21 @@ void testCommit() throws IOException, InterruptedException {

final Committer<Integer> committer = new NoOpCommitter();
// Only commit fully received committables
Collection<CommittableWithLineage<Integer>> commitRequests =
checkpointCommittables.commit(true, committer);
assertThat(commitRequests)
assertThat(checkpointCommittables.commit(committer))
.hasSize(1)
.satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(3));

// Even on retry
assertThat(checkpointCommittables.commit(committer)).isEmpty();

// Add missing committable
checkpointCommittables.addCommittable(new CommittableWithLineage<>(5, 1L, 2));
// Commit all committables
commitRequests = checkpointCommittables.commit(false, committer);
assertThat(commitRequests)
.hasSize(1)
.satisfiesExactly(c -> assertThat(c.getCommittable()).isEqualTo(4));
assertThat(checkpointCommittables.commit(committer))
.hasSize(2)
.satisfiesExactly(
c -> assertThat(c.getCommittable()).isEqualTo(4),
c -> assertThat(c.getCommittable()).isEqualTo(5));
}

@Test
Expand Down

0 comments on commit 1d32f1b

Please sign in to comment.