Skip to content

Commit

Permalink
extract generation id
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored and gisripa committed May 13, 2024
1 parent 5b64af0 commit 23a237f
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ constructor(
.substring(0, 3)
val newName = "${originalName}_$hash"
actualStreamConfig =
StreamConfig(
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
originalStreamConfig.syncMode,
originalStreamConfig.destinationSyncMode,
originalStreamConfig.primaryKey,
originalStreamConfig.cursor,
originalStreamConfig.columns,
originalStreamConfig.copy(
id =
sqlGenerator.buildStreamId(
originalNamespace,
newName,
rawNamespace,
),
)
} else {
actualStreamConfig = originalStreamConfig
Expand Down Expand Up @@ -112,6 +112,18 @@ constructor(

@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null) {
stream.generationId = 0
stream.minimumGenerationId = 0
stream.syncId = 0
}
if (
stream.minimumGenerationId != 0.toLong() &&
stream.minimumGenerationId != stream.generationId
) {
throw UnsupportedOperationException("Hybrid refreshes are not yet supported.")
}

val airbyteColumns =
when (
val schema: AirbyteType =
Expand Down Expand Up @@ -143,11 +155,13 @@ constructor(

return StreamConfig(
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
stream.syncMode,
stream.destinationSyncMode,
primaryKey,
cursor,
columns
columns,
stream.generationId,
stream.minimumGenerationId,
stream.syncId,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
package io.airbyte.integrations.base.destination.typing_deduping

import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.*
import kotlin.collections.LinkedHashMap

data class StreamConfig(
val id: StreamId,
val syncMode: SyncMode,
val destinationSyncMode: DestinationSyncMode,
val primaryKey: List<ColumnId>,
val cursor: Optional<ColumnId>,
val columns: LinkedHashMap<ColumnId, AirbyteType>,
val generationId: Long,
val minimumGenerationId: Long,
val syncId: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.SyncMode
import java.util.List
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.BeforeEach
Expand Down Expand Up @@ -74,7 +73,7 @@ internal class CatalogParserTest {
}
val catalog =
ConfiguredAirbyteCatalog()
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))
.withStreams(listOf(stream("a", "foobarfoo"), stream("a", "foofoo")))

val parsedCatalog = parser.parseCatalog(catalog)

Expand Down Expand Up @@ -127,13 +126,13 @@ internal class CatalogParserTest {
""".trimIndent()
)
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
)
Expand Down Expand Up @@ -168,10 +167,10 @@ internal class CatalogParserTest {
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()
val columnsList = parsedCatalog.streams[0].columns.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns.size) },
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
)
Expand Down Expand Up @@ -200,6 +199,9 @@ internal class CatalogParserTest {
)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withGenerationId(0)
.withMinimumGenerationId(0)
.withSyncId(0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,11 +913,13 @@ class DefaultTyperDeduperTest {
"overwrite_ns",
"overwrite_stream"
),
mock(),
DestinationSyncMode.OVERWRITE,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val APPEND_STREAM_CONFIG =
StreamConfig(
Expand All @@ -929,11 +931,13 @@ class DefaultTyperDeduperTest {
"append_ns",
"append_stream"
),
mock(),
DestinationSyncMode.APPEND,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val DEDUPE_STREAM_CONFIG =
StreamConfig(
Expand All @@ -945,11 +949,13 @@ class DefaultTyperDeduperTest {
"dedup_ns",
"dedup_stream"
),
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest {
migrator: BaseDestinationV1V2Migrator<*>,
expected: Boolean
) {
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock())
val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0)
val actual = migrator.shouldMigrate(config)
Assertions.assertEquals(expected, actual)
}
Expand All @@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest {
val config =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val migrator = makeMockMigrator(true, true, false, false, false)
val exception =
Expand All @@ -112,11 +114,13 @@ class DestinationV1V2MigratorTest {
val stream =
StreamConfig(
STREAM_ID,
mock(),
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val handler = Mockito.mock(DestinationHandler::class.java)
val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,39 +222,47 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
incrementalDedupStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)
incrementalAppendStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)

cdcIncrementalDedupStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)
cdcIncrementalAppendStream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)

LOGGER.info("Running with namespace {}", namespace)
Expand Down Expand Up @@ -353,11 +361,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
incrementalDedupStream.primaryKey,
incrementalDedupStream.cursor,
incrementalDedupStream.columns
incrementalDedupStream.columns,
0,
0,
0,
)

createRawTable(streamId)
Expand Down Expand Up @@ -962,11 +972,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val streamConfig =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.empty(),
COLUMNS
COLUMNS,
0,
0,
0,
)
createRawTable(streamId)
createFinalTable(streamConfig, "")
Expand Down Expand Up @@ -1369,7 +1381,6 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
Expand All @@ -1387,7 +1398,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
generator.buildColumnId("includes$\$doubledollar") to
AirbyteProtocolType.STRING,
generator.buildColumnId("endswithbackslash\\") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1442,11 +1456,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
modifiedStreamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
java.util.List.of(columnId),
Optional.of(columnId),
linkedMapOf(columnId to AirbyteProtocolType.STRING)
linkedMapOf(columnId to AirbyteProtocolType.STRING),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1475,14 +1491,16 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList(),
Optional.empty(),
linkedMapOf(
generator.buildColumnId("current_date") to AirbyteProtocolType.STRING,
generator.buildColumnId("join") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1523,11 +1541,13 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
val stream =
StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
emptyList<ColumnId>(),
Optional.empty(),
LinkedHashMap()
LinkedHashMap(),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down

0 comments on commit 23a237f

Please sign in to comment.