Skip to content

Commit

Permalink
🐞 Fix invalid char in snowflake & bigquery namespace (#10793)
Browse files Browse the repository at this point in the history
* Add namespace test for snowflake

* Enable namespace test for bigquery

* Format code

* Capitalize test case id

* Update exception message to point to test case file

* Update snowflake name transformer to prepend underscore

* Override convertStreamName instead of getIdentifier

* Add missing state message

* Remove unused import

* Disable more namespace test cases

We don't want to introduce changes that will affect existing connections for now.

* Dry method that mutates namespace

* Pass through null

* Normalize namespace

* Fix test case

* Revert consumer factory changes

* Normalize namespace in catalog

* Revert catalog normalization

* Enable namespace test for all snowflake destination tests

* Test namespace for both bigquery destination tests

* Add unit test for bigquery name transformer

* Transform bigquery schema name

* Fix avro name transformer

* Normalize avro namespace

* Standardize namespace in gcs utils

* Bump version for snowflake and bigquery

* Enable namespace test for bigquery denormalized

* Dry bigquery denormalized acceptance test

* Revert some of the variable scope change

* Fix unit test

* Bump version

* Introduce getNamespace method

* Implement getNamespace method for bigquery

* Switch to getNamespace methods

* Update comments

* Fix bigquery denormalized acceptance test

* Format code

* Dry bigquery destination test

* Skip partition test for gcs mode

* Bump version
  • Loading branch information
tuliren authored Mar 20, 2022
1 parent 0f475ce commit 21ec23c
Show file tree
Hide file tree
Showing 32 changed files with 621 additions and 609 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public interface NamingConventionTransformer {
*/
String getIdentifier(String name);

/**
 * Handle naming conversions of an input name to output a valid namespace for the desired
 * destination.
 *
 * @param namespace the input namespace name to convert
 * @return the namespace name converted for the destination
 */
String getNamespace(String namespace);

/**
* Same as getIdentifier but returns also the name of the table for storing raw data
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public String getIdentifier(final String name) {
return convertStreamName(name);
}

/**
 * Namespaces are converted with the same rule as stream names, because most destinations have
 * the same naming requirement for both.
 *
 * @param namespace the input namespace name
 * @return the result of applying the stream name conversion to the namespace
 */
@Override
public String getNamespace(final String namespace) {
  final String convertedNamespace = convertStreamName(namespace);
  return convertedNamespace;
}

@Override
public String getRawTableName(final String streamName) {
return convertStreamName("_airbyte_raw_" + streamName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class DataArgumentsProvider implements ArgumentsProvider {
new CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt");
public static final CatalogMessageTestConfigPair EDGE_CASE_CONFIG =
new CatalogMessageTestConfigPair("edge_case_catalog.json", "edge_case_messages.txt");
/**
 * Catalog / message resource file pair for the namespace test data ("namespace_catalog.json" and
 * "namespace_messages.txt").
 */
public static final CatalogMessageTestConfigPair NAMESPACE_CONFIG =
new CatalogMessageTestConfigPair("namespace_catalog.json", "namespace_messages.txt");

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.JobGetSpecConfig;
Expand All @@ -27,6 +28,7 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -65,16 +67,21 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -860,17 +867,12 @@ void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception {

final List<AirbyteMessage> messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
messages.forEach(
message -> {
if (message.getRecord() != null) {
message.getRecord().setNamespace(namespace);
}
});
final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(messages, namespace);

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
runSyncAndVerifyStateOutput(config, messagesWithNewNamespace, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messagesWithNewNamespace, defaultSchema);
}

/**
Expand Down Expand Up @@ -900,31 +902,68 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {

final var configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

final var ns1Msgs = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
final var ns1Messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
ns1Msgs.forEach(
message -> {
if (message.getRecord() != null) {
message.getRecord().setNamespace(namespace1);
}
});
final var ns2Msgs = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
final var ns1MessagesAtNamespace1 = getRecordMessagesWithNewNamespace(ns1Messages, namespace1);
final var ns2Messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
ns2Msgs.forEach(
message -> {
if (message.getRecord() != null) {
message.getRecord().setNamespace(namespace2);
}
});
final var allMessages = new ArrayList<>(ns1Msgs);
allMessages.addAll(ns2Msgs);
final var ns2MessagesAtNamespace2 = getRecordMessagesWithNewNamespace(ns2Messages, namespace2);

final var allMessages = new ArrayList<>(ns1MessagesAtNamespace1);
allMessages.addAll(ns2MessagesAtNamespace2);

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, allMessages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, allMessages, defaultSchema);
}

/**
 * Supplies the arguments of the namespace test: one (id, namespace, normalized) triple for every
 * entry of "namespace_test_cases.json" whose "enabled" flag is true.
 */
public static class NamespaceTestCaseProvider implements ArgumentsProvider {

  @Override
  public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
    final String testCaseResource = MoreResources.readResource("namespace_test_cases.json");
    final JsonNode testCaseArray = Jsons.deserialize(testCaseResource);
    final List<JsonNode> allTestCases = MoreIterators.toList(testCaseArray.elements());
    return allTestCases.stream()
        .filter(NamespaceTestCaseProvider::isEnabled)
        .map(NamespaceTestCaseProvider::toArguments);
  }

  /** Tell whether the test case entry has its "enabled" flag set. */
  private static boolean isEnabled(final JsonNode testCase) {
    return testCase.get("enabled").asBoolean();
  }

  /** Turn a test case entry into the (id, namespace, normalized) argument triple. */
  private static Arguments toArguments(final JsonNode testCase) {
    final String id = testCase.get("id").asText();
    final String namespace = testCase.get("namespace").asText();
    final String normalized = testCase.get("normalized").asText();
    return Arguments.of(id, namespace, normalized);
  }

}

/**
 * Run one namespace test case from "namespace_test_cases.json".
 *
 * If the destination provides a name transformer, first check that it normalizes the namespace
 * as expected. Then, only when the destination implements namespaces and opts in through
 * supportNamespaceTest, sync the namespace test data into that namespace and verify the state
 * output.
 *
 * @param testCaseId id of the test case, used in failure messages
 * @param namespace the raw namespace to sync into
 * @param normalizedNamespace the namespace expected after normalization
 * @throws IOException if the sync into the namespace fails; the original failure is the cause
 */
@ParameterizedTest
@ArgumentsSource(NamespaceTestCaseProvider.class)
public void testNamespaces(final String testCaseId, final String namespace, final String normalizedNamespace) throws Exception {
  final Optional<NamingConventionTransformer> nameTransformer = getNameTransformer();
  nameTransformer.ifPresent(namingConventionTransformer -> assertNamespaceNormalization(testCaseId, normalizedNamespace,
      namingConventionTransformer.getNamespace(namespace)));

  // The sync part only applies to destinations that support namespaces and opted in.
  if (!implementsNamespaces() || !supportNamespaceTest()) {
    return;
  }

  final AirbyteCatalog catalog = Jsons.deserialize(
      MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.catalogFile), AirbyteCatalog.class);
  catalog.getStreams().forEach(stream -> stream.setNamespace(namespace));
  final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);

  final List<AirbyteMessage> messages = MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.messageFile).lines()
      .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
  final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(messages, namespace);

  final JsonNode config = getConfig();
  try {
    runSyncAndVerifyStateOutput(config, messagesWithNewNamespace, configuredCatalog, false);
  } catch (final Exception e) {
    // Wrap the failure so the report names the test case and the namespace being synced.
    throw new IOException(String.format(
        "[Test Case %s] Destination failed to sync data to namespace %s, see \"namespace_test_cases.json\" for details",
        testCaseId, namespace), e);
  }
}

/**
* In order to launch a source on Kubernetes in a pod, we need to be able to wrap the entrypoint.
* The source connector must specify its entrypoint in the AIRBYTE_ENTRYPOINT variable. This test
Expand All @@ -943,6 +982,32 @@ public void testEntrypointEnvVar() throws Exception {
assertFalse(entrypoint.isBlank());
}

/**
 * Whether the destination should be tested against different namespaces.
 *
 * @return false by default; the sync part of testNamespaces only runs when this returns true
 *         and implementsNamespaces() is also true
 */
protected boolean supportNamespaceTest() {
return false;
}

/**
 * Set up the name transformer used by a destination to test it against a variety of namespaces.
 *
 * @return empty by default; testNamespaces only checks namespace normalization when a
 *         transformer is present
 */
protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.empty();
}

