Skip to content

Commit

Permalink
source-mysql: adopt RelationalColumnCustomConverter, fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Dec 19, 2024
1 parent 3f30b8c commit 0aa2030
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 253 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.226'
cdk = 'local'
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.4
dockerImageTag: 3.9.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,53 @@

package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import io.airbyte.cdk.read.cdc.Converted
import io.airbyte.cdk.read.cdc.NoConversion
import io.airbyte.cdk.read.cdc.PartialConverter
import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter
import org.apache.kafka.connect.data.SchemaBuilder

class MySqlSourceCdcBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}
class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter {

private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT")
override val debeziumPropertiesKey: String = "boolean"
override val handlers: List<RelationalColumnCustomConverter.Handler> =
listOf(booleanHandler, tinyint1Handler)

override fun converterFor(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
if (
Arrays.stream(BOOLEAN_TYPES).anyMatch { s: String ->
field!!.typeName().contains(s, ignoreCase = true) &&
field.length().isPresent &&
field.length().asInt == 1
}
) {
registerBoolean(field, registration)
}
}
companion object {
/**
 * Handler for columns whose type name starts with `BOOL` (case-insensitive), which covers
 * both `BOOL` and `BOOLEAN`. Values are emitted with a boolean output schema.
 *
 * Conversion rules, as implemented below:
 * - `null` is converted to `false`;
 * - a [Boolean] is passed through unchanged;
 * - anything else yields [NoConversion].
 *
 * NOTE(review): mapping `null` to `false` discards NULLs of nullable columns — confirm this
 * is intended.
 */
val booleanHandler =
    RelationalColumnCustomConverter.Handler(
        predicate = { column -> column.typeName().startsWith("BOOL", ignoreCase = true) },
        outputSchema = SchemaBuilder.bool(),
        partialConverters =
            listOf(
                PartialConverter { input ->
                    if (input == null) {
                        Converted(false)
                    } else if (input is Boolean) {
                        Converted(input)
                    } else {
                        NoConversion
                    }
                }
            )
    )

private fun registerBoolean(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
registration?.register(SchemaBuilder.bool()) { x ->
if (x == null) {
return@register if (field!!.isOptional) {
null
} else if (field.hasDefaultValue()) {
field.defaultValue()
} else {
false
}
}
when (x) {
is Boolean -> x
is String -> x.toBoolean()
is Number -> x != 0
else -> throw IllegalArgumentException("Unsupported type: ${x::class}")
}
}
/**
 * Handler for `TINYINT` columns (case-insensitive type name match) whose declared length is
 * present and equal to 1. Values are emitted with a boolean output schema.
 *
 * Conversion rules, as implemented below:
 * - `null` is converted to `false`;
 * - a [Number] is converted to `true` when its integral value is non-zero, `false` otherwise;
 * - anything else yields [NoConversion].
 *
 * NOTE(review): mapping `null` to `false` discards NULLs of nullable columns — confirm this
 * is intended.
 */
val tinyint1Handler =
    RelationalColumnCustomConverter.Handler(
        predicate = {
            it.typeName().equals("TINYINT", ignoreCase = true) &&
                it.length().isPresent &&
                it.length().asInt == 1
        },
        outputSchema = SchemaBuilder.bool(),
        partialConverters =
            listOf(
                PartialConverter {
                    when (it) {
                        null -> Converted(false)
                        // Compare numerically rather than with `it != 0`: on a value statically
                        // typed as Number, `!=` uses boxed equals(), so a zero held in a Short,
                        // Byte or Long is never equal to the Int literal 0 and would be reported
                        // as true.
                        is Number -> Converted(it.toLong() != 0L)
                        else -> NoConversion
                    }
                }
            )
    )
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,117 +4,92 @@

package io.airbyte.integrations.source.mysql

import io.airbyte.cdk.jdbc.converters.DateTimeConverter
import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import io.debezium.time.Conversions
import io.airbyte.cdk.data.LocalDateCodec
import io.airbyte.cdk.data.LocalDateTimeCodec
import io.airbyte.cdk.data.LocalTimeCodec
import io.airbyte.cdk.data.OffsetDateTimeCodec
import io.airbyte.cdk.read.cdc.Converted
import io.airbyte.cdk.read.cdc.NoConversion
import io.airbyte.cdk.read.cdc.NullFallThrough
import io.airbyte.cdk.read.cdc.PartialConverter
import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter
import java.time.Duration
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.util.*
import java.util.concurrent.TimeUnit
import java.time.ZonedDateTime
import org.apache.kafka.connect.data.SchemaBuilder

/**
* This is a custom debezium converter used in MySQL to handle the DATETIME data type. We need a
* custom converter because, by default, debezium returns the DATETIME values as numbers. We need
* to convert them to a proper format. Ref:
* https://debezium.io/documentation/reference/2.1/development/converters.html This is built from
* reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you
* rename this class then remember to rename the datetime.type property value in {@link
* MySqlCdcProperties#commonProperties(JdbcDatabase)} (If you don't rename, a test would still fail
* but it might be tricky to figure out where to change the property name)
*/
class MySqlSourceCdcTemporalConverter : CustomConverter<SchemaBuilder, RelationalColumn> {

private val DATE_TYPES = arrayOf("DATE", "DATETIME", "TIME", "TIMESTAMP")
override fun configure(props: Properties?) {}

override fun converterFor(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
if (
Arrays.stream<String>(DATE_TYPES).anyMatch { s: String ->
s.equals(
field!!.typeName(),
ignoreCase = true,
)
}
) {
registerDate(field, registration)
}
}
class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

private fun getTimePrecision(field: RelationalColumn): Int {
return field.length().orElse(-1)
}
override val debeziumPropertiesKey: String = "temporal"

private fun registerDate(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
val fieldType = field!!.typeName()
override val handlers: List<RelationalColumnCustomConverter.Handler> =
listOf(datetimeHandler, dateHandler, timeHandler, timestampHandler)

registration?.register(SchemaBuilder.string().optional()) { x ->
if (x == null) {
return@register convertDefaultValue(field)
}
companion object {

when (fieldType.uppercase()) {
"DATETIME" -> {
if (x is Long) {
if (getTimePrecision(field) <= 3) {
return@register DateTimeConverter.convertToTimestamp(
Conversions.toInstantFromMillis(x),
)
val datetimeHandler =
RelationalColumnCustomConverter.Handler(
predicate = { it.typeName().equals("DATETIME", ignoreCase = true) },
outputSchema = SchemaBuilder.string(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter {
if (it is LocalDateTime)
Converted(it.format(LocalDateTimeCodec.formatter))
else NoConversion
}
if (getTimePrecision(field) <= 6) {
return@register DateTimeConverter.convertToTimestamp(
Conversions.toInstantFromMicros(x),
)
}
}
DateTimeConverter.convertToTimestamp(x)
}
"DATE" -> {
if (x is Int) {
return@register DateTimeConverter.convertToDate(
LocalDate.ofEpochDay(
x.toLong(),
),
)
}
DateTimeConverter.convertToDate(x)
}
"TIME" -> {
if (x is Long) {
val l = Math.multiplyExact(x, TimeUnit.MICROSECONDS.toNanos(1))
return@register DateTimeConverter.convertToTime(
LocalTime.ofNanoOfDay(
l,
),
)
}
DateTimeConverter.convertToTime(x)
}
"TIMESTAMP" ->
DateTimeConverter.convertToTimestampWithTimezone(
x,
)
else ->
throw IllegalArgumentException("Unknown field type " + fieldType.uppercase())
}
}
}
)

companion object {
fun convertDefaultValue(field: RelationalColumn): Any? {
if (field.isOptional) {
return null
} else if (field.hasDefaultValue()) {
return field.defaultValue()
}
return null
}
/**
 * Handler for columns whose type name equals `DATE` (case-insensitive). Values are emitted
 * with a string output schema.
 *
 * [NullFallThrough] is tried first; after it, a [LocalDate] is rendered as text using
 * [LocalDateCodec.formatter], and any other value yields [NoConversion].
 */
val dateHandler =
    RelationalColumnCustomConverter.Handler(
        predicate = { column -> column.typeName().equals("DATE", ignoreCase = true) },
        outputSchema = SchemaBuilder.string(),
        partialConverters =
            listOf(
                NullFallThrough,
                PartialConverter { input ->
                    when (input) {
                        is LocalDate -> Converted(input.format(LocalDateCodec.formatter))
                        else -> NoConversion
                    }
                },
            ),
    )

/**
 * Handler for columns whose type name equals `TIME` (case-insensitive). Values are emitted
 * with a string output schema.
 *
 * [NullFallThrough] is tried first; after it, a [Duration] is added to midnight and the
 * resulting [LocalTime] is rendered as text using [LocalTimeCodec.formatter]. Any other
 * value yields [NoConversion].
 *
 * NOTE(review): [LocalTime.plus] wraps around midnight, so negative durations or durations
 * of 24 hours or more would be folded into a time of day — confirm such values cannot
 * reach this handler.
 */
val timeHandler =
    RelationalColumnCustomConverter.Handler(
        predicate = { column -> column.typeName().equals("TIME", ignoreCase = true) },
        outputSchema = SchemaBuilder.string(),
        partialConverters =
            listOf(
                NullFallThrough,
                PartialConverter { input ->
                    when (input) {
                        is Duration -> {
                            val timeOfDay: LocalTime = LocalTime.MIDNIGHT.plus(input)
                            Converted(timeOfDay.format(LocalTimeCodec.formatter))
                        }
                        else -> NoConversion
                    }
                },
            ),
    )

/**
 * Handler for columns whose type name equals `TIMESTAMP` (case-insensitive). Values are
 * emitted with a string output schema.
 *
 * [NullFallThrough] is tried first; after it, a [ZonedDateTime] is converted to an offset
 * date-time and rendered as text using [OffsetDateTimeCodec.formatter]. Any other value
 * yields [NoConversion].
 */
val timestampHandler =
    RelationalColumnCustomConverter.Handler(
        predicate = { column -> column.typeName().equals("TIMESTAMP", ignoreCase = true) },
        outputSchema = SchemaBuilder.string(),
        partialConverters =
            listOf(
                NullFallThrough,
                PartialConverter { input ->
                    when (input) {
                        is ZonedDateTime -> {
                            val withOffset = input.toOffsetDateTime()
                            Converted(withOffset.format(OffsetDateTimeCodec.formatter))
                        }
                        else -> NoConversion
                    }
                },
            ),
    )
}
}
Loading

0 comments on commit 0aa2030

Please sign in to comment.