Destination bigquery: Implement refreshes logic (#38713)
edgao authored Jun 17, 2024
1 parent cb6f6ec commit 9b36fa0
Showing 10 changed files with 347 additions and 50 deletions.
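
At a high level, this commit makes refreshes stage records into a suffixed temporary raw table and promote it only once the sync succeeds, so a truncate refresh never leaves a half-written live raw table. Below is a minimal sketch of that flow, expressed against the StorageOperation hooks changed in this diff; the driver function itself is illustrative (the real orchestration lives in the CDK's AbstractStreamOperation and is not part of this commit).

import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig

// Illustrative driver for a truncate refresh, in terms of the hooks below.
fun <Data> runTruncateRefresh(
    op: StorageOperation<Data>,
    streamConfig: StreamConfig,
    records: Data,
    currentGenerationId: Long,
) {
    val id = streamConfig.id
    val suffix = AbstractStreamOperation.TMP_TABLE_SUFFIX

    // If a temp raw table survives from an earlier attempt, keep it only when it
    // belongs to the same generation; otherwise drop and re-create it.
    val stagedGeneration = op.getStageGeneration(id, suffix)
    val replace = stagedGeneration != null && stagedGeneration != currentGenerationId
    op.prepareStage(id, suffix, replace)

    // All records land in the suffixed temp raw table, not the live one.
    op.writeToStage(streamConfig, suffix, records)

    // On success, atomically swap the temp raw table over the live raw table.
    op.overwriteStage(id, suffix)
}

The commit also adds transferFromTempStage for the complementary case where leftover temp records should be appended into the live raw table rather than replace it.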
build.gradle
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.37.1'
cdkVersionRequired = '0.40.0'
features = [
'db-destinations',
'datastore-bigquery',
metadata.yaml
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.6.3
dockerImageTag: 2.7.0
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -31,6 +31,7 @@ data:
memory_request: 1Gi
supportLevel: certified
supportsDbt: true
supportsRefreshes: true
tags:
- language:java
connectorTestSuitesOptions:
BigQueryDestinationHandler.java
@@ -35,6 +35,7 @@
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -85,15 +86,15 @@ public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.getFinalNamespace(), id.getFinalName())).getNumRows());
}

public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName()));
public InitialRawTableStatus getInitialRawTableState(final StreamId id, final String suffix) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName() + suffix));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
// bigquery timestamps have microsecond precision
"""
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -109,7 +110,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
@@ -199,11 +200,13 @@ public List<DestinationInitialStatus<BigQueryDestinationState>> gatherInitialSta
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.getId();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id, "");
final InitialRawTableStatus tempRawTableState = getInitialRawTableState(id, AbstractStreamOperation.TMP_TABLE_SUFFIX);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
tempRawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
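
The two queries in getInitialRawTableState bracket the raw table's state: the first backs MIN(_airbyte_extracted_at) off by one microsecond (its WHERE clause is truncated above, but it plausibly selects only rows not yet loaded into the final table), so a resumed sync can use a strict extracted_at > filter; the second takes MAX(_airbyte_extracted_at) for the fully-processed case. A sketch of how a consumer might read the resulting InitialRawTableStatus; the field names follow the three-argument constructor calls visible above, while the import path and the consuming function are assumptions:

import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus

// Illustrative reader of InitialRawTableStatus(rawTableExists,
// hasUnprocessedRecords, maxProcessedTimestamp).
fun describeRawTableState(status: InitialRawTableStatus): String =
    when {
        !status.rawTableExists -> "no raw table: nothing staged, nothing to resume"
        status.hasUnprocessedRecords ->
            // maxProcessedTimestamp is MIN(extracted_at) - 1 microsecond over the
            // pending rows, so typing/deduping can resume with extracted_at > it.
            "resume typing/deduping after ${status.maxProcessedTimestamp.get()}"
        else -> "fully processed through ${status.maxProcessedTimestamp.orElse(null)}"
    }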
BigQueryDirectLoadingStorageOperation.kt
@@ -56,10 +56,14 @@ class BigQueryDirectLoadingStorageOperation(
|More details:
|""".trimMargin()
}
override fun writeToStage(streamConfig: StreamConfig, data: Stream<PartialAirbyteMessage>) {
override fun writeToStage(
streamConfig: StreamConfig,
suffix: String,
data: Stream<PartialAirbyteMessage>
) {
// TODO: why do we need a rate limiter, and why are we using an unstable API from Google's Guava?
rateLimiter.acquire()
val tableId = TableId.of(streamConfig.id.rawNamespace, streamConfig.id.rawName)
val tableId = tableId(streamConfig.id, suffix)
log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
val writeChannel = initWriteChannel(tableId)
writeChannel.use {
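
writeToStage now resolves its destination through tableId(streamConfig.id, suffix), so during a refresh the direct loader streams into the temp raw table rather than the live one. The write-channel setup is truncated above; the sketch below shows a plausible shape for it using the standard google-cloud-bigquery write-channel API. The function name and parameters are illustrative, and only the builder/API calls are known to exist.

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobId
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.Schema
import com.google.cloud.bigquery.TableDataWriteChannel
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.WriteChannelConfiguration

// Hedged sketch: open a newline-delimited-JSON write channel against the
// (possibly suffixed) raw table, creating the table if it does not exist.
fun openRawTableWriteChannel(
    bigquery: BigQuery,
    tableId: TableId,
    schema: Schema, // e.g. BigQueryRecordFormatter.SCHEMA_V2
    datasetLocation: String,
): TableDataWriteChannel {
    val config =
        WriteChannelConfiguration.newBuilder(tableId)
            .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
            .setSchema(schema)
            .setFormatOptions(FormatOptions.json())
            .build()
    // Pin the job to the dataset's region so the load runs where the data lives.
    val jobId = JobId.newBuilder().setLocation(datasetLocation).build()
    return bigquery.writer(jobId, config)
}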
BigQueryGcsStorageOperation.kt
@@ -10,7 +10,6 @@ import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.Job
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
import com.google.cloud.bigquery.TableId
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
@@ -21,7 +20,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import org.joda.time.DateTime
@@ -46,9 +44,10 @@ class BigQueryGcsStorageOperation(
) {
private val connectionId = UUID.randomUUID()
private val syncDateTime = DateTime.now(DateTimeZone.UTC)
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
super.prepareStage(streamId, destinationSyncMode)
override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
super.prepareStage(streamId, suffix, replace)
// prepare staging bucket
// TODO should this also use the suffix?
log.info { "Creating bucket ${gcsConfig.bucketName}" }
gcsStorageOperations.createBucketIfNotExists()
}
@@ -61,21 +60,29 @@ class BigQueryGcsStorageOperation(
gcsStorageOperations.dropBucketObject(stagingRootPath)
}

override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
val stagedFileName: String = uploadRecordsToStage(streamConfig.id, data)
copyIntoTableFromStage(streamConfig.id, stagedFileName)
override fun writeToStage(
streamConfig: StreamConfig,
suffix: String,
data: SerializableBuffer
) {
val stagedFileName: String = uploadRecordsToStage(streamConfig.id, suffix, data)
copyIntoTableFromStage(streamConfig.id, suffix, stagedFileName)
}

private fun uploadRecordsToStage(streamId: StreamId, buffer: SerializableBuffer): String {
private fun uploadRecordsToStage(
streamId: StreamId,
suffix: String,
buffer: SerializableBuffer
): String {
val objectPath: String = stagingFullPath(streamId)
log.info {
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName}$suffix to path $objectPath"
}
return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
}

private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
private fun copyIntoTableFromStage(streamId: StreamId, suffix: String, stagedFileName: String) {
val tableId = tableId(streamId, suffix)
val stagingPath = stagingFullPath(streamId)
val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
log.info { "Uploading records from file $fullFilePath to target Table $tableId" }
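
The body of copyIntoTableFromStage is truncated above. A hedged sketch of its load-job half, assuming the staged object is the CSV file the GCS writer produces; the function name, parameters, and error handling are illustrative, while the builder calls are the standard google-cloud-bigquery load-job API.

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
import com.google.cloud.bigquery.Schema
import com.google.cloud.bigquery.TableId

// Hedged sketch: append one staged GCS object into the (possibly suffixed)
// raw table via a BigQuery load job, then surface any job-level error.
fun loadStagedFileIntoRawTable(
    bigquery: BigQuery,
    tableId: TableId,
    gcsUri: String, // e.g. "gs://<bucket>/<staging-path>/<staged-file>"
    schema: Schema, // e.g. BigQueryRecordFormatter.SCHEMA_V2
) {
    val loadConfig =
        LoadJobConfiguration.builder(tableId, gcsUri)
            .setFormatOptions(FormatOptions.csv())
            .setSchema(schema)
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .build()
    val completed = bigquery.create(JobInfo.of(loadConfig)).waitFor()
    check(completed?.status?.error == null) { "Load job failed: ${completed?.status?.error}" }
}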
BigQueryStorageOperation.kt
@@ -5,7 +5,9 @@
package io.airbyte.integrations.destination.bigquery.operation

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.TableResult
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -14,10 +16,9 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.*
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap

private val log = KotlinLogging.logger {}
@@ -29,39 +30,91 @@ abstract class BigQueryStorageOperation<Data>(
protected val datasetLocation: String
) : StorageOperation<Data> {
private val existingSchemas = ConcurrentHashMap.newKeySet<String>()
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
// Prepare the staging table. When replacing, we do a drop-create, so we can skip the explicit create.
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
truncateStagingTable(streamId)
if (replace) {
truncateStagingTable(streamId, suffix)
} else {
createStagingTable(streamId)
createStagingTable(streamId, suffix)
}
}

private fun createStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
override fun overwriteStage(streamId: StreamId, suffix: String) {
if (suffix == "") {
throw IllegalArgumentException("Cannot overwrite raw table with empty suffix")
}
bigquery.delete(tableId(streamId, ""))
bigquery.query(
QueryJobConfiguration.of(
"""ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
),
)
}

override fun transferFromTempStage(streamId: StreamId, suffix: String) {
if (suffix == "") {
throw IllegalArgumentException(
"Cannot transfer records from temp raw table with empty suffix"
)
}
// TODO figure out how to make this work
// something about incompatible partitioning spec (probably b/c we're copying from a temp
// table partitioned on generation ID into an old real raw table partitioned on
// extracted_at)
val tempRawTable = tableId(streamId, suffix)
// val jobConf =
// CopyJobConfiguration.newBuilder(tableId(streamId, ""), tempRawTable)
// .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
// .build()
// val job = bigquery.create(JobInfo.of(jobConf))
// BigQueryUtils.waitForJobFinish(job)

bigquery.query(
QueryJobConfiguration.of(
"""
INSERT INTO `${streamId.rawNamespace}`.`${streamId.rawName}`
SELECT * FROM `${streamId.rawNamespace}`.`${streamId.rawName}$suffix`
""".trimIndent()
)
)
bigquery.delete(tempRawTable)
}

override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
val result: TableResult =
bigquery.query(
QueryJobConfiguration.of(
"SELECT _airbyte_generation_id FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix LIMIT 1"
),
)
if (result.totalRows == 0L) {
return null
}
return result.iterateAll().first()["_airbyte_generation_id"].longValue
}

private fun createStagingTable(streamId: StreamId, suffix: String) {
BigQueryUtils.createPartitionedTableIfNotExists(
bigquery,
tableId,
BigQueryRecordFormatter.SCHEMA_V2
tableId(streamId, suffix),
BigQueryRecordFormatter.SCHEMA_V2,
)
}

private fun dropStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
bigquery.delete(tableId)
private fun dropStagingTable(streamId: StreamId, suffix: String) {
bigquery.delete(tableId(streamId, suffix))
}

/**
* "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where
* the table's partition filter must be turned off to truncate. Since deleting a table is a free
* operation this option re-uses functions that already exist
*/
private fun truncateStagingTable(streamId: StreamId) {
private fun truncateStagingTable(streamId: StreamId, suffix: String) {
val tableId = tableId(streamId, suffix)
log.info { "Truncating raw table $tableId" }
dropStagingTable(streamId)
createStagingTable(streamId)
dropStagingTable(streamId, suffix)
createStagingTable(streamId, suffix)
}

override fun cleanupStage(streamId: StreamId) {
Expand All @@ -81,19 +134,20 @@ abstract class BigQueryStorageOperation<Data>(
}

override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
if (tmpTableSuffix.isNotBlank()) {
log.info {
"Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
streamConfig.id.finalTableId(
BigQuerySqlGenerator.QUOTE,
tmpTableSuffix,
)
}"
}
destinationHandler.execute(
sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
)
if (tmpTableSuffix == "") {
throw IllegalArgumentException("Cannot overwrite final table with empty suffix")
}
log.info {
"Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
streamConfig.id.finalTableId(
BigQuerySqlGenerator.QUOTE,
tmpTableSuffix,
)
}"
}
destinationHandler.execute(
sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix),
)
}

override fun typeAndDedupe(
Expand All @@ -109,4 +163,9 @@ abstract class BigQueryStorageOperation<Data>(
finalTableSuffix,
)
}

companion object {
fun tableId(streamId: StreamId, suffix: String = ""): TableId =
TableId.of(streamId.rawNamespace, streamId.rawName + suffix)
}
}
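
getStageGeneration samples a single row's _airbyte_generation_id, which suffices because a temp raw table is only ever populated by one generation at a time. A hedged sketch of the decision its caller (presumably the CDK's AbstractStreamOperation, not shown in this diff) is expected to make with it:

import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamId

// Illustrative caller: decide whether an existing temp raw table can be
// resumed into, or must be dropped and re-created before staging.
fun shouldReplaceTempStage(
    op: StorageOperation<*>,
    streamId: StreamId,
    suffix: String,
    currentGenerationId: Long,
): Boolean {
    // null means the temp table has no rows to sample a generation from;
    // it is safe to keep and append into.
    val stagedGeneration = op.getStageGeneration(streamId, suffix) ?: return false
    // A different generation means leftovers from an earlier refresh attempt;
    // replace rather than mix generations in one table.
    return stagedGeneration != currentGenerationId
}

A true result then flows into prepareStage(streamId, suffix, replace = true), which takes the drop-and-recreate path documented on truncateStagingTable above.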
AbstractBigQueryTypingDedupingTest.java
@@ -35,6 +35,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.junit.jupiter.api.Test;

public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest {
@@ -178,7 +179,9 @@ public void testAirbyteMetaAndGenerationIdMigration() throws Exception {

// First sync
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20");
// We don't want to send a stream status message, because this version of destination-bigquery will
// crash.
runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20", Function.identity(), null);

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");