From c170003509e10804baf5acae5849330cc5f1528f Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 8 Aug 2024 16:11:30 +0800 Subject: [PATCH 1/3] [FLINK-34688][cdc-connector] Fix: The assigner is not ready to offer finished split information, this should not be called --- .../connectors/base/source/assigner/SnapshotSplitAssigner.java | 3 ++- .../mysql/source/assigners/MySqlSnapshotSplitAssigner.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index cd0e7720049..a3a234b732b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -203,7 +203,8 @@ else if (!isRemainingTablesCheckpointed && !isSnapshotAssigningFinished(assigner private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { try { // check whether we got newly added tables final List currentCapturedTables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e209921b553..9ea69b11ad8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -215,7 +215,8 @@ else if (!isRemainingTablesCheckpointed private void captureNewlyAddedTables() { // Don't scan newly added table in snapshot mode. if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { // check whether we got newly added tables try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { final List currentCapturedTables = From 3c83647be9f65936dd33d59fd3cacc896a4cdde4 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 8 Aug 2024 17:45:21 +0800 Subject: [PATCH 2/3] Fix test --- .../MySqlSnapshotSplitAssignerTest.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 759827d9df4..4f14366d14b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -484,7 +484,7 @@ public void testEnumerateTablesLazily() { } @Test - public void testScanNewlyAddedTableStartFromCheckpoint() { + public void testScanNewlyAddedTableStartFromCheckpoint_INITIAL_ASSIGNING_FINISHED() { List expected = Arrays.asList( "customers_sparse_dist [109] null", @@ -492,7 +492,25 @@ public void testScanNewlyAddedTableStartFromCheckpoint() { "customers_even_dist [10] [18]", "customers_even_dist [18] null", "customer_card_single_line null null"); - assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint()); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.INITIAL_ASSIGNING_FINISHED)); + } + + @Test + public void + testScanNewlyAddedTableStartFromCheckpoint_NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED() { + List expected = + Arrays.asList( + "customers_sparse_dist [109] null", + "customers_even_dist null [10]", + "customers_even_dist [10] [18]", + "customers_even_dist [18] null"); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED)); } private List getTestAssignSnapshotSplits( @@ -536,7 +554,7 @@ private List getTestAssignSnapshotSplits( return getSplitsFromAssigner(assigner); } - private List getTestAssignSnapshotSplitsFromCheckpoint() { + private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) { TableId newTable = TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line"); TableId processedTable = @@ -619,7 +637,7 @@ private List getTestAssignSnapshotSplitsFromCheckpoint() { assignedSplits, new HashMap<>(), splitFinishedOffsets, - AssignerStatus.INITIAL_ASSIGNING, + assignerStatus, remainingTables, false, true, From c64868371ccd98e838ee3ba8d9342d0a33305ce9 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Fri, 9 Aug 2024 09:45:27 +0800 Subject: [PATCH 3/3] Fix based on cr --- .../source/assigners/MySqlSnapshotSplitAssignerTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 4f14366d14b..de875a0ed75 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -484,7 +484,7 @@ public void testEnumerateTablesLazily() { } @Test - public void testScanNewlyAddedTableStartFromCheckpoint_INITIAL_ASSIGNING_FINISHED() { + public void testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() { List expected = Arrays.asList( "customers_sparse_dist [109] null", @@ -499,8 +499,7 @@ public void testScanNewlyAddedTableStartFromCheckpoint_INITIAL_ASSIGNING_FINISHE } @Test - public void - testScanNewlyAddedTableStartFromCheckpoint_NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED() { + public void testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() { List expected = Arrays.asList( "customers_sparse_dist [109] null",