-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FLINK-36891: MySQL CDC connector produces corrupted state in case of serialization failure #3794
Conversation
final byte[] result = out.getCopyOfBuffer(); | ||
// optimization: cache the serialized from, so we avoid the byte work during repeated | ||
// serialization | ||
state.serializedFormCache = result; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the same chunk of code in the base folder https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java#L83-L104. Should it be fixed too?
assertThrows(IOException.class, () -> serializer.serialize(unsupportedState)); | ||
|
||
final byte[] ser2 = serializer.serialize(state); | ||
assertEquals(ser1.length, ser2.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better to assert that ser1
and ser2
are the same, since there shouldn't be any changes to the state if an error happens based on my understanding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For some reason, they aren't the same regardless of the fix.
UPD: you're right. They are equal, they just need to be compared as arrays, not objects.
…serialization failure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the output buffer is reused between calls to
serialize()
, it should be cleared in afinally
block to account for potential exceptions.