From 1595379fbb5d124c464bcaab2f1af3298617236a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 2 Aug 2023 15:19:52 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Destinations=20snowflake=20+=20b?= =?UTF-8?q?igquery:=20only=20parse=20catalog=20in=201s1t=20mode=20(#28976)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * only parse catalog in 1s1t mode * one more thing? * logistics --- .../connectors/destination-bigquery/Dockerfile | 2 +- .../connectors/destination-bigquery/metadata.yaml | 2 +- .../destination/bigquery/BigQueryDestination.java | 8 ++++++-- .../bigquery/BigQueryStagingConsumerFactory.java | 7 ++++++- .../connectors/destination-snowflake/Dockerfile | 2 +- .../connectors/destination-snowflake/metadata.yaml | 2 +- .../snowflake/SnowflakeGcsStagingDestination.java | 4 +++- .../snowflake/SnowflakeInternalStagingDestination.java | 8 ++++++-- .../snowflake/SnowflakeS3StagingDestination.java | 4 +++- docs/integrations/destinations/bigquery.md | 1 + docs/integrations/destinations/snowflake.md | 1 + 11 files changed, 30 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 27bc1e2cfd46..5d326232ee7e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.7.1 +LABEL io.airbyte.version=1.7.2 LABEL io.airbyte.name=airbyte/destination-bigquery ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 9624bff05119..e1105adfdc78 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 1.7.1 + dockerImageTag: 1.7.2 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 3f0475c2303e..86192fe861e7 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -235,16 +235,18 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, } else { catalogParser = new CatalogParser(sqlGenerator); } - ParsedCatalog parsedCatalog = catalogParser.parseCatalog(catalog); + final ParsedCatalog parsedCatalog; final BigQuery bigquery = getBigQuery(config); TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { + parsedCatalog = catalogParser.parseCatalog(catalog); typerDeduper = new DefaultTyperDeduper<>( sqlGenerator, new BigQueryDestinationHandler(bigquery, datasetLocation), parsedCatalog); } else { + parsedCatalog = null; typerDeduper = new NoopTyperDeduper(); } @@ -268,13 +270,15 @@ protected Map> getUp final Map> uploaderMap = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { final AirbyteStream stream = configStream.getStream(); - StreamConfig parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); + final StreamConfig parsedStream; final String streamName = stream.getName(); String targetTableName; if (use1s1t) { + parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); targetTableName = parsedStream.id().rawName(); } else { + parsedStream = null; targetTableName = getTargetTableName(streamName); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java index 8ca0b612c1bf..a32cb504c009 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java @@ -107,7 +107,12 @@ private Map createWriteConf Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode"); final AirbyteStream stream = configuredStream.getStream(); - StreamConfig streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); + final StreamConfig streamConfig; + if (TypingAndDedupingFlag.isDestinationV2()) { + streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName()); + } else { + streamConfig = null; + } final String streamName = stream.getName(); final BigQueryRecordFormatter recordFormatter = recordFormatterCreator.apply(stream.getJsonSchema()); diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index b834657c693d..0a99bb468ffa 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=1.2.6 +LABEL io.airbyte.version=1.2.7 LABEL io.airbyte.name=airbyte/destination-snowflake ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index c86609d6fee8..d087fae8277a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 1.2.6 + dockerImageTag: 1.2.7 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java index 19c988922d24..98354f8c1f6e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStagingDestination.java @@ -150,11 +150,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final GcsConfig gcsConfig = GcsConfig.getGcsConfig(config); SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(); - ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); + final ParsedCatalog parsedCatalog; TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { + parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog); } else { + parsedCatalog = null; typerDeduper = new NoopTyperDeduper(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 6ebff118208c..2eab53180968 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -124,11 +124,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(); - ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); + final ParsedCatalog parsedCatalog; TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { + parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog); } else { + parsedCatalog = null; typerDeduper = new NoopTyperDeduper(); } @@ -151,11 +153,13 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(); - ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); + final ParsedCatalog parsedCatalog; TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { + parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog); } else { + parsedCatalog = null; typerDeduper = new NoopTyperDeduper(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java index e4ac88f3c195..44944208b372 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StagingDestination.java @@ -140,11 +140,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption")); SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(); - ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); + final ParsedCatalog parsedCatalog; TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { + parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog); typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog); } else { + parsedCatalog = null; typerDeduper = new NoopTyperDeduper(); } diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 3d7fbde0f196..a4980c7580c5 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| +| 1.7.2 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode | | 1.7.1 | 2023-08-02 | [\#28959](https://github.com/airbytehq/airbyte/pull/28959) | Destinations v2: Fix CDC syncs in non-dedup mode | | 1.7.0 | 2023-08-01 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Open up early access program opt-in | | 1.6.0 | 2023-07-26 | [\#28723](https://github.com/airbytehq/airbyte/pull/28723) | Destinations v2: Change raw table dataset and naming convention | diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 54a20af2bf62..150f4f35922d 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.2.7 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode | | 1.2.6 | 2023-08-01 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Reduce logging noise | | 1.2.5 | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation | | 1.2.4 | 2023-07-21 | [\#28584](https://github.com/airbytehq/airbyte/pull/28584) | Install dependencies in preparation for destinations v2 work |