
Commit

bigquery cdk bump
edgao committed Aug 12, 2024
1 parent 19305f5 commit f3e781d
Showing 9 changed files with 72 additions and 25 deletions.
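In effect, the bump moves the connector onto a CDK in which StreamConfig carries an ImportType (APPEND or DEDUPE) instead of the protocol-level DestinationSyncMode, and in which generation ids are threaded through staging uploads and initial-state gathering, evidently in support of generation-aware refreshes.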

@@ -11,7 +11,7 @@ airbyteJavaConnector {
         'gcs-destinations',
         'core',
     ]
-    useLocalCdk = false
+    useLocalCdk = true
 }
 
 java {
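Flipping useLocalCdk to true makes the connector build against the locally checked-out CDK rather than the published artifact pinned by cdkVersionRequired; this is normally reverted to false before the connector is released.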

@@ -38,6 +38,7 @@ import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
 import io.airbyte.integrations.base.destination.operation.StandardStreamOperation
 import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.ImportType
 import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
 import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
 import io.airbyte.integrations.base.destination.typing_deduping.Sql
@@ -58,7 +59,6 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
 import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
 import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.io.ByteArrayInputStream
 import java.io.IOException
@@ -174,12 +174,12 @@ class BigQueryDestination : BaseConnector(), Destination {
         val streamConfig =
             StreamConfig(
                 id = streamId,
-                destinationSyncMode = DestinationSyncMode.OVERWRITE,
+                postImportAction = ImportType.APPEND,
                 primaryKey = listOf(),
                 cursor = Optional.empty(),
                 columns = linkedMapOf(),
-                generationId = 0,
-                minimumGenerationId = 0,
+                generationId = 1,
+                minimumGenerationId = 1,
                 syncId = 0
             )
 
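The substitution above repeats throughout the commit: StreamConfig no longer takes the protocol's DestinationSyncMode but the CDK's ImportType, which only describes what happens to records after they land. A sketch of the mapping as inferred from this diff (the real CDK enum may differ in detail):

```kotlin
// Inferred from the substitutions in this commit, not copied from CDK sources.
enum class ImportType { APPEND, DEDUPE }

// Observed mapping:
//   DestinationSyncMode.APPEND       -> ImportType.APPEND
//   DestinationSyncMode.APPEND_DEDUP -> ImportType.DEDUPE
//   DestinationSyncMode.OVERWRITE    -> ImportType.APPEND, with overwrite
//       semantics apparently expressed via generationId/minimumGenerationId
//       instead (hence the bump from 0 to 1 above).
```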
@@ -206,7 +206,9 @@
                 ),
                 isSchemaMismatch = true,
                 isFinalTableEmpty = true,
-                destinationState = BigQueryDestinationState(needsSoftReset = false)
+                destinationState = BigQueryDestinationState(needsSoftReset = false),
+                finalTempTableGenerationId = null,
+                finalTableGenerationId = null,
             )
 
         // We simulate a mini-sync to see the raw table code path is exercised. and disable T+D

@@ -65,20 +65,27 @@ class BigQueryGcsStorageOperation(
         suffix: String,
         data: SerializableBuffer
     ) {
-        val stagedFileName: String = uploadRecordsToStage(streamConfig.id, suffix, data)
+        val stagedFileName: String =
+            uploadRecordsToStage(streamConfig.id, suffix, data, streamConfig.generationId)
         copyIntoTableFromStage(streamConfig.id, suffix, stagedFileName)
     }
 
     private fun uploadRecordsToStage(
         streamId: StreamId,
         suffix: String,
-        buffer: SerializableBuffer
+        buffer: SerializableBuffer,
+        generationId: Long,
     ): String {
         val objectPath: String = stagingFullPath(streamId)
         log.info {
             "Uploading records to for ${streamId.rawNamespace}.${streamId.rawName}$suffix to path $objectPath"
         }
-        return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
+        return gcsStorageOperations.uploadRecordsToBucket(
+            buffer,
+            streamId.rawNamespace,
+            objectPath,
+            generationId
+        )
     }
 
     private fun copyIntoTableFromStage(streamId: StreamId, suffix: String, stagedFileName: String) {
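Passing streamConfig.generationId through to uploadRecordsToBucket suggests the staged GCS objects are now tagged with the generation that wrote them, so that leftovers from an older generation can be recognized later. A minimal sketch of that idea, with illustrative names rather than the CDK's:

```kotlin
// Illustrative sketch only, not the CDK implementation.
data class StagedObject(val path: String, val generationId: Long)

// Objects written by a generation older than the minimum are stale
// (e.g. left over from an interrupted truncate refresh) and can be deleted.
fun staleStagedObjects(
    objects: List<StagedObject>,
    minimumGenerationId: Long,
): List<StagedObject> = objects.filter { it.generationId < minimumGenerationId }
```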

@@ -11,6 +11,7 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
 import io.airbyte.commons.exceptions.ConfigErrorException
 import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
 import io.airbyte.integrations.base.destination.typing_deduping.*
 import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsAllIgnoreCase
 import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils.containsIgnoreCase
@@ -118,6 +119,37 @@ class BigQueryDestinationHandler(private val bq: BigQuery, private val datasetLo
         }
     }
 
+    private fun getFinalTableGeneration(id: StreamId, suffix: String): Long? {
+        val finalTable = bq.getTable(TableId.of(id.finalNamespace, id.finalName + suffix))
+        if (finalTable == null || !finalTable.exists()) {
+            return null
+        }
+
+        val result =
+            bq.query(
+                QueryJobConfiguration.of(
+                    """
+                    SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}
+                    FROM ${id.finalTableId(BigQuerySqlGenerator.QUOTE, suffix)}
+                    """.trimIndent()
+                )
+            )
+        if (result.totalRows == 0L) {
+            return null
+        }
+        val value =
+            result
+                .iterateAll()
+                .iterator()
+                .next()
+                .get(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
+        return if (value == null || value.isNull) {
+            null
+        } else {
+            value.longValue
+        }
+    }
+
     @Throws(InterruptedException::class)
     override fun execute(sql: Sql) {
         val transactions = sql.asSqlStrings("BEGIN TRANSACTION", "COMMIT TRANSACTION")
@@ -234,7 +266,9 @@
                         isFinalTableEmpty(
                             id
                         ), // Return a default state blob since we don't actually track state.
-                        BigQueryDestinationState(false)
+                        BigQueryDestinationState(false),
+                        finalTableGenerationId = getFinalTableGeneration(id, ""),
+                        finalTempTableGenerationId = getFinalTableGeneration(id, TMP_TABLE_SUFFIX)
                     )
                 )
             }
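getFinalTableGeneration reads _airbyte_generation_id back from the first row of the (possibly temp-suffixed) final table, and the initial-state blob now exposes both values, presumably for consumption by the CDK's AbstractStreamOperation, from which this file now imports TMP_TABLE_SUFFIX. A hypothetical consumer, sketching how the values could feed a refresh decision (assumed logic, not taken from this commit):

```kotlin
// Hypothetical decision sketch; a null generation id means the final table
// is absent or empty, so there is nothing stale to protect against.
fun keepExistingFinalTable(
    finalTableGenerationId: Long?,
    minimumGenerationId: Long,
): Boolean =
    finalTableGenerationId == null || finalTableGenerationId >= minimumGenerationId
```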

@@ -8,7 +8,6 @@ import com.google.common.annotations.VisibleForTesting
 import io.airbyte.integrations.base.destination.typing_deduping.*
 import io.airbyte.integrations.base.destination.typing_deduping.Array
 import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import java.time.Instant
 import java.util.*
 import java.util.function.Function
@@ -296,7 +295,7 @@ class BigQuerySqlGenerator
         useExpensiveSaferCasting: Boolean
     ): Sql {
         val handleNewRecords =
-            if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
+            if (stream.postImportAction == ImportType.DEDUPE) {
                 upsertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
             } else {
                 insertNewRecords(stream, finalSuffix, useExpensiveSaferCasting, minRawTimestamp)
@@ -587,7 +586,7 @@
             .collect(Collectors.joining("\n"))
         val extractedAtCondition = buildExtractedAtCondition(minRawTimestamp)
 
-        if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
+        if (stream.postImportAction == ImportType.DEDUPE) {
            // When deduping, we need to dedup the raw records. Note the row_number() invocation in
            // the SQL
            // statement. Do the same extract+cast CTE + airbyte_meta construction as in non-dedup
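As the (truncated) comment notes, the DEDUPE branch relies on a row_number() window. A simplified, BigQuery-flavored shape of that query, with placeholder names pk, cursor, and intermediate_data; the generated SQL also performs the extract+cast CTE and airbyte_meta construction mentioned above:

```kotlin
// Simplified sketch of the dedup shape; not the connector's actual output.
val dedupSketch =
    """
    SELECT * EXCEPT (row_number)
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (
                PARTITION BY pk
                ORDER BY cursor DESC NULLS LAST, _airbyte_extracted_at DESC
            ) AS row_number
        FROM intermediate_data
    )
    WHERE row_number = 1
    """.trimIndent()
```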
@@ -929,7 +928,7 @@
 
     fun clusteringColumns(stream: StreamConfig): List<String> {
         val clusterColumns: MutableList<String> = ArrayList()
-        if (stream.destinationSyncMode == DestinationSyncMode.APPEND_DEDUP) {
+        if (stream.postImportAction == ImportType.DEDUPE) {
             // We're doing de-duping, therefore we have a primary key.
             // Cluster on the first 3 PK columns since BigQuery only allows up to 4 clustering
             // columns,
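The comment spells out the constraint: deduping implies a primary key, and BigQuery caps clustering at four columns, so only the first three PK columns are used, presumably leaving room for one more clustering column. A one-line restatement of that rule, with an assumed helper name:

```kotlin
// Assumed restatement of the rule in clusteringColumns(), not the real code.
fun pkClusteringPrefix(primaryKey: List<String>): List<String> =
    primaryKey.take(3) // BigQuery allows at most 4 clustering columns.
```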

@@ -17,6 +17,7 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM
 import io.airbyte.commons.json.Jsons
 import io.airbyte.commons.string.Strings
 import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
+import io.airbyte.integrations.base.destination.typing_deduping.ImportType
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId
 import io.airbyte.integrations.destination.bigquery.BigQueryConsts
@@ -27,7 +28,6 @@ import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlG
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGeneratorIntegrationTest
 import io.airbyte.protocol.models.v0.AirbyteMessage.Type
 import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.Optional
@@ -62,7 +62,7 @@ class BigQueryDirectLoadingStorageOperationTest {
     private val streamConfig =
         StreamConfig(
             streamId,
-            DestinationSyncMode.APPEND,
+            ImportType.APPEND,
            emptyList(),
            Optional.empty(),
            LinkedHashMap(),

@@ -16,7 +16,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.*
 import io.airbyte.integrations.destination.bigquery.BigQueryConsts
 import io.airbyte.integrations.destination.bigquery.BigQueryDestination.Companion.getBigQuery
 import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import java.nio.file.Files
 import java.nio.file.Path
 import java.time.Duration
@@ -509,7 +508,7 @@ class BigQuerySqlGeneratorIntegrationTest :
         val stream =
             StreamConfig(
                 streamId,
-                DestinationSyncMode.APPEND,
+                ImportType.APPEND,
                emptyList(),
                Optional.empty(),
                columns,

@@ -33,7 +33,7 @@ class BigQuerySqlGeneratorTest {
         Assertions.assertEquals(
             StreamConfig(
                 StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
-                DestinationSyncMode.APPEND,
+                ImportType.APPEND,
                emptyList(),
                Optional.empty(),
                columns,

@@ -8,14 +8,20 @@ import com.google.cloud.bigquery.StandardSQLTypeName
 import com.google.cloud.bigquery.StandardTableDefinition
 import com.google.cloud.bigquery.TimePartitioning
 import com.google.common.collect.ImmutableList
-import io.airbyte.integrations.base.destination.typing_deduping.*
+import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
+import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
+import io.airbyte.integrations.base.destination.typing_deduping.Array
+import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
+import io.airbyte.integrations.base.destination.typing_deduping.ImportType
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.Struct
+import io.airbyte.integrations.base.destination.typing_deduping.Union
+import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.clusteringMatches
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.partitioningMatches
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler.Companion.schemaContainAllFinalTableV2AirbyteColumns
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator.Companion.toDialectType
-import io.airbyte.protocol.models.v0.DestinationSyncMode
-import java.util.*
+import java.util.Optional
 import java.util.stream.Collectors
 import java.util.stream.Stream
 import org.junit.jupiter.api.Assertions
@@ -52,7 +58,7 @@ class BigqueryDestinationHandlerTest {
         var stream =
             StreamConfig(
                 Mockito.mock(),
-                DestinationSyncMode.APPEND_DEDUP,
+                ImportType.DEDUPE,
                listOf(ColumnId("foo", "bar", "fizz")),
                Optional.empty(),
                LinkedHashMap(),
@@ -75,7 +81,7 @@
         stream =
             StreamConfig(
                 Mockito.mock(),
-                DestinationSyncMode.OVERWRITE,
+                ImportType.APPEND,
                emptyList(),
                Optional.empty(),
                LinkedHashMap(),
@@ -103,7 +109,7 @@
         stream =
             StreamConfig(
                 Mockito.mock(),
-                DestinationSyncMode.APPEND_DEDUP,
+                ImportType.DEDUPE,
                Stream.concat(expectedStreamColumnNames.stream(), Stream.of("d", "e"))
                    .map { name: String -> ColumnId(name, "foo", "bar") }
                    .collect(Collectors.toList()),
