diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 69afb605999..61f4ce9a947 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -21,13 +21,14 @@
 import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
 import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
-import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
 import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -114,7 +115,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
                         (MySqlStreamingChangeEventSourceMetrics)
                                 statefulTaskContext.getStreamingChangeEventSourceMetrics(),
                         currentBinlogSplit,
-                        createEventFilter(currentBinlogSplit.getStartingOffset()));
+                        createEventFilter());
 
         executorService.submit(
                 () -> {
@@ -306,17 +307,35 @@ private void configureFilter() {
         this.pureBinlogPhaseTables.clear();
     }
 
-    private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
+    private Predicate<Event> createEventFilter() {
         // If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
         // events earlier than the specified timestamp.
-        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
-            long startTimestampSec = startingOffset.getTimestampSec();
-            // Notes:
-            // 1. Heartbeat event doesn't contain timestamp, so we just keep it
-            // 2. Timestamp of event is in epoch millisecond
-            return event ->
-                    EventType.HEARTBEAT.equals(event.getHeader().getEventType())
-                            || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
+
+        // NOTE: Here we take the user's configuration (statefulTaskContext.getSourceConfig())
+        // as the ground truth. This might be fragile if the user changes the config and recovers
+        // the job from a savepoint / checkpoint, as there might be a conflict between the user's
+        // config and the state in the savepoint / checkpoint. But as we don't promise checkpoint
+        // compatibility after changing the config, this is acceptable for now.
+        StartupOptions startupOptions = statefulTaskContext.getSourceConfig().getStartupOptions();
+        if (startupOptions.startupMode.equals(StartupMode.TIMESTAMP)) {
+            if (startupOptions.binlogOffset == null) {
+                throw new NullPointerException(
+                        "The startup mode was set to TIMESTAMP, "
+                                + "but no starting binlog offset could be found. Please check whether the "
+                                + "timestamp is specified in the configuration.");
+            }
+            long startTimestampSec = startupOptions.binlogOffset.getTimestampSec();
+            // We only skip data change events, as other kinds of events are necessary for updating
+            // some internal state inside MySqlStreamingChangeEventSource.
+            LOG.info(
+                    "Creating an event filter that drops row mutation events before timestamp (in seconds) {}",
+                    startTimestampSec);
+            return event -> {
+                if (!EventType.isRowMutation(getEventType(event))) {
+                    return true;
+                }
+                return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
+            };
         }
         return event -> true;
     }
@@ -327,8 +346,17 @@ public void stopBinlogReadTask() {
         changeEventSourceContext.stopChangeEventSource();
     }
 
+    private EventType getEventType(Event event) {
+        return event.getHeader().getEventType();
+    }
+
     @VisibleForTesting
     public ExecutorService getExecutorService() {
         return executorService;
     }
+
+    @VisibleForTesting
+    MySqlBinlogSplitReadTask getBinlogSplitReadTask() {
+        return binlogSplitReadTask;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
index 3899215b46c..7a6e7757f7d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.mysql.debezium.task;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
 import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
 import org.apache.flink.cdc.connectors.mysql.debezium.reader.StoppableChangeEventSourceContext;
@@ -119,4 +120,9 @@ protected void handleEvent(
     private boolean isBoundedRead() {
         return !isNonStoppingOffset(binlogSplit.getEndingOffset());
     }
+
+    @VisibleForTesting
+    public Predicate<Event> getEventFilter() {
+        return eventFilter;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 0d407f9c54f..05e603c00ab 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -45,6 +45,10 @@
 import org.apache.flink.util.ExceptionUtils;
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.connector.mysql.MySqlOffsetContext;
@@ -82,6 +86,7 @@
 import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
@@ -841,6 +846,50 @@ public void testReadBinlogFromUnavailableBinlog() throws Exception {
         }
     }
 
+    @Test
+    public void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception {
+        // Preparations
+        inventoryDatabase8.createAndInitialize();
+        MySqlSourceConfig connectionConfig =
+                getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"});
+        binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
+        mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig);
+
+        // Capture the current binlog offset, and use it to mock restoring from checkpoint
+        BinlogOffset checkpointOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection);
+
+        // Create a config to start reading from timestamp
+        long startTimestampMs = 15213L;
+        MySqlSourceConfig sourceConfig =
+                getConfig(
+                        MYSQL8_CONTAINER,
+                        inventoryDatabase8,
+                        StartupOptions.timestamp(startTimestampMs),
+                        new String[] {"products"});
+
+        BinlogSplitReader binlogReader = createBinlogReader(sourceConfig);
+        MySqlBinlogSplit checkpointSplit =
+                createBinlogSplit(
+                        getConfig(
+                                MYSQL8_CONTAINER,
+                                inventoryDatabase8,
+                                StartupOptions.specificOffset(checkpointOffset),
+                                new String[] {"products"}));
+
+        // Restore binlog reader from checkpoint
+        binlogReader.submitSplit(checkpointSplit);
+
+        // We mock a WRITE_ROWS event with timestamp = 1, which should be dropped by filter
+        EventHeaderV4 header = new EventHeaderV4();
+        header.setEventType(EventType.WRITE_ROWS);
+        header.setTimestamp(1L);
+        Event event = new Event(header, new WriteRowsEventData());
+
+        // Check if the filter works
+        Predicate<Event> eventFilter = binlogReader.getBinlogSplitReadTask().getEventFilter();
+        assertThat(eventFilter.test(event)).isFalse();
+    }
+
     private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
         return createBinlogReader(sourceConfig, false);
     }
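
For illustration, a minimal standalone sketch (not part of the patch) of the timestamp-based event filter introduced in BinlogSplitReader above, using the same com.github.shyiko.mysql.binlog Event API; the class name TimestampEventFilterSketch and the eventFilter helper are hypothetical names chosen for this sketch.

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;

import java.util.function.Predicate;

public class TimestampEventFilterSketch {

    // Keep every non-row-mutation event; drop row mutations older than the configured start.
    static Predicate<Event> eventFilter(long startTimestampSec) {
        return event -> {
            EventType eventType = event.getHeader().getEventType();
            if (!EventType.isRowMutation(eventType)) {
                // Events such as ROTATE and TABLE_MAP are kept so the streaming source can
                // keep its internal state (binlog position, table metadata) up to date.
                return true;
            }
            // Event timestamps are epoch milliseconds; the configured start is in seconds.
            return event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        };
    }

    public static void main(String[] args) {
        EventHeaderV4 header = new EventHeaderV4();
        header.setEventType(EventType.WRITE_ROWS);
        header.setTimestamp(1L); // far before any realistic start timestamp
        Event tooOld = new Event(header, null);
        System.out.println(eventFilter(15213L).test(tooOld)); // prints "false": event is dropped
    }
}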