diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 4f1c7089948..c34e13f784a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -837,6 +837,7 @@ protected void handleInsert( Operation.CREATE, WriteRowsEventData.class, x -> taskContext.getSchema().getTableId(x.getTableId()), + x -> taskContext.getSchema().getExcludeTableId(x.getTableId()), WriteRowsEventData::getRows, (tableId, row) -> eventDispatcher.dispatchDataChangeEvent( @@ -868,6 +869,7 @@ protected void handleUpdate( Operation.UPDATE, UpdateRowsEventData.class, x -> taskContext.getSchema().getTableId(x.getTableId()), + x -> taskContext.getSchema().getExcludeTableId(x.getTableId()), UpdateRowsEventData::getRows, (tableId, row) -> eventDispatcher.dispatchDataChangeEvent( @@ -899,6 +901,7 @@ protected void handleDelete( Operation.DELETE, DeleteRowsEventData.class, x -> taskContext.getSchema().getTableId(x.getTableId()), + x -> taskContext.getSchema().getExcludeTableId(x.getTableId()), DeleteRowsEventData::getRows, (tableId, row) -> eventDispatcher.dispatchDataChangeEvent( @@ -919,7 +922,8 @@ private void handleChange( Event event, Operation operation, Class eventDataClass, - TableIdProvider tableIdProvider, + TableIdProvider includeTableIdProvider, + TableIdProvider excludeTableIdProvider, RowsProvider rowsProvider, BinlogChangeEmitter changeEmitter) throws InterruptedException { @@ -933,7 +937,7 @@ private void handleChange( return; } final T data = unwrapData(event); - final TableId tableId = tableIdProvider.getTableId(data); + final TableId tableId = includeTableIdProvider.getTableId(data); final List rows = rowsProvider.getRows(data); String changeType = operation.name(); @@ -967,6 +971,12 @@ private void handleChange( } } else { informAboutUnknownTableIfRequired(partition, offsetContext, event, tableId, operation); + + TableId excludeTableId = excludeTableIdProvider.getTableId(data); + if (excludeTableId != null) { + // Even for unmonitored tables, the offset should be updated to avoid reading a large amount of binlog. + offsetContext.event(excludeTableId, eventTimestamp); + } } startingRowNumber = 0; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index b86e6e5272b..133c7ef9086 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -213,7 +213,7 @@ private void backfill( private boolean isBackfillRequired(MySqlBinlogSplit backfillBinlogSplit) { return !statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill() - && backfillBinlogSplit + || backfillBinlogSplit .getEndingOffset() .isAfter(backfillBinlogSplit.getStartingOffset()); }