diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 432083bfeee0d..0579e820fa279 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -347,7 +347,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.41 + dockerImageTag: 0.4.42 documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake icon: snowflake.svg normalizationConfig: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 52de3cb542669..246ca8661f1be 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -6109,7 +6109,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.41" +- dockerImage: "airbyte/destination-snowflake:0.4.42" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index a22c93b24345c..b732a7c33054f 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -4,8 +4,13 @@ package io.airbyte.integrations.destination.staging; 
+import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; + import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedBiConsumer; import io.airbyte.commons.functional.CheckedBiFunction; import io.airbyte.commons.json.Jsons; @@ -25,12 +30,14 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -89,7 +96,7 @@ private static List createWriteConfigs(final NamingConventionTransf final JsonNode config, final ConfiguredAirbyteCatalog catalog) { - return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList()); + return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(toList()); } private static Function toWriteConfig(final NamingConventionTransformer namingResolver, @@ -160,15 +167,32 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon * @param catalog collection of configured streams (e.g. 
API endpoints or database tables) * @return */ - private CheckedBiConsumer flushBufferFunction( + @VisibleForTesting + CheckedBiConsumer flushBufferFunction( final JdbcDatabase database, final StagingOperations stagingOperations, final List writeConfigs, final ConfiguredAirbyteCatalog catalog) { - final Map pairToWriteConfig = - writeConfigs.stream() - .collect(Collectors.toUnmodifiableMap( - StagingConsumerFactory::toNameNamespacePair, Function.identity())); + final Set conflictingStreams = new HashSet<>(); + final Map pairToWriteConfig = new HashMap<>(); + for (final WriteConfig config : writeConfigs) { + final AirbyteStreamNameNamespacePair streamIdentifier = toNameNamespacePair(config); + if (pairToWriteConfig.containsKey(streamIdentifier)) { + conflictingStreams.add(config); + final WriteConfig existingConfig = pairToWriteConfig.get(streamIdentifier); + // The first conflicting stream won't have any problems, so we need to explicitly add it here. + conflictingStreams.add(existingConfig); + } else { + pairToWriteConfig.put(streamIdentifier, config); + } + } + if (!conflictingStreams.isEmpty()) { + final String message = String.format( + "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s", + conflictingStreams.stream().map(config -> config.getNamespace() + "." 
+ config.getStreamName()).collect(joining(", ")) + ); + throw new ConfigErrorException(message); + } return (pair, writer) -> { LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount())); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java new file mode 100644 index 0000000000000..ef2c91262675b --- /dev/null +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/staging/StagingConsumerFactoryTest.java @@ -0,0 +1,34 @@ +package io.airbyte.integrations.destination.staging; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.commons.exceptions.ConfigErrorException; +import io.airbyte.integrations.destination.jdbc.WriteConfig; +import java.util.List; +import org.junit.jupiter.api.Test; + +class StagingConsumerFactoryTest { + + @Test() + void detectConflictingStreams() { + final StagingConsumerFactory f = new StagingConsumerFactory(); + + final ConfigErrorException configErrorException = assertThrows( + ConfigErrorException.class, + () -> f.flushBufferFunction( + null, + null, + List.of( + new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null), + new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null) + ), + null + )); + + assertEquals( + "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. 
Affected streams: source_schema.example_stream, source_schema.example_stream", configErrorException.getMessage() ); } +} diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 8d302c039a106..fca0e9654d410 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.41 +LABEL io.airbyte.version=0.4.42 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index c148dba323df4..9a690157e3865 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards | +| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams | +| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards | | 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud | | 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors | | 
0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |