Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Destination BQ Denormalized: handle null values in fields described by a $ref schema #7804

Merged
merged 3 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
private static final String TYPE_FIELD = "type";
private static final String FORMAT_FIELD = "format";
private static final String REF_DEFINITION_KEY = "$ref";
private static final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

private final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

@Override
protected String getTargetTableName(final String streamName) {
Expand Down Expand Up @@ -73,7 +74,7 @@ protected Schema getBigQuerySchema(final JsonNode jsonSchema) {
return com.google.cloud.bigquery.Schema.of(fieldList);
}

private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
private List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD));
final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD);
List<Field> tmpFields = Jsons.keys(properties).stream()
Expand All @@ -96,7 +97,7 @@ private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer nami
* Currently, AirByte doesn't support parsing value by $ref key definition.
* The issue to track this <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
*/
private static Consumer<String> addToRefList(ObjectNode properties) {
private Consumer<String> addToRefList(ObjectNode properties) {
return key -> {
if (properties.get(key).has(REF_DEFINITION_KEY)) {
fieldsContainRefDefinitionValue.add(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
// replace ObjectNode with TextNode for fields with $ref definition key
// Do not need to iterate through all JSON Object nodes, only first nesting object.
if (!fieldsWithRefDefinition.isEmpty()) {
fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString()));
fieldsWithRefDefinition.forEach(key -> {
if (data.get(key) != null && !data.get(key).isNull()){
data.put(key, data.get(key).toString());
}
});
}
data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
Expand All @@ -32,17 +33,21 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.assertj.core.util.Sets;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -57,33 +62,20 @@
class BigQueryDenormalizedDestinationTest {

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
private static final Set<String> AIRBYTE_METADATA_FIELDS = Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID);

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationTest.class);

private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getData())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithEmptyObjectAndArray())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONDateTimeFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS5 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONWithReference())
.withEmittedAt(NOW.toEpochMilli()));

private static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData());
private static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray());
private static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats());
private static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats());
private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference());
private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}"));
private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}"));

private JsonNode config;

Expand Down Expand Up @@ -122,6 +114,8 @@ void setup(final TestInfo info) throws IOException {
MESSAGE_USERS3.getRecord().setNamespace(datasetId);
MESSAGE_USERS4.getRecord().setNamespace(datasetId);
MESSAGE_USERS5.getRecord().setNamespace(datasetId);
MESSAGE_USERS6.getRecord().setNamespace(datasetId);
EMPTY_MESSAGE.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
Expand Down Expand Up @@ -258,12 +252,20 @@ void testJsonReferenceDefinition() throws Exception {
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS5);
consumer.accept(MESSAGE_USERS6);
consumer.accept(EMPTY_MESSAGE);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
final JsonNode resultJson = usersActual.get(0);
assertEquals(usersActual.size(), 1);
assertEquals(extractJsonValues(resultJson, "users"), Set.of("{\"name\":\"John\",\"surname\":\"Adams\"}"));
final Set<String> actual =
retrieveRecordsAsJson(USERS_STREAM_NAME).stream().flatMap(x -> extractJsonValues(x, "users").stream()).collect(Collectors.toSet());

final Set<String> expected = Sets.set(
"{\"name\":\"John\",\"surname\":\"Adams\"}",
null // we expect one record to have not had the users field set
);

assertEquals(2, actual.size());
assertEquals(expected, actual);
}

private Set<String> extractJsonValues(final JsonNode node, final String attributeName) {
Expand All @@ -282,6 +284,13 @@ private Set<String> extractJsonValues(final JsonNode node, final String attribut
return resultSet;
}

private JsonNode removeAirbyteMetadataFields(JsonNode record) {
for (String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) {
((ObjectNode) record).remove(airbyteMetadataField);
}
return record;
}

private List<JsonNode> retrieveRecordsAsJson(final String tableName) throws Exception {
final QueryJobConfiguration queryConfig =
QueryJobConfiguration
Expand All @@ -294,6 +303,7 @@ private List<JsonNode> retrieveRecordsAsJson(final String tableName) throws Exce
.stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false)
.map(v -> v.get("jsonValue").getStringValue())
.map(Jsons::deserialize)
.map(this::removeAirbyteMetadataFields)
.collect(Collectors.toList());
}

Expand All @@ -304,4 +314,10 @@ private static Stream<Arguments> schemaAndDataProvider() {
arguments(getSchema(), MESSAGE_USERS2));
}

private static AirbyteMessage createRecordMessage(String stream, JsonNode data) {
return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(stream)
.withData(data)
.withEmittedAt(NOW.toEpochMilli()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;

public class BigQueryDenormalizedTestDataUtils {
Expand Down Expand Up @@ -203,22 +204,19 @@ public static JsonNode getDataWithJSONDateTimeFormats() {
}

public static JsonNode getDataWithJSONWithReference() {
return Jsons.deserialize(
"{\n"
+ " \"users\" :{\n"
+ " \"name\": \"John\",\n"
+ " \"surname\": \"Adams"
+"\"\n"
+ " }\n"
+ "}");
return Jsons.jsonNode(
ImmutableMap.of("users", ImmutableMap.of(
"name", "John",
"surname", "Adams"
)));
}

public static JsonNode getSchemaWithReferenceDefinition() {
return Jsons.deserialize(
"{ \n"
+ " \"type\" : [ \"null\", \"object\" ],\n"
+ " \"properties\" : {\n"
+" \"users\": {\n"
+ " \"users\": {\n"
+ " \"$ref\": \"#/definitions/users_\"\n"
+
" }\n"
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2021-11-09 | [\#7804](https://github.com/airbytehq/airbyte/pull/7804) | handle null values in fields described by a $ref definition |
| 0.1.9 | 2021-11-08 | [\#7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key |
| 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery |
| 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
Expand Down