Skip to content

Commit

Permalink
[source-mysql-v2] Add server timezone (#48016)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Oct 30, 2024
1 parent 3632670 commit 45eed0b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
dockerImageTag: 0.0.29
dockerImageTag: 0.0.30
dockerRepository: airbyte/source-mysql-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,44 +341,51 @@ class MySqlDebeziumOperations(

val commonProperties: Map<String, String> by lazy {
val tunnelSession: TunnelSession = jdbcConnectionFactory.ensureTunnelSession()
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MySqlConnector::class.java)
.withDebeziumName(databaseName)
.withHeartbeats(configuration.debeziumHeartbeatInterval)
// This to make sure that binary data represented as a base64-encoded String.
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode
.with("binary.handling.mode", "base64")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
.with("snapshot.mode", "when_needed")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while
// Airbyte is taking a snapshot. There is a risk involved that if any database client
// makes a schema change then the sync might break
.with("snapshot.locking.mode", "none")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes
.with("include.schema.changes", "false")
.with(
"connect.keep.alive.interval.ms",
configuration.debeziumKeepAliveInterval.toMillis().toString(),
)
.withDatabase(configuration.jdbcProperties)
.withDatabase("hostname", tunnelSession.address.hostName)
.withDatabase("port", tunnelSession.address.port.toString())
.withDatabase("dbname", databaseName)
.withDatabase("server.id", serverID.toString())
.withDatabase("include.list", databaseName)
.withOffset()
.withSchemaHistory()
.with("converters", "datetime,numeric,boolean")
.with(
"datetime.type",
MySQLDateTimeConverter::class.java.getName(),
)
.with("numeric.type", MySQLNumericConverter::class.java.getName())
.with("boolean.type", MySQLBooleanConverter::class.java.getName())
// TODO: add missing properties, like MySQL converters, etc. Do a full audit.
.buildMap()
val dbzPropertiesBuilder =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MySqlConnector::class.java)
.withDebeziumName(databaseName)
.withHeartbeats(configuration.debeziumHeartbeatInterval)
// This to make sure that binary data represented as a base64-encoded String.
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode
.with("binary.handling.mode", "base64")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode
.with("snapshot.mode", "when_needed")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode
// This is to make sure other database clients are allowed to write to a table while
// Airbyte is taking a snapshot. There is a risk involved that if any database
// client
// makes a schema change then the sync might break
.with("snapshot.locking.mode", "none")
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-include-schema-changes
.with("include.schema.changes", "false")
.with(
"connect.keep.alive.interval.ms",
configuration.debeziumKeepAliveInterval.toMillis().toString(),
)
.withDatabase(configuration.jdbcProperties)
.withDatabase("hostname", tunnelSession.address.hostName)
.withDatabase("port", tunnelSession.address.port.toString())
.withDatabase("dbname", databaseName)
.withDatabase("server.id", serverID.toString())
.withDatabase("include.list", databaseName)
.withOffset()
.withSchemaHistory()
.with("converters", "datetime,numeric,boolean")
.with(
"datetime.type",
MySQLDateTimeConverter::class.java.getName(),
)
.with("numeric.type", MySQLNumericConverter::class.java.getName())
.with("boolean.type", MySQLBooleanConverter::class.java.getName())

val serverTimezone: String? =
(configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone
if (!serverTimezone.isNullOrBlank()) {
dbzPropertiesBuilder.with("database.connectionTimezone", serverTimezone)
}
dbzPropertiesBuilder.buildMap()
}

val syntheticProperties: Map<String, String> by lazy {
Expand Down

0 comments on commit 45eed0b

Please sign in to comment.