/**
 * Check that the namespace produced by the name transformer equals the expected one.
 *
 * Override this method if the normalized namespace is different from the default one. E.g. BigQuery
 * does allow a name starting with a number. So it should change the expected normalized namespace
 * when testCaseId = "S3A-1". Find the testCaseId in "namespace_test_cases.json".
 *
 * @param testCaseId id of the test case, reported when the assertion fails
 * @param expectedNormalizedNamespace the "normalized" value of the test case
 * @param actualNormalizedNamespace the namespace returned by the name transformer
 */
protected void assertNamespaceNormalization(final String testCaseId,
                                            final String expectedNormalizedNamespace,
                                            final String actualNormalizedNamespace) {
  final String failureMessage =
      String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
  assertEquals(expectedNormalizedNamespace, actualNormalizedNamespace, failureMessage);
}

private ConnectorSpecification runSpec() throws WorkerException {
return new DefaultGetSpecWorker(
workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null))
Expand Down Expand Up @@ -1379,4 +1444,16 @@ private void runAndCheckWithoutNormalization(final List<AirbyteMessage> messages
retrieveRawRecordsAndAssertSameMessages(catalog, messages, getDefaultSchema(config));
}

/**
 * Set the given namespace on every record message of the input list.
 *
 * The messages are modified in place and the same list instance is returned; messages without a
 * record are left untouched.
 *
 * @param airbyteMessages the messages to update
 * @param namespace the namespace to set on each record
 * @return the input list, after the update
 */
