diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
index 6df3aa04eac6..1de0a29272d4 100644
--- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.4.17
+  dockerImageTag: 2.4.18
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java
index d5df33e2544b..0d5c548a4a9d 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java
@@ -8,7 +8,7 @@
 import com.google.common.util.concurrent.RateLimiter;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
 import io.airbyte.protocol.models.v0.StreamDescriptor;
 import java.util.concurrent.ConcurrentMap;
@@ -24,10 +24,10 @@ public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {
   private static final RateLimiter rateLimiter = RateLimiter.create(0.07);
   private final BigQuery bigQuery;
-  private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap;
+  private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;

   public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
-                                    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
+                                    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
     this.bigQuery = bigQuery;
     this.uploaderMap = uploaderMap;
   }
@@ -35,7 +35,7 @@ public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
   @Override
   public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
     rateLimiter.acquire();
-    final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMapSupplied = uploaderMap.get();
+    final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMapSupplied = uploaderMap.get();
     final AtomicInteger recordCount = new AtomicInteger();
     stream.forEach(aibyteMessage -> {
       try {
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
index 9627276452c7..253a95b49dff 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
@@ -38,13 +38,11 @@
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
-import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
-import io.airbyte.integrations.destination.bigquery.formatter.GcsCsvBigQueryRecordFormatter;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
 import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
@@ -292,14 +290,14 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
         BigQueryUtils.getDatasetId(config));
   }

-  protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> getUploaderMap(
-      final BigQuery bigquery,
-      final JsonNode config,
-      final ConfiguredAirbyteCatalog catalog,
-      final ParsedCatalog parsedCatalog)
+  protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> getUploaderMap(
+      final BigQuery bigquery,
+      final JsonNode config,
+      final ConfiguredAirbyteCatalog catalog,
+      final ParsedCatalog parsedCatalog)
       throws IOException {
     return () -> {
-      final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new ConcurrentHashMap<>();
+      final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap = new ConcurrentHashMap<>();
       for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
         final AirbyteStream stream = configStream.getStream();
         final StreamConfig parsedStream;
@@ -315,7 +313,7 @@ protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQue

   protected void putStreamIntoUploaderMap(final AirbyteStream stream,
                                           final UploaderConfig uploaderConfig,
-                                          final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
+                                          final Map<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap)
       throws IOException {
     uploaderMap.put(
         AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
@@ -351,10 +349,10 @@ protected boolean isDefaultAirbyteTmpTableSchema() {
     return true;
   }

-  protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(final JsonNode jsonSchema) {
+  protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap() {
     return Map.of(
-        UploaderType.STANDARD, new DefaultBigQueryRecordFormatter(jsonSchema, namingResolver),
-        UploaderType.CSV, new GcsCsvBigQueryRecordFormatter(jsonSchema, namingResolver));
+        UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver),
+        UploaderType.CSV, new BigQueryRecordFormatter(namingResolver));
   }

   private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery,
@@ -364,7 +362,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
       final Consumer<AirbyteMessage> outputRecordCollector,
       final TyperDeduper typerDeduper)
       throws Exception {
-    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
+    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> writeConfigs = getUploaderMap(
         bigquery,
         config,
         catalog,
@@ -390,7 +388,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
           LOGGER.info("Raw table {} not found, continuing with creation", rawTableId);
         }
         LOGGER.info("Creating table {}", rawTableId);
-        BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, DefaultBigQueryRecordFormatter.SCHEMA_V2);
+        BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2);
       } else {
         uploader.createRawTable();
       }
@@ -415,7 +413,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
   }

   protected Function<JsonNode, BigQueryRecordFormatter> getCsvRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
-    return streamSchema -> new GcsCsvBigQueryRecordFormatter(streamSchema, namingResolver);
+    return streamSchema -> new BigQueryRecordFormatter(namingResolver);
   }

   private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java
deleted file mode 100644
index 36826772fd87..000000000000
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.bigquery;
-
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.TableId;
-import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
-import io.airbyte.cdk.integrations.base.FailureTrackingAirbyteMessageConsumer;
-import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil;
-import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
-import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
-import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
-import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
-import io.airbyte.protocol.models.v0.DestinationSyncMode;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Record Consumer used for STANDARD INSERTS
- */
-@SuppressWarnings("try")
-class BigQueryRecordConsumer extends FailureTrackingAirbyteMessageConsumer implements AirbyteMessageConsumer {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordConsumer.class);
-
-  private final BigQuery bigquery;
-  private final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap;
-  private final Consumer<AirbyteMessage> outputRecordCollector;
-  private final String defaultDatasetId;
-  private AirbyteMessage lastStateMessage = null;
-
-  private final ParsedCatalog catalog;
-  private final TyperDeduper typerDeduper;
-
-  public BigQueryRecordConsumer(final BigQuery bigquery,
-                                final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap,
-                                final Consumer<AirbyteMessage> outputRecordCollector,
-                                final String defaultDatasetId,
-                                final TyperDeduper typerDeduper,
-                                final ParsedCatalog catalog) {
-    this.bigquery = bigquery;
-    this.uploaderMap = uploaderMap;
-    this.outputRecordCollector = outputRecordCollector;
-    this.defaultDatasetId = defaultDatasetId;
-    this.typerDeduper = typerDeduper;
-    this.catalog = catalog;
-
-    LOGGER.info("Got parsed catalog {}", catalog);
-    LOGGER.info("Got canonical stream IDs {}", uploaderMap.keySet());
-  }
-
-  @Override
-  protected void startTracked() {
-    // todo (cgardens) - move contents of #write into this method.
- // Set up our raw tables - uploaderMap.forEach((streamId, uploader) -> { - final StreamConfig stream = catalog.getStream(streamId); - if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE) { - // For streams in overwrite mode, truncate the raw table. - // non-1s1t syncs actually overwrite the raw table at the end of the sync, so we only do this in - // 1s1t mode. - final TableId rawTableId = TableId.of(stream.getId().getRawNamespace(), stream.getId().getRawName()); - bigquery.delete(rawTableId); - BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, DefaultBigQueryRecordFormatter.SCHEMA_V2); - } else { - uploader.createRawTable(); - } - }); - } - - /** - * Processes STATE and RECORD {@link AirbyteMessage} with all else logged as unexpected - * - *
-   * <ul>
-   * <li>For STATE messages emit messages back to the platform</li>
-   * <li>For RECORD messages upload message to associated Airbyte Stream. This means that RECORDS will
-   * be associated with their respective streams when more than one record exists</li>
-   * </ul>
-   *
-   * @param message {@link AirbyteMessage} to be processed
-   */
-  @Override
-  public void acceptTracked(final AirbyteMessage message) throws Exception {
-    if (message.getType() == Type.STATE) {
-      lastStateMessage = message;
-      outputRecordCollector.accept(message);
-    } else if (message.getType() == Type.RECORD) {
-      if (StringUtils.isEmpty(message.getRecord().getNamespace())) {
-        message.getRecord().setNamespace(defaultDatasetId);
-      }
-      processRecord(message);
-    } else {
-      LOGGER.warn("Unexpected message: {}", message.getType());
-    }
-  }
-
-  /**
-   * Processes {@link io.airbyte.protocol.models.AirbyteRecordMessage} by writing Airbyte stream data
-   * to Big Query Writer
-   *
-   * @param message record to be written
-   */
-  private void processRecord(final AirbyteMessage message) {
-    final var streamId = AirbyteStreamNameNamespacePair.fromRecordMessage(message.getRecord());
-    uploaderMap.get(streamId).upload(message);
-    // We are not doing any incremental typing and de-duping for Standard Inserts, see
-    // https://github.com/airbytehq/airbyte/issues/27586
-  }
-
-  @Override
-  public void close(final boolean hasFailed) throws Exception {
-    LOGGER.info("Started closing all connections");
-    final List<Exception> exceptionsThrown = new ArrayList<>();
-    uploaderMap.forEach((streamId, uploader) -> {
-      try {
-        uploader.close(hasFailed, outputRecordCollector, lastStateMessage);
-        typerDeduper.typeAndDedupe(streamId.getNamespace(), streamId.getName());
-      } catch (final Exception e) {
-        exceptionsThrown.add(e);
-        LOGGER.error("Exception while closing uploader {}", uploader, e);
-      }
-    });
-    typerDeduper.commitFinalTables();
-    typerDeduper.cleanup();
-
-    ConnectorExceptionUtil.logAllAndThrowFirst("Exceptions thrown while closing consumer: ", exceptionsThrown);
-  }
-
-}
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java
index e468f1697630..f914f501b18c 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java
@@ -10,7 +10,7 @@
 import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
 import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -31,7 +31,7 @@ public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordColle
                                         BigQuery bigQuery,
                                         ConfiguredAirbyteCatalog catalog,
                                         Optional<String> defaultNamespace,
-                                        Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
+                                        Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
     super(outputRecordCollector,
         onStart,
         onClose,
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
index 6010d41018fe..7df3bf623d96 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java
@@ -13,7 +13,6 @@
 import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
 import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
-import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
 import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
@@ -117,12 +116,6 @@ private Map<StreamDescriptor, BigQueryWriteConfig> createWriteConfigs(final Json
   }

   /**
-   * Sets up {@link BufferedStreamConsumer} with creation of the destination's raw tables
-   *
-   * <p>
-   * Note: targetTableId is synonymous with airbyte_raw table
-   * </p>
    - * * @param bigQueryGcsOperations collection of Google Cloud Storage Operations * @param writeConfigs configuration settings used to describe how to write data and where it exists */ diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 82b3d67a5c7d..3180c9e8828f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -7,8 +7,6 @@ import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.api.gax.rpc.HeaderProvider; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; @@ -17,7 +15,6 @@ import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import com.google.cloud.bigquery.InsertAllResponse; @@ -25,7 +22,6 @@ import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; @@ -42,13 +38,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -56,8 +46,6 @@ import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.logging.log4j.util.Strings; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,7 +228,7 @@ static void createPartitionedTableIfNotExists(final BigQuery bigquery, final Tab public static JsonNode getGcsJsonNodeConfig(final JsonNode config) { final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD); - final JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder() + return Jsons.jsonNode(ImmutableMap.builder() .put(BigQueryConsts.GCS_BUCKET_NAME, loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME)) .put(BigQueryConsts.GCS_BUCKET_PATH, loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH)) .put(BigQueryConsts.GCS_BUCKET_REGION, getDatasetLocation(config)) @@ -250,36 +238,12 @@ public static JsonNode getGcsJsonNodeConfig(final JsonNode config) { + " \"flattening\": \"No flattening\"\n" + "}")) .build()); - - // Do not log the gcsJsonNode to avoid accidentally emitting credentials (even at DEBUG/TRACE level) - return gcsJsonNode; - } - 
-  public static GcsDestinationConfig getGcsAvroDestinationConfig(final JsonNode config) {
-    return GcsDestinationConfig.getGcsDestinationConfig(getGcsAvroJsonNodeConfig(config));
   }

   public static GcsDestinationConfig getGcsCsvDestinationConfig(final JsonNode config) {
     return GcsDestinationConfig.getGcsDestinationConfig(getGcsJsonNodeConfig(config));
   }

-  public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
-    final JsonNode loadingMethod = config.get(BigQueryConsts.LOADING_METHOD);
-    final JsonNode gcsJsonNode = Jsons.jsonNode(ImmutableMap.builder()
-        .put(BigQueryConsts.GCS_BUCKET_NAME, loadingMethod.get(BigQueryConsts.GCS_BUCKET_NAME))
-        .put(BigQueryConsts.GCS_BUCKET_PATH, loadingMethod.get(BigQueryConsts.GCS_BUCKET_PATH))
-        .put(BigQueryConsts.GCS_BUCKET_REGION, getDatasetLocation(config))
-        .put(BigQueryConsts.CREDENTIAL, loadingMethod.get(BigQueryConsts.CREDENTIAL))
-        .put(BigQueryConsts.FORMAT, Jsons.deserialize("{\n"
-            + "  \"format_type\": \"AVRO\",\n"
-            + "  \"flattening\": \"No flattening\"\n"
-            + "}"))
-        .build());
-
-    LOGGER.debug("Composed GCS config is: \n" + gcsJsonNode.toPrettyString());
-    return gcsJsonNode;
-  }
-
   /**
    * @return a default schema name based on the config.
    */
@@ -318,63 +282,6 @@ public static boolean getDisableTypeDedupFlag(final JsonNode config) {
     return false;
   }

-  static TableDefinition getTableDefinition(final BigQuery bigquery, final String datasetName, final String tableName) {
-    final TableId tableId = TableId.of(datasetName, tableName);
-    return bigquery.getTable(tableId).getDefinition();
-  }
-
-  /**
-   * @param fieldList - the list to be checked
-   * @return The list of fields with datetime format.
-   */
-  public static List<String> getDateTimeFieldsFromSchema(final FieldList fieldList) {
-    final List<String> dateTimeFields = new ArrayList<>();
-    for (final Field field : fieldList) {
-      if (field.getType().getStandardType().equals(StandardSQLTypeName.DATETIME)) {
-        dateTimeFields.add(field.getName());
-      }
-    }
-    return dateTimeFields;
-  }
-
-  /**
-   * @param dateTimeFields - list contains fields of DATETIME format
-   * @param data - Json will be sent to Google BigData service
-   *
-   * The special DATETIME format is required to save this type to BigQuery.
-   * @see Supported
-   * Google bigquery datatype This method is responsible to adapt JSON DATETIME to Bigquery
-   */
-  public static void transformJsonDateTimeToBigDataFormat(final List<String> dateTimeFields, final JsonNode data) {
-    dateTimeFields.forEach(e -> {
-      if (data.isObject() && data.findValue(e) != null && !data.get(e).isNull()) {
-        final ObjectNode dataObject = (ObjectNode) data;
-        final JsonNode value = data.findValue(e);
-        if (value.isArray()) {
-          final ArrayNode arrayNode = (ArrayNode) value;
-          final ArrayNode newArrayNode = dataObject.putArray(e);
-          arrayNode.forEach(jsonNode -> newArrayNode.add(getFormattedBigQueryDateTime(jsonNode.asText())));
-        } else if (value.isTextual()) {
-          dataObject.put(e, getFormattedBigQueryDateTime(value.asText()));
-        } else {
-          throw new RuntimeException("Unexpected transformation case");
-        }
-      }
-    });
-  }
-
-  private static String getFormattedBigQueryDateTime(final String dateTimeValue) {
-    return (dateTimeValue != null ?
QueryParameterValue - .dateTime(new DateTime(convertDateToInstantFormat(dateTimeValue)).withZone(DateTimeZone.UTC).toString(BIG_QUERY_DATETIME_FORMAT)).getValue() - : null); - } - - public static String sanitizeDatasetId(final String datasetId) { - return NAME_TRANSFORMER.getNamespace(datasetId); - } - /** * Maps Airbyte internal sync modes with that of BigQuery's sync modes (aka Write Disposition) * @@ -460,23 +367,4 @@ private static String getConnectorNameOrDefault() { .orElse("destination-bigquery"); } - private static String convertDateToInstantFormat(final String data) { - Instant instant = null; - try { - - final ZonedDateTime zdt = ZonedDateTime.parse(data, formatter); - instant = zdt.toLocalDateTime().toInstant(ZoneOffset.UTC); - return instant.toString(); - } catch (final DateTimeParseException e) { - try { - final LocalDateTime dt = LocalDateTime.parse(data, formatter); - instant = dt.toInstant(ZoneOffset.UTC); - return instant.toString(); - } catch (final DateTimeParseException ex) { - // no logging since it may generate too much noise - } - } - return instant == null ? null : instant.toString(); - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index 341b812b5a2b..9a14ec303682 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -4,15 +4,18 @@ package io.airbyte.integrations.destination.bigquery.formatter; -import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage; +import io.airbyte.commons.json.Jsons; import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,66 +23,40 @@ * The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a * corresponding uploader. 
 */
-public abstract class BigQueryRecordFormatter {
+public class BigQueryRecordFormatter {

+  public static final Schema SCHEMA_V2 = Schema.of(
+      Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
+      Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP),
+      Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP),
+      Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class);

-  protected Schema bigQuerySchema;
-  protected final Map<String, Set<String>> mapOfFailedFields = new HashMap<>();
   protected final StandardNameTransformer namingResolver;
-  protected final JsonNode originalJsonSchema;
-  protected JsonNode jsonSchema;

-  /**
-   * These parameters are required for the correct operation of denormalize version of the connector.
-   */
-  protected final Set<String> invalidKeys = new HashSet<>();
-  protected final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();
-
-  public BigQueryRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
+  public BigQueryRecordFormatter(final StandardNameTransformer namingResolver) {
     this.namingResolver = namingResolver;
-    this.originalJsonSchema = jsonSchema.deepCopy();
-    this.jsonSchema = formatJsonSchema(jsonSchema.deepCopy());
   }

-  protected JsonNode formatJsonSchema(final JsonNode jsonSchema) {
-    // Do nothing by default
-    return jsonSchema;
-  };
-
-  public abstract JsonNode formatRecord(AirbyteRecordMessage recordMessage);
-
   public String formatRecord(PartialAirbyteMessage recordMessage) {
-    return "";
-  }
-
-  public Schema getBigQuerySchema() {
-    if (bigQuerySchema == null) {
-      bigQuerySchema = getBigQuerySchema(jsonSchema);
-    }
-    return bigQuerySchema;
+    // Map.of has a @NonNull requirement, so creating a new Hash map
+    final HashMap<String, Object> destinationV2record = new HashMap<>();
+    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
+    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage.getRecord()));
+    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, null);
+    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getSerialized());
+    return Jsons.serialize(destinationV2record);
   }

-  public JsonNode getJsonSchema() {
-    return jsonSchema;
+  private Object getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) {
+    // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
+    // use BQ helpers to string-format correctly.
+    final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS);
+    return QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
   }

-  protected abstract Schema getBigQuerySchema(JsonNode jsonSchema);
-
-  protected void logFieldFail(final String error, final String fieldName) {
-    mapOfFailedFields.putIfAbsent(error, new HashSet<>());
-    mapOfFailedFields.get(error).add(fieldName);
-  }
-
-  public void printAndCleanFieldFails() {
-    if (!mapOfFailedFields.isEmpty()) {
-      mapOfFailedFields.forEach(
-          (error, fieldNames) -> LOGGER.warn(
-              "Field(s) fail with error {}. Fields : {} ",
-          error,
-          String.join(", ", fieldNames)));
-      mapOfFailedFields.clear();
-    }
+  public Schema getBigQuerySchema() {
+    return SCHEMA_V2;
   }

 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryRecordFormatter.java
deleted file mode 100644
index 59e98bc957fa..000000000000
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryRecordFormatter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.bigquery.formatter;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.QueryParameterValue;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.StandardSQLTypeName;
-import io.airbyte.cdk.integrations.base.JavaBaseConstants;
-import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
-import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
-import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Default BigQuery formatter. Represents default Airbyte schema (three columns). Note! Default
- * formatter is used inside Direct uploader.
- */
-public class DefaultBigQueryRecordFormatter extends BigQueryRecordFormatter {
-
-  public static final com.google.cloud.bigquery.Schema SCHEMA_V2 = com.google.cloud.bigquery.Schema.of(
-      Field.of(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, StandardSQLTypeName.STRING),
-      Field.of(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, StandardSQLTypeName.TIMESTAMP),
-      Field.of(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, StandardSQLTypeName.TIMESTAMP),
-      Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING));
-
-  public DefaultBigQueryRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
-    super(jsonSchema, namingResolver);
-  }
-
-  @Override
-  public JsonNode formatRecord(final AirbyteRecordMessage recordMessage) {
-    // Map.of has a @NonNull requirement, so creating a new Hash map
-    final HashMap<String, Object> destinationV2record = new HashMap<>();
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(recordMessage));
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, null);
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_DATA, getData(recordMessage));
-    return Jsons.jsonNode(destinationV2record);
-  }
-
-  @Override
-  public String formatRecord(PartialAirbyteMessage message) {
-    // Map.of has a @NonNull requirement, so creating a new Hash map
-    final HashMap<String, Object> destinationV2record = new HashMap<>();
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString());
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, getEmittedAtField(message.getRecord()));
-    destinationV2record.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
null); - destinationV2record.put(JavaBaseConstants.COLUMN_NAME_DATA, message.getSerialized()); - return Jsons.serialize(destinationV2record); - } - - protected Object getEmittedAtField(final PartialAirbyteRecordMessage recordMessage) { - // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then - // use BQ helpers to string-format correctly. - final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); - return QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); - } - - protected Object getEmittedAtField(final AirbyteRecordMessage recordMessage) { - // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then - // use BQ helpers to string-format correctly. - final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); - return QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); - } - - protected Object getData(final AirbyteRecordMessage recordMessage) { - final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData()); - return Jsons.serialize(formattedData); - } - - @Override - public Schema getBigQuerySchema(final JsonNode jsonSchema) { - return SCHEMA_V2; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsAvroBigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsAvroBigQueryRecordFormatter.java deleted file mode 100644 index 111238c8d508..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsAvroBigQueryRecordFormatter.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery.formatter; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; - -/** - * Formatter for GCS Avro uploader. Contains specific filling of default Airbyte attributes. - */ -public class GcsAvroBigQueryRecordFormatter extends DefaultBigQueryRecordFormatter { - - public GcsAvroBigQueryRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) { - super(jsonSchema, namingResolver); - } - - @Override - protected Object getEmittedAtField(final AirbyteRecordMessage recordMessage) { - return recordMessage.getEmittedAt(); - } - - @Override - protected Object getData(final AirbyteRecordMessage recordMessage) { - return StandardNameTransformer.formatJsonPath(recordMessage.getData()).toString(); - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsCsvBigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsCsvBigQueryRecordFormatter.java deleted file mode 100644 index 8ac49af733c5..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/GcsCsvBigQueryRecordFormatter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.bigquery.formatter; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; - -/** - * Formatter for GCS CSV uploader. Contains specific filling of default Airbyte attributes. Note! - * That it might be extended during CSV GCS integration. - */ -public class GcsCsvBigQueryRecordFormatter extends DefaultBigQueryRecordFormatter { - - public static final com.google.cloud.bigquery.Schema CSV_SCHEMA = com.google.cloud.bigquery.Schema.of( - Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), - Field.of(JavaBaseConstants.COLUMN_NAME_DATA, StandardSQLTypeName.STRING), - Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); - - public GcsCsvBigQueryRecordFormatter(JsonNode jsonSchema, StandardNameTransformer namingResolver) { - super(jsonSchema, namingResolver); - } - - @Override - public Schema getBigQuerySchema(JsonNode jsonSchema) { - return SCHEMA_V2; - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java deleted file mode 100644 index 66c3859ebc82..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */
-
-package io.airbyte.integrations.destination.bigquery.uploader;
-
-import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.printHeapMemoryConsumption;
-
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.CopyJobConfiguration;
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobInfo.WriteDisposition;
-import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.StandardTableDefinition;
-import com.google.cloud.bigquery.Table;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.TableInfo;
-import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
-import io.airbyte.cdk.integrations.base.JavaBaseConstants;
-import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
-import io.airbyte.cdk.integrations.destination.s3.writer.DestinationWriter;
-import io.airbyte.commons.string.Strings;
-import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
-import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import java.io.IOException;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractBigQueryUploader<T extends DestinationWriter> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryUploader.class);
-
-  protected final TableId table;
-  protected final WriteDisposition syncMode;
-  protected final T writer;
-  protected final BigQuery bigQuery;
-  protected final BigQueryRecordFormatter recordFormatter;
-
-  AbstractBigQueryUploader(final TableId table,
-                           final T writer,
-                           final WriteDisposition syncMode,
-                           final BigQuery bigQuery,
-                           final BigQueryRecordFormatter recordFormatter) {
-    this.table = table;
-    this.writer = writer;
-    this.syncMode = syncMode;
-    this.bigQuery = bigQuery;
-    this.recordFormatter = recordFormatter;
-  }
-
-  protected void postProcessAction(final boolean hasFailed) throws Exception {
-    // Do nothing by default
-  }
-
-  public void upload(final AirbyteMessage airbyteMessage) {
-    try {
-      writer.write(recordFormatter.formatRecord(airbyteMessage.getRecord()));
-    } catch (final IOException | RuntimeException e) {
-      LOGGER.error("Got an error while writing message: {}", e.getMessage(), e);
-      LOGGER.error(String.format(
-          "Failed to process a message for job: %s",
-          writer.toString()));
-      printHeapMemoryConsumption();
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void upload(final PartialAirbyteMessage airbyteMessage) {
-    try {
-      writer.write(recordFormatter.formatRecord(airbyteMessage));
-    } catch (final IOException | RuntimeException e) {
-      LOGGER.error("Got an error while writing message: {}", e.getMessage(), e);
-      LOGGER.error(String.format(
-          "Failed to process a message for job: %s",
-          writer.toString()));
-      printHeapMemoryConsumption();
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void close(final boolean hasFailed, final Consumer<AirbyteMessage> outputRecordCollector, final AirbyteMessage lastStateMessage) {
-    try {
-      recordFormatter.printAndCleanFieldFails();
-
-      this.writer.close(hasFailed);
-
-      if (!hasFailed) {
-        uploadData(outputRecordCollector, lastStateMessage);
-      }
-      this.postProcessAction(hasFailed);
-    } catch (final Exception e) {
LOGGER.error(String.format("Failed to close %s writer, \n details: %s", this, e.getMessage())); - printHeapMemoryConsumption(); - throw new RuntimeException(e); - } - } - - public void closeAfterPush() { - try { - this.writer.close(false); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - protected void uploadData(final Consumer outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception { - try { - outputRecordCollector.accept(lastStateMessage); - LOGGER.info("Final state message is accepted."); - } catch (final Exception e) { - LOGGER.error("Upload data is failed!"); - throw e; - } - } - - public void createRawTable() { - // Ensure that this table exists. - // TODO alter an existing raw table? - final Table rawTable = bigQuery.getTable(table); - if (rawTable == null) { - LOGGER.info("Creating raw table {}.", table); - bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build()); - } else { - LOGGER.info("Found raw table {}.", rawTable.getTableId()); - } - } - - /** - * Creates a partitioned table if the table previously was not partitioned - * - *
-   * <p>
-   * Note: this logic is deprecated since it was used for the functionality of migrating unpartitioned
-   * tables to partitioned tables for performance. Since this change was introduced in Oct 2021, there
-   * is a well-founded belief that any customer that ran a sync between Oct 2021 and the end of 2022
-   * would have migrated to a partitioned table
-   * </p>
-   *
-   * @param bigQuery BigQuery interface
-   * @param schema Schema of the data table
-   * @param destinationTableId identifier for a table
-   */
-  @Deprecated
-  public static void partitionIfUnpartitioned(final BigQuery bigQuery, final Schema schema, final TableId destinationTableId) {
-    try {
-      final QueryJobConfiguration queryConfig = QueryJobConfiguration
-          .newBuilder(
-              String.format("SELECT max(is_partitioning_column) as is_partitioned FROM `%s.%s.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '%s';",
-                  bigQuery.getOptions().getProjectId(),
-                  destinationTableId.getDataset(),
-                  destinationTableId.getTable()))
-          .setUseLegacySql(false)
-          .build();
-      final ImmutablePair<Job, String> result = BigQueryUtils.executeQuery(bigQuery, queryConfig);
-      result.getLeft().getQueryResults().getValues().forEach(row -> {
-        if (!row.get("is_partitioned").isNull() && row.get("is_partitioned").getStringValue().equals("NO")) {
-          LOGGER.info("Partitioning existing destination table {}", destinationTableId);
-          final String tmpPartitionTable = Strings.addRandomSuffix("_airbyte_partitioned_table", "_", 5);
-          final TableId tmpPartitionTableId = TableId.of(destinationTableId.getDataset(), tmpPartitionTable);
-          // make sure tmpPartitionTable does not already exist
-          bigQuery.delete(tmpPartitionTableId);
-          // Use BigQuery SQL to copy because java api copy jobs does not support creating a table from a
-          // select query, see:
-          // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_a_partitioned_table_from_a_query_result
-          final QueryJobConfiguration partitionQuery = QueryJobConfiguration
-              .newBuilder(
-                  getCreatePartitionedTableFromSelectQuery(schema, bigQuery.getOptions().getProjectId(),
-                      destinationTableId,
-                      tmpPartitionTable))
-              .setUseLegacySql(false)
-              .build();
-          BigQueryUtils.executeQuery(bigQuery, partitionQuery);
-          // Copying data from a partitioned tmp table into an existing non-partitioned table does not make it
-          // partitioned... thus, we force re-create from scratch by completely deleting and creating new
-          // table.
-          bigQuery.delete(destinationTableId);
-          copyTable(bigQuery, tmpPartitionTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_EMPTY);
-          bigQuery.delete(tmpPartitionTableId);
-        }
-      });
-    } catch (final InterruptedException e) {
-      LOGGER.warn("Had errors while partitioning: ", e);
-    }
-  }
-
-  /**
-   * Copies table from source to destination, while also creating the destination table if not already
-   * existing
-   *
-   * <p>
-   * https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table
-   * </p>
-   *
-   * @param bigQuery BigQuery interface
-   * @param sourceTableId source table
-   * @param destinationTableId destination table
-   * @param syncMode mapping of Airbyte's sync mode to BigQuery's write mode
-   */
-  public static void copyTable(final BigQuery bigQuery,
-                               final TableId sourceTableId,
-                               final TableId destinationTableId,
-                               final JobInfo.WriteDisposition syncMode) {
-    final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId)
-        .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
-        .setWriteDisposition(syncMode)
-        .build();
-
-    final Job job = bigQuery.create(JobInfo.of(configuration));
-    AirbyteExceptionHandler.addStringForDeinterpolation(job.getEtag());
-    final ImmutablePair<Job, String> jobStringImmutablePair = BigQueryUtils.executeQuery(job);
-    if (jobStringImmutablePair.getRight() != null) {
-      LOGGER.error("Failed on copy tables with error:" + job.getStatus());
-      throw new RuntimeException("BigQuery was unable to copy table due to an error: \n" + job.getStatus().getError());
-    }
-    LOGGER.info("successfully copied table: {} to table: {}", sourceTableId, destinationTableId);
-  }
-
-  private static String getCreatePartitionedTableFromSelectQuery(final Schema schema,
-                                                                 final String projectId,
-                                                                 final TableId destinationTableId,
-                                                                 final String tmpPartitionTable) {
-    return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable)
-        + schema.getFields().stream()
-            .map(field -> String.format("%s %s", field.getName(), field.getType()))
-            .collect(Collectors.joining(", "))
-        + ") partition by date("
-        + JavaBaseConstants.COLUMN_NAME_EMITTED_AT
-        + ") as select "
-        + schema.getFields().stream()
-            .map(Field::getName)
-            .collect(Collectors.joining(", "))
-        + String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable());
-  }
-
-  @Override
-  public String toString() {
-    return "AbstractBigQueryUploader{" +
-        "table=" + table.getTable() +
-        ", syncMode=" + syncMode +
-        ", writer=" + writer.getClass() +
-        ", recordFormatter=" + recordFormatter.getClass() +
-        '}';
-  }
-
-}
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java
index 2a464e366645..15ad67881215 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryDirectUploader.java
@@ -4,29 +4,83 @@
 package io.airbyte.integrations.destination.bigquery.uploader;

+import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.printHeapMemoryConsumption;
+
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.JobInfo.WriteDisposition;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
-import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
+import com.google.cloud.bigquery.TableInfo;
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
 import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import java.util.function.Consumer;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BigQueryDirectUploader {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDirectUploader.class);

-public class BigQueryDirectUploader extends AbstractBigQueryUploader<BigQueryTableWriter> {
+  protected final TableId table;
+  protected final WriteDisposition syncMode;
+  protected final BigQueryTableWriter writer;
+  protected final BigQuery bigQuery;
+  protected final BigQueryRecordFormatter recordFormatter;
+
+  BigQueryDirectUploader(final TableId table,
+                         final BigQueryTableWriter writer,
+                         final WriteDisposition syncMode,
+                         final BigQuery bigQuery,
+                         final BigQueryRecordFormatter recordFormatter) {
+    this.table = table;
+    this.writer = writer;
+    this.syncMode = syncMode;
+    this.bigQuery = bigQuery;
+    this.recordFormatter = recordFormatter;
+  }
+
+  public void upload(final PartialAirbyteMessage airbyteMessage) {
+    try {
+      writer.write(recordFormatter.formatRecord(airbyteMessage));
+    } catch (final IOException | RuntimeException e) {
+      LOGGER.error("Got an error while writing message: {}", e.getMessage(), e);
+      LOGGER.error(String.format(
+          "Failed to process a message for job: %s",
+          writer.toString()));
+      printHeapMemoryConsumption();
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void closeAfterPush() {
+    try {
+      this.writer.close();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }

-  public BigQueryDirectUploader(final TableId table,
-                                final BigQueryTableWriter writer,
-                                final WriteDisposition syncMode,
-                                final BigQuery bigQuery,
-                                final BigQueryRecordFormatter recordFormatter) {
-    super(table, writer, syncMode, bigQuery, recordFormatter);
+  public void createRawTable() {
+    // Ensure that this table exists.
+    final Table rawTable = bigQuery.getTable(table);
+    if (rawTable == null) {
+      LOGGER.info("Creating raw table {}.", table);
+      bigQuery.create(TableInfo.newBuilder(table, StandardTableDefinition.of(recordFormatter.getBigQuerySchema())).build());
+    } else {
+      LOGGER.info("Found raw table {}.", rawTable.getTableId());
+    }
   }

   @Override
-  protected void uploadData(final Consumer<AirbyteMessage> outputRecordCollector, final AirbyteMessage lastStateMessage) throws Exception {
-    BigQueryUtils.waitForJobFinish(writer.getWriteChannel().getJob());
-    super.uploadData(outputRecordCollector, lastStateMessage);
+  public String toString() {
+    return "BigQueryDirectUploader{" +
+        "table=" + table.getTable() +
+        ", syncMode=" + syncMode +
+        ", writer=" + writer.getClass() +
+        ", recordFormatter=" + recordFormatter.getClass() +
+        '}';
   }

 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
index d30d26f3b3f9..c7405a995fe2 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
@@ -47,7 +47,7 @@ public class BigQueryUploaderFactory {
       More details:
       """;

-  public static AbstractBigQueryUploader<?> getUploader(final UploaderConfig uploaderConfig)
+  public static BigQueryDirectUploader getUploader(final UploaderConfig uploaderConfig)
       throws IOException {
     final String dataset = uploaderConfig.getParsedStream().getId().getRawNamespace();
     final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig());
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java
index 3127905d08b6..50c2608ae171 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java
@@ -4,50 +4,24 @@
 package io.airbyte.integrations.destination.bigquery.writer;

-import com.fasterxml.jackson.databind.JsonNode;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.common.base.Charsets;
 import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
-import io.airbyte.cdk.integrations.destination.s3.writer.DestinationWriter;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class BigQueryTableWriter implements DestinationWriter {
+public record BigQueryTableWriter(TableDataWriteChannel writeChannel) {

   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class);

-  private final TableDataWriteChannel writeChannel;
-
-  public BigQueryTableWriter(final TableDataWriteChannel writeChannel) {
-    this.writeChannel = writeChannel;
-  }
-
-  @Override
-  public void initialize() throws IOException {}
-
-  @Override
-  public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
-    throw new RuntimeException("This write method is not used!");
-  }
-
-  @Override
-  public void write(final JsonNode formattedData) throws IOException {
-    writeChannel.write(ByteBuffer.wrap((Jsons.serialize(formattedData) + "\n").getBytes(Charsets.UTF_8)));
-  }
-
-  @Override
   public void write(final String formattedData) throws IOException {
     writeChannel.write(ByteBuffer.wrap((formattedData + "\n").getBytes(Charsets.UTF_8)));
   }

-  @Override
-  public void close(final boolean hasFailed) throws IOException {
+  public void close() throws IOException {
     this.writeChannel.close();
     try {
       final Job job = writeChannel.getJob();
@@ -61,8 +35,4 @@ public void close(final boolean hasFailed) throws IOException {
     }
   }

-  public TableDataWriteChannel getWriteChannel() {
-    return writeChannel;
-  }
-
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java
deleted file mode 100644
index 64f039215108..000000000000
--- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.integrations.destination.bigquery;
-
-import static org.mockito.Mockito.mock;
-
-import com.google.cloud.bigquery.BigQuery;
-import io.airbyte.cdk.integrations.base.DestinationConfig;
-import io.airbyte.cdk.integrations.base.FailureTrackingAirbyteMessageConsumer;
-import io.airbyte.cdk.integrations.standardtest.destination.PerStreamStateMessageTest;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
-import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
-import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
-import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
-import java.util.Collections;
-import java.util.Map;
-import java.util.function.Consumer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class BigQueryRecordConsumerTest extends PerStreamStateMessageTest {
-
-  @Mock
-  private Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap;
-  @Mock
-  private Consumer<AirbyteMessage> outputRecordCollector;
-
-  private BigQueryRecordConsumer bigQueryRecordConsumer;
-
-  @BeforeEach
-  public void setup() {
-    DestinationConfig.initialize(Jsons.deserialize("{}"));
-
-    ParsedCatalog parsedCatalog = new ParsedCatalog(Collections.emptyList());
-    BigQueryV1V2Migrator migrator = Mockito.mock(BigQueryV1V2Migrator.class);
-    bigQueryRecordConsumer = new BigQueryRecordConsumer(
-        mock(BigQuery.class),
-        uploaderMap,
-        outputRecordCollector,
-        "test-dataset-id",
-        new NoopTyperDeduper(),
-        parsedCatalog);
-  }
-
-  @Override
-  protected Consumer<AirbyteMessage> getMockedConsumer() {
-    return
outputRecordCollector; - } - - @Override - protected FailureTrackingAirbyteMessageConsumer getMessageConsumer() { - return bigQueryRecordConsumer; - } - -} diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index e6304e39a6a5..e1106313b24d 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -219,7 +219,8 @@ tutorials: ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.4.18 | 2024-05-10 | [38111](https://github.com/airbytehq/airbyte/pull/38111) | No functional changes, deleting unused code | | 2.4.17 | 2024-05-09 | [38098](https://github.com/airbytehq/airbyte/pull/38098) | Internal build structure change | | 2.4.16 | 2024-05-08 | [37714](https://github.com/airbytehq/airbyte/pull/37714) | Adopt CDK 0.34.0 | | 2.4.15 | 2024-05-07 | [34611](https://github.com/airbytehq/airbyte/pull/34611) | Adopt CDK 0.33.2 |
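For reviewers, a minimal sketch of the formatting path this PR consolidates, assembled only from the classes visible in the diff above. The `main` harness, the sample record values, and the fluent `with*` setters on `PartialAirbyteMessage`/`PartialAirbyteRecordMessage` are illustrative assumptions (mirroring the CDK protocol models' builder style), not part of the change itself:

```java
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage;
import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;

public class FormatterSketch {

  public static void main(final String[] args) {
    // The single remaining formatter always targets the fixed four-column V2 raw schema
    // (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, _airbyte_data),
    // which is why the per-stream JsonNode schema argument could be dropped.
    final BigQueryRecordFormatter formatter = new BigQueryRecordFormatter(new BigQuerySQLNameTransformer());

    // Records arrive pre-serialized; emittedAt is epoch millis and is converted by the
    // formatter to BigQuery's microsecond-precision TIMESTAMP string.
    final PartialAirbyteMessage message = new PartialAirbyteMessage()
        .withSerialized("{\"id\":1}")
        .withRecord(new PartialAirbyteRecordMessage().withEmittedAt(1715299200000L));

    // Produces one JSON line per record, ready for BigQueryTableWriter#write(String).
    System.out.println(formatter.formatRecord(message));
  }

}
```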