diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index dc9972fe0fa..4864ada78bd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -58,6 +58,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; @@ -112,7 +113,7 @@ public DataSource createDataSource(Context context) { double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); - + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); @@ -143,6 +144,7 @@ public DataSource createDataSource(Context context) { .fetchSize(fetchSize) .splitSize(splitSize) .splitMetaGroupSize(splitMetaGroupSize) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .distributionFactorLower(distributionFactorLower) .distributionFactorUpper(distributionFactorUpper) .heartbeatInterval(heartbeatInterval) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index e852eb3d7a8..0c5171d785c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -243,4 +243,14 @@ public class MySqlDataSourceOptions { + "If there is a need to use a dot (.) in a regular expression to match any character, " + "it is necessary to escape the dot with a backslash." + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); + + @Experimental + public static final ConfigOption SCAN_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.newly-added-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether capture the newly added tables when restoring from a savepoint/checkpoint or not, by default is false."); + + }