Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Destination bigquery denormalize "allOf" and "anyOf" fix #11166

Merged
merged 11 commits into from
Apr 5, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.2.14
dockerImageTag: 0.2.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.14"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.version=0.2.15
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.integrations.destination.bigquery.formatter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
Expand All @@ -24,11 +27,10 @@
import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat;
import io.airbyte.integrations.destination.bigquery.JsonSchemaType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -39,26 +41,37 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBigQueryDenormalizedRecordFormatter.class);

private final Set<String> invalidKeys = new HashSet<>();

public static final String NESTED_ARRAY_FIELD = "big_query_array";
protected static final String PROPERTIES_FIELD = "properties";
private static final String TYPE_FIELD = "type";
private static final String ALL_OF_FIELD = "allOf";
private static final String ANY_OF_FIELD = "anyOf";
private static final String ARRAY_ITEMS_FIELD = "items";
private static final String FORMAT_FIELD = "format";
private static final String REF_DEFINITION_KEY = "$ref";

private final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();
private static final ObjectMapper mapper = new ObjectMapper();

/**
 * Creates a formatter that maps denormalized Airbyte records onto a BigQuery schema.
 *
 * @param jsonSchema     the stream's JSON schema used to derive the BigQuery table schema
 * @param namingResolver transformer that converts field names into BigQuery-safe identifiers
 */
public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
super(jsonSchema, namingResolver);
}

/**
 * Normalizes the incoming stream schema before it is mapped to a BigQuery schema:
 * flattens {@code allOf}/{@code anyOf} combinator fields, populates empty array
 * definitions, and wraps raw arrays in objects.
 *
 * @param jsonSchema the original stream schema; not mutated — transformations are
 *                   applied to a deep copy made by {@link #formatAllOfAndAnyOfFields}
 * @return the transformed schema
 */
@Override
protected JsonNode formatJsonSchema(final JsonNode jsonSchema) {
  // The scraped diff contained both the pre- and post-change bodies; only the
  // final version is kept — the old statements after it were unreachable.
  final JsonNode modifiedJsonSchema = formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
  populateEmptyArrays(modifiedJsonSchema);
  surroundArraysByObjects(modifiedJsonSchema);
  return modifiedJsonSchema;
}

/**
 * Replaces every top-level property declared through an {@code allOf}/{@code anyOf}
 * combinator with an equivalent plain object definition so it can be represented as
 * a BigQuery RECORD.
 *
 * @param namingResolver name transformer (only used for logging context here)
 * @param jsonSchema     schema whose {@code properties} node is rewritten; must be an
 *                       object containing a "properties" field
 * @return a deep copy of {@code jsonSchema} with combinator fields flattened
 */
private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingResolver, final JsonNode jsonSchema) {
  // Debug level + parameterized message: full schemas can be large, and string
  // concatenation at INFO level paid the formatting cost on every sync.
  LOGGER.debug("getSchemaFields : {} namingResolver {}", jsonSchema, namingResolver);
  final JsonNode modifiedSchema = jsonSchema.deepCopy();
  Preconditions.checkArgument(modifiedSchema.isObject() && modifiedSchema.has(PROPERTIES_FIELD));
  final ObjectNode properties = (ObjectNode) modifiedSchema.get(PROPERTIES_FIELD);
  Jsons.keys(properties).stream()
      // Track $ref-typed fields before the definition is replaced.
      .peek(addToRefList(properties))
      .forEach(key -> properties.replace(key, getFileDefinition(properties.get(key))));
  return modifiedSchema;
}

private List<JsonNode> findArrays(final JsonNode node) {
Expand Down Expand Up @@ -199,7 +212,7 @@ public Schema getBigQuerySchema(final JsonNode jsonSchema) {
fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
}
LOGGER.info("Airbyte Schema is transformed from {} to {}.", jsonSchema, fieldList);
return com.google.cloud.bigquery.Schema.of(fieldList);
return Schema.of(fieldList);
}

private List<Field> getSchemaFields(final StandardNameTransformer namingResolver, final JsonNode jsonSchema) {
Expand Down Expand Up @@ -235,10 +248,47 @@ private Consumer<String> addToRefList(final ObjectNode properties) {
};
}

/**
 * Resolves a field definition to one carrying an explicit "type". Definitions that
 * already have a type pass through untouched; otherwise an "anyOf"/"allOf"
 * combinator (checked in that order) is flattened into an equivalent object
 * definition. Anything else is returned as-is.
 */
private static JsonNode getFileDefinition(final JsonNode fieldDefinition) {
  if (fieldDefinition.has(TYPE_FIELD)) {
    return fieldDefinition;
  }
  for (final String combinator : new String[] {ANY_OF_FIELD, ALL_OF_FIELD}) {
    final JsonNode variants = fieldDefinition.get(combinator);
    if (variants != null && variants.isArray()) {
      return allOfAndAnyOfFieldProcessing(combinator, fieldDefinition);
    }
  }
  return fieldDefinition;
}

