Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-source-connector][oceanbase] use current timestamp when query gts failure #2868

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@

import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** {@link JdbcConnection} extension to be used with OceanBase server. */
public class OceanBaseConnection extends JdbcConnection {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseConnection.class);

private static final String QUOTED_CHARACTER = "`";
private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
private static final String MYSQL_URL_PATTERN =
Expand Down Expand Up @@ -105,20 +111,42 @@ private static JdbcConnection.ConnectionFactory factory(
formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader);
}

private String getSystemSchema() {
return "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS";
}

/**
* Get current timestamp in nanoseconds from GTS (Global Timestamp Service).
* Get current timestamp number in seconds.
*
* @return the global timestamp.
* @return current timestamp number.
* @throws SQLException If a database access error occurs.
*/
public Long getGlobalTimestamp() throws SQLException {
public long getCurrentTimestampS() throws SQLException {
try {
long globalTimestamp = getGlobalTimestamp();
LOG.info("Global timestamp: {}", globalTimestamp);
return Long.parseLong(String.valueOf(globalTimestamp).substring(0, 10));
} catch (Exception e) {
LOG.warn("Failed to get global timestamp, use local timestamp instead");
}
return getCurrentTimestamp()
.orElseThrow(IllegalStateException::new)
.toInstant()
.getEpochSecond();
}

private long getGlobalTimestamp() throws SQLException {
String schema = "mysql".equalsIgnoreCase(compatibleMode) ? "oceanbase" : "SYS";
return querySingleValue(
connection(),
"SELECT TS_VALUE FROM " + schema + ".V$OB_TIMESTAMP_SERVICE",
ps -> {},
rs -> rs.getLong(1));
}

@Override
public Optional<Timestamp> getCurrentTimestamp() throws SQLException {
return queryAndMap(
String.format("SELECT TS_VALUE FROM %s.V$OB_TIMESTAMP_SERVICE", getSystemSchema()),
rs -> rs.next() ? rs.getLong(1) : null);
"mysql".equalsIgnoreCase(compatibleMode)
? "SELECT CURRENT_TIMESTAMP"
: "SELECT CURRENT_TIMESTAMP FROM DUAL",
rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ public void run(SourceContext<T> ctx) throws Exception {
initTableWhiteList();

if (shouldReadSnapshot()) {
Long globalTimestamp = getSnapshotConnection().getGlobalTimestamp();
if (globalTimestamp == null || globalTimestamp <= 0) {
throw new RuntimeException("Got invalid global timestamp: " + globalTimestamp);
}
long startTimestamp = globalTimestamp / 1000_000_000;
long startTimestamp = getSnapshotConnection().getCurrentTimestampS();
LOG.info("Snapshot reading started from timestamp: {}", startTimestamp);
readSnapshotRecords();
LOG.info("Snapshot reading finished");
Expand Down