Skip to content

Commit

Permalink
cdk-java make TypingDedupingTest aware of column name overrides (#43330)
Browse files Browse the repository at this point in the history
## What
<!--
* Describe what the change is solving. Link all GitHub issues related to this change.
-->

## How
<!--
* Describe how code changes achieve the solution.
-->

## Review guide
<!--
1. `x.py`
2. `y.py`
-->

## User Impact
<!--
* What is the end result perceived by the user?
* If there are negative side effects, please list them. 
-->

## Can this PR be safely reverted and rolled back?
<!--
* If unsure, leave it blank.
-->
- [ ] YES 💚
- [ ] NO ❌
  • Loading branch information
stephane-airbyte authored Aug 7, 2024
1 parent 52130f5 commit 634c494
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 85 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.3 | 2024-08-07 | [\#43330](https://github.com/airbytehq/airbyte/pull/43330) | make TypingDedupingTest aware of column name renaming. |
| 0.44.3 | 2024-08-07 | [\#43329](https://github.com/airbytehq/airbyte/pull/43329) | move generationIdHandling to its own class. |
| 0.44.2 | 2024-08-06 | [\#42869](https://github.com/airbytehq/airbyte/pull/42869) | Add logs about counting info to state message. |
| 0.44.1 | 2024-08-01 | [\#42550](https://github.com/airbytehq/airbyte/pull/42550) | Fix error on reporting counts. |
| 0.44.0 | 2024-08-01 | [\#42405](https://github.com/airbytehq/airbyte/pull/42405) | s3-destinations: Use async framework, adapt to support refreshes |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.2
version=0.44.3
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.function.Function
import kotlin.test.assertFails
Expand Down Expand Up @@ -395,12 +394,7 @@ abstract class BaseTypingDedupingTest {

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
Expand All @@ -411,12 +405,7 @@ abstract class BaseTypingDedupingTest {
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl")
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

Expand Down Expand Up @@ -461,12 +450,7 @@ abstract class BaseTypingDedupingTest {

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
Expand All @@ -477,12 +461,7 @@ abstract class BaseTypingDedupingTest {
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl")
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

Expand Down Expand Up @@ -524,12 +503,7 @@ abstract class BaseTypingDedupingTest {

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
Expand All @@ -540,12 +514,7 @@ abstract class BaseTypingDedupingTest {
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl")
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

Expand All @@ -557,9 +526,7 @@ abstract class BaseTypingDedupingTest {
@ParameterizedTest
@ValueSource(longs = [0L, 42L])
@Throws(Exception::class)
// This test writes a lot of data to the destination and can take longer than a minute.
@Timeout(value = 15, unit = TimeUnit.MINUTES)
fun largeDedupSync(inputGenerationId: Long) {
open fun largeDedupSync(inputGenerationId: Long) {
val catalog =
io.airbyte.protocol.models.v0
.ConfiguredAirbyteCatalog()
Expand Down Expand Up @@ -592,12 +559,7 @@ abstract class BaseTypingDedupingTest {
repeatList(25000, readRecords("dat/sync1_expectedrecords_raw.jsonl"))
// But the final table should be fully deduped
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())
}

Expand Down Expand Up @@ -634,12 +596,7 @@ abstract class BaseTypingDedupingTest {

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(
expectedRawRecords1,
expectedFinalRecords1,
Expand All @@ -656,12 +613,7 @@ abstract class BaseTypingDedupingTest {
val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl")
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)
verifySyncResult(
expectedRawRecords2,
expectedFinalRecords2,
Expand Down Expand Up @@ -730,12 +682,7 @@ abstract class BaseTypingDedupingTest {

val expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl")
val expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
Expand All @@ -754,12 +701,7 @@ abstract class BaseTypingDedupingTest {
expectedFinalRecords2.forEach { record: JsonNode ->
(record as ObjectNode).remove(sqlGenerator.buildColumnId("name").name)
}
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)

verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}
Expand Down Expand Up @@ -897,8 +839,6 @@ abstract class BaseTypingDedupingTest {
* stdout.
*/
@Test
// This test writes a lot of data to the destination and can take longer than a minute.
@Timeout(value = 15, unit = TimeUnit.MINUTES)
@Throws(Exception::class)
open fun identicalNameSimultaneousSync() {
val namespace1 = streamNamespace + "_1"
Expand Down Expand Up @@ -1071,12 +1011,7 @@ abstract class BaseTypingDedupingTest {
readRecords("dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl")
val expectedFinalRecords1 =
readRecords("dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl")
(expectedRawRecords1 + expectedFinalRecords1).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords1, expectedFinalRecords1, inputGenerationId)
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison())

// Second sync
Expand All @@ -1090,12 +1025,7 @@ abstract class BaseTypingDedupingTest {
readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl")
val expectedFinalRecords2 =
readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl")
(expectedRawRecords2 + expectedFinalRecords2).forEach {
(it as ObjectNode).put(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
inputGenerationId
)
}
fixGenerationId(expectedRawRecords2, expectedFinalRecords2, inputGenerationId)
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison())
}

Expand Down Expand Up @@ -1516,6 +1446,25 @@ abstract class BaseTypingDedupingTest {
return Companion.readRecords(filename)
}

fun fixGenerationId(
rawRecords: List<JsonNode>,
finalRecords: List<JsonNode>,
generationId: Long
) {
rawRecords.forEach {
(it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
}
finalRecords.forEach {
(it as ObjectNode).put(
finalMetadataColumnNames.getOrDefault(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
),
generationId
)
}
}

val schema: JsonNode = SCHEMA

companion object {
Expand Down

0 comments on commit 634c494

Please sign in to comment.