diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 3a5e4143d295..7313a068944e 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -9,7 +9,7 @@ application { airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = '0.226' + cdk = 'local' } dependencies { diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index f2bff2066105..48f2557c4614 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.9.4 + dockerImageTag: 3.9.3 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt index 467645fa6b96..b7b6d9363adf 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt @@ -4,51 +4,53 @@ package io.airbyte.integrations.source.mysql -import io.debezium.spi.converter.CustomConverter -import io.debezium.spi.converter.RelationalColumn -import java.util.* +import io.airbyte.cdk.read.cdc.Converted +import io.airbyte.cdk.read.cdc.NoConversion +import io.airbyte.cdk.read.cdc.PartialConverter +import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter import org.apache.kafka.connect.data.SchemaBuilder -class MySqlSourceCdcBooleanConverter : CustomConverter { - override fun configure(props: Properties?) {} +class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter { - private val BOOLEAN_TYPES = arrayOf("BOOLEAN", "BOOL", "TINYINT") + override val debeziumPropertiesKey: String = "boolean" + override val handlers: List = + listOf(booleanHandler, tinyint1Handler) - override fun converterFor( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - if ( - Arrays.stream(BOOLEAN_TYPES).anyMatch { s: String -> - field!!.typeName().contains(s, ignoreCase = true) && - field.length().isPresent && - field.length().asInt == 1 - } - ) { - registerBoolean(field, registration) - } - } + companion object { + val booleanHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().startsWith("BOOL", ignoreCase = true) }, + outputSchema = SchemaBuilder.bool(), + partialConverters = + listOf( + PartialConverter { + when (it) { + null -> Converted(false) + is Boolean -> Converted(it) + else -> NoConversion + } + } + ) + ) - private fun registerBoolean( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - registration?.register(SchemaBuilder.bool()) { x -> - if (x == null) { - return@register if (field!!.isOptional) { - null - } else if (field.hasDefaultValue()) { - field.defaultValue() - } else { - false - } - } - when (x) { - is Boolean -> x - is String -> x.toBoolean() - is Number -> x != 0 - else -> throw IllegalArgumentException("Unsupported type: ${x::class}") - } - } + val tinyint1Handler = + RelationalColumnCustomConverter.Handler( + predicate = { + it.typeName().equals("TINYINT", ignoreCase = true) && + it.length().isPresent && + it.length().asInt == 1 + }, + outputSchema = SchemaBuilder.bool(), + partialConverters = + listOf( + PartialConverter { + when (it) { + null -> Converted(false) + is Number -> Converted(it != 0) + else -> NoConversion + } + } + ) + ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt deleted file mode 100644 index 359aa28dc6e8..000000000000 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcNumericConverter.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mysql - -import io.debezium.spi.converter.CustomConverter -import io.debezium.spi.converter.RelationalColumn -import java.util.* -import org.apache.kafka.connect.data.SchemaBuilder - -class MySqlSourceCdcNumericConverter : CustomConverter { - override fun configure(props: Properties?) {} - - private val NUMERIC_TYPES = arrayOf("FLOAT", "DOUBLE", "DECIMAL") - - override fun converterFor( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - if ( - Arrays.stream(NUMERIC_TYPES).anyMatch { s: String -> - field!!.typeName().contains(s, ignoreCase = true) - } - ) { - registerNumber(field, registration) - } - } - - private fun registerNumber( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - registration?.register(SchemaBuilder.float64()) { x -> - if (x == null) { - return@register if (field!!.isOptional) { - null - } else if (field.hasDefaultValue()) { - field.defaultValue() - } else { - null - } - } - when (x) { - is String -> x.toDouble() - is Float -> x.toString().toDouble() - is java.math.BigDecimal -> x.stripTrailingZeros().toDouble() - is Number -> x.toDouble() - else -> throw IllegalArgumentException("Unsupported type: ${x::class}") - } - } - } -} diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt index 83758c7fef14..3ac77d147261 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt @@ -4,117 +4,92 @@ package io.airbyte.integrations.source.mysql -import io.airbyte.cdk.jdbc.converters.DateTimeConverter -import io.debezium.spi.converter.CustomConverter -import io.debezium.spi.converter.RelationalColumn -import io.debezium.time.Conversions +import io.airbyte.cdk.data.LocalDateCodec +import io.airbyte.cdk.data.LocalDateTimeCodec +import io.airbyte.cdk.data.LocalTimeCodec +import io.airbyte.cdk.data.OffsetDateTimeCodec +import io.airbyte.cdk.read.cdc.Converted +import io.airbyte.cdk.read.cdc.NoConversion +import io.airbyte.cdk.read.cdc.NullFallThrough +import io.airbyte.cdk.read.cdc.PartialConverter +import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter +import java.time.Duration import java.time.LocalDate +import java.time.LocalDateTime import java.time.LocalTime -import java.util.* -import java.util.concurrent.TimeUnit +import java.time.ZonedDateTime import org.apache.kafka.connect.data.SchemaBuilder -/** - * This is a custom debezium converter used in MySQL to handle the DATETIME data type. We need a - * custom converter cause by default debezium returns the DATETIME values as numbers. We need to - * convert it to proper format. Ref : - * https://debezium.io/documentation/reference/2.1/development/converters.html This is built from - * reference with {@link io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter} If you - * rename this class then remember to rename the datetime.type property value in {@link - * MySqlCdcProperties#commonProperties(JdbcDatabase)} (If you don't rename, a test would still fail - * but it might be tricky to figure out where to change the property name) - */ -class MySqlSourceCdcTemporalConverter : CustomConverter { - - private val DATE_TYPES = arrayOf("DATE", "DATETIME", "TIME", "TIMESTAMP") - override fun configure(props: Properties?) {} - - override fun converterFor( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - if ( - Arrays.stream(DATE_TYPES).anyMatch { s: String -> - s.equals( - field!!.typeName(), - ignoreCase = true, - ) - } - ) { - registerDate(field, registration) - } - } +class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter { - private fun getTimePrecision(field: RelationalColumn): Int { - return field.length().orElse(-1) - } + override val debeziumPropertiesKey: String = "temporal" - private fun registerDate( - field: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? - ) { - val fieldType = field!!.typeName() + override val handlers: List = + listOf(datetimeHandler, dateHandler, timeHandler, timestampHandler) - registration?.register(SchemaBuilder.string().optional()) { x -> - if (x == null) { - return@register convertDefaultValue(field) - } + companion object { - when (fieldType.uppercase()) { - "DATETIME" -> { - if (x is Long) { - if (getTimePrecision(field) <= 3) { - return@register DateTimeConverter.convertToTimestamp( - Conversions.toInstantFromMillis(x), - ) + val datetimeHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().equals("DATETIME", ignoreCase = true) }, + outputSchema = SchemaBuilder.string(), + partialConverters = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDateTime) + Converted(it.format(LocalDateTimeCodec.formatter)) + else NoConversion } - if (getTimePrecision(field) <= 6) { - return@register DateTimeConverter.convertToTimestamp( - Conversions.toInstantFromMicros(x), - ) - } - } - DateTimeConverter.convertToTimestamp(x) - } - "DATE" -> { - if (x is Int) { - return@register DateTimeConverter.convertToDate( - LocalDate.ofEpochDay( - x.toLong(), - ), - ) - } - DateTimeConverter.convertToDate(x) - } - "TIME" -> { - if (x is Long) { - val l = Math.multiplyExact(x, TimeUnit.MICROSECONDS.toNanos(1)) - return@register DateTimeConverter.convertToTime( - LocalTime.ofNanoOfDay( - l, - ), - ) - } - DateTimeConverter.convertToTime(x) - } - "TIMESTAMP" -> - DateTimeConverter.convertToTimestampWithTimezone( - x, ) - else -> - throw IllegalArgumentException("Unknown field type " + fieldType.uppercase()) - } - } - } + ) - companion object { - fun convertDefaultValue(field: RelationalColumn): Any? { - if (field.isOptional) { - return null - } else if (field.hasDefaultValue()) { - return field.defaultValue() - } - return null - } + val dateHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().equals("DATE", ignoreCase = true) }, + outputSchema = SchemaBuilder.string(), + partialConverters = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDate) Converted(it.format(LocalDateCodec.formatter)) + else NoConversion + }, + ), + ) + + val timeHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().equals("TIME", ignoreCase = true) }, + outputSchema = SchemaBuilder.string(), + partialConverters = + listOf( + NullFallThrough, + PartialConverter { + if (it is Duration) + Converted( + LocalTime.MIDNIGHT.plus(it).format(LocalTimeCodec.formatter) + ) + else NoConversion + }, + ), + ) + + val timestampHandler = + RelationalColumnCustomConverter.Handler( + predicate = { it.typeName().equals("TIMESTAMP", ignoreCase = true) }, + outputSchema = SchemaBuilder.string(), + partialConverters = + listOf( + NullFallThrough, + PartialConverter { + if (it is ZonedDateTime) + Converted( + it.toOffsetDateTime().format(OffsetDateTimeCodec.formatter) + ) + else NoConversion + }, + ), + ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt index c10119684631..73f4ec49566b 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt @@ -7,8 +7,10 @@ package io.airbyte.integrations.source.mysql import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.databind.node.TextNode import io.airbyte.cdk.ConfigErrorException import io.airbyte.cdk.command.OpaqueStateValue +import io.airbyte.cdk.data.LeafAirbyteSchemaType import io.airbyte.cdk.data.LongCodec import io.airbyte.cdk.data.OffsetDateTimeCodec import io.airbyte.cdk.data.TextCodec @@ -39,6 +41,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.math.BigDecimal import java.sql.Connection import java.sql.ResultSet import java.sql.Statement @@ -72,6 +75,22 @@ class MySqlSourceDebeziumOperations( val isDelete: Boolean = after.isNull // Use either `before` or `after` as the record data, depending on the nature of the change. val data: ObjectNode = (if (isDelete) before else after) as ObjectNode + // Turn string representations of numbers into BigDecimals. + for (field in stream.schema) { + when (field.type.airbyteSchemaType) { + LeafAirbyteSchemaType.INTEGER, + LeafAirbyteSchemaType.NUMBER -> { + val textNode: TextNode = data[field.id] as? TextNode ?: continue + val bigDecimal = BigDecimal(textNode.textValue()).stripTrailingZeros() + data.put(field.id, bigDecimal) + } + LeafAirbyteSchemaType.JSONB -> { + val textNode: TextNode = data[field.id] as? TextNode ?: continue + data.set(field.id, Jsons.readTree(textNode.textValue())) + } + else -> continue + } + } // Set _ab_cdc_updated_at and _ab_cdc_deleted_at meta-field values. val transactionMillis: Long = source["ts_ms"].asLong() val transactionOffsetDateTime: OffsetDateTime = @@ -377,6 +396,10 @@ class MySqlSourceDebeziumOperations( // This to make sure that binary data represented as a base64-encoded String. // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-binary-handling-mode .with("binary.handling.mode", "base64") + // This is to make sure that numbers are represented as strings. + .with("decimal.handling.mode", "string") + // This is to make sure that temporal data is represented without loss of precision. + .with("time.precision.mode", "adaptive_time_microseconds") // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode .with("snapshot.mode", "when_needed") // https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-locking-mode @@ -399,13 +422,8 @@ class MySqlSourceDebeziumOperations( .withDatabase("include.list", databaseName) .withOffset() .withSchemaHistory() - .with("converters", "datetime,numeric,boolean") - .with( - "datetime.type", - MySqlSourceCdcTemporalConverter::class.java.getName(), - ) - .with("numeric.type", MySqlSourceCdcNumericConverter::class.java.getName()) - .with("boolean.type", MySqlSourceCdcBooleanConverter::class.java.getName()) + .withConverters(MySqlSourceCdcBooleanConverter::class, MySqlSourceCdcTemporalConverter::class) + val serverTimezone: String? = (configuration.incrementalConfiguration as CdcIncrementalConfiguration).serverTimezone if (!serverTimezone.isNullOrBlank()) { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt index f1231f5093ce..83f5bee64a95 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt @@ -19,11 +19,12 @@ import io.airbyte.cdk.jdbc.BigDecimalFieldType import io.airbyte.cdk.jdbc.BigIntegerFieldType import io.airbyte.cdk.jdbc.BinaryStreamFieldType import io.airbyte.cdk.jdbc.BooleanFieldType -import io.airbyte.cdk.jdbc.ByteFieldType +import io.airbyte.cdk.jdbc.BytesFieldType import io.airbyte.cdk.jdbc.DoubleFieldType import io.airbyte.cdk.jdbc.FloatFieldType import io.airbyte.cdk.jdbc.IntFieldType import io.airbyte.cdk.jdbc.JdbcFieldType +import io.airbyte.cdk.jdbc.JsonStringFieldType import io.airbyte.cdk.jdbc.LocalDateFieldType import io.airbyte.cdk.jdbc.LocalDateTimeFieldType import io.airbyte.cdk.jdbc.LocalTimeFieldType @@ -124,13 +125,7 @@ class MySqlSourceOperations : private fun leafType(type: SystemType): JdbcFieldType<*> { return when (MysqlType.getByName(type.typeName)) { - MysqlType.BIT -> { - if (type.precision!! > 1) { - ByteFieldType - } else { - BooleanFieldType - } - } + MysqlType.BIT -> if (type.precision == 1) BooleanFieldType else BytesFieldType MysqlType.BOOLEAN -> BooleanFieldType MysqlType.TINYINT, MysqlType.TINYINT_UNSIGNED, @@ -141,12 +136,14 @@ class MySqlSourceOperations : MysqlType.MEDIUMINT_UNSIGNED, MysqlType.INT -> IntFieldType MysqlType.INT_UNSIGNED, - MysqlType.BIGINT, + MysqlType.BIGINT -> LongFieldType MysqlType.BIGINT_UNSIGNED -> BigIntegerFieldType MysqlType.FLOAT, - MysqlType.FLOAT_UNSIGNED -> FloatFieldType + MysqlType.FLOAT_UNSIGNED, MysqlType.DOUBLE, - MysqlType.DOUBLE_UNSIGNED -> DoubleFieldType + MysqlType.DOUBLE_UNSIGNED -> { + if ((type.precision ?: 0) <= 23) FloatFieldType else DoubleFieldType + } MysqlType.DECIMAL, MysqlType.DECIMAL_UNSIGNED -> { if (type.scale == 0) BigIntegerFieldType else BigDecimalFieldType @@ -161,9 +158,9 @@ class MySqlSourceOperations : MysqlType.TEXT, MysqlType.MEDIUMTEXT, MysqlType.LONGTEXT, - MysqlType.JSON, MysqlType.ENUM, MysqlType.SET -> StringFieldType + MysqlType.JSON -> JsonStringFieldType MysqlType.TINYBLOB, MysqlType.BLOB, MysqlType.MEDIUMBLOB, @@ -172,10 +169,9 @@ class MySqlSourceOperations : MysqlType.VARBINARY, MysqlType.GEOMETRY -> BinaryStreamFieldType MysqlType.NULL -> NullFieldType - else -> { - print("test debug: unrecognized type: ${type.typeName}") - PokemonFieldType - } + MysqlType.VECTOR, + MysqlType.UNKNOWN, + null -> PokemonFieldType } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt index 247ac22f3956..10acc0b0486e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDatatypeIntegrationTest.kt @@ -98,12 +98,7 @@ object MySqlSourceDatatypeTestOperations : "b'0'" to "false", ) - val longBitValues = - mapOf( - "b'10101010'" to """-86""", - ) - - val longBitCdcValues = + val multiBitValues = mapOf( "b'10101010'" to """"qg=="""", ) @@ -115,9 +110,7 @@ object MySqlSourceDatatypeTestOperations : "'OXBEEF'" to """"OXBEEF"""", ) - val jsonValues = mapOf("""'{"col1": "v1"}'""" to """"{\"col1\": \"v1\"}"""") - - val jsonCdcValues = mapOf("""'{"col1": "v1"}'""" to """"{\"col1\":\"v1\"}"""") + val jsonValues = mapOf("""'{"col1": "v1"}'""" to """{"col1":"v1"}""") val yearValues = mapOf( @@ -126,6 +119,18 @@ object MySqlSourceDatatypeTestOperations : "70" to """1970""", ) + val bigDecimalValues = + mapOf( + "10000000000000000000000000000000000000000.0001" to + "10000000000000000000000000000000000000000.0001", + ) + + val bigIntegerValues = + mapOf( + "10000000000000000000000000000000000000000" to + "10000000000000000000000000000000000000000", + ) + val decimalValues = mapOf( "0.2" to """0.2""", @@ -136,12 +141,12 @@ object MySqlSourceDatatypeTestOperations : "123.4567" to """123.4567""", ) - val zeroPrecisionDecimalValues = + val doubleValues = mapOf( - "2" to """2""", + "123.4567" to """123.45670318603516""", ) - val zeroPrecisionDecimalCdcValues = + val zeroPrecisionDecimalValues = mapOf( "2" to """2""", ) @@ -163,6 +168,8 @@ object MySqlSourceDatatypeTestOperations : val dateValues = mapOf( "'2022-01-01'" to """"2022-01-01"""", + "'0600-12-02'" to """"0600-12-02"""", + "'1752-09-09'" to """"1752-09-09"""", ) val timeValues = @@ -173,7 +180,8 @@ object MySqlSourceDatatypeTestOperations : val dateTimeValues = mapOf( "'2024-09-13 14:30:00'" to """"2024-09-13T14:30:00.000000"""", - "'2024-09-13T14:40:00+00:00'" to """"2024-09-13T14:40:00.000000"""" + "'2024-09-13T14:40:00+00:00'" to """"2024-09-13T14:40:00.000000"""", + "'1752-09-01 14:30:00'" to """"1752-09-01T14:30:00.000000"""", ) val timestampValues = @@ -214,6 +222,16 @@ object MySqlSourceDatatypeTestOperations : stringValues, LeafAirbyteSchemaType.STRING, ), + MySqlSourceDatatypeTestCase( + "DECIMAL(60,4)", + bigDecimalValues, + LeafAirbyteSchemaType.NUMBER, + ), + MySqlSourceDatatypeTestCase( + "DECIMAL(60,0)", + bigIntegerValues, + LeafAirbyteSchemaType.INTEGER, + ), MySqlSourceDatatypeTestCase( "DECIMAL(10,2)", decimalValues, @@ -244,6 +262,15 @@ object MySqlSourceDatatypeTestOperations : "FLOAT(53,8)", floatValues, LeafAirbyteSchemaType.NUMBER, + // Disable CDC testing for this case: + // - 123.4567 is rendered as 123.45670318603516 + // not strictly equal due to IEEE754 encoding artifacts, but acceptable. + isGlobal = false, + ), + MySqlSourceDatatypeTestCase( + "FLOAT(53,8)", + doubleValues, + LeafAirbyteSchemaType.NUMBER, ), MySqlSourceDatatypeTestCase("DOUBLE", decimalValues, LeafAirbyteSchemaType.NUMBER), MySqlSourceDatatypeTestCase( @@ -322,27 +349,13 @@ object MySqlSourceDatatypeTestOperations : ), MySqlSourceDatatypeTestCase( "BIT(8)", - longBitValues, - LeafAirbyteSchemaType.INTEGER, - isGlobal = false, - ), - MySqlSourceDatatypeTestCase( - "BIT(8)", - longBitCdcValues, - LeafAirbyteSchemaType.INTEGER, - isStream = false, + multiBitValues, + LeafAirbyteSchemaType.BINARY, ), MySqlSourceDatatypeTestCase( "JSON", jsonValues, - LeafAirbyteSchemaType.STRING, - isGlobal = false, - ), - MySqlSourceDatatypeTestCase( - "JSON", - jsonCdcValues, - LeafAirbyteSchemaType.STRING, - isStream = false, + LeafAirbyteSchemaType.JSONB, ), MySqlSourceDatatypeTestCase( "ENUM('a', 'b', 'c')", @@ -358,9 +371,11 @@ data class MySqlSourceDatatypeTestCase( val sqlToAirbyte: Map, override val expectedAirbyteSchemaType: AirbyteSchemaType, override val isGlobal: Boolean = true, - override val isStream: Boolean = true, ) : DatatypeTestCase { + override val isStream: Boolean + get() = true + private val typeName: String get() = sqlType diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 1693a64d4360..39cd8696326f 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,8 +226,7 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.9.4 | 2024-12-18 | [49939](https://github.com/airbytehq/airbyte/pull/49939) | Pin Bulk CDK version to 226, rename classes. | -| 3.9.3 | 2024-12-18 | [49932](https://github.com/airbytehq/airbyte/pull/49932) | Backward compatibility for saved states with timestamp that include timezone offset. | +| 3.9.3 | 2024-12-18 | [49918](https://github.com/airbytehq/airbyte/pull/49918) | Fixes datatype handling bugs and inconsistencies. | | 3.9.2 | 2024-12-16 | [49830](https://github.com/airbytehq/airbyte/pull/49830) | Fixes an issue with auto generated tinyint columns | | 3.9.1 | 2024-12-12 | [49456](https://github.com/airbytehq/airbyte/pull/49456) | Bump version to re-relase | | 3.9.0 | 2024-12-12 | [49423](https://github.com/airbytehq/airbyte/pull/49423) | Promoting release candidate 3.9.0-rc.27 to a main version. |