From 45eed0b0c9b49c4019fde7d562bec915912ae622 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Wed, 30 Oct 2024 15:40:08 -0700 Subject: [PATCH] [source-mysql-v2] Add server timezone (#48016) --- .../connectors/source-mysql-v2/metadata.yaml | 2 +- .../mysql/cdc/MySqlDebeziumOperations.kt | 83 ++++++++++--------- 2 files changed, 46 insertions(+), 39 deletions(-) diff --git a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml index 2f6b898a1759..d0d4a8113bfe 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql-v2/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c - dockerImageTag: 0.0.29 + dockerImageTag: 0.0.30 dockerRepository: airbyte/source-mysql-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql-v2 diff --git a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt index 7efdbfac8e0c..63bd24c2d153 100644 --- a/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumOperations.kt @@ -341,44 +341,51 @@ class MySqlDebeziumOperations( val commonProperties: Map by lazy { val tunnelSession: TunnelSession = jdbcConnectionFactory.ensureTunnelSession() - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MySqlConnector::class.java) - .withDebeziumName(databaseName) - .withHeartbeats(configuration.debeziumHeartbeatInterval) - // This to make sure that binary data represented as a base64-encoded String. - // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode - .with("binary.handling.mode", "base64") - // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode - .with("snapshot.mode", "when_needed") - // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode - // This is to make sure other database clients are allowed to write to a table while - // Airbyte is taking a snapshot. There is a risk involved that if any database client - // makes a schema change then the sync might break - .with("snapshot.locking.mode", "none") - // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes - .with("include.schema.changes", "false") - .with( - "connect.keep.alive.interval.ms", - configuration.debeziumKeepAliveInterval.toMillis().toString(), - ) - .withDatabase(configuration.jdbcProperties) - .withDatabase("hostname", tunnelSession.address.hostName) - .withDatabase("port", tunnelSession.address.port.toString()) - .withDatabase("dbname", databaseName) - .withDatabase("server.id", serverID.toString()) - .withDatabase("include.list", databaseName) - .withOffset() - .withSchemaHistory() - .with("converters", "datetime,numeric,boolean") - .with( - "datetime.type", - MySQLDateTimeConverter::class.java.getName(), - ) - .with("numeric.type", MySQLNumericConverter::class.java.getName()) - .with("boolean.type", MySQLBooleanConverter::class.java.getName()) - // TODO: add missing properties, like MySQL converters, etc. Do a full audit. - .buildMap() + val dbzPropertiesBuilder = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MySqlConnector::class.java) + .withDebeziumName(databaseName) + .withHeartbeats(configuration.debeziumHeartbeatInterval) + // This to make sure that binary data represented as a base64-encoded String. + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode + .with("binary.handling.mode", "base64") + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode + .with("snapshot.mode", "when_needed") + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode + // This is to make sure other database clients are allowed to write to a table while + // Airbyte is taking a snapshot. There is a risk involved that if any database + // client + // makes a schema change then the sync might break + .with("snapshot.locking.mode", "none") + // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes + .with("include.schema.changes", "false") + .with( + "connect.keep.alive.interval.ms", + configuration.debeziumKeepAliveInterval.toMillis().toString(), + ) + .withDatabase(configuration.jdbcProperties) + .withDatabase("hostname", tunnelSession.address.hostName) + .withDatabase("port", tunnelSession.address.port.toString()) + .withDatabase("dbname", databaseName) + .withDatabase("server.id", serverID.toString()) + .withDatabase("include.list", databaseName) + .withOffset() + .withSchemaHistory() + .with("converters", "datetime,numeric,boolean") + .with( + "datetime.type", + MySQLDateTimeConverter::class.java.getName(), + ) + .with("numeric.type", MySQLNumericConverter::class.java.getName()) + .with("boolean.type", MySQLBooleanConverter::class.java.getName()) + + val serverTimezone: String? = + (configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone + if (!serverTimezone.isNullOrBlank()) { + dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone) + } + dbzPropertiesBuilder.buildMap() } val syntheticProperties: Map by lazy {