-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-25920] Ignore duplicate EOI in SinkWriter #25292
Conversation
61b53e2
to
0587c70
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this issue.
The refactoring of the checkpoint id is very nice 👍
Regarding the actual fix I am not fully convinced yet. Even if persisting the information on EOI works it feels slightly off.
WDYT about not sending the commitable summary on EOI if the summary is empty? It seems the much easier approach.
|
||
// use union state to ensure that rescaling works correctly | ||
this.endOfInputState = | ||
context.getOperatorStateStore().getUnionListState(END_OF_INPUT_STATE_DESC); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need help understanding using the state.
I would expect that after receiving EOI we can also not persist anything anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can and need to. Think about the committables inside the CommitterOperator. We absolutely need to track them. EOI just means processElement
isn't called again and you can't call collector afterwards.
...time/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix @AHeise
I've left some comments, PTAL.
Besides of that, I think CompactorOperator.java
needs to be adjusted:
- - this code path still emits null
Line 252 in 277706d
checkpointId, @Nullable
)Line 261 in 277706d
checkpointId, @Nullable
)
Also, could you clarify what is the exact path when a duplicate summary for the same checkpoint was emitted? IIUC, the 2nd code path is the "normal" endInput
by SinkWriterOperator
for then bounded input. But what is the 1st path?
...time/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Outdated
Show resolved
Hide resolved
Let's first recap why we need to emit in EOI at all: For streaming jobs, emitting committables on barrier is the correct behavior. For batch, emitting on EOI. However, there is an additional case around streaming with bounded input and final checkpoints. The operator first receives an EOI and then the final barrier. After EOI, the operator is not allowed to emit another record. So just for this case, streaming also needs to emit on EOI and suppress on barrier. Now the question is what happens on failure after final checkpoint. Logically speaking, the EOI before the checkpoint should influence the operator state in the checkpoint in such a way that after recover we are still not allowed to emit records. For sinks that means that all committables have already been transferred to the committer operator for the snapshot. For technical reasons, we still receive a second EOI after recovery. Logically, the first EOI should have sufficed. I'm assuming Flink does the second EOI mostly for channel management but maybe the proper fix is actually not calling EOI twice. @pnowojski could you PTAL? Roman, you raise good questions concerning rescaling and more. The current implementation assumes that EOI will be received by all subtasks at the same logical time (e.g. on final checkpoint). Can we have instances where some subtasks shutdown earlier? |
0587c70
to
3965a8f
Compare
OptionalLong checkpointId = element.getValue().getCheckpointId(); | ||
if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) { | ||
long checkpointId = element.getValue().getCheckpointIdOrEOI(); | ||
if (checkpointId <= lastCompletedCheckpointId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kinda of a change in the semantics on first glance. However, we should not receive any elements after EOI, so this code path is actually never triggered and now it's simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not fully understand the comment.
- Why is it a change in semantics? The condition looks the same the only case that is removed is a committable without checkpoint. Which scenario was this before the change?
- I thought that for the final checkpoint, we would first receive EOI and then do a final checkpoint. This would mean the committer receives data after EOI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we mean the same thing. For EOI committables, this check will always yield false. Even without the special case on EOI.
3965a8f
to
a5934b8
Compare
boolean fullyReceived = | ||
!endInput && manager.getCheckpointId() == lastCompletedCheckpointId; | ||
commitAndEmit(manager, fullyReceived); | ||
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fapaul please double-check why we did it in this complicated manner originally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On first glance this change doesn't look correct.
By removing fullyReceived
, can we now commit committables that are from the "current" checkpoint but on receival from a delayed notifyCheckpointComplete from a previous checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just in case you haven't seen it: fullyReceived
is now implicitly always true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, fullyReceived should always be true because we always want to have complete batches. For earlier checkpoints, partially committed batches are still considered fullyReceived as long as all committables arrived at some point (fullyReceived === (#pending + #completed + #failed == #expected)
)
...he/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
Show resolved
Hide resolved
a5934b8
to
c3827c1
Compare
I addressed your comments and took a different approach to state management (it looks similar but is conceptually rather different). I restructured the PR and added 2 more commits to it. Unfortunately, the fixups looked then rather messy so I decided to force push everything again. So I'm sorry, but you more or less have to review again ;). Btw I figured that the EOI changes were a breaking change of the experimental CommittableMessage and made them non-breaking instead. So the commit is essentially the same but looks a bit different. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good but I left a few comments which we should answer.
assert checkpointRequests.isEmpty(); | ||
|
||
getAllTasksFuture().join(); | ||
emitCompacted(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik yes. We pass the param directly to both CommittableMessage
s. The serializer replaces null
with EOI
. So it effectively results in the same bytes.
@@ -91,13 +90,13 @@ public CommittableMessage<CommT> deserialize(int version, byte[] serialized) | |||
return new CommittableWithLineage<>( | |||
SimpleVersionedSerialization.readVersionAndDeSerialize( | |||
committableSerializer, in), | |||
readCheckpointId(in), | |||
in.readLong(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need consider migration cases from SinkV1 where afaik the checkpointId is always null
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for challenging that. It's one of the parts where most uncertainty still resides. But let's look again on what is happening and has happened:
- On write: we replaced null with EOI, now we get EOI and write it; so it should be the same byte sequence.
- On read: we have always used
readLong
which is not null-aware. We have replaced EOI with null.
So for serialization nothing has changed. However, around serialization we replace null with EOI in all instances of the Message.
That should be safe to change without migration, no?
For compatibility, I left getCheckpointId the same, so it will return an OptionalLong.empty() on EOI where it previously returned it on null. I have not found any usage of the method outside of Flink anyhow.
OptionalLong checkpointId = element.getValue().getCheckpointId(); | ||
if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) { | ||
long checkpointId = element.getValue().getCheckpointIdOrEOI(); | ||
if (checkpointId <= lastCompletedCheckpointId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not fully understand the comment.
- Why is it a change in semantics? The condition looks the same the only case that is removed is a committable without checkpoint. Which scenario was this before the change?
- I thought that for the final checkpoint, we would first receive EOI and then do a final checkpoint. This would mean the committer receives data after EOI.
@@ -147,15 +146,16 @@ Collection<CommittableWithLineage<CommT>> drainFinished() { | |||
} | |||
|
|||
CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) { | |||
checkArgument(Objects.equals(other.checkpointId, checkpointId)); | |||
checkArgument(other.checkpointId == checkpointId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This change should probably go into the commit, changing the type of the checkpoint fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I'll try to move it.
boolean fullyReceived = | ||
!endInput && manager.getCheckpointId() == lastCompletedCheckpointId; | ||
commitAndEmit(manager, fullyReceived); | ||
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On first glance this change doesn't look correct.
By removing fullyReceived
, can we now commit committables that are from the "current" checkpoint but on receival from a delayed notifyCheckpointComplete from a previous checkpoint.
...time/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Outdated
Show resolved
Hide resolved
this.endOfInput = !previousState.isEmpty() && !previousState.contains(false); | ||
sinkWriter = | ||
this.endOfInput | ||
? new ClosedWriter<>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, this probably leaves an unclean state from the SinkWriter when it previously crashed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in a fixup commit. PTAL.
@@ -178,7 +214,7 @@ public void processElement(StreamRecord<InputT> element) throws Exception { | |||
@Override | |||
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { | |||
super.prepareSnapshotPreBarrier(checkpointId); | |||
if (!endOfInput) { | |||
if (!this.endOfInput) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Why this
here we do not use inside the other methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, the fix should work and I don't see any issues with it.
...time/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
Show resolved
Hide resolved
c3827c1
to
6470f83
Compare
I added the fixup commits inline. PTAL. |
a8d3ff9
to
aaac5b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for walking me through the changes offline and discussing the last open points 👍
47edbfb
to
2633be2
Compare
In some parts of the sink, EOI is treated as checkpointId=null and in some checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that a null is valid however the serializer actually breaks then. In practice, checkpointId=MAX is used all the time by accident. This commit replaces the nullable checkpointIds with a primitive long EOI=MAX, so that we always use the special value instead of null. The serializer already used that value, so it actually simplifies many places and doesn't break any existing state.
Remove the side-effect and create a new (rather cheap) instance of the managers.
Use the proper ObjectAssert as the base for CommittableSummaryAssert and CommittableWithLinageAssert.
The committer is supposed to commit all committables at once for a given subtask (so that it can potentially optimize committables on the fly). With UCs, we could potentially see notifyCheckpointCompleted before receiving all committables. The CommittableSummary was built and is used to detect that. So far, we enforced completeness only for the most current committables belonging the respective checkpoint being completed. However, we should also enforce it to all subsumed committables. In fact, we probably implicitly do it but we have the extra code path which allows subsumed committables to be incomplete. This commit simplifies the code a bit by always enforcing completeness.
The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate the state which was never clean. In particular, it also stored the input elements in state until EOI arrived and emitted them all at once. For state restoration tests, we emitted records after EOI arrived. This commit changed the writer state completely to just capture the record count, which is much more realistic than storing actual payload. The tests now directly assert on the state instead of output. This commit also introduces an adaptor for serializing basic types in the writer state and replaces the hard-to-maintain SinkAndSuppliers with an InspectableSink in the sink writer tests that require an abstraction on top of the different Sink flavors.
In case of a failure after final checkpoint, EOI is called twice. SinkWriter should ignore the second call to avoid emitting more dummy committables = transactional objects containing no data since no data can arrive when recovering from final checkpoint. The commit uses a boolean list state to remember if EOI has been emitted. The cases are discussed in code. Since rescaling may still result in these dummy committables, the committer needs merge them into the CommittableCollector as these committables still need to be committed as systems like Kafka don't provide transactional isolation.
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.
2633be2
to
6ebfe41
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed commit for Fix AbstractStreamingWriter sending after EOI
// the writer and potentially emit duplicate summaries if we indeed recovered from a | ||
// final checkpoint. | ||
endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); | ||
List<Boolean> previousState = Lists.newArrayList(endOfInputState.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can we use List.of
here and avoid using the shaded guava dependency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure if you drop Java8 support first ;).
buckets.onProcessingTime(Long.MAX_VALUE); | ||
helper.snapshotState(Long.MAX_VALUE); | ||
output.emitWatermark(new Watermark(Long.MAX_VALUE)); | ||
commitUpToCheckpoint(Long.MAX_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the EOI
variable instead of Long.MAX_VALUE
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that but decided against it. This is not related to CommittableSummary directly, so it would feel weird to import it just for the EOI. And someone wise said that this class is deprecated anyways, so I didn't want to change too much.
@flinkbot run azure |
What is the purpose of the change
[FLINK-25920]
In case of a failure after final checkpoint, EOI is called twice. SinkWriter should ignore the second call to avoid emitting duplicate committables. This commit uses a union state to remember that EOI happened and suppress additional handling.
Brief change log
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation