Skip to content

Commit

Permalink
[hotfix] Fix UnifiedSinkMigrationITCase
Browse files Browse the repository at this point in the history
UnifiedSinkMigrationITCase assumed that we also commit partial batches of committables. However, that was never the intend and fixed in FLINK-25920. This commit adjusts the test.
  • Loading branch information
AHeise committed Sep 23, 2024
1 parent 8127f4f commit 300e1ea
Showing 1 changed file with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ private static class TestWriter implements SinkWriter<Long, Integer, Integer> {
@Override
public void write(Long element, Context context) throws IOException, InterruptedException {}

/** Creates two committables on the very first checkpoint. */
@Override
public List<Integer> prepareCommit(boolean flush) throws IOException, InterruptedException {
if (emitted || recovered) {
Expand Down Expand Up @@ -265,16 +266,27 @@ private static class TestCommitter implements Committer<Integer> {
this.commitLatch = commitLatch;
}

/**
* On first attempt: send GLOBAL_COMMITTER_STATE downstream, keep COMMITTER_STATE for retry.
* On second attempt: keep COMMITTER_STATE for retry on first checkpoint, then send
* downstream. Only then, global committer should be triggered.
*/
@Override
public List<Integer> commit(List<Integer> committables)
throws IOException, InterruptedException {
if (firstCommit && !recovered) {
assertThat(committables).containsExactly(COMMITTER_STATE, GLOBAL_COMMITTER_STATE);
} else {
assertThat(committables).containsExactly(COMMITTER_STATE);
if (firstCommit) {
if (!recovered) {
assertThat(committables)
.containsExactly(COMMITTER_STATE, GLOBAL_COMMITTER_STATE);
} else if (recovered) {
assertThat(committables).containsExactly(COMMITTER_STATE);
}
}
LOG.info("Committing {}", committables);
commitLatch.get().countDown();
if (recovered && !firstCommit) {
return Collections.emptyList();
}
firstCommit = false;
// Always retry to keep the state
return Collections.singletonList(COMMITTER_STATE);
Expand Down Expand Up @@ -317,6 +329,7 @@ public String combine(List<Integer> committables) throws IOException {
return String.valueOf(committables.get(0));
}

/** Wait for all committables (after recovery on second checkpoint). */
@Override
public List<String> commit(List<String> globalCommittables)
throws IOException, InterruptedException {
Expand All @@ -329,7 +342,7 @@ public List<String> commit(List<String> globalCommittables)
.containsExactly(String.valueOf(GLOBAL_COMMITTER_STATE));
}
firstCommitAfterRecover = false;
return globalCommittables;
return recover ? Collections.emptyList() : globalCommittables;
}

@Override
Expand Down

0 comments on commit 300e1ea

Please sign in to comment.