Refactor BigQuery Destination Integration tests #20851

Merged · 6 commits · Dec 28, 2022

@@ -177,7 +177,7 @@ protected BigQuery getBigQuery(final JsonNode config) {
}
}

private static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
public static GoogleCredentials getServiceAccountCredentials(final JsonNode config) throws IOException {
if (!BigQueryUtils.isUsingJsonCredentials(config)) {
LOGGER.info("No service account key json is provided. It is required if you are using Airbyte cloud.");
LOGGER.info("Using the default service account credential from environment.");

@@ -70,7 +70,7 @@ public class BigQueryUtils {
DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
"[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
@etsybaev (Contributor) commented on Dec 28, 2022:

Just curious: could removing this timestamp cause issues during simultaneous test executions, e.g. the same tests running for different PRs, a local run, etc.?

Contributor:

This shouldn't affect anything, since the timestamp is now appended below in checkHasCreateAndDeleteDatasetRole, and CHECK_TEST_DATASET_SUFFIX is only used within that method.

Contributor (PR author):

The reason I moved the timestamp from class instantiation time to test execution time is that it was causing name collisions when tests ran too fast.
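
For illustration only (not part of this PR's diff): a minimal, hypothetical Java sketch of the difference between fixing the suffix at class-load time and appending the timestamp per call. The class and method names below are made up.

```java
// Illustrative sketch only -- not code from this PR. It shows why a suffix captured once at
// class-load time is shared by every check in the same JVM, while appending the timestamp
// per call gives each temporary dataset a distinct name.
public class DatasetSuffixSketch {

  // Old approach: the timestamp is fixed when the class is loaded, so repeated checks
  // against the same datasetId produce the identical temporary dataset name.
  private static final String STATIC_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();

  // New approach: only the prefix is constant; the timestamp is appended at call time.
  private static final String SUFFIX_PREFIX = "_airbyte_check_stage_tmp_";

  static String classLoadTimeName(final String datasetId) {
    return datasetId + STATIC_SUFFIX;
  }

  static String perCallName(final String datasetId) {
    return datasetId + SUFFIX_PREFIX + System.currentTimeMillis();
  }

  public static void main(final String[] args) throws InterruptedException {
    // Two back-to-back checks in the same JVM: identical names, so the second
    // create/delete cycle can collide with the first.
    System.out.println(classLoadTimeName("my_dataset").equals(classLoadTimeName("my_dataset"))); // true

    // With a per-call timestamp the names differ (as long as the calls land in different milliseconds).
    final String first = perCallName("my_dataset");
    Thread.sleep(2L);
    final String second = perCallName("my_dataset");
    System.out.println(first.equals(second)); // false
  }
}
```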

private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_";
private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";

public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
@@ -125,7 +125,7 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d
}

public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();

bigquery.create(datasetInfo);

@@ -8,19 +8,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
@@ -31,36 +24,27 @@
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class AbstractBigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final NamingConventionTransformer NAME_TRANSFORMER = new BigQuerySQLNameTransformer();
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationAcceptanceTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryDestinationAcceptanceTest.class);

protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");

protected static final String CONFIG_DATASET_ID = "dataset_id";
protected static final String CONFIG_PROJECT_ID = "project_id";
protected static final String CONFIG_DATASET_LOCATION = "dataset_location";
protected static final String CONFIG_CREDS = "credentials_json";

protected Path secretsFile;
protected BigQuery bigquery;
protected Dataset dataset;
protected boolean tornDown;

protected JsonNode config;
protected final StandardNameTransformer namingResolver = new StandardNameTransformer();

@@ -165,7 +149,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

final QueryJobConfiguration queryConfig =
@@ -177,110 +161,27 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
.build();

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@Override
protected void setup(final TestDestinationEnv testEnv) throws Exception {
if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
+ ". Override by setting setting path with the CREDENTIALS_PATH constant.");
}

final String fullConfigAsString = Files.readString(CREDENTIALS_PATH);
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

protected void setUpBigQuery() throws IOException {
// secrets file should be set by the inheriting class
Assertions.assertNotNull(secretsFile);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
config = BigQueryDestinationTestUtils.createConfig(secretsFile, datasetId);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJson.toString())
.put(CONFIG_DATASET_ID, datasetId)
.put(CONFIG_DATASET_LOCATION, datasetLocation)
.build());

setupBigQuery(credentialsJson);
}

protected void setupBigQuery(final JsonNode credentialsJson) throws IOException {
final ServiceAccountCredentials credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(credentialsJson.toString().getBytes(StandardCharsets.UTF_8)));

bigquery = BigQueryOptions.newBuilder()
.setProjectId(config.get(CONFIG_PROJECT_ID).asText())
.setCredentials(credentials)
.build()
.getService();

final DatasetInfo datasetInfo =
DatasetInfo.newBuilder(getDefaultSchema(config)).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
dataset = bigquery.create(datasetInfo);

tornDown = false;
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
if (!tornDown) {
tearDownBigQuery();
}
}));
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
tearDownBigQuery();
final String projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
}

protected void tearDownBigQuery() {
// allows deletion of a dataset that has contents
final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();

final boolean success = bigquery.delete(dataset.getDatasetId(), option);
if (success) {
LOGGER.info("BQ Dataset " + dataset + " deleted...");
} else {
LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
}

tornDown = true;
}

// todo (cgardens) - figure out how to share these helpers. they are currently copied from
// BigQueryDestination.
private static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
return executeQuery(queryJob);
}

private static ImmutablePair<Job, String> executeQuery(final Job queryJob) {
final Job completedJob = waitForQuery(queryJob);
if (completedJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (completedJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
}

return ImmutablePair.of(completedJob, null);
}

private static Job waitForQuery(final Job queryJob) {
try {
return queryJob.waitFor();
} catch (final Exception e) {
throw new RuntimeException(e);
}
BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER);
}

}
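
As a closing usage note (not part of the diff): with the class now abstract, an inheriting test is expected to assign secretsFile and then call setUpBigQuery() before the acceptance tests run. The subclass name and secret path below are hypothetical, assuming DestinationAcceptanceTest#setup(TestDestinationEnv) remains the lifecycle hook.

```java
// Hypothetical subclass sketch -- the class name and secret path are illustrative, not from this PR.
import java.nio.file.Path;

public class ExampleBigQueryDestinationAcceptanceTest extends AbstractBigQueryDestinationAcceptanceTest {

  @Override
  protected void setup(final TestDestinationEnv testEnv) throws Exception {
    // The abstract base asserts that secretsFile is non-null before it builds the config.
    secretsFile = Path.of("secrets/credentials.json");
    setUpBigQuery();
  }

  // Cleanup: the base class's tearDownBigQuery() delegates to
  // BigQueryDestinationTestUtils.tearDownBigQuery(bigquery, dataset, LOGGER).
}
```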