diff --git a/airbyte-integrations/bases/debezium/build.gradle b/airbyte-integrations/bases/debezium/build.gradle
index a82922b63951..5e515f0c6876 100644
--- a/airbyte-integrations/bases/debezium/build.gradle
+++ b/airbyte-integrations/bases/debezium/build.gradle
@@ -17,6 +17,7 @@ dependencies {
 
     testImplementation project(':airbyte-test-utils')
     testImplementation libs.connectors.testcontainers.jdbc
+    testImplementation libs.connectors.testcontainers.mysql
     testImplementation libs.connectors.testcontainers.postgresql
 
     testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
diff --git a/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlDebeziumStateUtil.java b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlDebeziumStateUtil.java
new file mode 100644
index 000000000000..af85ae51159a
--- /dev/null
+++ b/airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlDebeziumStateUtil.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals.mysql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.db.jdbc.JdbcUtils;
+import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
+import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
+import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import io.debezium.engine.ChangeEvent;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds the initial Debezium state (binlog offset and db schema history) for a MySQL source.
+ */
+public class MySqlDebeziumStateUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDebeziumStateUtil.class);
+  public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
+  public static final String MYSQL_DB_HISTORY = "mysql_db_history";
+
+  /**
+   * Starts a Debezium engine from the database's current binlog position and returns the resulting
+   * offset and db schema history as a single JSON state object.
+   * <p>
+   * Note: this sets {@code snapshot.mode} on the passed-in {@code properties}.
+   *
+   * @param properties Debezium connector properties; mutated by this method
+   * @param catalog configured catalog handed to the Debezium record publisher
+   * @param database database whose current binlog position is used as the starting offset
+   * @return JSON object holding the {@link #MYSQL_CDC_OFFSET} and {@link #MYSQL_DB_HISTORY} entries
+   * @throws IllegalStateException if no offset or db schema history is available after the engine ran
+   */
+  public JsonNode constructInitialDebeziumState(final Properties properties,
+                                                final ConfiguredAirbyteCatalog catalog,
+                                                final JdbcDatabase database) {
+    // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
+    // We use the schema_only_recovery mode because it instructs Debezium to construct the db schema history.
+    properties.setProperty("snapshot.mode", "schema_only_recovery");
+    final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
+        constructBinlogOffset(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText()),
+        Optional.empty());
+    final AirbyteSchemaHistoryStorage schemaHistoryStorage = AirbyteSchemaHistoryStorage.initializeDBHistory(Optional.empty());
+    final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>();
+    try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(properties, database.getSourceConfig(), catalog, offsetManager,
+        Optional.of(schemaHistoryStorage))) {
+      publisher.start(queue);
+      while (!publisher.hasClosed()) {
+        final ChangeEvent<String, String> event = queue.poll(10, TimeUnit.SECONDS);
+        if (event == null) {
+          continue;
+        }
+        LOGGER.info("A record is returned, closing the engine since the state is constructed");
+        publisher.close();
+        break;
+      }
+    } catch (final InterruptedException e) {
+      // Restore the interrupt flag so that code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    final Map<String, String> offset = offsetManager.read();
+    final String dbHistory = schemaHistoryStorage.read();
+
+    // Explicit checks instead of `assert`, which is a no-op unless the JVM runs with -ea.
+    if (offset.isEmpty()) {
+      throw new IllegalStateException("Debezium offset is empty after constructing the initial state");
+    }
+    if (Objects.isNull(dbHistory)) {
+      throw new IllegalStateException("Debezium db schema history is missing after constructing the initial state");
+    }
+
+    final Map<String, Object> state = new HashMap<>();
+    state.put(MYSQL_CDC_OFFSET, offset);
+    state.put(MYSQL_DB_HISTORY, dbHistory);
+
+    final JsonNode asJson = Jsons.jsonNode(state);
+    LOGGER.info("Initial Debezium state constructed: {}", asJson);
+
+    return asJson;
+  }
+
+  /**
+   * Constructs the initial Debezium offset from the database's current binlog position; passing it to
+   * the Debezium engine makes it process binlogs from that file and position and skip the snapshot phase.
+   */
+  private JsonNode constructBinlogOffset(final JdbcDatabase database, final String dbName) {
+    return format(getStateAttributesFromDB(database), dbName, Instant.now());
+  }
+
+  /**
+   * Renders binlog coordinates as a single-entry offset map whose key and value are JSON strings.
+   *
+   * @param attributes binlog file name, position and optional GTID set
+   * @param dbName database name; also written as the {@code server} value of the offset key
+   * @param time instant whose epoch second is written as {@code ts_sec}
+   * @return JSON object with one entry mapping the offset key to the offset value
+   */
+  @VisibleForTesting
+  public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String dbName, final Instant time) {
+    final String key = "[\"" + dbName + "\",{\"server\":\"" + dbName + "\"}]";
+    final String gtidSet = attributes.gtidSet().map(gtids -> ",\"gtids\":\"" + gtids + "\"").orElse("");
+    final String value =
+        "{\"transaction_id\":null,\"ts_sec\":" + time.getEpochSecond() + ",\"file\":\"" + attributes.binlogFilename() + "\",\"pos\":"
+            + attributes.binlogPosition()
+            + gtidSet + "}";
+
+    final Map<String, String> result = new HashMap<>();
+    result.put(key, value);
+
+    final JsonNode jsonNode = Jsons.jsonNode(result);
+    LOGGER.info("Initial Debezium state offset constructed: {}", jsonNode);
+
+    return jsonNode;
+  }
+
+  /**
+   * Reads the current binlog file, position and, when the result has a fifth column, GTID set by
+   * running {@code SHOW MASTER STATUS}.
+   *
+   * @param database database to run the query against
+   * @return binlog coordinates taken from the single result row
+   * @throws IllegalStateException if the query does not yield exactly one row with a valid position
+   */
+  public MysqlDebeziumStateAttributes getStateAttributesFromDB(final JdbcDatabase database) {
+    try (final Stream<MysqlDebeziumStateAttributes> stream = database.unsafeResultSetQuery(
+        connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"),
+        resultSet -> {
+          final String file = resultSet.getString("File");
+          final long position = resultSet.getLong("Position");
+          if (file == null || position < 0) {
+            throw new IllegalStateException("Invalid binlog coordinates from SHOW MASTER STATUS: file=" + file + ", position=" + position);
+          }
+          if (resultSet.getMetaData().getColumnCount() > 4) {
+            // This column exists only in MySQL 5.6.5 or later ...
+            final String gtidSet = resultSet.getString(5); // GTID set, may be null, blank, or contain a GTID set
+            return new MysqlDebeziumStateAttributes(file, position, removeNewLineChars(gtidSet));
+          }
+          return new MysqlDebeziumStateAttributes(file, position, Optional.empty());
+        })) {
+      final List<MysqlDebeziumStateAttributes> stateAttributes = stream.toList();
+      if (stateAttributes.size() != 1) {
+        throw new IllegalStateException("Expected exactly one row from SHOW MASTER STATUS but got " + stateAttributes.size());
+      }
+      return stateAttributes.get(0);
+    } catch (final SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Strips {@code \n} and {@code \r} characters from a GTID set string.
+   *
+   * @param gtidSet raw GTID set; may be null or blank
+   * @return the GTID set without line breaks, or empty when the input is null or blank
+   */
+  private Optional<String> removeNewLineChars(final String gtidSet) {
+    if (gtidSet != null && !gtidSet.trim().isEmpty()) {
+      // Remove all the newline chars that exist in the GTID set string ...
+      return Optional.of(gtidSet.replace("\n", "").replace("\r", ""));
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Binlog coordinates of a MySQL server.
+   *
+   * @param binlogFilename name of the binlog file
+   * @param binlogPosition position within the binlog file
+   * @param gtidSet GTID set, if one was reported
+   */
+  public record MysqlDebeziumStateAttributes(String binlogFilename, long binlogPosition, Optional<String> gtidSet) {
+
+  }
+
+}
diff --git a/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/MysqlDebeziumStateUtilTest.java b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/MysqlDebeziumStateUtilTest.java
new file mode 100644
index 000000000000..4636d4134a44
--- /dev/null
+++ b/airbyte-integrations/bases/debezium/src/test/java/io/airbyte/integrations/debezium/internals/MysqlDebeziumStateUtilTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.debezium.internals;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.string.Strings;
+import io.airbyte.db.Database;
+import io.airbyte.db.factory.DSLContextFactory;
+import io.airbyte.db.factory.DataSourceFactory;
+import io.airbyte.db.factory.DatabaseDriver;
+import io.airbyte.db.jdbc.DefaultJdbcDatabase;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.db.jdbc.JdbcUtils;
+import io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil;
+import io.airbyte.protocol.models.Field;
+import io.airbyte.protocol.models.JsonSchemaType;
+import io.airbyte.protocol.models.v0.AirbyteCatalog;
+import io.airbyte.protocol.models.v0.CatalogHelpers;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.v0.SyncMode;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import org.jooq.SQLDialect;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.MySQLContainer;
+
+/**
+ * Tests for {@link MySqlDebeziumStateUtil}.
+ */
+public class MysqlDebeziumStateUtilTest {
+
+  private static final String DB_NAME = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
+  private static final String TABLE_NAME = Strings.addRandomSuffix("table", "_", 10).toLowerCase();
+  private static final Properties MYSQL_PROPERTIES = new Properties();
+  private static final String DB_CREATE_QUERY = "CREATE DATABASE " + DB_NAME;
+  private static final String TABLE_CREATE_QUERY = "CREATE TABLE " + DB_NAME + "." + TABLE_NAME + " (id INTEGER, name VARCHAR(200), PRIMARY KEY(id))";
+  // The stream fields mirror the columns created by TABLE_CREATE_QUERY.
+  private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
+      CatalogHelpers.createAirbyteStream(
+          TABLE_NAME,
+          DB_NAME,
+          Field.of("id", JsonSchemaType.INTEGER),
+          Field.of("name", JsonSchemaType.STRING))
+          .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
+          .withSourceDefinedPrimaryKey(List.of(List.of("id")))));
+  protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
+
+  static {
+    CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
+    MYSQL_PROPERTIES.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
+    MYSQL_PROPERTIES.setProperty("database.server.id", "5000");
+  }
+
+  /**
+   * Constructs the state against a MySQL container and checks that it holds the db schema history and
+   * a single-entry offset stored under the expected offset key.
+   */
+  @Test
+  public void debeziumInitialStateConstructTest() throws SQLException {
+    // try-with-resources closes the container, so no explicit stop() call is needed.
+    try (final MySQLContainer<?> container = new MySQLContainer<>("mysql:8.0")) {
+      container.start();
+      initDB(container);
+      final JdbcDatabase database = getJdbcDatabase(container);
+      final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
+      final JsonNode debeziumState = mySqlDebeziumStateUtil.constructInitialDebeziumState(MYSQL_PROPERTIES, CONFIGURED_CATALOG, database);
+      Assertions.assertEquals(2, Jsons.object(debeziumState, Map.class).size());
+      Assertions.assertTrue(debeziumState.has("mysql_db_history"));
+      Assertions.assertNotNull(debeziumState.get("mysql_db_history"));
+      Assertions.assertTrue(debeziumState.has("mysql_cdc_offset"));
+      final Map<String, String> mysqlCdcOffset = Jsons.object(debeziumState.get("mysql_cdc_offset"), Map.class);
+      final String offsetKey = "[\"" + DB_NAME + "\",{\"server\":\"" + DB_NAME + "\"}]";
+      Assertions.assertEquals(1, mysqlCdcOffset.size());
+      Assertions.assertTrue(mysqlCdcOffset.containsKey(offsetKey));
+      Assertions.assertNotNull(mysqlCdcOffset.get(offsetKey));
+    }
+  }
+
+  /** Checks the formatted offset when the attributes carry a GTID set. */
+  @Test
+  public void formatTestWithGtid() {
+    final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
+    final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633,
+        Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
+    final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
+    final String offsetKey = "[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]";
+    Assertions.assertEquals(1, stateAsMap.size());
+    Assertions.assertTrue(stateAsMap.containsKey(offsetKey));
+    Assertions.assertEquals(
+        "{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633,\"gtids\":\"3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5\"}",
+        stateAsMap.get(offsetKey));
+  }
+
+  /** Checks the formatted offset when the attributes carry no GTID set. */
+  @Test
+  public void formatTestWithoutGtid() {
+    final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
+    final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633,
+        Optional.empty()), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
+    final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
+    final String offsetKey = "[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]";
+    Assertions.assertEquals(1, stateAsMap.size());
+    Assertions.assertTrue(stateAsMap.containsKey(offsetKey));
+    Assertions.assertEquals("{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633}",
+        stateAsMap.get(offsetKey));
+  }
+
+  /** Creates a JdbcDatabase for DB_NAME on the container, with the matching source config attached. */
+  private JdbcDatabase getJdbcDatabase(final MySQLContainer<?> container) {
+    final JdbcDatabase database = new DefaultJdbcDatabase(
+        DataSourceFactory.create(
+            "root",
+            "test",
+            DatabaseDriver.MYSQL.getDriverClassName(),
+            String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
+                container.getHost(),
+                container.getFirstMappedPort(),
+                DB_NAME)));
+    database.setSourceConfig(getSourceConfig(container));
+    return database;
+  }
+
+  /** Creates the test database and table on the container. */
+  private void initDB(final MySQLContainer<?> container) throws SQLException {
+    final Database db = new Database(DSLContextFactory.create(
+        "root",
+        "test",
+        DatabaseDriver.MYSQL.getDriverClassName(),
+        String.format("jdbc:mysql://%s:%s",
+            container.getHost(),
+            container.getFirstMappedPort()),
+        SQLDialect.MYSQL));
+    db.query(ctx -> ctx.execute(DB_CREATE_QUERY));
+    db.query(ctx -> ctx.execute(TABLE_CREATE_QUERY));
+  }
+
+  /** Builds the source config (credentials, host, port, database) pointing at the container. */
+  private JsonNode getSourceConfig(final MySQLContainer<?> container) {
+    final Map<String, Object> config = new HashMap<>();
+    config.put(JdbcUtils.USERNAME_KEY, "root");
+    config.put(JdbcUtils.PASSWORD_KEY, "test");
+    config.put(JdbcUtils.HOST_KEY, container.getHost());
+    config.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort());
+    config.put(JdbcUtils.DATABASE_KEY, DB_NAME);
+    return Jsons.jsonNode(config);
+  }
+
+}
diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcSavedInfoFetcher.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcSavedInfoFetcher.java
index 76050c614eb9..30596a2dcd4c 100644
--- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcSavedInfoFetcher.java
+++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcSavedInfoFetcher.java
@@ -4,8 +4,8 @@
 
 package io.airbyte.integrations.source.mysql;
 
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.debezium.CdcSavedInfoFetcher;
diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java
index 74e7a3f49f30..921066f34a0b 100644
--- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java
+++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java
@@ -4,8 +4,8 @@
 
 package io.airbyte.integrations.source.mysql;
 
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.commons.json.Jsons;
diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java
index b8c8799a09ac..6da5dcd8cd18 100644
--- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java
+++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java
@@ -40,6 +40,7 @@
 import io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil;
 import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
 import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
+import io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil;
 import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
 import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
 import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
@@ -97,8 +98,6 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
       """;
 
   public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName();
-  public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
-  public static final String MYSQL_DB_HISTORY = "mysql_db_history";
   public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
   public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
   public static final List<String> SSL_PARAMETERS = List.of(
diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java
index 3e5a90efb3ee..9cfddc25406d 100644
--- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java
+++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java
@@ -6,11 +6,11 @@
 
 import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
 import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
+import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;
 import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
 import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
 import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
-import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;