Skip to content

Commit 1578d52

Browse files
junmuzymuzammil
authored and committed
Adding support for readAppendOnly for MySQL Pipeline connector
1 parent 3fb72cc commit 1578d52

File tree

5 files changed

+41
-4
lines changed

5 files changed

+41
-4
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
8484
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
8585
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
86+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED;
8687
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
8788
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
8889
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -168,6 +169,8 @@ public DataSource createDataSource(Context context) {
168169
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
169170
boolean isAssignUnboundedChunkFirst =
170171
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
172+
boolean scanReadChangelogAsAppendOnlyEnabled =
173+
config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
171174

172175
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
173176
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -221,7 +224,8 @@ public DataSource createDataSource(Context context) {
221224
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
222225
.useLegacyJsonFormat(useLegacyJsonFormat)
223226
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
224-
.skipSnapshotBackfill(skipSnapshotBackfill);
227+
.skipSnapshotBackfill(skipSnapshotBackfill)
228+
.scanReadChangelogAsAppendOnly(scanReadChangelogAsAppendOnlyEnabled);
225229

226230
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
227231

@@ -359,6 +363,7 @@ public Set<ConfigOption<?>> optionalOptions() {
359363
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
360364
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
361365
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
366+
options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
362367
return options;
363368
}
364369

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,14 @@ public EventSourceProvider getEventSourceProvider() {
6767
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
6868
false);
6969

70+
// Pick the changelog mode passed to the event deserializer: UPSERT when the source
// config reports the append-only flag as set, ALL otherwise.
// NOTE(review): the flag is named "append-only" but is mapped to UPSERT here; confirm
// that the deserializer's UPSERT mode really yields the intended append-only stream
// (the deserializer implementation is not visible in this change).
DebeziumChangelogMode changelogMode =
71+
sourceConfig.isScanReadChangelogAsAppendOnly()
72+
? DebeziumChangelogMode.UPSERT
73+
: DebeziumChangelogMode.ALL;
74+
7075
MySqlEventDeserializer deserializer =
7176
new MySqlEventDeserializer(
72-
DebeziumChangelogMode.ALL,
77+
changelogMode,
7378
sourceConfig.isIncludeSchemaChanges(),
7479
readableMetadataList,
7580
includeComments,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
330330
.defaultValue(false)
331331
.withDescription(
332332
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
333+
334+
/**
 * Experimental boolean option with key {@code scan.read-changelog-as-append-only.enabled}.
 * Defaults to {@code false}. Per its description, it controls whether the changelog data
 * stream is converted to an append-only data stream.
 */
@Experimental
335+
public static final ConfigOption<Boolean> SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
336+
ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
337+
.booleanType()
338+
.defaultValue(false)
339+
.withDescription(
340+
"Whether to convert the changelog data stream to an append-only data stream");
333341
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
7070
private final boolean parseOnLineSchemaChanges;
7171
public static boolean useLegacyJsonFormat = true;
7272
private final boolean assignUnboundedChunkFirst;
73+
private final boolean scanReadChangelogAsAppendOnly;
7374

7475
// --------------------------------------------------------------------------------------------
7576
// Debezium Configurations
@@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
108109
boolean parseOnLineSchemaChanges,
109110
boolean treatTinyInt1AsBoolean,
110111
boolean useLegacyJsonFormat,
111-
boolean assignUnboundedChunkFirst) {
112+
boolean assignUnboundedChunkFirst,
113+
boolean scanReadChangelogAsAppendOnly) {
112114
this.hostname = checkNotNull(hostname);
113115
this.port = port;
114116
this.username = checkNotNull(username);
@@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
152154
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
153155
this.useLegacyJsonFormat = useLegacyJsonFormat;
154156
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
157+
this.scanReadChangelogAsAppendOnly = scanReadChangelogAsAppendOnly;
155158
}
156159

157160
public String getHostname() {
@@ -243,6 +246,10 @@ public boolean isAssignUnboundedChunkFirst() {
243246
return assignUnboundedChunkFirst;
244247
}
245248

249+
/**
 * Returns the {@code scanReadChangelogAsAppendOnly} flag stored in this config.
 *
 * @return the value of the {@code scanReadChangelogAsAppendOnly} field
 */
public boolean isScanReadChangelogAsAppendOnly() {
250+
return scanReadChangelogAsAppendOnly;
251+
}
252+
246253
public Properties getDbzProperties() {
247254
return dbzProperties;
248255
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class MySqlSourceConfigFactory implements Serializable {
7474
private boolean treatTinyInt1AsBoolean = true;
7575
private boolean useLegacyJsonFormat = true;
7676
private boolean assignUnboundedChunkFirst = false;
77+
private boolean scanReadChangelogAsAppendOnly = false;
7778

7879
public MySqlSourceConfigFactory hostname(String hostname) {
7980
this.hostname = hostname;
@@ -324,6 +325,16 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
324325
return this;
325326
}
326327

328+
/**
329+
* Whether to convert the changelog data stream to an append-only data stream. Defaults to
330+
* false.
*
* @param scanReadChangelogAsAppendOnly the flag value to store on this factory
* @return this factory, so calls can be chained
331+
*/
332+
public MySqlSourceConfigFactory scanReadChangelogAsAppendOnly(
333+
boolean scanReadChangelogAsAppendOnly) {
334+
this.scanReadChangelogAsAppendOnly = scanReadChangelogAsAppendOnly;
335+
return this;
336+
}
337+
327338
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
328339
public MySqlSourceConfig createConfig(int subtaskId) {
329340
// hard code server name, because we don't need to distinguish it, docs:
@@ -421,6 +432,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
421432
parseOnLineSchemaChanges,
422433
treatTinyInt1AsBoolean,
423434
useLegacyJsonFormat,
424-
assignUnboundedChunkFirst);
435+
assignUnboundedChunkFirst,
436+
scanReadChangelogAsAppendOnly);
425437
}
426438
}

0 commit comments

Comments
 (0)