Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-25920] Ignore duplicate EOI in SinkWriter [1.20] #25619

Merged
merged 8 commits into from
Nov 13, 2024

Conversation

AHeise
Copy link
Contributor

@AHeise AHeise commented Nov 7, 2024

Backport of #25292 with minimal changes to imports and tests.

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 7, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@AHeise AHeise changed the title Backport of sink fixes from main to 1.20 [FLINK-25920] Ignore duplicate EOI in SinkWriter [1.20] Nov 7, 2024
UnifiedSinkMigrationITCase assumed that we also commit partial batches of committables. However, that was never the intend and fixed in FLINK-25920. This commit adjusts the test.

(cherry picked from commit 300e1ea)
In some parts of the sink, EOI is treated as checkpointId=null and in some checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that a null is valid however the serializer actually breaks then. In practice, checkpointId=MAX is used all the time by accident.

This commit replaces the nullable checkpointIds with a primitive long EOI=MAX, so that we always use the special value instead of null. The serializer already used that value, so it actually simplifies many places and doesn't break any existing state.

(cherry picked from commit c56def0)
Use the proper ObjectAssert as the base for CommittableSummaryAssert and CommittableWithLinageAssert.

(cherry picked from commit ad01d71)
The committer is supposed to commit all committables at once for a given subtask (so that it can potentially optimize committables on the fly). With UCs, we could potentially see notifyCheckpointCompleted before receiving all committables. The CommittableSummary was built and is used to detect that.

So far, we enforced completeness only for the most current committables belonging the respective checkpoint being completed. However, we should also enforce it to all subsumed committables. In fact, we probably implicitly do it but we have the extra code path which allows subsumed committables to be incomplete. This commit simplifies the code a bit by always enforcing completeness.

(cherry picked from commit 1d32f1b)
The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate the state which was never clean. In particular, it also stored the input elements in state until EOI arrived and emitted them all at once. For state restoration tests, we emitted records after EOI arrived.

This commit changed the writer state completely to just capture the record count, which is much more realistic than storing actual payload. The tests now directly assert on the state instead of output.

This commit also introduces an adaptor for serializing basic types in the writer state and replaces the hard-to-maintain SinkAndSuppliers with an InspectableSink in the sink writer tests that require an abstraction on top of the different Sink flavors.

(cherry picked from commit 4217408)
In case of a failure after final checkpoint, EOI is called twice.

SinkWriter should ignore the second call to avoid emitting more dummy committables = transactional objects containing no data since no data can arrive when recovering from final checkpoint. The commit uses a boolean list state to remember if EOI has been emitted. The cases are discussed in code.

Since rescaling may still result in these dummy committables, the committer needs merge them into the CommittableCollector as these committables still need to be committed as systems like Kafka don't provide transactional isolation.

(cherry picked from commit 37e6724)
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.

(cherry picked from commit 6d60f41)
Copy link

@fapaul fapaul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@AHeise AHeise merged commit 1a5e4dc into apache:release-1.20 Nov 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants