Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql-cdc: implementation to construct initial debezium state #28561

Merged
merged 11 commits into from
Jul 26, 2023
1 change: 1 addition & 0 deletions airbyte-integrations/bases/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {

testImplementation project(':airbyte-test-utils')
testImplementation libs.connectors.testcontainers.jdbc
testImplementation libs.connectors.testcontainers.mysql
testImplementation libs.connectors.testcontainers.postgresql

testFixturesImplementation 'org.junit.jupiter:junit-jupiter-engine:5.4.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore;
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.debezium.engine.ChangeEvent;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlDebeziumStateUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDebeziumStateUtil.class);
public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
public static final String MYSQL_DB_HISTORY = "mysql_db_history";

public JsonNode constructInitialDebeziumState(final Properties properties,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this method is basically constructing the state from spinning up the DBZ engine and grabbing the state assc with the first record?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no! As discussed over call, we construct the offset by ourselves but in order to construct the db schema we need Debezium and thus we give Debezium the offset that we generated and run it with schema_only_recovery mode and it generates the schema history on its own and we use it.

final ConfiguredAirbyteCatalog catalog,
final JdbcDatabase database) {
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
Copy link
Contributor

@akashkulk akashkulk Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just add a quick comment on the schema_only_recovery and why we use it?

properties.setProperty("snapshot.mode", "schema_only_recovery");
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
constructBinlogOffset(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText()),
Optional.empty());
final AirbyteSchemaHistoryStorage schemaHistoryStorage = AirbyteSchemaHistoryStorage.initializeDBHistory(Optional.empty());
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>();
try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(properties, database.getSourceConfig(), catalog, offsetManager,
Optional.of(schemaHistoryStorage))) {
publisher.start(queue);
while (!publisher.hasClosed()) {
final ChangeEvent<String, String> event = queue.poll(10, TimeUnit.SECONDS);
if (event == null) {
continue;
}
LOGGER.info("A record is returned, closing the engine since the state is constructed");
publisher.close();
break;
}
} catch (Exception e) {
throw new RuntimeException(e);
}

final Map<String, String> offset = offsetManager.read();
final String dbHistory = schemaHistoryStorage.read();

assert !offset.isEmpty();
assert Objects.nonNull(dbHistory);

final Map<String, Object> state = new HashMap<>();
state.put(MYSQL_CDC_OFFSET, offset);
state.put(MYSQL_DB_HISTORY, dbHistory);

final JsonNode asJson = Jsons.jsonNode(state);
LOGGER.info("Initial Debezium state constructed: {}", asJson);

return asJson;
}

/**
* Method to construct initial Debezium state which can be passed onto Debezium engine to make it
* process binlogs from a specific file and position and skip snapshot phase
*/
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String dbName) {
return format(getStateAttributesFromDB(database), dbName, Instant.now());
}

@VisibleForTesting
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String dbName, final Instant time) {
final String key = "[\"" + dbName + "\",{\"server\":\"" + dbName + "\"}]";
final String gtidSet = attributes.gtidSet().isPresent() ? ",\"gtids\":\"" + attributes.gtidSet().get() + "\"" : "";
final String value =
"{\"transaction_id\":null,\"ts_sec\":" + time.getEpochSecond() + ",\"file\":\"" + attributes.binlogFilename() + "\",\"pos\":"
+ attributes.binlogPosition()
+ gtidSet + "}";

final Map<String, String> result = new HashMap<>();
result.put(key, value);

final JsonNode jsonNode = Jsons.jsonNode(result);
LOGGER.info("Initial Debezium state constructed: {}", jsonNode);

return jsonNode;
}

private MysqlDebeziumStateAttributes getStateAttributesFromDB(final JdbcDatabase database) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit : let's make this method public. I'm going to need it in the initial snapshotting code, so that we can attach the binlog_file + position for each record emitted in the initial pk sync

try (final Stream<MysqlDebeziumStateAttributes> stream = database.unsafeResultSetQuery(
connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"),
resultSet -> {
final String file = resultSet.getString("File");
final long position = resultSet.getLong("Position");
assert file != null;
assert position >= 0;
if (resultSet.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
final String gtidSet = resultSet.getString(5); // GTID set, may be null, blank, or contain a GTID set
return new MysqlDebeziumStateAttributes(file, position, removeNewLineChars(gtidSet));
}
return new MysqlDebeziumStateAttributes(file, position, Optional.empty());
})) {
final List<MysqlDebeziumStateAttributes> stateAttributes = stream.toList();
assert stateAttributes.size() == 1;
return stateAttributes.get(0);
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}

