[Bug][mongo] Fix MongoDB's rewriteOutputBuffer to emit +I rather than +U(or -U) #2760

Merged
merged 1 commit into from
Nov 29, 2023

Conversation

loserwang1024
Contributor

Fix #2759

@loserwang1024
Contributor Author

@Jiabao-Sun , @Shawn-Hx , CC

@Jiabao-Sun
Contributor

+U and +I should have the same processing in the external system.
Could you give an example that shows the specific impact?

@loserwang1024
Contributor Author

loserwang1024 commented Nov 27, 2023

+U and +I should have the same processing in the external system

@Jiabao-Sun, this +U arrives without a preceding +I for the same key, which does not match changelog semantics. What happens if a downstream operator needs the previous row when it processes the +U record?

I wonder whether a +U without a preceding +I could cause other problems in stream processing. @leonardBang, CC, WDYT?

@loserwang1024
Contributor Author

@Jiabao-Sun, moreover, if enableFullDocPrePostImage = true, MongoDBConnectorFullChangelogDeserializationSchema is used as well, and it emits -U without a preceding +I.
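To make the concern concrete, here is a minimal sketch of a retract-style consumer that keeps a per-key row count. The class `RetractCountSketch` and its methods are hypothetical and are not Flink or connector code; it only assumes that a downstream operator adds 1 for +I/+U and subtracts 1 for -U/-D.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical retract-style consumer, for illustration only.
public class RetractCountSketch {

    // Applies one changelog record to a per-key row count.
    static void apply(Map<String, Integer> counts, String rowKind, String key) {
        switch (rowKind) {
            case "+I":
            case "+U":
                counts.merge(key, 1, Integer::sum);
                break;
            case "-U":
            case "-D":
                counts.merge(key, -1, Integer::sum);
                break;
            default:
                throw new IllegalArgumentException("unknown row kind: " + rowKind);
        }
    }

    // Replays a sequence of row kinds for one key and returns the final count.
    static int countAfter(String[] rowKinds, String key) {
        Map<String, Integer> counts = new HashMap<>();
        for (String kind : rowKinds) {
            apply(counts, kind, key);
        }
        return counts.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        // Snapshot row emitted as +I: the row is counted once.
        System.out.println(countAfter(new String[] {"+I"}, "k1"));
        // Snapshot row emitted as -U/+U with no earlier +I: the two records cancel out.
        System.out.println(countAfter(new String[] {"-U", "+U"}, "k1"));
    }
}
```

Under these assumptions the row that exists in the snapshot ends up with a count of 0 when it is delivered as -U/+U, while delivering it as +I gives 1.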

@Jiabao-Sun
Contributor

@Jiabao-Sun, moreover, if enableFullDocPrePostImage = true, MongoDBConnectorFullChangelogDeserializationSchema is used as well, and it emits -U without a preceding +I.

-U and +U contain the same key, so it should be covered by +U?

@loserwang1024
Contributor Author

loserwang1024 commented Nov 27, 2023

-U and +U contain the same key, so it should be covered by +U?

@Jiabao-Sun, the MongoDBFetchTaskContext#rewriteOutputBuffer method currently just keeps these records as modify operations (which may produce +U alone, or -U and +U, depending on the "scan.full-changelog" config). MongoDBConnectorDeserializationSchema#deserialize then gets only +U from it, while MongoDBConnectorFullChangelogDeserializationSchema#deserialize gets both +U and -U from it.
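A rough sketch of the mapping described above, not the connector's actual code: the class `RowKindMappingSketch` and the method `rowKinds` are hypothetical, and the insert and delete cases are assumptions added for completeness. Only the update/replace behavior comes from this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simplification of how an operation type maps to row kinds.
public class RowKindMappingSketch {

    // fullChangelog stands in for the "scan.full-changelog" option.
    static List<String> rowKinds(String operationType, boolean fullChangelog) {
        switch (operationType) {
            case "insert":
                // Assumption: inserts become +I in both modes.
                return Collections.singletonList("+I");
            case "update":
            case "replace":
                // From the thread: +U only, or -U and +U in full changelog mode.
                return fullChangelog
                        ? Arrays.asList("-U", "+U")
                        : Collections.singletonList("+U");
            case "delete":
                // Assumption: deletes become -D in both modes.
                return Collections.singletonList("-D");
            default:
                throw new IllegalArgumentException("unsupported operation: " + operationType);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKinds("update", false));
        System.out.println(rowKinds("update", true));
        System.out.println(rowKinds("insert", true));
    }
}
```

So if a buffered snapshot record keeps the operation type "update", neither mode can emit +I for it; rewriting the type to "insert" is what changes the outcome.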

@loserwang1024 changed the title from "[Bug][mongo] Fix MongoDB's rewriteOutputBuffer to emit +I rather than +U" to "[Bug][mongo] Fix MongoDB's rewriteOutputBuffer to emit +I rather than +U(or -U)" on Nov 28, 2023
@Shawn-Hx
Contributor

I agree with @loserwang1024. Logically, data before the high watermark is snapshot data, which should be treated as +I. Otherwise downstream operators may see a +U/-U record without a preceding +I. I'm not sure whether this behavior can cause incorrect data, but using +I is undoubtedly correct. @Jiabao-Sun
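The idea can be sketched as follows. This is a simplified stand-in, not the real MongoDBFetchTaskContext: records are plain maps, and the names `RewriteOutputBufferSketch`, `OPERATION_TYPE_FIELD`, and `record` are hypothetical. It assumes that changes arriving before the high watermark are folded into a per-key buffer, and that a delete drops the key from the buffer.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of folding change records into the snapshot output buffer.
public class RewriteOutputBufferSketch {

    // Stand-in for MongoDBEnvelope.OPERATION_TYPE_FIELD; the value is an assumption.
    static final String OPERATION_TYPE_FIELD = "operationType";

    // Builds a toy change record.
    static Map<String, String> record(String operationType, String document) {
        Map<String, String> value = new HashMap<>();
        value.put(OPERATION_TYPE_FIELD, operationType);
        value.put("fullDocument", document);
        return value;
    }

    static void rewriteOutputBuffer(
            Map<String, Map<String, String>> outputBuffer,
            String key,
            Map<String, String> changeRecord) {
        switch (changeRecord.get(OPERATION_TYPE_FIELD)) {
            case "insert":
            case "update":
            case "replace": {
                // The buffered row is snapshot data, so mark it as an insert (+I).
                Map<String, String> rewritten = new HashMap<>(changeRecord);
                rewritten.put(OPERATION_TYPE_FIELD, "insert");
                outputBuffer.put(key, rewritten);
                break;
            }
            case "delete":
                // The row no longer exists at the high watermark.
                outputBuffer.remove(key);
                break;
            default:
                throw new IllegalArgumentException("unsupported operation");
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> buffer = new LinkedHashMap<>();
        rewriteOutputBuffer(buffer, "k1", record("update", "v2"));
        System.out.println(buffer.get("k1").get(OPERATION_TYPE_FIELD));
    }
}
```

With this rewrite, every record that leaves the snapshot buffer carries the insert operation type, so either deserialization schema would see it as +I.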

@Jiabao-Sun
Contributor

OK, @loserwang1024 could you help add some tests for this?

@loserwang1024
Contributor Author

loserwang1024 commented Nov 28, 2023

@Jiabao-Sun, thanks a lot. It's not easy to add a test right now.
In #2553, I added hooks to perform database operations during snapshot split reading, along with tests that use them. However, that PR depends on this one, so please merge this PR first.
The details are in this branch: https://github.com/loserwang1024/flink-cdc-connectors/tree/add-snapshot-hooks. MongoDBFullChangelogITCase#testEnableBackfillWithPreHighWaterMark and testEnableBackfillWithPostLowWaterMark are the tests you need.

@@ -162,6 +163,7 @@ public void rewriteOutputBuffer(
                 case INSERT:
                 case UPDATE:
                 case REPLACE:
+                    value.put(MongoDBEnvelope.OPERATION_TYPE_FIELD, INSERT.getValue());
Contributor

Suggested change
- value.put(MongoDBEnvelope.OPERATION_TYPE_FIELD, INSERT.getValue());
+ value.put(MongoDBEnvelope.OPERATION_TYPE_FIELD, OperationType.INSERT.getValue());

Contributor Author

Thanks a lot, I will do it.

@Jiabao-Sun Jiabao-Sun merged commit cc9dbc6 into apache:master Nov 29, 2023
17 checks passed
e-mhui pushed a commit to e-mhui/flink-cdc-connectors that referenced this pull request Dec 2, 2023
ChaomingZhangCN pushed a commit to ChaomingZhangCN/flink-cdc that referenced this pull request Jan 13, 2025
Development

Successfully merging this pull request may close these issues.

[Bug][mongo] rewriteOutputBuffer should emit +I rather than +U
3 participants