@@ -235,18 +235,27 @@ private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlo
LOG.info("Source reader {} adds split {}", subtaskId, split);
if (split.isSnapshotSplit()) {
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
} else if (sourceConfig
if (sourceConfig
.getTableFilters()
.dataCollectionFilter()
.isIncluded(split.asSnapshotSplit().getTableId())) {
unfinishedSplits.add(split);
if (snapshotSplit.isSnapshotReadFinished()) {
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
} else {
unfinishedSplits.add(split);
}
} else {
LOG.debug(
"The subtask {} is skipping split {} because it does not match new table filter.",
subtaskId,
split.splitId());
if (sourceConfig.isScanNewlyAddedTableEnabled()) {
LOG.info(
"The subtask {} is skipping split {} because it does not match new table filter.",
subtaskId,
split.splitId());
} else {
LOG.warn(
"The subtask {} is skipping split {} because it does not match new table filter, but ScanNewlyAddedTable is not enabled.",
subtaskId,
split.splitId());
}
}
} else {
MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
@@ -295,7 +304,10 @@ private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlo
     if (!unfinishedSplits.isEmpty()) {
         super.addSplits(unfinishedSplits);
     } else if (suspendedBinlogSplit
-            != null) { // only request new snapshot split if the binlog split is suspended
+                    != null // request new snapshot split if the binlog split is suspended
+            || getNumberOfCurrentlyAssignedSplits()
+                    <= 1 // request when all splits are in removed tables
+            ) {
         context.sendSplitRequest();
     }
 }
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@@ -119,6 +120,113 @@ public void clear() {
inventoryDatabase.dropDatabase();
}

@Test
public void testRemoveTableUsingStateFromSnapshotPhase() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers", "prefix_customers"});
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> snapshotSplits;
List<MySqlSplit> toRemoveSplits;
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverSchemaForCapturedTables(
new MySqlPartition(
sourceConfig.getMySqlConnectorConfig().getLogicalName()),
sourceConfig,
jdbc);
TableId tableId0 = new TableId(customerDatabase.getDatabaseName(), null, "customers");
TableId tableId1 =
new TableId(customerDatabase.getDatabaseName(), null, "prefix_customers");
RowType splitType =
RowType.of(
new LogicalType[] {DataTypes.INT().getLogicalType()},
new String[] {"id"});
snapshotSplits =
Collections.singletonList(
new MySqlSnapshotSplit(
tableId0,
tableId0 + ":0",
splitType,
null,
null,
null,
tableSchemas));
toRemoveSplits =
Collections.singletonList(
new MySqlSnapshotSplit(
tableId1,
tableId1 + ":0",
splitType,
null,
null,
null,
tableSchemas));
}

// Step 1: start source reader and assign snapshot splits
MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, -1);
reader.start();
reader.addSplits(snapshotSplits);

String[] expectedRecords =
new String[] {
"+I[111, user_6, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[101, user_1, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
// Step 2: wait until the snapshot splits finish reading
Thread.sleep(5000L);
List<String> actualRecords = consumeRecords(reader, dataType);
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
reader.handleSourceEvents(
new FinishedSnapshotSplitsAckEvent(
Collections.singletonList(snapshotSplits.get(0).splitId())));

// Step 3: add splits that need to be removed and do not read them, then snapshot the
// reader's state
reader.addSplits(toRemoveSplits);
List<MySqlSplit> splitsState = reader.snapshotState(1L);

// Step 4: remove table 'prefix_customers' and restart reader from a restored state
sourceConfig = getConfig(new String[] {"customers"});
TestingReaderContext readerContext = new TestingReaderContext();
MySqlSourceReader<SourceRecord> restartReader =
createReader(sourceConfig, readerContext, -1);
restartReader.start();
restartReader.addSplits(splitsState);

// Step 5: check the finished unacked splits between the original reader and the restarted reader
assertEquals(0, reader.getFinishedUnackedSplits().size());
// one from the start method and one from the addSplits method
assertEquals(2, readerContext.getNumSplitRequests());

reader.close();
restartReader.close();
}

@Test
public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception {
customerDatabase.createAndInitialize();
@@ -446,6 +554,12 @@ private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configura
configuration, new TestingReaderContext(), limit, SnapshotPhaseHooks.empty());
}

private MySqlSourceReader<SourceRecord> createReader(
MySqlSourceConfig configuration, SourceReaderContext readerContext, int limit)
throws Exception {
return createReader(configuration, readerContext, limit, SnapshotPhaseHooks.empty());
}

private MySqlSourceReader<SourceRecord> createReader(
MySqlSourceConfig configuration,
SourceReaderContext readerContext,