Skip to content

Commit

Permalink
source-mysql: datatype fixes (#49918)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Dec 20, 2024
1 parent 34f1600 commit 1d6b34c
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 253 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ application {
airbyteBulkConnector {
core = 'extract'
toolkits = ['extract-jdbc', 'extract-cdc']
cdk = '0.231'
cdk = '0.234'
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.10.0-rc.2
dockerImageTag: 3.10.0-rc.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,32 @@

package io.airbyte.integrations.source.mysql

import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.*
import io.airbyte.cdk.read.cdc.Converted
import io.airbyte.cdk.read.cdc.NoConversion
import io.airbyte.cdk.read.cdc.NullFallThrough
import io.airbyte.cdk.read.cdc.PartialConverter
import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter
import org.apache.kafka.connect.data.SchemaBuilder

class MySqlSourceCdcBooleanConverter : CustomConverter<SchemaBuilder, RelationalColumn> {
override fun configure(props: Properties?) {}
class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter {

private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT")
override val debeziumPropertiesKey: String = "boolean"
override val handlers: List<RelationalColumnCustomConverter.Handler> = listOf(tinyint1Handler)

override fun converterFor(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
if (
Arrays.stream(BOOLEAN_TYPES).anyMatch { s: String ->
field!!.typeName().contains(s, ignoreCase = true) &&
field.length().isPresent &&
field.length().asInt == 1
}
) {
registerBoolean(field, registration)
}
}

private fun registerBoolean(
field: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
registration?.register(SchemaBuilder.bool()) { x ->
if (x == null) {
return@register if (field!!.isOptional) {
null
} else if (field.hasDefaultValue()) {
field.defaultValue()
} else {
false
}
}
when (x) {
is Boolean -> x
is String -> x.toBoolean()
is Number -> x != 0
else -> throw IllegalArgumentException("Unsupported type: ${x::class}")
}
}
companion object {
val tinyint1Handler =
RelationalColumnCustomConverter.Handler(
predicate = {
it.typeName().equals("TINYINT", ignoreCase = true) &&
it.length().isPresent &&
it.length().asInt == 1
},
outputSchema = SchemaBuilder.bool(),
partialConverters =
listOf(
NullFallThrough,
PartialConverter { if (it is Number) Converted(it != 0) else NoConversion }
)
)
}
}

This file was deleted.

Loading

0 comments on commit 1d6b34c

Please sign in to comment.