diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java index e45497e9c1d..3e3b85a5908 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java @@ -20,13 +20,17 @@ import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,6 +38,8 @@ /** {@link JdbcConnection} extension to be used with OceanBase server. */ public class OceanBaseConnection extends JdbcConnection { + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseConnection.class); + private static final String QUOTED_CHARACTER = "`"; private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties(); private static final String MYSQL_URL_PATTERN = @@ -105,20 +111,42 @@ private static JdbcConnection.ConnectionFactory factory( formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader); } - private String getSystemSchema() { - return "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS"; - } - /** - * Get current timestamp in nanoseconds from GTS (Global Timestamp Service). + * Get current timestamp number in seconds. * - * @return the global timestamp. + * @return current timestamp number. * @throws SQLException If a database access error occurs. */ - public Long getGlobalTimestamp() throws SQLException { + public long getCurrentTimestampS() throws SQLException { + try { + long globalTimestamp = getGlobalTimestamp(); + LOG.info("Global timestamp: {}", globalTimestamp); + return Long.parseLong(String.valueOf(globalTimestamp).substring(0, 10)); + } catch (Exception e) { + LOG.warn("Failed to get global timestamp, use local timestamp instead"); + } + return getCurrentTimestamp() + .orElseThrow(IllegalStateException::new) + .toInstant() + .getEpochSecond(); + } + + private long getGlobalTimestamp() throws SQLException { + String schema = "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS"; + return querySingleValue( + connection(), + "SELECT TS_VALUE FROM " + schema + ".V$OB_TIMESTAMP_SERVICE", + ps -> {}, + rs -> rs.getLong(1)); + } + + @Override + public Optional getCurrentTimestamp() throws SQLException { return queryAndMap( - String.format("SELECT TS_VALUE FROM %s.V$OB_TIMESTAMP_SERVICE", getSystemSchema()), - rs -> rs.next() ? rs.getLong(1) : null); + "mysql".equalsIgnoreCase(compatibleMode) + ? "SELECT CURRENT_TIMESTAMP" + : "SELECT CURRENT_TIMESTAMP FROM DUAL", + rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty()); } /** diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index e243243b0e2..8520f48a505 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java @@ -152,11 +152,7 @@ public void run(SourceContext ctx) throws Exception { initTableWhiteList(); if (shouldReadSnapshot()) { - Long globalTimestamp = getSnapshotConnection().getGlobalTimestamp(); - if (globalTimestamp == null || globalTimestamp <= 0) { - throw new RuntimeException("Got invalid global timestamp: " + globalTimestamp); - } - long startTimestamp = globalTimestamp / 1000_000_000; + long startTimestamp = getSnapshotConnection().getCurrentTimestampS(); LOG.info("Snapshot reading started from timestamp: {}", startTimestamp); readSnapshotRecords(); LOG.info("Snapshot reading finished");