🐛 Destination BigQuery-Denormalized: Fix JSON with $ref Definition keys (#7736)

* BUG-6638: Fix fields with $ref definition.

* BUG-6638: Added integration tests.

* BUG-6638: Added integration tests.

* BUG-6638: Added integration tests.

* BUG-6638: Added doc and bumped Dockerfile version.

* BUG-6638: Added doc and bumped Dockerfile version.

* BUG-6638: Replaced for loop with forEach

* BUG-6638: Bumped specification
alexandertsukanov authored Nov 9, 2021
1 parent 2d2965b commit 48d8250
Showing 9 changed files with 148 additions and 7 deletions.
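In outline, the fix scans each stream's JSON schema for first-level properties whose definition is a "$ref" (which the connector cannot resolve to a concrete column type), remembers those keys, keeps them as STRING columns, and serializes the matching values to JSON text at write time. Below is a minimal, standalone sketch of that idea using only Jackson; the class and method names are illustrative, not the connector's actual API.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashSet;
import java.util.Set;

public class RefFieldSketch {

  private static final String REF_KEY = "$ref";

  // Collect first-level property names whose schema definition uses "$ref".
  static Set<String> findRefFields(final JsonNode jsonSchema) {
    final Set<String> refFields = new HashSet<>();
    final JsonNode properties = jsonSchema.get("properties");
    properties.fieldNames().forEachRemaining(key -> {
      if (properties.get(key).has(REF_KEY)) {
        refFields.add(key);
      }
    });
    return refFields;
  }

  // Replace the JSON object under each "$ref" field with its serialized text,
  // so the value fits into a plain STRING column.
  static ObjectNode stringifyRefFields(final ObjectNode data, final Set<String> refFields) {
    refFields.forEach(key -> {
      if (data.has(key)) {
        data.put(key, data.get(key).toString());
      }
    });
    return data;
  }

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode schema = mapper.readTree(
        "{\"type\":\"object\",\"properties\":{\"users\":{\"$ref\":\"#/definitions/users_\"}}}");
    final ObjectNode data = (ObjectNode) mapper.readTree(
        "{\"users\":{\"name\":\"John\",\"surname\":\"Adams\"}}");

    final Set<String> refFields = findRefFields(schema);        // [users]
    System.out.println(stringifyRefFields(data, refFields));    // {"users":"{\"name\":\"John\",\"surname\":\"Adams\"}"}
  }
}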
@@ -11,7 +11,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- name: Cassandra
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5
@@ -254,7 +254,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.8"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
47 changes: 47 additions & 0 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -4100,6 +4100,53 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-pinterest:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/pinterest"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Pinterest Spec"
type: "object"
required:
- "client_id"
- "client_secret"
- "refresh_token"
additionalProperties: true
properties:
client_id:
type: "string"
title: "Client id"
description: "Your Pinterest client id. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
>docs</a> for instructions on how to generate it."
airbyte_secret: true
client_secret:
type: "string"
title: "Client secret"
description: "Your Pinterest client secret. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
>docs</a> for instructions on how to generate it."
airbyte_secret: true
refresh_token:
type: "string"
title: "Refresh token"
description: "Your Pinterest refresh token. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
>docs</a> for instructions on how to generate it."
airbyte_secret: true
access_token:
type: "string"
title: "Access token"
description: "Your Pinterest access token. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
>docs</a> for instructions on how to generate it."
airbyte_secret: true
start_date:
type: "string"
title: "Start date"
description: "A date in the format YYYY-MM-DD. If you have not set a date,\
\ it will be 2020-07-28 by default."
examples:
- "2020-07-28"
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-pipedrive:0.1.6"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/pipedrive"
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -25,8 +25,10 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -40,6 +42,8 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
protected static final String NESTED_ARRAY_FIELD = "value";
private static final String TYPE_FIELD = "type";
private static final String FORMAT_FIELD = "format";
private static final String REF_DEFINITION_KEY = "$ref";
private static final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

@Override
protected String getTargetTableName(final String streamName) {
@@ -48,14 +52,13 @@ protected String getTargetTableName(final String streamName) {
return getNamingResolver().getIdentifier(streamName);
}

@Override
protected AirbyteMessageConsumer getRecordConsumer(final BigQuery bigquery,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final boolean isGcsUploadingMode,
final boolean isKeepFilesInGcs) {
return new BigQueryDenormalizedRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, getNamingResolver());
return new BigQueryDenormalizedRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, getNamingResolver(), fieldsContainRefDefinitionValue);
}

@Override
@@ -73,10 +76,36 @@ protected Schema getBigQuerySchema(final JsonNode jsonSchema) {
private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD));
final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD);
return Jsons.keys(properties).stream().map(key -> getField(namingResolver, key, properties.get(key)).build()).collect(Collectors.toList());
List<Field> tmpFields = Jsons.keys(properties).stream()
.peek(addToRefList(properties))
.map(key -> getField(namingResolver, key, properties.get(key))
.build())
.collect(Collectors.toList());
if (!fieldsContainRefDefinitionValue.isEmpty()) {
LOGGER.warn("Next fields contain \"$ref\" as Definition: {}. They are going to be saved as String Type column", fieldsContainRefDefinitionValue);
}
return tmpFields;
}

/**
* Populates the fieldsContainRefDefinitionValue set with the keys whose definitions contain $ref.
*
* Currently, Airbyte doesn't support parsing a value by its $ref key definition.
* Tracking issue: <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
*
* @param properties JSON schema node containing the stream's properties
*/
private static Consumer<String> addToRefList(ObjectNode properties) {
return key -> {
if (properties.get(key).has(REF_DEFINITION_KEY)) {
fieldsContainRefDefinitionValue.add(key);
}
};
}

private static Builder getField(final BigQuerySQLNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) {

final String fieldName = namingResolver.getIdentifier(key);
final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING);
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, fieldDefinition.get(TYPE_FIELD));
@@ -38,13 +38,16 @@ public class BigQueryDenormalizedRecordConsumer extends BigQueryRecordConsumer {

private final StandardNameTransformer namingResolver;
private final Set<String> invalidKeys;
private final Set<String> fieldsWithRefDefinition;

public BigQueryDenormalizedRecordConsumer(final BigQuery bigquery,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final StandardNameTransformer namingResolver) {
final StandardNameTransformer namingResolver,
final Set<String> fieldsWithRefDefinition) {
super(bigquery, writeConfigs, catalog, outputRecordCollector, false, false);
this.fieldsWithRefDefinition = fieldsWithRefDefinition;
this.namingResolver = namingResolver;
invalidKeys = new HashSet<>();
}
@@ -57,12 +60,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
Preconditions.checkArgument(recordMessage.getData().isObject());
final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData());
// Replace the ObjectNode with a TextNode for fields whose definition uses the $ref key.
// Only the first level of nesting needs to be checked, not every nested JSON object node.
if (!fieldsWithRefDefinition.isEmpty()) {
fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString()));
}
data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);

return data;
}

@Override
public void close(boolean hasFailed) {
fieldsWithRefDefinition.clear();
super.close(hasFailed);
}

protected JsonNode formatData(final FieldList fields, final JsonNode root) {
// handles empty objects and arrays
if (fields == null) {
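To make the formatRecord change above concrete, here is a hedged, standalone illustration of the record shape the consumer now writes for a stream whose "users" property was declared via "$ref". The literal metadata column names stand in for JavaBaseConstants.COLUMN_NAME_AB_ID and COLUMN_NAME_EMITTED_AT, and the timestamp is a placeholder rather than the exact format QueryParameterValue.timestamp() produces.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;
import java.util.UUID;

public class FormatRecordIllustration {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    final ObjectNode data = (ObjectNode) mapper.readTree(
        "{\"users\":{\"name\":\"John\",\"surname\":\"Adams\"}}");

    // Only the first-level fields collected from the schema are rewritten; deeper nodes are untouched.
    final Set<String> fieldsWithRefDefinition = Set.of("users");
    fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString()));

    // The Airbyte metadata columns are appended afterwards (placeholder values here).
    data.put("_airbyte_ab_id", UUID.randomUUID().toString());
    data.put("_airbyte_emitted_at", "2021-11-09 00:00:00 UTC");

    System.out.println(data);
    // {"users":"{\"name\":\"John\",\"surname\":\"Adams\"}","_airbyte_ab_id":"...","_airbyte_emitted_at":"..."}
  }
}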
@@ -79,6 +79,11 @@ class BigQueryDenormalizedDestinationTest {
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONDateTimeFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS5 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONWithReference())
.withEmittedAt(NOW.toEpochMilli()));


private JsonNode config;

@@ -116,6 +121,7 @@ void setup(final TestInfo info) throws IOException {
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
MESSAGE_USERS3.getRecord().setNamespace(datasetId);
MESSAGE_USERS4.getRecord().setNamespace(datasetId);
MESSAGE_USERS5.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
@@ -242,6 +248,24 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception {
extractJsonValues(resultJson.get("items"), "nested_datetime"));
}

@Test
void testJsonReferenceDefinition() throws Exception {
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithReferenceDefinition()))
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS5);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
final JsonNode resultJson = usersActual.get(0);
assertEquals(usersActual.size(), 1);
assertEquals(extractJsonValues(resultJson, "users"), Set.of("{\"name\":\"John\",\"surname\":\"Adams\"}"));
}

private Set<String> extractJsonValues(final JsonNode node, final String attributeName) {
final List<JsonNode> valuesNode = node.findValues(attributeName);
final Set<String> resultSet = new HashSet<>();
@@ -202,6 +202,32 @@ public static JsonNode getDataWithJSONDateTimeFormats() {
+ "}");
}

public static JsonNode getDataWithJSONWithReference() {
return Jsons.deserialize(
"{\n"
+ " \"users\": {\n"
+ "   \"name\": \"John\",\n"
+ "   \"surname\": \"Adams\"\n"
+ " }\n"
+ "}");
}

public static JsonNode getSchemaWithReferenceDefinition() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [ \"null\", \"object\" ],\n"
+ " \"properties\": {\n"
+ "   \"users\": {\n"
+ "     \"$ref\": \"#/definitions/users_\"\n"
+ "   }\n"
+ " }\n"
+ "}");
}

public static JsonNode getDataWithEmptyObjectAndArray() {
return Jsons.deserialize(
"{\n"
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.9 | 2021-11-08 | [\#7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key |
| 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery |
| 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
| 0.1.6 | 2021-09-16 | [\#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key |