From 5e31b4e62afce73c726389f05cdcb68b69a57c8f Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 12 Jan 2023 10:33:45 -0800 Subject: [PATCH 1/6] catch conflicting streams as configerror --- .../seed/destination_definitions.yaml | 2 +- .../staging/StagingConsumerFactory.java | 34 +++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 3a517c153d0c0..432083bfeee0d 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -441,4 +441,4 @@ dockerRepository: airbyte/destination-weaviate dockerImageTag: 0.1.0 documentationUrl: https://docs.airbyte.com/integrations/destinations/weaviate - releaseStage: alpha \ No newline at end of file + releaseStage: alpha diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index a22c93b24345c..5a3b28322d691 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.staging; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; @@ -25,12 +29,14 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -89,7 +95,7 @@ private static List createWriteConfigs(final NamingConventionTransf final JsonNode config, final ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList()); + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(toList()); } private static Function toWriteConfig(final NamingConventionTransformer namingResolver, @@ -165,10 +171,26 @@ private CheckedBiConsumer writeConfigs, final ConfiguredAirbyteCatalog catalog) { - final Map pairToWriteConfig = - writeConfigs.stream() - .collect(Collectors.toUnmodifiableMap( - StagingConsumerFactory::toNameNamespacePair, Function.identity())); + final Set conflictingStreams = new HashSet<>(); + final Map pairToWriteConfig = new HashMap<>(); + for (final WriteConfig config : writeConfigs) { + final AirbyteStreamNameNamespacePair streamIdentifier = toNameNamespacePair(config); + if (pairToWriteConfig.containsKey(streamIdentifier)) { + conflictingStreams.add(config); + final WriteConfig existingConfig = pairToWriteConfig.get(streamIdentifier); + // The first conflicting stream won't have any problems, so we need to explicitly add it here. + conflictingStreams.add(existingConfig); + } else { + pairToWriteConfig.put(streamIdentifier, config); + } + } + if (!conflictingStreams.isEmpty()) { + final String message = String.format( + "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s", + conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getNamespace()).collect(joining(", ")) + ); + throw new ConfigErrorException(message); + } return (pair, writer) -> { LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); From 182d416f3455e9a08b12334acaa1d6fec49c2d51 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 12 Jan 2023 10:46:25 -0800 Subject: [PATCH 2/6] add test --- .../staging/StagingConsumerFactory.java | 4 ++- .../staging/StagingConsumerFactoryTest.java | 34 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index 5a3b28322d691..7d18a99028c0c 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -8,6 +8,7 @@ import static java.util.stream.Collectors.toList; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedBiConsumer; @@ -166,7 +167,8 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon * @param catalog collection of configured streams (e.g. API endpoints or database tables) * @return */ - private CheckedBiConsumer flushBufferFunction( + @VisibleForTesting + CheckedBiConsumer flushBufferFunction( final JdbcDatabase database, final StagingOperations stagingOperations, final List writeConfigs, diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java new file mode 100644 index 0000000000000..6fc40a44fe354 --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java @@ -0,0 +1,34 @@ +package io.airbyte.integrations.destination.staging; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.integrations.destination.jdbc.WriteConfig; +import java.util.List; +import org.junit.jupiter.api.Test; + +class StagingConsumerFactoryTest { + + @Test() + void detectConflictingStreams() { + final StagingConsumerFactory f = new StagingConsumerFactory(); + + final ConfigErrorException configErrorException = assertThrows( + ConfigErrorException.class, + () -> f.flushBufferFunction( + null, + null, + List.of( + new WriteConfig("source_schema1", "example_stream", "destination_default_schema", null, null, null), + new WriteConfig("source_schema2", "example_stream", "destination_default_schema", null, null, null) + ), + null + )); + + assertEquals( + "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema1.example_stream, source_schema2.example_stream", + configErrorException.getMessage() + ); + } + +} From 87076854bec8403cc2c746f4086edf1a9d8db7a1 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 12 Jan 2023 12:44:48 -0800 Subject: [PATCH 3/6] bump version + changelog --- .../connectors/destination-snowflake/Dockerfile | 2 +- docs/integrations/destinations/snowflake.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 8d302c039a106..fca0e9654d410 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.41 +LABEL io.airbyte.version=0.4.42 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index c148dba323df4..9a690157e3865 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards | +| 0.4.41 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams | +| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards | | 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud | | 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors | | 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier | From a678556f2c320a51fe8cf00085a04b59bbd84a94 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 12 Jan 2023 13:13:17 -0800 Subject: [PATCH 4/6] derp, fix test setup --- .../destination/staging/StagingConsumerFactoryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java index 6fc40a44fe354..3efe4746038c5 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java @@ -19,8 +19,8 @@ void detectConflictingStreams() { null, null, List.of( - new WriteConfig("source_schema1", "example_stream", "destination_default_schema", null, null, null), - new WriteConfig("source_schema2", "example_stream", "destination_default_schema", null, null, null) + new WriteConfig("example_stream", "source_schema1", "destination_default_schema", null, null, null), + new WriteConfig("example_stream", "source_schema2", "destination_default_schema", null, null, null) ), null )); From 281b0940c36e4f8750700804f40659d6b5706a18 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 12 Jan 2023 13:20:51 -0800 Subject: [PATCH 5/6] derp --- .../destination/staging/StagingConsumerFactory.java | 2 +- .../destination/staging/StagingConsumerFactoryTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index 7d18a99028c0c..b732a7c33054f 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -189,7 +189,7 @@ CheckedBiConsumer if (!conflictingStreams.isEmpty()) { final String message = String.format( "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s", - conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getNamespace()).collect(joining(", ")) + conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getStreamName()).collect(joining(", ")) ); throw new ConfigErrorException(message); } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java index 3efe4746038c5..ef2c91262675b 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java @@ -19,14 +19,14 @@ void detectConflictingStreams() { null, null, List.of( - new WriteConfig("example_stream", "source_schema1", "destination_default_schema", null, null, null), - new WriteConfig("example_stream", "source_schema2", "destination_default_schema", null, null, null) + new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null), + new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null) ), null )); assertEquals( - "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema1.example_stream, source_schema2.example_stream", + "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema.example_stream, source_schema.example_stream", configErrorException.getMessage() ); } From 026775d48b8b2acdce783c32d40b4a17b73df52c Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Thu, 12 Jan 2023 23:42:09 +0000 Subject: [PATCH 6/6] auto-bump connector version --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../init/src/main/resources/seed/destination_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 432083bfeee0d..0579e820fa279 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -347,7 +347,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.41 + dockerImageTag: 0.4.42 documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake icon: snowflake.svg normalizationConfig: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 52de3cb542669..246ca8661f1be 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -6109,7 +6109,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.41" +- dockerImage: "airbyte/destination-snowflake:0.4.42" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake" connectionSpecification: