diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index f968e607d95b..0ac2e4e4694c 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -39,7 +39,7 @@
 - name: BigQuery
   destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
   dockerRepository: airbyte/destination-bigquery
-  dockerImageTag: 1.2.7
+  dockerImageTag: 1.2.8
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   icon: bigquery.svg
   normalizationRepository: airbyte/normalization
@@ -55,7 +55,7 @@
 - name: BigQuery (denormalized typed struct)
   destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
   dockerRepository: airbyte/destination-bigquery-denormalized
-  dockerImageTag: 1.2.7
+  dockerImageTag: 1.2.8
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   icon: bigquery.svg
   normalizationRepository: airbyte/normalization
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 60ded4ebaf5b..f7b240eb6b02 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -621,7 +621,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-bigquery:1.2.7"
+- dockerImage: "airbyte/destination-bigquery:1.2.8"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
    connectionSpecification:
@@ -831,7 +831,7 @@
     - "overwrite"
     - "append"
     - "append_dedup"
-- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.7"
+- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.8"
  spec:
    documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
    connectionSpecification:
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
index 0ad3bb307bcf..a32f0bd1181b 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.2.7
+LABEL io.airbyte.version=1.2.8
 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
index fccbfd8aae01..575b8fe4b92a 100644
--- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

 COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.2.7
+LABEL io.airbyte.version=1.2.8
 LABEL io.airbyte.name=airbyte/destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
index 2baaf6b4a970..e6c9c68c52f5 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java
@@ -11,12 +11,15 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryError;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Clustering;
 import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.FieldList;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobId;
 import com.google.cloud.bigquery.JobInfo;
@@ -25,12 +28,14 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.exceptions.ConfigErrorException;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.config.WorkerEnvConstants;
 import io.airbyte.integrations.base.JavaBaseConstants;
@@ -45,6 +50,7 @@
 import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -64,7 +70,8 @@ public class BigQueryUtils {
       DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]"
           + "[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
   private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
-  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp";
+  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
+  private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";

   public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
     final JobId jobId = JobId.of(UUID.randomUUID().toString());
@@ -119,16 +126,67 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d

   public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
     final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
-    final Dataset dataset = bigquery.getDataset(tmpTestDatasetId);
+    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
+
+    bigquery.create(datasetInfo);

-    // remove possible tmp datasets from previous execution
-    if (dataset != null && dataset.exists()) {
+    try {
+      attemptCreateTableAndTestInsert(bigquery, tmpTestDatasetId);
+    } finally {
       bigquery.delete(tmpTestDatasetId);
     }
+  }

-    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
-    bigquery.create(datasetInfo);
-    bigquery.delete(tmpTestDatasetId);
+  /**
+   * Creates a tmp table and performs a dummy record insert. Used by the check() connection method
+   * to make sure that the user has all the roles required for the upcoming data sync/migration. It
+   * also verifies that the BigQuery project is billable: if it is not, a later sync would fail,
+   * since non-billable projects have limitations on streaming uploads and DML queries. More details
+   * may be found here:
+   * https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language
+   *
+   * @param bigquery - initialized bigquery client
+   * @param tmpTestDatasetId - dataset name where the tmp table will be created
+   */
+  private static void attemptCreateTableAndTestInsert(final BigQuery bigquery, final String tmpTestDatasetId) {
+    // Create a dummy schema that will be used for the tmp table creation
+    final Schema testTableSchema = Schema.of(
+        Field.of("id", StandardSQLTypeName.INT64),
+        Field.of("name", StandardSQLTypeName.STRING));
+
+    // Create a tmp table to verify that the user has the create table permission; the test records
+    // below are inserted into it
+    final Table testConnectionTable = createTable(bigquery, tmpTestDatasetId,
+        CHECK_TEST_TMP_TABLE_NAME, testTableSchema);
+
+    // Try a test (dummy records) insert to make sure that the user has the required permissions
+    try {
+      final InsertAllResponse response =
+          bigquery.insertAll(InsertAllRequest
+              .newBuilder(testConnectionTable)
+              .addRow(Map.of("id", 1, "name", "James"))
+              .addRow(Map.of("id", 2, "name", "Eugene"))
+              .addRow(Map.of("id", 3, "name", "Angelina"))
+              .build());
+
+      if (response.hasErrors()) {
+        // If any of the insertions failed, surface the per-row errors
+        for (final Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
+          throw new ConfigErrorException("Failed to check connection: \n" + entry.getValue());
+        }
+      }
+    } catch (final BigQueryException e) {
+      throw new ConfigErrorException("Failed to check connection: \n" + e.getMessage());
+    } finally {
+      testConnectionTable.delete();
+    }
+  }
+
+  public static Table createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
+    final TableId tableId = TableId.of(datasetName, tableName);
+    final TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+    final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+    return bigquery.create(tableInfo);
   }

   // https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
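Note on the new check flow above: it creates a throwaway dataset with a timestamped name, creates a table inside it, attempts a streaming insert, and drops the dataset regardless of the outcome. Below is a minimal standalone sketch of the same sequence against the google-cloud-bigquery client, assuming default application credentials and illustrative dataset/table names (the connector instead builds its client and names from the supplied config):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import java.util.Map;

public class CheckFlowSketch {

  public static void main(final String[] args) {
    // Default application credentials; the connector builds its client from the user's config.
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Timestamped name so concurrent or crashed checks never collide on the same dataset.
    final String tmpDataset = "my_dataset_airbyte_check_stage_tmp_" + System.currentTimeMillis();
    bigquery.create(DatasetInfo.newBuilder(tmpDataset).setLocation("US").build());

    try {
      final Schema schema = Schema.of(
          Field.of("id", StandardSQLTypeName.INT64),
          Field.of("name", StandardSQLTypeName.STRING));
      final Table table = bigquery.create(
          TableInfo.newBuilder(TableId.of(tmpDataset, "check_tmp_table"), StandardTableDefinition.of(schema)).build());

      // Streaming inserts are rejected on non-billable projects, so a failure here surfaces
      // both missing permissions and the free-tier limitation at check time.
      final InsertAllResponse response = bigquery.insertAll(
          InsertAllRequest.newBuilder(table).addRow(Map.of("id", 1, "name", "James")).build());
      if (response.hasErrors()) {
        throw new RuntimeException("Failed to check connection: " + response.getInsertErrors());
      }
    } finally {
      // deleteContents() drops the tmp table too, even if the insert failed midway.
      bigquery.delete(DatasetId.of(tmpDataset), BigQuery.DatasetDeleteOption.deleteContents());
    }
  }

}

The timestamp suffix matters here: with the old fixed suffix, a crashed or concurrent check could collide with a stale dataset, which is exactly what the removed cleanup branch used to compensate for.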
diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
index f81de9d4fad3..4dae83c587ea 100644
--- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java
@@ -76,6 +76,8 @@ class BigQueryDestinationTest {
   protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
   protected static final Path CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH =
       Path.of("secrets/credentials-with-missed-dataset-creation-role.json");
+  protected static final Path CREDENTIALS_NON_BILLABLE_PROJECT_PATH =
+      Path.of("secrets/credentials-non-billable-project.json");

   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
   private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";
@@ -250,8 +252,7 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
           please add file with creds to ../destination-bigquery/secrets/credentialsWithMissedDatasetCreationRole.json.""");
     }
-    final String fullConfigAsString = Files.readString(
-        CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
+    final String fullConfigAsString = Files.readString(CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
     final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
     final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
     final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
@@ -276,6 +277,41 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
     assertThat(ex.getMessage()).contains("User does not have bigquery.datasets.create permission");
   }

+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testCheckFailureNonBillableProject(final DatasetIdResetter resetDatasetId) throws IOException {
+    if (!Files.exists(CREDENTIALS_NON_BILLABLE_PROJECT_PATH)) {
+      throw new IllegalStateException("""
+          Json config not found. Must provide path to a big query credentials file,
+          please add file with creds to
+          ../destination-bigquery/secrets/credentials-non-billable-project.json""");
+    }
+    final String fullConfigAsString = Files.readString(CREDENTIALS_NON_BILLABLE_PROJECT_PATH);
+
+    final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
+    final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
+
+    final JsonNode nonBillableProjectConfig = Jsons.jsonNode(ImmutableMap.builder()
+        .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
+        .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString())
+        .put(BigQueryConsts.CONFIG_DATASET_ID, "testnobilling")
+        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US")
+        .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
+        .build());
+
+    resetDatasetId.accept(nonBillableProjectConfig);
+
+    // Assert that check throws an exception; later it is handled by the IntegrationRunner
+    final ConfigErrorException ex = assertThrows(ConfigErrorException.class, () -> {
+      new BigQueryDestination().check(nonBillableProjectConfig);
+    });
+
+    assertThat(ex.getMessage()).contains("Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier");
+  }
+
   @ParameterizedTest
   @MethodSource("datasetIdResetterProvider")
   void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception {
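The new test asserts the raw ConfigErrorException; as its inline comment notes, the IntegrationRunner handles the exception later in production. Below is a hedged sketch of what that handling amounts to, assuming the standard Airbyte protocol models; the helper is illustrative only and is not part of this PR or of the actual IntegrationRunner:

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;

// Hypothetical helper; assumes it lives alongside BigQueryDestination in
// io.airbyte.integrations.destination.bigquery.
final class CheckHandlingSketch {

  static AirbyteConnectionStatus checkSafely(final BigQueryDestination destination, final JsonNode config) throws Exception {
    try {
      return destination.check(config);
    } catch (final ConfigErrorException e) {
      // Config errors carry a user-facing message, so report a FAILED status instead of crashing.
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage(e.getMessage());
    }
  }

}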
diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md
index f333cbf60a5a..a70a25283a03 100644
--- a/docs/integrations/destinations/bigquery.md
+++ b/docs/integrations/destinations/bigquery.md
@@ -136,7 +136,8 @@ Now that you have set up the BigQuery destination connector, check out the follo

 | Version | Date       | Pull Request                                              | Subject                                                               |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------------------------------|
-| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | fixed check method to capture mismatch dataset location               |
+| 1.2.8   | 2022-11-22 | [#19489](https://github.com/airbytehq/airbyte/pull/19489) | Added handling of non-billable projects to the check connection stage |
+| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | Fixed check method to capture mismatched dataset location             |
 | 1.2.6   | 2022-11-10 | [#18554](https://github.com/airbytehq/airbyte/pull/18554) | Improve check connection method to handle more errors                 |
 | 1.2.5   | 2022-10-19 | [#18162](https://github.com/airbytehq/airbyte/pull/18162) | Improve error logs                                                    |
 | 1.2.4   | 2022-09-26 | [#16890](https://github.com/airbytehq/airbyte/pull/16890) | Add user-agent header                                                 |
@@ -189,7 +190,8 @@ Now that you have set up the BigQuery destination connector, check out the follo

 | Version | Date       | Pull Request                                              | Subject                                                               |
 |:--------|:-----------|:----------------------------------------------------------|:----------------------------------------------------------------------|
-| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | fixed check method to capture mismatch dataset location               |
+| 1.2.8   | 2022-11-22 | [#19489](https://github.com/airbytehq/airbyte/pull/19489) | Added handling of non-billable projects to the check connection stage |
+| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | Fixed check method to capture mismatched dataset location             |
 | 1.2.6   | 2022-11-10 | [#18554](https://github.com/airbytehq/airbyte/pull/18554) | Improve check connection method to handle more errors                 |
 | 1.2.5   | 2022-10-19 | [#18162](https://github.com/airbytehq/airbyte/pull/18162) | Improve error logs                                                    |
 | 1.2.4   | 2022-09-26 | [#16890](https://github.com/airbytehq/airbyte/pull/16890) | Add user-agent header                                                 |