
Destination bigquery: Implement refreshes logic #38713

Merged: 1 commit, Jun 17, 2024
build.gradle
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.37.1'
cdkVersionRequired = '0.40.0'
features = [
'db-destinations',
'datastore-bigquery',
metadata.yaml
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.6.3
dockerImageTag: 2.7.0
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -31,6 +31,7 @@ data:
memory_request: 1Gi
supportLevel: certified
supportsDbt: true
supportsRefreshes: true
tags:
- language:java
connectorTestSuitesOptions:
BigQueryDestinationHandler.java
@@ -35,6 +35,7 @@
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation;
import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
@@ -85,15 +86,15 @@ public boolean isFinalTableEmpty(final StreamId id) {
return BigInteger.ZERO.equals(bq.getTable(TableId.of(id.getFinalNamespace(), id.getFinalName())).getNumRows());
}

public InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName()));
public InitialRawTableStatus getInitialRawTableState(final StreamId id, final String suffix) throws Exception {
final Table rawTable = bq.getTable(TableId.of(id.getRawNamespace(), id.getRawName() + suffix));
if (rawTable == null) {
// Table doesn't exist. There are no unprocessed records, and no timestamp.
return new InitialRawTableStatus(false, false, Optional.empty());
}

final FieldValue unloadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
// bigquery timestamps have microsecond precision
"""
SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
@@ -109,7 +110,7 @@ SELECT TIMESTAMP_SUB(MIN(_airbyte_extracted_at), INTERVAL 1 MICROSECOND)
}

final FieldValue loadedRecordTimestamp = bq.query(QueryJobConfiguration.newBuilder(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(QUOTE))).replace(
"raw_table", id.rawTableId(QUOTE, suffix))).replace(
"""
SELECT MAX(_airbyte_extracted_at)
FROM ${raw_table}
@@ -199,11 +200,13 @@ public List<DestinationInitialStatus<BigQueryDestinationState>> gatherInitialSta
for (final StreamConfig streamConfig : streamConfigs) {
final StreamId id = streamConfig.getId();
final Optional<TableDefinition> finalTable = findExistingTable(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id);
final InitialRawTableStatus rawTableState = getInitialRawTableState(id, "");
final InitialRawTableStatus tempRawTableState = getInitialRawTableState(id, AbstractStreamOperation.TMP_TABLE_SUFFIX);
initialStates.add(new DestinationInitialStatus<>(
streamConfig,
finalTable.isPresent(),
rawTableState,
tempRawTableState,
finalTable.isPresent() && !existingSchemaMatchesStreamConfig(streamConfig, finalTable.get()),
finalTable.isEmpty() || isFinalTableEmpty(id),
// Return a default state blob since we don't actually track state.
BigQueryDirectLoadingStorageOperation.kt
@@ -56,10 +56,14 @@ class BigQueryDirectLoadingStorageOperation(
|More details:
|""".trimMargin()
}
override fun writeToStage(streamConfig: StreamConfig, data: Stream<PartialAirbyteMessage>) {
override fun writeToStage(
streamConfig: StreamConfig,
suffix: String,
data: Stream<PartialAirbyteMessage>
) {
// TODO: why do we need ratelimiter, and using unstable API from Google's guava
rateLimiter.acquire()
val tableId = TableId.of(streamConfig.id.rawNamespace, streamConfig.id.rawName)
val tableId = tableId(streamConfig.id, suffix)
log.info { "Writing data to table $tableId with schema $SCHEMA_V2" }
val writeChannel = initWriteChannel(tableId)
writeChannel.use {
BigQueryGcsStorageOperation.kt
@@ -10,7 +10,6 @@ import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.Job
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.LoadJobConfiguration
import com.google.cloud.bigquery.TableId
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.GcsNameTransformer
import io.airbyte.cdk.integrations.destination.gcs.GcsStorageOperations
@@ -21,7 +20,6 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
import org.joda.time.DateTime
@@ -46,9 +44,10 @@ class BigQueryGcsStorageOperation(
) {
private val connectionId = UUID.randomUUID()
private val syncDateTime = DateTime.now(DateTimeZone.UTC)
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
super.prepareStage(streamId, destinationSyncMode)
override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
super.prepareStage(streamId, suffix, replace)
// prepare staging bucket
// TODO should this also use the suffix?
log.info { "Creating bucket ${gcsConfig.bucketName}" }
gcsStorageOperations.createBucketIfNotExists()
}
@@ -61,21 +60,29 @@ class BigQueryGcsStorageOperation(
gcsStorageOperations.dropBucketObject(stagingRootPath)
}

override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
val stagedFileName: String = uploadRecordsToStage(streamConfig.id, data)
copyIntoTableFromStage(streamConfig.id, stagedFileName)
override fun writeToStage(
streamConfig: StreamConfig,
suffix: String,
data: SerializableBuffer
) {
val stagedFileName: String = uploadRecordsToStage(streamConfig.id, suffix, data)
copyIntoTableFromStage(streamConfig.id, suffix, stagedFileName)
}

private fun uploadRecordsToStage(streamId: StreamId, buffer: SerializableBuffer): String {
private fun uploadRecordsToStage(
streamId: StreamId,
suffix: String,
buffer: SerializableBuffer
): String {
val objectPath: String = stagingFullPath(streamId)
log.info {
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName} to path $objectPath"
"Uploading records to for ${streamId.rawNamespace}.${streamId.rawName}$suffix to path $objectPath"
}
return gcsStorageOperations.uploadRecordsToBucket(buffer, streamId.rawNamespace, objectPath)
}

private fun copyIntoTableFromStage(streamId: StreamId, stagedFileName: String) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
private fun copyIntoTableFromStage(streamId: StreamId, suffix: String, stagedFileName: String) {
val tableId = tableId(streamId, suffix)
val stagingPath = stagingFullPath(streamId)
val fullFilePath = "gs://${gcsConfig.bucketName}/$stagingPath$stagedFileName"
log.info { "Uploading records from file $fullFilePath to target Table $tableId" }
BigQueryStorageOperation.kt
@@ -5,7 +5,9 @@
package io.airbyte.integrations.destination.bigquery.operation

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.TableResult
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
@@ -14,10 +16,9 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.*
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap

private val log = KotlinLogging.logger {}
@@ -29,39 +30,91 @@ abstract class BigQueryStorageOperation<Data>(
protected val datasetLocation: String
) : StorageOperation<Data> {
private val existingSchemas = ConcurrentHashMap.newKeySet<String>()
override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
// Prepare staging table. For overwrite, it does drop-create so we can skip explicit create.
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
truncateStagingTable(streamId)
if (replace) {
truncateStagingTable(streamId, suffix)
} else {
createStagingTable(streamId)
createStagingTable(streamId, suffix)
}
}

private fun createStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
override fun overwriteStage(streamId: StreamId, suffix: String) {
if (suffix == "") {
throw IllegalArgumentException("Cannot overwrite raw table with empty suffix")
}
bigquery.delete(tableId(streamId, ""))
Contributor: Does this have to have no-op-when-the-table-doesn't-exist semantics? If I remove IF EXISTS in Snowflake, the truncateRefresh tests fail, so it seems this is called even when the actual table doesn't exist.

Contributor (author): Ah, yeah, there's no rawTableExists check anywhere. We could add that, but I think it's easier to just do "if not exists"? (I'll update the javadoc to clarify that that's a requirement.)

(A sketch of an explicit existence-check variant appears after this function.)

bigquery.query(
QueryJobConfiguration.of(
"""ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
),
)
}
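
As discussed in the thread above, here is a minimal sketch of what an explicit existence check in overwriteStage could look like. It is hypothetical, not the merged implementation: it assumes the surrounding class (the bigquery client, StreamId, and the tableId helper defined in the companion object below) and simply guards on the temp raw table before renaming.

```kotlin
// Sketch: make overwriteStage a no-op when nothing was staged under this suffix,
// instead of relying on downstream failures. Not part of the merged PR.
override fun overwriteStage(streamId: StreamId, suffix: String) {
    if (suffix == "") {
        throw IllegalArgumentException("Cannot overwrite raw table with empty suffix")
    }
    val tempRawTable = tableId(streamId, suffix)
    if (bigquery.getTable(tempRawTable) == null) {
        // No temp raw table exists for this suffix; nothing to promote.
        return
    }
    // Drop the current raw table (BigQuery's delete() is already a no-op when the
    // table is missing), then promote the temp table by renaming it into place.
    bigquery.delete(tableId(streamId, ""))
    bigquery.query(
        QueryJobConfiguration.of(
            """ALTER TABLE `${streamId.rawNamespace}`.`${streamId.rawName}$suffix` RENAME TO `${streamId.rawName}`"""
        )
    )
}
```

Whether an upfront guard or a documented no-op requirement is preferable depends on whether callers can reach overwriteStage without having staged anything; the thread above leans toward documenting the expectation in the javadoc.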

override fun transferFromTempStage(streamId: StreamId, suffix: String) {
if (suffix == "") {
throw IllegalArgumentException(
"Cannot transfer records from temp raw table with empty suffix"
)
}
// TODO figure out how to make this work
// something about incompatible partitioning spec (probably b/c we're copying from a temp
// table partitioned on generation ID into an old real raw table partitioned on
// extracted_at)
val tempRawTable = tableId(streamId, suffix)
// val jobConf =
// CopyJobConfiguration.newBuilder(tableId(streamId, ""), tempRawTable)
// .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
// .build()
// val job = bigquery.create(JobInfo.of(jobConf))
// BigQueryUtils.waitForJobFinish(job)

bigquery.query(
QueryJobConfiguration.of(
"""
INSERT INTO `${streamId.rawNamespace}`.`${streamId.rawName}`
SELECT * FROM `${streamId.rawNamespace}`.`${streamId.rawName}$suffix`
""".trimIndent()
)
)
bigquery.delete(tempRawTable)
}

override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
val result: TableResult =
bigquery.query(
QueryJobConfiguration.of(
"SELECT _airbyte_generation_id FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix LIMIT 1"
),
)
if (result.totalRows == 0L) {
return null
}
return result.iterateAll().first()["_airbyte_generation_id"].longValue
}

private fun createStagingTable(streamId: StreamId, suffix: String) {
BigQueryUtils.createPartitionedTableIfNotExists(
bigquery,
tableId,
BigQueryRecordFormatter.SCHEMA_V2
tableId(streamId, suffix),
BigQueryRecordFormatter.SCHEMA_V2,
)
}

private fun dropStagingTable(streamId: StreamId) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
bigquery.delete(tableId)
private fun dropStagingTable(streamId: StreamId, suffix: String) {
bigquery.delete(tableId(streamId, suffix))
}

/**
* "Truncates" table, this is a workaround to the issue with TRUNCATE TABLE in BigQuery where
* the table's partition filter must be turned off to truncate. Since deleting a table is a free
* operation this option re-uses functions that already exist
*/
private fun truncateStagingTable(streamId: StreamId) {
private fun truncateStagingTable(streamId: StreamId, suffix: String) {
val tableId = TableId.of(streamId.rawNamespace, streamId.rawName)
log.info { "Truncating raw table $tableId" }
dropStagingTable(streamId)
createStagingTable(streamId)
dropStagingTable(streamId, suffix)
createStagingTable(streamId, suffix)
}

override fun cleanupStage(streamId: StreamId) {
@@ -81,19 +134,20 @@ abstract class BigQueryStorageOperation<Data>(
}

override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) {
if (tmpTableSuffix.isNotBlank()) {
log.info {
"Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
streamConfig.id.finalTableId(
BigQuerySqlGenerator.QUOTE,
tmpTableSuffix,
)
}"
}
destinationHandler.execute(
sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix)
)
if (tmpTableSuffix == "") {
throw IllegalArgumentException("Cannot overwrite final table with empty suffix")
}
log.info {
"Overwriting table ${streamConfig.id.finalTableId(BigQuerySqlGenerator.QUOTE)} with ${
streamConfig.id.finalTableId(
BigQuerySqlGenerator.QUOTE,
tmpTableSuffix,
)
}"
}
destinationHandler.execute(
sqlGenerator.overwriteFinalTable(streamConfig.id, tmpTableSuffix),
)
}

override fun typeAndDedupe(
Expand All @@ -109,4 +163,9 @@ abstract class BigQueryStorageOperation<Data>(
finalTableSuffix,
)
}

companion object {
fun tableId(streamId: StreamId, suffix: String = ""): TableId =
TableId.of(streamId.rawNamespace, streamId.rawName + suffix)
}
}
AbstractBigQueryTypingDedupingTest.java
@@ -35,6 +35,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.junit.jupiter.api.Test;

public abstract class AbstractBigQueryTypingDedupingTest extends BaseTypingDedupingTest {
@@ -178,7 +179,9 @@ public void testAirbyteMetaAndGenerationIdMigration() throws Exception {

// First sync
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20");
// We don't want to send a stream status message, because this version of destination-bigquery will
// crash.
runSync(catalog, messages1, "airbyte/destination-bigquery:2.4.20", Function.identity(), null);

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");