From 36fb819e3a109c08e812d54a07cb71c11888dd52 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 22 Sep 2022 09:39:59 -0700 Subject: [PATCH 1/7] stop overriding namespace? --- .../integrations/destination/bigquery/BigQueryDestination.java | 1 - .../destination/bigquery/BigQueryRecordConsumer.java | 1 - 2 files changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index fc96b7463b82..69cc285f30b4 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -209,7 +209,6 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); - stream.setNamespace(BigQueryUtils.getDatasetId(config)); final String streamName = stream.getName(); final UploaderConfig uploaderConfig = UploaderConfig .builder() diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index 73988c853026..6e8002e4be04 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -46,7 +46,6 @@ public void acceptTracked(final AirbyteMessage message) { lastStateMessage = message; outputRecordCollector.accept(message); } else if (message.getType() == Type.RECORD) { - message.getRecord().setNamespace(datasetId); processRecord(message); } else { LOGGER.warn("Unexpected message: {}", message.getType()); From 2add62e5bbc9929ca3c73a088d9a7aa8b3b7fa37 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 22 Sep 2022 10:22:11 -0700 Subject: [PATCH 2/7] set namespace if needed --- .../integrations/destination/bigquery/BigQueryDestination.java | 3 +++ .../destination/bigquery/BigQueryRecordConsumer.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 69cc285f30b4..7788ed73d06e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -209,6 +209,9 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); + if (stream.getNamespace() == null) { + stream.setNamespace(BigQueryUtils.getDatasetId(config)); + } final String streamName = stream.getName(); final UploaderConfig uploaderConfig = UploaderConfig .builder() diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index 6e8002e4be04..fd487f9d4072 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -46,6 +46,9 @@ public void acceptTracked(final AirbyteMessage message) { lastStateMessage = message; outputRecordCollector.accept(message); } else if (message.getType() == Type.RECORD) { + if (message.getRecord().getNamespace() == null) { + message.getRecord().setNamespace(datasetId); + } processRecord(message); } else { LOGGER.warn("Unexpected message: {}", message.getType()); From c1e85c55655fc27b94e35b54ae6ef4203de25c27 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 22 Sep 2022 10:23:20 -0700 Subject: [PATCH 3/7] also check for empty namespace --- .../integrations/destination/bigquery/BigQueryDestination.java | 3 ++- .../destination/bigquery/BigQueryRecordConsumer.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 7788ed73d06e..803eddfaf3db 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -55,6 +55,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.avro.Schema; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -209,7 +210,7 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); - if (stream.getNamespace() == null) { + if (StringUtils.isEmpty(stream.getNamespace())) { stream.setNamespace(BigQueryUtils.getDatasetId(config)); } final String streamName = stream.getName(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index fd487f9d4072..92fd666eb645 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public void acceptTracked(final AirbyteMessage message) { lastStateMessage = message; outputRecordCollector.accept(message); } else if (message.getType() == Type.RECORD) { - if (message.getRecord().getNamespace() == null) { + if (StringUtils.isEmpty(message.getRecord().getNamespace())) { message.getRecord().setNamespace(datasetId); } processRecord(message); From 361f87ec5b8ed4a35ff388d7a2bb38ffb2ff22a2 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 22 Sep 2022 10:44:02 -0700 Subject: [PATCH 4/7] version bump + changelog --- .../destination-bigquery-denormalized/Dockerfile | 2 +- .../connectors/destination-bigquery/Dockerfile | 2 +- docs/integrations/destinations/bigquery.md | 12 +++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 42c63f9de784..a1dd7c01d333 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.2.2 +LABEL io.airbyte.version=1.2.3 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index eb6067b29281..3b5d34f7465e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.2.1 +LABEL io.airbyte.version=1.2.3 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 2df856e21fdd..649f2a4f97bb 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -134,8 +134,9 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| -| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | -| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested | +| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace | +| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage | +| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested | | 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 1.1.15 | 2022-08-22 | [15787](https://github.com/airbytehq/airbyte/pull/15787) | Throw exception if job failed | | 1.1.14 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials | @@ -182,9 +183,10 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| -| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage | -| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Wrapping string objects with TextNode | -| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested | +| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace | +| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage | +| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | (bugged, do not use) Wrapping string objects with TextNode | +| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested | | 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) | | 1.1.15 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials | | 1.1.14 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | From 5e8ac8d9b84f5e65b929571fac804e8603bb167d Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 22 Sep 2022 18:41:43 +0000 Subject: [PATCH 5/7] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 9aa8a5992440..6ef18ed858f8 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -27,7 +27,7 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 1.2.1 + dockerImageTag: 1.2.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 43186bdfd42b..e35d92b9d8cb 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -285,7 +285,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-bigquery:1.2.1" +- dockerImage: "airbyte/destination-bigquery:1.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: From fa30e95b2965273cb62576a995bb35d662d433ef Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 22 Sep 2022 11:56:12 -0700 Subject: [PATCH 6/7] sanitize dataset id --- .../bigquery/BigQueryDenormalizedDestination.java | 4 ++-- .../integrations/destination/bigquery/BigQueryUtils.java | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index 5caf9d1c71f9..7f0b45eb9e0c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -87,8 +87,8 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream, UploaderConfig uploaderConfig, Map> uploaderMap) throws IOException { - Table existingTable = - uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName()); + String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace()); + Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName()); BigQueryRecordFormatter formatter = uploaderConfig.getFormatter(); if (existingTable != null) { diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index e2cf6c858da5..2d3d1bb92f41 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -263,7 +263,11 @@ private static String getFormattedBigQueryDateTime(final String dateTimeValue) { public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) { final String srcNamespace = stream.getStream().getNamespace(); final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace; - return NAME_TRANSFORMER.getNamespace(schemaName); + return sanitizeDatasetId(schemaName); + } + + public static String sanitizeDatasetId(String datasetId) { + return NAME_TRANSFORMER.getNamespace(datasetId); } public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSyncMode syncMode) { From 8d4b0230b3b460d4e2ace92e58d0a46ebdf75f01 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 22 Sep 2022 19:40:05 +0000 Subject: [PATCH 7/7] auto-bump connector version [ci skip] --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 6ef18ed858f8..4ddbd1763acd 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -40,7 +40,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 1.2.2 + dockerImageTag: 1.2.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery icon: bigquery.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index e35d92b9d8cb..f321e4b92a86 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -495,7 +495,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.2" +- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: