diff --git a/airbyte-integrations/connectors/source-bigquery/metadata.yaml b/airbyte-integrations/connectors/source-bigquery/metadata.yaml index 7bc915c8cb272..e203e3fae69fb 100644 --- a/airbyte-integrations/connectors/source-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/source-bigquery/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c - dockerImageTag: 0.4.1 + dockerImageTag: 0.4.2 dockerRepository: airbyte/source-bigquery documentationUrl: https://docs.airbyte.com/integrations/sources/bigquery githubIssueLabel: source-bigquery diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index d50d679efe277..6c08f587900a5 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -9,6 +9,7 @@ import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable; import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.Table; @@ -128,7 +129,12 @@ protected List>> discoverInternal(fin .name(table.getTableId().getTable()) .fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() .map(f -> { - final StandardSQLTypeName standardType = f.getType().getStandardType(); + final StandardSQLTypeName standardType; + if (f.getType().getStandardType() == StandardSQLTypeName.STRUCT && f.getMode() == Field.Mode.REPEATED) { + standardType = StandardSQLTypeName.ARRAY; + } else + standardType = f.getType().getStandardType(); + return new CommonField<>(f.getName(), standardType); }) .collect(Collectors.toList())) diff --git a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceStructRepeatedTest.java b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceStructRepeatedTest.java new file mode 100644 index 0000000000000..1d7b032925151 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceStructRepeatedTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.bigquery; + +import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.commons.util.MoreIterators; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.CatalogHelpers; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import java.sql.SQLException; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class BigQuerySourceStructRepeatedTest extends AbstractBigQuerySourceTest { + + @Override + public void createTable(String datasetId) throws SQLException { + // create column name interval which should be escaped + database.execute("CREATE TABLE " + datasetId + ".struct_repeated(id int64, key_value_pairs ARRAY>);"); + database.execute("INSERT INTO " + datasetId + ".struct_repeated (id, key_value_pairs) VALUES (1, [('a', 0.7), ('b', 0.8), ('c', 1.2)]);"); + } + + @Test + public void testReadSuccess() throws Exception { + final List actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null)); + + ObjectMapper mapper = new ObjectMapper(); + // JsonNode actualObj = mapper.readTree("{\"key_value_pairs\":[{ \"key\": \"a\",\"value\": \"0.7\"}, + // {\"key\": \"b\",\"value\": \"0.8\"}, {\"key\": \"c\",\"value\": \"1.2\"}]}"); + JsonNode actualObj = mapper.readTree("[{ \"key\": \"a\",\"value\": 0.7}, {\"key\": \"b\",\"value\": 0.8}, {\"key\": \"c\",\"value\": 1.2}]"); + + assertNotNull(actualMessages); + assertEquals(1, actualMessages.size()); + + assertNotNull(actualMessages.get(0).getRecord().getData().get("id")); + assertEquals(actualObj, actualMessages.get(0).getRecord().getData().get("key_value_pairs")); + } + + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return CatalogHelpers.createConfiguredAirbyteCatalog( + "struct_repeated", + config.get(CONFIG_DATASET_ID).asText(), + Field.of("id", JsonSchemaType.NUMBER), + Field.of("key_value_pairs", JsonSchemaType.ARRAY)); + } + +} diff --git a/docs/integrations/sources/bigquery.md b/docs/integrations/sources/bigquery.md index 9c7eea08c178a..b0d73b124294f 100644 --- a/docs/integrations/sources/bigquery.md +++ b/docs/integrations/sources/bigquery.md @@ -88,6 +88,7 @@ Once you've configured BigQuery as a source, delete the Service Account Key from | Version | Date | Pull Request | Subject | |:--------|:-----------| :------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------| +| 0.4.2 | 2024-02-22 | [35503](https://github.com/airbytehq/airbyte/pull/35503) | Source BigQuery: replicating RECORD REPEATED fields | | 0.4.1 | 2024-01-24 | [34453](https://github.com/airbytehq/airbyte/pull/34453) | bump CDK version | | 0.4.0 | 2023-12-18 | [33484](https://github.com/airbytehq/airbyte/pull/33484) | Remove LEGACY state | | 0.3.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |