diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle
index ffc44d479b8d..f1c058f0826c 100644
--- a/airbyte-integrations/connectors/destination-bigquery/build.gradle
+++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle
@@ -11,7 +11,7 @@ airbyteJavaConnector {
         'gcs-destinations',
         'core',
     ]
-    useLocalCdk = false
+    useLocalCdk = true
 }
 
 java {
diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
index 63e6d08d4e4d..c951fa39922d 100644
--- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.6.3
+  dockerImageTag: 2.7.0
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery
@@ -31,6 +31,7 @@ data:
       memory_request: 1Gi
   supportLevel: certified
   supportsDbt: true
+  supportsRefreshes: true
   tags:
     - language:java
   connectorTestSuitesOptions:
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
index 46ba1c8c886c..e0a656946712 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryDestinationHandler.java
@@ -35,6 +35,7 @@
 import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
 import io.airbyte.commons.exceptions.ConfigErrorException;
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation;
 import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -85,15 +86,15 @@ public boolean isFinalTableEmpty(final StreamId id) {
     return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.getFinalNamespace(), id.getFinalName())).getNumRows());
   }
 
-  public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
-    final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName()));
+  public InitialRawTableStatus getInitialRawTableState(final StreamId id, final String suffix) throws Exception {
+    final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName() + suffix));
     if (rawTable == null) {
       // Table doesn't exist. There are no unprocessed records, and no timestamp.
       return new InitialRawTableStatus(false, false, Optional.empty());
     }
 
     final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE, suffix))).replace(
         // bigquery timestamps have microsecond precision
         """
         SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -109,7 +110,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
     }
 
     final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
-        "raw_table", id.rawTableId(QUOTE))).replace(
+        "raw_table", id.rawTableId(QUOTE, suffix))).replace(
         """
         SELECT MAX(_airbyte_extracted_at)
         FROM ${raw_table}
@@ -199,11 +200,13 @@ public List> gatherInitialSta
     for (final StreamConfig streamConfig : streamConfigs) {
       final StreamId id = streamConfig.getId();
       final Optional<TableDefinition> finalTable = findExistingTable(id);
-      final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+      final InitialRawTableStatus rawTableState = getInitialRawTableState(id, "");
+      final InitialRawTableStatus tempRawTableState = getInitialRawTableState(id, AbstractStreamOperation.TMP_TABLE_SUFFIX);
       initialStates.add(new DestinationInitialStatus<>(
           streamConfig,
           finalTable.isPresent(),
           rawTableState,
+          tempRawTableState,
          finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
          finalTable.isEmpty() || isFinalTableEmpty(id),
          // Return a default state blob since we don't actually track state.
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt
index 0a490fb5defd..38ce6a7e6aaa 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt
@@ -56,10 +56,14 @@ class BigQueryDirectLoadingStorageOperation(
             |More details:
             |""".trimMargin()
     }
-    override fun writeToStage(streamConfig: StreamConfig, data: Stream<PartialAirbyteMessage>) {
+    override fun writeToStage(
+        streamConfig: StreamConfig,
+        suffix: String,
+        data: Stream<PartialAirbyteMessage>
+    ) {
         // TODO: why do we need ratelimiter, and using unstable API from Google's guava
         rateLimiter.acquire()
-        val tableId = TableId.of(streamConfig.id.rawNamespace, streamConfig.id.rawName)
+        val tableId = tableId(streamConfig.id, suffix)
         log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
         val writeChannel = initWriteChannel(tableId)
         writeChannel.use {
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt
index 98eda4d34f0b..db456ef1333c 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryGcsStorageOperation.kt
@@ -10,7 +10,6 @@ import com.google.cloud.bigquery.FormatOptions
 import com.google.cloud.bigquery.Job
 import com.google.cloud.bigquery.JobInfo
 import com.google.cloud.bigquery.LoadJobConfiguration
-import com.google.cloud.bigquery.TableId
 import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
 import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
 import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
@@ -21,7 +20,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
 import org.joda.time.DateTime
@@ -46,9 +44,10 @@ class BigQueryGcsStorageOperation(
 ) {
     private val connectionId = UUID.randomUUID()
     private val syncDateTime = DateTime.now(DateTimeZone.UTC)
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
-        super.prepareStage(streamId, destinationSyncMode)
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
+        super.prepareStage(streamId, suffix, replace)
         // prepare staging bucket
+        // TODO should this also use the suffix?
         log.info { "Creating bucket ${gcsConfig.bucketName}" }
         gcsStorageOperations.createBucketIfNotExists()
     }
@@ -61,21 +60,29 @@ class BigQueryGcsStorageOperation(
         gcsStorageOperations.dropBucketObject(stagingRootPath)
     }
 
-    override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
-        val stagedFileName: String = uploadRecordsToStage(streamConfig.id, data)
-        copyIntoTableFromStage(streamConfig.id, stagedFileName)
+    override fun writeToStage(
+        streamConfig: StreamConfig,
+        suffix: String,
+        data: SerializableBuffer
+    ) {
+        val stagedFileName: String = uploadRecordsToStage(streamConfig.id, suffix, data)
+        copyIntoTableFromStage(streamConfig.id, suffix, stagedFileName)
     }
 
-    private fun uploadRecordsToStage(streamId: StreamId, buffer: SerializableBuffer): String {
+    private fun uploadRecordsToStage(
+        streamId: StreamId,
+        suffix: String,
+        buffer: SerializableBuffer
+    ): String {
        val objectPath: String = stagingFullPath(streamId)
        log.info {
-            "Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
+            "Uploading records for ${streamId.rawNamespace}.${streamId.rawName}$suffix to path $objectPath"
        }
        return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
     }
 
-    private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+    private fun copyIntoTableFromStage(streamId: StreamId, suffix: String, stagedFileName: String) {
+        val tableId = tableId(streamId, suffix)
         val stagingPath = stagingFullPath(streamId)
         val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
         log.info { "Uploading records from file $fullFilePath to target Table $tableId" }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt
index 5adacf97d988..bd0af4ff32c7 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperation.kt
@@ -5,7 +5,9 @@
 package io.airbyte.integrations.destination.bigquery.operation
 
 import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.QueryJobConfiguration
 import com.google.cloud.bigquery.TableId
+import com.google.cloud.bigquery.TableResult
 import io.airbyte.integrations.base.destination.operation.StorageOperation
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -14,10 +16,9 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
-import java.util.*
+import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
 
 private val log = KotlinLogging.logger {}
@@ -29,27 +30,79 @@ abstract class BigQueryStorageOperation(
     protected val datasetLocation: String
 ) : StorageOperation {
     private val existingSchemas = ConcurrentHashMap.newKeySet()
-    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+    override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
         // Prepare staging table. For overwrite, it does drop-create so we can skip explicit create.
-        if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
-            truncateStagingTable(streamId)
+        if (replace) {
+            truncateStagingTable(streamId, suffix)
         } else {
-            createStagingTable(streamId)
+            createStagingTable(streamId, suffix)
         }
     }
 
-    private fun createStagingTable(streamId: StreamId) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+    override fun overwriteStage(streamId: StreamId, suffix: String) {
+        if (suffix == "") {
+            throw IllegalArgumentException("Cannot overwrite raw table with empty suffix")
+        }
+        bigquery.delete(tableId(streamId, ""))
+        bigquery.query(
+            QueryJobConfiguration.of(
+                """ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
+            ),
+        )
+    }
+
+    override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+        if (suffix == "") {
+            throw IllegalArgumentException(
+                "Cannot transfer records from temp raw table with empty suffix"
+            )
+        }
+        // TODO figure out how to make this work
+        // something about incompatible partitioning spec (probably b/c we're copying from a temp
+        // table partitioned on generation ID into an old real raw table partitioned on
+        // extracted_at)
+        val tempRawTable = tableId(streamId, suffix)
+        // val jobConf =
+        //     CopyJobConfiguration.newBuilder(tableId(streamId, ""), tempRawTable)
+        //         .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+        //         .build()
+        // val job = bigquery.create(JobInfo.of(jobConf))
+        // BigQueryUtils.waitForJobFinish(job)
+
+        bigquery.query(
+            QueryJobConfiguration.of(
+                """
+                INSERT INTO `${streamId.rawNamespace}`.`${streamId.rawName}`
+                SELECT * FROM `${streamId.rawNamespace}`.`${streamId.rawName}$suffix`
+                """.trimIndent()
+            )
+        )
+        bigquery.delete(tempRawTable)
+    }
+
+    override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+        val result: TableResult =
+            bigquery.query(
+                QueryJobConfiguration.of(
+                    "SELECT _airbyte_generation_id FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix LIMIT 1"
+                ),
+            )
+        if (result.totalRows == 0L) {
+            return null
+        }
+        return result.iterateAll().first()["_airbyte_generation_id"].longValue
+    }
+
+    private fun createStagingTable(streamId: StreamId, suffix: String) {
         BigQueryUtils.createPartitionedTableIfNotExists(
             bigquery,
-            tableId,
-            BigQueryRecordFormatter.SCHEMA_V2
+            tableId(streamId, suffix),
+            BigQueryRecordFormatter.SCHEMA_V2,
         )
     }
 
-    private fun dropStagingTable(streamId: StreamId) {
-        val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
-        bigquery.delete(tableId)
+    private fun dropStagingTable(streamId: StreamId, suffix: String) {
+        bigquery.delete(tableId(streamId, suffix))
     }
 
     /**
@@ -57,11 +110,11 @@ abstract class BigQueryStorageOperation(
      * the table's partition filter must be turned off to truncate. Since deleting a table is a free
      * operation this option re-uses functions that already exist
      */
-    private fun truncateStagingTable(streamId: StreamId) {
+    private fun truncateStagingTable(streamId: StreamId, suffix: String) {
         val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
         log.info { "Truncating raw table $tableId" }
-        dropStagingTable(streamId)
-        createStagingTable(streamId)
+        dropStagingTable(streamId, suffix)
+        createStagingTable(streamId, suffix)
     }
 
     override fun cleanupStage(streamId: StreamId) {
@@ -81,19 +134,20 @@ abstract class BigQueryStorageOperation(
     }
 
     override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
-        if (tmpTableSuffix.isNotBlank()) {
-            log.info {
-                "Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
-                    streamConfig.id.finalTableId(
-                        BigQuerySqlGenerator.QUOTE,
-                        tmpTableSuffix,
-                    )
-                }"
-            }
-            destinationHandler.execute(
-                sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
-            )
+        if (tmpTableSuffix == "") {
+            throw IllegalArgumentException("Cannot overwrite final table with empty suffix")
+        }
+        log.info {
+            "Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
+                streamConfig.id.finalTableId(
+                    BigQuerySqlGenerator.QUOTE,
+                    tmpTableSuffix,
+                )
+            }"
         }
+        destinationHandler.execute(
+            sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix),
+        )
     }
 
     override fun typeAndDedupe(
@@ -109,4 +163,9 @@ abstract class BigQueryStorageOperation(
             finalTableSuffix,
         )
     }
