Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[18713] Destination-bigquery: updated Check method to test for non-billable project #19489

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
Expand All @@ -25,12 +28,14 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -45,6 +50,7 @@
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -64,7 +70,8 @@ public class BigQueryUtils {
DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
"[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp";
private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";

public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
Expand Down Expand Up @@ -119,16 +126,67 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d

public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
final Dataset dataset = bigquery.getDataset(tmpTestDatasetId);
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();

bigquery.create(datasetInfo);
Copy link
Contributor

@grishick grishick Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if bigquery.create(datasetInfo) fails - do we report a config error or a runtime error?

Copy link
Contributor Author

@etsybaev etsybaev Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be handled by Catch in IntegrationRunner class. Basically, like any other exception. But we may catch it explicitly if needed. Should I update the PR?


// remove possible tmp datasets from previous execution
if (dataset != null && dataset.exists()) {
try {
attemptCreateTableAndTestInsert(bigquery, tmpTestDatasetId);
} finally {
bigquery.delete(tmpTestDatasetId);
}
}

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
bigquery.create(datasetInfo);
bigquery.delete(tmpTestDatasetId);
/**
 * Creates a tmp table and performs a dummy streaming insert into it. Used by the Check()
 * connection method to make sure that the user has all required roles for the upcoming data
 * sync/migration. It also verifies that the BigQuery project is billable; if not, a later sync
 * would fail anyway, as non-billable projects have limitations with streaming uploads and DML
 * queries. More details may be found here:
 * https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery
 * https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language
 *
 * @param bigquery - initialized bigquery client
 * @param tmpTestDatasetId - dataset name where tmp table will be created
 */
private static void attemptCreateTableAndTestInsert(final BigQuery bigquery, final String tmpTestDatasetId) {
  // Dummy schema that will be used for tmp table creation
  final Schema testTableSchema = Schema.of(
      Field.of("id", StandardSQLTypeName.INT64),
      Field.of("name", StandardSQLTypeName.STRING));

  // Create the tmp table to verify that the user has the create-table permission.
  // It is also the target of the test insert below.
  final Table testTable = createTable(bigquery, tmpTestDatasetId, CHECK_TEST_TMP_TABLE_NAME, testTableSchema);

  // Try a test insert of dummy records to make sure the user has the required permissions
  // and the project is billable (streaming inserts are rejected on the free tier).
  try {
    final InsertAllResponse response =
        bigquery.insertAll(InsertAllRequest
            .newBuilder(testTable)
            .addRow(Map.of("id", 1, "name", "James"))
            .addRow(Map.of("id", 2, "name", "Eugene"))
            .addRow(Map.of("id", 3, "name", "Angelina"))
            .build());

    if (response.hasErrors()) {
      // Aggregate every per-row error so the user sees all failures, not just the first one.
      final StringBuilder errorDetails = new StringBuilder();
      for (final Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
        errorDetails.append(entry.getValue()).append('\n');
      }
      throw new ConfigErrorException("Failed to check connection: \n" + errorDetails);
    }
  } catch (final BigQueryException e) {
    // Keep the original exception as the cause instead of flattening it to its message only.
    throw new ConfigErrorException("Failed to check connection: \n" + e.getMessage(), e);
  } finally {
    // Always clean up the tmp table, even when the check fails.
    testTable.delete();
  }
}

/**
 * Creates a standard (non-partitioned) BigQuery table with the given schema.
 *
 * @param bigquery - initialized bigquery client
 * @param datasetName - dataset that will contain the new table
 * @param tableName - name of the table to create
 * @param schema - column definitions of the new table
 * @return the created {@link Table}
 */
public static Table createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
  final TableId tableId = TableId.of(datasetName, tableName);
  final TableDefinition tableDefinition = StandardTableDefinition.of(schema);
  final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
  return bigquery.create(tableInfo);
}

// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class BigQueryDestinationTest {
protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
protected static final Path CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH =
Path.of("secrets/credentials-with-missed-dataset-creation-role.json");
protected static final Path CREDENTIALS_NON_BILLABLE_PROJECT_PATH =
Path.of("secrets/credentials-non-billable-project.json");

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";
Expand Down Expand Up @@ -250,8 +252,7 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
please add file with creds to
../destination-bigquery/secrets/credentialsWithMissedDatasetCreationRole.json.""");
}
final String fullConfigAsString = Files.readString(
CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
final String fullConfigAsString = Files.readString(CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
Expand All @@ -276,6 +277,41 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
assertThat(ex.getMessage()).contains("User does not have bigquery.datasets.create permission");
}

@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testCheckFailureNonBillableProject(final DatasetIdResetter resetDatasetId) throws IOException {
  // The secret file must point at a project with billing disabled.
  if (!Files.exists(CREDENTIALS_NON_BILLABLE_PROJECT_PATH)) {
    throw new IllegalStateException("""
        Json config not found. Must provide path to a big query credentials file,
        please add file with creds to
        ../destination-bigquery/secrets/credentials-non-billable-project.json""");
  }
  final String fullConfigAsString = Files.readString(CREDENTIALS_NON_BILLABLE_PROJECT_PATH);

  final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
  final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();

  // Config for a project without billing enabled (roles themselves are sufficient).
  final JsonNode nonBillableConfig = Jsons.jsonNode(ImmutableMap.builder()
      .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
      .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString())
      .put(BigQueryConsts.CONFIG_DATASET_ID, "testnobilling")
      .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US")
      .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
      .build());

  resetDatasetId.accept(nonBillableConfig);

  // Assert that check throws exception. Later it will be handled by IntegrationRunner
  final ConfigErrorException ex = assertThrows(ConfigErrorException.class,
      () -> new BigQueryDestination().check(nonBillableConfig));

  // "BigQuery BigQuery" duplication is part of the literal server-side error message.
  assertThat(ex.getMessage()).contains("Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier");
}

@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception {
Expand Down