-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mysql-cdc: implementation to construct initial debezium state #28561
Merged
octavia-approvington
merged 11 commits into
master
from
mysql-cdc-initial-debezium-state
Jul 26, 2023
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
2c78464
wip
subodh1810 96620f6
Merge branch 'master' into mysql-cdc-initial-debezium-state
subodh1810 8e2f021
wip 2
subodh1810 71e3ee6
add tests + finalise implementation
subodh1810 7aee46d
Merge branch 'master' into mysql-cdc-initial-debezium-state
subodh1810 95c4d0e
undo unwanted changes
subodh1810 a13baec
Automated Commit - Format and Process Resources Changes
subodh1810 f2af115
minor refactor
subodh1810 593faf2
Merge branch 'master' into mysql-cdc-initial-debezium-state
subodh1810 0a2f0ec
add comment
subodh1810 98551ba
make method public
subodh1810 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 142 additions & 0 deletions
142
...rc/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlDebeziumStateUtil.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.debezium.internals.mysql; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.db.jdbc.JdbcUtils; | ||
import io.airbyte.integrations.debezium.internals.AirbyteFileOffsetBackingStore; | ||
import io.airbyte.integrations.debezium.internals.AirbyteSchemaHistoryStorage; | ||
import io.airbyte.integrations.debezium.internals.DebeziumRecordPublisher; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; | ||
import io.debezium.engine.ChangeEvent; | ||
import java.sql.SQLException; | ||
import java.time.Instant; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Stream; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MySqlDebeziumStateUtil { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDebeziumStateUtil.class); | ||
public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset"; | ||
public static final String MYSQL_DB_HISTORY = "mysql_db_history"; | ||
|
||
public JsonNode constructInitialDebeziumState(final Properties properties, | ||
final ConfiguredAirbyteCatalog catalog, | ||
final JdbcDatabase database) { | ||
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you just add a quick comment on the |
||
// We use the schema_only_recovery property cause using this mode will instruct Debezium to construct the db schema history. | ||
properties.setProperty("snapshot.mode", "schema_only_recovery"); | ||
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState( | ||
constructBinlogOffset(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText()), | ||
Optional.empty()); | ||
final AirbyteSchemaHistoryStorage schemaHistoryStorage = AirbyteSchemaHistoryStorage.initializeDBHistory(Optional.empty()); | ||
final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(); | ||
try (final DebeziumRecordPublisher publisher = new DebeziumRecordPublisher(properties, database.getSourceConfig(), catalog, offsetManager, | ||
Optional.of(schemaHistoryStorage))) { | ||
publisher.start(queue); | ||
while (!publisher.hasClosed()) { | ||
final ChangeEvent<String, String> event = queue.poll(10, TimeUnit.SECONDS); | ||
if (event == null) { | ||
continue; | ||
} | ||
LOGGER.info("A record is returned, closing the engine since the state is constructed"); | ||
publisher.close(); | ||
break; | ||
} | ||
} catch (final Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
||
final Map<String, String> offset = offsetManager.read(); | ||
final String dbHistory = schemaHistoryStorage.read(); | ||
|
||
assert !offset.isEmpty(); | ||
assert Objects.nonNull(dbHistory); | ||
|
||
final Map<String, Object> state = new HashMap<>(); | ||
state.put(MYSQL_CDC_OFFSET, offset); | ||
state.put(MYSQL_DB_HISTORY, dbHistory); | ||
|
||
final JsonNode asJson = Jsons.jsonNode(state); | ||
LOGGER.info("Initial Debezium state constructed: {}", asJson); | ||
|
||
return asJson; | ||
} | ||
|
||
/** | ||
* Method to construct initial Debezium state which can be passed onto Debezium engine to make it | ||
* process binlogs from a specific file and position and skip snapshot phase | ||
*/ | ||
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String dbName) { | ||
return format(getStateAttributesFromDB(database), dbName, Instant.now()); | ||
} | ||
|
||
@VisibleForTesting | ||
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String dbName, final Instant time) { | ||
final String key = "[\"" + dbName + "\",{\"server\":\"" + dbName + "\"}]"; | ||
final String gtidSet = attributes.gtidSet().isPresent() ? ",\"gtids\":\"" + attributes.gtidSet().get() + "\"" : ""; | ||
final String value = | ||
"{\"transaction_id\":null,\"ts_sec\":" + time.getEpochSecond() + ",\"file\":\"" + attributes.binlogFilename() + "\",\"pos\":" | ||
+ attributes.binlogPosition() | ||
+ gtidSet + "}"; | ||
|
||
final Map<String, String> result = new HashMap<>(); | ||
result.put(key, value); | ||
|
||
final JsonNode jsonNode = Jsons.jsonNode(result); | ||
LOGGER.info("Initial Debezium state offset constructed: {}", jsonNode); | ||
|
||
return jsonNode; | ||
} | ||
|
||
public MysqlDebeziumStateAttributes getStateAttributesFromDB(final JdbcDatabase database) { | ||
try (final Stream<MysqlDebeziumStateAttributes> stream = database.unsafeResultSetQuery( | ||
connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), | ||
resultSet -> { | ||
final String file = resultSet.getString("File"); | ||
final long position = resultSet.getLong("Position"); | ||
assert file != null; | ||
assert position >= 0; | ||
if (resultSet.getMetaData().getColumnCount() > 4) { | ||
// This column exists only in MySQL 5.6.5 or later ... | ||
final String gtidSet = resultSet.getString(5); // GTID set, may be null, blank, or contain a GTID set | ||
return new MysqlDebeziumStateAttributes(file, position, removeNewLineChars(gtidSet)); | ||
} | ||
return new MysqlDebeziumStateAttributes(file, position, Optional.empty()); | ||
})) { | ||
final List<MysqlDebeziumStateAttributes> stateAttributes = stream.toList(); | ||
assert stateAttributes.size() == 1; | ||
return stateAttributes.get(0); | ||
} catch (final SQLException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private Optional<String> removeNewLineChars(final String gtidSet) { | ||
if (gtidSet != null && !gtidSet.trim().isEmpty()) { | ||
// Remove all the newline chars that exist in the GTID set string ... | ||
return Optional.of(gtidSet.replace("\n", "").replace("\r", "")); | ||
} | ||
|
||
return Optional.empty(); | ||
} | ||
|
||
public record MysqlDebeziumStateAttributes(String binlogFilename, long binlogPosition, Optional<String> gtidSet) { | ||
|
||
} | ||
|
||
} |
142 changes: 142 additions & 0 deletions
142
.../src/test/java/io/airbyte/integrations/debezium/internals/MysqlDebeziumStateUtilTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.debezium.internals; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.string.Strings; | ||
import io.airbyte.db.Database; | ||
import io.airbyte.db.factory.DSLContextFactory; | ||
import io.airbyte.db.factory.DataSourceFactory; | ||
import io.airbyte.db.factory.DatabaseDriver; | ||
import io.airbyte.db.jdbc.DefaultJdbcDatabase; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.db.jdbc.JdbcUtils; | ||
import io.airbyte.integrations.debezium.internals.mysql.MySqlDebeziumStateUtil; | ||
import io.airbyte.protocol.models.Field; | ||
import io.airbyte.protocol.models.JsonSchemaType; | ||
import io.airbyte.protocol.models.v0.AirbyteCatalog; | ||
import io.airbyte.protocol.models.v0.CatalogHelpers; | ||
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.v0.SyncMode; | ||
import java.sql.SQLException; | ||
import java.time.Instant; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import org.jooq.SQLDialect; | ||
import org.junit.jupiter.api.Assertions; | ||
import org.junit.jupiter.api.Test; | ||
import org.testcontainers.containers.MySQLContainer; | ||
|
||
public class MysqlDebeziumStateUtilTest { | ||
|
||
private static final String DB_NAME = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); | ||
private static final String TABLE_NAME = Strings.addRandomSuffix("table", "_", 10).toLowerCase(); | ||
private static final Properties MYSQL_PROPERTIES = new Properties(); | ||
private static final String DB_CREATE_QUERY = "CREATE DATABASE " + DB_NAME; | ||
private static final String TABLE_CREATE_QUERY = "CREATE TABLE " + DB_NAME + "." + TABLE_NAME + " (id INTEGER, name VARCHAR(200), PRIMARY KEY(id))"; | ||
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( | ||
CatalogHelpers.createAirbyteStream( | ||
TABLE_NAME, | ||
DB_NAME, | ||
Field.of("id", JsonSchemaType.INTEGER), | ||
Field.of("string", JsonSchemaType.STRING)) | ||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) | ||
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))); | ||
protected static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); | ||
|
||
static { | ||
CONFIGURED_CATALOG.getStreams().forEach(s -> s.setSyncMode(SyncMode.INCREMENTAL)); | ||
MYSQL_PROPERTIES.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector"); | ||
MYSQL_PROPERTIES.setProperty("database.server.id", "5000"); | ||
} | ||
|
||
@Test | ||
public void debeziumInitialStateConstructTest() throws SQLException { | ||
try (final MySQLContainer<?> container = new MySQLContainer<>("mysql:8.0")) { | ||
container.start(); | ||
initDB(container); | ||
final JdbcDatabase database = getJdbcDatabase(container); | ||
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil(); | ||
final JsonNode debeziumState = mySqlDebeziumStateUtil.constructInitialDebeziumState(MYSQL_PROPERTIES, CONFIGURED_CATALOG, database); | ||
Assertions.assertEquals(2, Jsons.object(debeziumState, Map.class).size()); | ||
Assertions.assertTrue(debeziumState.has("mysql_db_history")); | ||
Assertions.assertNotNull(debeziumState.get("mysql_db_history")); | ||
Assertions.assertTrue(debeziumState.has("mysql_cdc_offset")); | ||
final Map<String, String> mysqlCdcOffset = Jsons.object(debeziumState.get("mysql_cdc_offset"), Map.class); | ||
Assertions.assertEquals(1, mysqlCdcOffset.size()); | ||
Assertions.assertTrue(mysqlCdcOffset.containsKey("[\"" + DB_NAME + "\",{\"server\":\"" + DB_NAME + "\"}]")); | ||
Assertions.assertNotNull(mysqlCdcOffset.get("[\"" + DB_NAME + "\",{\"server\":\"" + DB_NAME + "\"}]")); | ||
container.stop(); | ||
} | ||
} | ||
|
||
@Test | ||
public void formatTestWithGtid() { | ||
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil(); | ||
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633, | ||
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z")); | ||
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class); | ||
Assertions.assertEquals(1, stateAsMap.size()); | ||
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]")); | ||
Assertions.assertEquals( | ||
"{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633,\"gtids\":\"3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5\"}", | ||
stateAsMap.get("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]")); | ||
} | ||
|
||
@Test | ||
public void formatTestWithoutGtid() { | ||
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil(); | ||
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MySqlDebeziumStateUtil.MysqlDebeziumStateAttributes("binlog.000002", 633, | ||
Optional.empty()), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z")); | ||
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class); | ||
Assertions.assertEquals(1, stateAsMap.size()); | ||
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]")); | ||
Assertions.assertEquals("{\"transaction_id\":null,\"ts_sec\":1686040570,\"file\":\"binlog.000002\",\"pos\":633}", | ||
stateAsMap.get("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]")); | ||
} | ||
|
||
private JdbcDatabase getJdbcDatabase(final MySQLContainer<?> container) { | ||
final JdbcDatabase database = new DefaultJdbcDatabase( | ||
DataSourceFactory.create( | ||
"root", | ||
"test", | ||
DatabaseDriver.MYSQL.getDriverClassName(), | ||
String.format(DatabaseDriver.MYSQL.getUrlFormatString(), | ||
container.getHost(), | ||
container.getFirstMappedPort(), | ||
DB_NAME))); | ||
database.setSourceConfig(getSourceConfig(container)); | ||
return database; | ||
} | ||
|
||
private void initDB(final MySQLContainer<?> container) throws SQLException { | ||
final Database db = new Database(DSLContextFactory.create( | ||
"root", | ||
"test", | ||
DatabaseDriver.MYSQL.getDriverClassName(), | ||
String.format("jdbc:mysql://%s:%s", | ||
container.getHost(), | ||
container.getFirstMappedPort()), | ||
SQLDialect.MYSQL)); | ||
db.query(ctx -> ctx.execute(DB_CREATE_QUERY)); | ||
db.query(ctx -> ctx.execute(TABLE_CREATE_QUERY)); | ||
} | ||
|
||
private JsonNode getSourceConfig(final MySQLContainer<?> container) { | ||
final Map<String, Object> config = new HashMap<>(); | ||
config.put(JdbcUtils.USERNAME_KEY, "root"); | ||
config.put(JdbcUtils.PASSWORD_KEY, "test"); | ||
config.put(JdbcUtils.HOST_KEY, container.getHost()); | ||
config.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort()); | ||
config.put(JdbcUtils.DATABASE_KEY, DB_NAME); | ||
return Jsons.jsonNode(config); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this method is basically constructing the state by spinning up the DBZ engine and grabbing the state associated with the first record?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and no! As discussed on our call, we construct the offset ourselves, but we need Debezium to construct the DB schema history. So we give Debezium the offset that we generated and run it in
schema_only_recovery
mode; it generates the schema history on its own, and we use that.