JDBC Destinations: improve error message for conflicting streams #21342

Merged 7 commits on Jan 13, 2023
Changes from 2 commits
@@ -441,4 +441,4 @@
dockerRepository: airbyte/destination-weaviate
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/weaviate
releaseStage: alpha
releaseStage: alpha
@@ -4,8 +4,13 @@

package io.airbyte.integrations.destination.staging;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
@@ -25,12 +30,14 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -89,7 +96,7 @@ private static List<WriteConfig> createWriteConfigs(final NamingConventionTransf
final JsonNode config,
final ConfiguredAirbyteCatalog catalog) {

return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config)).collect(toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final NamingConventionTransformer namingResolver,
@@ -160,15 +167,32 @@ private static AirbyteStreamNameNamespacePair toNameNamespacePair(final WriteCon
* @param catalog collection of configured streams (e.g. API endpoints or database tables)
* @return
*/
private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> flushBufferFunction(
@VisibleForTesting
CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Exception> flushBufferFunction(
final JdbcDatabase database,
final StagingOperations stagingOperations,
final List<WriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog) {
final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig =
writeConfigs.stream()
.collect(Collectors.toUnmodifiableMap(
StagingConsumerFactory::toNameNamespacePair, Function.identity()));
final Set<WriteConfig> conflictingStreams = new HashSet<>();
Contributor:

This block seems to make more sense in the onStartFunction, since none of the values accessed here are isolated to flushBufferFunction.

The main reason onStartFunction fits is that it already has the writeConfig values: it sets up the temporary tables for each stream (a stream being synonymous with a WriteConfig), so the metadata for each streamIdentifier is already known at setup time.

It also matches what onStartFunction is for, namely one-time setup, so this would run once rather than on every buffer flush (N times, where N can be arbitrarily large).

Contributor Author:

This block runs before the lambda is created (i.e. it is not actually part of the flush buffer function), so I think it only executes once per sync.

Contributor:

That does make sense, although I would be hesitant to keep this in flushBufferFunction, since it muddies the single responsibility of the buffer flush. Also, the onStartFunction lambda is called only once and already receives the writeConfigs, so it would be another suitable location.

That said, this isn't blocking.
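For illustration, a minimal sketch of what hoisting this check into a one-time setup step (e.g. the onStartFunction discussed above) might look like. The helper name and its call site are hypothetical and not part of this PR; it assumes the same imports as this file and that WriteConfig exposes getStreamName() and getNamespace().

```java
// Hypothetical helper: run the duplicate-table check once during setup
// (e.g. from onStartFunction) instead of inside flushBufferFunction.
private static void validateNoConflictingStreams(final List<WriteConfig> writeConfigs) {
  final Map<AirbyteStreamNameNamespacePair, WriteConfig> seenPairs = new HashMap<>();
  final Set<WriteConfig> conflictingStreams = new HashSet<>();
  for (final WriteConfig config : writeConfigs) {
    // putIfAbsent returns the previously stored config when the key is already taken.
    final WriteConfig existingConfig = seenPairs.putIfAbsent(toNameNamespacePair(config), config);
    if (existingConfig != null) {
      // Record both configs that target the same table, including the one seen first.
      conflictingStreams.add(existingConfig);
      conflictingStreams.add(config);
    }
  }
  if (!conflictingStreams.isEmpty()) {
    throw new ConfigErrorException("Conflicting streams detected: "
        + conflictingStreams.stream()
            .map(config -> config.getNamespace() + "." + config.getStreamName())
            .collect(joining(", ")));
  }
}
```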

final Map<AirbyteStreamNameNamespacePair, WriteConfig> pairToWriteConfig = new HashMap<>();
for (final WriteConfig config : writeConfigs) {
final AirbyteStreamNameNamespacePair streamIdentifier = toNameNamespacePair(config);
if (pairToWriteConfig.containsKey(streamIdentifier)) {
conflictingStreams.add(config);
final WriteConfig existingConfig = pairToWriteConfig.get(streamIdentifier);
// The first stream mapped to this identifier is not caught by the containsKey check, so add it explicitly here.
conflictingStreams.add(existingConfig);
} else {
pairToWriteConfig.put(streamIdentifier, config);
}
}
if (!conflictingStreams.isEmpty()) {
final String message = String.format(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: %s",
conflictingStreams.stream().map(config -> config.getNamespace() + "." + config.getStreamName()).collect(joining(", "))
);
throw new ConfigErrorException(message);
}

return (pair, writer) -> {
LOGGER.info("Flushing buffer for stream {} ({}) to staging", pair.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
@@ -0,0 +1,34 @@
package io.airbyte.integrations.destination.staging;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import java.util.List;
import org.junit.jupiter.api.Test;

class StagingConsumerFactoryTest {

@Test
void detectConflictingStreams() {
final StagingConsumerFactory f = new StagingConsumerFactory();

final ConfigErrorException configErrorException = assertThrows(
ConfigErrorException.class,
() -> f.flushBufferFunction(
null,
null,
List.of(
new WriteConfig("source_schema1", "example_stream", "destination_default_schema", null, null, null),
new WriteConfig("source_schema2", "example_stream", "destination_default_schema", null, null, null)
),
null
));

assertEquals(
"You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using ${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema1.example_stream, source_schema2.example_stream",
configErrorException.getMessage()
);
}

}