private Optional<String> removeNewLineChars(final String gtidSet) {
if (gtidSet != null && !gtidSet.trim().isEmpty()) {
// Remove all the newline chars that exist in the GTID set string ...
return Optional.of(gtidSet.replace("\n", "").replace("\r", ""));
}

return Optional.empty();
}

public record MysqlDebeziumStateAttributes(String binlogFilename, long binlogPosition, Optional<String> gtidSet) {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.SyncMode;
import java.sql.SQLException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MysqlDebeziumStateUtilTest {

private static final String DB_NAME = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
private static final String TABLE_NAME = Strings.addRandomSuffix("table", "_", 10).toLowerCase();
private static final Properties MYSQL_PROPERTIES = new Properties();
private static final String DB_CREATE_QUERY = "CREATE DATABASE " + DB_NAME;
private static final String TABLE_CREATE_QUERY = "CREATE TABLE " + DB_NAME + "." + TABLE_NAME + " (id INTEGER, name VARCHAR(200), PRIMARY KEY(id))";
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
DB_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("string", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));
protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);

static {
CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL));
MYSQL_PROPERTIES.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
MYSQL_PROPERTIES.setProperty("database.server.id", "5000");
}

@Test
public void debeziumInitialStateConstructTest() throws SQLException {
try (final MySQLContainer<?> container = new MySQLContainer<>("mysql:8.0")) {
container.start();
initDB(container);
final JdbcDatabase database = getJdbcDatabase(container);
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
final JsonNode debeziumState = mySqlDebeziumStateUtil.constructInitialDebeziumState(MYSQL_PROPERTIES, CONFIGURED_CATALOG, database);
Assertions.assertEquals(2, Jsons.object(debeziumState, Map.class).size());
Assertions.assertTrue(debeziumState.has("mysql_db_history"));
Assertions.assertNotNull(debeziumState.get("mysql_db_history"));
Assertions.assertTrue(debeziumState.has("mysql_cdc_offset"));
Map<String, String> mysqlCdcOffset = Jsons.object(debeziumState.get("mysql_cdc_offset"), Map.class);
Assertions.assertEquals(1, mysqlCdcOffset.size());
Assertions.assertTrue(mysqlCdcOffset.containsKey("[\"" + DB_NAME + "\",{\"server\":\"" + DB_NAME + "\"}]"));
Assertions.assertNotNull(mysqlCdcOffset.get("[\"" + DB_NAME + "\",{\"server\":\"" + DB_NAME + "\"}]"));
container.stop();
}
}

@Test
public void formatTestWithGtid() {
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633,
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
Assertions.assertEquals(1, stateAsMap.size());
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
Assertions.assertEquals(
"{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633,\"gtids\":\"3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5\"}",
stateAsMap.get("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
}

@Test
public void formatTestWithoutGtid() {
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633,
Optional.empty()), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
Assertions.assertEquals(1, stateAsMap.size());
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
Assertions.assertEquals("{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633}",
stateAsMap.get("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
}

private JdbcDatabase getJdbcDatabase(final MySQLContainer<?> container) {
final JdbcDatabase database = new DefaultJdbcDatabase(
DataSourceFactory.create(
"root",
"test",
DatabaseDriver.MYSQL.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
container.getHost(),
container.getFirstMappedPort(),
DB_NAME)));
database.setSourceConfig(getSourceConfig(container));
return database;
}

private void initDB(final MySQLContainer<?> container) throws SQLException {
final Database db = new Database(DSLContextFactory.create(
"root",
"test",
DatabaseDriver.MYSQL.getDriverClassName(),
String.format("jdbc:mysql://%s:%s",
container.getHost(),
container.getFirstMappedPort()),
SQLDialect.MYSQL));
db.query(ctx -> ctx.execute(DB_CREATE_QUERY));
db.query(ctx -> ctx.execute(TABLE_CREATE_QUERY));
}

private JsonNode getSourceConfig(final MySQLContainer<?> container) {
Map<String, Object> config = new HashMap<>();
config.put(JdbcUtils.USERNAME_KEY, "root");
config.put(JdbcUtils.PASSWORD_KEY, "test");
config.put(JdbcUtils.HOST_KEY, container.getHost());
config.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort());
config.put(JdbcUtils.DATABASE_KEY, DB_NAME);
return Jsons.jsonNode(config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.debezium.CdcSavedInfoFetcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
Expand Down Expand Up @@ -97,8 +98,6 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
""";

public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName();
public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
public static final String MYSQL_DB_HISTORY = "mysql_db_history";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final List<String> SSL_PARAMETERS = List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil.MYSQL_DB_HISTORY;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
import static io.airbyte.integrations.source.mysql.MySqlSource.DRIVER_CLASS;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down