Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable schema evolution tests for s3 datalake #51569

Merged
merged 13 commits into from
Jan 29, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -2489,9 +2489,9 @@ abstract class BasicFunctionalityIntegrationTest(
}

companion object {
private val intType = FieldType(IntegerType, nullable = true)
val intType = FieldType(IntegerType, nullable = true)
private val numberType = FieldType(NumberType, nullable = true)
private val stringType = FieldType(StringType, nullable = true)
val stringType = FieldType(StringType, nullable = true)
private val timestamptzType = FieldType(TimestampTypeWithTimezone, nullable = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ package io.airbyte.integrations.destination.s3_data_lake

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.aws.asMicronautProperties
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
Expand All @@ -18,6 +24,7 @@ import java.util.Base64
import okhttp3.FormBody
import okhttp3.OkHttpClient
import okhttp3.Request
import org.junit.jupiter.api.Assumptions
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -61,14 +68,88 @@ abstract class S3DataLakeWriteTest(
super.testDedup()
}

/**
* This test differs from the base test in two critical aspects:
*
* 1. Data Type Conversion:
* ```
* The base test attempts to change a column's data type from INTEGER to STRING,
* which Iceberg does not support and will throw an exception.
* ```
* 2. Result Ordering:
* ```
* While the data content matches exactly, Iceberg returns results in a different
* order than what the base test expects. The base test's ordering assumptions
* need to be adjusted accordingly.
* ```
*/
@Test
@Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)")
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
Assumptions.assumeTrue(verifyDataWriting)
fun makeStream(syncId: Long, schema: LinkedHashMap<String, FieldType>) =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(schema),
generationId = 0,
minimumGenerationId = 0,
syncId,
)
runSync(
configContents,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "same" to intType)
),
listOf(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "to_drop": "val1", "same": 42}""",
emittedAtMs = 1234L,
)
)
)
val finalStream =
makeStream(
syncId = 43,
linkedMapOf("id" to intType, "same" to intType, "to_add" to stringType)
)
runSync(
configContents,
finalStream,
listOf(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"id": 42, "same": "43", "to_add": "val3"}""",
emittedAtMs = 1234,
)
)
)
dumpAndDiffRecords(
parsedConfig,
listOf(
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 42, "same" to 42),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
OutputRecord(
extractedAt = 1234,
generationId = 0,
data = mapOf("id" to 42, "same" to 43, "to_add" to "val3"),
airbyteMeta = OutputRecord.Meta(syncId = 43),
)
),
finalStream,
primaryKey = listOf(listOf("id")),
cursor = listOf("same"),
)
}

@Test
@Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)")
override fun testDedupChangeCursor() {
super.testDedupChangeCursor()
}
Expand Down
Loading