From 99335daca940676cf4920fa42afd0ef2c8f0f64b Mon Sep 17 00:00:00 2001
From: Greg Solovyev
Date: Wed, 28 Dec 2022 13:37:33 -0800
Subject: [PATCH] Refactor BigQuery Destination Integration tests (#20851)

* Refactor BigQuery Destination Integration tests to reduce code duplication
  and move configuration logic from code into config files. This refactoring
  will make it easier to add more test cases for configuration variations such
  as data location, accounts with various permission combinations, and account
  impersonation

* More refactoring

* fix typo

* Change secret file names to avoid conflict with current tests on master

* remove copy-pasted credential file paths

* more copy-pasta reduction
---
 .../bigquery/BigQueryDestination.java         |   2 +-
 .../destination/bigquery/BigQueryUtils.java   |   4 +-
 ...actBigQueryDestinationAcceptanceTest.java} | 129 +------
 .../bigquery/BigQueryDestinationTest.java     | 350 ++++++++----------
 .../BigQueryDestinationTestUtils.java         | 133 +++++++
 .../BigQueryGcsDestinationAcceptanceTest.java |  95 +++--
 .../bigquery/BigQueryGcsDestinationTest.java  | 162 --------
 ...ueryStandardDestinationAcceptanceTest.java |  39 ++
 8 files changed, 380 insertions(+), 534 deletions(-)
 rename airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/{BigQueryDestinationAcceptanceTest.java => AbstractBigQueryDestinationAcceptanceTest.java} (54%)
 create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.java
 delete mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java
 create mode 100644 airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardDestinationAcceptanceTest.java

diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
index 7ff3dfdaa5b6..8900c0b092bb 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java
@@ -177,7 +177,7 @@ protected BigQuery getBigQuery(final JsonNode config) {
     }
   }
 
-  private static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
+  public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
     if (!BigQueryUtils.isUsingJsonCredentials(config)) {
       LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
       LOGGER.info("Using the default service account credential from environment.");

diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
index c82dd5ba72db..d0478fb6856a 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
@@ -70,7 +70,7 @@ public class BigQueryUtils {
       DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
           "[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
   private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
-  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
+  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_";
   private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";
 
   public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
@@ -125,7 +125,7 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d
   }
 
   public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
-    final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
+    final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
     final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
 
     bigquery.create(datasetInfo);

diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java
similarity index 54%
rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java
index 89960f67e499..fff0b0e300b3 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/AbstractBigQueryDestinationAcceptanceTest.java
@@ -8,19 +8,12 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auth.oauth2.ServiceAccountCredentials;
 import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.ConnectionProperty;
 import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.FieldList;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobId;
-import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableResult;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.string.Strings;
@@ -31,36 +24,27 @@
 import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
 import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
-import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
+public abstract class AbstractBigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
 
   private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationAcceptanceTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryDestinationAcceptanceTest.class);
 
-  protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
-
-  protected static final String CONFIG_DATASET_ID = "dataset_id";
   protected static final String CONFIG_PROJECT_ID = "project_id";
-  protected static final String CONFIG_DATASET_LOCATION = "dataset_location";
-  protected static final String CONFIG_CREDS = "credentials_json";
-
+  protected Path secretsFile;
   protected BigQuery bigquery;
   protected Dataset dataset;
-  protected boolean tornDown;
+
   protected JsonNode config;
   protected final StandardNameTransformer namingResolver = new StandardNameTransformer();
@@ -165,7 +149,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
         .collect(Collectors.toList());
   }
 
-  private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
+  protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
 
     final QueryJobConfiguration queryConfig =
@@ -177,7 +161,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
         .setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
         .build();
 
-    final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
+    final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
     final FieldList fields = queryResults.getSchema().getFields();
     BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
@@ -185,102 +169,19 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
         .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
   }
 
-  @Override
-  protected void setup(final TestDestinationEnv testEnv) throws Exception {
-    if (!Files.exists(CREDENTIALS_PATH)) {
-      throw new IllegalStateException(
-          "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
-              + ". Override by setting setting path with the CREDENTIALS_PATH constant.");
-    }
-
-    final String fullConfigAsString = Files.readString(CREDENTIALS_PATH);
-    final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
-    final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
-    final String datasetLocation = "US";
-
+  protected void setUpBigQuery() throws IOException {
+    // secrets file should be set by the inheriting class
+    Assertions.assertNotNull(secretsFile);
    final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
+    config = BigQueryDestinationTestUtils.createConfig(secretsFile, datasetId);
 
-    config = Jsons.jsonNode(ImmutableMap.builder()
-        .put(CONFIG_PROJECT_ID, projectId)
-        .put(CONFIG_CREDS, credentialsJson.toString())
-        .put(CONFIG_DATASET_ID, datasetId)
-        .put(CONFIG_DATASET_LOCATION, datasetLocation)
-        .build());
-
-    setupBigQuery(credentialsJson);
-  }
-
-  protected void setupBigQuery(final JsonNode credentialsJson) throws IOException {
-    final ServiceAccountCredentials credentials = ServiceAccountCredentials
-        .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8)));
-
-    bigquery = BigQueryOptions.newBuilder()
-        .setProjectId(config.get(CONFIG_PROJECT_ID).asText())
-        .setCredentials(credentials)
-        .build()
-        .getService();
-
-    final DatasetInfo datasetInfo =
-        DatasetInfo.newBuilder(getDefaultSchema(config)).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
-    dataset = bigquery.create(datasetInfo);
-
-    tornDown = false;
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  if (!tornDown) {
-                    tearDownBigQuery();
-                  }
-                }));
-  }
-
-  @Override
-  protected void tearDown(final TestDestinationEnv testEnv) {
-    tearDownBigQuery();
+    final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
+    bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
+    dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
   }
 
   protected void tearDownBigQuery() {
-    // allows deletion of a dataset that has contents
-    final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
-
-    final boolean success = bigquery.delete(dataset.getDatasetId(), option);
-    if (success) {
-      LOGGER.info("BQ Dataset " + dataset + " deleted...");
-    } else {
-      LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
-    }
-
-    tornDown = true;
-  }
-
-  // todo (cgardens) - figure out how to share these helpers. they are currently copied from
-  // BigQueryDestination.
-  private static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
-    final JobId jobId = JobId.of(UUID.randomUUID().toString());
-    final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
-    return executeQuery(queryJob);
-  }
-
-  private static ImmutablePair<Job, String> executeQuery(final Job queryJob) {
-    final Job completedJob = waitForQuery(queryJob);
-    if (completedJob == null) {
-      throw new RuntimeException("Job no longer exists");
-    } else if (completedJob.getStatus().getError() != null) {
-      // You can also look at queryJob.getStatus().getExecutionErrors() for all
-      // errors, not just the latest one.
- return ImmutablePair.of(null, (completedJob.getStatus().getError().toString())); - } - - return ImmutablePair.of(completedJob, null); - } - - private static Job waitForQuery(final Job queryJob) { - try { - return queryJob.waitFor(); - } catch (final Exception e) { - throw new RuntimeException(e); - } + BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index a9f775576730..8d93d74543d9 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -9,16 +9,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doThrow; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; import static org.mockito.Mockito.spy; +import com.amazonaws.services.s3.AmazonS3; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.StandardSQLTypeName; @@ -27,7 +24,6 @@ import com.google.cloud.bigquery.TableInfo; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; @@ -35,6 +31,7 @@ import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.NamingConventionTransformer; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; @@ -48,42 +45,47 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.ConnectorSpecification; import io.airbyte.protocol.models.v0.DestinationSyncMode; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@TestInstance(PER_CLASS)
 class BigQueryDestinationTest {
-
-  protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
-  protected static final Path CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH =
-      Path.of("secrets/credentials-with-missed-dataset-creation-role.json");
+  protected static final Path CREDENTIALS_STANDARD_INSERT_PATH = Path.of("secrets/credentials-standard.json");
+  protected static final Path CREDENTIALS_BAD_PROJECT_PATH = Path.of("secrets/credentials-badproject.json");
+  protected static final Path CREDENTIALS_NO_DATASET_CREATION_PATH =
+      Path.of("secrets/credentials-standard-no-dataset-creation.json");
   protected static final Path CREDENTIALS_NON_BILLABLE_PROJECT_PATH =
-      Path.of("secrets/credentials-non-billable-project.json");
+      Path.of("secrets/credentials-standard-non-billable-project.json");
+  protected static final Path CREDENTIALS_WITH_GCS_STAGING_PATH =
+      Path.of("secrets/credentials-gcs-staging.json");
+  protected static final Path[] ALL_PATHS = {CREDENTIALS_STANDARD_INSERT_PATH, CREDENTIALS_BAD_PROJECT_PATH, CREDENTIALS_NO_DATASET_CREATION_PATH,
+      CREDENTIALS_NON_BILLABLE_PROJECT_PATH, CREDENTIALS_WITH_GCS_STAGING_PATH};
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
   private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";
-  protected static final String DATASET_LOCATION = "EU";
-  protected static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
   private static final Instant NOW = Instant.now();
   protected static final String USERS_STREAM_NAME = "users";
   protected static final String TASKS_STREAM_NAME = "tasks";
@@ -108,49 +110,77 @@ class BigQueryDestinationTest {
 
   private static final NamingConventionTransformer NAMING_RESOLVER = new BigQuerySQLNameTransformer();
 
-  protected JsonNode config;
+  protected static String projectId;
+  protected static String datasetId;
+  protected static JsonNode config;
+  protected static JsonNode configWithProjectId;
+  protected static JsonNode configWithBadProjectId;
+  protected static JsonNode insufficientRoleConfig;
+  protected static JsonNode nonBillableConfig;
+  protected static JsonNode gcsStagingConfig; //default BigQuery config.
Also used for setup/teardown protected BigQuery bigquery; protected Dataset dataset; - protected ConfiguredAirbyteCatalog catalog; - protected boolean tornDown = true; + protected static Map configs; + protected static ConfiguredAirbyteCatalog catalog; + + private AmazonS3 s3Client; - private static Stream datasetIdResetterProvider() { - // parameterized test with two dataset-id patterns: `dataset_id` and `project-id:dataset_id` + private Stream successTestConfigProvider() { return Stream.of( - Arguments.arguments(new DatasetIdResetter(config -> {})), - Arguments.arguments(new DatasetIdResetter( - config -> { - final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - final String datasetId = config.get(BigQueryConsts.CONFIG_DATASET_ID).asText(); - ((ObjectNode) config).put(BigQueryConsts.CONFIG_DATASET_ID, - String.format("%s:%s", projectId, datasetId)); - }))); + Arguments.of("config"), + Arguments.of("configWithProjectId"), + Arguments.of("gcsStagingConfig") + ); } - @BeforeEach - void setup(final TestInfo info) throws IOException { - if (info.getDisplayName().equals("testSpec()")) { - return; - } + private Stream failCheckTestConfigProvider() { + return Stream.of( + Arguments.of("configWithBadProjectId", "User does not have bigquery.datasets.create permission in project"), + Arguments.of("insufficientRoleConfig", "User does not have bigquery.datasets.create permission"), + Arguments.of("nonBillableConfig", "Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier") + ); + } - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/config/credentials.json. Override by setting setting path with the CREDENTIALS_PATH constant."); + private Stream failWriteTestConfigProvider() { + return Stream.of( + Arguments.of("configWithBadProjectId", "User does not have bigquery.datasets.create permission in project"), + Arguments.of("insufficientRoleConfig", "Permission bigquery.tables.create denied") + ); + } + + @BeforeAll + public static void beforeAll() throws IOException { + for(Path path : ALL_PATHS) { + if (!Files.exists(path)) { + throw new IllegalStateException( + String.format("Must provide path to a big query credentials file. 
Please add file with credentials to %s", path.toAbsolutePath())); + } } - final String fullConfigAsString = Files.readString(CREDENTIALS_PATH); - final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8); + //Set up config objects for test scenarios + //config - basic config for standard inserts that should succeed check and write tests + //this config is also used for housekeeping (checking records, and cleaning up) + config = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_STANDARD_INSERT_PATH, datasetId); - final ServiceAccountCredentials credentials = ServiceAccountCredentials - .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8))); - bigquery = BigQueryOptions.newBuilder() - .setProjectId(projectId) - .setCredentials(credentials) - .build() - .getService(); + //all successful configs use the same project ID + projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); + + //configWithProjectId - config that uses project:dataset notation for datasetId + final String dataSetWithProjectId = String.format("%s:%s", projectId, datasetId); + configWithProjectId = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_STANDARD_INSERT_PATH, dataSetWithProjectId); + + //configWithBadProjectId - config that uses "fake" project ID and should fail + final String dataSetWithBadProjectId = String.format("%s:%s", "fake", datasetId); + configWithBadProjectId = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_BAD_PROJECT_PATH, dataSetWithBadProjectId); + + //config that has insufficient privileges + insufficientRoleConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_NO_DATASET_CREATION_PATH, datasetId); + //config that tries to write to a project with disabled billing (free tier) + nonBillableConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_NON_BILLABLE_PROJECT_PATH, "testnobilling"); + //config with GCS staging + gcsStagingConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_WITH_GCS_STAGING_PATH, datasetId); - final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8); MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); MESSAGE_TASKS1.getRecord().setNamespace(datasetId); @@ -158,56 +188,50 @@ void setup(final TestInfo info) throws IOException { catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, - io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), - io.airbyte.protocol.models.Field - .of("id", JsonSchemaType.STRING)) + io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING), + io.airbyte.protocol.models.Field + .of("id", JsonSchemaType.STRING)) .withDestinationSyncMode(DestinationSyncMode.APPEND), CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING)))); - final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(DATASET_LOCATION).build(); - dataset = bigquery.create(datasetInfo); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - 
.put(BigQueryConsts.CONFIG_DATASET_LOCATION, DATASET_LOCATION) - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - tornDown = false; - addShutdownHook(); + configs = new HashMap() {{ + put("config", config); + put("configWithProjectId", configWithProjectId); + put("configWithBadProjectId", configWithBadProjectId); + put("insufficientRoleConfig", insufficientRoleConfig); + put("nonBillableConfig", nonBillableConfig); + put("gcsStagingConfig", gcsStagingConfig); + }}; } - protected void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - if (!tornDown) { - tearDownBigQuery(); - } - })); + protected void initBigQuery(JsonNode config) throws IOException { + bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId); + try { + dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId); + } catch(Exception ex) { + //ignore + } } - @AfterEach - void tearDown(final TestInfo info) { + @BeforeEach + void setup(final TestInfo info) throws IOException { if (info.getDisplayName().equals("testSpec()")) { return; } - - tearDownBigQuery(); + bigquery = null; + dataset = null; + final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig + .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(gcsStagingConfig)); + this.s3Client = gcsDestinationConfig.getS3Client(); } - protected void tearDownBigQuery() { - // allows deletion of a dataset that has contents - final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); - - final boolean success = bigquery.delete(dataset.getDatasetId(), option); - if (success) { - LOGGER.info("BQ Dataset " + dataset + " deleted..."); - } else { - LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!"); + @AfterEach + void tearDown(final TestInfo info) { + if (info.getDisplayName().equals("testSpec()")) { + return; } - - tornDown = true; + BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER); + BigQueryDestinationTestUtils.tearDownGcs(s3Client, config, LOGGER); } @Test @@ -220,104 +244,32 @@ void testSpec() throws Exception { } @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testCheckSuccess(final DatasetIdResetter resetDatasetId) { - resetDatasetId.accept(config); - final AirbyteConnectionStatus actual = new BigQueryDestination().check(config); + @MethodSource("successTestConfigProvider") + void testCheckSuccess(String configName) throws IOException { + JsonNode testConfig = configs.get(configName); + final AirbyteConnectionStatus actual = new BigQueryDestination().check(testConfig); final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); assertEquals(expected, actual); } @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testCheckFailure(final DatasetIdResetter resetDatasetId) { - ((ObjectNode) config).put(BigQueryConsts.CONFIG_PROJECT_ID, "fake"); - resetDatasetId.accept(config); - - // Assert that check throws exception. 
Later it will be handled by IntegrationRunner - final ConfigErrorException ex = assertThrows(ConfigErrorException.class, () -> { - new BigQueryDestination().check(config); - }); - - assertThat(ex.getMessage()).contains("Access Denied"); - } - - @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdResetter resetDatasetId) throws IOException { - - if (!Files.exists(CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH)) { - throw new IllegalStateException(""" - Json config not found. Must provide path to a big query credentials file, - please add file with creds to - ../destination-bigquery/secrets/credentialsWithMissedDatasetCreationRole.json."""); - } - final String fullConfigAsString = Files.readString(CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH); - final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8); - - final JsonNode insufficientRoleConfig; - - insufficientRoleConfig = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, DATASET_LOCATION) - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - resetDatasetId.accept(insufficientRoleConfig); - - // Assert that check throws exception. Later it will be handled by IntegrationRunner - final ConfigErrorException ex = assertThrows(ConfigErrorException.class, () -> { - new BigQueryDestination().check(insufficientRoleConfig); + @MethodSource("failCheckTestConfigProvider") + void testCheckFailures(String configName, String error) { + //TODO: this should always throw ConfigErrorException + JsonNode testConfig = configs.get(configName); + final Exception ex = assertThrows(Exception.class, () -> { + new BigQueryDestination().check(testConfig); }); - - assertThat(ex.getMessage()).contains("User does not have bigquery.datasets.create permission"); + assertThat(ex.getMessage()).contains(error); } @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testCheckFailureNonBillableProject(final DatasetIdResetter resetDatasetId) throws IOException { - - if (!Files.exists(CREDENTIALS_NON_BILLABLE_PROJECT_PATH)) { - throw new IllegalStateException(""" - Json config not found. Must provide path to a big query credentials file, - please add file with creds to - ../destination-bigquery/secrets/credentials-non-billable-project.json"""); - } - final String fullConfigAsString = Files.readString(CREDENTIALS_NON_BILLABLE_PROJECT_PATH); - - final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - - final JsonNode insufficientRoleConfig; - - insufficientRoleConfig = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, "testnobilling") - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US") - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - resetDatasetId.accept(insufficientRoleConfig); - - // Assert that check throws exception. 
Later it will be handled by IntegrationRunner - final ConfigErrorException ex = assertThrows(ConfigErrorException.class, () -> { - new BigQueryDestination().check(insufficientRoleConfig); - }); - - assertThat(ex.getMessage()).contains("Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier"); - } - - @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception { - resetDatasetId.accept(config); + @MethodSource("successTestConfigProvider") + void testWriteSuccess(String configName) throws Exception { + initBigQuery(config); + JsonNode testConfig = configs.get(configName); final BigQueryDestination destination = new BigQueryDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + final AirbyteMessageConsumer consumer = destination.getConsumer(testConfig, catalog, Destination::defaultOutputRecordCollector); consumer.start(); consumer.accept(MESSAGE_USERS1); @@ -345,19 +297,15 @@ void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception { } @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testWriteFailure(final DatasetIdResetter resetDatasetId) throws Exception { - resetDatasetId.accept(config); - // hack to force an exception to be thrown from within the consumer. - final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1); - doThrow(new RuntimeException()).when(spiedMessage).getRecord(); - - final AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(config, catalog, Destination::defaultOutputRecordCollector)); - - consumer.start(); - assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); - consumer.accept(MESSAGE_USERS2); - consumer.close(); + @MethodSource("failWriteTestConfigProvider") + void testWriteFailure(String configName, String error) throws Exception { + initBigQuery(config); + JsonNode testConfig = configs.get(configName); + final Exception ex = assertThrows(Exception.class, () -> { + AirbyteMessageConsumer consumer = spy(new BigQueryDestination().getConsumer(testConfig, catalog, Destination::defaultOutputRecordCollector)); + consumer.start(); + }); + assertThat(ex.getMessage()).contains(error); final List tableNames = catalog.getStreams() .stream() @@ -374,11 +322,17 @@ void testWriteFailure(final DatasetIdResetter resetDatasetId) throws Exception { } private Set fetchNamesOfTablesInDb() throws InterruptedException { + if(dataset == null || bigquery == null) { + return Collections.emptySet(); + } final QueryJobConfiguration queryConfig = QueryJobConfiguration .newBuilder(String.format("SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES`;", dataset.getDatasetId().getDataset())) .setUseLegacySql(false) .build(); + if(!dataset.exists()) { + return Collections.emptySet(); + } return StreamSupport .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) .map(v -> v.get("TABLE_NAME").getStringValue()).collect(Collectors.toSet()); @@ -409,14 +363,15 @@ private List retrieveRecords(final String tableName) throws Exception } @ParameterizedTest - @MethodSource("datasetIdResetterProvider") - void testWritePartitionOverUnpartitioned(final DatasetIdResetter resetDatasetId) throws Exception { - resetDatasetId.accept(config); + @MethodSource("successTestConfigProvider") + void testWritePartitionOverUnpartitioned(String configName) throws Exception { + JsonNode 
testConfig = configs.get(configName); + initBigQuery(config); final String raw_table_name = String.format("_airbyte_raw_%s", USERS_STREAM_NAME); createUnpartitionedTable(bigquery, dataset, raw_table_name); assertFalse(isTablePartitioned(bigquery, dataset, raw_table_name)); final BigQueryDestination destination = new BigQueryDestination(); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); + final AirbyteMessageConsumer consumer = destination.getConsumer(testConfig, catalog, Destination::defaultOutputRecordCollector); consumer.start(); consumer.accept(MESSAGE_USERS1); @@ -474,19 +429,4 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase } return false; } - - protected static class DatasetIdResetter { - - private final Consumer consumer; - - DatasetIdResetter(final Consumer consumer) { - this.consumer = consumer; - } - - public void accept(final JsonNode config) { - consumer.accept(config); - } - - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.java new file mode 100644 index 000000000000..3f2b7829550d --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTestUtils.java @@ -0,0 +1,133 @@ +package io.airbyte.integrations.destination.bigquery; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.LinkedList; +import java.util.List; +import org.slf4j.Logger; + +public class BigQueryDestinationTestUtils { + + /** + * Parse the config file and replace dataset with datasetId randomly generated by the test + * @param configFile + * @param datasetId + * @return + * @throws IOException + */ + public static JsonNode createConfig(Path configFile, String datasetId) throws IOException { + final String tmpConfigAsString = Files.readString(configFile); + final JsonNode tmpConfigJson = Jsons.deserialize(tmpConfigAsString); + return Jsons.jsonNode(((ObjectNode)tmpConfigJson).put(BigQueryConsts.CONFIG_DATASET_ID, datasetId)); + } + + /** + * Get a handle for the BigQuery dataset instance used by the test. 
+   * This dataset instance will be used to verify results of test operations
+   * and for cleaning up after the test runs
+   * @param config destination config with the target dataset location
+   * @param bigquery BigQuery client handle
+   * @param datasetId ID of the dataset used by the test
+   * @return the created (or pre-existing) dataset
+   */
+  public static Dataset initDataSet(JsonNode config, BigQuery bigquery, String datasetId) {
+    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId)
+        .setLocation(config.get(BigQueryConsts.CONFIG_DATASET_LOCATION).asText()).build();
+    try {
+      return bigquery.create(datasetInfo);
+    } catch(Exception ex) {
+      if(ex.getMessage() != null && ex.getMessage().contains("Already Exists")) {
+        return bigquery.getDataset(datasetId);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Initializes a BigQuery client that will be used for verifying results of test operations
+   * and for cleaning up the BigQuery dataset after the test
+   * @param config destination config holding the service account credentials
+   * @param projectId GCP project that owns the dataset
+   * @return an authenticated BigQuery client
+   * @throws IOException if the credentials cannot be read
+   */
+  public static BigQuery initBigQuery(JsonNode config, String projectId) throws IOException {
+    final GoogleCredentials credentials = BigQueryDestination.getServiceAccountCredentials(config);
+    return BigQueryOptions.newBuilder()
+        .setProjectId(projectId)
+        .setCredentials(credentials)
+        .build()
+        .getService();
+  }
+
+  /**
+   * Deletes the BigQuery dataset created during the test
+   * @param bigquery BigQuery client handle
+   * @param dataset dataset to delete
+   * @param LOGGER logger of the calling test class
+   */
+  public static void tearDownBigQuery(BigQuery bigquery, Dataset dataset, Logger LOGGER) {
+    if(bigquery == null || dataset == null) {
+      return;
+    }
+    // allows deletion of a dataset that has contents
+    final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();
+    try {
+      final boolean success = bigquery.delete(dataset.getDatasetId(), option);
+      if (success) {
+        LOGGER.info("BQ Dataset " + dataset + " deleted...");
+      } else {
+        LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Failed to remove BigQuery resources after the test", ex);
+    }
+  }
+
+  /**
+   * Remove all the GCS output from the tests.
+ */ + public static void tearDownGcs(AmazonS3 s3Client, JsonNode config, Logger LOGGER) { + if(s3Client == null) { + return; + } + if(BigQueryUtils.getLoadingMethod(config) != UploadingMethod.GCS) { + return; + } + final JsonNode properties = config.get(BigQueryConsts.LOADING_METHOD); + final String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); + final String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText(); + try { + final List keysToDelete = new LinkedList<>(); + final List objects = s3Client + .listObjects(gcsBucketName, gcs_bucket_path) + .getObjectSummaries(); + for (final S3ObjectSummary object : objects) { + keysToDelete.add(new KeyVersion(object.getKey())); + } + + if (keysToDelete.size() > 0) { + LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); + // Google Cloud Storage doesn't accept request to delete multiple objects + for (final KeyVersion keyToDelete : keysToDelete) { + s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); + } + LOGGER.info("Deleted {} file(s).", keysToDelete.size()); + } + } catch (Exception ex) { + LOGGER.error("Failed to remove GCS resources after the test", ex); + } + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java index 9226bec91b69..4f13a6cf0d51 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationAcceptanceTest.java @@ -4,59 +4,54 @@ package io.airbyte.integrations.destination.bigquery; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import java.nio.file.Files; -import java.nio.file.Path; - -public class BigQueryGcsDestinationAcceptanceTest extends BigQueryDestinationAcceptanceTest { +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; - private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); +import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.nio.file.Path; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(PER_CLASS) +public class BigQueryGcsDestinationAcceptanceTest extends AbstractBigQueryDestinationAcceptanceTest { + private AmazonS3 s3Client; + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsDestinationAcceptanceTest.class); + + /** + * Sets up secretsFile path as well as BigQuery and GCS instances for verification and cleanup + * This function will be called before EACH test. + * @see DestinationAcceptanceTest#setUpInternal() + * @param testEnv - information about the test environment. + * @throws Exception - can throw any exception, test framework will handle. 
+ */ + @Override + protected void setup(TestDestinationEnv testEnv) throws Exception { + //use secrets file with GCS staging config + secretsFile = Path.of("secrets/credentials-gcs-staging.json"); + setUpBigQuery(); + + //the setup steps below are specific to GCS staging use case + final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig + .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); + this.s3Client = gcsDestinationConfig.getS3Client(); + } + /** + * Removes data from bigquery and GCS + * This function will be called after EACH test + * @see DestinationAcceptanceTest#tearDownInternal() + * @param testEnv - information about the test environment. + * @throws Exception - can throw any exception, test framework will handle. + */ @Override - protected void setup(final TestDestinationEnv testEnv) throws Exception { - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH - + ". Override by setting setting path with the CREDENTIALS_PATH constant."); - } - - final String fullConfigFromSecretFileAsString = Files.readString(CREDENTIALS_PATH); - - final JsonNode fullConfigFromSecretFileJson = Jsons.deserialize(fullConfigFromSecretFileAsString); - final JsonNode bigqueryConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final JsonNode gcsConfigFromSecretFile = fullConfigFromSecretFileJson.get(BigQueryConsts.GCS_CONFIG); - - final String projectId = bigqueryConfigFromSecretFile.get(CONFIG_PROJECT_ID).asText(); - final String datasetLocation = "US"; - - final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); - - final JsonNode gcsCredentialFromSecretFile = gcsConfigFromSecretFile.get(BigQueryConsts.CREDENTIAL); - final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CREDENTIAL_TYPE, gcsCredentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, gcsCredentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) - .build()); - - final JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) - .put(BigQueryConsts.GCS_BUCKET_NAME, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_NAME)) - .put(BigQueryConsts.GCS_BUCKET_PATH, gcsConfigFromSecretFile.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.CREDENTIAL, credential) - .build()); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, bigqueryConfigFromSecretFile.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, datasetLocation) - .put(BigQueryConsts.LOADING_METHOD, loadingMethod) - .build()); - - setupBigQuery(bigqueryConfigFromSecretFile); + protected void tearDown(TestDestinationEnv testEnv) { + tearDownBigQuery(); + tearDownGcs(); } + protected void tearDownGcs() { + BigQueryDestinationTestUtils.tearDownGcs(s3Client, config, LOGGER); + } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java 
b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java deleted file mode 100644 index ed2cb16ae548..000000000000 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsDestinationTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.bigquery; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.auth.oauth2.ServiceAccountCredentials; -import com.google.cloud.bigquery.BigQueryOptions; -import com.google.cloud.bigquery.DatasetInfo; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.protocol.models.Field; -import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.LinkedList; -import java.util.List; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class BigQueryGcsDestinationTest extends BigQueryDestinationTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsDestinationTest.class); - - private static final String DATASET_NAME_PREFIX = "bq_gcs_dest_integration_test"; - - private AmazonS3 s3Client; - - @Override - @BeforeEach - void setup(final TestInfo info) throws IOException { - if (info.getDisplayName().equals("testSpec()")) { - return; - } - - if (!Files.exists(CREDENTIALS_PATH)) { - throw new IllegalStateException( - "Must provide path to a big query credentials file. By default {module-root}/config/credentials.json. 
Override by setting setting path with the CREDENTIALS_PATH constant."); - } - final String fullConfigAsString = Files.readString(CREDENTIALS_PATH); - final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG); - final JsonNode credentialsGcsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.GCS_CONFIG); - - final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(); - - final ServiceAccountCredentials credentials = ServiceAccountCredentials - .fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8))); - bigquery = BigQueryOptions.newBuilder() - .setProjectId(projectId) - .setCredentials(credentials) - .build() - .getService(); - - final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8); - MESSAGE_USERS1.getRecord().setNamespace(datasetId); - MESSAGE_USERS2.getRecord().setNamespace(datasetId); - MESSAGE_TASKS1.getRecord().setNamespace(datasetId); - MESSAGE_TASKS2.getRecord().setNamespace(datasetId); - - catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( - CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId, - Field.of("name", JsonSchemaType.STRING), - Field - .of("id", JsonSchemaType.STRING)), - CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING)))); - - final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(DATASET_LOCATION).build(); - dataset = bigquery.create(datasetInfo); - - final JsonNode credentialFromSecretFile = credentialsGcsJson.get(BigQueryConsts.CREDENTIAL); - final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CREDENTIAL_TYPE, credentialFromSecretFile.get(BigQueryConsts.CREDENTIAL_TYPE)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_ID, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_ID)) - .put(BigQueryConsts.HMAC_KEY_ACCESS_SECRET, credentialFromSecretFile.get(BigQueryConsts.HMAC_KEY_ACCESS_SECRET)) - .build()); - - final JsonNode loadingMethod = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.METHOD, BigQueryConsts.GCS_STAGING) - .put(BigQueryConsts.KEEP_GCS_FILES, BigQueryConsts.KEEP_GCS_FILES_VAL) - .put(BigQueryConsts.GCS_BUCKET_NAME, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_NAME)) - .put(BigQueryConsts.GCS_BUCKET_PATH, credentialsGcsJson.get(BigQueryConsts.GCS_BUCKET_PATH).asText() + System.currentTimeMillis()) - .put(BigQueryConsts.CREDENTIAL, credential) - .build()); - - config = Jsons.jsonNode(ImmutableMap.builder() - .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId) - .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString()) - .put(BigQueryConsts.CONFIG_DATASET_ID, datasetId) - .put(BigQueryConsts.CONFIG_DATASET_LOCATION, DATASET_LOCATION) - .put(BigQueryConsts.LOADING_METHOD, loadingMethod) - .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10) - .build()); - - final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig - .getGcsDestinationConfig(BigQueryUtils.getGcsJsonNodeConfig(config)); - this.s3Client = gcsDestinationConfig.getS3Client(); - - tornDown = false; - addShutdownHook(); - } - - @AfterEach - @Override - void tearDown(final TestInfo info) { - if (info.getDisplayName().equals("testSpec()")) { - return; - } - - tearDownGcs(); - tearDownBigQuery(); - } - - /** - * Remove all the GCS output from the tests. 
- */ - protected void tearDownGcs() { - final JsonNode properties = config.get(BigQueryConsts.LOADING_METHOD); - final String gcsBucketName = properties.get(BigQueryConsts.GCS_BUCKET_NAME).asText(); - final String gcs_bucket_path = properties.get(BigQueryConsts.GCS_BUCKET_PATH).asText(); - - final List keysToDelete = new LinkedList<>(); - final List objects = s3Client - .listObjects(gcsBucketName, gcs_bucket_path) - .getObjectSummaries(); - for (final S3ObjectSummary object : objects) { - keysToDelete.add(new KeyVersion(object.getKey())); - } - - if (keysToDelete.size() > 0) { - LOGGER.info("Tearing down test bucket path: {}/{}", gcsBucketName, gcs_bucket_path); - // Google Cloud Storage doesn't accept request to delete multiple objects - for (final KeyVersion keyToDelete : keysToDelete) { - s3Client.deleteObject(gcsBucketName, keyToDelete.getKey()); - } - LOGGER.info("Deleted {} file(s).", keysToDelete.size()); - } - } - - @Override - void testWritePartitionOverUnpartitioned(final DatasetIdResetter resetDatasetId) throws Exception { - // This test is skipped for GCS staging mode because we load Avro data to BigQuery, but do not - // use the use_avro_logical_types flag to automatically convert the Avro logical timestamp - // type. Therefore, the emission timestamp, which should be used as the partition field, has - // an incorrect type. See - // https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types - } - -} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardDestinationAcceptanceTest.java new file mode 100644 index 000000000000..0542c8a3837a --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryStandardDestinationAcceptanceTest.java @@ -0,0 +1,39 @@ +package io.airbyte.integrations.destination.bigquery; + +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; +import java.nio.file.Path; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(PER_CLASS) +public class BigQueryStandardDestinationAcceptanceTest extends AbstractBigQueryDestinationAcceptanceTest { + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStandardDestinationAcceptanceTest.class); + + /** + * Sets up secretsFile path and BigQuery instance for verification and cleanup + * This function will be called before EACH test. + * @see DestinationAcceptanceTest#setUpInternal() + * @param testEnv - information about the test environment. + * @throws Exception - can throw any exception, test framework will handle. + */ + @Override + protected void setup(TestDestinationEnv testEnv) throws Exception { + secretsFile = Path.of("secrets/credentials-standard.json"); + setUpBigQuery(); + } + + /** + * Removes data from bigquery + * This function will be called after EACH test + * @see DestinationAcceptanceTest#tearDownInternal() + * @param testEnv - information about the test environment. + * @throws Exception - can throw any exception, test framework will handle. 
+ */ + @Override + protected void tearDown(TestDestinationEnv testEnv) { + tearDownBigQuery(); + } +}
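
To illustrate the pattern this refactoring enables: a new configuration variation (for example the account-impersonation case mentioned in the commit message) should now reduce to a small subclass that points at its own secrets file. The sketch below is illustrative only and is not part of this patch; the class name and the secrets file "secrets/credentials-standard-impersonation.json" are hypothetical.

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

import java.nio.file.Path;
import org.junit.jupiter.api.TestInstance;

@TestInstance(PER_CLASS)
public class BigQueryImpersonationDestinationAcceptanceTest extends AbstractBigQueryDestinationAcceptanceTest {

  // Only the secrets file differs between configuration variations; BigQuery
  // setup, result verification, and teardown are inherited from the base class.
  @Override
  protected void setup(TestDestinationEnv testEnv) throws Exception {
    secretsFile = Path.of("secrets/credentials-standard-impersonation.json");
    setUpBigQuery();
  }

  @Override
  protected void tearDown(TestDestinationEnv testEnv) {
    tearDownBigQuery();
  }
}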