diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 253d9dc6211..ae308436390 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -235,18 +235,27 @@ private void addSplits(List splits, boolean checkTableChangeForBinlo LOG.info("Source reader {} adds split {}", subtaskId, split); if (split.isSnapshotSplit()) { MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit(); - if (snapshotSplit.isSnapshotReadFinished()) { - finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit); - } else if (sourceConfig + if (sourceConfig .getTableFilters() .dataCollectionFilter() .isIncluded(split.asSnapshotSplit().getTableId())) { - unfinishedSplits.add(split); + if (snapshotSplit.isSnapshotReadFinished()) { + finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit); + } else { + unfinishedSplits.add(split); + } } else { - LOG.debug( - "The subtask {} is skipping split {} because it does not match new table filter.", - subtaskId, - split.splitId()); + if (sourceConfig.isScanNewlyAddedTableEnabled()) { + LOG.info( + "The subtask {} is skipping split {} because it does not match new table filter.", + subtaskId, + split.splitId()); + } else { + LOG.warn( + "The subtask {} is skipping split {} because it does not match new table filter, but ScanNewlyAddedTable is not enabled.", + subtaskId, + split.splitId()); + } } } else { MySqlBinlogSplit binlogSplit = split.asBinlogSplit(); @@ -295,7 +304,10 @@ private void addSplits(List splits, boolean checkTableChangeForBinlo if (!unfinishedSplits.isEmpty()) { super.addSplits(unfinishedSplits); } else if (suspendedBinlogSplit - != null) { // only request new snapshot split if the binlog split is suspended + != null // request new snapshot split if the binlog split is suspended + || getNumberOfCurrentlyAssignedSplits() + <= 1 // request when all splits are in removed tables + ) { context.sendSplitRequest(); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 3196609aae7..db790f18797 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit; @@ -119,6 +120,113 @@ public void clear() { inventoryDatabase.dropDatabase(); } + @Test + public void testRemoveTableUsingStateFromSnapshotPhase() throws Exception { + customerDatabase.createAndInitialize(); + MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers", "prefix_customers"}); + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + List snapshotSplits; + List toRemoveSplits; + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { + Map tableSchemas = + TableDiscoveryUtils.discoverSchemaForCapturedTables( + new MySqlPartition( + sourceConfig.getMySqlConnectorConfig().getLogicalName()), + sourceConfig, + jdbc); + TableId tableId0 = new TableId(customerDatabase.getDatabaseName(), null, "customers"); + TableId tableId1 = + new TableId(customerDatabase.getDatabaseName(), null, "prefix_customers"); + RowType splitType = + RowType.of( + new LogicalType[] {DataTypes.INT().getLogicalType()}, + new String[] {"id"}); + snapshotSplits = + Collections.singletonList( + new MySqlSnapshotSplit( + tableId0, + tableId0 + ":0", + splitType, + null, + null, + null, + tableSchemas)); + toRemoveSplits = + Collections.singletonList( + new MySqlSnapshotSplit( + tableId1, + tableId1 + ":0", + splitType, + null, + null, + null, + tableSchemas)); + } + + // Step 1: start source reader and assign snapshot splits + MySqlSourceReader reader = createReader(sourceConfig, -1); + reader.start(); + reader.addSplits(snapshotSplits); + + String[] expectedRecords = + new String[] { + "+I[111, user_6, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[101, user_1, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + // Step 2: wait the snapshot splits finished reading + Thread.sleep(5000L); + List actualRecords = consumeRecords(reader, dataType); + assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords); + reader.handleSourceEvents( + new FinishedSnapshotSplitsAckEvent( + Collections.singletonList(snapshotSplits.get(0).splitId()))); + + // Step 3: add splits that need to be removed and do not read it, then snapshot reader's + // state + reader.addSplits(toRemoveSplits); + List splitsState = reader.snapshotState(1L); + + // Step 4: remove table 'prefix_customers' and restart reader from a restored state + sourceConfig = getConfig(new String[] {"customers"}); + TestingReaderContext readerContext = new TestingReaderContext(); + MySqlSourceReader restartReader = + createReader(sourceConfig, readerContext, -1); + restartReader.start(); + restartReader.addSplits(splitsState); + + // Step 5: check the finished unacked splits between original reader and restarted reader + assertEquals(0, reader.getFinishedUnackedSplits().size()); + // one from the start method and one from the addSplits method + assertEquals(2, readerContext.getNumSplitRequests()); + + reader.close(); + restartReader.close(); + } + @Test public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception { customerDatabase.createAndInitialize(); @@ -446,6 +554,12 @@ private MySqlSourceReader createReader(MySqlSourceConfig configura configuration, new TestingReaderContext(), limit, SnapshotPhaseHooks.empty()); } + private MySqlSourceReader createReader( + MySqlSourceConfig configuration, SourceReaderContext readerContext, int limit) + throws Exception { + return createReader(configuration, readerContext, limit, SnapshotPhaseHooks.empty()); + } + private MySqlSourceReader createReader( MySqlSourceConfig configuration, SourceReaderContext readerContext,