Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations CDK: Extract generation ID from catalog #38127

Merged
merged 2 commits (source and target branch names were not captured in this snapshot)
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
4 changes: 3 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.34.4
version=0.35.0
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ constructor(
.substring(0, 3)
val newName = "${originalName}_$hash"
actualStreamConfig =
StreamConfig(
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
originalStreamConfig.syncMode,
originalStreamConfig.destinationSyncMode,
originalStreamConfig.primaryKey,
originalStreamConfig.cursor,
originalStreamConfig.columns,
originalStreamConfig.copy(
id =
sqlGenerator.buildStreamId(
originalNamespace,
newName,
rawNamespace,
),
)
} else {
actualStreamConfig = originalStreamConfig
Expand Down Expand Up @@ -112,6 +112,18 @@ constructor(

@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null) {
stream.generationId = 0
stream.minimumGenerationId = 0
stream.syncId = 0
}
if (
stream.minimumGenerationId != 0.toLong() &&
stream.minimumGenerationId != stream.generationId
) {
throw UnsupportedOperationException("Hybrid refreshes are not yet supported.")
}

val airbyteColumns =
when (
val schema: AirbyteType =
Expand Down Expand Up @@ -143,11 +155,13 @@ constructor(

return StreamConfig(
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
stream.syncMode,
stream.destinationSyncMode,
primaryKey,
cursor,
columns
columns,
stream.generationId,
stream.minimumGenerationId,
stream.syncId,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package io.airbyte.integrations.base.destination.typing_deduping

import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.*
import kotlin.collections.LinkedHashMap

/**
 * Immutable, fully-resolved configuration for a single stream in a destination sync.
 *
 * Produced by the catalog parser from a `ConfiguredAirbyteStream`; downstream
 * typing/deduping code reads these values rather than the raw protocol model.
 *
 * @property id resolved identifiers (namespace/name) for the stream's tables
 * @property syncMode source-side sync mode (e.g. full refresh vs. incremental)
 * @property destinationSyncMode destination write mode (append / overwrite / append-dedup)
 * @property primaryKey columns forming the primary key; presumably empty when the
 *   destination sync mode does not dedup — confirm against the parser
 * @property cursor cursor column, when one is configured
 * @property columns declared columns and their Airbyte types; a LinkedHashMap so the
 *   catalog's column ordering is preserved
 * @property generationId generation ID copied from the configured stream (0 when the
 *   platform did not supply one)
 * @property minimumGenerationId minimum generation to retain, from the configured stream
 * @property syncId sync ID copied from the configured stream
 */
data class StreamConfig(
    val id: StreamId,
    val syncMode: SyncMode,
    val destinationSyncMode: DestinationSyncMode,
    val primaryKey: List<ColumnId>,
    val cursor: Optional<ColumnId>,
    val columns: LinkedHashMap<ColumnId, AirbyteType>,
    val generationId: Long,
    val minimumGenerationId: Long,
    val syncId: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.List
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.BeforeEach
Expand Down Expand Up @@ -74,7 +73,7 @@ internal class CatalogParserTest {
}
val catalog =
ConfiguredAirbyteCatalog()
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))
.withStreams(listOf(stream("a", "foobarfoo"), stream("a", "foofoo")))

val parsedCatalog = parser.parseCatalog(catalog)

Expand Down Expand Up @@ -127,13 +126,13 @@ internal class CatalogParserTest {

""".trimIndent()
)
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
)
Expand Down Expand Up @@ -168,10 +167,10 @@ internal class CatalogParserTest {
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
)
Expand Down Expand Up @@ -200,6 +199,9 @@ internal class CatalogParserTest {
)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withGenerationId(0)
.withMinimumGenerationId(0)
.withSyncId(0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,11 +913,13 @@ class DefaultTyperDeduperTest {
"overwrite_ns",
"overwrite_stream"
),
mock(),
DestinationSyncMode.OVERWRITE,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val APPEND_STREAM_CONFIG =
StreamConfig(
Expand All @@ -929,11 +931,13 @@ class DefaultTyperDeduperTest {
"append_ns",
"append_stream"
),
mock(),
DestinationSyncMode.APPEND,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val DEDUPE_STREAM_CONFIG =
StreamConfig(
Expand All @@ -945,11 +949,13 @@ class DefaultTyperDeduperTest {
"dedup_ns",
"dedup_stream"
),
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest {
migrator: BaseDestinationV1V2Migrator<*>,
expected: Boolean
) {
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock())
val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0)
val actual = migrator.shouldMigrate(config)
Assertions.assertEquals(expected, actual)
}
Expand All @@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest {
val config =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val migrator = makeMockMigrator(true, true, false, false, false)
val exception =
Expand All @@ -112,11 +114,13 @@ class DestinationV1V2MigratorTest {
val stream =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val handler = Mockito.mock(DestinationHandler::class.java)
val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,39 +222,47 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
incrementalDedupStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)
incrementalAppendStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)

cdcIncrementalDedupStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)
cdcIncrementalAppendStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)

LOGGER.info("Running with namespace {}", namespace)
Expand Down Expand Up @@ -353,11 +361,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
incrementalDedupStream.primaryKey,
incrementalDedupStream.cursor,
incrementalDedupStream.columns
incrementalDedupStream.columns,
0,
0,
0,
)

createRawTable(streamId)
Expand Down Expand Up @@ -962,11 +972,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val streamConfig =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.empty(),
COLUMNS
COLUMNS,
0,
0,
0,
)
createRawTable(streamId)
createFinalTable(streamConfig, "")
Expand Down Expand Up @@ -1369,7 +1381,6 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
Expand All @@ -1387,7 +1398,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
generator.buildColumnId("includes$\$doubledollar") to
AirbyteProtocolType.STRING,
generator.buildColumnId("endswithbackslash\\") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1442,11 +1456,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
modifiedStreamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
java.util.List.of(columnId),
Optional.of(columnId),
linkedMapOf(columnId to AirbyteProtocolType.STRING)
linkedMapOf(columnId to AirbyteProtocolType.STRING),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1475,14 +1491,16 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList(),
Optional.empty(),
linkedMapOf(
generator.buildColumnId("current_date") to AirbyteProtocolType.STRING,
generator.buildColumnId("join") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1523,11 +1541,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList<ColumnId>(),
Optional.empty(),
LinkedHashMap()
LinkedHashMap(),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down
Loading