🐛 Destination-Bigquery: Added an explicit error message if sync fails due to a config issue (#21144)

* [19998] Destination-Bigquery: Added an explicit error message if sync fails due to a config issue
etsybaev authored Jan 18, 2023
1 parent f49266f commit 8ed229f
Showing 12 changed files with 129 additions and 70 deletions.
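In brief: if creating the BigQuery write channel fails with a 403 (Forbidden) or 404 (Not Found), the connector now rethrows the failure as Airbyte's ConfigErrorException carrying an explicit, user-actionable message, instead of letting it surface as an opaque system error. A condensed sketch of the pattern, with imports and constants as they appear in the diff below:

```java
import static software.amazon.awssdk.http.HttpStatusCode.FORBIDDEN;
import static software.amazon.awssdk.http.HttpStatusCode.NOT_FOUND;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.TableDataWriteChannel;
import io.airbyte.commons.exceptions.ConfigErrorException;

// ...inside BigQueryUploaderFactory.getBigQueryDirectUploader(...)
final TableDataWriteChannel writer;
try {
  writer = bigQuery.writer(job, writeChannelConfiguration);
} catch (final BigQueryException e) {
  // 403/404 here almost always indicate a config problem: missing permissions on the
  // destination schema, or a dataset that does not exist in the configured location
  if (e.getCode() == FORBIDDEN || e.getCode() == NOT_FOUND) {
    throw new ConfigErrorException(CONFIG_ERROR_MSG + e);
  }
  throw new BigQueryException(e.getCode(), e.getMessage());
}
```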
destination_definitions.yaml
@@ -40,7 +40,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
- dockerImageTag: 1.2.9
+ dockerImageTag: 1.2.11
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationConfig:
@@ -58,7 +58,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
- dockerImageTag: 1.2.10
+ dockerImageTag: 1.2.11
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
destination_specs.yaml
@@ -621,7 +621,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.2.9"
- dockerImage: "airbyte/destination-bigquery:1.2.11"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
@@ -831,7 +831,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.10"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.11"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
destination-bigquery-denormalized/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

- LABEL io.airbyte.version=1.2.10
+ LABEL io.airbyte.version=1.2.11
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
destination-bigquery/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

- LABEL io.airbyte.version=1.2.9
+ LABEL io.airbyte.version=1.2.11
LABEL io.airbyte.name=airbyte/destination-bigquery
BigQueryRecordConsumer.java
@@ -45,8 +45,8 @@ protected void startTracked() {
* Processes STATE and RECORD {@link AirbyteMessage} with all else logged as unexpected
*
* <li>For STATE messages emit messages back to the platform</li>
* <li>For RECORD messages upload message to associated Airbyte Stream. This means that RECORDS will be associated with their respective streams when
* more than one record exists</li>
* <li>For RECORD messages upload message to associated Airbyte Stream. This means that RECORDS will
* be associated with their respective streams when more than one record exists</li>
*
* @param message {@link AirbyteMessage} to be processed
*/
@@ -66,7 +66,8 @@ public void acceptTracked(final AirbyteMessage message) {
}

/**
- * Processes {@link io.airbyte.protocol.models.AirbyteRecordMessage} by writing Airbyte stream data to Big Query Writer
+ * Processes {@link io.airbyte.protocol.models.AirbyteRecordMessage} by writing Airbyte stream data
+ * to Big Query Writer
*
* @param message record to be written
*/
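The method bodies are collapsed in this view. A plausible sketch of the dispatch the two Javadocs above describe, assuming helper names (lastStateMessage, processRecord, LOGGER) that are not shown in this diff:

```java
@Override
public void acceptTracked(final AirbyteMessage message) {
  if (message.getType() == Type.STATE) {
    // remember the latest STATE message so it can be emitted back to the platform
    lastStateMessage = message;
  } else if (message.getType() == Type.RECORD) {
    // route the RECORD to the uploader registered for its stream
    processRecord(message);
  } else {
    LOGGER.warn("Unexpected message: {}", message.getType());
  }
}
```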
BigQueryUtils.java
@@ -125,7 +125,7 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d
}

public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
- final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
+ final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();

bigquery.create(datasetInfo);
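The remainder of the method is collapsed; presumably it deletes the temporary dataset again so that both bigquery.datasets.create and bigquery.datasets.delete permissions are exercised. A hedged sketch of the full check (the try/finally shape is an assumption):

```java
public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery,
                                                      final String datasetId,
                                                      final String datasetLocation) {
  final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX + System.currentTimeMillis();
  final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();

  bigquery.create(datasetInfo); // fails fast without bigquery.datasets.create
  try {
    // collapsed in this view: a probe query/insert may run against the temp dataset here
  } finally {
    bigquery.delete(tmpTestDatasetId); // fails without bigquery.datasets.delete
  }
}
```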
BigQueryUploaderFactory.java
@@ -5,17 +5,21 @@
package io.airbyte.integrations.destination.bigquery.uploader;

import static io.airbyte.integrations.destination.s3.avro.AvroConstants.JSON_CONVERTER;
+ import static software.amazon.awssdk.http.HttpStatusCode.FORBIDDEN;
+ import static software.amazon.awssdk.http.HttpStatusCode.NOT_FOUND;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
+ import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
+ import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
@@ -30,6 +34,20 @@

public class BigQueryUploaderFactory {

private static final String CONFIG_ERROR_MSG = """
Failed to write to destination schema.
1. Make sure you have all required permissions for writing to the schema.
2. Make sure that the actual destination schema's location corresponds to location provided
in connector's config.
3. Try to change the "Destination schema" from "Mirror Source Structure" (if it's set) tp the
"Destination Default" option.
More details:
""";

public static AbstractBigQueryUploader<?> getUploader(final UploaderConfig uploaderConfig)
throws IOException {
final String schemaName = BigQueryUtils.getSchema(uploaderConfig.getConfig(), uploaderConfig.getConfigStream());
@@ -141,7 +159,17 @@ private static BigQueryDirectUploader getBigQueryDirectUploader(
.setProject(bigQuery.getOptions().getProjectId())
.build();

- final TableDataWriteChannel writer = bigQuery.writer(job, writeChannelConfiguration);
+ final TableDataWriteChannel writer;
+
+ try {
+   writer = bigQuery.writer(job, writeChannelConfiguration);
+ } catch (final BigQueryException e) {
+   if (e.getCode() == FORBIDDEN || e.getCode() == NOT_FOUND) {
+     throw new ConfigErrorException(CONFIG_ERROR_MSG + e);
+   } else {
+     throw new BigQueryException(e.getCode(), e.getMessage());
+   }
+ }
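Why the split matters: ConfigErrorException is the marker Airbyte uses to report a failure as user-actionable configuration rather than as a platform error. A hedged caller-side sketch (hypothetical wrapper; uploaderConfig and LOGGER are assumed, not part of this diff):

```java
try {
  final AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
  // ... run the sync with the uploader
} catch (final ConfigErrorException e) {
  // surfaced to the user: fix schema permissions or the dataset location, or switch
  // the "Destination schema" option to "Destination Default", then retry
  LOGGER.error("Configuration error: {}", e.getMessage());
  throw e;
}
```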

// this is an optional value; if not set, the client's default is used (15 MiB)
final Integer bigQueryClientChunkSizeFomConfig =
BigQueryDestinationTest.java
@@ -72,17 +72,20 @@

@TestInstance(PER_CLASS)
class BigQueryDestinationTest {

protected static final Path CREDENTIALS_STANDARD_INSERT_PATH = Path.of("secrets/credentials-standard.json");
protected static final Path CREDENTIALS_BAD_PROJECT_PATH = Path.of("secrets/credentials-badproject.json");
protected static final Path CREDENTIALS_NO_DATASET_CREATION_PATH =
Path.of("secrets/credentials-standard-no-dataset-creation.json");
protected static final Path CREDENTIALS_NON_BILLABLE_PROJECT_PATH =
Path.of("secrets/credentials-standard-non-billable-project.json");
+ protected static final Path CREDENTIALS_NO_EDIT_PUBLIC_SCHEMA_ROLE_PATH =
+ Path.of("secrets/credentials-no-edit-public-schema-role.json");
protected static final Path CREDENTIALS_WITH_GCS_STAGING_PATH =
Path.of("secrets/credentials-gcs-staging.json");

protected static final Path[] ALL_PATHS = {CREDENTIALS_WITH_GCS_STAGING_PATH, CREDENTIALS_BAD_PROJECT_PATH, CREDENTIALS_NO_DATASET_CREATION_PATH,
- CREDENTIALS_NON_BILLABLE_PROJECT_PATH, CREDENTIALS_WITH_GCS_STAGING_PATH};
+ CREDENTIALS_NO_EDIT_PUBLIC_SCHEMA_ROLE_PATH, CREDENTIALS_NON_BILLABLE_PROJECT_PATH, CREDENTIALS_WITH_GCS_STAGING_PATH};
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";

@@ -116,8 +119,9 @@ class BigQueryDestinationTest {
protected static JsonNode configWithProjectId;
protected static JsonNode configWithBadProjectId;
protected static JsonNode insufficientRoleConfig;
+ protected static JsonNode noEditPublicSchemaRoleConfig;
protected static JsonNode nonBillableConfig;
- protected static JsonNode gcsStagingConfig; //default BigQuery config. Also used for setup/teardown
+ protected static JsonNode gcsStagingConfig; // default BigQuery config. Also used for setup/teardown
protected BigQuery bigquery;
protected Dataset dataset;
protected static Map<String, JsonNode> configs;
@@ -129,56 +133,56 @@ private Stream<Arguments> successTestConfigProvider() {
return Stream.of(
Arguments.of("config"),
Arguments.of("configWithProjectId"),
Arguments.of("gcsStagingConfig")
);
Arguments.of("gcsStagingConfig"));
}

private Stream<Arguments> failCheckTestConfigProvider() {
return Stream.of(
Arguments.of("configWithBadProjectId", "User does not have bigquery.datasets.create permission in project"),
Arguments.of("insufficientRoleConfig", "User does not have bigquery.datasets.create permission"),
Arguments.of("nonBillableConfig", "Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier")
);
Arguments.of("nonBillableConfig", "Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier"));
}

private Stream<Arguments> failWriteTestConfigProvider() {
return Stream.of(
Arguments.of("configWithBadProjectId", "User does not have bigquery.datasets.create permission in project"),
Arguments.of("insufficientRoleConfig", "Permission bigquery.tables.create denied")
);
Arguments.of("noEditPublicSchemaRoleConfig", "Failed to write to destination schema."), // (or it may not exist)
Arguments.of("insufficientRoleConfig", "Permission bigquery.tables.create denied"));
}

@BeforeAll
public static void beforeAll() throws IOException {
- for(Path path : ALL_PATHS) {
+ for (Path path : ALL_PATHS) {
if (!Files.exists(path)) {
throw new IllegalStateException(
String.format("Must provide path to a big query credentials file. Please add file with credentials to %s", path.toAbsolutePath()));
}
}

datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
- //Set up config objects for test scenarios
- //config - basic config for standard inserts that should succeed check and write tests
- //this config is also used for housekeeping (checking records, and cleaning up)
+ // Set up config objects for test scenarios
+ // config - basic config for standard inserts that should succeed check and write tests
+ // this config is also used for housekeeping (checking records, and cleaning up)
config = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_STANDARD_INSERT_PATH, datasetId);

- //all successful configs use the same project ID
+ // all successful configs use the same project ID
projectId = config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();

- //configWithProjectId - config that uses project:dataset notation for datasetId
+ // configWithProjectId - config that uses project:dataset notation for datasetId
final String dataSetWithProjectId = String.format("%s:%s", projectId, datasetId);
configWithProjectId = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_STANDARD_INSERT_PATH, dataSetWithProjectId);

//configWithBadProjectId - config that uses "fake" project ID and should fail
// configWithBadProjectId - config that uses "fake" project ID and should fail
final String dataSetWithBadProjectId = String.format("%s:%s", "fake", datasetId);
configWithBadProjectId = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_BAD_PROJECT_PATH, dataSetWithBadProjectId);

- //config that has insufficient privileges
+ // config that has insufficient privileges
insufficientRoleConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_NO_DATASET_CREATION_PATH, datasetId);
- //config that tries to write to a project with disabled billing (free tier)
+ // config that tries to write to a project with disabled billing (free tier)
nonBillableConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_NON_BILLABLE_PROJECT_PATH, "testnobilling");
- //config with GCS staging
+ // config that has no privileges to edit anything in Public schema
+ noEditPublicSchemaRoleConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_NO_EDIT_PUBLIC_SCHEMA_ROLE_PATH, "public");
+ // config with GCS staging
gcsStagingConfig = BigQueryDestinationTestUtils.createConfig(CREDENTIALS_WITH_GCS_STAGING_PATH, datasetId);

MESSAGE_USERS1.getRecord().setNamespace(datasetId);
Expand All @@ -188,28 +192,33 @@ public static void beforeAll() throws IOException {

catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, datasetId,
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
io.airbyte.protocol.models.Field.of("name", JsonSchemaType.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaType.STRING))
.withDestinationSyncMode(DestinationSyncMode.APPEND),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, datasetId, Field.of("goal", JsonSchemaType.STRING))));

configs = new HashMap<String, JsonNode>() {{
put("config", config);
put("configWithProjectId", configWithProjectId);
put("configWithBadProjectId", configWithBadProjectId);
put("insufficientRoleConfig", insufficientRoleConfig);
put("nonBillableConfig", nonBillableConfig);
put("gcsStagingConfig", gcsStagingConfig);
}};
configs = new HashMap<String, JsonNode>() {

{
put("config", config);
put("configWithProjectId", configWithProjectId);
put("configWithBadProjectId", configWithBadProjectId);
put("insufficientRoleConfig", insufficientRoleConfig);
put("noEditPublicSchemaRoleConfig", noEditPublicSchemaRoleConfig);
put("nonBillableConfig", nonBillableConfig);
put("gcsStagingConfig", gcsStagingConfig);
}

};
}

protected void initBigQuery(JsonNode config) throws IOException {
bigquery = BigQueryDestinationTestUtils.initBigQuery(config, projectId);
try {
dataset = BigQueryDestinationTestUtils.initDataSet(config, bigquery, datasetId);
- } catch(Exception ex) {
- //ignore
+ } catch (Exception ex) {
+ // ignore
}
}

@@ -255,7 +264,7 @@ void testCheckSuccess(String configName) throws IOException {
@ParameterizedTest
@MethodSource("failCheckTestConfigProvider")
void testCheckFailures(String configName, String error) {
- //TODO: this should always throw ConfigErrorException
+ // TODO: this should always throw ConfigErrorException
JsonNode testConfig = configs.get(configName);
final Exception ex = assertThrows(Exception.class, () -> {
new BigQueryDestination().check(testConfig);
@@ -322,15 +331,15 @@ void testWriteFailure(String configName, String error) throws Exception {
}

private Set<String> fetchNamesOfTablesInDb() throws InterruptedException {
- if(dataset == null || bigquery == null) {
+ if (dataset == null || bigquery == null) {
return Collections.emptySet();
}
final QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(String.format("SELECT * FROM `%s.INFORMATION_SCHEMA.TABLES`;", dataset.getDatasetId().getDataset()))
.setUseLegacySql(false)
.build();

- if(!dataset.exists()) {
+ if (!dataset.exists()) {
return Collections.emptySet();
}
return StreamSupport
@@ -429,4 +438,5 @@ private boolean isTablePartitioned(final BigQuery bigquery, final Dataset datase
}
return false;
}

}
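To close the loop on the new noEditPublicSchemaRoleConfig case above, a self-contained JUnit 5 sketch of the 403/404-to-config-error mapping; the classify helper is hypothetical and merely mirrors the factory's new catch block, and it assumes ConfigErrorException extends RuntimeException with its display message as the exception message:

```java
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.cloud.bigquery.BigQueryException;
import io.airbyte.commons.exceptions.ConfigErrorException;
import org.junit.jupiter.api.Test;

class ConfigErrorMappingSketchTest {

  // hypothetical helper mirroring the new catch block in BigQueryUploaderFactory
  private static RuntimeException classify(final BigQueryException e) {
    if (e.getCode() == 403 || e.getCode() == 404) {
      return new ConfigErrorException("Failed to write to destination schema. " + e);
    }
    return new BigQueryException(e.getCode(), e.getMessage());
  }

  @Test
  void forbiddenBecomesConfigError() {
    final RuntimeException mapped = classify(new BigQueryException(403, "Access Denied"));
    assertInstanceOf(ConfigErrorException.class, mapped);
    assertTrue(mapped.getMessage().contains("Failed to write to destination schema."));
  }

  @Test
  void otherCodesStaySystemErrors() {
    assertInstanceOf(BigQueryException.class, classify(new BigQueryException(500, "Internal error")));
  }
}
```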