/**
 * Flattens an "allOf"/"anyOf" combinator into a single object-typed definition whose
 * properties are named {@code big_query_<variant type>} (e.g. "big_query_array").
 *
 * <p>NOTE(review): variants that share the same "type" overwrite one another — only
 * the last survives; variants missing "type" would NPE. Presumably upstream schemas
 * always set distinct types — TODO confirm.
 *
 * @param fieldName       either "allOf" or "anyOf"
 * @param fieldDefinition the combinator definition; {@code fieldDefinition.get(fieldName)}
 *                        must be an array of variant definitions
 * @return an object-typed definition equivalent to the combinator
 * @throws IllegalStateException if the combinator array cannot be read
 */
private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, final JsonNode fieldDefinition) {
  final ObjectReader reader = mapper.readerFor(new TypeReference<List<JsonNode>>() {});
  final List<JsonNode> list;
  try {
    list = reader.readValue(fieldDefinition.get(fieldName));
  } catch (final IOException e) {
    // Preserve the cause so the original parse failure is not lost.
    throw new IllegalStateException(
        String.format("Failed to read and process the following field - %s", fieldDefinition), e);
  }
  final ObjectNode objectNode = mapper.createObjectNode();
  list.forEach(field -> objectNode.set("big_query_" + field.get("type").asText(), field));

  return Jsons.jsonNode(ImmutableMap.builder()
      .put("type", "object")
      .put(PROPERTIES_FIELD, objectNode)
      .put("additionalProperties", false)
      .build());
}

private static Builder getField(final StandardNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) {
final String fieldName = namingResolver.getIdentifier(key);
final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING);
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, fieldDefinition.get(TYPE_FIELD));
JsonNode updatedFileDefinition = getFileDefinition(fieldDefinition);
JsonNode type = updatedFileDefinition.get(TYPE_FIELD);
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, type);
for (int i = 0; i < fieldTypes.size(); i++) {
final JsonSchemaType fieldType = fieldTypes.get(i);
if (fieldType == JsonSchemaType.NULL) {
Expand All @@ -256,8 +306,8 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi
}
case ARRAY -> {
final JsonNode items;
if (fieldDefinition.has("items")) {
items = fieldDefinition.get("items");
if (updatedFileDefinition.has("items")) {
items = updatedFileDefinition.get("items");
} else {
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
// this is handler for case when we get "array" without "items"
Expand All @@ -268,10 +318,10 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi
}
case OBJECT -> {
final JsonNode properties;
if (fieldDefinition.has(PROPERTIES_FIELD)) {
properties = fieldDefinition.get(PROPERTIES_FIELD);
if (updatedFileDefinition.has(PROPERTIES_FIELD)) {
properties = updatedFileDefinition.get(PROPERTIES_FIELD);
} else {
properties = fieldDefinition;
properties = updatedFileDefinition;
}
final FieldList fieldList = FieldList.of(Jsons.keys(properties)
.stream()
Expand All @@ -292,7 +342,7 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi
}

// If a specific format is defined, use their specific type instead of the JSON's one
final JsonNode fieldFormat = fieldDefinition.get(FORMAT_FIELD);
final JsonNode fieldFormat = updatedFileDefinition.get(FORMAT_FIELD);
if (fieldFormat != null) {
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
if (schemaFormat != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter.NESTED_ARRAY_FIELD;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormats;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithEmptyList;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfFormatsWithNull;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getAnyOfSchema;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getData;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithEmptyObjectAndArray;
import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getDataWithFormats;
Expand Down Expand Up @@ -86,6 +89,9 @@ class BigQueryDenormalizedDestinationTest {
private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference());
private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}"));
private static final AirbyteMessage MESSAGE_USERS7 = createRecordMessage(USERS_STREAM_NAME, getDataWithNestedDatetimeInsideNullObject());
private static final AirbyteMessage MESSAGE_USERS8 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormats());
private static final AirbyteMessage MESSAGE_USERS9 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithNull());
private static final AirbyteMessage MESSAGE_USERS10 = createRecordMessage(USERS_STREAM_NAME, getAnyOfFormatsWithEmptyList());
private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}"));

private JsonNode config;
Expand Down Expand Up @@ -129,6 +135,9 @@ void setup(final TestInfo info) throws IOException {
MESSAGE_USERS5.getRecord().setNamespace(datasetId);
MESSAGE_USERS6.getRecord().setNamespace(datasetId);
MESSAGE_USERS7.getRecord().setNamespace(datasetId);
MESSAGE_USERS8.getRecord().setNamespace(datasetId);
MESSAGE_USERS9.getRecord().setNamespace(datasetId);
MESSAGE_USERS10.getRecord().setNamespace(datasetId);
EMPTY_MESSAGE.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
Expand Down Expand Up @@ -253,6 +262,79 @@ void testWriteWithFormat() throws Exception {
assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema);
}

/**
 * End-to-end check that a stream whose schema uses "anyOf"/"allOf" combinators is
 * written to BigQuery without errors and every top-level field round-trips unchanged.
 */
@Test
void testAnyOf() throws Exception {
  catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
      .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema()))
      .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

  final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination();
  final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

  consumer.accept(MESSAGE_USERS8);
  consumer.close();

  final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
  final JsonNode expectedUsersJson = MESSAGE_USERS8.getRecord().getData();
  // assertEquals takes (expected, actual) — the original had the order reversed,
  // which produced misleading failure messages.
  assertEquals(1, usersActual.size());
  final JsonNode resultJson = usersActual.get(0);
  for (final String field : List.of("id", "name", "type", "email", "avatar", "team_ids", "admin_ids",
      "all_of_field", "job_title", "has_inbox_seat", "away_mode_enabled", "away_mode_reassign")) {
    assertEquals(extractJsonValues(expectedUsersJson, field), extractJsonValues(resultJson, field), field);
  }
}

/**
 * Verifies that a record with explicit nulls in combinator-typed fields is written
 * through the "anyOf"/"allOf" schema without errors and the present values round-trip.
 */
@Test
void testAnyOfWithNull() throws Exception {
  catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
      .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema()))
      .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

  final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination();
  final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

  consumer.accept(MESSAGE_USERS9);
  consumer.close();

  final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
  final JsonNode expectedUsersJson = MESSAGE_USERS9.getRecord().getData();
  // assertEquals takes (expected, actual) — the original had the order reversed.
  assertEquals(1, usersActual.size());
  final JsonNode resultJson = usersActual.get(0);
  for (final String field : List.of("name", "team_ids", "all_of_field", "avatar")) {
    assertEquals(extractJsonValues(expectedUsersJson, field), extractJsonValues(resultJson, field), field);
  }
}

