+ public OceanBaseSourceBuilder<T> distributionFactorLower(double distributionFactorLower) {
+ this.configFactory.distributionFactorLower(distributionFactorLower);
+ return this;
+ }
+
+ /**
+ * Whether to close idle readers at the end of the snapshot phase. This feature depends on
+ * FLIP-147: Support Checkpoints After Tasks Finished, so it requires Flink 1.14 or later,
+ * with the configuration
+ * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' set to true.
+ *
+ * See more at
+ * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
+ */
+ public OceanBaseSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
+ this.configFactory.closeIdleReaders(closeIdleReaders);
+ return this;
+ }
+
+ /** Whether the {@link OceanBaseIncrementalSource} should scan the newly added table. */
+ public OceanBaseSourceBuilder<T> scanNewlyAddedTableEnabled(
+ boolean scanNewlyAddedTableEnabled) {
+ this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+ return this;
+ }
+
+ /**
+ * The deserializer used to convert from consumed {@link
+ * org.apache.kafka.connect.source.SourceRecord}.
+ */
+ public OceanBaseSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public OceanBaseIncrementalSource<T> build() {
+ this.offsetFactory = new LogMessageOffsetFactory();
+ this.dialect = new OceanBaseDialect();
+ return new OceanBaseIncrementalSource<>(
+ configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+ }
+
+ /** The {@link JdbcIncrementalSource} implementation for OceanBase. */
+ public static class OceanBaseIncrementalSource<T> extends JdbcIncrementalSource<T> {
+ public OceanBaseIncrementalSource(
+ OceanBaseSourceConfigFactory configFactory,
+ DebeziumDeserializationSchema<T> deserializationSchema,
+ LogMessageOffsetFactory offsetFactory,
+ OceanBaseDialect dataSourceDialect) {
+ super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
+ }
+
+ public static <T> OceanBaseSourceBuilder<T> builder() {
+ return new OceanBaseSourceBuilder<>();
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseConnectorConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseConnectorConfig.java
index a2b55b60c71..7223775a8a1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseConnectorConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseConnectorConfig.java
@@ -41,34 +41,69 @@ public class OceanBaseConnectorConfig extends RelationalDatabaseConnectorConfig
Arrays.asList(
"information_schema", "mysql", "oceanbase", "LBACSYS", "ORAAUDITOR"));
- private final String compatibleMode;
- private final String serverTimeZone;
+ private final OceanBaseSourceConfig sourceConfig;
- public OceanBaseConnectorConfig(
- String compatibleMode, String serverTimeZone, Properties properties) {
+ public OceanBaseConnectorConfig(OceanBaseSourceConfig sourceConfig) {
super(
- Configuration.from(properties),
+ Configuration.from(sourceConfig.getDbzProperties()),
LOGICAL_NAME,
Tables.TableFilter.fromPredicate(
tableId ->
- "mysql".equalsIgnoreCase(compatibleMode)
+ "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode())
? !BUILT_IN_DB_NAMES.contains(tableId.catalog())
: !BUILT_IN_DB_NAMES.contains(tableId.schema())),
TableId::identifier,
DEFAULT_SNAPSHOT_FETCH_SIZE,
- "mysql".equalsIgnoreCase(compatibleMode)
+ "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode())
? ColumnFilterMode.CATALOG
: ColumnFilterMode.SCHEMA);
- this.compatibleMode = compatibleMode;
- this.serverTimeZone = serverTimeZone;
+ this.sourceConfig = sourceConfig;
}
- public String getCompatibleMode() {
- return compatibleMode;
+ @Deprecated
+ public OceanBaseConnectorConfig(
+ String compatibleMode,
+ String serverTimeZone,
+ String tenantName,
+ Properties properties) {
+ this(
+ new OceanBaseSourceConfig(
+ compatibleMode,
+ tenantName,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 0,
+ 0,
+ 0,
+ 0,
+ false,
+ false,
+ properties,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ null,
+ 0,
+ serverTimeZone,
+ null,
+ 0,
+ 0,
+ null,
+ false,
+ false));
}
- public String getServerTimeZone() {
- return serverTimeZone;
+ public OceanBaseSourceConfig getSourceConfig() {
+ return sourceConfig;
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfig.java
new file mode 100644
index 00000000000..10980ac4ea2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.config;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+
+import io.debezium.config.Configuration;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Describes the connection information of the OceanBase database and the configuration information
+ * for performing snapshotting and streaming reading, such as splitSize.
+ */
+public class OceanBaseSourceConfig extends JdbcSourceConfig {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String compatibleMode;
+ private final String tenantName;
+ private final String logProxyHost;
+ private final Integer logProxyPort;
+ private final String rsList;
+ private final String configUrl;
+ private final String workingMode;
+ private final Properties obcdcProperties;
+
+ public OceanBaseSourceConfig(
+ String compatibleMode,
+ String tenantName,
+ String logProxyHost,
+ Integer logProxyPort,
+ String rsList,
+ String configUrl,
+ String workingMode,
+ Properties obcdcProperties,
+ StartupOptions startupOptions,
+ List<String> databaseList,
+ List<String> tableList,
+ int splitSize,
+ int splitMetaGroupSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ boolean includeSchemaChanges,
+ boolean closeIdleReaders,
+ Properties dbzProperties,
+ Configuration dbzConfiguration,
+ String driverClassName,
+ String hostname,
+ int port,
+ String username,
+ String password,
+ int fetchSize,
+ String serverTimeZone,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ String chunkKeyColumn,
+ boolean skipSnapshotBackfill,
+ boolean isScanNewlyAddedTableEnabled) {
+ super(
+ startupOptions,
+ databaseList,
+ null,
+ tableList,
+ splitSize,
+ splitMetaGroupSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ includeSchemaChanges,
+ closeIdleReaders,
+ dbzProperties,
+ dbzConfiguration,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ chunkKeyColumn,
+ skipSnapshotBackfill,
+ isScanNewlyAddedTableEnabled);
+ this.compatibleMode = compatibleMode;
+ this.tenantName = tenantName;
+ this.logProxyHost = logProxyHost;
+ this.logProxyPort = logProxyPort;
+ this.rsList = rsList;
+ this.configUrl = configUrl;
+ this.workingMode = workingMode;
+ this.obcdcProperties = obcdcProperties;
+ }
+
+ public String getCompatibleMode() {
+ return compatibleMode;
+ }
+
+ public String getTenantName() {
+ return tenantName;
+ }
+
+ public String getLogProxyHost() {
+ return logProxyHost;
+ }
+
+ public Integer getLogProxyPort() {
+ return logProxyPort;
+ }
+
+ public String getRsList() {
+ return rsList;
+ }
+
+ public String getConfigUrl() {
+ return configUrl;
+ }
+
+ public String getWorkingMode() {
+ return workingMode;
+ }
+
+ public Properties getObcdcProperties() {
+ return obcdcProperties;
+ }
+
+ @Override
+ public OceanBaseConnectorConfig getDbzConnectorConfig() {
+ return new OceanBaseConnectorConfig(this);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfigFactory.java
new file mode 100644
index 00000000000..db2f6d181d5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/config/OceanBaseSourceConfigFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.config;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
+
+import io.debezium.config.Configuration;
+
+import java.util.Properties;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
+import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
+
+/** A factory to initialize {@link OceanBaseSourceConfig}. */
+@SuppressWarnings("UnusedReturnValue")
+public class OceanBaseSourceConfigFactory extends JdbcSourceConfigFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private String compatibleMode;
+ private String driverClassName;
+ private String tenantName;
+ private String logProxyHost;
+ private Integer logProxyPort;
+ private String rsList;
+ private String configUrl;
+ private String workingMode;
+ private Properties obcdcProperties;
+
+ public JdbcSourceConfigFactory compatibleMode(String compatibleMode) {
+ this.compatibleMode = compatibleMode;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory driverClassName(String driverClassName) {
+ this.driverClassName = driverClassName;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory tenantName(String tenantName) {
+ this.tenantName = tenantName;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory logProxyHost(String logProxyHost) {
+ this.logProxyHost = logProxyHost;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory logProxyPort(int logProxyPort) {
+ this.logProxyPort = logProxyPort;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory rsList(String rsList) {
+ this.rsList = rsList;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory configUrl(String configUrl) {
+ this.configUrl = configUrl;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory workingMode(String workingMode) {
+ this.workingMode = workingMode;
+ return this;
+ }
+
+ public JdbcSourceConfigFactory obcdcProperties(Properties obcdcProperties) {
+ this.obcdcProperties = obcdcProperties;
+ return this;
+ }
+
+ @Override
+ public OceanBaseSourceConfig create(int subtaskId) {
+ checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
+ Properties props = new Properties();
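+ // "database.server.name" is the logical name Debezium uses to namespace generated records.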
+ props.setProperty("database.server.name", "oceanbase_cdc");
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.dbname", checkNotNull(databaseList.get(0)));
+ props.setProperty("database.connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
+
+ // table filter
+ props.put("database.include.list", String.join(",", databaseList));
+ if (tableList != null) {
+ props.put("table.include.list", String.join(",", tableList));
+ }
+ // value converter
+ props.put("decimal.handling.mode", "precise");
+ props.put("time.precision.mode", "adaptive_time_microseconds");
+ props.put("binary.handling.mode", "bytes");
+
+ if (dbzProperties != null) {
+ props.putAll(dbzProperties);
+ }
+
+ Configuration dbzConfiguration = Configuration.from(props);
+ return new OceanBaseSourceConfig(
+ compatibleMode,
+ tenantName,
+ logProxyHost,
+ logProxyPort,
+ rsList,
+ configUrl,
+ workingMode,
+ obcdcProperties,
+ startupOptions,
+ databaseList,
+ tableList,
+ splitSize,
+ splitMetaGroupSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ includeSchemaChanges,
+ closeIdleReaders,
+ props,
+ dbzConfiguration,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ chunkKeyColumn,
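+ // skipSnapshotBackfill is hardcoded to true for this source.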
+ true,
+ scanNewlyAddedTableEnabled);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
index 49ad532a13b..d4f94180a54 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
@@ -18,11 +18,11 @@
package org.apache.flink.cdc.connectors.oceanbase.source.connection;
import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
@@ -43,8 +43,8 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/** {@link JdbcConnection} extension to be used with OceanBase server. */
public class OceanBaseConnection extends JdbcConnection {
@@ -123,6 +123,16 @@ private static Properties initializeDefaultJdbcProperties() {
return defaultJdbcProperties;
}
+ public OceanBaseConnection(
+ JdbcConfiguration config,
+ ConnectionFactory connectionFactory,
+ String compatibleMode,
+ String openingQuoteCharacter,
+ String closingQuoteCharacter) {
+ super(config, connectionFactory, openingQuoteCharacter, closingQuoteCharacter);
+ this.compatibleMode = compatibleMode;
+ }
+
private static char getQuote(String compatibleMode) {
return "mysql".equalsIgnoreCase(compatibleMode) ? '`' : '"';
}
@@ -174,54 +184,48 @@ public Optional<Instant> getCurrentTimestamp() throws SQLException {
* @throws SQLException If a database access error occurs.
*/
public List<TableId> getTables(String dbPattern, String tbPattern) throws SQLException {
- List<TableId> result = new ArrayList<>();
+ return listTables(
+ db -> Pattern.matches(dbPattern, db),
+ tableId -> Pattern.matches(tbPattern, tableId.table()));
+ }
+
+ public List<TableId> listTables(RelationalTableFilters tableFilters) throws SQLException {
+ return listTables(tableFilters.databaseFilter(), tableFilters.dataCollectionFilter());
+ }
+
+ private List<TableId> listTables(
+ Predicate<String> databaseFilter, Tables.TableFilter tableFilter) throws SQLException {
+ List<TableId> tableIds = new ArrayList<>();
DatabaseMetaData metaData = connection().getMetaData();
- switch (compatibleMode.toLowerCase()) {
- case "mysql":
- List<String> dbNames = getResultList(metaData.getCatalogs(), "TABLE_CAT");
- dbNames =
- dbNames.stream()
- .filter(dbName -> Pattern.matches(dbPattern, dbName))
- .collect(Collectors.toList());
- for (String dbName : dbNames) {
- List<String> tableNames =
- getResultList(
- metaData.getTables(dbName, null, null, supportedTableTypes()),
- "TABLE_NAME");
- tableNames.stream()
- .filter(tbName -> Pattern.matches(tbPattern, tbName))
- .forEach(tbName -> result.add(new TableId(dbName, null, tbName)));
- }
- break;
- case "oracle":
- List<String> schemaNames = getResultList(metaData.getSchemas(), "TABLE_SCHEM");
- schemaNames =
- schemaNames.stream()
- .filter(schemaName -> Pattern.matches(dbPattern, schemaName))
- .collect(Collectors.toList());
- for (String schemaName : schemaNames) {
- List<String> tableNames =
- getResultList(
- metaData.getTables(
- null, schemaName, null, supportedTableTypes()),
- "TABLE_NAME");
- tableNames.stream()
- .filter(tbName -> Pattern.matches(tbPattern, tbName))
- .forEach(tbName -> result.add(new TableId(null, schemaName, tbName)));
+ boolean isMySql = "mysql".equalsIgnoreCase(compatibleMode);
+ // MySQL mode exposes databases as JDBC catalogs; Oracle mode exposes them as schemas.
+ ResultSet rs = isMySql ? metaData.getCatalogs() : metaData.getSchemas();
+ List<String> dbList = new ArrayList<>();
+ while (rs.next()) {
+ String db = rs.getString(isMySql ? "TABLE_CAT" : "TABLE_SCHEM");
+ if (databaseFilter.test(db)) {
+ dbList.add(db);
+ }
+ }
+ for (String db : dbList) {
+ String catalog = isMySql ? db : null;
+ String schema = isMySql ? null : db;
+ rs = metaData.getTables(catalog, schema, null, supportedTableTypes());
+ while (rs.next()) {
+ TableId tableId = new TableId(catalog, schema, rs.getString("TABLE_NAME"));
+ if (tableFilter.isIncluded(tableId)) {
+ tableIds.add(tableId);
}
- break;
- default:
- throw new FlinkRuntimeException("Unsupported compatible mode: " + compatibleMode);
+ }
}
- return result;
+ return tableIds;
}
- private List<String> getResultList(ResultSet resultSet, String columnName) throws SQLException {
- List<String> result = new ArrayList<>();
- while (resultSet.next()) {
- result.add(resultSet.getString(columnName));
- }
- return result;
+ public String readSystemVariable(String variable) throws SQLException {
+ return querySingleValue(
+ connection(),
+ "SHOW VARIABLES LIKE ?",
+ ps -> ps.setString(1, variable),
+ rs -> rs.getString("VALUE"));
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnectionPoolFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnectionPoolFactory.java
new file mode 100644
index 00000000000..cb72d602b5d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnectionPoolFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.connection;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils;
+
+/** The OceanBase JDBC connection pool factory. */
+public class OceanBaseConnectionPoolFactory extends JdbcConnectionPoolFactory {
+
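+ // JDBC URL templates for the MySQL driver and the OceanBase driver respectively.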
+ private static final String MYSQL_URL_PATTERN =
+ "jdbc:mysql://%s:%s/?useUnicode=true&useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&characterSetResults=UTF-8";
+ private static final String OB_URL_PATTERN =
+ "jdbc:oceanbase://%s:%s/?useUnicode=true&useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&characterSetResults=UTF-8";
+
+ @Override
+ public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
+ String hostName = sourceConfig.getHostname();
+ int port = sourceConfig.getPort();
+ String driver = sourceConfig.getDriverClassName();
+ if (OceanBaseUtils.isOceanBaseDriver(driver)) {
+ return String.format(OB_URL_PATTERN, hostName, port);
+ }
+ return String.format(MYSQL_URL_PATTERN, hostName, port);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java
index bf2a125db78..74a4cbe74ea 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/converter/OceanBaseValueConverters.java
@@ -88,8 +88,8 @@ public OceanBaseValueConverters(OceanBaseConnectorConfig connectorConfig) {
x -> x,
BigIntUnsignedMode.PRECISE,
connectorConfig.binaryHandlingMode());
- this.compatibleMode = connectorConfig.getCompatibleMode();
- this.serverTimezone = connectorConfig.getServerTimeZone();
+ this.compatibleMode = connectorConfig.getSourceConfig().getCompatibleMode();
+ this.serverTimezone = connectorConfig.getSourceConfig().getServerTimeZone();
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffset.java
new file mode 100644
index 00000000000..eab22c999bf
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffset.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.offset;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+
+import com.oceanbase.oms.logmessage.DataMessage;
+import com.oceanbase.oms.logmessage.LogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A structure that describes a fine-grained offset of {@link LogMessage}. */
+public class LogMessageOffset extends Offset {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogMessageOffset.class);
+
+ private static final String TIMESTAMP_KEY = "timestamp";
+ private static final String COMMIT_VERSION_KEY = "commit_version";
+ private static final String EVENTS_TO_SKIP_KEY = "events";
+
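+ // Sentinel offsets marking the earliest possible position and a position that is never reached.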
+ public static final LogMessageOffset INITIAL_OFFSET = LogMessageOffset.from(Long.MIN_VALUE);
+ public static final LogMessageOffset NO_STOPPING_OFFSET = LogMessageOffset.from(Long.MAX_VALUE);
+
+ public LogMessageOffset(Map<String, ?> offset) {
+ Map<String, String> offsetMap = new HashMap<>();
+ for (Map.Entry<String, ?> entry : offset.entrySet()) {
+ offsetMap.put(
+ entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
+ }
+ this.offset = offsetMap;
+ }
+
+ public LogMessageOffset(@Nonnull String timestamp, String commitVersion, long eventsToSkip) {
+ Map<String, String> offsetMap = new HashMap<>();
+ offsetMap.put(TIMESTAMP_KEY, getTimestampString(timestamp));
+ if (commitVersion != null) {
+ offsetMap.put(COMMIT_VERSION_KEY, commitVersion);
+ }
+ offsetMap.put(EVENTS_TO_SKIP_KEY, String.valueOf(eventsToSkip));
+ this.offset = offsetMap;
+ }
+
+ public static LogMessageOffset from(long timestamp) {
+ return new LogMessageOffset(getTimestampString(timestamp), null, 0);
+ }
+
+ public static LogMessageOffset from(LogMessage message) {
+ DataMessage.Record.Type type = message.getOpt();
+ if (type == DataMessage.Record.Type.BEGIN
+ || type == DataMessage.Record.Type.DDL
+ || type == DataMessage.Record.Type.HEARTBEAT) {
+ return new LogMessageOffset(message.getTimestamp(), getCommitVersion(message), 0);
+ }
+ throw new IllegalArgumentException("Can't get offset from LogMessage type: " + type);
+ }
+
+ private static String getTimestampString(Object timestamp) {
+ if (timestamp == null) {
+ return null;
+ }
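+ // Keep the first 10 digits, i.e. the epoch-second part of a possibly finer-grained timestamp.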
+ if (timestamp instanceof Long) {
+ return Long.toString((Long) timestamp).substring(0, 10);
+ }
+ return timestamp.toString().substring(0, 10);
+ }
+
+ private static String getCommitVersion(LogMessage message) {
+ try {
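+ // The commit version concatenates the epoch seconds with the microsecond part of the message timestamp.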
+ String microseconds = message.getTimestampUsec();
+ return getTimestampString(message) + microseconds;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get microseconds from LogMessage", e);
+ }
+ }
+
+ public String getTimestamp() {
+ return offset.get(TIMESTAMP_KEY);
+ }
+
+ public String getCommitVersion() {
+ return offset.get(COMMIT_VERSION_KEY);
+ }
+
+ public long getEventsToSkip() {
+ return longOffsetValue(offset, EVENTS_TO_SKIP_KEY);
+ }
+
+ @Override
+ public int compareTo(@Nonnull Offset offset) {
+ LogMessageOffset that = (LogMessageOffset) offset;
+
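+ // Order by coarse timestamp, then commit version, then the number of events already consumed.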
+ int flag;
+ flag = compareLong(getTimestamp(), that.getTimestamp());
+ if (flag != 0) {
+ return flag;
+ }
+ flag = compareLong(getCommitVersion(), that.getCommitVersion());
+ if (flag != 0) {
+ return flag;
+ }
+ return Long.compare(getEventsToSkip(), that.getEventsToSkip());
+ }
+
+ private int compareLong(String a, String b) {
+ if (a == null && b == null) {
+ return 0;
+ }
+ if (a == null) {
+ return -1;
+ }
+ if (b == null) {
+ return 1;
+ }
+ return Long.compare(Long.parseLong(a), Long.parseLong(b));
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffsetFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffsetFactory.java
new file mode 100644
index 00000000000..562b4ae53ed
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/LogMessageOffsetFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.offset;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+
+import java.util.Map;
+
+/** A change stream offset factory class that creates {@link LogMessageOffset} instances. */
+public class LogMessageOffsetFactory extends OffsetFactory {
+
+ @Override
+ public LogMessageOffset newOffset(Map<String, String> offset) {
+ return new LogMessageOffset(offset);
+ }
+
+ @Override
+ public Offset newOffset(String filename, Long position) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Offset newOffset(Long position) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogMessageOffset createTimestampOffset(long timestampMillis) {
+ return LogMessageOffset.from(timestampMillis);
+ }
+
+ @Override
+ public LogMessageOffset createInitialOffset() {
+ return LogMessageOffset.INITIAL_OFFSET;
+ }
+
+ @Override
+ public LogMessageOffset createNoStoppingOffset() {
+ return LogMessageOffset.NO_STOPPING_OFFSET;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseOffsetContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseOffsetContext.java
new file mode 100644
index 00000000000..e0f76dbaa36
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseOffsetContext.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.offset;
+
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
+import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.pipeline.txmetadata.TransactionContext;
+import io.debezium.relational.TableId;
+import io.debezium.schema.DataCollectionId;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/** OceanBase offset context. */
+public class OceanBaseOffsetContext implements OffsetContext {
+
+ private final Schema sourceInfoSchema;
+ private final OceanBaseSourceInfo sourceInfo;
+ private boolean snapshotCompleted;
+ private final TransactionContext transactionContext;
+ private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
+
+ private String timestamp;
+ private String commitVersion;
+ private long eventsToSkip;
+
+ public OceanBaseOffsetContext(
+ boolean snapshot,
+ boolean snapshotCompleted,
+ TransactionContext transactionContext,
+ IncrementalSnapshotContext<TableId> incrementalSnapshotContext,
+ OceanBaseSourceInfo sourceInfo) {
+ this.sourceInfoSchema = sourceInfo.schema();
+ this.sourceInfo = sourceInfo;
+ this.transactionContext = transactionContext;
+ this.incrementalSnapshotContext = incrementalSnapshotContext;
+
+ this.snapshotCompleted = snapshotCompleted;
+ if (this.snapshotCompleted) {
+ postSnapshotCompletion();
+ } else {
+ sourceInfo.setSnapshot(snapshot ? SnapshotRecord.TRUE : SnapshotRecord.FALSE);
+ }
+ }
+
+ @Override
+ public Map<String, ?> getOffset() {
+ Map<String, Object> offset = new HashMap<>();
+ offset.put("timestamp", timestamp);
+ if (sourceInfo.isSnapshot()) {
+ offset.put("snapshot", true);
+ offset.put("snapshot_completed", snapshotCompleted);
+ return offset;
+ } else {
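+ // In the streaming phase the offset also records the commit version and the events to skip on restore.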
+ if (commitVersion != null) {
+ offset.put("commit_version", commitVersion);
+ }
+ if (eventsToSkip != 0) {
+ offset.put("events", eventsToSkip);
+ }
+ return incrementalSnapshotContext.store(transactionContext.store(offset));
+ }
+ }
+
+ @Override
+ public Schema getSourceInfoSchema() {
+ return sourceInfoSchema;
+ }
+
+ @Override
+ public Struct getSourceInfo() {
+ return sourceInfo.struct();
+ }
+
+ @Override
+ public boolean isSnapshotRunning() {
+ return sourceInfo.isSnapshot() && !snapshotCompleted;
+ }
+
+ @Override
+ public void preSnapshotStart() {
+ sourceInfo.setSnapshot(SnapshotRecord.TRUE);
+ snapshotCompleted = false;
+ }
+
+ @Override
+ public void preSnapshotCompletion() {
+ snapshotCompleted = true;
+ }
+
+ @Override
+ public void postSnapshotCompletion() {
+ sourceInfo.setSnapshot(SnapshotRecord.FALSE);
+ }
+
+ public static OceanBaseOffsetContext initial(OceanBaseConnectorConfig config) {
+ return new OceanBaseOffsetContext(
+ false,
+ false,
+ new TransactionContext(),
+ new SignalBasedIncrementalSnapshotContext<>(),
+ new OceanBaseSourceInfo(config));
+ }
+
+ @Override
+ public void markLastSnapshotRecord() {
+ sourceInfo.setSnapshot(SnapshotRecord.LAST);
+ }
+
+ @Override
+ public void event(DataCollectionId tableId, Instant timestamp) {
+ this.sourceInfo.setSourceTime(timestamp);
+ this.sourceInfo.tableEvent((TableId) tableId);
+ this.eventsToSkip++;
+ }
+
+ @Override
+ public TransactionContext getTransactionContext() {
+ return transactionContext;
+ }
+
+ @Override
+ public void incrementalSnapshotEvents() {
+ sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
+ }
+
+ @Override
+ public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
+ return incrementalSnapshotContext;
+ }
+
+ public void setCheckpoint(String timestamp, String commitVersion, long eventsToSkip) {
+ this.timestamp = timestamp;
+ this.commitVersion = commitVersion;
+ this.eventsToSkip = eventsToSkip;
+ }
+
+ public void setCommitVersion(String timestamp, String commitVersion) {
+ this.setCheckpoint(timestamp, commitVersion, 0);
+ }
+
+ public String getCommitVersion() {
+ return commitVersion;
+ }
+
+ public void beginTransaction(String transactionId) {
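+ // A new transaction resets the count of events to skip on restore.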
+ sourceInfo.beginTransaction(transactionId);
+ eventsToSkip = 0;
+ }
+
+ public void commitTransaction() {
+ sourceInfo.commitTransaction();
+ }
+
+ /** The OceanBase offset context loader. */
+ public static class Loader implements OffsetContext.Loader<OceanBaseOffsetContext> {
+
+ private final OceanBaseConnectorConfig connectorConfig;
+
+ public Loader(OceanBaseConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ }
+
+ @Override
+ public OceanBaseOffsetContext load(Map<String, ?> offset) {
+ boolean snapshot = isTrue(offset.get("snapshot"));
+ boolean snapshotCompleted = isTrue(offset.get("snapshot_completed"));
+
+ IncrementalSnapshotContext<TableId> incrementalSnapshotContext =
+ SignalBasedIncrementalSnapshotContext.load(offset);
+
+ final OceanBaseOffsetContext offsetContext =
+ new OceanBaseOffsetContext(
+ snapshot,
+ snapshotCompleted,
+ TransactionContext.load(offset),
+ incrementalSnapshotContext,
+ new OceanBaseSourceInfo(connectorConfig));
+
+ String timestamp = (String) offset.get("timestamp");
+ String commitVersion = (String) offset.get("commit_version");
+ Long events = (Long) offset.get("events");
+ offsetContext.setCheckpoint(timestamp, commitVersion, events == null ? 0 : events);
+ return offsetContext;
+ }
+
+ private boolean isTrue(Object obj) {
+ return obj != null && Boolean.parseBoolean(obj.toString());
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBasePartition.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBasePartition.java
new file mode 100644
index 00000000000..aa7b6da9d9a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBasePartition.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.offset;
+
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+
+import io.debezium.pipeline.spi.Partition;
+import io.debezium.util.Collect;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** OceanBase partition. */
+public class OceanBasePartition implements Partition {
+
+ private static final String SERVER_PARTITION_KEY = "server";
+
+ private final String serverName;
+
+ public OceanBasePartition(String serverName) {
+ this.serverName = serverName;
+ }
+
+ @Override
+ public Map<String, String> getSourcePartition() {
+ return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final OceanBasePartition other = (OceanBasePartition) obj;
+ return Objects.equals(serverName, other.serverName);
+ }
+
+ @Override
+ public int hashCode() {
+ return serverName.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "OceanBasePartition [sourcePartition=" + getSourcePartition() + "]";
+ }
+
+ static class Provider implements Partition.Provider<OceanBasePartition> {
+ private final OceanBaseConnectorConfig connectorConfig;
+
+ Provider(OceanBaseConnectorConfig connectorConfig) {
+ this.connectorConfig = connectorConfig;
+ }
+
+ @Override
+ public Set<OceanBasePartition> getPartitions() {
+ return Collections.singleton(new OceanBasePartition(connectorConfig.getLogicalName()));
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseSourceInfo.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseSourceInfo.java
index 1319b1a22c8..0c6ed53864b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseSourceInfo.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/offset/OceanBaseSourceInfo.java
@@ -34,19 +34,19 @@ public class OceanBaseSourceInfo extends BaseSourceInfo {
public static final String TENANT_KEY = "tenant";
public static final String TRANSACTION_ID_KEY = "transaction_id";
- private final String tenant;
+ private final OceanBaseConnectorConfig config;
private Instant sourceTime;
private Set<TableId> tableIds;
private String transactionId;
- public OceanBaseSourceInfo(OceanBaseConnectorConfig config, String tenant) {
+ public OceanBaseSourceInfo(OceanBaseConnectorConfig config) {
super(config);
- this.tenant = tenant;
+ this.config = config;
}
public String tenant() {
- return tenant;
+ return config.getSourceConfig().getTenantName();
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageEmitter.java
new file mode 100644
index 00000000000..f57f5db566d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageEmitter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBasePartition;
+
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.RelationalChangeRecordEmitter;
+import io.debezium.util.Clock;
+
+import java.io.Serializable;
+
+/** A {@link RelationalChangeRecordEmitter} to emit log message data. */
+public class LogMessageEmitter extends RelationalChangeRecordEmitter<OceanBasePartition> {
+
+ private final Envelope.Operation operation;
+ private final Object[] before;
+ private final Object[] after;
+
+ public LogMessageEmitter(
+ OceanBasePartition partition,
+ OffsetContext offset,
+ Clock clock,
+ Envelope.Operation operation,
+ Serializable[] before,
+ Serializable[] after) {
+ super(partition, offset, clock);
+ this.operation = operation;
+ this.before = before;
+ this.after = after;
+ }
+
+ @Override
+ protected Object[] getOldColumnValues() {
+ return before;
+ }
+
+ @Override
+ protected Object[] getNewColumnValues() {
+ return after;
+ }
+
+ @Override
+ public Envelope.Operation getOperation() {
+ return operation;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageSource.java
new file mode 100644
index 00000000000..bdeadb741af
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/LogMessageSource.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseSourceConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.LogMessageOffset;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseOffsetContext;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBasePartition;
+import org.apache.flink.util.function.SerializableFunction;
+
+import com.oceanbase.clogproxy.client.LogProxyClient;
+import com.oceanbase.clogproxy.client.config.ClientConf;
+import com.oceanbase.clogproxy.client.config.ObReaderConfig;
+import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
+import com.oceanbase.clogproxy.client.listener.RecordListener;
+import com.oceanbase.oms.logmessage.DataMessage;
+import com.oceanbase.oms.logmessage.LogMessage;
+import io.debezium.data.Envelope;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
+import io.debezium.relational.TableId;
+import io.debezium.relational.TableSchema;
+import io.debezium.util.Clock;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** A change event source that emits events from the commit log of OceanBase. */
+public class LogMessageSource
+ implements StreamingChangeEventSource<OceanBasePartition, OceanBaseOffsetContext> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogMessageSource.class);
+
+ private final StreamSplit split;
+ private final OceanBaseConnectorConfig connectorConfig;
+ private final JdbcSourceEventDispatcher<OceanBasePartition> eventDispatcher;
+ private final ErrorHandler errorHandler;
+ private final OceanBaseTaskContext taskContext;
+ private final SerializableFunction<LogMessage, TableId> tableIdProvider;
+ private final Map<TableSchema, Map<String, Integer>> fieldIndexMap = new HashMap<>();
+
+ private final LogProxyClient client;
+
+ public LogMessageSource(
+ OceanBaseConnectorConfig connectorConfig,
+ JdbcSourceEventDispatcher<OceanBasePartition> eventDispatcher,
+ ErrorHandler errorHandler,
+ OceanBaseTaskContext taskContext,
+ StreamSplit split) {
+ this.connectorConfig = connectorConfig;
+ this.eventDispatcher = eventDispatcher;
+ this.errorHandler = errorHandler;
+ this.taskContext = taskContext;
+ this.split = split;
+ this.tableIdProvider = message -> getTableId(connectorConfig, message);
+ this.client = createClient(connectorConfig, split);
+ }
+
+ private static TableId getTableId(OceanBaseConnectorConfig config, LogMessage message) {
+ if (StringUtils.isBlank(message.getDbName())
+ || StringUtils.isBlank(message.getTableName())) {
+ return null;
+ }
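+ // The log proxy reports database names as "<tenant>.<db>"; strip the tenant prefix.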
+ String dbName =
+ message.getDbName().replace(config.getSourceConfig().getTenantName() + ".", "");
+ if (StringUtils.isBlank(dbName)) {
+ return null;
+ }
+ return "mysql".equalsIgnoreCase(config.getSourceConfig().getCompatibleMode())
+ ? new TableId(dbName, null, message.getTableName())
+ : new TableId(null, dbName, message.getTableName());
+ }
+
+ private static LogProxyClient createClient(OceanBaseConnectorConfig config, StreamSplit split) {
+ OceanBaseSourceConfig sourceConfig = config.getSourceConfig();
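+ // Translate the source options into the log proxy reader configuration.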
+ ObReaderConfig obReaderConfig = new ObReaderConfig();
+ if (StringUtils.isNotEmpty(sourceConfig.getRsList())) {
+ obReaderConfig.setRsList(sourceConfig.getRsList());
+ }
+ if (StringUtils.isNotEmpty(sourceConfig.getConfigUrl())) {
+ obReaderConfig.setClusterUrl(sourceConfig.getConfigUrl());
+ }
+ if (StringUtils.isNotEmpty(sourceConfig.getWorkingMode())) {
+ obReaderConfig.setWorkingMode(sourceConfig.getWorkingMode());
+ }
+ obReaderConfig.setUsername(sourceConfig.getUsername());
+ obReaderConfig.setPassword(sourceConfig.getPassword());
+ obReaderConfig.setStartTimestamp(
+ Long.parseLong(((LogMessageOffset) split.getStartingOffset()).getTimestamp()));
+ obReaderConfig.setTimezone(sourceConfig.getServerTimeZone());
+
+ if (sourceConfig.getObcdcProperties() != null
+ && !sourceConfig.getObcdcProperties().isEmpty()) {
+ Map<String, String> extraConfigs = new HashMap<>();
+ sourceConfig
+ .getObcdcProperties()
+ .forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
+ obReaderConfig.setExtraConfigs(extraConfigs);
+ }
+
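+ // Client-side settings: no automatic reconnect attempts; connect timeout taken from the source config.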
+ ClientConf clientConf =
+ ClientConf.builder()
+ .maxReconnectTimes(0)
+ .connectTimeoutMs((int) sourceConfig.getConnectTimeout().toMillis())
+ .build();
+
+ return new LogProxyClient(
+ sourceConfig.getLogProxyHost(),
+ sourceConfig.getLogProxyPort(),
+ obReaderConfig,
+ clientConf);
+ }
+
+ @Override
+ public void execute(
+ ChangeEventSourceContext changeEventSourceContext,
+ OceanBasePartition partition,
+ OceanBaseOffsetContext offsetContext) {
+ if (connectorConfig.getSourceConfig().getStartupOptions().isSnapshotOnly()) {
+ LOG.info("Streaming is not enabled in current configuration");
+ return;
+ }
+ this.taskContext.getSchema().assureNonEmptySchema();
+ OceanBaseOffsetContext effectiveOffsetContext =
+ offsetContext != null
+ ? offsetContext
+ : OceanBaseOffsetContext.initial(this.connectorConfig);
+
+ Set<Envelope.Operation> skippedOperations = this.connectorConfig.getSkippedOperations();
+ EnumMap<DataMessage.Record.Type, BlockingConsumer<LogMessage>> eventHandlers =
+ new EnumMap<>(DataMessage.Record.Type.class);
+
+ eventHandlers.put(
+ DataMessage.Record.Type.HEARTBEAT,
+ message -> LOG.trace("HEARTBEAT message: {}", message));
+ eventHandlers.put(
+ DataMessage.Record.Type.DDL, message -> LOG.trace("DDL message: {}", message));
+
+ eventHandlers.put(
+ DataMessage.Record.Type.BEGIN,
+ message -> handleTransactionBegin(partition, effectiveOffsetContext, message));
+ eventHandlers.put(
+ DataMessage.Record.Type.COMMIT,
+ message -> handleTransactionCompletion(partition, effectiveOffsetContext, message));
+
+ if (!skippedOperations.contains(Envelope.Operation.CREATE)) {
+ eventHandlers.put(
+ DataMessage.Record.Type.INSERT,
+ message ->
+ handleChange(
+ partition,
+ effectiveOffsetContext,
+ Envelope.Operation.CREATE,
+ message));
+ }
+ if (!skippedOperations.contains(Envelope.Operation.UPDATE)) {
+ eventHandlers.put(
+ DataMessage.Record.Type.UPDATE,
+ message ->
+ handleChange(
+ partition,
+ effectiveOffsetContext,
+ Envelope.Operation.UPDATE,
+ message));
+ }
+ if (!skippedOperations.contains(Envelope.Operation.DELETE)) {
+ eventHandlers.put(
+ DataMessage.Record.Type.DELETE,
+ message ->
+ handleChange(
+ partition,
+ effectiveOffsetContext,
+ Envelope.Operation.DELETE,
+ message));
+ }
+
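+ // Released once the listener receives its first message, marking the stream as started.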
+ CountDownLatch latch = new CountDownLatch(1);
+
+ client.addListener(
+ new RecordListener() {
+
+ private volatile boolean started = false;
+
+ @Override
+ public void notify(LogMessage message) {
+ if (!changeEventSourceContext.isRunning()) {
+ client.stop();
+ return;
+ }
+
+ if (!started) {
+ started = true;
+ latch.countDown();
+ }
+
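+ // Skip messages replayed before the split's starting offset; end the split once the ending offset is reached.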
+ LogMessageOffset currentOffset =
+ new LogMessageOffset(effectiveOffsetContext.getOffset());
+ if (currentOffset.isBefore(split.getStartingOffset())) {
+ return;
+ }
+ if (!LogMessageOffset.NO_STOPPING_OFFSET.equals(split.getEndingOffset())
+ && currentOffset.isAtOrAfter(split.getEndingOffset())) {
+ try {
+ eventDispatcher.dispatchWatermarkEvent(
+ partition.getSourcePartition(),
+ split,
+ currentOffset,
+ WatermarkKind.END);
+ } catch (InterruptedException e) {
+ LOG.error("Send signal event error.", e);
+ errorHandler.setProducerThrowable(
+ new RuntimeException(
+ "Error processing log signal event", e));
+ }
+ ((StoppableChangeEventSourceContext) changeEventSourceContext)
+ .stopChangeEventSource();
+ client.stop();
+ return;
+ }
+
+ try {
+ eventHandlers
+ .getOrDefault(
+ message.getOpt(),
+ msg -> LOG.trace("Skip log message {}", msg))
+ .accept(message);
+ } catch (Throwable throwable) {
+ errorHandler.setProducerThrowable(
+ new RuntimeException("Error handling log message", throwable));
+ }
+ }
+
+ @Override
+ public void onException(LogProxyClientException e) {
+ LOG.error("LogProxyClient exception", e);
+ }
+ });
+
+ try {
+ client.start();
+ client.join();
+ } finally {
+ client.stop();
+ }
+ }
+
+ private void handleTransactionBegin(
+ OceanBasePartition partition, OceanBaseOffsetContext offsetContext, LogMessage message)
+ throws InterruptedException {
+ LogMessageOffset offset = LogMessageOffset.from(message);
+ String transactionId = message.getOB10UniqueId();
+ offsetContext.beginTransaction(transactionId);
+
+ if (!offset.getCommitVersion().equals(offsetContext.getCommitVersion())) {
+ offsetContext.setCommitVersion(offset.getTimestamp(), offset.getCommitVersion());
+ }
+ eventDispatcher.dispatchTransactionStartedEvent(partition, transactionId, offsetContext);
+ }
+
+ private void handleTransactionCompletion(
+ OceanBasePartition partition, OceanBaseOffsetContext offsetContext, LogMessage message)
+ throws InterruptedException {
+ offsetContext.commitTransaction();
+ eventDispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
+ }
+
+ private void handleChange(
+ OceanBasePartition partition,
+ OceanBaseOffsetContext offsetContext,
+ Envelope.Operation operation,
+ LogMessage message)
+ throws InterruptedException {
+ final TableId tableId = tableIdProvider.apply(message);
+ if (tableId == null) {
+ LOG.warn("No valid tableId found, skipping log message: {}", message);
+ return;
+ }
+
+ TableSchema tableSchema = taskContext.getSchema().schemaFor(tableId);
+ if (tableSchema == null) {
+ LOG.warn("No table schema found, skipping log message: {}", message);
+ return;
+ }
+
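+ // Cache a field-name -> index mapping per table schema so the before/after
+ // row arrays can be filled by position for each change message.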
+ Map<String, Integer> fieldIndex =
+ fieldIndexMap.computeIfAbsent(
+ tableSchema,
+ schema ->
+ IntStream.range(0, schema.valueSchema().fields().size())
+ .boxed()
+ .collect(
+ Collectors.toMap(
+ i ->
+ schema.valueSchema()
+ .fields()
+ .get(i)
+ .name(),
+ i -> i)));
+
+ Serializable[] before = null;
+ Serializable[] after = null;
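+ // Fields flagged as 'prev' form the before-image; the others form the after-image.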
+ for (DataMessage.Record.Field field : message.getFieldList()) {
+ if (field.isPrev()) {
+ if (before == null) {
+ before = new Serializable[fieldIndex.size()];
+ }
+ before[fieldIndex.get(field.getFieldname())] = (Serializable) getFieldValue(field);
+ } else {
+ if (after == null) {
+ after = new Serializable[fieldIndex.size()];
+ }
+ after[fieldIndex.get(field.getFieldname())] = (Serializable) getFieldValue(field);
+ }
+ }
+
+ offsetContext.event(tableId, Instant.ofEpochSecond(Long.parseLong(message.getTimestamp())));
+ eventDispatcher.dispatchDataChangeEvent(
+ partition,
+ tableId,
+ new LogMessageEmitter(
+ partition, offsetContext, Clock.SYSTEM, operation, before, after));
+ }
+
+ private Object getFieldValue(DataMessage.Record.Field field) {
+ if (field.getValue() == null) {
+ return null;
+ }
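+ // Binary values are passed through as raw bytes; everything else is decoded
+ // with the encoding reported by the log message.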
+ String encoding = field.getEncoding();
+ if ("binary".equalsIgnoreCase(encoding)) {
+ return field.getValue().getBytes();
+ }
+ return field.getValue().toString(encoding);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseScanFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseScanFetchTask.java
new file mode 100644
index 00000000000..fe6846a3d1b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseScanFetchTask.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnection;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseOffsetContext;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBasePartition;
+import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseDatabaseSchema;
+
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.pipeline.spi.ChangeRecordEmitter;
+import io.debezium.pipeline.spi.SnapshotResult;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.SnapshotChangeRecordEmitter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.ColumnUtils;
+import io.debezium.util.Strings;
+import io.debezium.util.Threads;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import static org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils.buildSplitScanQuery;
+import static org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils.readTableSplitDataStatement;
+
+/** The task to fetch data of an OceanBase table snapshot split. */
+public class OceanBaseScanFetchTask extends AbstractScanFetchTask {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OceanBaseScanFetchTask.class);
+
+ public OceanBaseScanFetchTask(SnapshotSplit snapshotSplit) {
+ super(snapshotSplit);
+ }
+
+ @Override
+ protected void executeDataSnapshot(Context context) throws Exception {
+ OceanBaseSourceFetchTaskContext sourceFetchContext =
+ (OceanBaseSourceFetchTaskContext) context;
+ OceanBaseSnapshotSplitReadTask snapshotSplitReadTask =
+ new OceanBaseSnapshotSplitReadTask(
+ sourceFetchContext.getDbzConnectorConfig(),
+ sourceFetchContext.getOffsetContext(),
+ sourceFetchContext.getDatabaseSchema(),
+ sourceFetchContext.getConnection(),
+ sourceFetchContext.getEventDispatcher(),
+ snapshotSplit);
+ StoppableChangeEventSourceContext changeEventSourceContext =
+ new StoppableChangeEventSourceContext();
+ SnapshotResult<OceanBaseOffsetContext> snapshotResult =
+ snapshotSplitReadTask.execute(
+ changeEventSourceContext,
+ sourceFetchContext.getPartition(),
+ sourceFetchContext.getOffsetContext());
+ if (!snapshotResult.isCompletedOrSkipped()) {
+ taskRunning = false;
+ throw new IllegalStateException(
+ String.format("Read snapshot for OceanBase split %s fail", snapshotResult));
+ }
+ }
+
+ @Override
+ protected void executeBackfillTask(Context context, StreamSplit backfillStreamSplit)
+ throws Exception {
+ // TODO add back fill task
+ }
+
+ /** A wrapped task to fetch a snapshot split of a table. */
+ public static class OceanBaseSnapshotSplitReadTask
+ extends AbstractSnapshotChangeEventSource<OceanBasePartition, OceanBaseOffsetContext> {
+
+ private final OceanBaseConnectorConfig connectorConfig;
+ private final OceanBaseDatabaseSchema databaseSchema;
+ private final OceanBaseConnection jdbcConnection;
+ private final JdbcSourceEventDispatcher<OceanBasePartition> dispatcher;
+ private final Clock clock;
+ private final SnapshotSplit snapshotSplit;
+ private final OceanBaseOffsetContext offsetContext;
+
+ public OceanBaseSnapshotSplitReadTask(
+ OceanBaseConnectorConfig connectorConfig,
+ OceanBaseOffsetContext previousOffset,
+ OceanBaseDatabaseSchema databaseSchema,
+ OceanBaseConnection jdbcConnection,
+ JdbcSourceEventDispatcher<OceanBasePartition> dispatcher,
+ SnapshotSplit snapshotSplit) {
+ super(connectorConfig, SnapshotProgressListener.NO_OP());
+ this.connectorConfig = connectorConfig;
+ this.offsetContext = previousOffset;
+ this.databaseSchema = databaseSchema;
+ this.jdbcConnection = jdbcConnection;
+ this.dispatcher = dispatcher;
+ this.clock = Clock.SYSTEM;
+ this.snapshotSplit = snapshotSplit;
+ }
+
+ @Override
+ protected SnapshotResult<OceanBaseOffsetContext> doExecute(
+ ChangeEventSourceContext context,
+ OceanBaseOffsetContext previousOffset,
+ SnapshotContext<OceanBasePartition, OceanBaseOffsetContext> snapshotContext,
+ SnapshottingTask snapshottingTask)
+ throws Exception {
+ final OceanBaseSnapshotContext ctx = (OceanBaseSnapshotContext) snapshotContext;
+ ctx.offset = offsetContext;
+ createDataEvents(ctx, snapshotSplit.getTableId());
+ return SnapshotResult.completed(ctx.offset);
+ }
+
+ @Override
+ protected SnapshottingTask getSnapshottingTask(
+ OceanBasePartition partition, OceanBaseOffsetContext previousOffset) {
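+ // Snapshot data but not schema; schema capture is not needed for split reads.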
+ return new SnapshottingTask(false, true);
+ }
+
+ @Override
+ protected SnapshotContext<OceanBasePartition, OceanBaseOffsetContext> prepare(
+ OceanBasePartition partition) throws Exception {
+ return new OceanBaseSnapshotContext(partition);
+ }
+
+ private static class OceanBaseSnapshotContext
+ extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<
+ OceanBasePartition, OceanBaseOffsetContext> {
+
+ public OceanBaseSnapshotContext(OceanBasePartition partition) throws SQLException {
+ super(partition, "");
+ }
+ }
+
+ private void createDataEvents(OceanBaseSnapshotContext snapshotContext, TableId tableId)
+ throws Exception {
+ EventDispatcher.SnapshotReceiver<OceanBasePartition> snapshotReceiver =
+ dispatcher.getSnapshotChangeEventReceiver();
+ LOG.debug("Snapshotting table {}", tableId);
+ createDataEventsForTable(
+ snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
+ snapshotReceiver.completeSnapshot();
+ }
+
+ /** Dispatches the data change events for the records of a single table. */
+ private void createDataEventsForTable(
+ OceanBaseSnapshotContext snapshotContext,
+ EventDispatcher.SnapshotReceiver<OceanBasePartition> snapshotReceiver,
+ Table table)
+ throws InterruptedException {
+
+ long exportStart = clock.currentTimeInMillis();
+ LOG.info(
+ "Exporting data from split '{}' of table {}",
+ snapshotSplit.splitId(),
+ table.id());
+
+ final String selectSql =
+ buildSplitScanQuery(
+ snapshotSplit.getTableId(),
+ snapshotSplit.getSplitKeyType(),
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null);
+ LOG.info(
+ "For split '{}' of table {} using select statement: '{}'",
+ snapshotSplit.splitId(),
+ table.id(),
+ selectSql);
+
+ try (PreparedStatement selectStatement =
+ readTableSplitDataStatement(
+ jdbcConnection,
+ selectSql,
+ snapshotSplit.getSplitStart() == null,
+ snapshotSplit.getSplitEnd() == null,
+ snapshotSplit.getSplitStart(),
+ snapshotSplit.getSplitEnd(),
+ snapshotSplit.getSplitKeyType().getFieldCount(),
+ connectorConfig.getQueryFetchSize());
+ ResultSet rs = selectStatement.executeQuery()) {
+
+ ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
+ long rows = 0;
+ Threads.Timer logTimer = getTableScanLogTimer();
+
+ while (rs.next()) {
+ rows++;
+ final Object[] row =
+ jdbcConnection.rowToArray(table, databaseSchema, rs, columnArray);
+ if (logTimer.expired()) {
+ long stop = clock.currentTimeInMillis();
+ LOG.info(
+ "Exported {} records for split '{}' after {}",
+ rows,
+ snapshotSplit.splitId(),
+ Strings.duration(stop - exportStart));
+ logTimer = getTableScanLogTimer();
+ }
+ dispatcher.dispatchSnapshotEvent(
+ snapshotContext.partition,
+ table.id(),
+ getChangeRecordEmitter(snapshotContext, table.id(), row),
+ snapshotReceiver);
+ }
+ LOG.info(
+ "Finished exporting {} records for split '{}', total duration '{}'",
+ rows,
+ snapshotSplit.splitId(),
+ Strings.duration(clock.currentTimeInMillis() - exportStart));
+ } catch (SQLException e) {
+ throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
+ }
+ }
+
+ protected ChangeRecordEmitter<OceanBasePartition> getChangeRecordEmitter(
+ SnapshotContext<OceanBasePartition, OceanBaseOffsetContext> snapshotContext,
+ TableId tableId,
+ Object[] row) {
+ snapshotContext.offset.event(tableId, clock.currentTime());
+ return new SnapshotChangeRecordEmitter<>(
+ snapshotContext.partition, snapshotContext.offset, row, clock);
+ }
+
+ private Threads.Timer getTableScanLogTimer() {
+ return Threads.timer(clock, LOG_INTERVAL);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseSourceFetchTaskContext.java
new file mode 100644
index 00000000000..def83173247
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseSourceFetchTaskContext.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseSourceConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.connection.OceanBaseConnection;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.LogMessageOffset;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseOffsetContext;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBasePartition;
+import org.apache.flink.cdc.connectors.oceanbase.source.offset.OceanBaseSourceInfo;
+import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseDatabaseSchema;
+import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.schema.DataCollectionId;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.Collect;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.time.Instant;
+import java.util.Map;
+
+/** The context for fetch tasks that read split data from the OceanBase data source. */
+public class OceanBaseSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
+
+ private final OceanBaseConnection connection;
+ private final OceanBaseEventMetadataProvider metadataProvider;
+
+ private OceanBaseDatabaseSchema databaseSchema;
+ private OceanBaseTaskContext taskContext;
+ private OceanBaseOffsetContext offsetContext;
+ private OceanBasePartition partition;
+ private JdbcSourceEventDispatcher<OceanBasePartition> dispatcher;
+
+ private ChangeEventQueue<DataChangeEvent> queue;
+ private ErrorHandler errorHandler;
+
+ public OceanBaseSourceFetchTaskContext(
+ JdbcSourceConfig sourceConfig,
+ JdbcDataSourceDialect dataSourceDialect,
+ OceanBaseConnection connection) {
+ super(sourceConfig, dataSourceDialect);
+ this.connection = connection;
+ this.metadataProvider = new OceanBaseEventMetadataProvider();
+ }
+
+ @Override
+ public void configure(SourceSplitBase sourceSplitBase) {
+ final OceanBaseConnectorConfig connectorConfig = getDbzConnectorConfig();
+ final boolean tableIdCaseInsensitive =
+ dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig);
+
+ TopicSelector<TableId> topicSelector =
+ TopicSelector.defaultSelector(
+ connectorConfig,
+ (tableId, prefix, delimiter) ->
+ String.join(delimiter, prefix, tableId.identifier()));
+
+ this.databaseSchema =
+ new OceanBaseDatabaseSchema(connectorConfig, topicSelector, tableIdCaseInsensitive);
+ this.offsetContext =
+ loadStartingOffsetState(
+ new OceanBaseOffsetContext.Loader(connectorConfig), sourceSplitBase);
+
+ this.partition = new OceanBasePartition(connectorConfig.getLogicalName());
+
+ this.taskContext = new OceanBaseTaskContext(connectorConfig, databaseSchema);
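+ // Use an effectively unbounded queue for snapshot splits; stream splits
+ // respect the configured max queue size.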
+ final int queueSize =
+ sourceSplitBase.isSnapshotSplit()
+ ? Integer.MAX_VALUE
+ : connectorConfig.getMaxQueueSize();
+ this.queue =
+ new ChangeEventQueue.Builder<DataChangeEvent>()
+ .pollInterval(connectorConfig.getPollInterval())
+ .maxBatchSize(connectorConfig.getMaxBatchSize())
+ .maxQueueSize(queueSize)
+ .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
+ .loggingContextSupplier(
+ () ->
+ taskContext.configureLoggingContext(
+ "oceanbase-cdc-connector-task"))
+ .build();
+ this.dispatcher =
+ new JdbcSourceEventDispatcher<>(
+ connectorConfig,
+ topicSelector,
+ databaseSchema,
+ queue,
+ connectorConfig.getTableFilters().dataCollectionFilter(),
+ DataChangeEvent::new,
+ metadataProvider,
+ schemaNameAdjuster,
+ null);
+ this.errorHandler = new ErrorHandler(null, connectorConfig, queue);
+ }
+
+ @Override
+ public OceanBaseSourceConfig getSourceConfig() {
+ return (OceanBaseSourceConfig) sourceConfig;
+ }
+
+ public OceanBaseConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public OceanBasePartition getPartition() {
+ return partition;
+ }
+
+ public OceanBaseTaskContext getTaskContext() {
+ return taskContext;
+ }
+
+ @Override
+ public OceanBaseConnectorConfig getDbzConnectorConfig() {
+ return (OceanBaseConnectorConfig) super.getDbzConnectorConfig();
+ }
+
+ @Override
+ public OceanBaseOffsetContext getOffsetContext() {
+ return offsetContext;
+ }
+
+ @Override
+ public ErrorHandler getErrorHandler() {
+ return errorHandler;
+ }
+
+ @Override
+ public OceanBaseDatabaseSchema getDatabaseSchema() {
+ return databaseSchema;
+ }
+
+ @Override
+ public RowType getSplitType(Table table) {
+ return OceanBaseUtils.getSplitType(table);
+ }
+
+ @Override
+ public JdbcSourceEventDispatcher<OceanBasePartition> getEventDispatcher() {
+ return dispatcher;
+ }
+
+ @Override
+ public WatermarkDispatcher getWaterMarkDispatcher() {
+ return dispatcher;
+ }
+
+ @Override
+ public ChangeEventQueue<DataChangeEvent> getQueue() {
+ return queue;
+ }
+
+ @Override
+ public Tables.TableFilter getTableFilter() {
+ return getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
+ }
+
+ @Override
+ public Offset getStreamOffset(SourceRecord sourceRecord) {
+ return new LogMessageOffset(sourceRecord.sourceOffset());
+ }
+
+ @Override
+ public void close() throws Exception {
+ connection.close();
+ }
+
+ private OceanBaseOffsetContext loadStartingOffsetState(
+ OffsetContext.Loader<OceanBaseOffsetContext> loader, SourceSplitBase sourceSplit) {
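+ // Snapshot splits always start from the initial offset; stream splits
+ // resume from their recorded starting offset.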
+ Offset offset =
+ sourceSplit.isSnapshotSplit()
+ ? LogMessageOffset.INITIAL_OFFSET
+ : sourceSplit.asStreamSplit().getStartingOffset();
+ return loader.load(offset.getOffset());
+ }
+
+ /** The {@link EventMetadataProvider} implementation for OceanBase. */
+ public static class OceanBaseEventMetadataProvider implements EventMetadataProvider {
+
+ @Override
+ public Instant getEventTimestamp(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ if (value == null) {
+ return null;
+ }
+ final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+ if (sourceInfo == null) {
+ return null;
+ }
+ final Long ts = sourceInfo.getInt64(OceanBaseSourceInfo.TIMESTAMP_KEY);
+ return ts == null ? null : Instant.ofEpochMilli(ts);
+ }
+
+ @Override
+ public Map<String, String> getEventSourcePosition(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ if (value == null) {
+ return null;
+ }
+ final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+ if (sourceInfo == null) {
+ return null;
+ }
+ return Collect.hashMapOf(
+ "tenant",
+ sourceInfo.getString(OceanBaseSourceInfo.TENANT_KEY),
+ "timestamp",
+ sourceInfo.getString(OceanBaseSourceInfo.TIMESTAMP_KEY),
+ "transaction_id",
+ sourceInfo.getString(OceanBaseSourceInfo.TRANSACTION_ID_KEY));
+ }
+
+ @Override
+ public String getTransactionId(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ if (value == null) {
+ return null;
+ }
+ final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+ if (sourceInfo == null) {
+ return null;
+ }
+ return sourceInfo.getString(OceanBaseSourceInfo.TRANSACTION_ID_KEY);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseStreamFetchTask.java
new file mode 100644
index 00000000000..fcd0dcd505a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseStreamFetchTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+
+/** The task to fetch data of an OceanBase table stream split. */
+public class OceanBaseStreamFetchTask implements FetchTask<SourceSplitBase> {
+
+ private final StreamSplit split;
+ private volatile boolean taskRunning = false;
+
+ public OceanBaseStreamFetchTask(StreamSplit split) {
+ this.split = split;
+ }
+
+ @Override
+ public void execute(Context context) throws Exception {
+ OceanBaseSourceFetchTaskContext sourceFetchContext =
+ (OceanBaseSourceFetchTaskContext) context;
+ sourceFetchContext.getOffsetContext().preSnapshotCompletion();
+ taskRunning = true;
+ LogMessageSource logMessageSource =
+ new LogMessageSource(
+ sourceFetchContext.getDbzConnectorConfig(),
+ sourceFetchContext.getEventDispatcher(),
+ sourceFetchContext.getErrorHandler(),
+ sourceFetchContext.getTaskContext(),
+ split);
+ StoppableChangeEventSourceContext changeEventSourceContext =
+ new StoppableChangeEventSourceContext();
+ logMessageSource.execute(
+ changeEventSourceContext,
+ sourceFetchContext.getPartition(),
+ sourceFetchContext.getOffsetContext());
+ }
+
+ @Override
+ public boolean isRunning() {
+ return taskRunning;
+ }
+
+ @Override
+ public StreamSplit getSplit() {
+ return split;
+ }
+
+ @Override
+ public void close() {
+ taskRunning = false;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseTaskContext.java
new file mode 100644
index 00000000000..c5258aaae1a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/OceanBaseTaskContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import org.apache.flink.cdc.connectors.oceanbase.source.config.OceanBaseConnectorConfig;
+import org.apache.flink.cdc.connectors.oceanbase.source.schema.OceanBaseDatabaseSchema;
+
+import io.debezium.connector.common.CdcSourceTaskContext;
+
+/** OceanBase task context. */
+public class OceanBaseTaskContext extends CdcSourceTaskContext {
+
+ private final OceanBaseDatabaseSchema schema;
+
+ public OceanBaseTaskContext(OceanBaseConnectorConfig config, OceanBaseDatabaseSchema schema) {
+ super(config.getContextName(), config.getLogicalName(), schema::tableIds);
+ this.schema = schema;
+ }
+
+ public OceanBaseDatabaseSchema getSchema() {
+ return schema;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/StoppableChangeEventSourceContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/StoppableChangeEventSourceContext.java
new file mode 100644
index 00000000000..4d7b6557c49
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/reader/fetch/StoppableChangeEventSourceContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.reader.fetch;
+
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+
+/**
+ * A change event source context that can stop the running source by invoking {@link
+ * #stopChangeEventSource()}.
+ */
+public class StoppableChangeEventSourceContext
+ implements ChangeEventSource.ChangeEventSourceContext {
+
+ private volatile boolean isRunning = true;
+
+ public void stopChangeEventSource() {
+ isRunning = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return isRunning;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/schema/OceanBaseDatabaseSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/schema/OceanBaseDatabaseSchema.java
index 094937bad85..b9864764c53 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/schema/OceanBaseDatabaseSchema.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/schema/OceanBaseDatabaseSchema.java
@@ -21,6 +21,7 @@
import org.apache.flink.cdc.connectors.oceanbase.source.converter.OceanBaseValueConverters;
import io.debezium.relational.RelationalDatabaseSchema;
+import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
@@ -32,13 +33,36 @@ public OceanBaseDatabaseSchema(
OceanBaseConnectorConfig connectorConfig,
Tables.TableFilter tableFilter,
boolean tableIdCaseInsensitive) {
- super(
+ this(
connectorConfig,
TopicSelector.defaultSelector(
connectorConfig,
(tableId, prefix, delimiter) ->
String.join(delimiter, prefix, tableId.identifier())),
tableFilter,
+ tableIdCaseInsensitive);
+ }
+
+ public OceanBaseDatabaseSchema(
+ OceanBaseConnectorConfig connectorConfig,
+ TopicSelector topicSelector,
+ boolean tableIdCaseInsensitive) {
+ this(
+ connectorConfig,
+ topicSelector,
+ connectorConfig.getTableFilters().dataCollectionFilter(),
+ tableIdCaseInsensitive);
+ }
+
+ private OceanBaseDatabaseSchema(
+ OceanBaseConnectorConfig connectorConfig,
+ TopicSelector topicSelector,
+ Tables.TableFilter tableFilter,
+ boolean tableIdCaseInsensitive) {
+ super(
+ connectorConfig,
+ topicSelector,
+ tableFilter,
connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
new OceanBaseValueConverters(connectorConfig),
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/splitter/OceanBaseChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/splitter/OceanBaseChunkSplitter.java
new file mode 100644
index 00000000000..246f2c381b2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/splitter/OceanBaseChunkSplitter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.source.splitter;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.source.assigner.splitter.JdbcSourceChunkSplitter;
+import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseTypeUtils;
+import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils;
+import org.apache.flink.table.types.DataType;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+
+import java.sql.SQLException;
+
+/** The {@code ChunkSplitter} used to split an OceanBase table into a set of chunks. */
+public class OceanBaseChunkSplitter extends JdbcSourceChunkSplitter {
+
+ public OceanBaseChunkSplitter(
+ JdbcSourceConfig sourceConfig,
+ JdbcDataSourceDialect dialect,
+ ChunkSplitterState chunkSplitterState) {
+ super(sourceConfig, dialect, chunkSplitterState);
+ }
+
+ @Override
+ protected Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ Column splitColumn,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return OceanBaseUtils.queryNextChunkMax(
+ jdbc, tableId, splitColumn.name(), chunkSize, includedLowerBound);
+ }
+
+ @Override
+ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+ throws SQLException {
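+ // SHOW TABLE STATUS needs a database context; without a catalog, fall back
+ // to 0 instead of failing.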
+ if (tableId.catalog() != null) {
+ return OceanBaseUtils.queryApproximateRowCnt(jdbc, tableId);
+ }
+ return 0L;
+ }
+
+ @Override
+ public DataType fromDbzColumn(Column splitColumn) {
+ return OceanBaseTypeUtils.fromDbzColumn(splitColumn);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSource.java
index e3084b5fb83..8f1c79805ff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSource.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
+import org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseSourceBuilder;
import org.apache.flink.cdc.connectors.oceanbase.source.converter.OceanBaseDeserializationConverterFactory;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
@@ -29,6 +30,7 @@
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -76,6 +78,17 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet
private final String workingMode;
private final Properties obcdcProperties;
private final Properties debeziumProperties;
+ private final boolean enableParallelRead;
+ private final int splitSize;
+ private final int splitMetaGroupSize;
+ private final int fetchSize;
+ private final int connectMaxRetries;
+ private final int connectionPoolSize;
+ private final double distributionFactorUpper;
+ private final double distributionFactorLower;
+ private final String chunkKeyColumn;
+ private final boolean closeIdleReaders;
+ private final boolean scanNewlyAddedTableEnabled;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -87,6 +100,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
+ @Deprecated
public OceanBaseTableSource(
ResolvedSchema physicalSchema,
StartupOptions startupOptions,
@@ -137,6 +151,91 @@ public OceanBaseTableSource(
this.obcdcProperties = obcdcProperties;
this.debeziumProperties = debeziumProperties;
+ this.enableParallelRead = false;
+ this.splitSize = 0;
+ this.splitMetaGroupSize = 0;
+ this.fetchSize = 0;
+ this.connectMaxRetries = 0;
+ this.connectionPoolSize = 0;
+ this.distributionFactorUpper = 0;
+ this.distributionFactorLower = 0;
+ this.chunkKeyColumn = null;
+ this.closeIdleReaders = false;
+ this.scanNewlyAddedTableEnabled = false;
+
+ this.producedDataType = physicalSchema.toPhysicalRowDataType();
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ public OceanBaseTableSource(
+ ResolvedSchema physicalSchema,
+ StartupOptions startupOptions,
+ String username,
+ String password,
+ String tenantName,
+ String databaseName,
+ String tableName,
+ String serverTimeZone,
+ Duration connectTimeout,
+ String hostname,
+ int port,
+ String compatibleMode,
+ String jdbcDriver,
+ String logProxyHost,
+ Integer logProxyPort,
+ String rsList,
+ String configUrl,
+ String workingMode,
+ Properties obcdcProperties,
+ Properties debeziumProperties,
+ int splitSize,
+ int splitMetaGroupSize,
+ int fetchSize,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ String chunkKeyColumn,
+ boolean closeIdleReaders,
+ boolean scanNewlyAddedTableEnabled) {
+ this.physicalSchema = physicalSchema;
+ this.startupOptions = checkNotNull(startupOptions);
+ this.username = checkNotNull(username);
+ this.password = checkNotNull(password);
+ this.tenantName = tenantName;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.serverTimeZone = serverTimeZone;
+ this.connectTimeout = connectTimeout;
+ this.hostname = checkNotNull(hostname);
+ this.port = port;
+ this.compatibleMode = compatibleMode;
+ this.jdbcDriver = jdbcDriver;
+ this.logProxyHost = logProxyHost;
+ this.logProxyPort = logProxyPort;
+ this.rsList = rsList;
+ this.configUrl = configUrl;
+ this.workingMode = workingMode;
+ this.obcdcProperties = obcdcProperties;
+ this.debeziumProperties = debeziumProperties;
+
+ this.tableList = null;
+ this.jdbcProperties = null;
+ this.logProxyClientId = null;
+ this.startupTimestamp = null;
+
+ this.enableParallelRead = true;
+ this.splitSize = splitSize;
+ this.splitMetaGroupSize = splitMetaGroupSize;
+ this.fetchSize = fetchSize;
+ this.connectMaxRetries = connectMaxRetries;
+ this.connectionPoolSize = connectionPoolSize;
+ this.distributionFactorUpper = distributionFactorUpper;
+ this.distributionFactorLower = distributionFactorLower;
+ this.chunkKeyColumn = chunkKeyColumn;
+ this.closeIdleReaders = closeIdleReaders;
+ this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@@ -166,6 +265,42 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
OceanBaseDeserializationConverterFactory.instance())
.build();
+ if (enableParallelRead) {
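+ // Parallel (incremental snapshot) path: build the FLIP-27 based source
+ // and expose it through SourceProvider.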
+ OceanBaseSourceBuilder.OceanBaseIncrementalSource incrementalSource =
+ OceanBaseSourceBuilder.OceanBaseIncrementalSource.builder()
+ .startupOptions(startupOptions)
+ .hostname(hostname)
+ .port(port)
+ .compatibleMode(compatibleMode)
+ .driverClassName(jdbcDriver)
+ .tenantName(tenantName)
+ .databaseList(databaseName)
+ .tableList(databaseName + "." + tableName)
+ .username(username)
+ .password(password)
+ .logProxyHost(logProxyHost)
+ .logProxyPort(logProxyPort)
+ .rsList(rsList)
+ .configUrl(configUrl)
+ .workingMode(workingMode)
+ .obcdcProperties(obcdcProperties)
+ .debeziumProperties(debeziumProperties)
+ .serverTimeZone(serverTimeZone)
+ .connectTimeout(connectTimeout)
+ .connectionPoolSize(connectionPoolSize)
+ .connectMaxRetries(connectMaxRetries)
+ .chunkKeyColumn(chunkKeyColumn)
+ .splitSize(splitSize)
+ .fetchSize(fetchSize)
+ .splitMetaGroupSize(splitMetaGroupSize)
+ .distributionFactorUpper(distributionFactorUpper)
+ .distributionFactorLower(distributionFactorLower)
+ .closeIdleReaders(closeIdleReaders)
+ .deserializer(deserializer)
+ .build();
+ return SourceProvider.of(incrementalSource);
+ }
+
OceanBaseSource.Builder builder =
OceanBaseSource.builder()
.startupOptions(startupOptions)
@@ -227,6 +362,43 @@ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDa
@Override
public DynamicTableSource copy() {
+ if (enableParallelRead) {
+ OceanBaseTableSource source =
+ new OceanBaseTableSource(
+ physicalSchema,
+ startupOptions,
+ username,
+ password,
+ tenantName,
+ databaseName,
+ tableName,
+ serverTimeZone,
+ connectTimeout,
+ hostname,
+ port,
+ compatibleMode,
+ jdbcDriver,
+ logProxyHost,
+ logProxyPort,
+ rsList,
+ configUrl,
+ workingMode,
+ obcdcProperties,
+ debeziumProperties,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn,
+ closeIdleReaders,
+ scanNewlyAddedTableEnabled);
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
+ return source;
+ }
OceanBaseTableSource source =
new OceanBaseTableSource(
physicalSchema,
@@ -291,6 +463,17 @@ public boolean equals(Object o) {
&& Objects.equals(this.workingMode, that.workingMode)
&& Objects.equals(this.obcdcProperties, that.obcdcProperties)
&& Objects.equals(this.debeziumProperties, that.debeziumProperties)
+ && Objects.equals(this.enableParallelRead, that.enableParallelRead)
+ && Objects.equals(this.splitSize, that.splitSize)
+ && Objects.equals(this.splitMetaGroupSize, that.splitMetaGroupSize)
+ && Objects.equals(this.fetchSize, that.fetchSize)
+ && Objects.equals(this.connectMaxRetries, that.connectMaxRetries)
+ && Objects.equals(this.connectionPoolSize, that.connectionPoolSize)
+ && Objects.equals(this.distributionFactorUpper, that.distributionFactorUpper)
+ && Objects.equals(this.distributionFactorLower, that.distributionFactorLower)
+ && Objects.equals(this.chunkKeyColumn, that.chunkKeyColumn)
+ && Objects.equals(this.closeIdlerReaders, that.closeIdlerReaders)
+ && Objects.equals(this.scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled)
&& Objects.equals(this.producedDataType, that.producedDataType)
&& Objects.equals(this.metadataKeys, that.metadataKeys);
}
@@ -322,6 +505,17 @@ public int hashCode() {
workingMode,
obcdcProperties,
debeziumProperties,
+ enableParallelRead,
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn,
+ closeIdleReaders,
+ scanNewlyAddedTableEnabled,
producedDataType,
metadataKeys);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java
index a236c0eb667..3fc9829af6c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java
@@ -39,45 +39,37 @@
import java.util.Properties;
import java.util.Set;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
+import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
/** Factory for creating configured instance of {@link OceanBaseTableSource}. */
public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
private static final String IDENTIFIER = "oceanbase-cdc";
- public static final ConfigOption<String> USERNAME =
- ConfigOptions.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("Username to be used when connecting to OceanBase.");
-
- public static final ConfigOption<String> PASSWORD =
- ConfigOptions.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("Password to be used when connecting to OceanBase.");
-
public static final ConfigOption<String> TENANT_NAME =
ConfigOptions.key("tenant-name")
.stringType()
.noDefaultValue()
.withDescription("Tenant name of OceanBase to monitor.");
- public static final ConfigOption<String> DATABASE_NAME =
- ConfigOptions.key("database-name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Database name of OceanBase to monitor, should be regular expression. Only can be used with 'initial' mode.");
-
- public static final ConfigOption<String> TABLE_NAME =
- ConfigOptions.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "Table name of OceanBase to monitor, should be regular expression. Only can be used with 'initial' mode.");
-
public static final ConfigOption<String> TABLE_LIST =
ConfigOptions.key("table-list")
.stringType()
@@ -85,26 +77,6 @@ public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory {
.withDescription(
"List of full names of tables, separated by commas, e.g. \"db1.table1, db2.table2\".");
- public static final ConfigOption<String> SERVER_TIME_ZONE =
- ConfigOptions.key("server-time-zone")
- .stringType()
- .defaultValue("+00:00")
- .withDescription("The session time zone in database server.");
-
- public static final ConfigOption<Duration> CONNECT_TIMEOUT =
- ConfigOptions.key("connect.timeout")
- .durationType()
- .defaultValue(Duration.ofSeconds(30))
- .withDescription(
- "The maximum time that the connector should wait after trying to connect to the database server or log proxy server before timing out.");
-
- public static final ConfigOption<String> HOSTNAME =
- ConfigOptions.key("hostname")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "IP address or hostname of the OceanBase database server or OceanBase proxy server.");
-
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
@@ -215,6 +187,53 @@ public DynamicTableSource createDynamicTableSource(Context context) {
String configUrl = config.get(CONFIG_URL);
String workingMode = config.get(WORKING_MODE);
+ boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
+ int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
+ int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
+ int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+ double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ String chunkKeyColumn =
+ config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
+ boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+ boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+
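+ // With incremental snapshot enabled, create the parallel source variant
+ // carrying the chunk-splitting and connection-pool options.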
+ if (enableParallelRead) {
+ return new OceanBaseTableSource(
+ physicalSchema,
+ startupOptions,
+ username,
+ password,
+ tenantName,
+ databaseName,
+ tableName,
+ serverTimeZone,
+ connectTimeout,
+ hostname,
+ port,
+ compatibleMode,
+ jdbcDriver,
+ logProxyHost,
+ logProxyPort,
+ rsList,
+ configUrl,
+ workingMode,
+ getProperties(context.getCatalogTable().getOptions(), OBCDC_PROPERTIES_PREFIX),
+ DebeziumOptions.getDebeziumProperties(context.getCatalogTable().getOptions()),
+ splitSize,
+ splitMetaGroupSize,
+ fetchSize,
+ connectMaxRetries,
+ connectionPoolSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ chunkKeyColumn,
+ closeIdleReaders,
+ scanNewlyAddedTableEnabled);
+ }
+
OptionUtils.printOptions(IDENTIFIER, ((Configuration) config).toMap());
return new OceanBaseTableSource(
@@ -278,6 +297,18 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(RS_LIST);
options.add(CONFIG_URL);
options.add(WORKING_MODE);
+
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
+ options.add(CHUNK_META_GROUP_SIZE);
+ options.add(SCAN_SNAPSHOT_FETCH_SIZE);
+ options.add(CONNECT_MAX_RETRIES);
+ options.add(CONNECTION_POOL_SIZE);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+ options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+ options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseTypeUtils.java
new file mode 100644
index 00000000000..7d69db73fd3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseTypeUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import io.debezium.relational.Column;
+
+import java.sql.Types;
+
+/** Utilities for converting from OceanBase types to Flink types. */
+public class OceanBaseTypeUtils {
+
+ public static boolean isUnsignedColumn(Column column) {
+ return column.typeName().toUpperCase().contains("UNSIGNED");
+ }
+
+ public static DataType fromDbzColumn(Column column) {
+ DataType dataType = convertFromColumn(column);
+ if (column.isOptional()) {
+ return dataType;
+ } else {
+ return dataType.notNull();
+ }
+ }
+
+ private static DataType convertFromColumn(Column column) {
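+ // Unsigned integer types are widened to the next larger Flink type so the
+ // full unsigned value range fits.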
+ switch (column.jdbcType()) {
+ case Types.BIT:
+ if (column.length() > 1) {
+ return DataTypes.BYTES();
+ }
+ return DataTypes.BOOLEAN();
+ case Types.TINYINT:
+ if (column.length() == 1) {
+ return DataTypes.BOOLEAN();
+ }
+ if (isUnsignedColumn(column)) {
+ return DataTypes.SMALLINT();
+ }
+ return DataTypes.TINYINT();
+ case Types.SMALLINT:
+ if (isUnsignedColumn(column)) {
+ return DataTypes.INT();
+ }
+ return DataTypes.SMALLINT();
+ case Types.INTEGER:
+ if (!column.typeName().toUpperCase().startsWith("MEDIUMINT")
+ && isUnsignedColumn(column)) {
+ return DataTypes.BIGINT();
+ }
+ return DataTypes.INT();
+ case Types.BIGINT:
+ if (isUnsignedColumn(column)) {
+ return DataTypes.DECIMAL(20, 0);
+ }
+ return DataTypes.BIGINT();
+ case Types.FLOAT:
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return DataTypes.DECIMAL(column.length(), column.scale().orElse(0));
+ case Types.REAL:
+ return DataTypes.FLOAT();
+ case Types.DOUBLE:
+ return DataTypes.DOUBLE();
+ case Types.DATE:
+ return DataTypes.DATE();
+ case Types.TIME:
+ return column.length() >= 0 ? DataTypes.TIME(column.length()) : DataTypes.TIME();
+ case Types.TIMESTAMP:
+ return column.length() >= 0
+ ? DataTypes.TIMESTAMP(column.length())
+ : DataTypes.TIMESTAMP();
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NCHAR:
+ case Types.NVARCHAR:
+ case Types.CLOB:
+ return DataTypes.STRING();
+ case Types.BINARY:
+ return DataTypes.BINARY(column.length());
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ return DataTypes.VARBINARY(column.length());
+ case Types.BLOB:
+ return DataTypes.BYTES();
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Don't support OceanBase type '%s' yet, jdbcType:'%s'.",
+ column.typeName(), column.jdbcType()));
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java
index 7cb738b68b7..8a031c35215 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseUtils.java
@@ -17,6 +17,25 @@
package org.apache.flink.cdc.connectors.oceanbase.utils;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.ROW;
+
/** Utils for OceanBase. */
public class OceanBaseUtils {
@@ -24,4 +43,265 @@ public static boolean isOceanBaseDriver(String driverClass) {
return "com.oceanbase.jdbc.Driver".equals(driverClass)
|| "com.alipay.oceanbase.jdbc.Driver".equals(driverClass);
}
+
+ public static Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String splitColumnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
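+ // Take the maximum split-key value among the next chunkSize rows at or
+ // above the lower bound; it becomes the end of the next chunk.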
+ String quotedColumn = jdbc.quotedColumnIdString(splitColumnName);
+ String query =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ + ") AS T",
+ quotedColumn,
+ quotedColumn,
+ jdbc.quotedTableIdString(tableId),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ return jdbc.prepareQueryAndMap(
+ query,
+ ps -> ps.setObject(1, includedLowerBound),
+ rs -> {
+ if (!rs.next()) {
+ // this should never happen
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]", query));
+ }
+ return rs.getObject(1);
+ });
+ }
+
+ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
+ throws SQLException {
+ // SHOW TABLE STATUS yields an approximate row count, which is less
+ // accurate than COUNT(*) but much cheaper on large tables.
+ final String useDatabaseStatement = String.format("USE `%s`;", tableId.catalog());
+ final String rowCountQuery = String.format("SHOW TABLE STATUS LIKE '%s';", tableId.table());
+ jdbc.executeWithoutCommitting(useDatabaseStatement);
+ return jdbc.queryAndMap(
+ rowCountQuery,
+ rs -> {
+ if (!rs.next() || rs.getMetaData().getColumnCount() < 5) {
+ throw new SQLException(
+ String.format(
+ "No result returned after running query [%s]",
+ rowCountQuery));
+ }
+ return rs.getLong(5);
+ });
+ }
+
+ public static String buildSplitScanQuery(
+ TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) {
+ return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true);
+ }
+
+ private static String buildSplitQuery(
+ TableId tableId,
+ RowType pkRowType,
+ boolean isFirstSplit,
+ boolean isLastSplit,
+ int limitSize,
+ boolean isScanningData) {
+ final String condition;
+
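+ // Build the WHERE clause from the chunk boundaries: the first split has
+ // only an upper bound, the last split only a lower bound, and middle
+ // splits both. When scanning data, the row equal to the upper bound is
+ // excluded so adjacent chunks do not overlap (the next chunk includes
+ // its lower bound).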
+ if (isFirstSplit && isLastSplit) {
+ condition = null;
+ } else if (isFirstSplit) {
+ final StringBuilder sql = new StringBuilder();
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
+ if (isScanningData) {
+ sql.append(" AND NOT (");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
+ sql.append(")");
+ }
+ condition = sql.toString();
+ } else if (isLastSplit) {
+ final StringBuilder sql = new StringBuilder();
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
+ condition = sql.toString();
+ } else {
+ final StringBuilder sql = new StringBuilder();
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
+ if (isScanningData) {
+ sql.append(" AND NOT (");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
+ sql.append(")");
+ }
+ sql.append(" AND ");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
+ condition = sql.toString();
+ }
+
+ if (isScanningData) {
+ return buildSelectWithRowLimits(
+ tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
+ } else {
+ final String orderBy =
+ pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
+ return buildSelectWithBoundaryRowLimits(
+ tableId,
+ limitSize,
+ getPrimaryKeyColumnsProjection(pkRowType),
+ getMaxPrimaryKeyColumnsProjection(pkRowType),
+ Optional.ofNullable(condition),
+ orderBy);
+ }
+ }
+
+ public static PreparedStatement readTableSplitDataStatement(
+ JdbcConnection jdbc,
+ String sql,
+ boolean isFirstSplit,
+ boolean isLastSplit,
+ Object[] splitStart,
+ Object[] splitEnd,
+ int primaryKeyNum,
+ int fetchSize) {
+ try {
+ final PreparedStatement statement = initStatement(jdbc, sql, fetchSize);
+ if (isFirstSplit && isLastSplit) {
+ return statement;
+ }
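+ // Bind parameters in the order the predicates were generated above:
+ // the first split binds the upper bound twice (for `<= ?` and the
+ // `NOT (= ?)` exclusion), the last split binds only the lower bound,
+ // and middle splits bind the lower bound once and the upper bound twice.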
+ if (isFirstSplit) {
+ for (int i = 0; i < primaryKeyNum; i++) {
+ statement.setObject(i + 1, splitEnd[i]);
+ statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+ }
+ } else if (isLastSplit) {
+ for (int i = 0; i < primaryKeyNum; i++) {
+ statement.setObject(i + 1, splitStart[i]);
+ }
+ } else {
+ for (int i = 0; i < primaryKeyNum; i++) {
+ statement.setObject(i + 1, splitStart[i]);
+ statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+ statement.setObject(i + 1 + 2 * primaryKeyNum, splitEnd[i]);
+ }
+ }
+ return statement;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to build the split data read statement.", e);
+ }
+ }
+
+ public static RowType getSplitType(Table table) {
+ List<Column> primaryKeys = table.primaryKeyColumns();
+ if (primaryKeys.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Incremental snapshot for tables requires primary key,"
+ + " but table %s doesn't have primary key.",
+ table.id()));
+ }
+
+ // use first field in primary key as the split key
+ return getSplitType(primaryKeys.get(0));
+ }
+
+ public static RowType getSplitType(Column splitColumn) {
+ return (RowType)
+ ROW(FIELD(splitColumn.name(), OceanBaseTypeUtils.fromDbzColumn(splitColumn)))
+ .getLogicalType();
+ }
+
+ private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, int fetchSize)
+ throws SQLException {
+ final Connection connection = jdbc.connection();
+ connection.setAutoCommit(false);
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setFetchSize(fetchSize);
+ return statement;
+ }
+
+ private static void addPrimaryKeyColumnsToCondition(
+ RowType pkRowType, StringBuilder sql, String predicate) {
+ for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append(fieldNamesIt.next()).append(predicate);
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" AND ");
+ }
+ }
+ }
+
+ private static String getPrimaryKeyColumnsProjection(RowType pkRowType) {
+ StringBuilder sql = new StringBuilder();
+ for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append(fieldNamesIt.next());
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" , ");
+ }
+ }
+ return sql.toString();
+ }
+
+ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) {
+ StringBuilder sql = new StringBuilder();
+ for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
+ fieldNamesIt.hasNext(); ) {
+ sql.append("MAX(" + fieldNamesIt.next() + ")");
+ if (fieldNamesIt.hasNext()) {
+ sql.append(" , ");
+ }
+ }
+ return sql.toString();
+ }
+
+ private static String buildSelectWithRowLimits(
+ TableId tableId,
+ int limit,
+ String projection,
+ Optional<String> condition,
+ Optional<String> orderBy) {
+ final StringBuilder sql = new StringBuilder("SELECT ");
+ sql.append(projection).append(" FROM ");
+ sql.append(quotedTableIdString(tableId));
+ if (condition.isPresent()) {
+ sql.append(" WHERE ").append(condition.get());
+ }
+ if (orderBy.isPresent()) {
+ sql.append(" ORDER BY ").append(orderBy.get());
+ }
+ if (limit > 0) {
+ sql.append(" LIMIT ").append(limit);
+ }
+ return sql.toString();
+ }
+
+ private static String buildSelectWithBoundaryRowLimits(
+ TableId tableId,
+ int limit,
+ String projection,
+ String maxColumnProjection,
+ Optional<String> condition,
+ String orderBy) {
+ final StringBuilder sql = new StringBuilder("SELECT ");
+ sql.append(maxColumnProjection);
+ sql.append(" FROM (");
+ sql.append("SELECT ");
+ sql.append(projection);
+ sql.append(" FROM ");
+ sql.append(quotedTableIdString(tableId));
+ if (condition.isPresent()) {
+ sql.append(" WHERE ").append(condition.get());
+ }
+ sql.append(" ORDER BY ").append(orderBy).append(" LIMIT ").append(limit);
+ sql.append(") T");
+ return sql.toString();
+ }
+
+ private static String quotedTableIdString(TableId tableId) {
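+ // Use backtick quoting when the table id carries a catalog; otherwise
+ // fall back to ANSI double quotes.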
+ if (tableId.catalog() != null) {
+ return tableId.toQuotedString('`');
+ }
+ return tableId.toDoubleQuotedString();
+ }
}
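As a sanity check on the string building above, here is a sketch (not part of the patch) that prints the split-scan SQL for a single-column primary key `id` on a MySQL-mode table `db.orders`; the expected outputs in the comments are read off the logic above.

import io.debezium.relational.TableId;

import org.apache.flink.cdc.connectors.oceanbase.utils.OceanBaseUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class SplitQuerySketch {
    public static void main(String[] args) {
        RowType pkType =
                (RowType)
                        DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()))
                                .getLogicalType();
        TableId tableId = new TableId("db", null, "orders");

        // First split: upper bound only, boundary row excluded.
        // SELECT * FROM `db`.`orders` WHERE id <= ? AND NOT (id = ?)
        System.out.println(OceanBaseUtils.buildSplitScanQuery(tableId, pkType, true, false));

        // Middle split: bounded on both sides.
        // SELECT * FROM `db`.`orders` WHERE id >= ? AND NOT (id = ?) AND id <= ?
        System.out.println(OceanBaseUtils.buildSplitScanQuery(tableId, pkType, false, false));

        // Last split: lower bound only (inclusive).
        // SELECT * FROM `db`.`orders` WHERE id >= ?
        System.out.println(OceanBaseUtils.buildSplitScanQuery(tableId, pkType, false, true));
    }
}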
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseMySQLModeITCase.java
similarity index 99%
rename from flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
rename to flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseMySQLModeITCase.java
index a74bf035ec0..d561cd46c2b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseMySQLModeITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseMySQLModeITCase.java
@@ -48,7 +48,7 @@
import static org.apache.flink.cdc.connectors.oceanbase.OceanBaseTestUtils.createOceanBaseContainerForCDC;
/** Integration tests for OceanBase MySQL mode table source. */
-public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {
+public class LegacyOceanBaseMySQLModeITCase extends OceanBaseTestBase {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -97,6 +97,13 @@ public void after() {
}
}
+ @Override
+ protected String commonOptionsString() {
+ return super.commonOptionsString()
+ + " , "
+ + " 'scan.incremental.snapshot.enabled' = 'false'";
+ }
+
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseOracleModeITCase.java
similarity index 97%
rename from flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
rename to flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseOracleModeITCase.java
index b9f9fa0e0dc..a025e4d1818 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseOracleModeITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseOracleModeITCase.java
@@ -36,7 +36,7 @@
/** Integration tests for OceanBase Oracle mode table source. */
@Ignore("Test ignored before oceanbase-xe docker image is available")
-public class OceanBaseOracleModeITCase extends OceanBaseTestBase {
+public class LegacyOceanBaseOracleModeITCase extends OceanBaseTestBase {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -51,6 +51,14 @@ protected OceanBaseCdcMetadata metadata() {
return METADATA;
}
+ @Override
+ protected String commonOptionsString() {
+ return super.commonOptionsString()
+ + " , "
+ + " 'scan.incremental.snapshot.enabled' = 'false', "
+ + " 'jdbc.driver' = 'com.oceanbase.jdbc.Driver'";
+ }
+
@Override
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseTableFactoryTest.java
similarity index 97%
rename from flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java
rename to flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseTableFactoryTest.java
index 00ea50e945c..0135e0859b3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/table/LegacyOceanBaseTableFactoryTest.java
@@ -46,7 +46,7 @@
import static org.junit.Assert.fail;
/** Test for {@link OceanBaseTableSource} created by {@link OceanBaseTableSourceFactory}. */
-public class OceanBaseTableFactoryTest {
+public class LegacyOceanBaseTableFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@@ -81,7 +81,7 @@ public class OceanBaseTableFactoryTest {
private static final String DATABASE_NAME = "db[0-9]";
private static final String TABLE_NAME = "table[0-9]";
private static final String TABLE_LIST = "db.table";
- private static final String SERVER_TIME_ZONE = "+00:00";
+ private static final String SERVER_TIME_ZONE = "UTC";
private static final String CONNECT_TIMEOUT = "30s";
private static final String HOSTNAME = "127.0.0.1";
private static final Integer PORT = 2881;
@@ -247,6 +247,7 @@ private Map getRequiredOptions() {
options.put("tenant-name", TENANT_NAME);
options.put("logproxy.host", LOG_PROXY_HOST);
options.put("logproxy.port", String.valueOf(LOG_PROXY_PORT));
+ options.put("scan.incremental.snapshot.enabled", String.valueOf(false));
return options;
}
@@ -263,7 +264,7 @@ private static DynamicTableSource createTableSource(
options),
schema),
new Configuration(),
- OceanBaseTableFactoryTest.class.getClassLoader(),
+ LegacyOceanBaseTableFactoryTest.class.getClassLoader(),
false);
}
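For context, the renamed tests pin the pre-existing implementation behind 'scan.incremental.snapshot.enabled' = 'false'. A hypothetical end-to-end DDL mirroring those options might look like the sketch below; the connection values are placeholders, and only the options visible in these tests are taken from the patch itself.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LegacySourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // 'scan.incremental.snapshot.enabled' = 'false' selects the legacy
        // source path exercised by the renamed tests above.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id BIGINT,"
                        + "  price DECIMAL(10, 2),"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'oceanbase-cdc',"
                        + "  'scan.incremental.snapshot.enabled' = 'false',"
                        + "  'hostname' = '127.0.0.1',"
                        + "  'port' = '2881',"
                        + "  'username' = 'user@tenant',"
                        + "  'password' = '***',"
                        + "  'tenant-name' = 'tenant',"
                        + "  'database-name' = 'db[0-9]',"
                        + "  'table-name' = 'table[0-9]',"
                        + "  'logproxy.host' = '127.0.0.1',"
                        + "  'logproxy.port' = '2983'"
                        + ")");
    }
}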
}