+
+    companion object {
+        fun tableId(streamId: StreamId, suffix: String = ""): TableId =
+            TableId.of(streamId.rawNamespace, streamId.rawName + suffix)
+    }
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
index d92b5588d3f0..868d41a044fb 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java
@@ -35,6 +35,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 import org.junit.jupiter.api.Test;
 
 public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest {
@@ -178,7 +179,9 @@ public void testAirbyteMetaAndGenerationIdMigration() throws Exception {
 
     // First sync
     final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
-    runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20");
+    // We don't want to send a stream status message, because this version of destination-bigquery will
+    // crash.
+    runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20", Function.identity(), null);
 
     // Second sync
     final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperationTest.kt b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperationTest.kt
new file mode 100644
index 000000000000..6838d748b70b
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryStorageOperationTest.kt
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.operation
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.BigQueryException
+import com.google.cloud.bigquery.DatasetId
+import com.google.cloud.bigquery.DatasetInfo
+import com.google.cloud.bigquery.QueryJobConfiguration
+import com.google.cloud.bigquery.TableResult
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
+import io.airbyte.commons.json.Jsons
+import io.airbyte.commons.string.Strings
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.destination.bigquery.BigQueryConsts
+import io.airbyte.integrations.destination.bigquery.BigQueryDestination
+import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGeneratorIntegrationTest
+import io.airbyte.protocol.models.v0.AirbyteMessage.Type
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.Optional
+import java.util.stream.Stream
+import kotlin.test.assertEquals
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.parallel.Execution
+import org.junit.jupiter.api.parallel.ExecutionMode
+
+/**
+ * Extremely barebones integration test for the direct inserts storage op. We should eventually:
+ * * Make something similar for the GCS storage op
+ * * Genericize this and put it in the CDK
+ * * Add assertions for all the columns, not just airbyte_data
+ * * Actually test all the methods on StorageOperation
+ */
+@Execution(ExecutionMode.CONCURRENT)
+class BigQueryDirectLoadingStorageOperationTest {
+    private val randomString = Strings.addRandomSuffix("", "", 10)
+    private val streamId =
+        StreamId(
+            finalNamespace = "final_namespace_$randomString",
+            finalName = "final_name_$randomString",
+            rawNamespace = "raw_namespace_$randomString",
+            rawName = "raw_name_$randomString",
+            originalNamespace = "original_namespace_$randomString",
+            originalName = "original_name_$randomString",
+        )
+    private val streamConfig =
+        StreamConfig(
+            streamId,
+            DestinationSyncMode.APPEND,
+            emptyList(),
+            Optional.empty(),
+            LinkedHashMap(),
+            GENERATION_ID,
+            0,
+            SYNC_ID,
+        )
+
+    @BeforeEach
+    fun setup() {
+        bq.create(DatasetInfo.of(streamId.rawNamespace))
+    }
+
+    @AfterEach
+    fun teardown() {
+        bq.delete(
+            DatasetId.of(streamId.rawNamespace),
+            BigQuery.DatasetDeleteOption.deleteContents()
+        )
+    }
+
+    @Test
+    fun testTransferStage() {
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, TMP_TABLE_SUFFIX)
+        // Table is currently empty, so expect null generation.
+        assertEquals(null, storageOperation.getStageGeneration(streamId, TMP_TABLE_SUFFIX))
+
+        // Write one record to the real raw table
+        storageOperation.writeToStage(
+            streamConfig,
+            "",
+            Stream.of(record(1)),
+        )
+        assertEquals(
+            listOf("""{"record_number": 1}"""),
+            // We write the raw data as a string column, not a JSON column, so use asText().
+            dumpRawRecords("").map { it["_airbyte_data"].asText() },
+        )
+
+        // And write one record to the temp final table
+        storageOperation.writeToStage(
+            streamConfig,
+            TMP_TABLE_SUFFIX,
+            Stream.of(record(2)),
+        )
+        assertEquals(
+            listOf("""{"record_number": 2}"""),
+            dumpRawRecords(TMP_TABLE_SUFFIX).map { it["_airbyte_data"].asText() },
+        )
+        assertEquals(GENERATION_ID, storageOperation.getStageGeneration(streamId, TMP_TABLE_SUFFIX))
+
+        // If we transfer the records, we should end up with 2 records in the real raw table.
+        storageOperation.transferFromTempStage(streamId, TMP_TABLE_SUFFIX)
+        assertEquals(
+            listOf(
+                """{"record_number": 1}""",
+                """{"record_number": 2}""",
+            ),
+            dumpRawRecords("")
+                .sortedBy {
+                    Jsons.deserialize(it["_airbyte_data"].asText())["record_number"].asLong()
+                }
+                .map { it["_airbyte_data"].asText() },
+        )
+
+        // After transferring the records to the real table, the temp table should no longer exist.
+        assertEquals(404, assertThrows<BigQueryException> { dumpRawRecords(TMP_TABLE_SUFFIX) }.code)
+    }
+
+    @Test
+    fun testOverwriteStage() {
+        // If we then create another temp raw table and _overwrite_ the real raw table,
+        // we should end up with a single raw record.
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, TMP_TABLE_SUFFIX)
+        storageOperation.writeToStage(
+            streamConfig,
+            "",
+            Stream.of(record(3)),
+        )
+        storageOperation.writeToStage(
+            streamConfig,
+            TMP_TABLE_SUFFIX,
+            Stream.of(record(4)),
+        )
+
+        storageOperation.overwriteStage(streamId, TMP_TABLE_SUFFIX)
+
+        assertEquals(
+            listOf("""{"record_number": 4}"""),
+            dumpRawRecords("").map { it["_airbyte_data"].asText() },
+        )
+        assertEquals(404, assertThrows<BigQueryException> { dumpRawRecords(TMP_TABLE_SUFFIX) }.code)
+    }
+
+    private fun dumpRawRecords(suffix: String): List<JsonNode> {
+        val result: TableResult =
+            bq.query(
+                QueryJobConfiguration.of(
+                    "SELECT * FROM " + streamId.rawTableId(BigQuerySqlGenerator.QUOTE, suffix)
+                ),
+            )
+        return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result)
+    }
+
+    private fun record(recordNumber: Int): PartialAirbyteMessage {
+        val serializedData = """{"record_number": $recordNumber}"""
+        return PartialAirbyteMessage()
+            .withType(Type.RECORD)
+            .withSerialized(serializedData)
+            .withRecord(
+                PartialAirbyteRecordMessage()
+                    .withNamespace(streamId.originalNamespace)
+                    .withStream(streamId.originalName)
+                    .withEmittedAt(10_000)
+                    .withMeta(
+                        AirbyteRecordMessageMeta()
+                            .withChanges(emptyList())
+                            .withAdditionalProperty(
+                                JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY,
+                                SYNC_ID,
+                            ),
+                    )
+                    .withData(Jsons.deserialize(serializedData)),
+            )
+    }
+
+    companion object {
+        private val config =
+            Jsons.deserialize(Files.readString(Path.of("secrets/credentials-gcs-staging.json")))
+        private val bq = BigQueryDestination.getBigQuery(config)
+        private val projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText()
+        private val datasetLocation = config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText()
+        private val storageOperation =
+            BigQueryDirectLoadingStorageOperation(
+                bq,
+                15,
+                BigQueryRecordFormatter(),
+                BigQuerySqlGenerator(projectId, datasetLocation),
+                BigQueryDestinationHandler(bq, datasetLocation),
+                datasetLocation,
+            )
+
+        private const val SYNC_ID = 12L
+        private const val GENERATION_ID = 42L
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
index aab4c8494fe2..cb4d01d5298b 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java
@@ -49,10 +49,16 @@ void columnCollision() {
             put(new ColumnId("current_date_1", "current_date", "current_date_1"), AirbyteProtocolType.INTEGER);
           }
 
-        }, 0, 0, 0),
+        },
+            1,
+            1,
+            2),
         parser.toStreamConfig(new ConfiguredAirbyteStream()
             .withSyncMode(SyncMode.INCREMENTAL)
             .withDestinationSyncMode(DestinationSyncMode.APPEND)
+            .withGenerationId(1L)
+            .withMinimumGenerationId(1L)
+            .withSyncId(2L)
             .withStream(new AirbyteStream()
                 .withName("foo")
                 .withNamespace("bar")
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index c6af5f4123cb..4d89c39b3c21 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -223,6 +223,7 @@ tutorials:
 | Version | Date       | Pull Request                                              | Subject                                                                                                                                     |
 |:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
+| 2.7.0   | 2024-05-28 | [38713](https://github.com/airbytehq/airbyte/pull/38713)   | Support for refreshes and resumable full refresh. WARNING: You must upgrade to platform 0.63.0 before upgrading to this connector version. |
 | 2.6.3   | 2024-05-30 | [38331](https://github.com/airbytehq/airbyte/pull/38331)   | Internal code changes in preparation for future feature release                                                                            |
 | 2.6.2   | 2024-06-07 | [38764](https://github.com/airbytehq/airbyte/pull/38764)   | Increase message length limit to 50MiB                                                                                                      |
 | 2.6.1   | 2024-05-29 | [38770](https://github.com/airbytehq/airbyte/pull/38770)   | Internal code change (switch to CDK artifact)                                                                                               |