diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index cd3c697e44b..346f5f54f66 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -292,7 +292,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) { private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) { // the existed tables those have finished snapshot reading if (maxSplitHighWatermarkMap.containsKey(tableId) - && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) { + && position.isAfter(maxSplitHighWatermarkMap.get(tableId))) { pureBinlogPhaseTables.add(tableId); return true; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 492df8262b9..f729f59d05f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -154,11 +154,6 @@ protected SnapshotResult doExecute( hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit); } final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); - lowWatermark - .getOffset() - .put( - BinlogOffset.TIMESTAMP_KEY, - String.valueOf(clock.currentTime().getEpochSecond())); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, @@ -192,11 +187,6 @@ protected SnapshotResult doExecute( } else { // Get the current binlog offset as HW highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection); - highWatermark - .getOffset() - .put( - BinlogOffset.TIMESTAMP_KEY, - String.valueOf(clock.currentTime().getEpochSecond())); } LOG.info( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java index b2aa30bb39b..d9334c49ffe 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -232,7 +232,9 @@ public int compareTo(BinlogOffset that) { // compared ... long timestamp = this.getTimestampSec(); long targetTimestamp = that.getTimestampSec(); - return Long.compare(timestamp, targetTimestamp); + if (timestamp != 0 && targetTimestamp != 0) { + return Long.compare(timestamp, targetTimestamp); + } } // First compare the MySQL binlog filenames @@ -251,12 +253,7 @@ public int compareTo(BinlogOffset that) { } // The completed events are the same, so compare the row number ... - if (this.getRestartSkipRows() != that.getRestartSkipRows()) { - return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); - } - - // The skip rows are the same, so compare the timestamp ... - return Long.compare(this.getTimestampSec(), that.getTimestampSec()); + return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); } public boolean isAtOrBefore(BinlogOffset that) {