From 37e44b253e5cc32661314fe5f5c92d5f2d5a139e Mon Sep 17 00:00:00 2001
From: Hang Ruan
Date: Thu, 20 Jul 2023 14:32:36 +0800
Subject: [PATCH 1/2] [mysql] Fix assigning duplicate snapshot splits when
 scan of newly added tables is enabled

When a job restores from a checkpoint, tables that still have unassigned
splits in remainingSplits are not counted as already-captured tables. Table
discovery therefore reports them as newly added, and their snapshot splits
get created a second time. Count the tables referenced by remainingSplits
when computing the newly added and removed table sets, and cover the restore
path with a new test.
---
 .../assigners/MySqlSnapshotSplitAssigner.java |  10 +-
 .../MySqlSnapshotSplitAssignerTest.java       | 126 +++++++++++++++++-
 2 files changed, 132 insertions(+), 4 deletions(-)

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 6ccf305cf6f..adf7d986cbf 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -45,11 +45,13 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -209,11 +211,17 @@ private void captureNewlyAddedTables() {
                 final List<TableId> newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig);
 
                 // Get the removed tables with the new table filter
-                List<TableId> tablesToRemove = new LinkedList<>(alreadyProcessedTables);
+                Set<TableId> tablesToRemove = new HashSet<>(alreadyProcessedTables);
+                List<TableId> tablesInRemainingSplits =
+                        remainingSplits.stream()
+                                .map(MySqlSnapshotSplit::getTableId)
+                                .collect(Collectors.toList());
+                tablesToRemove.addAll(tablesInRemainingSplits);
                 tablesToRemove.addAll(remainingTables);
                 tablesToRemove.removeAll(newlyAddedTables);
 
                 newlyAddedTables.removeAll(alreadyProcessedTables);
+                newlyAddedTables.removeAll(tablesInRemainingSplits);
                 newlyAddedTables.removeAll(remainingTables);
 
                 // case 1: there are old tables to remove from state
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 38a0fa2c3d0..082f3255db8 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -17,14 +17,21 @@
 package com.ververica.cdc.connectors.mysql.source.assigners;
 
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.ExceptionUtils;
 
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
+import com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
+import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
+import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
+import io.debezium.relational.Column;
 import io.debezium.relational.TableId;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -41,6 +48,7 @@
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

@@ -416,7 +424,8 @@ public void testEnumerateTablesLazily() {
                         CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist"},
-                        "id");
+                        "id",
+                        false);

         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
@@ -428,6 +437,18 @@
         assertFalse(assigner.needToDiscoveryTables());
     }

+    @Test
+    public void testScanNewlyAddedTableStartFromCheckpoint() {
+        List<String> expected =
+                Arrays.asList(
+                        "customers_sparse_dist [109] null",
+                        "customers_even_dist null [10]",
+                        "customers_even_dist [10] [18]",
+                        "customers_even_dist [18] null",
+                        "customer_card_single_line null null");
+        assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
+    }
+
     private List<String> getTestAssignSnapshotSplits(
             int splitSize,
             double distributionFactorUpper,
@@ -456,7 +477,8 @@ private List<String> getTestAssignSnapshotSplits(
                         distributionFactorUpper,
                         distributionFactorLower,
                         captureTables,
-                        chunkKeyColumn);
+                        chunkKeyColumn,
+                        false);
         List<TableId> remainingTables =
                 Arrays.stream(captureTables)
                         .map(t -> database.getDatabaseName() + "." + t)
@@ -465,7 +487,103 @@ private List<String> getTestAssignSnapshotSplits(

         final MySqlSnapshotSplitAssigner assigner =
                 new MySqlSnapshotSplitAssigner(
                         configuration, DEFAULT_PARALLELISM, remainingTables, false);
+        return getSplitsFromAssigner(assigner);
+    }
+
+    private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
+        TableId newTable =
+                TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line");
+        TableId processedTable =
+                TableId.parse(customerDatabase.getDatabaseName() + ".customers_sparse_dist");
+        TableId splitTable =
+                TableId.parse(customerDatabase.getDatabaseName() + ".customers_even_dist");
+        String[] captureTables = {newTable.table(), processedTable.table(), splitTable.table()};
+        MySqlSourceConfig configuration =
+                getConfig(
+                        customerDatabase,
+                        4,
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        captureTables,
+                        null,
+                        true);
+        List<TableId> remainingTables = new ArrayList<>();
+        List<TableId> alreadyProcessedTables = new ArrayList<>();
+        alreadyProcessedTables.add(processedTable);
+
+        RowType splitKeyType =
+                ChunkUtils.getChunkKeyColumnType(
+                        Column.editor().name("id").type("INT").jdbcType(4).create());
+        List<MySqlSchemalessSnapshotSplit> remainingSplits =
+                Arrays.asList(
+                        new MySqlSchemalessSnapshotSplit(
+                                processedTable,
+                                processedTable + ":2",
+                                splitKeyType,
+                                new Object[] {109},
+                                null,
+                                null),
+                        new MySqlSchemalessSnapshotSplit(
+                                splitTable,
+                                splitTable + ":0",
+                                splitKeyType,
+                                null,
+                                new Object[] {10},
+                                null),
+                        new MySqlSchemalessSnapshotSplit(
+                                splitTable,
+                                splitTable + ":1",
+                                splitKeyType,
+                                new Object[] {10},
+                                new Object[] {18},
+                                null),
+                        new MySqlSchemalessSnapshotSplit(
+                                splitTable,
+                                splitTable + ":2",
+                                splitKeyType,
+                                new Object[] {18},
+                                null,
+                                null));
+
+        Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new HashMap<>();
+        assignedSplits.put(
+                processedTable + ":0",
+                new MySqlSchemalessSnapshotSplit(
+                        processedTable,
+                        processedTable + ":0",
+                        splitKeyType,
+                        null,
+                        new Object[] {105},
+                        null));
+        assignedSplits.put(
+                processedTable + ":1",
+                new MySqlSchemalessSnapshotSplit(
+                        processedTable,
+                        processedTable + ":1",
+                        splitKeyType,
+                        new Object[] {105},
+                        new Object[] {109},
+                        null));
+        Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
+        splitFinishedOffsets.put(processedTable + ":0", ofEarliest());
+        SnapshotPendingSplitsState checkpoint =
+                new SnapshotPendingSplitsState(
+                        alreadyProcessedTables,
+                        remainingSplits,
+                        assignedSplits,
+                        new HashMap<>(),
+                        splitFinishedOffsets,
+                        AssignerStatus.INITIAL_ASSIGNING,
+                        remainingTables,
+                        false,
+                        true,
+                        ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
+        final MySqlSnapshotSplitAssigner assigner =
+                new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
+        return getSplitsFromAssigner(assigner);
+    }
+
+    private List<String> getSplitsFromAssigner(final MySqlSnapshotSplitAssigner assigner) {
         assigner.open();
         List<MySqlSplit> sqlSplits = new ArrayList<>();
         while (true) {
@@ -500,7 +618,8 @@ private MySqlSourceConfig getConfig(
             double distributionFactorUpper,
             double distributionLower,
             String[] captureTables,
-            String chunkKeyColumn) {
+            String chunkKeyColumn,
+            boolean scanNewlyAddedTableEnabled) {
         Map<ObjectPath, String> chunkKeys = new HashMap<>();
         for (String table : captureTables) {
             chunkKeys.put(new ObjectPath(database.getDatabaseName(), table), chunkKeyColumn);
@@ -523,6 +642,7 @@
                 .password(database.getPassword())
                 .serverTimeZone(ZoneId.of("UTC").toString())
                 .chunkKeyColumn(chunkKeys)
+                .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
                 .createConfig(0);
     }
 }

From 3efcddff293abe37a63c77cfc29953afefa2faa1 Mon Sep 17 00:00:00 2001
From: Hang Ruan
Date: Thu, 20 Jul 2023 16:41:40 +0800
Subject: [PATCH 2/2] Address review comments: make the previous and current
 captured tables explicit

Rename the rediscovered table list to currentCapturedTables and gather the
previously captured tables (tables in remaining splits, already processed
tables, and remaining tables) into one set, so that the removed tables are
previous minus current and the newly added tables are current minus previous.
---
 .../assigners/MySqlSnapshotSplitAssigner.java | 23 +++++++++++--------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index adf7d986cbf..dff21c67739 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -208,21 +208,24 @@ private void captureNewlyAddedTables() {
         if (sourceConfig.isScanNewlyAddedTableEnabled()) {
             // check whether we got newly added tables
             try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
-                final List<TableId> newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig);
-
-                // Get the removed tables with the new table filter
-                Set<TableId> tablesToRemove = new HashSet<>(alreadyProcessedTables);
+                final List<TableId> currentCapturedTables =
+                        discoverCapturedTables(jdbc, sourceConfig);
+                final Set<TableId> previousCapturedTables = new HashSet<>();
                 List<TableId> tablesInRemainingSplits =
                         remainingSplits.stream()
                                 .map(MySqlSnapshotSplit::getTableId)
                                 .collect(Collectors.toList());
-                tablesToRemove.addAll(tablesInRemainingSplits);
-                tablesToRemove.addAll(remainingTables);
-                tablesToRemove.removeAll(newlyAddedTables);
+                previousCapturedTables.addAll(tablesInRemainingSplits);
+                previousCapturedTables.addAll(alreadyProcessedTables);
+                previousCapturedTables.addAll(remainingTables);
+
+                // Get the removed tables with the new table filter
+                Set<TableId> tablesToRemove = new HashSet<>(previousCapturedTables);
+                tablesToRemove.removeAll(currentCapturedTables);

-                newlyAddedTables.removeAll(alreadyProcessedTables);
-                newlyAddedTables.removeAll(tablesInRemainingSplits);
-                newlyAddedTables.removeAll(remainingTables);
+                // Get the newly added tables
+                currentCapturedTables.removeAll(previousCapturedTables);
+                List<TableId> newlyAddedTables = currentCapturedTables;

                 // case 1: there are old tables to remove from state
                 if (!tablesToRemove.isEmpty()) {
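
Reviewer note (not part of the patch): the sketch below replays the set
arithmetic of the revised captureNewlyAddedTables() in isolation, with plain
strings standing in for Debezium TableId objects and hard-coded lists standing
in for the restored checkpoint state and the table discovery result; the table
names come from the new test. It shows why tables referenced only by
remainingSplits must count as previously captured: without that term,
customers_even_dist would be classified as newly added after a restore and its
snapshot splits would be created a second time.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class NewlyAddedTablesSketch {
        public static void main(String[] args) {
            // State restored from a checkpoint.
            List<String> alreadyProcessedTables = List.of("db.customers_sparse_dist");
            List<String> tablesInRemainingSplits = List.of("db.customers_even_dist");
            List<String> remainingTables = List.of();

            // Result of re-running table discovery with the current table filter.
            List<String> currentCapturedTables =
                    new ArrayList<>(
                            List.of(
                                    "db.customers_sparse_dist",
                                    "db.customers_even_dist",
                                    "db.customer_card_single_line"));

            // A table counts as "previously captured" if it was fully processed,
            // still has pending snapshot splits, or has not been chunked yet.
            Set<String> previousCapturedTables = new HashSet<>();
            previousCapturedTables.addAll(tablesInRemainingSplits);
            previousCapturedTables.addAll(alreadyProcessedTables);
            previousCapturedTables.addAll(remainingTables);

            // Removed tables: previously captured but no longer matched by the filter.
            Set<String> tablesToRemove = new HashSet<>(previousCapturedTables);
            tablesToRemove.removeAll(currentCapturedTables);

            // Newly added tables: matched now but never seen before.
            currentCapturedTables.removeAll(previousCapturedTables);
            List<String> newlyAddedTables = currentCapturedTables;

            System.out.println("to remove:   " + tablesToRemove); // prints: []
            System.out.println("newly added: " + newlyAddedTables);
            // prints: [db.customer_card_single_line]
        }
    }

The three addAll calls matter because a restored job can hold a table in any
of three states (fully processed, partially split, or not yet chunked), and a
table in any of those states must be excluded from the newly added set.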