From fd2318e9caf6171c639298d5c21df7122aa9d0b0 Mon Sep 17 00:00:00 2001 From: GOODBOY008 Date: Tue, 28 Nov 2023 11:08:11 +0800 Subject: [PATCH] [mysql] Fix snapshot fetch size conf does not take effect --- .../mysql/debezium/reader/SnapshotSplitReader.java | 1 + .../mysql/debezium/task/MySqlSnapshotSplitReadTask.java | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 9fa7cc6f01b..38921ebe837 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -116,6 +116,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { this.reachEnd.set(false); this.splitSnapshotReadTask = new MySqlSnapshotSplitReadTask( + statefulTaskContext.getSourceConfig(), statefulTaskContext.getConnectorConfig(), statefulTaskContext.getSnapshotChangeEventSourceMetrics(), statefulTaskContext.getDatabaseSchema(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 8b185e9dd98..8a003ae46d4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -19,6 +19,7 @@ import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher; import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.utils.StatementUtils; @@ -68,7 +69,7 @@ public class MySqlSnapshotSplitReadTask /** Interval for showing a log statement with the progress while scanning a single table. */ private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000); - private final MySqlConnectorConfig connectorConfig; + private final MySqlSourceConfig sourceConfig; private final MySqlDatabaseSchema databaseSchema; private final MySqlConnection jdbcConnection; private final EventDispatcherImpl dispatcher; @@ -79,6 +80,7 @@ public class MySqlSnapshotSplitReadTask private final SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics; public MySqlSnapshotSplitReadTask( + MySqlSourceConfig sourceConfig, MySqlConnectorConfig connectorConfig, SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics, MySqlDatabaseSchema databaseSchema, @@ -89,7 +91,7 @@ public MySqlSnapshotSplitReadTask( Clock clock, MySqlSnapshotSplit snapshotSplit) { super(connectorConfig, snapshotChangeEventSourceMetrics); - this.connectorConfig = connectorConfig; + this.sourceConfig = sourceConfig; this.databaseSchema = databaseSchema; this.jdbcConnection = jdbcConnection; this.dispatcher = dispatcher; @@ -224,7 +226,7 @@ private void createDataEventsForTable( snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd(), snapshotSplit.getSplitKeyType().getFieldCount(), - connectorConfig.getQueryFetchSize()); + sourceConfig.getFetchSize()); ResultSet rs = selectStatement.executeQuery()) { ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);