From bc4beba9bd605a0657a8f517a8ef71b9a98a97ba Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Tue, 15 Mar 2022 18:56:34 +0200 Subject: [PATCH 1/6] fix bug with anyOf and allOf json blocks --- ...ltBigQueryDenormalizedRecordFormatter.java | 40 ++++- .../BigQueryDenormalizedDestinationTest.java | 77 ++++++++++ .../BigQueryDenormalizedTestDataUtils.java | 140 ++++++++++++++++++ 3 files changed, 249 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java index 63e5478c17cf..053f8ae5b619 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java @@ -44,6 +44,8 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR public static final String NESTED_ARRAY_FIELD = "big_query_array"; protected static final String PROPERTIES_FIELD = "properties"; private static final String TYPE_FIELD = "type"; + private static final String ALL_OF_FIELD = "allOf"; + private static final String ANY_OF_FIELD = "anyOf"; private static final String ARRAY_ITEMS_FIELD = "items"; private static final String FORMAT_FIELD = "format"; private static final String REF_DEFINITION_KEY = "$ref"; @@ -199,7 +201,7 @@ public Schema getBigQuerySchema(final JsonNode jsonSchema) { fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP)); } LOGGER.info("Airbyte Schema is transformed from {} to {}.", jsonSchema, fieldList); - return com.google.cloud.bigquery.Schema.of(fieldList); + return Schema.of(fieldList); } private List getSchemaFields(final StandardNameTransformer namingResolver, final JsonNode jsonSchema) { @@ -235,10 +237,32 @@ private Consumer addToRefList(final ObjectNode properties) { }; } + private static JsonNode getFileDefinition(final JsonNode fieldDefinition) { + if (fieldDefinition.has(TYPE_FIELD)) { + return fieldDefinition; + } else { + if (fieldDefinition.has(ANY_OF_FIELD) && fieldDefinition.get(ANY_OF_FIELD).isArray()) { + var fieldOptional = MoreIterators.toList(fieldDefinition.get(ANY_OF_FIELD).elements()).stream().findFirst(); + if (fieldOptional.isPresent()) { + return getFileDefinition(fieldOptional.get()); + } + } + if (fieldDefinition.has(ALL_OF_FIELD) && fieldDefinition.get(ALL_OF_FIELD).isArray()) { + var fieldOptional = MoreIterators.toList(fieldDefinition.get(ALL_OF_FIELD).elements()).stream().findFirst(); + if (fieldOptional.isPresent()) { + return getFileDefinition(fieldOptional.get()); + } + } + } + return fieldDefinition; + } + private static Builder getField(final StandardNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) { final String fieldName = namingResolver.getIdentifier(key); final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING); - final List fieldTypes = getTypes(fieldName, fieldDefinition.get(TYPE_FIELD)); + JsonNode updatedFileDefinition = getFileDefinition(fieldDefinition); + JsonNode type = updatedFileDefinition.get(TYPE_FIELD); + final List fieldTypes = getTypes(fieldName, type); for (int i = 0; i < fieldTypes.size(); i++) { final JsonSchemaType fieldType = fieldTypes.get(i); if (fieldType == JsonSchemaType.NULL) { @@ -256,8 +280,8 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi } case ARRAY -> { final JsonNode items; - if (fieldDefinition.has("items")) { - items = fieldDefinition.get("items"); + if (updatedFileDefinition.has("items")) { + items = updatedFileDefinition.get("items"); } else { LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type"); // this is handler for case when we get "array" without "items" @@ -268,10 +292,10 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi } case OBJECT -> { final JsonNode properties; - if (fieldDefinition.has(PROPERTIES_FIELD)) { - properties = fieldDefinition.get(PROPERTIES_FIELD); + if (updatedFileDefinition.has(PROPERTIES_FIELD)) { + properties = updatedFileDefinition.get(PROPERTIES_FIELD); } else { - properties = fieldDefinition; + properties = updatedFileDefinition; } final FieldList fieldList = FieldList.of(Jsons.keys(properties) .stream() @@ -292,7 +316,7 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi } // If a specific format is defined, use their specific type instead of the JSON's one - final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD); + final JsonNode fieldFormat = updatedFileDefinition.get(FORMAT_FIELD); if (fieldFormat != null) { final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText()); if (schemaFormat != null) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index b097048196ee..107bcd313202 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -39,6 +39,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -74,6 +75,9 @@ class BigQueryDenormalizedDestinationTest { private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference()); private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}")); private static final AirbyteMessage MESSAGE_USERS7 = createRecordMessage(USERS_STREAM_NAME, getDataWithNestedDatetimeInsideNullObject()); + private static final AirbyteMessage MESSAGE_USERS8 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormats()); + private static final AirbyteMessage MESSAGE_USERS9 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithNull()); + private static final AirbyteMessage MESSAGE_USERS10 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithEmptyList()); private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}")); private JsonNode config; @@ -117,6 +121,9 @@ void setup(final TestInfo info) throws IOException { MESSAGE_USERS5.getRecord().setNamespace(datasetId); MESSAGE_USERS6.getRecord().setNamespace(datasetId); MESSAGE_USERS7.getRecord().setNamespace(datasetId); + MESSAGE_USERS8.getRecord().setNamespace(datasetId); + MESSAGE_USERS9.getRecord().setNamespace(datasetId); + MESSAGE_USERS10.getRecord().setNamespace(datasetId); EMPTY_MESSAGE.getRecord().setNamespace(datasetId); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); @@ -241,6 +248,76 @@ void testWriteWithFormat() throws Exception { assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema); } + @Test + void testAnyOf() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + + final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS8); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + final JsonNode expectedUsersJson = MESSAGE_USERS8.getRecord().getData(); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + assertEquals(extractJsonValues(resultJson, "id"), extractJsonValues(expectedUsersJson, "id")); + assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); + assertEquals(extractJsonValues(resultJson, "type"), extractJsonValues(expectedUsersJson, "type")); + assertEquals(extractJsonValues(resultJson, "email"), extractJsonValues(expectedUsersJson, "email")); + assertEquals(extractJsonValues(resultJson, "avatar"), extractJsonValues(expectedUsersJson, "avatar")); + assertEquals(extractJsonValues(resultJson, "team_ids"), extractJsonValues(expectedUsersJson, "team_ids")); + assertEquals(extractJsonValues(resultJson, "admin_ids"), extractJsonValues(expectedUsersJson, "admin_ids")); + assertEquals(extractJsonValues(resultJson, "job_title"), extractJsonValues(expectedUsersJson, "job_title")); + assertEquals(extractJsonValues(resultJson, "has_inbox_seat"), extractJsonValues(expectedUsersJson, "has_inbox_seat")); + assertEquals(extractJsonValues(resultJson, "away_mode_enabled"), extractJsonValues(expectedUsersJson, "away_mode_enabled")); + assertEquals(extractJsonValues(resultJson, "away_mode_reassign"), extractJsonValues(expectedUsersJson, "away_mode_reassign")); + } + + @Test + void testAnyOfWithNull() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + + final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS9); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + final JsonNode expectedUsersJson = MESSAGE_USERS9.getRecord().getData(); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); + assertEquals(extractJsonValues(resultJson, "team_ids"), Collections.emptySet()); + assertEquals(extractJsonValues(resultJson, "avatar"), extractJsonValues(expectedUsersJson, "avatar")); + } + + @Test + void testAnyOfWithEmptyList() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + + final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS10); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + final JsonNode expectedUsersJson = MESSAGE_USERS10.getRecord().getData(); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); + assertEquals(extractJsonValues(resultJson, "team_ids"), extractJsonValues(expectedUsersJson, "team_ids")); + } + @Test void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 337e674555d4..45cc517e4efc 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -61,6 +61,107 @@ public static JsonNode getSchema() { """); } + public static JsonNode getAnyOfSchema() { + return Jsons.deserialize(""" + { + "type":"object", + "properties":{ + "id":{ + "type":[ + "null", + "string" + ] + }, + "name":{ + "type":[ + "null", + "string" + ] + }, + "type":{ + "type":[ + "null", + "string" + ] + }, + "email":{ + "type":[ + "null", + "string" + ] + }, + "avatar":{ + "type":[ + "null", + "object" + ], + "properties":{ + "image_url":{ + "type":[ + "null", + "string" + ] + } + }, + "additionalProperties":false + }, + "team_ids":{ + "anyOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"null" + } + ] + }, + "admin_ids":{ + "anyOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"null" + } + ] + }, + "job_title":{ + "type":[ + "null", + "string" + ] + }, + "has_inbox_seat":{ + "type":[ + "null", + "boolean" + ] + }, + "away_mode_enabled":{ + "type":[ + "null", + "boolean" + ] + }, + "away_mode_reassign":{ + "type":[ + "null", + "boolean" + ] + } + + }, + "additionalProperties":false + } + """); + } + public static JsonNode getSchemaWithFormats() { return Jsons.deserialize(""" { @@ -197,6 +298,45 @@ public static JsonNode getDataWithFormats() { """); } + public static JsonNode getAnyOfFormats() { + return Jsons.deserialize(""" + { + "id": "ID", + "name": "Andrii", + "type": "some_type", + "email": "email@email.com", + "avatar": { + "image_url": "url_to_avatar.jpg" + }, + "team_ids": [1, 2, 3], + "admin_ids": [2, 5], + "job_title": "title", + "has_inbox_seat": true, + "away_mode_enabled": false, + "away_mode_reassign": false + } + """); + } + + public static JsonNode getAnyOfFormatsWithNull() { + return Jsons.deserialize(""" + { + "name": "Mukola", + "team_ids": null, + "avatar": null + } + """); + } + + public static JsonNode getAnyOfFormatsWithEmptyList() { + return Jsons.deserialize(""" + { + "name": "Sergii", + "team_ids": [] + } + """); + } + public static JsonNode getDataWithJSONDateTimeFormats() { return Jsons.deserialize(""" { From f6ba9f7d7919687056d64e79cb4709d45a892e0e Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 24 Mar 2022 17:32:28 +0200 Subject: [PATCH 2/6] updated tests and formatter --- ...ltBigQueryDenormalizedRecordFormatter.java | 65 +++++++++++++------ .../BigQueryDenormalizedDestinationTest.java | 9 ++- .../BigQueryDenormalizedTestDataUtils.java | 39 ++++++++++- .../formatter/BigQueryRecordFormatter.java | 6 ++ 4 files changed, 96 insertions(+), 23 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java index 053f8ae5b619..038637e44ec0 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java @@ -4,7 +4,10 @@ package io.airbyte.integrations.destination.bigquery.formatter; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.Field; @@ -24,23 +27,21 @@ import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat; import io.airbyte.integrations.destination.bigquery.JsonSchemaType; import io.airbyte.protocol.models.AirbyteRecordMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.Collections; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryRecordFormatter { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBigQueryDenormalizedRecordFormatter.class); - private final Set invalidKeys = new HashSet<>(); - public static final String NESTED_ARRAY_FIELD = "big_query_array"; protected static final String PROPERTIES_FIELD = "properties"; private static final String TYPE_FIELD = "type"; @@ -49,8 +50,7 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR private static final String ARRAY_ITEMS_FIELD = "items"; private static final String FORMAT_FIELD = "format"; private static final String REF_DEFINITION_KEY = "$ref"; - - private final Set fieldsContainRefDefinitionValue = new HashSet<>(); + private static final ObjectMapper mapper = new ObjectMapper(); public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) { super(jsonSchema, namingResolver); @@ -58,9 +58,21 @@ public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, fin @Override protected JsonNode formatJsonSchema(final JsonNode jsonSchema) { - populateEmptyArrays(jsonSchema); - surroundArraysByObjects(jsonSchema); - return jsonSchema; + var modifiedJsonSchema = formatAllOfAndAnyOfFields(namingResolver, jsonSchema); + populateEmptyArrays(modifiedJsonSchema); + surroundArraysByObjects(modifiedJsonSchema); + return modifiedJsonSchema; + } + + private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingResolver, final JsonNode jsonSchema) { + LOGGER.info("getSchemaFields : " + jsonSchema + " namingResolver " + namingResolver); + final JsonNode modifiedSchema = jsonSchema.deepCopy(); + Preconditions.checkArgument(modifiedSchema.isObject() && modifiedSchema.has(PROPERTIES_FIELD)); + ObjectNode properties = (ObjectNode) modifiedSchema.get(PROPERTIES_FIELD); + Jsons.keys(properties).stream() + .peek(addToRefList(properties)) + .forEach(key -> properties.replace(key, getFileDefinition(properties.get(key)))); + return modifiedSchema; } private List findArrays(final JsonNode node) { @@ -242,21 +254,36 @@ private static JsonNode getFileDefinition(final JsonNode fieldDefinition) { return fieldDefinition; } else { if (fieldDefinition.has(ANY_OF_FIELD) && fieldDefinition.get(ANY_OF_FIELD).isArray()) { - var fieldOptional = MoreIterators.toList(fieldDefinition.get(ANY_OF_FIELD).elements()).stream().findFirst(); - if (fieldOptional.isPresent()) { - return getFileDefinition(fieldOptional.get()); - } + return allOfAndAnyOfFieldProcessing(ANY_OF_FIELD, fieldDefinition); } if (fieldDefinition.has(ALL_OF_FIELD) && fieldDefinition.get(ALL_OF_FIELD).isArray()) { - var fieldOptional = MoreIterators.toList(fieldDefinition.get(ALL_OF_FIELD).elements()).stream().findFirst(); - if (fieldOptional.isPresent()) { - return getFileDefinition(fieldOptional.get()); - } + return allOfAndAnyOfFieldProcessing(ALL_OF_FIELD, fieldDefinition); } } return fieldDefinition; } + private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, final JsonNode fieldDefinition) { + ObjectReader reader = mapper.readerFor(new TypeReference>() {}); + List list; + try { + list = reader.readValue(fieldDefinition.get(fieldName)); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Failed to read and process the following field - %s", fieldDefinition)); + } + ObjectNode objectNode = mapper.createObjectNode(); + list.forEach(field -> { + objectNode.set("big_query_" + field.get("type").asText(), field); + }); + + return Jsons.jsonNode(ImmutableMap.builder() + .put("type", "object") + .put(PROPERTIES_FIELD, objectNode) + .put("additionalProperties", false) + .build()); + } + private static Builder getField(final StandardNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) { final String fieldName = namingResolver.getIdentifier(key); final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING); diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index d0f8215cb472..466f49a9c83e 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -5,6 +5,10 @@ package io.airbyte.integrations.destination.bigquery; import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.NESTED_ARRAY_FIELD; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormats; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithEmptyList; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithNull; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfSchema; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getData; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithEmptyObjectAndArray; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithFormats; @@ -283,6 +287,7 @@ void testAnyOf() throws Exception { assertEquals(extractJsonValues(resultJson, "avatar"), extractJsonValues(expectedUsersJson, "avatar")); assertEquals(extractJsonValues(resultJson, "team_ids"), extractJsonValues(expectedUsersJson, "team_ids")); assertEquals(extractJsonValues(resultJson, "admin_ids"), extractJsonValues(expectedUsersJson, "admin_ids")); + assertEquals(extractJsonValues(resultJson, "all_of_field"), extractJsonValues(expectedUsersJson, "all_of_field")); assertEquals(extractJsonValues(resultJson, "job_title"), extractJsonValues(expectedUsersJson, "job_title")); assertEquals(extractJsonValues(resultJson, "has_inbox_seat"), extractJsonValues(expectedUsersJson, "has_inbox_seat")); assertEquals(extractJsonValues(resultJson, "away_mode_enabled"), extractJsonValues(expectedUsersJson, "away_mode_enabled")); @@ -306,7 +311,8 @@ void testAnyOfWithNull() throws Exception { assertEquals(usersActual.size(), 1); final JsonNode resultJson = usersActual.get(0); assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); - assertEquals(extractJsonValues(resultJson, "team_ids"), Collections.emptySet()); + assertEquals(extractJsonValues(resultJson, "team_ids"), extractJsonValues(expectedUsersJson, "team_ids")); + assertEquals(extractJsonValues(resultJson, "all_of_field"), extractJsonValues(expectedUsersJson, "all_of_field")); assertEquals(extractJsonValues(resultJson, "avatar"), extractJsonValues(expectedUsersJson, "avatar")); } @@ -328,6 +334,7 @@ void testAnyOfWithEmptyList() throws Exception { final JsonNode resultJson = usersActual.get(0); assertEquals(extractJsonValues(resultJson, "name"), extractJsonValues(expectedUsersJson, "name")); assertEquals(extractJsonValues(resultJson, "team_ids"), extractJsonValues(expectedUsersJson, "team_ids")); + assertEquals(extractJsonValues(resultJson, "all_of_field"), extractJsonValues(expectedUsersJson, "all_of_field")); } @Test diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 45cc517e4efc..c2d9e7e9296f 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -131,6 +131,22 @@ public static JsonNode getAnyOfSchema() { } ] }, + "all_of_field":{ + "allOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"string" + }, + { + "type":"integer" + } + ] + }, "job_title":{ "type":[ "null", @@ -308,8 +324,19 @@ public static JsonNode getAnyOfFormats() { "avatar": { "image_url": "url_to_avatar.jpg" }, - "team_ids": [1, 2, 3], - "admin_ids": [2, 5], + "team_ids": { + "big_query_array": [1, 2, 3], + "big_query_null": null + }, + "admin_ids": { + "big_query_array": [], + "big_query_null": null + }, + "all_of_field": { + "big_query_array": [4, 5, 6], + "big_query_string": "Some text", + "big_query_integer": 42 + }, "job_title": "title", "has_inbox_seat": true, "away_mode_enabled": false, @@ -323,6 +350,7 @@ public static JsonNode getAnyOfFormatsWithNull() { { "name": "Mukola", "team_ids": null, + "all_of_field": null, "avatar": null } """); @@ -332,7 +360,12 @@ public static JsonNode getAnyOfFormatsWithEmptyList() { return Jsons.deserialize(""" { "name": "Sergii", - "team_ids": [] + "team_ids": [], + "all_of_field": { + "big_query_array": [4, 5, 6], + "big_query_string": "Some text", + "big_query_integer": 42 + } } """); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java index 725b1927c1df..54e07e3540ac 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.java @@ -28,6 +28,12 @@ public abstract class BigQueryRecordFormatter { protected final StandardNameTransformer namingResolver; protected final JsonNode jsonSchema; + /** + * These parameters are required for the correct operation of denormalize version of the connector. + */ + protected final Set invalidKeys = new HashSet<>(); + protected final Set fieldsContainRefDefinitionValue = new HashSet<>(); + public BigQueryRecordFormatter(JsonNode jsonSchema, StandardNameTransformer namingResolver) { this.namingResolver = namingResolver; this.jsonSchema = formatJsonSchema(jsonSchema.deepCopy()); From 7d7345181773f4d2d008bb04e021bca4a68f66d8 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 24 Mar 2022 17:55:13 +0200 Subject: [PATCH 3/6] clean code --- ...ltBigQueryDenormalizedRecordFormatter.java | 19 +++++++++---------- .../BigQueryDenormalizedDestinationTest.java | 4 +--- .../bigquery/BigQuerySQLNameTransformer.java | 6 +++--- .../bigquery/BigQueryDestinationTest.java | 9 ++++----- .../bigquery/BigQueryGcsDestinationTest.java | 5 +++-- .../BigQuerySQLNameTransformerTest.java | 1 - 6 files changed, 20 insertions(+), 24 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java index 038637e44ec0..d70199af3ccb 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java @@ -27,9 +27,6 @@ import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat; import io.airbyte.integrations.destination.bigquery.JsonSchemaType; import io.airbyte.protocol.models.AirbyteRecordMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.Collections; import java.util.Comparator; @@ -37,6 +34,8 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryRecordFormatter { @@ -70,8 +69,8 @@ private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingR Preconditions.checkArgument(modifiedSchema.isObject() && modifiedSchema.has(PROPERTIES_FIELD)); ObjectNode properties = (ObjectNode) modifiedSchema.get(PROPERTIES_FIELD); Jsons.keys(properties).stream() - .peek(addToRefList(properties)) - .forEach(key -> properties.replace(key, getFileDefinition(properties.get(key)))); + .peek(addToRefList(properties)) + .forEach(key -> properties.replace(key, getFileDefinition(properties.get(key)))); return modifiedSchema; } @@ -270,7 +269,7 @@ private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, fin list = reader.readValue(fieldDefinition.get(fieldName)); } catch (IOException e) { throw new IllegalStateException( - String.format("Failed to read and process the following field - %s", fieldDefinition)); + String.format("Failed to read and process the following field - %s", fieldDefinition)); } ObjectNode objectNode = mapper.createObjectNode(); list.forEach(field -> { @@ -278,10 +277,10 @@ private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, fin }); return Jsons.jsonNode(ImmutableMap.builder() - .put("type", "object") - .put(PROPERTIES_FIELD, objectNode) - .put("additionalProperties", false) - .build()); + .put("type", "object") + .put(PROPERTIES_FIELD, objectNode) + .put("additionalProperties", false) + .build()); } private static Builder getField(final StandardNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 466f49a9c83e..57d45436c934 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.bigquery; -import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.NESTED_ARRAY_FIELD; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormats; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithEmptyList; import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithNull; @@ -55,7 +54,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -394,7 +392,7 @@ private Set extractJsonValues(final JsonNode node, final String attribut if (jsonNode.isArray()) { jsonNode.forEach(arrayNodeValue -> resultSet.add(arrayNodeValue.textValue())); } else if (jsonNode.isObject()) { - resultSet.addAll(extractJsonValues(jsonNode, NESTED_ARRAY_FIELD)); + resultSet.addAll(extractJsonValues(jsonNode, "big_query_array")); } else { resultSet.add(jsonNode.textValue()); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java index 10d3ec442274..8b35ff9ccf7c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java @@ -24,9 +24,9 @@ public String convertStreamName(final String input) { /** * BigQuery allows a number to be the first character of a namespace. Datasets that begin with an - * underscore are hidden databases, and we cannot query .INFORMATION_SCHEMA. - * So we append a letter instead of underscore for normalization. - * Reference: https://cloud.google.com/bigquery/docs/datasets#dataset-naming + * underscore are hidden databases, and we cannot query .INFORMATION_SCHEMA. So we + * append a letter instead of underscore for normalization. Reference: + * https://cloud.google.com/bigquery/docs/datasets#dataset-naming */ @Override public String getNamespace(final String input) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index aafe47a63d85..4d9fa9203348 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -112,8 +112,7 @@ class BigQueryDestinationTest { private static Stream datasetIdResetterProvider() { // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id` return Stream.of( - Arguments.arguments(new DatasetIdResetter(config -> { - })), + Arguments.arguments(new DatasetIdResetter(config -> {})), Arguments.arguments(new DatasetIdResetter( config -> { final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); @@ -154,9 +153,9 @@ void setup(final TestInfo info) throws IOException { catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, - io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), - io.airbyte.protocol.models.Field - .of("id", JsonSchemaType.STRING)) + io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), + io.airbyte.protocol.models.Field + .of("id", JsonSchemaType.STRING)) .withDestinationSyncMode(DestinationSyncMode.APPEND), CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING)))); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java index 1394968be537..9c877a82da4a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java @@ -151,12 +151,13 @@ protected void tearDownGcs() { } } - @Override void testWritePartitionOverUnpartitioned(final DatasetIdResetter resetDatasetId) throws Exception { // This test is skipped for GCS staging mode because we load Avro data to BigQuery, but do not // use the use_avro_logical_types flag to automatically convert the Avro logical timestamp // type. Therefore, the emission timestamp, which should be used as the partition field, has - // an incorrect type. See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types + // an incorrect type. See + // https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types } + } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java index 7817b9d6d2fc..d5162d694293 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java @@ -19,7 +19,6 @@ class BigQuerySQLNameTransformerTest { "*_namespace", "__namespace", "_namespace", "_namespace"); - private static final Map RAW_TO_NORMALIZED_NAMESPACES = Map.of( "name-space", "name_space", "spécial_character", "special_character", From 2c167489ea8bd3ee2350feeb30fbd90e08121bfa Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Tue, 5 Apr 2022 14:08:41 +0300 Subject: [PATCH 4/6] updated tests --- .../BigQueryDenormalizedTestDataUtils.java | 165 +----------------- .../resources/testdata/dataAnyOfFormats.json | 26 +++ .../dataAnyOfFormatsWithEmptyList.json | 9 + .../testdata/dataAnyOfFormatsWithNull.json | 6 + .../resources/testdata/schemaAnyOfAllOf.json | 83 +++++++++ 5 files changed, 128 insertions(+), 161 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormats.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithEmptyList.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithNull.json create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 88d1292685d5..04f211ec19fc 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -19,120 +19,7 @@ public static JsonNode getSchema() { } public static JsonNode getAnyOfSchema() { - return Jsons.deserialize(""" - { - "type":"object", - "properties":{ - "id":{ - "type":[ - "null", - "string" - ] - }, - "name":{ - "type":[ - "null", - "string" - ] - }, - "type":{ - "type":[ - "null", - "string" - ] - }, - "email":{ - "type":[ - "null", - "string" - ] - }, - "avatar":{ - "type":[ - "null", - "object" - ], - "properties":{ - "image_url":{ - "type":[ - "null", - "string" - ] - } - }, - "additionalProperties":false - }, - "team_ids":{ - "anyOf":[ - { - "type":"array", - "items":{ - "type":"integer" - } - }, - { - "type":"null" - } - ] - }, - "admin_ids":{ - "anyOf":[ - { - "type":"array", - "items":{ - "type":"integer" - } - }, - { - "type":"null" - } - ] - }, - "all_of_field":{ - "allOf":[ - { - "type":"array", - "items":{ - "type":"integer" - } - }, - { - "type":"string" - }, - { - "type":"integer" - } - ] - }, - "job_title":{ - "type":[ - "null", - "string" - ] - }, - "has_inbox_seat":{ - "type":[ - "null", - "boolean" - ] - }, - "away_mode_enabled":{ - "type":[ - "null", - "boolean" - ] - }, - "away_mode_reassign":{ - "type":[ - "null", - "boolean" - ] - } - - }, - "additionalProperties":false - } - """); + return getTestDataFromResourceJson("schemaAnyOfAllOf.json"); } public static JsonNode getSchemaWithFormats() { @@ -156,59 +43,15 @@ public static JsonNode getDataWithFormats() { } public static JsonNode getAnyOfFormats() { - return Jsons.deserialize(""" - { - "id": "ID", - "name": "Andrii", - "type": "some_type", - "email": "email@email.com", - "avatar": { - "image_url": "url_to_avatar.jpg" - }, - "team_ids": { - "big_query_array": [1, 2, 3], - "big_query_null": null - }, - "admin_ids": { - "big_query_array": [], - "big_query_null": null - }, - "all_of_field": { - "big_query_array": [4, 5, 6], - "big_query_string": "Some text", - "big_query_integer": 42 - }, - "job_title": "title", - "has_inbox_seat": true, - "away_mode_enabled": false, - "away_mode_reassign": false - } - """); + return getTestDataFromResourceJson("dataAnyOfFormats.json"); } public static JsonNode getAnyOfFormatsWithNull() { - return Jsons.deserialize(""" - { - "name": "Mukola", - "team_ids": null, - "all_of_field": null, - "avatar": null - } - """); + return getTestDataFromResourceJson("dataAnyOfFormatsWithNull.json"); } public static JsonNode getAnyOfFormatsWithEmptyList() { - return Jsons.deserialize(""" - { - "name": "Sergii", - "team_ids": [], - "all_of_field": { - "big_query_array": [4, 5, 6], - "big_query_string": "Some text", - "big_query_integer": 42 - } - } - """); + return getTestDataFromResourceJson("dataAnyOfFormatsWithEmptyList.json"); } public static JsonNode getDataWithJSONDateTimeFormats() { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormats.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormats.json new file mode 100644 index 000000000000..631c91e6745f --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormats.json @@ -0,0 +1,26 @@ +{ + "id": "ID", + "name": "Andrii", + "type": "some_type", + "email": "email@email.com", + "avatar": { + "image_url": "url_to_avatar.jpg" + }, + "team_ids": { + "big_query_array": [1, 2, 3], + "big_query_null": null + }, + "admin_ids": { + "big_query_array": [], + "big_query_null": null + }, + "all_of_field": { + "big_query_array": [4, 5, 6], + "big_query_string": "Some text", + "big_query_integer": 42 + }, + "job_title": "title", + "has_inbox_seat": true, + "away_mode_enabled": false, + "away_mode_reassign": false +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithEmptyList.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithEmptyList.json new file mode 100644 index 000000000000..d199a7ced4a4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithEmptyList.json @@ -0,0 +1,9 @@ +{ + "name": "Sergii", + "team_ids": [], + "all_of_field": { + "big_query_array": [4, 5, 6], + "big_query_string": "Some text", + "big_query_integer": 42 + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithNull.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithNull.json new file mode 100644 index 000000000000..31b3d5a86723 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/dataAnyOfFormatsWithNull.json @@ -0,0 +1,6 @@ +{ + "name": "Mukola", + "team_ids": null, + "all_of_field": null, + "avatar": null +} diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json new file mode 100644 index 000000000000..2778fe3bdeec --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json @@ -0,0 +1,83 @@ +{ + "type":"object", + "properties":{ + "id":{ + "type":["null", "string"] + }, + "name":{ + "type":["null", "string"] + }, + "type":{ + "type":["null", "string"] + }, + "email":{ + "type":["null", "string"] + }, + "avatar":{ + "type":["null", "object"], + "properties":{ + "image_url":{ + "type":["null", "string"] + } + } + }, + "team_ids":{ + "anyOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"null" + } + ] + }, + "admin_ids":{ + "anyOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"null" + } + ] + }, + "all_of_field":{ + "allOf":[ + { + "type":"array", + "items":{ + "type":"integer" + } + }, + { + "type":"string" + }, + { + "type":"integer" + } + ] + }, + "job_title":{ + "type":[ + "null", + "string" + ] + }, + "has_inbox_seat":{ + "type":["null", "boolean"] + }, + "away_mode_enabled":{ + "type":["null", "boolean"] + }, + "away_mode_reassign":{ + "type":["null", "boolean"] + } + + } +} From 4a7c85c5cf81d71235818a29033a1ef4cf5565da Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Tue, 5 Apr 2022 18:05:35 +0300 Subject: [PATCH 5/6] updated destination_specs yaml file --- .../resources/testdata/schemaAnyOfAllOf.json | 88 +++++++++---------- 1 file changed, 42 insertions(+), 46 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json index 2778fe3bdeec..422f173a6aae 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/resources/testdata/schemaAnyOfAllOf.json @@ -1,83 +1,79 @@ { - "type":"object", - "properties":{ - "id":{ - "type":["null", "string"] + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] }, - "name":{ - "type":["null", "string"] + "name": { + "type": ["null", "string"] }, - "type":{ - "type":["null", "string"] + "type": { + "type": ["null", "string"] }, - "email":{ - "type":["null", "string"] + "email": { + "type": ["null", "string"] }, - "avatar":{ - "type":["null", "object"], - "properties":{ - "image_url":{ - "type":["null", "string"] + "avatar": { + "type": ["null", "object"], + "properties": { + "image_url": { + "type": ["null", "string"] } } }, - "team_ids":{ - "anyOf":[ + "team_ids": { + "anyOf": [ { - "type":"array", - "items":{ - "type":"integer" + "type": "array", + "items": { + "type": "integer" } }, { - "type":"null" + "type": "null" } ] }, - "admin_ids":{ - "anyOf":[ + "admin_ids": { + "anyOf": [ { - "type":"array", - "items":{ - "type":"integer" + "type": "array", + "items": { + "type": "integer" } }, { - "type":"null" + "type": "null" } ] }, - "all_of_field":{ - "allOf":[ + "all_of_field": { + "allOf": [ { - "type":"array", - "items":{ - "type":"integer" + "type": "array", + "items": { + "type": "integer" } }, { - "type":"string" + "type": "string" }, { - "type":"integer" + "type": "integer" } ] }, - "job_title":{ - "type":[ - "null", - "string" - ] + "job_title": { + "type": ["null", "string"] }, - "has_inbox_seat":{ - "type":["null", "boolean"] + "has_inbox_seat": { + "type": ["null", "boolean"] }, - "away_mode_enabled":{ - "type":["null", "boolean"] + "away_mode_enabled": { + "type": ["null", "boolean"] }, - "away_mode_reassign":{ - "type":["null", "boolean"] + "away_mode_reassign": { + "type": ["null", "boolean"] } - } } From c7ff0bd7b0e6079e8524622341683f915a95acf8 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Tue, 5 Apr 2022 18:52:49 +0300 Subject: [PATCH 6/6] updated version of connector and docs --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- .../connectors/destination-bigquery-denormalized/Dockerfile | 2 +- docs/integrations/destinations/bigquery.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 5f9679007270..2cae9ce0726c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -36,7 +36,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.2.14 + dockerImageTag: 0.2.15 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 55f1e5b8f145..129d3c610e37 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -495,7 +495,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.14" +- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.15" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index e38c692bb017..e42653cb75d8 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.14 +LABEL io.airbyte.version=0.2.15 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 7828f188e4dc..a7285da10db7 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -236,6 +236,7 @@ This uploads data directly from your source to BigQuery. While this is faster to | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------| :--- | +| 0.2.15 | 2022-04-05 | [11166](https://github.com/airbytehq/airbyte/pull/11166) | Fixed handling of anyOf and allOf fields | | 0.2.14 | 2022-04-02 | [11620](https://github.com/airbytehq/airbyte/pull/11620) | Updated spec | | 0.2.13 | 2022-04-01 | [11636](https://github.com/airbytehq/airbyte/pull/11636) | Added new unit tests | | 0.2.12 | 2022-03-28 | [11454](https://github.com/airbytehq/airbyte/pull/11454) | Integration test enhancement for picking test-data and schemas |