From 9e051321e24019b70bccb83e7542fa9d6c077830 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 25 Jan 2024 18:02:52 +0800 Subject: [PATCH] [debezium] Upgrade to Debezium 1.9.8.Final(#3033) --- .../CustomAlterTableParserListener.java | 2 +- .../CustomMySqlAntlrDdlParserListener.java | 2 +- .../RelationalChangeRecordEmitter.java | 2 +- .../connector/mysql/MySqlConnection.java | 2 +- .../mysql/MySqlDefaultValueConverter.java | 4 +- .../MySqlStreamingChangeEventSource.java | 2 +- .../connector/mysql/MySqlValueConverters.java | 4 +- .../listener/DefaultValueParserListener.java | 9 +- .../connector/oracle/OracleConnection.java | 640 ------------------ .../oracle/logminer/LogMinerAdapter.java | 2 +- .../logminer/LogMinerChangeRecordEmitter.java | 7 +- .../LogMinerStreamingChangeEventSource.java | 2 +- .../AbstractLogMinerEventProcessor.java | 2 +- .../PostgresStreamingChangeEventSource.java | 2 +- .../connector/postgresql/connection/Lsn.java | 9 +- .../connection/PostgresConnection.java | 18 +- .../PostgresReplicationConnection.java | 103 +-- .../SqlServerStreamingChangeEventSource.java | 4 +- pom.xml | 2 +- 19 files changed, 108 insertions(+), 710 deletions(-) delete mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 93d738767a2..b313960ac36 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -44,7 +44,7 @@ import static com.ververica.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn; -/** Copied from {@link AlterTableParserListener} in Debezium 1.9.7.Final. */ +/** Copied from {@link AlterTableParserListener} in Debezium 1.9.8.Final. */ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private static final int STARTING_INDEX = 1; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java index 089a81208af..be99d36536d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java @@ -48,7 +48,7 @@ import java.util.concurrent.CopyOnWriteArrayList; /** - * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.7.final. + * Copied from {@link MySqlAntlrDdlParserListener} in Debezium 1.9.8.final. * *

This listener's constructor will use some modified listener. */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java index 6bb18a75c84..2b0688a5dc4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java @@ -22,7 +22,7 @@ import java.util.Optional; /** - * Copied from Debezium 1.9.7.Final. + * Copied from Debezium 1.9.8.Final. * *

Base class for {@link ChangeRecordEmitter} implementations based on a relational database. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index faa4997a87d..84f5c230a69 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -38,7 +38,7 @@ import java.util.Properties; /** - * Copied from Debezium project(1.9.7.final) to add custom jdbc properties in the jdbc url. The new + * Copied from Debezium project(1.9.8.Final) to add custom jdbc properties in the jdbc url. The new * parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will * be used to generate the jdbc url pattern, and may overwrite the default value. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java index 674fa68f7e7..4f6869633d3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDefaultValueConverter.java @@ -35,8 +35,8 @@ import java.util.regex.Pattern; /** - * Copied from Debezium project(1.9.7.final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove - * this when https://issues.redhat.com/browse/DBZ-6824 is fixed. + * Copied from Debezium project(1.9.8.Final) to add BIGINT and SMALLINT to TRIM_DATA_TYPES. Remove + * this once the Debezium version is upgraded to 2.3.3.Final, where https://issues.redhat.com/browse/DBZ-6824 is fixed. * *
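As a rough sketch of the jdbcProperties behaviour described in the MySqlConnection javadoc above: defaults are applied first and user-supplied properties overwrite them when the url pattern is generated. The helper below is illustrative only (JdbcUrlSketch and its method are not part of the patch):

    import java.util.Properties;

    class JdbcUrlSketch {
        static String mysqlUrl(String host, int port, Properties defaults, Properties jdbcProperties) {
            Properties combined = new Properties();
            combined.putAll(defaults);
            combined.putAll(jdbcProperties); // user-supplied values overwrite the defaults
            StringBuilder query = new StringBuilder();
            for (String key : combined.stringPropertyNames()) {
                if (query.length() > 0) {
                    query.append('&');
                }
                query.append(key).append('=').append(combined.getProperty(key));
            }
            return "jdbc:mysql://" + host + ":" + port + "/?" + query;
        }
    }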

Line 81 & 82: add BIGINT and SMALLINT to TRIM_DATA_TYPES. */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index a35aa93da61..31de3468db5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -84,7 +84,7 @@ import static io.debezium.util.Strings.isNullOrEmpty; /** - * Copied from Debezium project to fix + * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * *
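What the TRIM_DATA_TYPES extension amounts to, as a hedged standalone sketch: the set membership below mirrors the copied converter plus the two added types, but the wiring into default-value conversion is omitted and the class name is invented for illustration.

    import java.sql.Types;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    class TrimDefaultsSketch {
        // BIGINT and SMALLINT join the types whose default-value strings are
        // trimmed before parsing, so a default such as "1 " no longer fails.
        private static final Set<Integer> TRIM_DATA_TYPES = new HashSet<>(Arrays.asList(
                Types.TINYINT, Types.INTEGER, Types.DATE, Types.TIMESTAMP,
                Types.TIMESTAMP_WITH_TIMEZONE, Types.TIME, Types.BOOLEAN, Types.BIT,
                Types.NUMERIC, Types.DECIMAL, Types.FLOAT, Types.DOUBLE, Types.REAL,
                Types.BIGINT, Types.SMALLINT));

        static String normalize(int jdbcType, String defaultValueExpression) {
            return TRIM_DATA_TYPES.contains(jdbcType)
                    ? defaultValueExpression.trim()
                    : defaultValueExpression;
        }
    }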

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 686e9722cc0..421dae75fa6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -48,8 +48,8 @@ import java.util.regex.Pattern; /** - * Copied from Debezium project to fix FLOAT converted to FLOAT64 type issue. See DBZ-3865, - * DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final. + * Copied from Debezium project(1.9.8.Final) to fix FLOAT converted to FLOAT64 type issue. See + * DBZ-3865, DBZ-5843. Remove it when debezium version is upgraded above 2.0.0.Final. * *

Line 240 & 246: add FLOAT type adjustment. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java index d96a7a5ebd4..8a956123e31 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/antlr/listener/DefaultValueParserListener.java @@ -13,11 +13,11 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Copied from Debezium project(v1.9.7.Final) to fix + * Copied from Debezium project(v1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1506. * *
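The FLOAT adjustment comes down to not widening every MySQL FLOAT to FLOAT64. A sketch under the assumption that MySQL's documented 24-bit precision boundary picks the Connect schema type; the copied class wires this into its schema-building logic rather than a free-standing helper:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    class FloatSchemaSketch {
        // MySQL stores FLOAT(p) in 4 bytes when p <= 24 and 8 bytes otherwise,
        // so single-precision columns map to FLOAT32 instead of FLOAT64.
        static Schema schemaForMySqlFloat(int precision) {
            return precision <= 24
                    ? SchemaBuilder.float32().build()
                    : SchemaBuilder.float64().build();
        }
    }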

Line 48~59: use the actual default string value when the sql contains COLLATE. We should - * remove this class after we bumped a higher debezium version where the + * remove this class after we bump Debezium to 2.0, where * https://issues.redhat.com/browse/DBZ-5587 has been fixed. */ public class DefaultValueParserListener extends MySqlParserBaseListener { @@ -95,7 +95,10 @@ public void exitDefaultValue(boolean skipIfUnknownOptional) { } private String unquote(String stringLiteral) { - return stringLiteral.substring(1, stringLiteral.length() - 1); + if (stringLiteral != null && stringLiteral.startsWith("'") && stringLiteral.endsWith("'")) { + return stringLiteral.substring(1, stringLiteral.length() - 1); + } + return stringLiteral; } private String unquoteBinary(String stringLiteral) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java deleted file mode 100644 index 5fd6b07cd7b..00000000000 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ /dev/null @@ -1,640 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.debezium.connector.oracle; - -import io.debezium.DebeziumException; -import io.debezium.config.Field; -import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter; -import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.jdbc.JdbcConnection; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnEditor; -import io.debezium.relational.TableId; -import io.debezium.relational.Tables; -import io.debezium.relational.Tables.ColumnNameFilter; -import io.debezium.util.Strings; -import oracle.jdbc.OracleTypes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Clob; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Supplier; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -/** - * Copied from Debezium 1.9.7.Final. - * - *
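The guarded unquote() above, restated as a self-contained snippet with its observable behaviour; no logic beyond the hunk is added:

    class UnquoteSketch {
        // Strip quotes only when the literal really is single-quoted; unquoted
        // defaults (e.g. the numeric literal "0") now pass through untouched
        // instead of losing their first and last characters.
        static String unquote(String stringLiteral) {
            if (stringLiteral != null && stringLiteral.startsWith("'") && stringLiteral.endsWith("'")) {
                return stringLiteral.substring(1, stringLiteral.length() - 1);
            }
            return stringLiteral;
        }
        // unquote("'utf8mb4'") -> utf8mb4, unquote("0") -> 0, unquote(null) -> null
    }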

Line 213-225: Fixed for DBZ-5738. - */ -public class OracleConnection extends JdbcConnection { - - private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnection.class); - - /** Returned by column metadata in Oracle if no scale is set; */ - private static final int ORACLE_UNSET_SCALE = -127; - - /** Pattern to identify system generated indices and column names. */ - private static final Pattern SYS_NC_PATTERN = - Pattern.compile("^SYS_NC(?:_OID|_ROWINFO|[0-9][0-9][0-9][0-9][0-9])\\$$"); - - /** Pattern to identify abstract data type indices and column names. */ - private static final Pattern ADT_INDEX_NAMES_PATTERN = Pattern.compile("^\".*\"\\.\".*\".*"); - - /** - * Pattern to identify a hidden column based on redefining a table with the {@code ROWID} - * option. - */ - private static final Pattern MROW_PATTERN = Pattern.compile("^M_ROW\\$\\$"); - - /** A field for the raw jdbc url. This field has no default value. */ - private static final Field URL = Field.create("url", "Raw JDBC url"); - - /** The database version. */ - private final OracleDatabaseVersion databaseVersion; - - private static final String QUOTED_CHARACTER = "\""; - - public OracleConnection(JdbcConfiguration config, Supplier classLoaderSupplier) { - this(config, classLoaderSupplier, true); - } - - public OracleConnection( - JdbcConfiguration config, - Supplier classLoaderSupplier, - boolean showVersion) { - super( - config, - resolveConnectionFactory(config), - classLoaderSupplier, - QUOTED_CHARACTER, - QUOTED_CHARACTER); - this.databaseVersion = resolveOracleDatabaseVersion(); - if (showVersion) { - LOGGER.info("Database Version: {}", databaseVersion.getBanner()); - } - } - - public void setSessionToPdb(String pdbName) { - Statement statement = null; - - try { - statement = connection().createStatement(); - statement.execute("alter session set container=" + pdbName); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOGGER.error("Couldn't close statement", e); - } - } - } - } - - public void resetSessionToCdb() { - Statement statement = null; - - try { - statement = connection().createStatement(); - statement.execute("alter session set container=cdb$root"); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - LOGGER.error("Couldn't close statement", e); - } - } - } - } - - public OracleDatabaseVersion getOracleVersion() { - return databaseVersion; - } - - private OracleDatabaseVersion resolveOracleDatabaseVersion() { - String versionStr; - try { - try { - // Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER - // This column uses a different format than the legacy BANNER. - versionStr = - queryAndMap( - "SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'", - (rs) -> { - if (rs.next()) { - return rs.getString(1); - } - return null; - }); - } catch (SQLException e) { - // exception ignored - if (e.getMessage().contains("ORA-00904: \"BANNER_FULL\"")) { - LOGGER.debug( - "BANNER_FULL column not in V$VERSION, using BANNER column as fallback"); - versionStr = null; - } else { - throw e; - } - } - - // For databases prior to 18.1, a SQLException will be thrown due to BANNER_FULL not - // being a column and - // this will cause versionStr to remain null, use fallback column BANNER for versions - // prior to 18.1. 
- if (versionStr == null) { - versionStr = - queryAndMap( - "SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'", - (rs) -> { - if (rs.next()) { - return rs.getString(1); - } - return null; - }); - } - } catch (SQLException e) { - throw new RuntimeException("Failed to resolve Oracle database version", e); - } - - if (versionStr == null) { - throw new RuntimeException("Failed to resolve Oracle database version"); - } - - return OracleDatabaseVersion.parse(versionStr); - } - - @Override - public Set readTableNames( - String databaseCatalog, - String schemaNamePattern, - String tableNamePattern, - String[] tableTypes) - throws SQLException { - - Set tableIds = - super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes); - - return tableIds.stream() - .map(t -> new TableId(databaseCatalog, t.schema(), t.table())) - .collect(Collectors.toSet()); - } - - /** - * Retrieves all {@code TableId} in a given database catalog, filtering certain ids that should - * be omitted from the returned set such as special spatial tables and index-organized tables. - * - * @param catalogName the catalog/database name - * @return set of all table ids for existing table objects - * @throws SQLException if a database exception occurred - */ - protected Set getAllTableIds(String catalogName) throws SQLException { - final String query = - "select owner, table_name from all_tables " - + - // filter special spatial tables - "where table_name NOT LIKE 'MDRT_%' " - + "and table_name NOT LIKE 'MDRS_%' " - + "and table_name NOT LIKE 'MDXT_%' " - + - // filter index-organized-tables - "and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) " - // filter nested tables - + "and nested = 'NO'" - // filter parent tables of nested tables - + "and table_name not in (select PARENT_TABLE_NAME from ALL_NESTED_TABLES)"; - - Set tableIds = new HashSet<>(); - query( - query, - (rs) -> { - while (rs.next()) { - tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2))); - } - LOGGER.trace("TableIds are: {}", tableIds); - }); - - return tableIds; - } - - // todo replace metadata with something like this - private ResultSet getTableColumnsInfo(String schemaNamePattern, String tableName) - throws SQLException { - String columnQuery = - "select column_name, data_type, data_length, data_precision, data_scale, default_length, density, char_length from " - + "all_tab_columns where owner like '" - + schemaNamePattern - + "' and table_name='" - + tableName - + "'"; - - PreparedStatement statement = connection().prepareStatement(columnQuery); - return statement.executeQuery(); - } - - // this is much faster, we will use it until full replacement of the metadata usage TODO - public void readSchemaForCapturedTables( - Tables tables, - String databaseCatalog, - String schemaNamePattern, - ColumnNameFilter columnFilter, - boolean removeTablesNotFoundInJdbc, - Set capturedTables) - throws SQLException { - - Set tableIdsBefore = new HashSet<>(tables.tableIds()); - - DatabaseMetaData metadata = connection().getMetaData(); - Map> columnsByTable = new HashMap<>(); - - for (TableId tableId : capturedTables) { - try (ResultSet columnMetadata = - metadata.getColumns( - databaseCatalog, schemaNamePattern, tableId.table(), null)) { - while (columnMetadata.next()) { - // add all whitelisted columns - readTableColumn(columnMetadata, tableId, columnFilter) - .ifPresent( - column -> { - columnsByTable - .computeIfAbsent(tableId, t -> new ArrayList<>()) - .add(column.create()); - }); - } - } - } - - // Read the 
metadata for the primary keys ... - for (Map.Entry> tableEntry : columnsByTable.entrySet()) { - // First get the primary key information, which must be done for *each* table ... - List pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey()); - - // Then define the table ... - List columns = tableEntry.getValue(); - Collections.sort(columns); - tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null); - } - - if (removeTablesNotFoundInJdbc) { - // Remove any definitions for tables that were not found in the database metadata ... - tableIdsBefore.removeAll(columnsByTable.keySet()); - tableIdsBefore.forEach(tables::removeTable); - } - } - - @Override - protected String resolveCatalogName(String catalogName) { - final String pdbName = config().getString("pdb.name"); - return (!Strings.isNullOrEmpty(pdbName) ? pdbName : config().getString("dbname")) - .toUpperCase(); - } - - @Override - public List readTableUniqueIndices(DatabaseMetaData metadata, TableId id) - throws SQLException { - return super.readTableUniqueIndices(metadata, id.toDoubleQuoted()); - } - - @Override - public Optional getCurrentTimestamp() throws SQLException { - return queryAndMap( - "SELECT CURRENT_TIMESTAMP FROM DUAL", - rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty()); - } - - @Override - protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) { - if (columnName != null) { - return !SYS_NC_PATTERN.matcher(columnName).matches() - && !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches() - && !MROW_PATTERN.matcher(columnName).matches(); - } - return false; - } - - /** - * Get the current, most recent system change number. - * - * @return the current system change number - * @throws SQLException if an exception occurred - * @throws IllegalStateException if the query does not return at least one row - */ - public Scn getCurrentScn() throws SQLException { - return queryAndMap( - "SELECT CURRENT_SCN FROM V$DATABASE", - (rs) -> { - if (rs.next()) { - return Scn.valueOf(rs.getString(1)); - } - throw new IllegalStateException("Could not get SCN"); - }); - } - - /** - * Generate a given table's DDL metadata. - * - * @param tableId table identifier, should never be {@code null} - * @return generated DDL - * @throws SQLException if an exception occurred obtaining the DDL metadata - * @throws NonRelationalTableException the table is not a relational table - */ - public String getTableMetadataDdl(TableId tableId) - throws SQLException, NonRelationalTableException { - try { - // This table contains all available objects that are considered relational & object - // based. - // By querying for TABLE_TYPE is null, we are explicitly confirming what if an entry - // exists - // that the table is in-fact a relational table and if the result set is empty, the - // object - // is another type, likely an object-based table, in which case we cannot generate DDL. - final String tableType = - "SELECT COUNT(1) FROM ALL_ALL_TABLES WHERE OWNER='" - + tableId.schema() - + "' AND TABLE_NAME='" - + tableId.table() - + "' AND TABLE_TYPE IS NULL"; - if (queryAndMap(tableType, rs -> rs.next() ? 
rs.getInt(1) : 0) == 0) { - throw new NonRelationalTableException( - "Table " + tableId + " is not a relational table"); - } - - // The storage and segment attributes aren't necessary - executeWithoutCommitting( - "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;"); - executeWithoutCommitting( - "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;"); - // In case DDL is returned as multiple DDL statements, this allows the parser to parse - // each separately. - // This is only critical during streaming as during snapshot the table structure is - // built from JDBC driver queries. - executeWithoutCommitting( - "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;"); - return queryAndMap( - "SELECT dbms_metadata.get_ddl('TABLE','" - + tableId.table() - + "','" - + tableId.schema() - + "') FROM DUAL", - rs -> { - if (!rs.next()) { - throw new DebeziumException( - "Could not get DDL metadata for table: " + tableId); - } - - Object res = rs.getObject(1); - return ((Clob) res).getSubString(1, (int) ((Clob) res).length()); - }); - } finally { - executeWithoutCommitting( - "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'DEFAULT'); end;"); - } - } - - /** - * Get the current connection's session statistic by name. - * - * @param name the name of the statistic to be fetched, must not be {@code null} - * @return the session statistic value, never {@code null} - * @throws SQLException if an exception occurred obtaining the session statistic value - */ - public Long getSessionStatisticByName(String name) throws SQLException { - return queryAndMap( - "SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='" - + name - + "' AND n.statistic#=m.statistic#", - rs -> rs.next() ? rs.getLong(1) : 0L); - } - - /** - * Returns whether the given table exists or not. - * - * @param tableName table name, should not be {@code null} - * @return true if the table exists, false if it does not - * @throws SQLException if a database exception occurred - */ - public boolean isTableExists(String tableName) throws SQLException { - return queryAndMap( - "SELECT COUNT(1) FROM USER_TABLES WHERE TABLE_NAME = '" + tableName + "'", - rs -> rs.next() && rs.getLong(1) > 0); - } - - public boolean isTableExists(TableId tableId) throws SQLException { - return queryAndMap( - "SELECT COUNT(1) FROM ALL_TABLES WHERE OWNER = '" - + tableId.schema() - + "' AND TABLE_NAME = '" - + tableId.table() - + "'", - rs -> rs.next() && rs.getLong(1) > 0); - } - - /** - * Returns whether the given table is empty or not. - * - * @param tableName table name, should not be {@code null} - * @return true if the table has no records, false otherwise - * @throws SQLException if a database exception occurred - */ - public boolean isTableEmpty(String tableName) throws SQLException { - return getRowCount(tableName) == 0L; - } - - /** - * Returns the number of rows in a given table. 
- * - * @param tableName table name, should not be {@code null} - * @return the number of rows - * @throws SQLException if a database exception occurred - */ - public long getRowCount(String tableName) throws SQLException { - return queryAndMap( - "SELECT COUNT(1) FROM " + tableName, - rs -> { - if (rs.next()) { - return rs.getLong(1); - } - return 0L; - }); - } - - public T singleOptionalValue(String query, ResultSetExtractor extractor) - throws SQLException { - return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null); - } - - @Override - public String buildSelectWithRowLimits( - TableId tableId, - int limit, - String projection, - Optional condition, - String orderBy) { - final TableId table = new TableId(null, tableId.schema(), tableId.table()); - final StringBuilder sql = new StringBuilder("SELECT "); - sql.append(projection).append(" FROM "); - sql.append(quotedTableIdString(table)); - if (condition.isPresent()) { - sql.append(" WHERE ").append(condition.get()); - } - if (getOracleVersion().getMajor() < 12) { - sql.insert(0, " SELECT * FROM (") - .append(" ORDER BY ") - .append(orderBy) - .append(")") - .append(" WHERE ROWNUM <=") - .append(limit); - } else { - sql.append(" ORDER BY ") - .append(orderBy) - .append(" FETCH NEXT ") - .append(limit) - .append(" ROWS ONLY"); - } - return sql.toString(); - } - - public static String connectionString(JdbcConfiguration config) { - return config.getString(URL) != null - ? config.getString(URL) - : ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl(); - } - - private static ConnectionFactory resolveConnectionFactory(JdbcConfiguration config) { - return JdbcConnection.patternBasedFactory(connectionString(config)); - } - - /** - * Determine whether the Oracle server has the archive log enabled. - * - * @return {@code true} if the server's {@code LOG_MODE} is set to {@code ARCHIVELOG}, or {@code - * false} otherwise - */ - protected boolean isArchiveLogMode() { - try { - final String mode = - queryAndMap( - "SELECT LOG_MODE FROM V$DATABASE", - rs -> rs.next() ? rs.getString(1) : ""); - LOGGER.debug("LOG_MODE={}", mode); - return "ARCHIVELOG".equalsIgnoreCase(mode); - } catch (SQLException e) { - throw new DebeziumException( - "Unexpected error while connecting to Oracle and looking at LOG_MODE mode: ", - e); - } - } - - /** - * Resolve a system change number to a timestamp, return value is in database timezone. - * - *

The SCN to TIMESTAMP mapping is only retained for the duration of the flashback query - * area. This means that eventually the mapping between these values are no longer kept by - * Oracle and making a call with a SCN value that has aged out will result in an ORA-08181 - * error. This function explicitly checks for this use case and if a ORA-08181 error is thrown, - * it is therefore treated as if a value does not exist returning an empty optional value. - * - * @param scn the system change number, must not be {@code null} - * @return an optional timestamp when the system change number occurred - * @throws SQLException if a database exception occurred - */ - public Optional getScnToTimestamp(Scn scn) throws SQLException { - try { - return queryAndMap( - "SELECT scn_to_timestamp('" + scn + "') FROM DUAL", - rs -> - rs.next() - ? Optional.of(rs.getObject(1, OffsetDateTime.class)) - : Optional.empty()); - } catch (SQLException e) { - if (e.getMessage().startsWith("ORA-08181")) { - // ORA-08181 specified number is not a valid system change number - // This happens when the SCN provided is outside the flashback area range - // This should be treated as a value is not available rather than an error - return Optional.empty(); - } - // Any other SQLException should be thrown - throw e; - } - } - - @Override - protected ColumnEditor overrideColumn(ColumnEditor column) { - // This allows the column state to be overridden before default-value resolution so that the - // output of the default value is within the same precision as that of the column values. - if (OracleTypes.TIMESTAMP == column.jdbcType()) { - column.length(column.scale().orElse(Column.UNSET_INT_VALUE)).scale(null); - } else if (OracleTypes.NUMBER == column.jdbcType()) { - column.scale().filter(s -> s == ORACLE_UNSET_SCALE).ifPresent(s -> column.scale(null)); - } - return column; - } - - @Override - protected Map> getColumnsDetails( - String databaseCatalog, - String schemaNamePattern, - String tableName, - Tables.TableFilter tableFilter, - ColumnNameFilter columnFilter, - DatabaseMetaData metadata, - Set viewIds) - throws SQLException { - // The Oracle JDBC driver expects that if the table name contains a "/" character that - // the table name is pre-escaped prior to the JDBC driver call, or else it throws an - // exception about the character sequence being improperly escaped. - if (tableName != null && tableName.contains("/")) { - tableName = tableName.replace("/", "//"); - } - return super.getColumnsDetails( - databaseCatalog, - schemaNamePattern, - tableName, - tableFilter, - columnFilter, - metadata, - viewIds); - } - - /** - * An exception that indicates the operation failed because the table is not a relational table. 
- */ - public static class NonRelationalTableException extends DebeziumException { - public NonRelationalTableException(String message) { - super(message); - } - } -} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java index 53387e3cdbd..59a5cdd6621 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java @@ -58,7 +58,7 @@ import java.util.stream.Collectors; /** - * Copied from Debezium 1.9.7. + * Copied from Debezium 1.9.8.Final. * *

Line 356: Replace < condition with <= to be able to catch ongoing transactions during snapshot * if current SCN points to START/INSERT/DELETE/UPDATE event. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java index 4a743dfa4f2..4969c1abdca 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerChangeRecordEmitter.java @@ -22,7 +22,12 @@ import java.util.Optional; -/** Emits change records based on an event read from Oracle LogMiner. */ +/** + * Copied from Debezium 1.9.8.Final. Emits change records based on an event read from Oracle + * LogMiner. + * + *
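The change from < to <= is equivalent to the following predicate on SCNs. This is a sketch only; the real comparison sits inside the adapter's snapshot preparation around line 356, and the method here is invented for illustration:

    import io.debezium.connector.oracle.Scn;

    class ScnBoundarySketch {
        // With <=, a transaction whose start SCN equals the current snapshot SCN
        // is still treated as in-flight and captured during the snapshot.
        static boolean capturedDuringSnapshot(Scn transactionStartScn, Scn currentScn) {
            return transactionStartScn.compareTo(currentScn) <= 0;
        }
    }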

This class adds RowId and overrides the emit methods to put the rowId in the header. + */ public class LogMinerChangeRecordEmitter extends BaseChangeRecordEmitter { private final Operation operation; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index ff3c4f288fc..76a6d2bb83f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -63,7 +63,7 @@ import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining; /** - * Copied from Debezium 1.9.7. A {@link StreamingChangeEventSource} based on Oracle's LogMiner + * Copied from Debezium 1.9.8.Final. A {@link StreamingChangeEventSource} based on Oracle's LogMiner * utility. The event handler loop is executed in a separate executor. * *
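A minimal sketch of carrying the LogMiner row id in Kafka Connect headers, which is the essence of the emit overrides mentioned above; the header key and the helper are assumptions for illustration, not the class's verbatim code:

    import org.apache.kafka.connect.header.ConnectHeaders;

    class RowIdHeaderSketch {
        static ConnectHeaders withRowId(String rowId) {
            ConnectHeaders headers = new ConnectHeaders();
            headers.addString("rowId", rowId); // hypothetical header key
            return headers;
        }
    }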

Diff: Make createProcessor method as protected to produce a LogMinerEventProcessor with diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index af868c9bb91..8a0b516678d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -54,7 +54,7 @@ import java.util.regex.Pattern; /** - * Copied from Debezium 1.9.7.Final. + * Copied from Debezium 1.9.8.Final. * *

An abstract implementation of {@link LogMinerEventProcessor} that all processors should * extend. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 4b97321bef3..e309a3b29a4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Copied from Debezium 1.9.7. + * Copied from Debezium 1.9.8.Final. * *

Line 150~151 : set the ending lsn for the replication connection. */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index 28e856c0362..17119e4f68f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -11,10 +11,11 @@ import java.nio.ByteBuffer; /** - * Copied from Debezium 1.9.7 without changes due to the NoSuchMethodError, caused by the fact that - * current Debezium release java version is 11, so we need to compile this file by java 8 compiler. - * More - * info. Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}. + * Copied from Debezium 1.9.8.Final without changes, due to a NoSuchMethodError caused by the fact + * that the current Debezium release is compiled with Java 11, so we need to compile this file with + * a Java 8 compiler. More info. + * Abstraction of PostgreSQL log sequence number, adapted from {@link LogSequenceNumber}. * *

Line 32: add NO_STOPPING_LSN */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index bb17acb6ead..69ae4c45497 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -48,6 +48,7 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -55,7 +56,7 @@ * {@link JdbcConnection} connection extension used for connecting to Postgres instances. * * @author Horia Chiorean - *
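NO_STOPPING_LSN acts as a sentinel meaning "no ending position, never stop". A hedged sketch of the idea, assuming the sentinel is simply the largest representable position (the constant's actual definition lives at line 32 of the copied class):

    import io.debezium.connector.postgresql.connection.Lsn;

    class NoStoppingLsnSketch {
        // With the largest possible position, a stop check of the form
        // endingPos.compareTo(lastReceivedLsn) <= 0 effectively never fires.
        static final Lsn NO_STOPPING_LSN = Lsn.valueOf(Long.MAX_VALUE);
    }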

Copied from Debezium 1.9.2-Final with two additional methods: + *

Copied from Debezium 1.9.8.Final with three additional methods: *

    *
  • Constructor PostgresConnection( Configuration config, PostgresValueConverterBuilder * valueConverterBuilder, ConnectionFactory factory) to allow passing a custom @@ -549,7 +550,7 @@ public synchronized void close() { public Long currentTransactionId() throws SQLException { AtomicLong txId = new AtomicLong(0); query( - "select * from txid_current()", + "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid", rs -> { if (rs.next()) { txId.compareAndSet(0, rs.getLong(1)); @@ -570,7 +571,7 @@ public long currentXLogLocation() throws SQLException { int majorVersion = connection().getMetaData().getDatabaseMajorVersion(); query( majorVersion >= 10 - ? "select * from pg_current_wal_lsn()" + ? "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn" : "select * from pg_current_xlog_location()", rs -> { if (!rs.next()) { @@ -847,6 +848,17 @@ protected boolean isTableUniqueIndexIncluded(String indexName, String columnName return false; } + /** + * Retrieves all {@code TableId}s in a given database catalog, including partitioned tables. + * + * @param catalogName the catalog/database name + * @return set of all table ids for existing table objects + * @throws SQLException if a database exception occurred + */ + public Set getAllTableIds(String catalogName) throws SQLException { + return readTableNames(catalogName, null, null, new String[] {"TABLE", "PARTITIONED TABLE"}); + } + @FunctionalInterface public interface PostgresValueConverterBuilder { PostgresValueConverter build(TypeRegistry registry); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 364cb59bd76..0c607045e7d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -49,14 +49,14 @@ import static java.lang.Math.toIntExact; /** - * Copied from Debezium 1.9.7. + * Copied from Debezium 1.9.8.Final * *
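Two of the PostgresConnection changes above, restated as standalone query builders taken straight from the hunks (only the surrounding class plumbing is omitted; the class and method names are illustrative):

    class RecoveryAwareQueriesSketch {
        // txid_current() cannot run on a hot standby, so report 0 in recovery.
        static String currentTxIdQuery() {
            return "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid";
        }

        // pg_current_wal_lsn() is likewise unavailable in recovery; fall back to
        // the last received LSN, and to pg_current_xlog_location() before PG 10.
        static String currentWalLsnQuery(int majorVersion) {
            return majorVersion >= 10
                    ? "select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn"
                    : "select * from pg_current_xlog_location()";
        }
    }

The third addition, getAllTableIds, simply widens the table-type filter passed to readTableNames to new String[] {"TABLE", "PARTITIONED TABLE"} so that partitioned tables are discovered as well.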

    The {@link ReplicationConnection} created from {@code createReplicationStream} will hang when * the wal logs only contain the keepAliveMessage. Support to set an ending Lsn to stop hanging. * - *

    Line 82, 694~695 : add endingPos and its setter. + *

    Line 83, 711~713 : add endingPos and its setter. * - *

    Line 554~559, 578~583: ReplicationStream from {@code createReplicationStream} will stop when + *

    Line 571~576, 595~600: ReplicationStream from {@code createReplicationStream} will stop when * endingPos reached. */ public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { @@ -72,6 +72,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep private final PostgresConnectorConfig connectorConfig; private final Duration statusUpdateInterval; private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; private final TypeRegistry typeRegistry; private final Properties streamParams; @@ -98,7 +99,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep * @param statusUpdateInterval the interval at which the replication connection should * periodically send status * @param doSnapshot whether the connector is doing snapshot - * @param jdbcConnection general POstgreSQL JDBC connection + * @param jdbcConnection general PostgreSQL JDBC connection * @param typeRegistry registry with PostgreSQL types * @param streamParams additional parameters to pass to the replication stream * @param schema the schema; must not be null @@ -136,6 +137,7 @@ private PostgresReplicationConnection( this.statusUpdateInterval = statusUpdateInterval; this.messageDecoder = plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; this.typeRegistry = typeRegistry; this.streamParams = streamParams; this.slotCreationInfo = null; @@ -204,44 +206,24 @@ protected void initPublication() { stmt.execute(createPublicationStmt); break; case FILTERED: - try { - Set tablesToCapture = determineCapturedTables(); - tableFilterString = - tablesToCapture.stream() - .map(TableId::toDoubleQuotedString) - .collect(Collectors.joining(", ")); - if (tableFilterString.isEmpty()) { - throw new DebeziumException( - String.format( - "No table filters found for filtered publication %s", - publicationName)); - } - createPublicationStmt = - String.format( - "CREATE PUBLICATION %s FOR TABLE %s;", - publicationName, tableFilterString); - LOGGER.info( - "Creating Publication with statement '{}'", - createPublicationStmt); - // Publication doesn't exist, create it but restrict to the - // tableFilter. 
- stmt.execute(createPublicationStmt); - } catch (Exception e) { - throw new ConnectException( - String.format( - "Unable to create filtered publication %s for %s", - publicationName, tableFilterString), - e); - } + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, false); break; } } else { - LOGGER.trace( - "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server " - + "and will be used by the plugin", - publicationName, - plugin, - database()); + switch (publicationAutocreateMode) { + case FILTERED: + createOrUpdatePublicationModeFilterted( + tableFilterString, stmt, true); + break; + default: + LOGGER.trace( + "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server " + + "and will be used by the plugin", + publicationName, + plugin, + database()); + } } } } @@ -253,11 +235,46 @@ protected void initPublication() { } } + private void createOrUpdatePublicationModeFilterted( + String tableFilterString, Statement stmt, boolean isUpdate) { + String createOrUpdatePublicationStmt; + try { + Set tablesToCapture = determineCapturedTables(); + tableFilterString = + tablesToCapture.stream() + .map(TableId::toDoubleQuotedString) + .collect(Collectors.joining(", ")); + if (tableFilterString.isEmpty()) { + throw new DebeziumException( + String.format( + "No table filters found for filtered publication %s", + publicationName)); + } + createOrUpdatePublicationStmt = + isUpdate + ? String.format( + "ALTER PUBLICATION %s SET TABLE %s;", + publicationName, tableFilterString) + : String.format( + "CREATE PUBLICATION %s FOR TABLE %s;", + publicationName, tableFilterString); + LOGGER.info( + isUpdate + ? "Updating Publication with statement '{}'" + : "Creating Publication with statement '{}'", + createOrUpdatePublicationStmt); + stmt.execute(createOrUpdatePublicationStmt); + } catch (Exception e) { + throw new ConnectException( + String.format( + "Unable to %s filtered publication %s for %s", + isUpdate ? "update" : "create", publicationName, tableFilterString), + e); + } + } + private Set determineCapturedTables() throws Exception { - Set allTableIds = - this.connect() - .readTableNames( - pgConnection().getCatalog(), null, null, new String[] {"TABLE"}); + Set allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName()); Set capturedTables = new HashSet<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index eadba8160cd..80b4cf235bb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -37,9 +37,9 @@ import java.util.stream.Collectors; /** - * Copied from Debezium project(1.9.7.final) to add method {@link + * Copied from Debezium project(1.9.8.final) to add method {@link * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. 
Also implemented - * {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition, + * {@link SqlServerStreamingChangeEventSource#execute(ChangeEventSourceContext, SqlServerPartition, * SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data * capture functionality. A main loop polls database DDL change and change data tables and turns * them into change events. diff --git a/pom.xml b/pom.xml index 43297e2408c..e82baec99b5 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,7 @@ under the License. 1.18.0 1.18 17.0 - 1.9.7.Final + 1.9.8.Final 3.2.0 2.2.0 1.18.3
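A hedged sketch of the endingPos mechanism that the PostgresReplicationConnection javadoc above describes: a field, its setter, and the stop predicate consulted in the replication stream's read loop. The integration points in the real class differ, and the class name here is invented:

    import io.debezium.connector.postgresql.connection.Lsn;

    class EndingPosSketch {
        private Lsn endingPos = Lsn.NO_STOPPING_LSN; // assumed default, see the Lsn change above

        public void setEndingPos(Lsn endingPos) {
            this.endingPos = endingPos;
        }

        // Checked after each message (including keep-alives), so the stream can
        // terminate instead of hanging once the ending position is reached.
        boolean endingPosReached(Lsn lastReceivedLsn) {
            return endingPos.compareTo(lastReceivedLsn) <= 0;
        }
    }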