diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java
index 1069f6f4c2b2..d68a7cd568cc 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/NamingConventionTransformer.java
@@ -21,6 +21,12 @@ public interface NamingConventionTransformer {
    */
   String getIdentifier(String name);
 
+  /**
+   * Handle naming conversions of an input name to output a valid namespace for the desired
+   * destination.
+   */
+  String getNamespace(String namespace);
+
   /**
    * Same as getIdentifier but returns also the name of the table for storing raw data
    *
diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java
index 9175ee5e11b2..3b292dbb9c19 100644
--- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java
+++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/StandardNameTransformer.java
@@ -22,6 +22,14 @@ public String getIdentifier(final String name) {
     return convertStreamName(name);
   }
 
+  /**
+   * Most destinations have the same naming requirement for namespace and stream names.
+   */
+  @Override
+  public String getNamespace(final String namespace) {
+    return convertStreamName(namespace);
+  }
+
   @Override
   public String getRawTableName(final String streamName) {
     return convertStreamName("_airbyte_raw_" + streamName);
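Aside, for context (not part of the change set): with the default StandardNameTransformer, the new getNamespace hook simply reuses the stream-name rules. A minimal sketch, assuming convertStreamName maps characters outside [A-Za-z0-9_] to underscores (that method is not shown in this diff):

    // Hypothetical usage of the API surface added above.
    final StandardNameTransformer transformer = new StandardNameTransformer();
    final String table = transformer.getIdentifier("exchange-rate");  // e.g. "exchange_rate"
    final String schema = transformer.getNamespace("exchange-rate");  // same result: both delegate to convertStreamName
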
diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java
index 505771088751..76d33b713f64 100644
--- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java
+++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DataArgumentsProvider.java
@@ -20,6 +20,8 @@ public class DataArgumentsProvider implements ArgumentsProvider {
       new CatalogMessageTestConfigPair("exchange_rate_catalog.json", "exchange_rate_messages.txt");
   public static final CatalogMessageTestConfigPair EDGE_CASE_CONFIG =
       new CatalogMessageTestConfigPair("edge_case_catalog.json", "edge_case_messages.txt");
+  public static final CatalogMessageTestConfigPair NAMESPACE_CONFIG =
+      new CatalogMessageTestConfigPair("namespace_catalog.json", "namespace_messages.txt");
 
   @Override
   public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java
index 7ac25334ab3c..de20f4360f4c 100644
--- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java
+++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java
@@ -19,6 +19,7 @@
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.commons.resources.MoreResources;
+import io.airbyte.commons.util.MoreIterators;
 import io.airbyte.commons.util.MoreLists;
 import io.airbyte.config.EnvConfigs;
 import io.airbyte.config.JobGetSpecConfig;
@@ -27,6 +28,7 @@
 import io.airbyte.config.StandardCheckConnectionOutput;
 import io.airbyte.config.StandardCheckConnectionOutput.Status;
 import io.airbyte.config.WorkerDestinationConfig;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -65,16 +67,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.joda.time.DateTime;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -860,17 +867,12 @@ void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception {
     final List<AirbyteMessage> messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
         .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
-    messages.forEach(
-        message -> {
-          if (message.getRecord() != null) {
-            message.getRecord().setNamespace(namespace);
-          }
-        });
+    final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(messages, namespace);
 
     final JsonNode config = getConfig();
     final String defaultSchema = getDefaultSchema(config);
-    runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
-    retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
+    runSyncAndVerifyStateOutput(config, messagesWithNewNamespace, configuredCatalog, false);
+    retrieveRawRecordsAndAssertSameMessages(catalog, messagesWithNewNamespace, defaultSchema);
   }
 
   /**
@@ -900,24 +902,15 @@ void testSyncWriteSameTableNameDifferentNamespace() throws Exception {
     final var configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
 
-    final var ns1Msgs = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
+    final var ns1Messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
         .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
-    ns1Msgs.forEach(
-        message -> {
-          if (message.getRecord() != null) {
-            message.getRecord().setNamespace(namespace1);
-          }
-        });
-    final var ns2Msgs = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
+    final var ns1MessagesAtNamespace1 = getRecordMessagesWithNewNamespace(ns1Messages, namespace1);
+    final var ns2Messages = MoreResources.readResource(DataArgumentsProvider.EXCHANGE_RATE_CONFIG.messageFile).lines()
         .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
-    ns2Msgs.forEach(
-        message -> {
-          if (message.getRecord() != null) {
-            message.getRecord().setNamespace(namespace2);
-          }
-        });
-    final var allMessages = new ArrayList<>(ns1Msgs);
-    allMessages.addAll(ns2Msgs);
+    final var ns2MessagesAtNamespace2 = getRecordMessagesWithNewNamespace(ns2Messages, namespace2);
+
+    final var allMessages = new ArrayList<>(ns1MessagesAtNamespace1);
+    allMessages.addAll(ns2MessagesAtNamespace2);
 
     final JsonNode config = getConfig();
     final String defaultSchema = getDefaultSchema(config);
@@ -925,6 +918,52 @@
     retrieveRawRecordsAndAssertSameMessages(catalog, allMessages, defaultSchema);
   }
 
+  public static class NamespaceTestCaseProvider implements ArgumentsProvider {
+
+    @Override
+    public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
+      final JsonNode testCases =
+          Jsons.deserialize(MoreResources.readResource("namespace_test_cases.json"));
+      return MoreIterators.toList(testCases.elements()).stream()
+          .filter(testCase -> testCase.get("enabled").asBoolean())
+          .map(testCase -> Arguments.of(
+              testCase.get("id").asText(),
+              testCase.get("namespace").asText(),
+              testCase.get("normalized").asText()));
+    }
+
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(NamespaceTestCaseProvider.class)
+  public void testNamespaces(final String testCaseId, final String namespace, final String normalizedNamespace) throws Exception {
+    final Optional<NamingConventionTransformer> nameTransformer = getNameTransformer();
+    nameTransformer.ifPresent(namingConventionTransformer -> assertNamespaceNormalization(testCaseId, normalizedNamespace,
+        namingConventionTransformer.getNamespace(namespace)));
+
+    if (!implementsNamespaces() || !supportNamespaceTest()) {
+      return;
+    }
+
+    final AirbyteCatalog catalog = Jsons.deserialize(
+        MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.catalogFile), AirbyteCatalog.class);
+    catalog.getStreams().forEach(stream -> stream.setNamespace(namespace));
+    final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
+
+    final List<AirbyteMessage> messages = MoreResources.readResource(DataArgumentsProvider.NAMESPACE_CONFIG.messageFile).lines()
+        .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
+    final List<AirbyteMessage> messagesWithNewNamespace = getRecordMessagesWithNewNamespace(messages, namespace);
+
+    final JsonNode config = getConfig();
+    try {
+      runSyncAndVerifyStateOutput(config, messagesWithNewNamespace, configuredCatalog, false);
+    } catch (final Exception e) {
+      throw new IOException(String.format(
+          "[Test Case %s] Destination failed to sync data to namespace %s; see \"namespace_test_cases.json\" for details",
+          testCaseId, namespace), e);
+    }
+  }
+
   /**
    * In order to launch a source on Kubernetes in a pod, we need to be able to wrap the entrypoint.
    * The source connector must specify its entrypoint in the AIRBYTE_ENTRYPOINT variable. This test
@@ -943,6 +982,32 @@ public void testEntrypointEnvVar() throws Exception {
     assertFalse(entrypoint.isBlank());
   }
 
+  /**
+   * Whether the destination should be tested against different namespaces.
+   */
+  protected boolean supportNamespaceTest() {
+    return false;
+  }
+
+  /**
+   * Provide the name transformer used by the destination so it can be tested against a variety of
+   * namespaces.
+   */
+  protected Optional<NamingConventionTransformer> getNameTransformer() {
+    return Optional.empty();
+  }
+
+  /**
+   * Override this method if the normalized namespace is different from the default one. E.g. BigQuery
+   * allows a namespace to start with a number, so the expected normalized namespace changes
+   * when testCaseId = "S3A-1". Find the testCaseId in "namespace_test_cases.json".
+   */
+  protected void assertNamespaceNormalization(final String testCaseId,
+                                              final String expectedNormalizedNamespace,
+                                              final String actualNormalizedNamespace) {
+    assertEquals(expectedNormalizedNamespace, actualNormalizedNamespace,
+        String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId));
+  }
+
   private ConnectorSpecification runSpec() throws WorkerException {
     return new DefaultGetSpecWorker(
         workerConfigs, new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null))
@@ -1379,4 +1444,16 @@ private void runAndCheckWithoutNormalization(final List<AirbyteMessage> messages
     retrieveRawRecordsAndAssertSameMessages(catalog, messages, getDefaultSchema(config));
   }
 
+  /**
+   * Mutate the namespace of the input Airbyte record messages and return the same list.
+   */
+  private static List<AirbyteMessage> getRecordMessagesWithNewNamespace(final List<AirbyteMessage> airbyteMessages, final String namespace) {
+    airbyteMessages.forEach(message -> {
+      if (message.getRecord() != null) {
+        message.getRecord().setNamespace(namespace);
+      }
+    });
+    return airbyteMessages;
+  }
+
 }
diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_catalog.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_catalog.json
new file mode 100644
index 000000000000..a361581fcb1a
--- /dev/null
+++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_catalog.json
@@ -0,0 +1,14 @@
+{
+  "streams": [
+    {
+      "name": "data_stream",
+      "json_schema": {
+        "properties": {
+          "field1": {
+            "type": "boolean"
+          }
+        }
+      }
+    }
+  ]
+}
diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_messages.txt b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_messages.txt
new file mode 100644
index 000000000000..e40a257741e4
--- /dev/null
+++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_messages.txt
@@ -0,0 +1,2 @@
+{"type": "RECORD", "record": {"stream": "data_stream", "emitted_at": 1602637589000, "data": { "field1" : true }}}
+{"type": "STATE", "state": { "data": {"start_date": "2022-08-17"}}}
diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_test_cases.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_test_cases.json
new file mode 100644
index 000000000000..f9ad2f047859
--- /dev/null
+++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/namespace_test_cases.json
@@ -0,0 +1,54 @@
+[
+  {
+    "id": "S1-1",
+    "description": "namespaces are converted to lowercase",
+    "namespace": "NAMESPACE",
+    "enabled": false,
+    "normalized": "namespace",
+    "comment": "this test case is disabled because it is not critical and we are not ready to change the behavior of existing destinations yet"
+  },
+  {
+    "id": "S2-1",
+    "description": "namespaces allow letters, numbers, and underscores",
+    "namespace": "dest_1001_namespace",
+    "enabled": true,
+    "normalized": "dest_1001_namespace"
+  },
+  {
+    "id": "S2A-1",
+    "description": "namespace romanization",
+    "namespace": "namespace_with_spécial_character",
+    "enabled": true,
+    "normalized": "namespace_with_special_character"
+  },
+  {
+    "id": "S2A-2",
+    "description": "namespace romanization (japanese)",
+    "namespace": "namespace_こんにちは",
+    "enabled": false,
+    "normalized": "namespace_konnichiwa"
+  },
+  {
+    "id": "S3A-1",
+    "description": "namespace starting with a number",
+    "namespace": "99namespace",
+    "enabled": true,
+    "normalized": "_99namespace"
+  },
+  {
+    "id": "S3B-1",
+    "description": "long namespace (300 characters)",
+    "namespace": "a_300_characters_looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo_namespace",
+    "enabled": false,
+    "normalized": "",
+    "comment": "this test case is disabled because it is for future testing only"
+  },
+  {
+    "id": "S3C-1",
+    "description": "reserved word",
+    "namespace": "select",
+    "enabled": false,
+    "normalized": "",
+    "comment": "this test case is disabled because it is for future testing only"
+  }
+]
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
index 2db1049130a2..57afd61f02f9 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.2.10
+LABEL io.airbyte.version=0.2.11
 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java
index 9b25b368edff..7c4c9aaa144b 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java
@@ -4,6 +4,8 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auth.oauth2.ServiceAccountCredentials;
@@ -26,6 +28,7 @@
 import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
@@ -35,11 +38,13 @@
 import io.airbyte.protocol.models.CatalogHelpers;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -52,11 +57,12 @@ public class BigQueryDenormalizedDestinationAcceptanceTest extends DestinationAcceptanceTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationAcceptanceTest.class);
+  private static final BigQuerySQLNameTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
 
-  private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
+  protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
 
   private static final String CONFIG_DATASET_ID = "dataset_id";
-  private static final String CONFIG_PROJECT_ID = "project_id";
+  protected static final String CONFIG_PROJECT_ID = "project_id";
   private static final String CONFIG_DATASET_LOCATION = "dataset_location";
   private static final String CONFIG_CREDS = "credentials_json";
   private static final List<String> AIRBYTE_COLUMNS = List.of(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
@@ -78,7 +84,7 @@ protected JsonNode getConfig() {
   }
 
   @Override
-  protected JsonNode getFailCheckConfig() throws Exception {
+  protected JsonNode getFailCheckConfig() {
     ((ObjectNode) config).put(CONFIG_PROJECT_ID, "fake");
     return config;
   }
@@ -93,6 +99,30 @@ protected boolean implementsNamespaces() {
     return true;
   }
 
+  @Override
+  protected boolean supportNamespaceTest() {
+    return true;
+  }
+
+  @Override
+  protected Optional<NamingConventionTransformer> getNameTransformer() {
+    return Optional.of(NAME_TRANSFORMER);
+  }
+
+  @Override
+  protected void assertNamespaceNormalization(final String testCaseId,
+                                              final String expectedNormalizedNamespace,
+                                              final String actualNormalizedNamespace) {
+    final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
+    if (testCaseId.equals("S3A-1")) {
+      // bigquery allows namespace starting with a number, and prepending underscore
+      // will hide the dataset, so we don't do it as we do for other destinations
+      assertEquals("99namespace", actualNormalizedNamespace, message);
+    } else {
+      assertEquals(expectedNormalizedNamespace, actualNormalizedNamespace, message);
+    }
+  }
+
   @Override
   protected String getDefaultSchema(final JsonNode config) {
     return config.get(CONFIG_DATASET_ID).asText();
@@ -173,31 +203,33 @@ private Object getTypedFieldValue(final FieldValueList row, final Field field) {
     }
   }
 
-  @Override
-  protected void setup(final TestDestinationEnv testEnv) throws Exception {
-    if (!Files.exists(CREDENTIALS_PATH)) {
-      throw new IllegalStateException(
-          "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
-              + ". Override by setting setting path with the CREDENTIALS_PATH constant.");
-    }
-
+  protected JsonNode createConfig() throws IOException {
     final String credentialsJsonString = Files.readString(CREDENTIALS_PATH);
-
     final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
     final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
     final String datasetLocation = "US";
-
     final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
 
-    config = Jsons.jsonNode(ImmutableMap.builder()
+    return Jsons.jsonNode(ImmutableMap.builder()
         .put(CONFIG_PROJECT_ID, projectId)
         .put(CONFIG_CREDS, credentialsJson.toString())
        .put(CONFIG_DATASET_ID, datasetId)
        .put(CONFIG_DATASET_LOCATION, datasetLocation)
        .build());
+  }
+
+  @Override
+  protected void setup(final TestDestinationEnv testEnv) throws Exception {
+    if (!Files.exists(CREDENTIALS_PATH)) {
+      throw new IllegalStateException(
+          "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
+              + ". Override by setting the path with the CREDENTIALS_PATH constant.");
+    }
+
+    config = createConfig();
+    final ServiceAccountCredentials credentials = ServiceAccountCredentials
+        .fromStream(new ByteArrayInputStream(config.get(CONFIG_CREDS).asText().getBytes()));
 
-    final ServiceAccountCredentials credentials =
-        ServiceAccountCredentials.fromStream(new ByteArrayInputStream(config.get(CONFIG_CREDS).asText().getBytes()));
     bigquery = BigQueryOptions.newBuilder()
         .setProjectId(config.get(CONFIG_PROJECT_ID).asText())
         .setCredentials(credentials)
@@ -224,7 +256,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {
     tearDownBigQuery();
   }
 
-  private void tearDownBigQuery() {
+  protected void tearDownBigQuery() {
     // allows deletion of a dataset that has contents
     final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
 
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java
index d4458e157b44..d49bd7f4097d 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java
@@ -5,184 +5,19 @@
 package io.airbyte.integrations.destination.bigquery;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.FieldList;
-import com.google.cloud.bigquery.FieldValue;
-import com.google.cloud.bigquery.FieldValueList;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobId;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.TableResult;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.string.Strings;
-import io.airbyte.integrations.base.JavaBaseConstants;
-import io.airbyte.integrations.destination.StandardNameTransformer;
-import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
-import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
-import io.airbyte.protocol.models.AirbyteCatalog;
-import io.airbyte.protocol.models.AirbyteMessage;
-import io.airbyte.protocol.models.AirbyteRecordMessage;
-import io.airbyte.protocol.models.CatalogHelpers;
-import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class BigQueryDenormalizedGcsDestinationAcceptanceTest extends DestinationAcceptanceTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGcsDestinationAcceptanceTest.class);
-
-  private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
-
-  private static final String CONFIG_DATASET_ID = "dataset_id";
-  private static final String CONFIG_PROJECT_ID = "project_id";
-  private static final String CONFIG_DATASET_LOCATION = "dataset_location";
-  private static final List<String> AIRBYTE_COLUMNS = List.of(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
-
-  private BigQuery bigquery;
-  private Dataset dataset;
-  private boolean tornDown;
-  private JsonNode config;
-  private final StandardNameTransformer namingResolver = new StandardNameTransformer();
-
-  @Override
-  protected String getImageName() {
-    return "airbyte/destination-bigquery-denormalized:dev";
-  }
-
-  @Override
-  protected JsonNode getConfig() {
-    return config;
-  }
-
-  @Override
-  protected JsonNode getFailCheckConfig() throws Exception {
-    ((ObjectNode) config).put(CONFIG_PROJECT_ID, "fake");
-    return config;
-  }
-
-  @Override
-  protected boolean supportsDBT() {
-    return true;
-  }
-
-  @Override
-  protected boolean implementsNamespaces() {
-    return true;
-  }
-
-  @Override
-  protected String getDefaultSchema(final JsonNode config) {
-    return config.get(CONFIG_DATASET_ID).asText();
-  }
-
-  @Override
-  protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace)
-      throws Exception {
-    final String tableName = namingResolver.getIdentifier(streamName);
-    final String schema = namingResolver.getIdentifier(namespace);
-    return retrieveRecordsFromTable(tableName, schema);
-  }
-
-  @Override
-  protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
-                                           final String streamName,
-                                           final String namespace,
-                                           final JsonNode streamSchema)
-      throws Exception {
-    return new ArrayList<>(retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namingResolver.getIdentifier(namespace)));
-  }
-
-  @Override
-  protected List<String> resolveIdentifier(final String identifier) {
-    final List<String> result = new ArrayList<>();
-    result.add(identifier);
-    result.add(namingResolver.getIdentifier(identifier));
-    return result;
-  }
-
-  private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
-    final QueryJobConfiguration queryConfig =
-        QueryJobConfiguration
-            .newBuilder(
-                String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
-                    JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
-            .setUseLegacySql(false).build();
-
-    final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
-    final FieldList fields = queryResults.getSchema().getFields();
-
-    return StreamSupport
-        .stream(queryResults.iterateAll().spliterator(), false)
-        .map(row -> {
-          final Map<String, Object> jsonMap = Maps.newHashMap();
-          for (final Field field : fields) {
-            final Object value = getTypedFieldValue(row, field);
-            if (!isAirbyteColumn(field.getName()) && value != null) {
-              jsonMap.put(field.getName(), value);
-            }
-          }
-          return jsonMap;
-        })
-        .map(Jsons::jsonNode)
-        .collect(Collectors.toList());
-  }
-
-  private boolean isAirbyteColumn(final String name) {
-    if (AIRBYTE_COLUMNS.contains(name)) {
-      return true;
-    }
-    return name.startsWith("_airbyte") && name.endsWith("_hashid");
-  }
-
-  private Object getTypedFieldValue(final FieldValueList row, final Field field) {
-    final FieldValue fieldValue = row.get(field.getName());
-    if (fieldValue.getValue() != null) {
-      return switch (field.getType().getStandardType()) {
-        case FLOAT64, NUMERIC -> fieldValue.getDoubleValue();
-        case INT64 -> fieldValue.getNumericValue().intValue();
-        case STRING -> fieldValue.getStringValue();
-        case BOOL -> fieldValue.getBooleanValue();
-        case STRUCT -> fieldValue.getRecordValue().toString();
-        default -> fieldValue.getValue();
-      };
-    } else {
-      return null;
-    }
-  }
+public class BigQueryDenormalizedGcsDestinationAcceptanceTest extends BigQueryDenormalizedDestinationAcceptanceTest {
 
   @Override
-  protected void setup(final TestDestinationEnv testEnv) throws Exception {
-    if (!Files.exists(CREDENTIALS_PATH)) {
-      throw new IllegalStateException(
-          "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
-              + ". Override by setting setting path with the CREDENTIALS_PATH constant.");
-    }
+  protected JsonNode createConfig() throws IOException {
+    final String credentialsJsonString = Files.readString(CREDENTIALS_PATH);
 
-    final String fullConfigFromSecretFileAsString = Files.readString(CREDENTIALS_PATH);
-
-    final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(fullConfigFromSecretFileAsString);
+    final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(credentialsJsonString);
     final JsonNode bigqueryConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
     final JsonNode gcsConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.GCS_CONFIG);
@@ -206,107 +41,13 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
         .put(BigQueryConsts.CREDENTIAL, credential)
         .build());
 
-    config = Jsons.jsonNode(ImmutableMap.builder()
+    return Jsons.jsonNode(ImmutableMap.builder()
        .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
        .put(BigQueryConsts.CONFIG_CREDS, bigqueryConfigFromSecretFile.toString())
        .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation)
        .put(BigQueryConsts.LOADING_METHOD, loadingMethod)
        .build());
-
-    final ServiceAccountCredentials credentials = ServiceAccountCredentials
-        .fromStream(new ByteArrayInputStream(bigqueryConfigFromSecretFile.toString().getBytes()));
-
-    bigquery = BigQueryOptions.newBuilder()
-        .setProjectId(config.get(CONFIG_PROJECT_ID).asText())
-        .setCredentials(credentials)
-        .build()
-        .getService();
-
-    final DatasetInfo datasetInfo =
-        DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
-    dataset = bigquery.create(datasetInfo);
-
-    tornDown = false;
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  if (!tornDown) {
-                    tearDownBigQuery();
-                  }
-                }));
-  }
-
-  @Override
-  protected void tearDown(final TestDestinationEnv testEnv) {
-    // gcs tmp files are supposed to be removed automatically by consumer
-    tearDownBigQuery();
-  }
-
-  private void tearDownBigQuery() {
-    // allows deletion of a dataset that has contents
-    final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
-
-    final boolean success = bigquery.delete(dataset.getDatasetId(), option);
-    if (success) {
-      LOGGER.info("BQ Dataset " + dataset + " deleted...");
-    } else {
-      LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
-    }
-
-    tornDown = true;
-  }
-
-  // todo (cgardens) - figure out how to share these helpers. they are currently copied from
-  // BigQueryDestination.
-  private static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
-    final JobId jobId = JobId.of(UUID.randomUUID().toString());
-    final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
-    return executeQuery(queryJob);
-  }
-
-  private static ImmutablePair<Job, String> executeQuery(final Job queryJob) {
-    final Job completedJob = waitForQuery(queryJob);
-    if (completedJob == null) {
-      throw new RuntimeException("Job no longer exists");
-    } else if (completedJob.getStatus().getError() != null) {
-      // You can also look at queryJob.getStatus().getExecutionErrors() for all
-      // errors, not just the latest one.
-      return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
-    }
-
-    return ImmutablePair.of(completedJob, null);
-  }
-
-  private static Job waitForQuery(final Job queryJob) {
-    try {
-      return queryJob.waitFor();
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Verify that the integration successfully writes normalized records successfully (without actually
-   * running the normalization module) Tests a wide variety of messages an schemas (aspirationally,
-   * anyway).
-   */
-  @ParameterizedTest
-  @ArgumentsSource(DataArgumentsProvider.class)
-  public void testSyncNormalizedWithoutNormalization(final String messagesFilename, final String catalogFilename) throws Exception {
-    final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
-    final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
-    final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
-        .map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
-
-    final JsonNode config = getConfig();
-    // don't run normalization though
-    runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
-
-    final String defaultSchema = getDefaultSchema(config);
-    final List<JsonNode> actualMessages = retrieveNormalizedRecords(catalog, defaultSchema);
-    assertSameMessages(messages, actualMessages, true);
-  }
 
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
index 57925e21f75d..5642239cba1c 100644
--- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.6.11
+LABEL io.airbyte.version=0.6.12
 LABEL io.airbyte.name=airbyte/destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java
index 12510288c81e..e3dfd07aa5b8 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryConsts.java
@@ -25,6 +25,8 @@ public class BigQueryConsts {
   public static final String KEEP_GCS_FILES_VAL = "Keep all tmp files in GCS";
   public static final String PART_SIZE = "part_size_mb";
 
+  public static final String NAMESPACE_PREFIX = "n";
+
   // tests
   public static final String BIGQUERY_BASIC_CONFIG = "basic_bigquery_config";
   public static final String GCS_CONFIG = "gcs_config";
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java
index 4e8ea1b158ae..10d3ec442274 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java
@@ -10,12 +10,35 @@ public class BigQuerySQLNameTransformer extends StandardNameTransformer {
 
   @Override
   public String convertStreamName(final String input) {
-    String result = super.convertStreamName(input);
+    if (input == null) {
+      return null;
+    }
+
+    final String result = super.convertStreamName(input);
     if (!result.substring(0, 1).matches("[A-Za-z_]")) {
       // has to start with a letter or _
-      result = "_" + result;
+      return "_" + result;
     }
     return result;
   }
 
+  /**
+   * BigQuery allows a number to be the first character of a namespace. Datasets that begin with an
+   * underscore are hidden datasets, and we cannot query their .INFORMATION_SCHEMA.
+   * So we prepend a letter instead of an underscore during normalization.
+   * Reference: https://cloud.google.com/bigquery/docs/datasets#dataset-naming
+   */
+  @Override
+  public String getNamespace(final String input) {
+    if (input == null) {
+      return null;
+    }
+
+    final String normalizedName = super.convertStreamName(input);
+    if (!normalizedName.substring(0, 1).matches("[A-Za-z0-9]")) {
+      return BigQueryConsts.NAMESPACE_PREFIX + normalizedName;
+    }
+    return normalizedName;
+  }
+
 }
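For context, a hedged sketch of how the two BigQuery-specific rules above differ (assumes super.convertStreamName maps unsupported characters to underscores; the values are illustrative, not taken from the test suite):

    final BigQuerySQLNameTransformer transformer = new BigQuerySQLNameTransformer();
    transformer.convertStreamName("99stream");  // "_99stream" -- stream/table names must start with a letter or _
    transformer.getNamespace("99namespace");    // "99namespace" -- datasets may start with a digit, so no prefix
    transformer.getNamespace("#namespace");     // "n_namespace" -- "n" prefix instead of "_" avoids a hidden dataset
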
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
index 2c7a3dddd583..0f657d9282e2 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
@@ -46,6 +46,7 @@ public class BigQueryUtils {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class);
   private static final String BIG_QUERY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+  private static final BigQuerySQLNameTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
 
   public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
     final JobId jobId = JobId.of(UUID.randomUUID().toString());
@@ -162,6 +163,9 @@ public static JsonNode getGcsAvroJsonNodeConfig(final JsonNode config) {
     return gcsJsonNode;
   }
 
+  /**
+   * @return a default schema name based on the config.
+   */
   public static String getDatasetId(final JsonNode config) {
     String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
 
@@ -233,12 +237,9 @@ public static void transformJsonDateTimeToBigDataFormat(List<String> dateTimeFields) {
   }
 
   public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
-    final String defaultSchema = getDatasetId(config);
     final String srcNamespace = stream.getStream().getNamespace();
-    if (srcNamespace == null) {
-      return defaultSchema;
-    }
-    return srcNamespace;
+    final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace;
+    return NAME_TRANSFORMER.getNamespace(schemaName);
   }
 
   public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSyncMode syncMode) {
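The getSchema change above means the destination schema is no longer taken verbatim: the stream's namespace wins when present, the config's dataset_id is the fallback, and either way the result now passes through the name transformer. A sketch with hypothetical values (streamWithNamespace is an assumed test helper, not a real method):

    // config contains "dataset_id": "default_dataset"
    BigQueryUtils.getSchema(config, streamWithNamespace(null));            // "default_dataset" (fallback, then normalized)
    BigQueryUtils.getSchema(config, streamWithNamespace("My-Namespace"));  // e.g. "My_Namespace" (namespace wins, then normalized)
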
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
index 39d45f796f96..9083c5cdc558 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/BigQueryUploaderFactory.java
@@ -29,17 +29,14 @@
 import java.sql.Timestamp;
 import java.util.HashSet;
 import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BigQueryUploaderFactory {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUploaderFactory.class);
-
   public static AbstractBigQueryUploader<?> getUploader(final UploaderConfig uploaderConfig)
       throws IOException {
-    final String schemaName =
-        BigQueryUtils.getSchema(uploaderConfig.getConfig(), uploaderConfig.getConfigStream());
+    final String schemaName = BigQueryUtils.getSchema(
+        uploaderConfig.getConfig(),
+        uploaderConfig.getConfigStream());
     final String datasetLocation = BigQueryUtils.getDatasetLocation(uploaderConfig.getConfig());
     final Set<String> existingSchemas = new HashSet<>();
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
index e5272c974743..806064854746 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
@@ -4,6 +4,8 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auth.oauth2.ServiceAccountCredentials;
@@ -25,6 +27,7 @@
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
 import java.io.ByteArrayInputStream;
@@ -35,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -44,6 +48,7 @@
 
 public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
 
+  private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationAcceptanceTest.class);
 
   protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
@@ -70,7 +75,7 @@ protected JsonNode getConfig() {
   }
 
   @Override
-  protected JsonNode getFailCheckConfig() throws Exception {
+  protected JsonNode getFailCheckConfig() {
     ((ObjectNode) config).put(CONFIG_PROJECT_ID, "fake");
     return config;
   }
@@ -90,6 +95,30 @@ protected boolean implementsNamespaces() {
     return true;
   }
 
+  @Override
+  protected boolean supportNamespaceTest() {
+    return true;
+  }
+
+  @Override
+  protected Optional<NamingConventionTransformer> getNameTransformer() {
+    return Optional.of(NAME_TRANSFORMER);
+  }
+
+  @Override
+  protected void assertNamespaceNormalization(final String testCaseId,
+                                              final String expectedNormalizedNamespace,
+                                              final String actualNormalizedNamespace) {
+    final String message = String.format("Test case %s failed; if this is expected, please override assertNamespaceNormalization", testCaseId);
+    if (testCaseId.equals("S3A-1")) {
+      // bigquery allows namespace starting with a number, and prepending underscore
+      // will hide the dataset, so we don't do it as we do for other destinations
+      assertEquals("99namespace", actualNormalizedNamespace, message);
+    } else {
+      assertEquals(expectedNormalizedNamespace, actualNormalizedNamespace, message);
+    }
+  }
+
   @Override
   protected String getDefaultSchema(final JsonNode config) {
     return config.get(CONFIG_DATASET_ID).asText();
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
index fb1089ead4e9..aafe47a63d85 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
@@ -34,7 +34,6 @@
 import io.airbyte.integrations.base.Destination;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.NamingConventionTransformer;
-import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.AirbyteMessage;
@@ -73,42 +72,56 @@ class BigQueryDestinationTest {
 
-  private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
+  protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
+  private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";
 
-  private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
+  protected static final String DATASET_LOCATION = "EU";
+  protected static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
   private static final Instant NOW = Instant.now();
-  private static final String USERS_STREAM_NAME = "users";
-  private static final String TASKS_STREAM_NAME = "tasks";
-  private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
+  protected static final String USERS_STREAM_NAME = "users";
+  protected static final String TASKS_STREAM_NAME = "tasks";
+  protected static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
       .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
           .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))
          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
+  protected static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
      .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
          .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build()))
          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
+  protected static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
      .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
          .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "announce the game.").build()))
          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
+  protected static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
      .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
          .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some code.").build()))
          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
+  protected static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
      .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));
 
-  private static final NamingConventionTransformer NAMING_RESOLVER = new StandardNameTransformer();
+  private static final NamingConventionTransformer NAMING_RESOLVER = new BigQuerySQLNameTransformer();
 
-  private JsonNode config;
+  protected JsonNode config;
+  protected BigQuery bigquery;
+  protected Dataset dataset;
+  protected ConfiguredAirbyteCatalog catalog;
+  protected boolean tornDown = true;
 
-  private BigQuery bigquery;
-  private Dataset dataset;
-  private ConfiguredAirbyteCatalog catalog;
-
-  private boolean tornDown = true;
+  private static Stream<Arguments> datasetIdResetterProvider() {
+    // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
+    return Stream.of(
+        Arguments.arguments(new DatasetIdResetter(config -> {
+        })),
+        Arguments.arguments(new DatasetIdResetter(
+            config -> {
+              final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
+              final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText();
+              ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID,
+                  String.format("%s:%s", projectId, datasetId));
+            })));
+  }
 
   @BeforeEach
   void setup(final TestInfo info) throws IOException {
@@ -133,8 +146,7 @@ void setup(final TestInfo info) throws IOException {
         .build()
         .getService();
 
-    final String datasetId = Strings.addRandomSuffix("111airbyte_tests", "_", 8);
-    final String datasetLocation = "EU";
+    final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
     MESSAGE_USERS1.getRecord().setNamespace(datasetId);
     MESSAGE_USERS2.getRecord().setNamespace(datasetId);
     MESSAGE_TASKS1.getRecord().setNamespace(datasetId);
@@ -142,33 +154,33 @@
 
     catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
         CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
-            io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
-            io.airbyte.protocol.models.Field
-                .of("id", JsonSchemaType.STRING))
+        io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
+        io.airbyte.protocol.models.Field
+            .of("id", JsonSchemaType.STRING))
             .withDestinationSyncMode(DestinationSyncMode.APPEND),
         CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));
 
-    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
+    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(DATASET_LOCATION).build();
     dataset = bigquery.create(datasetInfo);
 
     config = Jsons.jsonNode(ImmutableMap.builder()
         .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
        .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString())
        .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
-        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation)
+        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, DATASET_LOCATION)
        .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
        .build());
 
     tornDown = false;
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  if (!tornDown) {
-                    tearDownBigQuery();
-                  }
-                }));
+    addShutdownHook();
+  }
 
+  protected void addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      if (!tornDown) {
+        tearDownBigQuery();
+      }
+    }));
   }
 
   @AfterEach
@@ -180,7 +192,7 @@ void tearDown(final TestInfo info) {
     tearDownBigQuery();
   }
 
-  private void tearDownBigQuery() {
+  protected void tearDownBigQuery() {
     // allows deletion of a dataset that has contents
     final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
 
@@ -385,7 +397,7 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset dataset,
     return false;
   }
 
-  private static class DatasetIdResetter {
+  protected static class DatasetIdResetter {
 
     private final Consumer<JsonNode> consumer;
 
@@ -399,17 +411,4 @@ public void accept(final JsonNode config) {
     }
 
   }
 
-  private static Stream<Arguments> datasetIdResetterProvider() {
-    // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id`
-    return Stream.of(
-        Arguments.arguments(new DatasetIdResetter(config -> {})),
-        Arguments.arguments(new DatasetIdResetter(
-            config -> {
-              final String projectId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
-              final String datasetId = ((ObjectNode) config).get(BigQueryConsts.CONFIG_DATASET_ID).asText();
-              ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID,
-                  String.format("%s:%s", projectId, datasetId));
-            })));
-  }
-
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java
index 766b0f1f8be8..1394968be537 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java
@@ -4,105 +4,44 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
-import static java.util.stream.Collectors.toList;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.string.Strings;
-import io.airbyte.integrations.base.AirbyteMessageConsumer;
-import io.airbyte.integrations.base.Destination;
-import io.airbyte.integrations.base.JavaBaseConstants;
-import io.airbyte.integrations.destination.NamingConventionTransformer;
-import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
 import io.airbyte.integrations.destination.gcs.GcsS3Helper;
-import io.airbyte.protocol.models.AirbyteConnectionStatus;
-import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
-import io.airbyte.protocol.models.AirbyteMessage;
-import io.airbyte.protocol.models.AirbyteRecordMessage;
-import io.airbyte.protocol.models.AirbyteStateMessage;
-import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.CatalogHelpers;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.ConfiguredAirbyteStream;
-import io.airbyte.protocol.models.ConnectorSpecification;
 import io.airbyte.protocol.models.Field;
 import io.airbyte.protocol.models.JsonSchemaType;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Instant;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class BigQueryGcsDestinationTest {
+class BigQueryGcsDestinationTest extends BigQueryDestinationTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsDestinationTest.class);
 
-  private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
-
-  private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
-  private static final Instant NOW = Instant.now();
-  private static final String USERS_STREAM_NAME = "users";
-  private static final String TASKS_STREAM_NAME = "tasks";
-  private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
-      .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
-          .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))
-          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
-      .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
-          .withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "susan").put("id", "30").build()))
-          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_TASKS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
-      .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
-          .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "announce the game.").build()))
-          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_TASKS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
-      .withRecord(new AirbyteRecordMessage().withStream(TASKS_STREAM_NAME)
-          .withData(Jsons.jsonNode(ImmutableMap.builder().put("goal", "ship some code.").build()))
-          .withEmittedAt(NOW.toEpochMilli()));
-  private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
-      .withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));
-
-  private static final NamingConventionTransformer NAMING_RESOLVER = new StandardNameTransformer();
-
-  private JsonNode config;
+  private static final String DATASET_NAME_PREFIX = "bq_gcs_dest_integration_test";
 
-  private BigQuery bigquery;
   private AmazonS3 s3Client;
-  private Dataset dataset;
-  private ConfiguredAirbyteCatalog catalog;
-
-  private boolean tornDown = true;
 
+  @Override
   @BeforeEach
   void setup(final TestInfo info) throws IOException {
     if (info.getDisplayName().equals("testSpec()")) {
@@ -127,8 +66,7 @@ void setup(final TestInfo info) throws IOException {
         .build()
         .getService();
 
-    final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
-    final String datasetLocation = "EU";
+    final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
     MESSAGE_USERS1.getRecord().setNamespace(datasetId);
     MESSAGE_USERS2.getRecord().setNamespace(datasetId);
     MESSAGE_TASKS1.getRecord().setNamespace(datasetId);
@@ -141,7 +79,7 @@
             .of("id", JsonSchemaType.STRING)),
         CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));
 
-    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
+    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(DATASET_LOCATION).build();
     dataset = bigquery.create(datasetInfo);
 
     final JsonNode credentialFromSecretFile = credentialsGcsJson.get(BigQueryConsts.CREDENTIAL);
@@ -163,7 +101,7 @@
         .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
        .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString())
        .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)
-        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation)
+        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, DATASET_LOCATION)
        .put(BigQueryConsts.LOADING_METHOD, loadingMethod)
        .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
        .build());
@@ -173,18 +111,11 @@ void setup(final TestInfo info) throws IOException {
     this.s3Client = GcsS3Helper.getGcsS3Client(gcsDestinationConfig);
 
     tornDown = false;
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  if (!tornDown) {
-                    tearDownBigQuery();
-                  }
-                }));
-
+    addShutdownHook();
   }
 
   @AfterEach
+  @Override
   void tearDown(final TestInfo info) {
     if (info.getDisplayName().equals("testSpec()")) {
       return;
@@ -220,134 +151,12 @@ protected void tearDownGcs() {
     }
   }
 
-  private void tearDownBigQuery() {
-    // allows deletion of a dataset that has contents
-    final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
-
-    final boolean success = bigquery.delete(dataset.getDatasetId(), option);
-    if (success) {
-      LOGGER.info("BQ Dataset " + dataset + " deleted...");
-    } else {
-      LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
-    }
-
-    tornDown = true;
-  }
-
-  @Test
-  void testSpec() throws Exception {
-    final ConnectorSpecification actual = new BigQueryDestination().spec();
-    final String resourceString = MoreResources.readResource("spec.json");
-    final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);
-
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  void testCheckSuccess() {
-    final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
-    final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  void testCheckFailure() {
-    ((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake");
-    final AirbyteConnectionStatus actual = new BigQueryDestination().check(config);
-    final String actualMessage = actual.getMessage();
-    LOGGER.info("Checking expected failure message:" + actualMessage);
-    assertTrue(actualMessage.contains("Access Denied:"));
-    final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("");
-    assertEquals(expected, actual.withMessage(""));
-  }
-
+  @Override
+  void testWritePartitionOverUnpartitioned(final DatasetIdResetter resetDatasetId) throws Exception {
+    // This test is skipped for GCS staging mode because we load Avro data to BigQuery, but do not
+    // use the use_avro_logical_types flag to automatically convert the Avro logical timestamp
+    // type. Therefore, the emission timestamp, which should be used as the partition field, has
+    // an incorrect type.
See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types } - - @Test - void testWriteSuccess() throws Exception { - final BigQueryDestination destination = new BigQueryDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); - - consumer.accept(MESSAGE_USERS1); - consumer.accept(MESSAGE_TASKS1); - consumer.accept(MESSAGE_USERS2); - consumer.accept(MESSAGE_TASKS2); - consumer.accept(MESSAGE_STATE); - consumer.close(); - - final List usersActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(USERS_STREAM_NAME)); - final List expectedUsersJson = Lists.newArrayList(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData()); - assertEquals(expectedUsersJson.size(), usersActual.size()); - assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson)); - - final List tasksActual = retrieveRecords(NAMING_RESOLVER.getRawTableName(TASKS_STREAM_NAME)); - final List expectedTasksJson = Lists.newArrayList(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); - assertEquals(expectedTasksJson.size(), tasksActual.size()); - assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson)); - - assertTmpTablesNotPresent(catalog.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - } - - @Test - void testWriteFailure() throws Exception { - // hack to force an exception to be thrown from within the consumer. - final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); - doThrow(new RuntimeException()).when(spiedMessage).getRecord(); - - final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, catalog, Destination::defaultOutputRecordCollector)); - - assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); - consumer.accept(MESSAGE_USERS2); - - final List tableNames = catalog.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(toList()); - assertTmpTablesNotPresent(catalog.getStreams() - .stream() - .map(ConfiguredAirbyteStream::getStream) - .map(AirbyteStream::getName) - .collect(Collectors.toList())); - // assert that no tables were created. - assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); - } - - private Set fetchNamesOfTablesInDb() throws InterruptedException { - final QueryJobConfiguration queryConfig = QueryJobConfiguration - .newBuilder(String.format("SELECT * FROM %s.INFORMATION_SCHEMA.TABLES;", dataset.getDatasetId().getDataset())) - .setUseLegacySql(false) - .build(); - - return StreamSupport - .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) - .map(v -> v.get("TABLE_NAME").getStringValue()).collect(Collectors.toSet()); - } - - private void assertTmpTablesNotPresent(final List tableNames) throws InterruptedException { - final Set tmpTableNamePrefixes = tableNames.stream().map(name -> name + "_").collect(Collectors.toSet()); - final Set finalTableNames = tableNames.stream().map(name -> name + "_raw").collect(Collectors.toSet()); - // search for table names that have the tmp table prefix but are not raw tables. 
- assertTrue(fetchNamesOfTablesInDb() - .stream() - .filter(tableName -> !finalTableNames.contains(tableName)) - .noneMatch(tableName -> tmpTableNamePrefixes.stream().anyMatch(tableName::startsWith))); - } - - private List retrieveRecords(final String tableName) throws Exception { - final QueryJobConfiguration queryConfig = - QueryJobConfiguration.newBuilder(String.format("SELECT * FROM %s.%s;", dataset.getDatasetId().getDataset(), tableName.toLowerCase())) - .setUseLegacySql(false).build(); - - BigQueryUtils.executeQuery(bigquery, queryConfig); - - return StreamSupport - .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) - .map(v -> v.get(JavaBaseConstants.COLUMN_NAME_DATA).getStringValue()) - .map(Jsons::deserialize) - .collect(Collectors.toList()); - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java new file mode 100644 index 000000000000..7817b9d6d2fc --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformerTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.bigquery; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class BigQuerySQLNameTransformerTest { + + private static final BigQuerySQLNameTransformer INSTANCE = new BigQuerySQLNameTransformer(); + private static final Map RAW_TO_NORMALIZED_IDENTIFIERS = Map.of( + "name-space", "name_space", + "spécial_character", "special_character", + "99namespace", "_99namespace", + "*_namespace", "__namespace", + "_namespace", "_namespace"); + + + private static final Map RAW_TO_NORMALIZED_NAMESPACES = Map.of( + "name-space", "name_space", + "spécial_character", "special_character", + // dataset name is allowed to start with a number + "99namespace", "99namespace", + // dataset name starting with an underscore is hidden, so we prepend a letter + "*_namespace", "n__namespace", + "_namespace", "n_namespace"); + + @Test + public void testGetIdentifier() { + assertNull(INSTANCE.getIdentifier(null)); + assertNull(INSTANCE.convertStreamName(null)); + RAW_TO_NORMALIZED_IDENTIFIERS.forEach((raw, normalized) -> { + assertEquals(normalized, INSTANCE.getIdentifier(raw)); + assertEquals(normalized, INSTANCE.convertStreamName(raw)); + }); + } + + @Test + public void testGetNamespace() { + assertNull(INSTANCE.convertStreamName(null)); + RAW_TO_NORMALIZED_NAMESPACES.forEach((raw, normalized) -> { + assertEquals(normalized, INSTANCE.getNamespace(raw)); + }); + } + +} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java index 1e78e263ee42..7ef402f1e65f 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/util/GcsUtils.java @@ -22,10 +22,11 @@ public static Schema getDefaultAvroSchema(final 
String name, final boolean appendAirbyteFields) { LOGGER.info("Default schema."); final String stdName = AvroConstants.NAME_TRANSFORMER.getIdentifier(name); + final String stdNamespace = AvroConstants.NAME_TRANSFORMER.getNamespace(namespace); SchemaBuilder.RecordBuilder builder = SchemaBuilder.record(stdName); - if (namespace != null) { - builder = builder.namespace(namespace); + if (stdNamespace != null) { + builder = builder.namespace(stdNamespace); } SchemaBuilder.FieldAssembler assembler = builder.fields(); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java index 91331273a571..d319ae68229d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/staging/StagingConsumerFactory.java @@ -103,8 +103,8 @@ private static String getOutputSchema(final AirbyteStream stream, final String defaultDestSchema, final NamingConventionTransformer namingResolver) { return stream.getNamespace() != null - ? namingResolver.getIdentifier(stream.getNamespace()) - : namingResolver.getIdentifier(defaultDestSchema); + ? namingResolver.getNamespace(stream.getNamespace()) + : namingResolver.getNamespace(defaultDestSchema); } private OnStartFunction onStartFunction(final JdbcDatabase database, diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java index d356e177eaa3..689be3e6746f 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformer.java @@ -5,7 +5,16 @@ package io.airbyte.integrations.destination.s3.avro; import io.airbyte.integrations.destination.ExtendedNameTransformer; - +import java.util.Arrays; +import java.util.List; + +/** + *
+ * <ul>
+ * <li>An Avro name starts with [A-Za-z_], followed by [A-Za-z0-9_].</li>
+ * <li>An Avro namespace is a dot-separated sequence of such names.</li>
+ * <li>Reference: https://avro.apache.org/docs/current/spec.html#names</li>
+ * </ul>
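+ * Example (values taken from AvroNameTransformerTest in this patch):
+ * {@code getNamespace("99namespace.namespace2")} yields {@code "_99namespace.namespace2"},
+ * since each dot-separated token is normalized independently.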
+ */ public class AvroNameTransformer extends ExtendedNameTransformer { @Override @@ -14,20 +23,29 @@ public String applyDefaultCase(final String input) { } @Override - public String getIdentifier(final String name) { - return replaceForbiddenCharacters(checkFirsCharInStreamName(convertStreamName(name))); - } + public String convertStreamName(final String input) { + if (input == null) { + return null; + } else if (input.isBlank()) { + return input; + } - private String checkFirsCharInStreamName(final String name) { - if (name.substring(0, 1).matches("[A-Za-z_]")) { - return name; + final String normalizedName = super.convertStreamName(input); + if (normalizedName.substring(0, 1).matches("[A-Za-z_]")) { + return normalizedName; } else { - return "_" + name; + return "_" + normalizedName; } } - private String replaceForbiddenCharacters(final String name) { - return name.replace("-", "_"); + @Override + public String getNamespace(final String input) { + if (input == null) { + return null; + } + + final String[] tokens = input.split("\\."); + return String.join(".", Arrays.stream(tokens).map(this::getIdentifier).toList()); } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index 7289d15a8d05..379674ebecb2 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -122,6 +122,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema, final boolean addStringToLogicalTypes, final boolean isRootNode) { final String stdName = AvroConstants.NAME_TRANSFORMER.getIdentifier(fieldName); + final String stdNamespace = AvroConstants.NAME_TRANSFORMER.getNamespace(fieldNamespace); final SchemaBuilder.RecordBuilder builder = SchemaBuilder.record(stdName); if (!stdName.equals(fieldName)) { standardizedNames.put(fieldName, stdName); @@ -133,8 +134,8 @@ public Schema getAvroSchema(final JsonNode jsonSchema, AvroConstants.DOC_KEY_VALUE_DELIMITER, fieldName)); } - if (fieldNamespace != null) { - builder.namespace(fieldNamespace); + if (stdNamespace != null) { + builder.namespace(stdNamespace); } final JsonNode properties = jsonSchema.get("properties"); @@ -175,7 +176,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema, // Omit the namespace for root level fields, because it is directly assigned in the builder above. // This may not be the correct choice. ? null - : (fieldNamespace == null ? stdName : (fieldNamespace + "." + stdName)); + : (stdNamespace == null ? stdName : (stdNamespace + "." 
+ stdName)); fieldBuilder.type(parseJsonField(subfieldName, subfieldNamespace, subfieldDefinition, appendExtraProps, addStringToLogicalTypes)) .withDefault(null); } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformerTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformerTest.java new file mode 100644 index 000000000000..780502d769fb --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroNameTransformerTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.avro; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class AvroNameTransformerTest { + + private static final AvroNameTransformer INSTANCE = new AvroNameTransformer(); + private static final Map RAW_TO_NORMALIZED_IDENTIFIERS = Map.of( + "name-space", "name_space", + "spécial_character", "special_character", + "99namespace", "_99namespace"); + + private static final Map RAW_TO_NORMALIZED_NAMESPACES = Map.of( + "", "", + "name-space1.name-space2.namespace3", "name_space1.name_space2.namespace3", + "namespace1.spécial_character", "namespace1.special_character", + "99namespace.namespace2", "_99namespace.namespace2"); + + @Test + public void testGetIdentifier() { + assertNull(INSTANCE.getIdentifier(null)); + assertNull(INSTANCE.convertStreamName(null)); + RAW_TO_NORMALIZED_IDENTIFIERS.forEach((raw, normalized) -> { + assertEquals(normalized, INSTANCE.getIdentifier(raw)); + assertEquals(normalized, INSTANCE.convertStreamName(raw)); + }); + } + + @Test + public void testGetNamespace() { + assertNull(INSTANCE.getNamespace(null)); + RAW_TO_NORMALIZED_NAMESPACES.forEach((raw, normalized) -> { + assertEquals(normalized, INSTANCE.getNamespace(raw)); + }); + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json index 1a490a2835c2..5bd54d969dc2 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json @@ -1562,5 +1562,48 @@ "array_field": ["1234", "true", "false", "0.001"], "_airbyte_additional_properties": null } + }, + { + "schemaName": "namespace_with_special_characters", + "namespace": "namespace_with:spécial:characters", + "appendAirbyteFields": false, + "jsonSchema": { + "type": "object", + "properties": { + "node_id": { + "type": ["null", "string"] + } + } + }, + "jsonObject": { + "node_id": "abc123" + }, + "avroSchema": { + "type": "record", + "name": "namespace_with_special_characters", + "namespace": "namespace_with_special_characters", + "fields": [ + { + "name": "node_id", + "type": ["null", "string"], + "default": null + }, + { + "name": "_airbyte_additional_properties", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": null + } + ] + }, + "avroObject": { + "node_id": "abc123", + "_airbyte_additional_properties": null + } } ] diff --git 
a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
index 2dfd74cee245..8a7cae7a2d9f 100644
--- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

 ENV ENABLE_SENTRY true

-LABEL io.airbyte.version=0.4.21
+LABEL io.airbyte.version=0.4.22
 LABEL io.airbyte.name=airbyte/destination-snowflake
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java
index cea4bf0b88ea..86a2c0f14269 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java
@@ -13,4 +13,21 @@ public String applyDefaultCase(final String input) {
     return input.toUpperCase();
   }

+  /**
+   * The first character can only be a letter or an underscore.
+   */
+  @Override
+  public String convertStreamName(final String input) {
+    if (input == null) {
+      return null;
+    }
+
+    final String normalizedName = super.convertStreamName(input);
+    if (normalizedName.substring(0, 1).matches("[A-Za-z_]")) {
+      return normalizedName;
+    } else {
+      return "_" + normalizedName;
+    }
+  }
+
 }
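Editor's note: the override above changes how every Snowflake stream and namespace name is normalized. A brief usage sketch follows; the expected outputs mirror SnowflakeSqlNameTransformerTest later in this patch, the demo class itself is hypothetical, and it assumes the destination-snowflake module is on the classpath.

```java
import io.airbyte.integrations.destination.snowflake.SnowflakeSQLNameTransformer;

public class SnowflakeNamingDemo {

  public static void main(final String[] args) {
    final SnowflakeSQLNameTransformer transformer = new SnowflakeSQLNameTransformer();
    // Invalid characters are replaced and accents are stripped by the base transformer.
    System.out.println(transformer.convertStreamName("name-space"));        // name_space
    System.out.println(transformer.convertStreamName("spécial_character")); // special_character
    // A leading digit is not a valid first character, so "_" is prepended.
    System.out.println(transformer.convertStreamName("99namespace"));       // _99namespace
  }
}
```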
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
index 17e9b9fd5cdd..a4a658e42286 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java
@@ -17,20 +17,20 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.db.jdbc.JdbcUtils;
 import io.airbyte.integrations.base.JavaBaseConstants;
-import io.airbyte.integrations.destination.ExtendedNameTransformer;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.CatalogHelpers;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import java.io.IOException;
 import java.nio.file.Path;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -38,11 +38,12 @@ public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {

+  private static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();
+
   // this config is based on the static config, and it contains a random
   // schema name that is different for each test run
   private JsonNode config;
   private JdbcDatabase database;
-  private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

   @Override
   protected String getImageName() {
@@ -74,7 +75,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
                                            final String namespace,
                                            final JsonNode streamSchema)
       throws Exception {
-    return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
+    return retrieveRecordsFromTable(NAME_TRANSFORMER.getRawTableName(streamName), NAME_TRANSFORMER.getNamespace(namespace))
         .stream()
         .map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText()))
         .collect(Collectors.toList());
@@ -95,11 +96,21 @@ protected boolean implementsNamespaces() {
     return true;
   }

+  @Override
+  protected boolean supportNamespaceTest() {
+    return true;
+  }
+
+  @Override
+  protected Optional<NamingConventionTransformer> getNameTransformer() {
+    return Optional.of(NAME_TRANSFORMER);
+  }
+
   @Override
   protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace)
       throws Exception {
-    final String tableName = namingResolver.getIdentifier(streamName);
-    final String schema = namingResolver.getIdentifier(namespace);
+    final String tableName = NAME_TRANSFORMER.getIdentifier(streamName);
+    final String schema = NAME_TRANSFORMER.getNamespace(namespace);
     // Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
     // use quoted names
     // if (!tableName.startsWith("\"")) {
@@ -112,7 +123,7 @@
   @Override
   protected List<String> resolveIdentifier(final String identifier) {
     final List<String> result = new ArrayList<>();
-    final String resolved = namingResolver.getIdentifier(identifier);
+    final String resolved = NAME_TRANSFORMER.getIdentifier(identifier);
     result.add(identifier);
     result.add(resolved);
     if (!resolved.startsWith("\"")) {
@@ -177,12 +188,4 @@ public void testSyncWithBillionRecords(final String messagesFilename, final Stri
     runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
   }

-  private <T> T parseConfig(final String path, final Class<T> clazz) throws IOException {
-    return Jsons.deserialize(MoreResources.readResource(path), clazz);
-  }
-
-  private JsonNode parseConfig(final String path) throws IOException {
-    return Jsons.deserialize(MoreResources.readResource(path));
-  }
-
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
similarity index 82%
rename from airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java
rename to airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
index 53adb9c4eac4..a3b1295fef56 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinatiomAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
@@ -8,9 +8,11 @@
 import com.google.common.base.Preconditions;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.destination.NamingConventionTransformer;
 import java.nio.file.Path;
+import java.util.Optional;

-public class SnowflakeInternalStagingDestinatiomAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {
+public class SnowflakeInternalStagingDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {

   public JsonNode getStaticConfig() {
     final JsonNode internalStagingConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/internal_staging_config.json")));
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java
new file mode 100644
index 000000000000..133d5c050cac
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlNameTransformerTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class SnowflakeSqlNameTransformerTest {
+
+  private static final SnowflakeSQLNameTransformer INSTANCE = new SnowflakeSQLNameTransformer();
+  private static final Map<String, String> RAW_TO_NORMALIZED_IDENTIFIERS = Map.of(
+      "name-space", "name_space",
+      "spécial_character", "special_character",
+      "99namespace", "_99namespace");
+
+  @Test
+  public void testGetIdentifier() {
+    assertNull(INSTANCE.getIdentifier(null));
+    assertNull(INSTANCE.convertStreamName(null));
+    RAW_TO_NORMALIZED_IDENTIFIERS.forEach((raw, normalized) -> {
+      assertEquals(normalized, INSTANCE.convertStreamName(raw));
+      assertEquals(normalized, INSTANCE.getIdentifier(raw));
+      assertEquals(normalized, INSTANCE.getNamespace(raw));
+    });
+  }
+
+}
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index c7be00605da3..10da7f02d41e 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -145,7 +145,9 @@ When you create a dataset in BigQuery, the dataset name must be unique for each
 * Dataset names are case-sensitive: mydataset and MyDataset can coexist in the same project.
 * Dataset names cannot contain spaces or special characters such as -, &, @, or %.

-Therefore, Airbyte BigQuery destination will convert any invalid characters into '\_' characters when writing data.
+Therefore, the Airbyte BigQuery destination will convert any invalid characters into `_` characters when writing data.
+
+Datasets that begin with `_` are hidden from the BigQuery Explorer panel. To avoid creating such hidden datasets, the destination will prepend the converted namespace with `n` if it begins with an underscore.
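Editor's note: BigQuerySQLNameTransformer itself is not part of this excerpt; only its unit tests appear earlier in the patch. Below is a minimal sketch consistent with those tests. The class name, the method body, and the use of java.text.Normalizer are assumptions for illustration, not the connector's actual code.

```java
import java.text.Normalizer;

public class BigQueryNamespaceSketch {

  // Hypothetical stand-in for BigQuerySQLNameTransformer#getNamespace. It reproduces the
  // expectations in BigQuerySQLNameTransformerTest: invalid characters become "_", accents
  // are stripped, and a leading "_" (which would create a hidden dataset) is prefixed with "n".
  static String getNamespace(final String namespace) {
    if (namespace == null) {
      return null;
    }
    final String normalized = Normalizer.normalize(namespace, Normalizer.Form.NFKD)
        .replaceAll("\\p{M}", "")           // drop combining accents: spécial -> special
        .replaceAll("[^A-Za-z0-9_]", "_");  // BigQuery dataset names allow only letters, digits, "_"
    // Unlike stream identifiers, a dataset name may start with a digit.
    return normalized.substring(0, 1).matches("[A-Za-z0-9]") ? normalized : "n" + normalized;
  }

  public static void main(final String[] args) {
    System.out.println(getNamespace("name-space"));   // name_space
    System.out.println(getNamespace("99namespace"));  // 99namespace
    System.out.println(getNamespace("_namespace"));   // n_namespace
    System.out.println(getNamespace("*_namespace"));  // n__namespace
  }
}
```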
## CHANGELOG

@@ -153,6 +155,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
 | Version | Date | Pull Request | Subject |
 |:--------| :--- | :--- | :--- |
+| 0.6.12 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
 | 0.6.11 | 2022-03-03 | [10755](https://github.com/airbytehq/airbyte/pull/10755) | Make sure to kill children threads and stop JVM |
 | 0.6.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
 | 0.6.6 | 2022-02-01 | [\#9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. |

@@ -178,6 +181,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------|:-----------------------------------------------------------| :--- |
+| 0.2.11 | 2022-03-18 | [10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
 | 0.2.10 | 2022-03-03 | [10755](https://github.com/airbytehq/airbyte/pull/10755) | Make sure to kill children threads and stop JVM |
 | 0.2.8 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
 | 0.2.7 | 2022-02-01 | [\#9959](https://github.com/airbytehq/airbyte/pull/9959) | Fix null pointer exception from buffered stream consumer. |
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index a3415f60f2c7..7b93447d3357 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -218,6 +218,7 @@ Now that you have set up the Snowflake destination connector, check out the foll
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :----- | :------ |
+| 0.4.22 | 2022-03-18 | [\#10793](https://github.com/airbytehq/airbyte/pull/10793) | Fix namespace with invalid characters |
 | 0.4.21 | 2022-03-18 | [\#11071](https://github.com/airbytehq/airbyte/pull/11071) | Switch to compressed on-disk buffering before staging to s3/internal stage |
 | 0.4.20 | 2022-03-14 | [\#10341](https://github.com/airbytehq/airbyte/pull/10341) | Add Azure blob staging support |
 | 0.4.19 | 2022-03-11 | [10699](https://github.com/airbytehq/airbyte/pull/10699) | Added unit tests |
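Editor's note: to close, a usage-level sketch of the new getNamespace API threaded through this patch, exercised on the Avro transformer. The expected outputs are copied from AvroNameTransformerTest above; the demo class is hypothetical and assumes the destination-s3 module is on the classpath.

```java
import io.airbyte.integrations.destination.s3.avro.AvroNameTransformer;

public class AvroNamespaceDemo {

  public static void main(final String[] args) {
    final AvroNameTransformer transformer = new AvroNameTransformer();
    // A namespace is split on ".", each token is normalized as an Avro name, then rejoined.
    System.out.println(transformer.getNamespace("name-space1.name-space2.namespace3"));
    // -> name_space1.name_space2.namespace3
    System.out.println(transformer.getNamespace("99namespace.namespace2"));
    // -> _99namespace.namespace2
    // Identifiers follow the same per-name rules, without the dot-splitting.
    System.out.println(transformer.getIdentifier("spécial_character"));
    // -> special_character
  }
}
```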