Skip to content

Commit

Permalink
JDBC Destinations: improve error message for conflicting streams (#21342
Browse files Browse the repository at this point in the history
)

* catch conflicting streams as configerror

* add test

* bump version + changelog

* derp, fix test setup

* derp

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and jbfbell committed Jan 13, 2023
1 parent a79fe8c commit bcad152
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.41
dockerImageTag: 0.4.42
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6109,7 +6109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.41"
- dockerImage: "airbyte/destination-snowflake:0.4.42"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

package io.airbyte.integrations.destination.staging;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
Expand All @@ -25,12 +30,14 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -89,7 +96,7 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {

return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final NamingConventionTransformer namingResolver,
Expand Down Expand Up @@ -160,15 +167,32 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
* @param catalog collection of configured streams (e.g. API endpoints or database tables)
* @return
*/
private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> flushBufferFunction(
@VisibleForTesting
CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> flushBufferFunction(
final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig =
writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(
StagingConsumerFactory::toNameNamespacePair, Function.identity()));
final Set<WriteConfig> conflictingStreams = new HashSet<>();
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = new HashMap<>();
for (final WriteConfig config : writeConfigs) {
final AirbyteStreamNameNamespacePair streamIdentifier = toNameNamespacePair(config);
if (pairToWriteConfig.containsKey(streamIdentifier)) {
conflictingStreams.add(config);
final WriteConfig existingConfig = pairToWriteConfig.get(streamIdentifier);
// The first conflicting stream won't have any problems, so we need to explicitly add it here.
conflictingStreams.add(existingConfig);
} else {
pairToWriteConfig.put(streamIdentifier, config);
}
}
if (!conflictingStreams.isEmpty()) {
final String message = String.format(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s",
conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getStreamName()).collect(joining(", "))
);
throw new ConfigErrorException(message);
}

return (pair, writer) -> {
LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.airbyte.integrations.destination.staging;

import static org.junit.jupiter.api.Assertions.*;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import java.util.List;
import org.junit.jupiter.api.Test;

class StagingConsumerFactoryTest {

@Test()
void detectConflictingStreams() {
final StagingConsumerFactory f = new StagingConsumerFactory();

final ConfigErrorException configErrorException = assertThrows(
ConfigErrorException.class,
() -> f.flushBufferFunction(
null,
null,
List.of(
new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null),
new WriteConfig("example_stream", "source_schema", "destination_default_schema", null, null, null)
),
null
));

assertEquals(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema.example_stream, source_schema.example_stream",
configErrorException.getMessage()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.41
LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.name=airbyte/destination-snowflake
3 changes: 2 additions & 1 deletion docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
Expand Down

0 comments on commit bcad152

Please sign in to comment.