bigquery implement refreshes logic
edgao committed May 31, 2024
1 parent 53d7c23 commit a98f521
Showing 8 changed files with 293 additions and 30 deletions.
@@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
- useLocalCdk = false
+ useLocalCdk = true
}

java {
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
- dockerImageTag: 2.6.2
+ dockerImageTag: 2.7.0
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -31,6 +31,7 @@ data:
memory_request: 1Gi
supportLevel: certified
supportsDbt: true
+ supportsRefreshes: true
tags:
- language:java
connectorTestSuitesOptions:
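The two config changes above are what gate the feature: supportsRefreshes: true advertises to the Airbyte platform that this destination can run the new refresh logic, and the image bump to 2.7.0 ships it. The build.gradle switch to useLocalCdk = true presumably means the commit builds against the locally checked-out CDK, since the refresh interfaces implemented below (AbstractStreamOperation, the suffix-aware StorageOperation methods) may not have been in a published CDK release yet.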
@@ -35,6 +35,7 @@
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
+ import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -85,15 +86,15 @@ public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.getFinalNamespace(), id.getFinalName())).getNumRows());
}

- public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
- final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName()));
+ public InitialRawTableStatus getInitialRawTableState(final StreamId id, final String suffix) throws Exception {
+ final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName() + suffix));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
// bigquery timestamps have microsecond precision
"""
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -109,7 +110,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
@@ -199,11 +200,13 @@ public List<DestinationInitialStatus<BigQueryDestinationState>> gatherInitialSta
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.getId();
final Optional<TableDefinition> finalTable = findExistingTable(id);
- final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
+ final InitialRawTableStatus rawTableState = getInitialRawTableState(id, "");
+ final InitialRawTableStatus tempRawTableState = getInitialRawTableState(id, AbstractStreamOperation.TMP_TABLE_SUFFIX);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
+ tempRawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
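getInitialRawTableState now takes a suffix, so the handler can report on both the real raw table (suffix "") and the temp raw table written during a refresh (AbstractStreamOperation.TMP_TABLE_SUFFIX). In both cases the unprocessed-records probe subtracts one microsecond (BigQuery's timestamp precision) from the earliest unprocessed _airbyte_extracted_at, yielding a cursor strictly older than every unprocessed record, so a later greater-than filter cannot skip any of them. A minimal sketch of how the two statuses might feed the resume decision, using hypothetical names (the real logic lives in the CDK's AbstractStreamOperation, which is not part of this commit):

    // Sketch only: RawTableStatus and shouldReuseTempStage are hypothetical stand-ins
    // for the CDK's InitialRawTableStatus and AbstractStreamOperation internals.
    data class RawTableStatus(
        val rawTableExists: Boolean,
        val hasUnprocessedRecords: Boolean,
        val maxProcessedTimestamp: java.time.Instant?,
    )

    // Keep appending into an existing temp raw table only if every record already in it
    // was written by the current generation; otherwise it is stale leftover state and
    // prepareStage(streamId, suffix, replace = true) should drop and recreate it.
    fun shouldReuseTempStage(
        tempStage: RawTableStatus,
        tempStageGeneration: Long?, // getStageGeneration(id, TMP_TABLE_SUFFIX); null if empty
        currentGeneration: Long,
    ): Boolean =
        tempStage.rawTableExists &&
            (tempStageGeneration == null || tempStageGeneration == currentGeneration)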
@@ -59,7 +59,7 @@ class BigQueryDirectLoadingStorageOperation(
override fun writeToStage(streamConfig: StreamConfig, data: Stream<PartialAirbyteMessage>) {
// TODO: why do we need ratelimiter, and using unstable API from Google's guava
rateLimiter.acquire()
- val tableId = TableId.of(streamConfig.id.rawNamespace, streamConfig.id.rawName)
+ val tableId = tableId(streamConfig.id)
log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
val writeChannel = initWriteChannel(tableId)
writeChannel.use {
@@ -10,7 +10,6 @@ import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.Job
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
- import com.google.cloud.bigquery.TableId
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
@@ -21,7 +20,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
- import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import org.joda.time.DateTime
@@ -46,9 +44,10 @@ class BigQueryGcsStorageOperation(
) {
private val connectionId = UUID.randomUUID()
private val syncDateTime = DateTime.now(DateTimeZone.UTC)
- override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
- super.prepareStage(streamId, destinationSyncMode)
+ override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
+ super.prepareStage(streamId, suffix, replace)
// prepare staging bucket
+ // TODO should this also use the suffix?
log.info { "Creating bucket ${gcsConfig.bucketName}" }
gcsStorageOperations.createBucketIfNotExists()
}
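Only the table side of staging is suffix-aware so far: per the TODO above, the GCS bucket preparation ignores the suffix, so staged files for a temp raw table presumably share the same bucket and path prefix as files destined for the real one.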
@@ -75,7 +74,7 @@
}

private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
- val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+ val tableId = tableId(streamId)
val stagingPath = stagingFullPath(streamId)
val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
log.info { "Uploading records from file $fullFilePath to target Table $tableId" }
@@ -5,7 +5,9 @@
package io.airbyte.integrations.destination.bigquery.operation

import com.google.cloud.bigquery.BigQuery
+ import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.TableId
+ import com.google.cloud.bigquery.TableResult
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -14,10 +16,9 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
- import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
- import java.util.*
+ import java.util.Optional
import java.util.concurrent.ConcurrentHashMap

private val log = KotlinLogging.logger {}
@@ -29,39 +30,83 @@ abstract class BigQueryStorageOperation<Data>(
protected val datasetLocation: String
) : StorageOperation<Data> {
private val existingSchemas = ConcurrentHashMap.newKeySet<String>()
- override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
+ override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
// Prepare staging table. For overwrite, it does drop-create so we can skip explicit create.
- if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
- truncateStagingTable(streamId)
+ if (replace) {
+ truncateStagingTable(streamId, suffix)
} else {
- createStagingTable(streamId)
+ createStagingTable(streamId, suffix)
}
}

- private fun createStagingTable(streamId: StreamId) {
- val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
+ override fun overwriteStage(streamId: StreamId, suffix: String) {
+ bigquery.delete(tableId(streamId, ""))
+ bigquery.query(
+ QueryJobConfiguration.of(
+ """ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
+ ),
+ )
+ }
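overwriteStage finalizes a truncate-style refresh by dropping the real raw table and renaming the temp table into its place. ALTER TABLE ... RENAME TO is a metadata-only operation, so the swap avoids copying data, though the delete and the rename are two separate calls and there is a brief window in which no table exists under the unsuffixed name.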

+ override fun transferFromTempStage(streamId: StreamId, suffix: String) {
+ // TODO figure out how to make this work
+ // something about incompatible partitioning spec (probably b/c we're copying from a temp
+ // table partitioned on generation ID into an old real raw table partitioned on
+ // extracted_at)
+ val tempRawTable = tableId(streamId, suffix)
+ // val jobConf =
+ // CopyJobConfiguration.newBuilder(tableId(streamId, ""), tempRawTable)
+ // .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+ // .build()
+ // val job = bigquery.create(JobInfo.of(jobConf))
+ // BigQueryUtils.waitForJobFinish(job)

+ bigquery.query(
+ QueryJobConfiguration.of(
+ """
+ INSERT INTO `${streamId.rawNamespace}`.`${streamId.rawName}`
+ SELECT * FROM `${streamId.rawNamespace}`.`${streamId.rawName}$suffix`
+ """.trimIndent()
+ )
+ )
+ bigquery.delete(tempRawTable)
+ }
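transferFromTempStage is the non-destructive path: it appends the temp table's rows into the real raw table and then drops the temp table. The commented-out copy job would be the cheap way to do this (BigQuery copy jobs are free and metadata-driven), but per the TODO it is rejected, apparently because the source and destination tables have incompatible partitioning specs. The INSERT INTO ... SELECT fallback routes every row through a query job, which works regardless of partitioning but scans, and bills for, the full temp table.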

+ override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
+ val result: TableResult =
+ bigquery.query(
+ QueryJobConfiguration.of(
+ "SELECT _airbyte_generation_id FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix LIMIT 1"
+ ),
+ )
+ if (result.totalRows == 0L) {
+ return null
+ }
+ return result.iterateAll().first()["_airbyte_generation_id"].longValue
+ }
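getStageGeneration samples a single row rather than aggregating, which is sound only if a temp raw table ever holds records from one generation at a time — presumably guaranteed because a stale temp table is dropped and recreated in prepareStage before a new generation writes to it. A null return (empty table) lets the caller treat the stage as safe to reuse.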

+ private fun createStagingTable(streamId: StreamId, suffix: String) {
BigQueryUtils.createPartitionedTableIfNotExists(
bigquery,
- tableId,
- BigQueryRecordFormatter.SCHEMA_V2
+ tableId(streamId, suffix),
+ BigQueryRecordFormatter.SCHEMA_V2,
)
}

- private fun dropStagingTable(streamId: StreamId) {
- val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
- bigquery.delete(tableId)
+ private fun dropStagingTable(streamId: StreamId, suffix: String) {
+ bigquery.delete(tableId(streamId, suffix))
}

/**
* "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where
* the table's partition filter must be turned off to truncate. Since deleting a table is a free
* operation this option re-uses functions that already exist
*/
- private fun truncateStagingTable(streamId: StreamId) {
+ private fun truncateStagingTable(streamId: StreamId, suffix: String) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
log.info { "Truncating raw table $tableId" }
- dropStagingTable(streamId)
- createStagingTable(streamId)
+ dropStagingTable(streamId, suffix)
+ createStagingTable(streamId, suffix)
}

override fun cleanupStage(streamId: StreamId) {
@@ -91,7 +136,7 @@ abstract class BigQueryStorageOperation<Data>(
}"
}
destinationHandler.execute(
- sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
+ sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix),
)
}
}
@@ -109,4 +154,9 @@
finalTableSuffix,
)
}

+ companion object {
+ fun tableId(streamId: StreamId, suffix: String = ""): TableId =
+ TableId.of(streamId.rawNamespace, streamId.rawName + suffix)
+ }
}
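The new tableId helper centralizes raw-table naming so that every operation resolves the suffixed name the same way; the default suffix of "" keeps existing call sites pointing at the real raw table.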