private static List<AirbyteMessage> getRecordMessagesWithNewNamespace(final List<AirbyteMessage> airbyteMessages, final String namespace) {
  for (final AirbyteMessage airbyteMessage : airbyteMessages) {
    if (airbyteMessage.getRecord() == null) {
      continue;
    }
    airbyteMessage.getRecord().setNamespace(namespace);
  }
  return airbyteMessages;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"streams": [
{
"name": "data_stream",
"json_schema": {
"properties": {
"field1": {
"type": "boolean"
}
}
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"type": "RECORD", "record": {"stream": "data_stream", "emitted_at": 1602637589000, "data": { "field1" : true }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-08-17"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
[
{
"id": "S1-1",
"description": "namespaces are converted to lowercase",
"namespace": "NAMESPACE",
"enabled": false,
"normalized": "namespace",
"comment": "this test case is disabled because it is not critical and we are not ready to change the behavior of existing destinations yet"
},
{
"id": "S2-1",
"description": "namespace allows letters, numbers, and underscores",
"namespace": "dest_1001_namespace",
"enabled": true,
"normalized": "dest_1001_namespace"
},
{
"id": "S2A-1",
"description": "namespace romanization",
"namespace": "namespace_with_spécial_character",
"enabled": true,
"normalized": "namespace_with_special_character"
},
{
"id": "S2A-2",
"description": "namespace romanization (japanese)",
"namespace": "namespace_こんにちは",
"enabled": false,
"normalized": "namespace_konnichiwa"
},
{
"id": "S3A-1",
"description": "namespace starting with a number",
"namespace": "99namespace",
"enabled": true,
"normalized": "_99namespace"
},
{
"id": "S3B-1",
"description": "long namespace (300 characters)",
"namespace": "a_300_characters_looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo_namespace",
"enabled": false,
"normalized": "",
"comment": "this test case is disabled because it is for future testing only"
},
{
"id": "S3C-1",
"description": "reserved word",
"namespace": "select",
"enabled": false,
"normalized": "",
"comment": "this test case is disabled because it is for future testing only"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.version=0.2.11
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Loading

0 comments on commit 21ec23c

Please sign in to comment.