/**
 * Verifies that an empty list value in a combinator-typed field is written through
 * the "anyOf"/"allOf" schema without errors and the remaining fields round-trip.
 */
@Test
void testAnyOfWithEmptyList() throws Exception {
  catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
      .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getAnyOfSchema()))
      .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

  final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination();
  final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

  consumer.accept(MESSAGE_USERS10);
  consumer.close();

  final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
  final JsonNode expectedUsersJson = MESSAGE_USERS10.getRecord().getData();
  // assertEquals takes (expected, actual) — the original had the order reversed.
  assertEquals(1, usersActual.size());
  final JsonNode resultJson = usersActual.get(0);
  for (final String field : List.of("name", "team_ids", "all_of_field")) {
    assertEquals(extractJsonValues(expectedUsersJson, field), extractJsonValues(resultJson, field), field);
  }
}

@Test
void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception {
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
Expand Down Expand Up @@ -310,7 +392,7 @@ private Set<String> extractJsonValues(final JsonNode node, final String attribut
if (jsonNode.isArray()) {
jsonNode.forEach(arrayNodeValue -> resultSet.add(arrayNodeValue.textValue()));
} else if (jsonNode.isObject()) {
resultSet.addAll(extractJsonValues(jsonNode, NESTED_ARRAY_FIELD));
resultSet.addAll(extractJsonValues(jsonNode, "big_query_array"));
} else {
resultSet.add(jsonNode.textValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public static JsonNode getSchema() {
return getTestDataFromResourceJson("schema.json");
}

// Schema fixture containing "anyOf"/"allOf" combinator fields, used by the anyOf/allOf tests.
public static JsonNode getAnyOfSchema() {
return getTestDataFromResourceJson("schemaAnyOfAllOf.json");
}

// Schema fixture whose fields declare JSON-schema "format" values (date-time, etc.).
public static JsonNode getSchemaWithFormats() {
return getTestDataFromResourceJson("schemaWithFormats.json");
}
Expand All @@ -38,6 +42,18 @@ public static JsonNode getDataWithFormats() {
return getTestDataFromResourceJson("dataWithFormats.json");
}

// Record fixture with values for every combinator field in the anyOf/allOf schema.
public static JsonNode getAnyOfFormats() {
return getTestDataFromResourceJson("dataAnyOfFormats.json");
}

// Record fixture where several combinator fields are explicitly null.
public static JsonNode getAnyOfFormatsWithNull() {
return getTestDataFromResourceJson("dataAnyOfFormatsWithNull.json");
}

// Record fixture where a combinator-typed field holds an empty list.
public static JsonNode getAnyOfFormatsWithEmptyList() {
return getTestDataFromResourceJson("dataAnyOfFormatsWithEmptyList.json");
}

public static JsonNode getDataWithJSONDateTimeFormats() {
return getTestDataFromResourceJson("dataWithJSONDateTimeFormats.json");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"id": "ID",
"name": "Andrii",
"type": "some_type",
"email": "email@email.com",
"avatar": {
"image_url": "url_to_avatar.jpg"
},
"team_ids": {
"big_query_array": [1, 2, 3],
"big_query_null": null
},
"admin_ids": {
"big_query_array": [],
"big_query_null": null
},
"all_of_field": {
"big_query_array": [4, 5, 6],
"big_query_string": "Some text",
"big_query_integer": 42
},
"job_title": "title",
"has_inbox_seat": true,
"away_mode_enabled": false,
"away_mode_reassign": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "Sergii",
"team_ids": [],
"all_of_field": {
"big_query_array": [4, 5, 6],
"big_query_string": "Some text",
"big_query_integer": 42
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"name": "Mukola",
"team_ids": null,
"all_of_field": null,
"avatar": null
}
Loading