diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index cd0e7720049..a3a234b732b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -203,7 +203,8 @@ else if (!isRemainingTablesCheckpointed && !isSnapshotAssigningFinished(assigner private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { try { // check whether we got newly added tables final List currentCapturedTables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e209921b553..9ea69b11ad8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -215,7 +215,8 @@ else if (!isRemainingTablesCheckpointed private void captureNewlyAddedTables() { // Don't scan newly added table in snapshot mode. if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { // check whether we got newly added tables try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { final List currentCapturedTables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 759827d9df4..de875a0ed75 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -484,7 +484,7 @@ public void testEnumerateTablesLazily() { } @Test - public void testScanNewlyAddedTableStartFromCheckpoint() { + public void testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() { List expected = Arrays.asList( "customers_sparse_dist [109] null", @@ -492,7 +492,24 @@ public void testScanNewlyAddedTableStartFromCheckpoint() { "customers_even_dist [10] [18]", "customers_even_dist [18] null", "customer_card_single_line null null"); - assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint()); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.INITIAL_ASSIGNING_FINISHED)); + } + + @Test + public void testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() { + List expected = + Arrays.asList( + "customers_sparse_dist [109] null", + "customers_even_dist null [10]", + "customers_even_dist [10] [18]", + "customers_even_dist [18] null"); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED)); } private List getTestAssignSnapshotSplits( @@ -536,7 +553,7 @@ private List getTestAssignSnapshotSplits( return getSplitsFromAssigner(assigner); } - private List getTestAssignSnapshotSplitsFromCheckpoint() { + private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) { TableId newTable = TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line"); TableId processedTable = @@ -619,7 +636,7 @@ private List getTestAssignSnapshotSplitsFromCheckpoint() { assignedSplits, new HashMap<>(), splitFinishedOffsets, - AssignerStatus.INITIAL_ASSIGNING, + assignerStatus, remainingTables, false, true,