Skip to content

Commit

Permalink
🐛 BigQuery source: Fix nested structs (#6135)
Browse files Browse the repository at this point in the history
* Fixed parsing nested Structs

* Update bigquery.md

* Clean up

* Update Dockerfile
  • Loading branch information
irynakruk authored Sep 17, 2021
1 parent 8094a1d commit 4625d90
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c",
"name": "BigQuery",
"dockerRepository": "airbyte/source-bigquery",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@
- sourceDefinitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
name: BigQuery
dockerRepository: airbyte/source-bigquery
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/bigquery
- sourceDefinitionId: 90916976-a132-4ce9-8bce-82a03dd58788
name: BambooHR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,18 @@ private void setJsonField(Field field, FieldValue fieldValue, ObjectNode node) {
}
} else if (fieldValue.getAttribute().equals(Attribute.RECORD)) {
ObjectNode newNode = node.putObject(fieldName);
field.getSubFields().forEach(recordField -> {
setJsonField(recordField, fieldValue.getRecordValue().get(recordField.getName()), newNode);
});
FieldList subFields = field.getSubFields();
try {
// named get doesn't work here with nested arrays and objects; index is the only correlation between
// field and field value
if (subFields != null && !subFields.isEmpty()) {
for (int i = 0; i < subFields.size(); i++) {
setJsonField(field.getSubFields().get(i), fieldValue.getRecordValue().get(i), newNode);
}
}
} catch (UnsupportedOperationException e) {
LOGGER.error("Failed to parse Object field with name: ", fieldName, e.getMessage());
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-bigquery/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-bigquery
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-bigquery
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_CREDS;
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_PROJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class BigQuerySourceTest {

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
private static final String STREAM_NAME = "id_and_name";

private BigQueryDatabase database;
private Dataset dataset;
private JsonNode config;

@BeforeEach
void setUp() throws IOException, SQLException {
if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
+ ". Override by setting setting path with the CREDENTIALS_PATH constant.");
}

final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH));

final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString);
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)
.put(CONFIG_DATASET_ID, datasetId)
.build());

database = new BigQueryDatabase(config.get(CONFIG_PROJECT_ID).asText(), credentialsJsonString);

final DatasetInfo datasetInfo =
DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(datasetLocation).build();
dataset = database.getBigQuery().create(datasetInfo);

database.execute(
"CREATE TABLE " + datasetId
+ ".id_and_name(id INT64, array_val ARRAY<STRUCT<key string, value STRUCT<string_val string>>>, object_val STRUCT<val_array ARRAY<STRUCT<value_str1 string>>, value_str2 string>);");
database.execute(
"INSERT INTO " + datasetId
+ ".id_and_name (id, array_val, object_val) VALUES "
+ "(1, [STRUCT('test1_1', STRUCT('struct1_1')), STRUCT('test1_2', STRUCT('struct1_2'))], STRUCT([STRUCT('value1_1'), STRUCT('value1_2')], 'test1_1')), "
+ "(2, [STRUCT('test2_1', STRUCT('struct2_1')), STRUCT('test2_2', STRUCT('struct2_2'))], STRUCT([STRUCT('value2_1'), STRUCT('value2_2')], 'test2_1')), "
+ "(3, [STRUCT('test3_1', STRUCT('struct3_1')), STRUCT('test3_2', STRUCT('struct3_2'))], STRUCT([STRUCT('value3_1'), STRUCT('value3_2')], 'test3_1'));");
}

@AfterEach
void tearDown() {
database.cleanDataSet(dataset.getDatasetId().getDataset());
}

@Test
public void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));

assertNotNull(actualMessages);
assertEquals(3, actualMessages.size());
}

private ConfiguredAirbyteCatalog getConfiguredCatalog() {
return CatalogHelpers.createConfiguredAirbyteCatalog(
STREAM_NAME,
config.get(CONFIG_DATASET_ID).asText(),
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of("array_val", JsonSchemaPrimitive.ARRAY),
Field.of("object_val", JsonSchemaPrimitive.OBJECT));
}

}
3 changes: 2 additions & 1 deletion docs/integrations/sources/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ Once you've configured BigQuery as a source, delete the Service Account Key from

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.2 | 2021-09-15 | [#6135](https://github.com/airbytehq/airbyte/pull/6135) | 🐛 BigQuery source: Fix nested structs |
| 0.1.1 | 2021-07-28 | [#4981](https://github.com/airbytehq/airbyte/pull/4981) | 🐛 BigQuery source: Fix nested arrays |
| 0.1.0 | 2021-07-22 | [#4457](https://github.com/airbytehq/airbyte/pull/4457) | 🎉 New Source: Big Query. |
| 0.1.0 | 2021-07-22 | [#4457](https://github.com/airbytehq/airbyte/pull/4457) | 🎉 New Source: Big Query. |

0 comments on commit 4625d90

Please sign in to comment.