Skip to content

Commit

Permalink
Refactor BigQuery Destination Integration tests (airbytehq#20851)
Browse files Browse the repository at this point in the history
* Refactor BigQuery Destination Integration tests
to reduce code duplication and move configuration
logic from code into config files.
This refactoring will make it easier to add more
test cases for configuration variations such as
data location, accounts with various permission combinations,
and account impersonation

* More refactoring

* fix typo

* Change secret file names to avoid conflict with current tests on master

* remove copy-pasted credential file paths

* more copy-pasta reduction
  • Loading branch information
grishick authored Dec 28, 2022
1 parent d25e4a9 commit 99335da
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 534 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ protected BigQuery getBigQuery(final JsonNode config) {
}
}

private static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
if (!BigQueryUtils.isUsingJsonCredentials(config)) {
LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
LOGGER.info("Using the default service account credential from environment.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class BigQueryUtils {
DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
"[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_";
private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";

public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
Expand Down Expand Up @@ -125,7 +125,7 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d
}

public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();

bigquery.create(datasetInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
Expand All @@ -31,36 +24,27 @@
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class AbstractBigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationAcceptanceTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryDestinationAcceptanceTest.class);

protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");

protected static final String CONFIG_DATASET_ID = "dataset_id";
protected static final String CONFIG_PROJECT_ID = "project_id";
protected static final String CONFIG_DATASET_LOCATION = "dataset_location";
protected static final String CONFIG_CREDS = "credentials_json";

protected Path secretsFile;
protected BigQuery bigquery;
protected Dataset dataset;
protected boolean tornDown;

protected JsonNode config;
protected final StandardNameTransformer namingResolver = new StandardNameTransformer();

Expand Down Expand Up @@ -165,7 +149,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

final QueryJobConfiguration queryConfig =
Expand All @@ -177,110 +161,27 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
.build();

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@Override
protected void setup(final TestDestinationEnv testEnv) throws Exception {
if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
+ ". Override by setting setting path with the CREDENTIALS_PATH constant.");
}

final String fullConfigAsString = Files.readString(CREDENTIALS_PATH);
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

protected void setUpBigQuery() throws IOException {
//secrets file should be set by the inhereting class
Assertions.assertNotNull(secretsFile);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
config = BigQueryDestinationTestUtils.createConfig(secretsFile, datasetId);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJson.toString())
.put(CONFIG_DATASET_ID, datasetId)
.put(CONFIG_DATASET_LOCATION, datasetLocation)
.build());

setupBigQuery(credentialsJson);
}

protected void setupBigQuery(final JsonNode credentialsJson) throws IOException {
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8)));

bigquery = BigQueryOptions.newBuilder()
.setProjectId(config.get(CONFIG_PROJECT_ID).asText())
.setCredentials(credentials)
.build()
.getService();

final DatasetInfo datasetInfo =
DatasetInfo.newBuilder(getDefaultSchema(config)).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
dataset = bigquery.create(datasetInfo);

tornDown = false;
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
if (!tornDown) {
tearDownBigQuery();
}
}));
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
tearDownBigQuery();
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
}

protected void tearDownBigQuery() {
// allows deletion of a dataset that has contents
final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();

final boolean success = bigquery.delete(dataset.getDatasetId(), option);
if (success) {
LOGGER.info("BQ Dataset " + dataset + " deleted...");
} else {
LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
}

tornDown = true;
}

// todo (cgardens) - figure out how to share these helpers. they are currently copied from
// BigQueryDestination.
private static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
return executeQuery(queryJob);
}

private static ImmutablePair<Job, String> executeQuery(final Job queryJob) {
final Job completedJob = waitForQuery(queryJob);
if (completedJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (completedJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
}

return ImmutablePair.of(completedJob, null);
}

private static Job waitForQuery(final Job queryJob) {
try {
return queryJob.waitFor();
} catch (final Exception e) {
throw new RuntimeException(e);
}
BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}

}
Loading

0 comments on commit 99335da

Please sign in to comment.