Skip to content

Commit

Permalink
🐛 Destinations snowflake + bigquery: only parse catalog in 1s1t mode (#…
Browse files Browse the repository at this point in the history
…28976)

* only parse catalog in 1s1t mode

* one more thing?

* logistics
  • Loading branch information
edgao authored and bnchrch committed Aug 3, 2023
1 parent 0fabbaf commit 1595379
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.7.1
LABEL io.airbyte.version=1.7.2
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.7.1
dockerImageTag: 1.7.2
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,18 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
} else {
catalogParser = new CatalogParser(sqlGenerator);
}
ParsedCatalog parsedCatalog = catalogParser.parseCatalog(catalog);
final ParsedCatalog parsedCatalog;

final BigQuery bigquery = getBigQuery(config);
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = catalogParser.parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(
sqlGenerator,
new BigQueryDestinationHandler(bigquery, datasetLocation),
parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

Expand All @@ -268,13 +270,15 @@ protected Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> getUp
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new HashMap<>();
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
final AirbyteStream stream = configStream.getStream();
StreamConfig parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
final StreamConfig parsedStream;

final String streamName = stream.getName();
String targetTableName;
if (use1s1t) {
parsedStream = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
targetTableName = parsedStream.id().rawName();
} else {
parsedStream = null;
targetTableName = getTargetTableName(streamName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ private Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> createWriteConf
Preconditions.checkNotNull(configuredStream.getDestinationSyncMode(), "Undefined destination sync mode");

final AirbyteStream stream = configuredStream.getStream();
StreamConfig streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
final StreamConfig streamConfig;
if (TypingAndDedupingFlag.isDestinationV2()) {
streamConfig = parsedCatalog.getStream(stream.getNamespace(), stream.getName());
} else {
streamConfig = null;
}
final String streamName = stream.getName();
final BigQueryRecordFormatter recordFormatter = recordFormatterCreator.apply(stream.getJsonSchema());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


LABEL io.airbyte.version=1.2.6
LABEL io.airbyte.version=1.2.7
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.2.6
dockerImageTag: 1.2.7
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final GcsConfig gcsConfig = GcsConfig.getGcsConfig(config);

SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

Expand All @@ -151,11 +153,13 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final EncryptionConfig encryptionConfig = EncryptionConfig.fromJson(config.get("loading_method").get("encryption"));

SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
ParsedCatalog parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
final ParsedCatalog parsedCatalog;
TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
parsedCatalog = new CatalogParser(sqlGenerator).parseCatalog(catalog);
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, new SnowflakeDestinationHandler(getDatabase(getDataSource(config))), parsedCatalog);
} else {
parsedCatalog = null;
typerDeduper = new NoopTyperDeduper();
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.7.2 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode |
| 1.7.1 | 2023-08-02 | [\#28959](https://github.com/airbytehq/airbyte/pull/28959) | Destinations v2: Fix CDC syncs in non-dedup mode |
| 1.7.0 | 2023-08-01 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Open up early access program opt-in |
| 1.6.0 | 2023-07-26 | [\#28723](https://github.com/airbytehq/airbyte/pull/28723) | Destinations v2: Change raw table dataset and naming convention |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.2.7 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode |
| 1.2.6 | 2023-08-01 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Reduce logging noise |
| 1.2.5 | 2023-07-24 | [\#28618](https://github.com/airbytehq/airbyte/pull/28618) | Add hooks in preparation for destinations v2 implementation |
| 1.2.4 | 2023-07-21 | [\#28584](https://github.com/airbytehq/airbyte/pull/28584) | Install dependencies in preparation for destinations v2 work |
Expand Down

0 comments on commit 1595379

Please sign in to comment.