From 58f18c4ef8584908dd03340daaafecdd10536cbd Mon Sep 17 00:00:00 2001
From: Andrii Leonets <30464745+DoNotPanicUA@users.noreply.github.com>
Date: Fri, 9 Sep 2022 01:02:10 +0300
Subject: [PATCH] BigQuery Denormalized : Cover arrays only if they are nested (#14023)

* stop covering every array; cover only arrays of arrays (a BigQuery restriction)
* add a test with nested arrays and update the existing tests
* [14058] fix datetime arrays
* [11109] cover only arrays of arrays with an object, instead of every array
* [14058] fix a datetime format failure when we have an array of objects with datetime
* enable Array and Array+Object DATs
* reopen Issue #11166 and disable the functionality
* Improve the tests by moving the common parts to Utils
* Add tests to check `Array of arrays` cases
* Increase version
* Doc
* format
* review update:
  - update the comment about the reopened issue
  - add a test case with multiple array sub-values
  - fix nested arrays with datetime
  - add a test case for nested arrays with datetime
* fix date formatting
* disable the testAnyOf test and update comments
* remove some code duplication in the tests
* [14668] cover the BigQuery inheritance limitation with tests
* Make the GCS implementation run the same tests as the standard implementation
* Use a common format for returned date values so one test covers both DateTime and Timestamp columns
* [15363] add backward compatibility for existing connections.
* Populate the stream config and messages with the tablespace; it is now required inside processing.
* Compare only fields from the stream config
* Rework BigQueryUploaderFactory and UploaderConfig so the array formatter can be chosen before the temporary table is created
* Compare fields
* remove extra logging
* fix the project:dataset format of the datasetId
* missing import
* remove debug logging
* fix log messages
* format
* 4 > 3
---
 .../BigQueryDenormalizedDestination.java      | 128 ++++
 ...ltBigQueryDenormalizedRecordFormatter.java | 102 ++---
 .../arrayformater/ArrayFormatter.java         |  18 +
 .../arrayformater/DefaultArrayFormatter.java  |  83 ++++
 .../arrayformater/LegacyArrayFormatter.java   |  66 +++
 ...DenormalizedDestinationAcceptanceTest.java |  89 +----
 .../BigQueryDenormalizedDestinationTest.java  | 321 ++++++---------
 ...ormalizedGcsDestinationAcceptanceTest.java |  39 +-
 ...igQueryDenormalizedGcsDestinationTest.java | 377 +-----------------
 .../BigQueryDenormalizedTestConstants.java    |  24 ++
 .../BigQueryDenormalizedTestDataUtils.java    | 186 +++++++++
 .../resources/testdata/dataArrays.json        |  14 +
 .../testdata/dataMaxNestedDepth.json          |  31 ++
 .../testdata/dataTooDeepNestedDepth.json      |  33 ++
 .../testdata/expectedDataArrays.json          |  50 +++
 .../resources/testdata/schemaArrays.json      |  78 ++++
 .../testdata/schemaMaxNestedDepth.json        | 113 ++++++
 .../testdata/schemaTooDeepNestedDepth.json    | 117 ++++++
 .../BigQueryDenormalizedUtilsTest.java        |   3 +-
 .../BigQueryDenormalizedTestSchemaUtils.java  |   8 +
 .../resources/schemas/expectedSchema.json     |  36 +-
 .../schemas/expectedSchemaArrays.json         |  85 ++++
 .../expectedSchemaWithInvalidArrayType.json   |  32 +-
 .../test/resources/schemas/schemaArrays.json  |  55 +++
 .../bigquery/BigQueryDestination.java         |  16 +-
 .../bigquery/BigQueryGcsOperations.java       |   3 +-
 .../bigquery/BigQueryRecordConsumer.java      |   6 +-
 .../destination/bigquery/BigQueryUtils.java   |  30 +-
 .../formatter/BigQueryRecordFormatter.java    |   8 +-
 .../uploader/AbstractBigQueryUploader.java    |   4 +
 .../uploader/BigQueryUploaderFactory.java     |   8 +-
 .../uploader/config/UploaderConfig.java       |  14 +
 .../BigQueryDestinationAcceptanceTest.java    |   4 +-
 33 files changed,
1370 insertions(+), 811 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/ArrayFormatter.java create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/DefaultArrayFormatter.java create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/LegacyArrayFormatter.java create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestConstants.java create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataArrays.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataMaxNestedDepth.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataTooDeepNestedDepth.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/expectedDataArrays.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaArrays.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaMaxNestedDepth.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaTooDeepNestedDepth.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaArrays.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/schemaArrays.json diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index 3660e3fbea75..5caf9d1c71f9 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -4,22 +4,37 @@ package io.airbyte.integrations.destination.bigquery; +import static com.google.cloud.bigquery.Field.Mode.REPEATED; + import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Table; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter; import 
io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
+import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.LegacyArrayFormatter;
+import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
+import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
+import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
 import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
+import io.airbyte.protocol.models.AirbyteStream;
+import java.io.IOException;
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BigQueryDenormalizedDestination extends BigQueryDestination {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);
+
   @Override
   protected String getTargetTableName(final String streamName) {
     // This BigQuery destination does not write to a staging "raw" table but directly to a normalized
@@ -67,6 +82,119 @@ protected Function getTargetTableNameTransformer(final BigQueryS
     return namingResolver::getIdentifier;
   }
 
+  @Override
+  protected void putStreamIntoUploaderMap(AirbyteStream stream,
+                                          UploaderConfig uploaderConfig,
+                                          Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
+      throws IOException {
+    Table existingTable =
+        uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName());
+    BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();
+
+    if (existingTable != null) {
+      LOGGER.info("Target table already exists. Checking whether the default destination processing can be used.");
+      if (!compareSchemas((formatter.getBigQuerySchema()), existingTable.getDefinition().getSchema())) {
+        ((DefaultBigQueryDenormalizedRecordFormatter) formatter).setArrayFormatter(new LegacyArrayFormatter());
+        LOGGER.warn("The existing target table has a different structure than the new destination processing expects. Trying the legacy implementation.");
+      } else {
+        LOGGER.info("Existing target table {} has the same structure as the destination schema. Using the default array processing.",
+            stream.getName());
+      }
+    } else {
+      LOGGER.info("Target table is not created yet. The default destination processing will be used.");
+    }
+
+    AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
+    uploaderMap.put(
+        AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream),
+        uploader);
+  }
+
+  /**
+   * Compare the calculated BigQuery schema with the existing schema of the table. Note: only fields
+   * from the calculated schema are compared, so fields created manually in the table are ignored.
+   *
+   * @param expectedSchema BigQuery schema of the table, calculated from the stream schema config
+   * @param existingSchema BigQuery schema of the existing table (created by a previous run)
+   * @return whether the calculated fields match the fields of the existing table
+   */
+  private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema, @Nullable com.google.cloud.bigquery.Schema existingSchema) {
+    if (expectedSchema != null && existingSchema == null) {
+      LOGGER.warn("Existing schema is null when we expect {}", expectedSchema);
+      return false;
+    } else if (expectedSchema == null && existingSchema == null) {
+      LOGGER.info("Existing and expected schemas are null.");
+      return true;
+    } else if (expectedSchema == null) {
+      LOGGER.warn("Expected schema is null when we have existing schema {}", existingSchema);
+      return false;
+    }
+
+    var expectedFields = expectedSchema.getFields();
+    var existingFields = existingSchema.getFields();
+
+    for (Field expectedField : expectedFields) {
+      var existingField = existingFields.get(expectedField.getName());
+      if (isDifferenceBetweenFields(expectedField, existingField)) {
+        LOGGER.warn("Expected field {} is different from existing field {}", expectedField, existingField);
+        return false;
+      }
+    }
+
+    LOGGER.info("Existing and expected schemas are equal.");
+    return true;
+  }
+
+  private boolean isDifferenceBetweenFields(Field expectedField, Field existingField) {
+    if (existingField == null) {
+      return true;
+    } else {
+      return !expectedField.getType().equals(existingField.getType())
+          || !compareRepeatedMode(expectedField, existingField)
+          || !compareSubFields(expectedField, existingField);
+    }
+  }
+
+  /**
+   * Compare field modes. A field can have one of four modes: NULLABLE, REQUIRED, REPEATED, or null.
+   * Only a difference in the REPEATED mode is critical: the method reports a difference only if at
+   * least one of the fields is REPEATED and the other one is not.
+ * + * @param expectedField expected field structure + * @param existingField existing field structure + * @return is critical difference in the field modes + */ + private boolean compareRepeatedMode(Field expectedField, Field existingField) { + var expectedMode = expectedField.getMode(); + var existingMode = existingField.getMode(); + + if (expectedMode != null && expectedMode.equals(REPEATED) || existingMode != null && existingMode.equals(REPEATED)) { + return expectedMode != null && expectedMode.equals(existingMode); + } else { + return true; + } + } + + private boolean compareSubFields(Field expectedField, Field existingField) { + var expectedSubFields = expectedField.getSubFields(); + var existingSubFields = existingField.getSubFields(); + + if (expectedSubFields == null || expectedSubFields.isEmpty()) { + return true; + } else if (existingSubFields == null || existingSubFields.isEmpty()) { + return false; + } else { + for (Field expectedSubField : expectedSubFields) { + var existingSubField = existingSubFields.get(expectedSubField.getName()); + if (isDifferenceBetweenFields(expectedSubField, existingSubField)) { + return false; + } + } + return true; + } + } + public static void main(final String[] args) throws Exception { final Destination destination = new BigQueryDenormalizedDestination(); new IntegrationRunner(destination).run(args); diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java index 198f6200fe81..ae4235d95fd3 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Builder; @@ -26,6 +25,8 @@ import io.airbyte.integrations.destination.bigquery.BigQueryUtils; import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat; import io.airbyte.integrations.destination.bigquery.JsonSchemaType; +import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.ArrayFormatter; +import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.DefaultArrayFormatter; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.util.Collections; @@ -42,26 +43,39 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBigQueryDenormalizedRecordFormatter.class); - public static final String NESTED_ARRAY_FIELD = "big_query_array"; - protected static final String PROPERTIES_FIELD = "properties"; - private static final String TYPE_FIELD = "type"; + public static final String PROPERTIES_FIELD = "properties"; + public static final String 
TYPE_FIELD = "type"; private static final String ALL_OF_FIELD = "allOf"; private static final String ANY_OF_FIELD = "anyOf"; - private static final String ARRAY_ITEMS_FIELD = "items"; private static final String FORMAT_FIELD = "format"; private static final String AIRBYTE_TYPE = "airbyte_type"; private static final String REF_DEFINITION_KEY = "$ref"; private static final ObjectMapper mapper = new ObjectMapper(); + protected ArrayFormatter arrayFormatter; + public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) { super(jsonSchema, namingResolver); } + private ArrayFormatter getArrayFormatter() { + if (arrayFormatter == null) { + arrayFormatter = new DefaultArrayFormatter(); + } + return arrayFormatter; + } + + public void setArrayFormatter(ArrayFormatter arrayFormatter) { + this.arrayFormatter = arrayFormatter; + this.jsonSchema = formatJsonSchema(this.originalJsonSchema.deepCopy()); + this.bigQuerySchema = getBigQuerySchema(jsonSchema); + } + @Override protected JsonNode formatJsonSchema(final JsonNode jsonSchema) { - var modifiedJsonSchema = formatAllOfAndAnyOfFields(namingResolver, jsonSchema); - populateEmptyArrays(modifiedJsonSchema); - surroundArraysByObjects(modifiedJsonSchema); + var modifiedJsonSchema = jsonSchema.deepCopy(); // Issue #5912 is reopened (PR #11166) formatAllOfAndAnyOfFields(namingResolver, jsonSchema); + getArrayFormatter().populateEmptyArrays(modifiedJsonSchema); + getArrayFormatter().surroundArraysByObjects(modifiedJsonSchema); return modifiedJsonSchema; } @@ -76,53 +90,6 @@ private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingR return modifiedSchema; } - private List findArrays(final JsonNode node) { - if (node != null) { - return node.findParents(TYPE_FIELD).stream() - .filter( - jsonNode -> { - final JsonNode type = jsonNode.get(TYPE_FIELD); - if (type.isArray()) { - final ArrayNode typeNode = (ArrayNode) type; - for (final JsonNode arrayTypeNode : typeNode) { - if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) { - return true; - } - } - } else if (type.isTextual()) { - return jsonNode.asText().equals("array"); - } - return false; - }) - .collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } - } - - private void populateEmptyArrays(final JsonNode node) { - findArrays(node).forEach(jsonNode -> { - if (!jsonNode.has(ARRAY_ITEMS_FIELD)) { - final ObjectNode nodeToChange = (ObjectNode) jsonNode; - nodeToChange.putObject(ARRAY_ITEMS_FIELD).putArray(TYPE_FIELD).add("string"); - } - }); - } - - private void surroundArraysByObjects(final JsonNode node) { - findArrays(node).forEach( - jsonNode -> { - final JsonNode arrayNode = jsonNode.deepCopy(); - - final ObjectNode newNode = (ObjectNode) jsonNode; - newNode.removeAll(); - newNode.putArray(TYPE_FIELD).add("object"); - newNode.putObject(PROPERTIES_FIELD).set(NESTED_ARRAY_FIELD, arrayNode); - - surroundArraysByObjects(arrayNode.get(ARRAY_ITEMS_FIELD)); - }); - } - @Override public JsonNode formatRecord(final AirbyteRecordMessage recordMessage) { // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then @@ -153,25 +120,32 @@ protected void addAirbyteColumns(final ObjectNode data, final AirbyteRecordMessa data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt); } - protected JsonNode formatData(final FieldList fields, final JsonNode root) { + private JsonNode formatData(final FieldList fields, final JsonNode root) { // 
handles empty objects and arrays if (fields == null) { return root; } - formatDateTimeFields(fields, root); + JsonNode formattedData; if (root.isObject()) { - return getObjectNode(fields, root); + formattedData = getObjectNode(fields, root); } else if (root.isArray()) { - return getArrayNode(fields, root); + formattedData = getArrayNode(fields, root); } else { - return root; + formattedData = root; } + formatDateTimeFields(fields, formattedData); + + return formattedData; } protected void formatDateTimeFields(final FieldList fields, final JsonNode root) { final List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields); if (!dateTimeFields.isEmpty() && !root.isNull()) { - BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root); + if (root.isArray()) { + root.forEach(jsonNode -> BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, jsonNode)); + } else { + BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, root); + } } } @@ -185,11 +159,11 @@ private JsonNode getArrayNode(final FieldList fields, final JsonNode root) { } else { subFields = arrayField.getSubFields(); } - final JsonNode items = Jsons.jsonNode(MoreIterators.toList(root.elements()).stream() + List arrayItems = MoreIterators.toList(root.elements()).stream() .map(p -> formatData(subFields, p)) - .collect(Collectors.toList())); + .toList(); - return Jsons.jsonNode(ImmutableMap.of(NESTED_ARRAY_FIELD, items)); + return getArrayFormatter().formatArrayItems(arrayItems); } private JsonNode getObjectNode(final FieldList fields, final JsonNode root) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/ArrayFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/ArrayFormatter.java new file mode 100644 index 000000000000..75d717751756 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/ArrayFormatter.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery.formatter.arrayformater; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; + +public interface ArrayFormatter { + + void populateEmptyArrays(final JsonNode node); + + void surroundArraysByObjects(final JsonNode node); + + JsonNode formatArrayItems(final List arrayItems); + +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/DefaultArrayFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/DefaultArrayFormatter.java new file mode 100644 index 000000000000..b7ae89fee5a0 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/DefaultArrayFormatter.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery.formatter.arrayformater; + +import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.PROPERTIES_FIELD; +import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.TYPE_FIELD; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class DefaultArrayFormatter implements ArrayFormatter { + + public static final String NESTED_ARRAY_FIELD = "big_query_array"; + public static final String ARRAY_ITEMS_FIELD = "items"; + + @Override + public void populateEmptyArrays(final JsonNode node) { + findArrays(node).forEach(jsonNode -> { + if (!jsonNode.has(ARRAY_ITEMS_FIELD)) { + final ObjectNode nodeToChange = (ObjectNode) jsonNode; + nodeToChange.putObject(ARRAY_ITEMS_FIELD).putArray(TYPE_FIELD).add("string"); + } + }); + } + + @Override + public void surroundArraysByObjects(final JsonNode node) { + findArrays(node).forEach( + jsonNode -> { + if (isAirbyteArray(jsonNode.get(ARRAY_ITEMS_FIELD))) { + final ObjectNode arrayNode = jsonNode.get(ARRAY_ITEMS_FIELD).deepCopy(); + final ObjectNode originalNode = (ObjectNode) jsonNode; + + originalNode.remove(ARRAY_ITEMS_FIELD); + final ObjectNode itemsNode = originalNode.putObject(ARRAY_ITEMS_FIELD); + itemsNode.putArray(TYPE_FIELD).add("object"); + itemsNode.putObject(PROPERTIES_FIELD).putObject(NESTED_ARRAY_FIELD).setAll(arrayNode); + + surroundArraysByObjects(originalNode.get(ARRAY_ITEMS_FIELD)); + } + }); + } + + @Override + public JsonNode formatArrayItems(List arrayItems) { + return Jsons + .jsonNode(arrayItems.stream().map(node -> (node.isArray() ? Jsons.jsonNode(ImmutableMap.of(NESTED_ARRAY_FIELD, node)) : node)).toList()); + } + + protected List findArrays(final JsonNode node) { + if (node != null) { + return node.findParents(TYPE_FIELD).stream() + .filter(this::isAirbyteArray) + .collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + protected boolean isAirbyteArray(final JsonNode node) { + final JsonNode type = node.get(TYPE_FIELD); + if (type.isArray()) { + final ArrayNode typeNode = (ArrayNode) type; + for (final JsonNode arrayTypeNode : typeNode) { + if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) { + return true; + } + } + } else if (type.isTextual()) { + return node.asText().equals("array"); + } + return false; + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/LegacyArrayFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/LegacyArrayFormatter.java new file mode 100644 index 000000000000..d36649780a44 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/arrayformater/LegacyArrayFormatter.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery.formatter.arrayformater; + +import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.PROPERTIES_FIELD; +import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.TYPE_FIELD; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class LegacyArrayFormatter extends DefaultArrayFormatter { + + @Override + public void surroundArraysByObjects(final JsonNode node) { + findArrays(node).forEach( + jsonNode -> { + final JsonNode arrayNode = jsonNode.deepCopy(); + + final ObjectNode newNode = (ObjectNode) jsonNode; + newNode.removeAll(); + newNode.putArray(TYPE_FIELD).add("object"); + newNode.putObject(PROPERTIES_FIELD).set(NESTED_ARRAY_FIELD, arrayNode); + + surroundArraysByObjects(arrayNode.get(ARRAY_ITEMS_FIELD)); + }); + } + + @Override + protected List findArrays(final JsonNode node) { + if (node != null) { + return node.findParents(TYPE_FIELD).stream() + .filter( + jsonNode -> { + final JsonNode type = jsonNode.get(TYPE_FIELD); + if (type.isArray()) { + final ArrayNode typeNode = (ArrayNode) type; + for (final JsonNode arrayTypeNode : typeNode) { + if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) { + return true; + } + } + } else if (type.isTextual()) { + return jsonNode.asText().equals("array"); + } + return false; + }) + .collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + @Override + public JsonNode formatArrayItems(List arrayItems) { + return Jsons.jsonNode(ImmutableMap.of(NESTED_ARRAY_FIELD, arrayItems)); + } + +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java index a40f7cf0dd18..66c30fcd3061 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java @@ -4,16 +4,20 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.AIRBYTE_COLUMNS; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CONFIG_PROJECT_ID; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.NAME_TRANSFORMER; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.configureBigQuery; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.createCommonConfig; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getBigQueryDataSet; 
+import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.tearDownBigQuery; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.ConnectionProperty; import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.FieldValue; @@ -23,11 +27,9 @@ import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.string.Strings; import io.airbyte.db.bigquery.BigQueryResultSet; import io.airbyte.db.bigquery.BigQuerySourceOperations; import io.airbyte.integrations.base.JavaBaseConstants; @@ -41,11 +43,7 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -61,19 +59,9 @@ public class BigQueryDenormalizedDestinationAcceptanceTest extends DestinationAcceptanceTest { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationAcceptanceTest.class); - private static final BigQuerySQLNameTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer(); - - protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); - - private static final String CONFIG_DATASET_ID = "dataset_id"; - protected static final String CONFIG_PROJECT_ID = "project_id"; - private static final String CONFIG_DATASET_LOCATION = "dataset_location"; - private static final String CONFIG_CREDS = "credentials_json"; - private static final List AIRBYTE_COLUMNS = List.of(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); private BigQuery bigquery; private Dataset dataset; - private boolean tornDown; private JsonNode config; private final StandardNameTransformer namingResolver = new StandardNameTransformer(); @@ -126,7 +114,7 @@ protected boolean supportBasicDataTypeTest() { // #13154 Normalization issue @Override protected boolean supportArrayDataTypeTest() { - return false; + return true; } @Override @@ -150,7 +138,7 @@ protected void assertNamespaceNormalization(final String testCaseId, @Override protected String getDefaultSchema(final JsonNode config) { - return config.get(CONFIG_DATASET_ID).asText(); + return BigQueryUtils.getDatasetId(config); } @Override @@ -216,70 +204,19 @@ private Object getTypedFieldValue(final FieldValueList row, final Field field) { } protected JsonNode createConfig() throws IOException { - final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); - final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final String projectId = 
credentialsJson.get(CONFIG_PROJECT_ID).asText(); - final String datasetLocation = "US"; - final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); - - return Jsons.jsonNode(ImmutableMap.builder() - .put(CONFIG_PROJECT_ID, projectId) - .put(CONFIG_CREDS, credentialsJson.toString()) - .put(CONFIG_DATASET_ID, datasetId) - .put(CONFIG_DATASET_LOCATION, datasetLocation) - .build()); + return createCommonConfig(); } @Override protected void setup(final TestDestinationEnv testEnv) throws Exception { - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH - + ". Override by setting setting path with the CREDENTIALS_PATH constant."); - } - config = createConfig(); - final ServiceAccountCredentials credentials = ServiceAccountCredentials - .fromStream(new ByteArrayInputStream(config.get(CONFIG_CREDS).asText().getBytes(StandardCharsets.UTF_8))); - - bigquery = BigQueryOptions.newBuilder() - .setProjectId(config.get(CONFIG_PROJECT_ID).asText()) - .setCredentials(credentials) - .build() - .getService(); - - final DatasetInfo datasetInfo = - DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build(); - dataset = bigquery.create(datasetInfo); - - tornDown = false; - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - if (!tornDown) { - tearDownBigQuery(); - } - })); + bigquery = configureBigQuery(config); + dataset = getBigQueryDataSet(config, bigquery); } @Override protected void tearDown(final TestDestinationEnv testEnv) { - tearDownBigQuery(); - } - - protected void tearDownBigQuery() { - // allows deletion of a dataset that has contents - final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); - - final boolean success = bigquery.delete(dataset.getDatasetId(), option); - if (success) { - LOGGER.info("BQ Dataset " + dataset + " deleted..."); - } else { - LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); - } - - tornDown = true; + tearDownBigQuery(dataset, bigquery); } // todo (cgardens) - figure out how to share these helpers. 
they are currently copied from diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 8561f290a6a5..eb541afc32c1 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -4,56 +4,57 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.AIRBYTE_COLUMNS; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.BIGQUERY_DATETIME_FORMAT; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.USERS_STREAM_NAME; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.configureBigQuery; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.createCommonConfig; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormats; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithEmptyList; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithNull; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfSchema; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getBigQueryDataSet; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getCommonCatalog; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getData; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataArrays; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataMaxNestedDepth; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataTooDeepNestedDepth; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithEmptyObjectAndArray; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithFormats; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithJSONDateTimeFormats; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithJSONWithReference; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithNestedDatetimeInsideNullObject; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getExpectedDataArrays; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchema; +import static 
io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaArrays; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaMaxNestedDepth; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaTooDeepNestedDepth; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithDateTime; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithFormats; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithInvalidArrayType; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithNestedDatetimeInsideNullObject; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithReferenceDefinition; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.runDestinationWrite; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.tearDownBigQuery; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.params.provider.Arguments.arguments; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.DestinationSyncMode; -import io.airbyte.protocol.models.SyncMode; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Instant; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -62,8 +63,10 @@ import java.util.stream.StreamSupport; import org.assertj.core.util.Sets; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; @@ -74,34 +77,31 @@ class BigQueryDenormalizedDestinationTest { - private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); - private static final Set AIRBYTE_METADATA_FIELDS 
= Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID); - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationTest.class); - - private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; - private static final Instant NOW = Instant.now(); - private static final String USERS_STREAM_NAME = "users"; - private static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData()); - private static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray()); - private static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats()); - private static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats()); - private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference()); - private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}")); - private static final AirbyteMessage MESSAGE_USERS7 = createRecordMessage(USERS_STREAM_NAME, getDataWithNestedDatetimeInsideNullObject()); - private static final AirbyteMessage MESSAGE_USERS8 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormats()); - private static final AirbyteMessage MESSAGE_USERS9 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithNull()); - private static final AirbyteMessage MESSAGE_USERS10 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithEmptyList()); - private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}")); - - private JsonNode config; - - private BigQuery bigquery; - private Dataset dataset; - private ConfiguredAirbyteCatalog catalog; - private String datasetId; - - private boolean tornDown = true; + protected static final Instant NOW = Instant.now(); + protected static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData()); + protected static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray()); + protected static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats()); + protected static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats()); + protected static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference()); + protected static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}")); + protected static final AirbyteMessage MESSAGE_USERS7 = createRecordMessage(USERS_STREAM_NAME, getDataWithNestedDatetimeInsideNullObject()); + protected static final AirbyteMessage MESSAGE_USERS8 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormats()); + protected static final AirbyteMessage MESSAGE_USERS9 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithNull()); + protected static final AirbyteMessage MESSAGE_USERS10 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithEmptyList()); + protected static final AirbyteMessage MESSAGE_USERS11 = createRecordMessage(USERS_STREAM_NAME, getDataArrays()); + protected static final AirbyteMessage MESSAGE_USERS12 = createRecordMessage(USERS_STREAM_NAME, getDataTooDeepNestedDepth()); + protected static final 
AirbyteMessage MESSAGE_USERS13 = createRecordMessage(USERS_STREAM_NAME, getDataMaxNestedDepth()); + protected static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}")); + + protected JsonNode config; + protected BigQuery bigquery; + protected Dataset dataset; + protected String datasetId; + + protected JsonNode createConfig() throws IOException { + return createCommonConfig(); + } @BeforeEach void setup(final TestInfo info) throws IOException { @@ -109,25 +109,11 @@ void setup(final TestInfo info) throws IOException { return; } - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH - + ". Override by setting setting path with the CREDENTIALS_PATH constant."); - } - final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); - final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - final ServiceAccountCredentials credentials = - ServiceAccountCredentials.fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8))); - bigquery = BigQueryOptions.newBuilder() - .setProjectId(projectId) - .setCredentials(credentials) - .build() - .getService(); - - datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); - final String datasetLocation = "EU"; + config = createConfig(); + bigquery = configureBigQuery(config); + dataset = getBigQueryDataSet(config, bigquery); + datasetId = dataset.getDatasetId().getDataset(); + MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); MESSAGE_USERS3.getRecord().setNamespace(datasetId); @@ -138,29 +124,11 @@ void setup(final TestInfo info) throws IOException { MESSAGE_USERS8.getRecord().setNamespace(datasetId); MESSAGE_USERS9.getRecord().setNamespace(datasetId); MESSAGE_USERS10.getRecord().setNamespace(datasetId); + MESSAGE_USERS11.getRecord().setNamespace(datasetId); + MESSAGE_USERS12.getRecord().setNamespace(datasetId); + MESSAGE_USERS13.getRecord().setNamespace(datasetId); EMPTY_MESSAGE.getRecord().setNamespace(datasetId); - final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); - dataset = bigquery.create(datasetInfo); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - tornDown = false; - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - if (!tornDown) { - tearDownBigQuery(); - } - })); - } @AfterEach @@ -169,35 +137,13 @@ void tearDown(final TestInfo info) { return; } - tearDownBigQuery(); - } - - private void tearDownBigQuery() { - // allows deletion of a dataset that has contents - final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); - - final boolean success = bigquery.delete(dataset.getDatasetId(), option); - if (success) { - LOGGER.info("BQ Dataset " + dataset + " deleted..."); - } else { - LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); - } - - tornDown = true; + tearDownBigQuery(dataset, bigquery); } @ParameterizedTest 
@MethodSource("schemaAndDataProvider") void testNestedWrite(final JsonNode schema, final AirbyteMessage message) throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(schema)) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(message); - consumer.close(); + runDestinationWrite(getCommonCatalog(schema, datasetId), config, message); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = message.getRecord().getData(); @@ -210,16 +156,7 @@ void testNestedWrite(final JsonNode schema, final AirbyteMessage message) throws @Test void testNestedDataTimeInsideNullObject() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream( - new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithNestedDatetimeInsideNullObject())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS7); - consumer.close(); + runDestinationWrite(getCommonCatalog(getSchemaWithNestedDatetimeInsideNullObject(), datasetId), config, MESSAGE_USERS7); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = MESSAGE_USERS7.getRecord().getData(); @@ -229,17 +166,18 @@ void testNestedDataTimeInsideNullObject() throws Exception { assertEquals(extractJsonValues(resultJson, "appointment"), extractJsonValues(expectedUsersJson, "appointment")); } + protected Schema getExpectedSchemaForWriteWithFormatTest() { + return Schema.of( + Field.of("name", StandardSQLTypeName.STRING), + Field.of("date_of_birth", StandardSQLTypeName.DATE), + Field.of("updated_at", StandardSQLTypeName.DATETIME), + Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), + Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); + } + @Test void testWriteWithFormat() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithFormats())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS3); - consumer.close(); + runDestinationWrite(getCommonCatalog(getSchemaWithFormats(), datasetId), config, MESSAGE_USERS3); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = MESSAGE_USERS3.getRecord().getData(); @@ -250,29 +188,20 @@ void testWriteWithFormat() throws Exception { // Bigquery's datetime 
type accepts multiple input format but always outputs the same, so we can't // expect to receive the value we sent. - assertEquals(Set.of("2021-10-11T06:36:53"), extractJsonValues(resultJson, "updated_at")); - - final Schema expectedSchema = Schema.of( - Field.of("name", StandardSQLTypeName.STRING), - Field.of("date_of_birth", StandardSQLTypeName.DATE), - Field.of("updated_at", StandardSQLTypeName.DATETIME), - Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), - Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); - - assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema); + var expectedValue = LocalDate.parse(extractJsonValues(expectedUsersJson, "updated_at").stream().findFirst().get(), + DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)); + var actualValue = + LocalDate.parse(extractJsonValues(resultJson, "updated_at").stream().findFirst().get(), + DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)); + assertEquals(expectedValue, actualValue); + + assertEquals(BigQueryUtils.getTableDefinition(bigquery, datasetId, USERS_STREAM_NAME).getSchema(), getExpectedSchemaForWriteWithFormatTest()); } @Test + @Disabled // Issue #5912 is reopened void testAnyOf() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS8); - consumer.close(); + runDestinationWrite(getCommonCatalog(getAnyOfSchema(), datasetId), config, MESSAGE_USERS8); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = MESSAGE_USERS8.getRecord().getData(); @@ -293,16 +222,9 @@ void testAnyOf() throws Exception { } @Test + @Disabled // Issue #5912 is reopened void testAnyOfWithNull() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS9); - consumer.close(); + runDestinationWrite(getCommonCatalog(getAnyOfSchema(), datasetId), config, MESSAGE_USERS9); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = MESSAGE_USERS9.getRecord().getData(); @@ -315,16 +237,9 @@ void testAnyOfWithNull() throws Exception { } @Test + @Disabled // Issue #5912 is reopened void testAnyOfWithEmptyList() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) - 
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS10); - consumer.close(); + runDestinationWrite(getCommonCatalog(getAnyOfSchema(), datasetId), config, MESSAGE_USERS10); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); final JsonNode expectedUsersJson = MESSAGE_USERS10.getRecord().getData(); @@ -337,41 +252,26 @@ void testAnyOfWithEmptyList() throws Exception { @Test void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithDateTime())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS4); - consumer.close(); + runDestinationWrite(getCommonCatalog(getSchemaWithDateTime(), datasetId), config, MESSAGE_USERS4); final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); assertEquals(usersActual.size(), 1); final JsonNode resultJson = usersActual.get(0); // BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format - // returns "yyyy-MM-dd'T'HH:mm:ss" format - assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson, "updated_at")); + Set actualValues = extractJsonValues(resultJson, "updated_at"); + assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").withZone(DateTimeZone.UTC).toString(BIGQUERY_DATETIME_FORMAT)), + actualValues); + // check nested datetime - assertEquals(Set.of(new DateTime("2021-11-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), - extractJsonValues(resultJson.get("items"), "nested_datetime")); + actualValues = extractJsonValues(resultJson.get("items"), "nested_datetime"); + assertEquals(Set.of(new DateTime("2021-11-11T06:36:53+00:00").withZone(DateTimeZone.UTC).toString(BIGQUERY_DATETIME_FORMAT)), + actualValues); } @Test void testJsonReferenceDefinition() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithReferenceDefinition())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS5); - consumer.accept(MESSAGE_USERS6); - consumer.accept(EMPTY_MESSAGE); - consumer.close(); + runDestinationWrite(getCommonCatalog(getSchemaWithReferenceDefinition(), datasetId), config, MESSAGE_USERS5, MESSAGE_USERS6, EMPTY_MESSAGE); final Set actual = retrieveRecordsAsJson(USERS_STREAM_NAME).stream().flatMap(x -> extractJsonValues(x, "users").stream()).collect(Collectors.toSet()); @@ -385,6 +285,32 @@ void 
testJsonReferenceDefinition() throws Exception { assertEquals(expected, actual); } + @Test + void testArrays() throws Exception { + runDestinationWrite(getCommonCatalog(getSchemaArrays(), datasetId), config, MESSAGE_USERS11); + + assertEquals(getExpectedDataArrays(), retrieveRecordsAsJson(USERS_STREAM_NAME).get(0)); + } + + // Issue #14668 + @Test + void testTooDeepNestedDepth() { + try { + runDestinationWrite(getCommonCatalog(getSchemaTooDeepNestedDepth(), datasetId), config, MESSAGE_USERS12); + } catch (Exception e) { + assert (e.getCause().getMessage().contains("nested too deeply")); + } + } + + // Issue #14668 + @Test + void testMaxNestedDepth() throws Exception { + runDestinationWrite(getCommonCatalog(getSchemaMaxNestedDepth(), datasetId), config, MESSAGE_USERS13); + + assertEquals(getDataMaxNestedDepth().findValue("str_value").asText(), + retrieveRecordsAsJson(USERS_STREAM_NAME).get(0).findValue("str_value").asText()); + } + private Set extractJsonValues(final JsonNode node, final String attributeName) { final List valuesNode = node.findValues(attributeName); final Set resultSet = new HashSet<>(); @@ -402,7 +328,7 @@ private Set extractJsonValues(final JsonNode node, final String attribut } private JsonNode removeAirbyteMetadataFields(final JsonNode record) { - for (final String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) { + for (final String airbyteMetadataField : AIRBYTE_COLUMNS) { ((ObjectNode) record).remove(airbyteMetadataField); } return record; @@ -412,18 +338,29 @@ private List retrieveRecordsAsJson(final String tableName) throws Exce final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder( - String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase())) + String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", datasetId, tableName.toLowerCase())) .setUseLegacySql(false).build(); BigQueryUtils.executeQuery(bigquery, queryConfig); - return StreamSupport + var valuesStream = StreamSupport .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) - .map(v -> v.get("jsonValue").getStringValue()) + .map(v -> v.get("jsonValue").getStringValue()); + return formatDateValues(valuesStream) .map(Jsons::deserialize) .map(this::removeAirbyteMetadataFields) .collect(Collectors.toList()); } + /** + * BigQuery returns date values in a different format based on the column type. Datetime : + * YYYY-MM-DD'T'HH:MM:SS Timestamp : YYYY-MM-DD'T'HH:MM:SS'Z' + * + * This method formats all values as Airbite format to simplify test result validation. 
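testArrays above exercises the core change of this patch: BigQuery does not allow a REPEATED field to directly contain another REPEATED field, so nested arrays are wrapped in a RECORD with a single big_query_array field (compare the dataArrays and expectedDataArrays fixtures added further down in this patch). The following is a simplified, illustrative sketch of that shape change on a JSON schema; it is not the connector's real array formatter and handles nothing except plain array-of-array nesting.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;

class ArrayOfArraysSketch {

  private static boolean isArraySchema(final JsonNode schemaNode) {
    return schemaNode != null && schemaNode.isObject()
        && schemaNode.has("type") && schemaNode.get("type").toString().contains("array");
  }

  // wraps items that are themselves arrays into an object with a single "big_query_array" property,
  // mirroring the structure of the expectedSchemaArrays / expectedDataArrays fixtures
  static void wrapNestedArrays(final JsonNode schemaNode) {
    if (schemaNode == null || !schemaNode.isObject()) {
      return;
    }
    if (isArraySchema(schemaNode) && isArraySchema(schemaNode.get("items"))) {
      final JsonNode innerArray = schemaNode.get("items");
      wrapNestedArrays(innerArray); // deeper levels first, e.g. the array_4 / array_5 style schemas
      final ObjectNode wrapper = (ObjectNode) Jsons.deserialize("{\"type\":[\"object\"]}");
      wrapper.putObject("properties").set("big_query_array", innerArray);
      ((ObjectNode) schemaNode).set("items", wrapper);
    }
    schemaNode.forEach(ArrayOfArraysSketch::wrapNestedArrays);
  }

  public static void main(String[] args) {
    // "array_1" style schema: an array whose items are again an array of integers
    final JsonNode schema = Jsons.deserialize(
        "{\"type\":[\"array\"],\"items\":{\"type\":[\"array\"],\"items\":{\"type\":\"integer\"}}}");
    wrapNestedArrays(schema);
    // the inner array is now reachable only through an object with a "big_query_array" field
    System.out.println(schema);
  }

}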
+ */ + private Stream formatDateValues(Stream values) { + return values.map(s -> s.replaceAll("(\"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})(Z)(\")", "$1$3")); + } + private static Stream schemaAndDataProvider() { return Stream.of( arguments(getSchema(), MESSAGE_USERS1), diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java index 23233ad2543f..e928355e952f 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java @@ -4,49 +4,16 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.createGcsConfig; + import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; import java.io.IOException; -import java.nio.file.Files; public class BigQueryDenormalizedGcsDestinationAcceptanceTest extends BigQueryDenormalizedDestinationAcceptanceTest { @Override protected JsonNode createConfig() throws IOException { - final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); - - final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(credentialsJsonString); - final JsonNode bigqueryConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final JsonNode gcsConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.GCS_CONFIG); - - final String projectId = bigqueryConfigFromSecretFile.get(CONFIG_PROJECT_ID).asText(); - final String datasetLocation = "US"; - - final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); - - final JsonNode gcsCredentialFromSecretFile = gcsConfigFromSecretFile.get(BigQueryConsts.CREDENTIAL); - final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CREDENTIAL_TYPE, gcsCredentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) - .build()); - - final JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) - .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) - .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.CREDENTIAL, credential) - .build()); - - return Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, bigqueryConfigFromSecretFile.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) - 
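The formatDateValues helper above collapses the two textual forms BigQuery can return for date-like columns into one, so a single expected value covers both a DATETIME and a TIMESTAMP column. A standalone demonstration of the same replaceAll call on two made-up result rows (the row contents are illustrative only):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DateValueNormalizationSketch {

  public static void main(String[] args) {
    final Stream<String> rows = Stream.of(
        "{\"updated_at\":\"2021-10-11T06:36:53Z\"}",  // TIMESTAMP-style value, trailing Z
        "{\"updated_at\":\"2021-10-11T06:36:53\"}");  // DATETIME-style value, no suffix
    final List<String> normalized = rows
        // drop the Z only when it sits between a datetime and the closing quote of a JSON string
        .map(s -> s.replaceAll("(\"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2})(Z)(\")", "$1$3"))
        .collect(Collectors.toList());
    normalized.forEach(System.out::println); // both rows now read {"updated_at":"2021-10-11T06:36:53"}
  }

}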
.put(BigQueryConsts.LOADING_METHOD, loadingMethod) - .build()); + return createGcsConfig(); } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java index 2f14e32a8f81..911ebb2c6d91 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java @@ -4,391 +4,30 @@ package io.airbyte.integrations.destination.bigquery; -import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.NESTED_ARRAY_FIELD; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getData; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithEmptyObjectAndArray; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithFormats; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithJSONDateTimeFormats; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithJSONWithReference; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchema; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithDateTime; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithFormats; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithInvalidArrayType; -import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithReferenceDefinition; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.params.provider.Arguments.arguments; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.createGcsConfig; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.auth.oauth2.ServiceAccountCredentials; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import 
io.airbyte.integrations.base.AirbyteMessageConsumer; -import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.DestinationSyncMode; -import io.airbyte.protocol.models.SyncMode; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Instant; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import org.assertj.core.util.Sets; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -class BigQueryDenormalizedGcsDestinationTest { +class BigQueryDenormalizedGcsDestinationTest extends BigQueryDenormalizedDestinationTest { - private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); - private static final Set AIRBYTE_METADATA_FIELDS = Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID); - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGcsDestinationTest.class); - - private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; - private static final Instant NOW = Instant.now(); - private static final String USERS_STREAM_NAME = "users"; - private static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData()); - private static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray()); - private static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats()); - private static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats()); - private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference()); - private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}")); - private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}")); - - private JsonNode config; - private AmazonS3 s3Client; - - private BigQuery bigquery; - private Dataset dataset; - private ConfiguredAirbyteCatalog catalog; - private String datasetId; - - private boolean tornDown = true; - - @BeforeEach - void setup(final TestInfo info) throws IOException { - if (info.getDisplayName().equals("testSpec()")) { - return; - } - - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH - + ". 
Override by setting setting path with the CREDENTIALS_PATH constant."); - } - final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); - final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final JsonNode credentialsGcsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.GCS_CONFIG); - - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - final ServiceAccountCredentials credentials = - ServiceAccountCredentials.fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8))); - bigquery = BigQueryOptions.newBuilder() - .setProjectId(projectId) - .setCredentials(credentials) - .build() - .getService(); - - datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); - final String datasetLocation = "EU"; - MESSAGE_USERS1.getRecord().setNamespace(datasetId); - MESSAGE_USERS2.getRecord().setNamespace(datasetId); - MESSAGE_USERS3.getRecord().setNamespace(datasetId); - MESSAGE_USERS4.getRecord().setNamespace(datasetId); - MESSAGE_USERS5.getRecord().setNamespace(datasetId); - MESSAGE_USERS6.getRecord().setNamespace(datasetId); - EMPTY_MESSAGE.getRecord().setNamespace(datasetId); - - final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); - dataset = bigquery.create(datasetInfo); - - final JsonNode credentialFromSecretFile = credentialsGcsJson.get(BigQueryConsts.CREDENTIAL); - final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CREDENTIAL_TYPE, credentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) - .build()); - - final JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) - .put(BigQueryConsts.KEEP_GCS_FILES, BigQueryConsts.KEEP_GCS_FILES_VAL) - .put(BigQueryConsts.GCS_BUCKET_NAME, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_NAME)) - .put(BigQueryConsts.GCS_BUCKET_PATH, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.CREDENTIAL, credential) - .build()); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) - .put(BigQueryConsts.LOADING_METHOD, loadingMethod) - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); - this.s3Client = gcsDestinationConfig.getS3Client(); - - tornDown = false; - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - if (!tornDown) { - tearDownBigQuery(); - } - })); - - } - - @AfterEach - void tearDown(final TestInfo info) { - if (info.getDisplayName().equals("testSpec()")) { - return; - } - tearDownGcs(); - tearDownBigQuery(); - } - - /** - * Remove all the GCS output from the tests. 
- */ - protected void tearDownGcs() { - final JsonNode properties = config.get(BigQueryConsts.LOADING_METHOD); - final String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); - final String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText(); - - final List keysToDelete = new LinkedList<>(); - final List objects = s3Client - .listObjects(gcsBucketName, gcs_bucket_path) - .getObjectSummaries(); - for (final S3ObjectSummary object : objects) { - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(object.getKey())); - } - - if (keysToDelete.size() > 0) { - LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); - // Google Cloud Storage doesn't accept request to delete multiple objects - for (final DeleteObjectsRequest.KeyVersion keyToDelete : keysToDelete) { - s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); - } - LOGGER.info("Deleted {} file(s).", keysToDelete.size()); - } - } - - private void tearDownBigQuery() { - // allows deletion of a dataset that has contents - final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); - - final boolean success = bigquery.delete(dataset.getDatasetId(), option); - if (success) { - LOGGER.info("BQ Dataset " + dataset + " deleted..."); - } else { - LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); - } - - tornDown = true; - } - - @ParameterizedTest - @MethodSource("schemaAndDataProvider") - void testNestedWrite(final JsonNode schema, final AirbyteMessage message) throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(schema)) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.start(); - consumer.accept(message); - consumer.close(); - - final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); - final JsonNode expectedUsersJson = message.getRecord().getData(); - assertEquals(usersActual.size(), 1); - final JsonNode resultJson = usersActual.get(0); - assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); - assertEquals(extractJsonValues(resultJson, "grants"), extractJsonValues(expectedUsersJson, "grants")); - assertEquals(extractJsonValues(resultJson, "domain"), extractJsonValues(expectedUsersJson, "domain")); + @Override + protected JsonNode createConfig() throws IOException { + return createGcsConfig(); } - @Test - void testWriteWithFormat() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithFormats())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.start(); - consumer.accept(MESSAGE_USERS3); - consumer.close(); - - final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); - final 
JsonNode expectedUsersJson = MESSAGE_USERS3.getRecord().getData(); - assertEquals(usersActual.size(), 1); - final JsonNode resultJson = usersActual.get(0); - assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); - assertEquals(extractJsonValues(resultJson, "date_of_birth"), extractJsonValues(expectedUsersJson, "date_of_birth")); - - // Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't - // expect to receive the value we sent. - assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2021-10-11T06:36:53Z")); - - final Schema expectedSchema = Schema.of( + @Override + protected Schema getExpectedSchemaForWriteWithFormatTest() { + return Schema.of( Field.of("name", StandardSQLTypeName.STRING), Field.of("date_of_birth", StandardSQLTypeName.DATE), Field.of("updated_at", StandardSQLTypeName.TIMESTAMP), Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING), Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); - final Schema actualSchema = BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(); - - assertNotNull(actualSchema); - actualSchema.getFields().forEach(actualField -> assertEquals(expectedSchema.getFields().get(actualField.getName()), - Field.of(actualField.getName(), actualField.getType()))); - } - - @Test - void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithDateTime())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.start(); - consumer.accept(MESSAGE_USERS4); - consumer.close(); - - final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); - assertEquals(usersActual.size(), 1); - final JsonNode resultJson = usersActual.get(0); - - // BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format - // returns "yyyy-MM-dd'T'HH:mm:ss" format - assertEquals(Set.of("2021-10-11T06:36:53Z"), extractJsonValues(resultJson, "updated_at")); - // check nested datetime - assertEquals(Set.of("2021-11-11T06:36:53Z"), - extractJsonValues(resultJson.get("items"), "nested_datetime")); - } - - @Test - void testJsonReferenceDefinition() throws Exception { - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithReferenceDefinition())) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); - - final BigQueryDestination destination = new BigQueryDenormalizedDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.start(); - consumer.accept(MESSAGE_USERS5); - consumer.accept(MESSAGE_USERS6); - consumer.accept(EMPTY_MESSAGE); - consumer.close(); - - final Set actual = - retrieveRecordsAsJson(USERS_STREAM_NAME).stream().flatMap(x -> extractJsonValues(x, "users").stream()).collect(Collectors.toSet()); - - 
final Set expected = Sets.set( - "{\"name\":\"John\",\"surname\":\"Adams\"}", - null // we expect one record to have not had the users field set - ); - - assertEquals(2, actual.size()); - assertEquals(expected, actual); - } - - private Set extractJsonValues(final JsonNode node, final String attributeName) { - final List valuesNode = node.findValues(attributeName); - final Set resultSet = new HashSet<>(); - valuesNode.forEach(jsonNode -> { - if (jsonNode.isArray()) { - jsonNode.forEach(arrayNodeValue -> resultSet.add(arrayNodeValue.textValue())); - } else if (jsonNode.isObject()) { - resultSet.addAll(extractJsonValues(jsonNode, NESTED_ARRAY_FIELD)); - } else { - resultSet.add(jsonNode.textValue()); - } - }); - - return resultSet; - } - - private JsonNode removeAirbyteMetadataFields(final JsonNode record) { - for (final String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) { - ((ObjectNode) record).remove(airbyteMetadataField); - } - return record; - } - - private List retrieveRecordsAsJson(final String tableName) throws Exception { - final QueryJobConfiguration queryConfig = - QueryJobConfiguration - .newBuilder( - String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase())) - .setUseLegacySql(false).build(); - BigQueryUtils.executeQuery(bigquery, queryConfig); - - return StreamSupport - .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) - .map(v -> v.get("jsonValue").getStringValue()) - .map(Jsons::deserialize) - .map(this::removeAirbyteMetadataFields) - .collect(Collectors.toList()); - } - - private static Stream schemaAndDataProvider() { - return Stream.of( - arguments(getSchema(), MESSAGE_USERS1), - arguments(getSchemaWithInvalidArrayType(), MESSAGE_USERS1), - arguments(getSchema(), MESSAGE_USERS2)); - } - - private static AirbyteMessage createRecordMessage(final String stream, final JsonNode data) { - return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(stream) - .withData(data) - .withEmittedAt(NOW.toEpochMilli())); } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestConstants.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestConstants.java new file mode 100644 index 000000000000..a6256e957eb8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestConstants.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.destination.bigquery; + +import io.airbyte.integrations.base.JavaBaseConstants; +import java.nio.file.Path; +import java.util.List; + +public class BigQueryDenormalizedTestConstants { + + public static final BigQuerySQLNameTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer(); + public static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + public static final String CONFIG_DATASET_ID = "dataset_id"; + public static final String CONFIG_PROJECT_ID = "project_id"; + public static final String CONFIG_DATASET_LOCATION = "dataset_location"; + public static final String CONFIG_CREDS = "credentials_json"; + public static final List AIRBYTE_COLUMNS = List.of(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + public static final String USERS_STREAM_NAME = "users"; + + public static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss"; + +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 8bb921bbf3a1..7f1c473c68dc 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -4,14 +4,50 @@ package io.airbyte.integrations.destination.bigquery.util; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CONFIG_CREDS; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CONFIG_DATASET_ID; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CONFIG_DATASET_LOCATION; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CONFIG_PROJECT_ID; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.CREDENTIALS_PATH; +import static io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants.USERS_STREAM_NAME; + import com.fasterxml.jackson.databind.JsonNode; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.destination.bigquery.BigQueryConsts; +import io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedDestination; +import io.airbyte.integrations.destination.bigquery.BigQueryDenormalizedTestConstants; +import io.airbyte.integrations.destination.bigquery.BigQueryDestination; +import io.airbyte.integrations.destination.bigquery.BigQueryUtils; +import io.airbyte.protocol.models.AirbyteMessage; +import 
io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.SyncMode; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BigQueryDenormalizedTestDataUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedTestDataUtils.class); + private static final String JSON_FILES_BASE_LOCATION = "testdata/"; public static JsonNode getSchema() { @@ -34,6 +70,34 @@ public static JsonNode getSchemaWithInvalidArrayType() { return getTestDataFromResourceJson("schemaWithInvalidArrayType.json"); } + public static JsonNode getSchemaArrays() { + return getTestDataFromResourceJson("schemaArrays.json"); + } + + public static JsonNode getDataArrays() { + return getTestDataFromResourceJson("dataArrays.json"); + } + + public static JsonNode getSchemaTooDeepNestedDepth() { + return getTestDataFromResourceJson("schemaTooDeepNestedDepth.json"); + } + + public static JsonNode getDataTooDeepNestedDepth() { + return getTestDataFromResourceJson("dataTooDeepNestedDepth.json"); + } + + public static JsonNode getSchemaMaxNestedDepth() { + return getTestDataFromResourceJson("schemaMaxNestedDepth.json"); + } + + public static JsonNode getDataMaxNestedDepth() { + return getTestDataFromResourceJson("dataMaxNestedDepth.json"); + } + + public static JsonNode getExpectedDataArrays() { + return getTestDataFromResourceJson("expectedDataArrays.json"); + } + public static JsonNode getData() { return getTestDataFromResourceJson("data.json"); } @@ -90,4 +154,126 @@ private static JsonNode getTestDataFromResourceJson(final String fileName) { return Jsons.deserialize(fileContent); } + public static ConfiguredAirbyteCatalog getCommonCatalog(final JsonNode schema, final String datasetId) { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(schema)) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + } + + public static void runDestinationWrite(ConfiguredAirbyteCatalog catalog, JsonNode config, AirbyteMessage... messages) throws Exception { + final BigQueryDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.start(); + for (AirbyteMessage message : messages) { + consumer.accept(message); + } + consumer.close(); + } + + private static void checkCredentialPath() { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH + + ". 
Override by setting setting path with the CREDENTIALS_PATH constant."); + } + } + + public static JsonNode createCommonConfig() throws IOException { + checkCredentialPath(); + + final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); + final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); + final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText(); + final String datasetLocation = "US"; + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); + + return Jsons.jsonNode(ImmutableMap.builder() + .put(CONFIG_PROJECT_ID, projectId) + .put(BigQueryDenormalizedTestConstants.CONFIG_CREDS, credentialsJson.toString()) + .put(CONFIG_DATASET_ID, datasetId) + .put(CONFIG_DATASET_LOCATION, datasetLocation) + .build()); + } + + public static JsonNode createGcsConfig() throws IOException { + checkCredentialPath(); + + final String credentialsJsonString = Files.readString(CREDENTIALS_PATH); + + final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(credentialsJsonString); + final JsonNode bigqueryConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); + final JsonNode gcsConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.GCS_CONFIG); + + final String projectId = bigqueryConfigFromSecretFile.get(CONFIG_PROJECT_ID).asText(); + final String datasetLocation = "US"; + + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); + + final JsonNode gcsCredentialFromSecretFile = gcsConfigFromSecretFile.get(BigQueryConsts.CREDENTIAL); + final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CREDENTIAL_TYPE, gcsCredentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) + .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) + .build()); + + final JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) + .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) + .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) + .put(BigQueryConsts.CREDENTIAL, credential) + .build()); + + return Jsons.jsonNode(ImmutableMap.builder() + .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) + .put(BigQueryConsts.CONFIG_CREDS, bigqueryConfigFromSecretFile.toString()) + .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) + .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) + .put(BigQueryConsts.LOADING_METHOD, loadingMethod) + .build()); + } + + public static BigQuery configureBigQuery(final JsonNode config) throws IOException { + final ServiceAccountCredentials credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(config.get(CONFIG_CREDS).asText().getBytes(StandardCharsets.UTF_8))); + + return BigQueryOptions.newBuilder() + .setProjectId(config.get(CONFIG_PROJECT_ID).asText()) + .setCredentials(credentials) + .build() + .getService(); + } + + public static Dataset getBigQueryDataSet(final JsonNode config, final BigQuery bigQuery) { + final DatasetInfo datasetInfo = + DatasetInfo.newBuilder(BigQueryUtils.getDatasetId(config)).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build(); + Dataset dataset = 
bigQuery.create(datasetInfo); + trackTestDataSet(dataset, bigQuery); + return dataset; + } + + private static Set dataSetsForDrop = new HashSet<>(); + + public static void trackTestDataSet(final Dataset dataset, final BigQuery bigQuery) { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> tearDownBigQuery(dataset, bigQuery))); + } + + public static synchronized void tearDownBigQuery(final Dataset dataset, final BigQuery bigQuery) { + if (dataSetsForDrop.contains(dataset)) { + // allows deletion of a dataset that has contents + final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); + + final boolean success = bigQuery.delete(dataset.getDatasetId(), option); + if (success) { + LOGGER.info("BQ Dataset " + dataset + " deleted..."); + } else { + LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); + } + dataSetsForDrop.remove(dataset); + } + } + } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataArrays.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataArrays.json new file mode 100644 index 000000000000..a06a3bb82198 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataArrays.json @@ -0,0 +1,14 @@ +{ + "object_with_arrays": { + "array_3": [1, 2, 3] + }, + "simple_string": "simple string", + "array_1": [ + [1, 2], + [2, 3] + ], + "array_4": [[[4]]], + "array_5": [[[[5]]]], + "array_6": [["2021-10-11T06:36:53+00:00", "2020-10-10T01:00:00+00:00"]], + "array_7": [[["2021-10-11T06:36:53+00:00", "2020-10-10T01:00:00+00:00"]]] +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataMaxNestedDepth.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataMaxNestedDepth.json new file mode 100644 index 000000000000..1462597b8f10 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataMaxNestedDepth.json @@ -0,0 +1,31 @@ +{ + "rec_lvl_1": { + "rec_lvl_2": { + "rec_lvl_3": { + "rec_lvl_4": { + "rec_lvl_5": { + "rec_lvl_6": { + "rec_lvl_7": { + "rec_lvl_8": { + "rec_lvl_9": { + "rec_lvl_10": { + "rec_lvl_11": { + "rec_lvl_12": { + "rec_lvl_13": { + "rec_lvl_14": { + "str_value" : "test_value" + } + } + } + } + } + } + } + } + } + } + } + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataTooDeepNestedDepth.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataTooDeepNestedDepth.json new file mode 100644 index 000000000000..3f0f2468cbc9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataTooDeepNestedDepth.json @@ -0,0 +1,33 @@ +{ + "rec_lvl_1": { + "rec_lvl_2": { + "rec_lvl_3": { + "rec_lvl_4": { + "rec_lvl_5": { + "rec_lvl_6": { + "rec_lvl_7": { + "rec_lvl_8": { + "rec_lvl_9": { + "rec_lvl_10": { + "rec_lvl_11": { + "rec_lvl_12": { + "rec_lvl_13": { + "rec_lvl_14": { + "rec_lvl_15": { + "str_value" : "test_value" + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } +} diff --git 
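dataMaxNestedDepth.json and dataTooDeepNestedDepth.json above probe the BigQuery restriction behind Issue #14668: BigQuery caps how deeply RECORD fields can be nested (15 levels per Google's documentation), so the max-depth fixture stays within what the connector can load while the too-deep fixture is expected to fail with a "nested too deeply" error (see testTooDeepNestedDepth earlier in this patch). A tiny, illustrative depth counter, not part of the patch, makes the fixture depths easy to check:

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

class NestingDepthSketch {

  // depth of nested JSON objects: a scalar counts as 0, each surrounding object adds 1
  static int depth(final JsonNode node) {
    if (node == null || !node.isObject()) {
      return 0;
    }
    int deepestChild = 0;
    for (final JsonNode child : node) {
      deepestChild = Math.max(deepestChild, depth(child));
    }
    return 1 + deepestChild;
  }

  public static void main(String[] args) {
    // shape of the fixtures above, shortened to two levels
    final JsonNode sample = Jsons.deserialize("{\"rec_lvl_1\":{\"str_value\":\"test_value\"}}");
    System.out.println(depth(sample)); // prints 2
  }

}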
a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/expectedDataArrays.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/expectedDataArrays.json new file mode 100644 index 000000000000..ed63c2a27ce3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/expectedDataArrays.json @@ -0,0 +1,50 @@ +{ + "object_with_arrays": { + "array_3": [1, 2, 3] + }, + "simple_string": "simple string", + "array_1": [ + { + "big_query_array": [1, 2] + }, + { + "big_query_array": [2, 3] + } + ], + "array_4": [ + { + "big_query_array": [ + { + "big_query_array": [4] + } + ] + } + ], + "array_5": [ + { + "big_query_array": [ + { + "big_query_array": [ + { + "big_query_array": [5] + } + ] + } + ] + } + ], + "array_6": [ + { + "big_query_array": ["2021-10-11T06:36:53", "2020-10-10T01:00:00"] + } + ], + "array_7": [ + { + "big_query_array": [ + { + "big_query_array": ["2021-10-11T06:36:53", "2020-10-10T01:00:00"] + } + ] + } + ] +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaArrays.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaArrays.json new file mode 100644 index 000000000000..90d0e379b667 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaArrays.json @@ -0,0 +1,78 @@ +{ + "type": ["object"], + "properties": { + "object_with_arrays": { + "type": ["object"], + "properties": { + "array_3": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + }, + "simple_string": { + "type": ["string"] + }, + "array_1": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + }, + "array_4": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + }, + "array_5": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + } + }, + "array_6": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["string"], + "format": "date-time" + } + } + }, + "array_7": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["string"], + "format": "date-time" + } + } + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaMaxNestedDepth.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaMaxNestedDepth.json new file mode 100644 index 000000000000..8570c57e1f2f --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaMaxNestedDepth.json @@ -0,0 +1,113 @@ +{ + "type": [ + "object" + ], + "properties": { + "rec_lvl_1": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_2": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_3": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_4": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_5": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_6": { + "type": [ + "object" + ], + "properties": { + 
"rec_lvl_7": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_8": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_9": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_10": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_11": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_12": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_13": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_14": { + "type": [ + "object" + ], + "properties": { + + "str_value" : { + "type": [ + "string" + ] + + + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaTooDeepNestedDepth.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaTooDeepNestedDepth.json new file mode 100644 index 000000000000..4eda1667967d --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaTooDeepNestedDepth.json @@ -0,0 +1,117 @@ +{ + "type": [ + "object" + ], + "properties": { + "rec_lvl_1": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_2": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_3": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_4": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_5": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_6": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_7": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_8": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_9": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_10": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_11": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_12": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_13": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_14": { + "type": [ + "object" + ], + "properties": { + "rec_lvl_15": { + "type": [ + "object" + ], + "properties": { + "str_value" : { + "type": [ + "string" + ] + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedUtilsTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedUtilsTest.java index 229c6f820e04..3de0e1d4e17b 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedUtilsTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedUtilsTest.java @@ -260,7 +260,8 @@ private static Stream actualAndExpectedSchemasProvider() { arguments(getSchemaWithInvalidArrayType(), getExpectedSchemaWithInvalidArrayType()), arguments(getSchemaWithReferenceDefinition(), getExpectedSchemaWithReferenceDefinition()), arguments(getSchemaWithNestedDatetimeInsideNullObject(), - getExpectedSchemaWithNestedDatetimeInsideNullObject())); + 
getExpectedSchemaWithNestedDatetimeInsideNullObject()), + arguments(getSchemaArrays(), getExpectedSchemaArrays())); } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestSchemaUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestSchemaUtils.java index f82acef1ac1c..4e6498ecffe9 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestSchemaUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestSchemaUtils.java @@ -42,6 +42,10 @@ public static JsonNode getSchemaWithNestedDatetimeInsideNullObject() { return getTestDataFromResourceJson("schemaWithNestedDatetimeInsideNullObject.json"); } + public static JsonNode getSchemaArrays() { + return getTestDataFromResourceJson("schemaArrays.json"); + } + public static JsonNode getExpectedSchema() { return getTestDataFromResourceJson("expectedSchema.json"); } @@ -66,6 +70,10 @@ public static JsonNode getExpectedSchemaWithNestedDatetimeInsideNullObject() { return getTestDataFromResourceJson("expectedSchemaWithNestedDatetimeInsideNullObject.json"); } + public static JsonNode getExpectedSchemaArrays() { + return getTestDataFromResourceJson("expectedSchemaArrays.json"); + } + private static JsonNode getTestDataFromResourceJson(final String fileName) { final String fileContent; try { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchema.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchema.json index cc0cce46d641..6534fc7a3d0e 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchema.json +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchema.json @@ -5,38 +5,22 @@ "type": ["null", "string"], "format": "date-time" }, - "name": { - "type": ["string"] - }, + "name": { "type": ["string"] }, "permission-list": { "type": ["object"], "properties": { "big_query_array": { - "type": ["object"], - "properties": { - "big_query_array": { - "type": ["array"], - "items": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "domain": { "type": ["string"] }, + "grants": { "type": ["object"], "properties": { - "domain": { - "type": ["string"] - }, - "grants": { - "type": ["object"], - "properties": { - "big_query_array": { - "type": ["object"], - "properties": { - "big_query_array": { - "type": ["array"], - "items": { - "type": ["string"] - } - } - } - } - } + "big_query_array": { + "type": ["array"], + "items": { "type": ["string"] } } } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaArrays.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaArrays.json new file mode 100644 index 000000000000..3a1b9f624266 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaArrays.json @@ -0,0 +1,85 @@ +{ + "type": ["object"], + "properties": { + 
"object_with_arrays": { + "type": ["object"], + "properties": { + "array_3": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + }, + "simple_string": { + "type": ["string"] + }, + "array_1": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + } + }, + "array_4": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + } + } + } + } + }, + "array_5": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "big_query_array": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + } + } + } + } + } + } + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaWithInvalidArrayType.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaWithInvalidArrayType.json index 584dd272b9c1..4ecf67dd3f7d 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaWithInvalidArrayType.json +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/expectedSchemaWithInvalidArrayType.json @@ -5,27 +5,17 @@ "type": ["string"] }, "permission-list": { - "type": ["object"], - "properties": { - "big_query_array": { - "type": ["array"], - "items": { - "type": ["object"], - "properties": { - "domain": { - "type": ["string"] - }, - "grants": { - "type": ["object"], - "properties": { - "big_query_array": { - "type": ["array"], - "items": { - "type": ["string"] - } - } - } - } + "type": ["array"], + "items": { + "type": ["object"], + "properties": { + "domain": { + "type": ["string"] + }, + "grants": { + "type": ["array"], + "items": { + "type": ["string"] } } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/schemaArrays.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/schemaArrays.json new file mode 100644 index 000000000000..92de3f8afdb9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test/resources/schemas/schemaArrays.json @@ -0,0 +1,55 @@ +{ + "type": ["object"], + "properties": { + "object_with_arrays": { + "type": ["object"], + "properties": { + "array_3": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + }, + "simple_string": { + "type": ["string"] + }, + "array_1": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + }, + "array_4": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + }, + "array_5": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": ["array"], + "items": { + "type": "integer" + } + } + } + } + } + } +} diff --git 
a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index b6ed2eea285b..fc96b7463b82 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -209,6 +209,7 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); + stream.setNamespace(BigQueryUtils.getDatasetId(config)); final String streamName = stream.getName(); final UploaderConfig uploaderConfig = UploaderConfig .builder() @@ -221,13 +222,20 @@ protected Map> getUp .isDefaultAirbyteTmpSchema(isDefaultAirbyteTmpTableSchema()) .build(); - uploaderMap.put( - AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), - BigQueryUploaderFactory.getUploader(uploaderConfig)); + putStreamIntoUploaderMap(stream, uploaderConfig, uploaderMap); } return uploaderMap; } + protected void putStreamIntoUploaderMap(final AirbyteStream stream, + final UploaderConfig uploaderConfig, + final Map> uploaderMap) + throws IOException { + uploaderMap.put( + AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream), + BigQueryUploaderFactory.getUploader(uploaderConfig)); + } + /** * BigQuery might have different structure of the Temporary table. If this method returns TRUE, * temporary table will have only three common Airbyte attributes. 
In case of FALSE, temporary table @@ -254,7 +262,7 @@ private AirbyteMessageConsumer getStandardRecordConsumer(final JsonNode config, final Consumer outputRecordCollector) throws IOException { final Map> writeConfigs = getUploaderMap(config, catalog); - return new BigQueryRecordConsumer(writeConfigs, outputRecordCollector); + return new BigQueryRecordConsumer(writeConfigs, outputRecordCollector, BigQueryUtils.getDatasetId(config)); } public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java index 0fb00935a0b1..b2bd77a69efe 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java @@ -138,7 +138,8 @@ public void copyIntoTmpTableFromStage(final String datasetId, BigQueryUtils.waitForJobFinish(loadJob); LOGGER.info("[{}] Tmp table {} (dataset {}) is successfully appended with staging files", loadJob.getJobId(), tmpTableId, datasetId); } catch (final BigQueryException | InterruptedException e) { - throw new RuntimeException(String.format("[%s] Failed to upload staging files to tmp table %s (%s)", loadJob.getJobId(), tmpTableId, datasetId), e); + throw new RuntimeException( + String.format("[%s] Failed to upload staging files to tmp table %s (%s)", loadJob.getJobId(), tmpTableId, datasetId), e); } }); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index 42b466ee10cb..73988c853026 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -24,12 +24,15 @@ public class BigQueryRecordConsumer extends FailureTrackingAirbyteMessageConsume private final Map> uploaderMap; private final Consumer outputRecordCollector; + private final String datasetId; private AirbyteMessage lastStateMessage = null; public BigQueryRecordConsumer(final Map> uploaderMap, - final Consumer outputRecordCollector) { + final Consumer outputRecordCollector, + final String datasetId) { this.uploaderMap = uploaderMap; this.outputRecordCollector = outputRecordCollector; + this.datasetId = datasetId; } @Override @@ -43,6 +46,7 @@ public void acceptTracked(final AirbyteMessage message) { lastStateMessage = message; outputRecordCollector.accept(message); } else if (message.getType() == Type.RECORD) { + message.getRecord().setNamespace(datasetId); processRecord(message); } else { LOGGER.warn("Unexpected message: {}", message.getType()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 
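The BigQueryRecordConsumer change above stamps every incoming record with the configured dataset id, matching the namespace that getUploaderMap now writes into each catalog stream. Uploaders are registered and looked up by the stream's name/namespace pair, so a record without the namespace would miss its uploader. A toy sketch of that lookup; the plain map with a composite string key stands in for the real pair class and is illustrative only.

import java.util.HashMap;
import java.util.Map;

class NamespaceLookupSketch {

  public static void main(String[] args) {
    // registered when the consumer is built: one uploader per stream name + namespace
    final Map<String, String> uploaders = new HashMap<>();
    uploaders.put("users@airbyte_tests_abc", "uploader for users");

    // a record arriving without a namespace finds nothing ...
    System.out.println(uploaders.get("users@null"));              // null
    // ... while a record stamped with the dataset id resolves its uploader
    System.out.println(uploaders.get("users@airbyte_tests_abc")); // uploader for users
  }

}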
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
index 6ae70cd99629..e2cf6c858da5 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
@@ -7,6 +7,7 @@
 import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.getJobErrorMessage;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
@@ -46,6 +47,7 @@
 import java.util.UUID;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -231,20 +233,30 @@ public static List getDateTimeFieldsFromSchema(final FieldList fieldList
    * "https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#details_of_loading_json_data">Supported
    * Google bigquery datatype This method is responsible to adapt JSON DATETIME to Bigquery
    */
-  public static void transformJsonDateTimeToBigDataFormat(final List dateTimeFields, final ObjectNode data) {
+  public static void transformJsonDateTimeToBigDataFormat(final List dateTimeFields, final JsonNode data) {
     dateTimeFields.forEach(e -> {
-      if (data.findValue(e) != null && !data.get(e).isNull()) {
-        final String googleBigQueryDateFormat = QueryParameterValue
-            .dateTime(new DateTime(convertDateToInstantFormat(data
-                .findValue(e)
-                .asText()))
-                    .toString(BIG_QUERY_DATETIME_FORMAT))
-            .getValue();
-        data.put(e, googleBigQueryDateFormat);
+      if (data.isObject() && data.findValue(e) != null && !data.get(e).isNull()) {
+        ObjectNode dataObject = (ObjectNode) data;
+        JsonNode value = data.findValue(e);
+        if (value.isArray()) {
+          ArrayNode arrayNode = (ArrayNode) value;
+          ArrayNode newArrayNode = dataObject.putArray(e);
+          arrayNode.forEach(jsonNode -> newArrayNode.add(getFormattedBigQueryDateTime(jsonNode.asText())));
+        } else if (value.isTextual()) {
+          dataObject.put(e, getFormattedBigQueryDateTime(value.asText()));
+        } else {
+          throw new RuntimeException("Unexpected transformation case");
+        }
       }
     });
   }
 
+  private static String getFormattedBigQueryDateTime(final String dateTimeValue) {
+    return (dateTimeValue != null ? QueryParameterValue
+        .dateTime(new DateTime(convertDateToInstantFormat(dateTimeValue)).withZone(DateTimeZone.UTC).toString(BIG_QUERY_DATETIME_FORMAT)).getValue()
+        : null);
+  }
+
   /**
    * @return BigQuery dataset ID
   */
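For illustration only (not part of the patch): a minimal sketch of how the reworked transformJsonDateTimeToBigDataFormat is expected to behave now that it accepts a JsonNode and also handles arrays. The field names and input values are hypothetical; Jsons is Airbyte's io.airbyte.commons.json.Jsons helper, and java.util.List/Map imports are assumed.

// One record with a scalar DATETIME field and an array DATETIME field.
final ObjectNode data = (ObjectNode) Jsons.jsonNode(Map.of(
    "updated_at", "2021-10-11T06:36:53+00:00",
    "appointments", List.of("2021-10-11T06:36:53+00:00", "2022-01-01T00:00:00+00:00")));
BigQueryUtils.transformJsonDateTimeToBigDataFormat(List.of("updated_at", "appointments"), data);
// Both the scalar value and every array element are expected to be rewritten in place
// to BigQuery's DATETIME text format in UTC (BIG_QUERY_DATETIME_FORMAT).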
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java
index 0b6e67b30f8e..e9e2f07865f9 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java
@@ -23,10 +23,11 @@ public abstract class BigQueryRecordFormatter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryRecordFormatter.class);
 
-  private Schema bigQuerySchema;
-  private final Map> mapOfFailedFields = new HashMap<>();
+  protected Schema bigQuerySchema;
+  protected final Map> mapOfFailedFields = new HashMap<>();
   protected final StandardNameTransformer namingResolver;
-  protected final JsonNode jsonSchema;
+  protected final JsonNode originalJsonSchema;
+  protected JsonNode jsonSchema;
 
   /**
    * These parameters are required for the correct operation of denormalize version of the connector.
@@ -36,6 +37,7 @@ public abstract class BigQueryRecordFormatter {
 
   public BigQueryRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
     this.namingResolver = namingResolver;
+    this.originalJsonSchema = jsonSchema.deepCopy();
     this.jsonSchema = formatJsonSchema(jsonSchema.deepCopy());
   }
 
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
index 2166ffffb87b..de6c9a2a1de0 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java
@@ -53,6 +53,10 @@ public abstract class AbstractBigQueryUploader {
     this.recordFormatter = recordFormatter;
   }
 
+  public BigQueryRecordFormatter getRecordFormatter() {
+    return recordFormatter;
+  }
+
   protected void postProcessAction(final boolean hasFailed) throws Exception {
     // Do nothing by default
   }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
index e4392bae4398..53afe07f940f 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
@@ -17,7 +17,6 @@
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
 import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
-import io.airbyte.integrations.destination.bigquery.UploadingMethod;
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
 import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
 import io.airbyte.integrations.destination.bigquery.writer.BigQueryTableWriter;
@@ -37,10 +36,7 @@ public static AbstractBigQueryUploader getUploader(final UploaderConfig uploa
     final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig());
     final Set existingSchemas = new HashSet<>();
 
-    final boolean isGcsUploadingMode = BigQueryUtils.getLoadingMethod(uploaderConfig.getConfig()) == UploadingMethod.GCS;
-    final BigQueryRecordFormatter recordFormatter = isGcsUploadingMode
-        ? uploaderConfig.getFormatterMap().get(UploaderType.AVRO)
-        : uploaderConfig.getFormatterMap().get(UploaderType.STANDARD);
+    final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter();
     final Schema bigQuerySchema = recordFormatter.getBigQuerySchema();
 
     final TableId targetTable = TableId.of(schemaName, uploaderConfig.getTargetTableName());
@@ -57,7 +53,7 @@ public static AbstractBigQueryUploader getUploa
     final JobInfo.WriteDisposition syncMode = BigQueryUtils.getWriteDisposition(
         uploaderConfig.getConfigStream().getDestinationSyncMode());
 
-    return (isGcsUploadingMode
+    return (uploaderConfig.isGcsUploadingMode()
         ? getGcsBigQueryUploader(
             uploaderConfig.getConfig(),
             uploaderConfig.getConfigStream(),
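For illustration only (not part of the patch): a minimal sketch of what the factory's calls now resolve to via the helpers added to UploaderConfig in the next diff. The "uploaderConfig" instance is assumed to be constructed elsewhere exactly as before; only methods shown in this patch are used.

// Formatter selection is now derived from the loading method in one place:
//   getFormatter() == formatterMap.get(isGcsUploadingMode() ? UploaderType.AVRO : UploaderType.STANDARD)
final BigQueryRecordFormatter recordFormatter = uploaderConfig.getFormatter();
final boolean viaGcs = uploaderConfig.isGcsUploadingMode();
// The same decision is reused for the GCS-vs-standard uploader branch later in getUploader(...).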
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java
index b695992050ae..8c6a38161624 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/config/UploaderConfig.java
@@ -6,6 +6,8 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.cloud.bigquery.BigQuery;
+import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
+import io.airbyte.integrations.destination.bigquery.UploadingMethod;
 import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -25,4 +27,16 @@ public class UploaderConfig {
   private Map formatterMap;
   private boolean isDefaultAirbyteTmpSchema;
 
+  public boolean isGcsUploadingMode() {
+    return BigQueryUtils.getLoadingMethod(config) == UploadingMethod.GCS;
+  }
+
+  public UploaderType getUploaderType() {
+    return (isGcsUploadingMode() ? UploaderType.AVRO : UploaderType.STANDARD);
+  }
+
+  public BigQueryRecordFormatter getFormatter() {
+    return formatterMap.get(getUploaderType());
+  }
+
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
index edda58225624..89960f67e499 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
@@ -141,7 +141,7 @@ protected void assertNamespaceNormalization(final String testCaseId,
 
   @Override
   protected String getDefaultSchema(final JsonNode config) {
-    return config.get(CONFIG_DATASET_ID).asText();
+    return BigQueryUtils.getDatasetId(config);
   }
 
   @Override
@@ -221,7 +221,7 @@ protected void setupBigQuery(final JsonNode credentialsJson) throws IOException
         .getService();
 
     final DatasetInfo datasetInfo =
-        DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
+        DatasetInfo.newBuilder(getDefaultSchema(config)).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
     dataset = bigquery.create(datasetInfo);
 
     tornDown = false;