Destination bigquery: airbyte_meta/sync_id/generation_id #38359

Merged · 1 commit · May 29, 2024
@@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
-useLocalCdk = false
+useLocalCdk = true
}

java {
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-dockerImageTag: 2.5.1
+dockerImageTag: 2.6.0
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -21,14 +21,14 @@
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
-import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
@@ -178,26 +178,34 @@ public static Table createTable(final BigQuery bigquery, final String datasetNam
*/
public static void createPartitionedTableIfNotExists(final BigQuery bigquery, final TableId tableId, final Schema schema) {
try {
-final var chunkingColumn = JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
-final TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
-.setField(chunkingColumn)
Contributor:
Will there be any performance side effect from losing the time partition on ab_extracted_at?

Contributor:

Thinking out loud: should it be the reverse, i.e. clustering by genId and partitioning by extracted_at?

Contributor Author:

I could be convinced to leave this unchanged :P

If we ever want to do the delete ... where generation_id < ? thing, then partitioning on generation ID is how we support that cheaply.

And my theory is that partitioning on extracted_at is redundant, since BigQuery can just optimize within each partition anyway? But I'm not super familiar with this.
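For illustration, a rough sketch (not part of this change) of the kind of generation-based cleanup that range partitioning on _airbyte_generation_id would make cheap, assuming the standard BigQuery Java client; the project, dataset, table name, and cutoff generation are all made up:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class GenerationCleanupSketch {

  public static void main(String[] args) throws InterruptedException {
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Hypothetical cutoff: drop all rows written by generations older than 42.
    final long minGenerationToKeep = 42L;
    // Because the raw table is range-partitioned on _airbyte_generation_id,
    // this predicate lets BigQuery prune whole partitions instead of scanning
    // every row before deleting.
    final QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder(
                "DELETE FROM `my-project.my_dataset.my_raw_table` "
                    + "WHERE _airbyte_generation_id < " + minGenerationToKeep)
            .build();
    bigquery.query(config);
  }
}
```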

+// Partition by generation ID. This will be useful for when we want to build
+// hybrid refreshes.
+final RangePartitioning partitioning = RangePartitioning.newBuilder()
+.setField(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
+.setRange(RangePartitioning.Range.newBuilder()
+.setStart(0L)
+// Bigquery allows a table to have up to 10_000 partitions.
+.setEnd(10_000L)
+// Somewhat conservative estimate. This should avoid issues with
+// users running many merge refreshes.
+.setInterval(5L)
+.build())
+.build();

final Clustering clustering = Clustering.newBuilder()
-.setFields(ImmutableList.of(chunkingColumn))
+.setFields(ImmutableList.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT))
.build();

final StandardTableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
.setSchema(schema)
-.setTimePartitioning(partitioning)
+.setRangePartitioning(partitioning)
.setClustering(clustering)
.build();
final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

final Table table = bigquery.getTable(tableInfo.getTableId());
if (table != null && table.exists()) {
// TODO: Handle migration from v1 -> v2
LOGGER.info("Partitioned table ALREADY EXISTS: {}", tableId);
} else {
bigquery.create(tableInfo);
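For context on the constants in the new partitioning block above: with start 0, end 10,000, and interval 5, BigQuery creates at most 10,000 / 5 = 2,000 partitions, and each generation ID falls into a five-wide bucket. A tiny sketch of that bucketing (the generation ID is made up):

```java
public class GenerationPartitionSketch {

  public static void main(String[] args) {
    final long start = 0L;
    final long interval = 5L;
    final long generationId = 42L; // hypothetical generation id
    // Integer range partitioning buckets values into [lower, lower + interval).
    final long lower = start + ((generationId - start) / interval) * interval;
    // Prints: generation 42 -> partition [40, 45)
    System.out.println(
        "generation " + generationId + " -> partition [" + lower + ", " + (lower + interval) + ")");
  }
}
```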
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.bigquery.formatter;

+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.Schema;
@@ -12,7 +14,6 @@
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage;
import io.airbyte.commons.json.Jsons;
-import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@@ -26,21 +27,24 @@ public class BigQueryRecordFormatter {
Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP),
Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP),
-Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));
+Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING),
+Field.of(JavaBaseConstants.COLUMN_NAME_AB_META, StandardSQLTypeName.STRING),
+Field.of(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, StandardSQLTypeName.INT64));

public BigQueryRecordFormatter() {}

-public String formatRecord(PartialAirbyteMessage recordMessage) {
-// Map.of has a @NonNull requirement, so creating a new Hash map
-final HashMap<String, Object> destinationV2record = new HashMap<>();
-destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
-destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage.getRecord()));
-destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, null);
-destinationV2record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getSerialized());
-return Jsons.serialize(destinationV2record);
+public String formatRecord(PartialAirbyteMessage recordMessage, long generationId) {
+final ObjectNode record = (ObjectNode) Jsons.emptyObject();
+record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
+record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage.getRecord()));
+record.set(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, NullNode.instance);
+record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getSerialized());
+record.put(JavaBaseConstants.COLUMN_NAME_AB_META, Jsons.serialize(recordMessage.getRecord().getMeta()));
+record.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId);
+return Jsons.serialize(record);
}

-private Object getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) {
+private String getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) {
edgao marked this conversation as resolved.
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
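As a standalone illustration of that conversion (the formatter imports QueryParameterValue, so presumably that is the BQ helper meant here; the emitted-at value below is made up):

```java
import com.google.cloud.bigquery.QueryParameterValue;
import java.util.concurrent.TimeUnit;

public class EmittedAtFormattingSketch {

  public static void main(String[] args) {
    // Hypothetical emitted_at in epoch milliseconds (2024-05-29T00:00:00.123Z).
    final long emittedAtMillis = 1716940800123L;
    // BigQuery TIMESTAMP carries microsecond precision, so scale millis -> micros first.
    final long emittedAtMicros = TimeUnit.MICROSECONDS.convert(emittedAtMillis, TimeUnit.MILLISECONDS);
    // QueryParameterValue.timestamp(Long) renders microseconds-since-epoch as a
    // BigQuery-friendly timestamp string, roughly "2024-05-29 00:00:00.123000+00:00".
    System.out.println(QueryParameterValue.timestamp(emittedAtMicros).getValue());
  }
}
```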
@@ -215,6 +215,7 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo
_airbyte_raw_id STRING NOT NULL,
_airbyte_extracted_at TIMESTAMP NOT NULL,
_airbyte_meta JSON NOT NULL,
+_airbyte_generation_id INTEGER,
${column_declarations}
)
PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY))
@@ -310,7 +311,8 @@ private String insertNewRecords(final StreamConfig stream,
${column_list}
_airbyte_meta,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_generation_id
)
${extractNewRawRecords};""");
}
@@ -397,17 +399,20 @@ private String upsertNewRecords(final StreamConfig stream,
${columnAssignments}
_airbyte_meta = new_record._airbyte_meta,
_airbyte_raw_id = new_record._airbyte_raw_id,
-_airbyte_extracted_at = new_record._airbyte_extracted_at
+_airbyte_extracted_at = new_record._airbyte_extracted_at,
+_airbyte_generation_id = new_record._airbyte_generation_id
WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT (
${column_list}
_airbyte_meta,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_generation_id
) VALUES (
${newRecordColumnList}
new_record._airbyte_meta,
new_record._airbyte_raw_id,
-new_record._airbyte_extracted_at
+new_record._airbyte_extracted_at,
+new_record._airbyte_generation_id
);""");
}

@@ -438,7 +443,7 @@ private String extractNewRawRecords(final StreamConfig stream,
WHEN (JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"') IS NOT NULL)
AND (JSON_TYPE(JSON_QUERY(PARSE_JSON(`_airbyte_data`, wide_number_mode=>'round'), '$."${raw_col_name}"')) != 'null')
AND (${json_extract} IS NULL)
-THEN 'Problem with `${raw_col_name}`'
+THEN JSON '{"field":"${raw_col_name}","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}'
ELSE NULL
END"""))
.collect(joining(",\n")) + "]";
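To make the new error shape concrete: instead of the free-form 'Problem with ${raw_col_name}' string, each typecast failure now contributes a structured entry that the templates below merge into the record's _airbyte_meta.changes array. A rough sketch of the resulting JSON, built with Jackson purely for illustration; the sync id and field name are invented:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class AirbyteMetaShapeSketch {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Meta as it sits in the raw table: a sync id plus any upstream changes.
    final ObjectNode meta = mapper.createObjectNode();
    meta.put("sync_id", 1234L);
    final ArrayNode changes = meta.putArray("changes");
    // Entry appended by the destination when a column fails to cast,
    // mirroring the JSON literal in the hunk above.
    final ObjectNode typecastError = changes.addObject();
    typecastError.put("field", "age");
    typecastError.put("change", "NULLED");
    typecastError.put("reason", "DESTINATION_TYPECAST_ERROR");
    // {"sync_id":1234,"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}
    System.out.println(mapper.writeValueAsString(meta));
  }
}
```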
@@ -488,7 +493,9 @@ WITH intermediate_data AS (
${column_casts}
${column_errors} AS column_errors,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_meta,
+_airbyte_generation_id
FROM ${project_id}.${raw_table_id}
WHERE (
_airbyte_loaded_at IS NULL
@@ -497,17 +504,26 @@ ), new_records AS (
), new_records AS (
SELECT
${column_list}
-to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
+to_json(json_set(
+coalesce(parse_json(_airbyte_meta), JSON'{}'),
+'$.changes',
+json_array_append(
+coalesce(json_query(parse_json(_airbyte_meta), '$.changes'), JSON'[]'),
+'$',
+COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), [])
+)
+)) as _airbyte_meta,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_generation_id
FROM intermediate_data
), numbered_rows AS (
SELECT *, row_number() OVER (
PARTITION BY ${pk_list} ORDER BY ${cursor_order_clause} `_airbyte_extracted_at` DESC
) AS row_number
FROM new_records
)
-SELECT ${column_list} _airbyte_meta, _airbyte_raw_id, _airbyte_extracted_at
+SELECT ${column_list} _airbyte_meta, _airbyte_raw_id, _airbyte_extracted_at, _airbyte_generation_id
FROM numbered_rows
WHERE row_number = 1""");
} else {
@@ -527,17 +543,28 @@ WITH intermediate_data AS (
${column_casts}
${column_errors} AS column_errors,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_meta,
+_airbyte_generation_id
FROM ${project_id}.${raw_table_id}
WHERE
_airbyte_loaded_at IS NULL
${extractedAtCondition}
)
SELECT
${column_list}
-to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
+to_json(json_set(
+coalesce(parse_json(_airbyte_meta), JSON'{}'),
+'$.changes',
+json_array_append(
+coalesce(json_query(parse_json(_airbyte_meta), '$.changes'), JSON'[]'),
+'$',
+COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), [])
+)
+)) as _airbyte_meta,
_airbyte_raw_id,
-_airbyte_extracted_at
+_airbyte_extracted_at,
+_airbyte_generation_id
FROM intermediate_data""");
}
}
@@ -599,7 +626,9 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi
_airbyte_raw_id STRING,
_airbyte_data STRING,
_airbyte_extracted_at TIMESTAMP,
-_airbyte_loaded_at TIMESTAMP
+_airbyte_loaded_at TIMESTAMP,
+_airbyte_meta STRING,
+_airbyte_generation_id INTEGER
)
PARTITION BY DATE(_airbyte_extracted_at)
CLUSTER BY _airbyte_extracted_at
@@ -608,7 +637,9 @@ PARTITION BY DATE(_airbyte_extracted_at)
_airbyte_ab_id AS _airbyte_raw_id,
_airbyte_data AS _airbyte_data,
_airbyte_emitted_at AS _airbyte_extracted_at,
-CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at
+CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at,
+'{"sync_id": 0, "changes": []}' AS _airbyte_meta,
+0 as _airbyte_generation_id
FROM ${project_id}.${v1_raw_table}
);
"""));
@@ -626,8 +657,8 @@ private String escapeColumnNameForJsonPath(final String stringContents) {
return stringContents
// Consider the JSON blob {"foo\\bar": 42}.
// This is an object with key foo\bar.
-// The JSONPath for this is (something like...?) $."foo\\bar" (i.e. 2 backslashes).
-// TODO is that jsonpath correct?
+// The JSONPath for this is $."foo\\bar" (i.e. 2 backslashes to represent the single
+// backslash in the key).
// When we represent that path as a SQL string, the backslashes are doubled (to 4): '$."foo\\\\bar"'
// And we're writing that in a Java string, so we have to type out 8 backslashes:
// "'$.\"foo\\\\\\\\bar\"'"
@@ -637,7 +668,7 @@ private String escapeColumnNameForJsonPath(final String stringContents) {
// which is \\" in a SQL string: '$."foo\\"bar"'
// The backslashes become \\\\ in java, and the quote becomes \": "'$.\"foo\\\\\"bar\"'"
.replace("\"", "\\\\\"")
-// Here we're escaping a SQL string, so we only need a single backslash (which is 2, beacuse Java).
+// Here we're escaping a SQL string, so we only need a single backslash (which is 2, because Java).
.replace("'", "\\'");
}

@@ -43,6 +43,7 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils.*
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDV2Migration
import io.airbyte.integrations.destination.bigquery.migrators.BigQueryDestinationState
+import io.airbyte.integrations.destination.bigquery.migrators.BigqueryAirbyteMetaAndGenerationIdMigration
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation
import io.airbyte.integrations.destination.bigquery.operation.BigQueryGcsStorageOperation
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler
@@ -226,7 +227,11 @@ class BigQueryDestination : BaseConnector(), Destination {
)
val destinationHandler = BigQueryDestinationHandler(bigquery, datasetLocation)

-val migrations = listOf(BigQueryDV2Migration(sqlGenerator, bigquery))
+val migrations =
+listOf(
+BigQueryDV2Migration(sqlGenerator, bigquery),
+BigqueryAirbyteMetaAndGenerationIdMigration(bigquery),
+)

if (uploadingMethod == UploadingMethod.STANDARD) {
val bigQueryClientChunkSize = getBigQueryClientChunkSize(config)
@@ -289,7 +294,7 @@ bigQueryGcsStorageOperations,
bigQueryGcsStorageOperations,
initialStatus,
FileUploadFormat.CSV,
-V2_WITHOUT_META,
+V2_WITH_GENERATION,
disableTD
)
},