Skip to content

Commit

Permalink
[cdc-connector][mongodb] Avoid mongodb source to read data after high…
Browse files Browse the repository at this point in the history
…_watermark in backfill phase (apache#2893)
  • Loading branch information
loserwang1024 authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent 5d9a519 commit 0beccd0
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.CLUSTER_TIME_FIELD;
Expand Down Expand Up @@ -170,15 +169,19 @@ public void execute(Context context) throws Exception {
LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
}
}

if (changeRecord != null) {
if (changeRecord != null && !isBoundedRead()) {
queue.enqueue(new DataChangeEvent(changeRecord));
}

if (isBoundedRead()) {
ChangeStreamOffset currentOffset;
if (changeRecord != null) {
currentOffset = new ChangeStreamOffset(getResumeToken(changeRecord));
// The log after the high watermark won't emit.
if (currentOffset.isAtOrBefore(streamSplit.getEndingOffset())) {
queue.enqueue(new DataChangeEvent(changeRecord));
}

} else {
// Heartbeat is not turned on or there is no update event
currentOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
Expand Down Expand Up @@ -227,8 +230,7 @@ public void close() {}
private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
ChangeStreamDescriptor changeStreamDescriptor) {
ChangeStreamOffset offset =
new ChangeStreamOffset(
(Map<String, String>) streamSplit.getStartingOffset().getOffset());
new ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());
ChangeStreamIterable<Document> changeStreamIterable =
getChangeStreamIterable(sourceConfig, changeStreamDescriptor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {

private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

Expand Down Expand Up @@ -224,8 +225,7 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
return;
}

List<String> records =
testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, true);
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -261,8 +261,7 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
return;
}

List<String> records =
testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, true);
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -292,14 +291,50 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
assertEqualsInAnyOrder(expectedRecords, records);
}

@Test
public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
if (!parallelismSnapshot) {
return;
}
List<String> records =
testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"-U[2000, user_21, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
"-D[1019, user_20, Shanghai, 123567891234]");
assertEqualsInAnyOrder(expectedRecords, records);
}

@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
if (!parallelismSnapshot) {
return;
}

List<String> records =
testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK, true);
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -339,8 +374,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
return;
}

List<String> records =
testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK, true);
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -376,8 +410,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
}

private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
throws Exception {
boolean skipBackFill, int fetchSize, int hookType) throws Exception {

String customerDatabase =
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
Expand Down Expand Up @@ -415,11 +448,11 @@ private List<String> testBackfillWhenWritingEvents(
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(enableFullDocPrePostImage)
.scanFullChangelog(true)
.collectionList(
getCollectionNameRegex(
customerDatabase, new String[] {"customers"}))
.deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
.deserializer(customerTable.getDeserializer(true))
.skipSnapshotBackfill(skipBackFill)
.build();

Expand All @@ -443,10 +476,16 @@ private List<String> testBackfillWhenWritingEvents(
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
};

if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
switch (hookType) {
case USE_POST_LOWWATERMARK_HOOK:
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
break;
case USE_PRE_HIGHWATERMARK_HOOK:
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
break;
case USE_POST_HIGHWATERMARK_HOOK:
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
break;
}
source.setSnapshotHooks(hooks);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);

Expand Down Expand Up @@ -150,8 +151,7 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
@Test
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {

List<String> records =
testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, false);
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -184,8 +184,7 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {

List<String> records =
testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, false);
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -216,10 +215,45 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
}

@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {

List<String> records =
testBackfillWhenWritingEvents(true, 24, USE_PRE_HIGHWATERMARK_HOOK, false);
testBackfillWhenWritingEvents(false, 24, USE_POST_HIGHWATERMARK_HOOK);
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]",
"+U[2000, user_21, Pittsburgh, 123567891234]",
// delete message only contains _id, sql job contain value because of
// changelog normalization
"-D[0, null, null, null]");
assertEqualsInAnyOrder(expectedRecords, records);
}

@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {

List<String> records = testBackfillWhenWritingEvents(true, 24, USE_PRE_HIGHWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -257,8 +291,7 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
@Test
public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {

List<String> records =
testBackfillWhenWritingEvents(true, 24, USE_POST_LOWWATERMARK_HOOK, false);
List<String> records = testBackfillWhenWritingEvents(true, 24, USE_POST_LOWWATERMARK_HOOK);

List<String> expectedRecords =
Arrays.asList(
Expand Down Expand Up @@ -295,8 +328,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
}

private List<String> testBackfillWhenWritingEvents(
boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
throws Exception {
boolean skipBackFill, int fetchSize, int hookType) throws Exception {
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Expand All @@ -319,11 +351,11 @@ private List<String> testBackfillWhenWritingEvents(
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(enableFullDocPrePostImage)
.scanFullChangelog(false)
.collectionList(
getCollectionNameRegex(
customerDatabase, new String[] {"customers"}))
.deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
.deserializer(customerTable.getDeserializer(false))
.skipSnapshotBackfill(skipBackFill)
.build();

Expand All @@ -347,10 +379,16 @@ private List<String> testBackfillWhenWritingEvents(
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
};

if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
switch (hookType) {
case USE_POST_LOWWATERMARK_HOOK:
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
break;
case USE_PRE_HIGHWATERMARK_HOOK:
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
break;
case USE_POST_HIGHWATERMARK_HOOK:
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
break;
}
source.setSnapshotHooks(hooks);

Expand Down

0 comments on commit 0beccd0

Please sign in to comment.