From 4718011a39f700ab3c8dbe66747c37a9398922f8 Mon Sep 17 00:00:00 2001 From: Anoop Johnson Date: Wed, 13 Mar 2024 02:25:19 +0000 Subject: [PATCH] Use BigQuery storage read API when reading external BigLake tables The storage APIs support reading BigLake external tables (i.e., external tables with a connection). However, the current implementation reads them through views, which can be expensive because that requires running a query. This PR adds support to read BigLake tables directly using the storage API. There are no behavior changes for other external tables and BQ native tables; they continue to use the view and storage APIs, respectively. Added a new test for BigLake tables. Co-authored-by: Marcin Rusek --- .github/workflows/ci.yml | 4 +- plugin/trino-bigquery/README.md | 11 ++- .../plugin/bigquery/BigQueryMetadata.java | 25 +++++-- .../plugin/bigquery/ReadSessionCreator.java | 5 +- .../bigquery/BaseBigQueryConnectorTest.java | 71 +++++++++++++++++++ 5 files changed, 106 insertions(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 91912c5f6e15..7dd65babf608 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -643,11 +643,13 @@ jobs: env: BIGQUERY_CREDENTIALS_KEY: ${{ secrets.BIGQUERY_CREDENTIALS_KEY }} GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }} + BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID: ${{ vars.BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID }} if: matrix.modules == 'plugin/trino-bigquery' && !contains(matrix.profile, 'cloud-tests-2') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.BIGQUERY_CREDENTIALS_KEY != '') run: | $MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-1 \ -Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}" \ - -Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" + -Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" \ + -Dtesting.bigquery-connection-id="${BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID}" - name: Cloud BigQuery Smoke Tests id: tests-bq-smoke env: diff --git a/plugin/trino-bigquery/README.md 
b/plugin/trino-bigquery/README.md index 2148a9746a4f..8cae94c1c1ae 100644 --- a/plugin/trino-bigquery/README.md +++ b/plugin/trino-bigquery/README.md @@ -22,9 +22,16 @@ You can follow the steps below to be able to run the integration tests locally. **BigQuery Admin** role assigned. * Get the base64 encoded text of the service account credentials file using `base64 /path/to/service_account_credentials.json`. +* Create a new BigQuery `CLOUD_RESOURCE` connection and grant the connection service account GCS permissions. + [Documentation](https://cloud.google.com/bigquery/docs/create-cloud-resource-connection). + * `bq mk --connection --location=us --project_id=$PROJECT_ID --connection_type=CLOUD_RESOURCE $CONNECTION_ID` + * Now we need to grant the new connection's service account the GCS permissions. + * To do this, run `bq show --connection $PROJECT_ID.us.$CONNECTION_ID`, which will display the service account ID. + * Grant the service account GCS Object Viewer permissions. e.g.: `gsutil iam ch serviceAccount:bqcx-xxxx@gcp-sa-bigquery-condel.iam.gserviceaccount.com:objectViewer gs://DESTINATION_BUCKET_NAME` + * The `TestBigQueryWithDifferentProjectIdConnectorSmokeTest` requires an alternate project ID which is different from the project ID attached to the service account but the service account still has access to. -* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, and `testing.alternate-bq-project-id` in the IntelliJ "Run Configuration" +* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, `testing.alternate-bq-project-id`, and `testing.bigquery-connection-id` in the IntelliJ "Run Configuration" (or on the CLI if using Maven directly). It should look something like - `-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate`. 
+ `-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate -Dtesting.bigquery-connection-id=my_project.us.connection-id`. * Run any test of your choice. diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index e8d1e3aca6fa..479a46b8d676 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -17,6 +17,7 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.ExternalTableDefinition; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.QueryParameterValue; @@ -119,6 +120,7 @@ import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -341,7 +343,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable return null; } - boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType()); + boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition()); ImmutableList.Builder columns = ImmutableList.builder(); columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi)); Optional 
partitionType = getPartitionType(tableInfo.get().getDefinition()); @@ -362,14 +364,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable .withProjectedColumns(columns.build()); } - private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type) + private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition tableDefinition) { + TableDefinition.Type type = tableDefinition.getType(); if (isWildcardTable(type, tableName)) { // Storage API doesn't support reading wildcard tables return false; } - if (type == EXTERNAL) { - // Storage API doesn't support reading external tables + if (type == EXTERNAL && !isBigLakeTable(tableDefinition)) { + // Storage API doesn't support reading external tables except BigLake tables return false; } if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) { @@ -378,6 +381,18 @@ private static boolean useStorageApi(ConnectorSession session, String tableName, return true; } + private static boolean isBigLakeTable(TableDefinition tableDefinition) + { + if (tableDefinition instanceof ExternalTableDefinition externalTableDefinition) { + //BigLake tables are external with connectionId that don't have objectMetadata (ObjectTable discriminator) and their uri starts with gs:// (OMNI table discriminator) + List sourceUris = externalTableDefinition.getSourceUris(); + return !isNullOrEmpty(externalTableDefinition.getConnectionId()) && + isNullOrEmpty(externalTableDefinition.getObjectMetadata()) && + (sourceUris != null && sourceUris.stream().allMatch(uri -> uri.startsWith("gs://"))); + } + return false; + } + private Optional getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName) { BigQueryClient client = bigQueryClientFactory.create(session); @@ -472,7 +487,7 @@ public Map> listTableColumns(ConnectorSess return tableInfos.stream() .collect(toImmutableMap( table -> new 
SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()), - table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream() + table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition())).stream() .map(BigQueryColumnHandle::getColumnMetadata) .collect(toImmutableList()))); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java index 8a28d3a99a20..bbfd44649bc6 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Optional; +import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL; import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW; import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT; import static com.google.cloud.bigquery.TableDefinition.Type.TABLE; @@ -140,7 +141,7 @@ private TableInfo getActualTable( { TableDefinition tableDefinition = remoteTable.getDefinition(); TableDefinition.Type tableType = tableDefinition.getType(); - if (tableType == TABLE || tableType == SNAPSHOT) { + if (tableType == TABLE || tableType == SNAPSHOT || tableType == EXTERNAL) { return remoteTable; } if (tableType == VIEW || tableType == MATERIALIZED_VIEW) { @@ -152,7 +153,7 @@ private TableInfo getActualTable( // get it from the view return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter); } - // Storage API doesn't support reading other table types (materialized views, external) + // Storage API doesn't support reading other table types (materialized views, non-biglake external tables) throw new TrinoException(NOT_SUPPORTED, 
format("Table type '%s' of table '%s.%s' is not supported", tableType, remoteTable.getTableId().getDataset(), remoteTable.getTableId().getTable())); } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index 922413f73cc9..48646ef1c81f 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.bigquery; +import com.google.cloud.bigquery.FieldValue; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; @@ -50,6 +51,7 @@ import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset; +import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor; import static io.trino.plugin.bigquery.BigQueryQueryRunner.TEST_SCHEMA; import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE; @@ -62,9 +64,12 @@ import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty; +import static io.trino.testing.assertions.Assert.assertConsistently; import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static 
org.junit.jupiter.api.Assumptions.abort; @@ -78,6 +83,7 @@ public abstract class BaseBigQueryConnectorTest { protected BigQuerySqlExecutor bigQuerySqlExecutor; private String gcpStorageBucket; + private String bigQueryConnectionId; @BeforeAll public void initBigQueryExecutor() @@ -85,6 +91,7 @@ public void initBigQueryExecutor() this.bigQuerySqlExecutor = new BigQuerySqlExecutor(); // Prerequisite: upload region.csv in resources directory to gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv this.gcpStorageBucket = requiredNonEmptySystemProperty("testing.gcp-storage-bucket"); + this.bigQueryConnectionId = requiredNonEmptySystemProperty("testing.bigquery-connection-id"); } @Override @@ -774,12 +781,75 @@ public void testBigQueryExternalTable() assertUpdate("DROP TABLE test." + externalTable); assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + externalTable + "'"); + + assertThat(getTableReferenceCountInJob(externalTable)).isEqualTo(1); } finally { onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + externalTable); } } + @Test + public void testBigLakeExternalTable() + { + String biglakeExternalTable = "test_biglake_external" + randomNameSuffix(); + try { + onBigQuery("CREATE EXTERNAL TABLE test." + biglakeExternalTable + + " WITH CONNECTION `" + bigQueryConnectionId + "`" + + " OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])"); + + assertThat(query("DESCRIBE test." + biglakeExternalTable)).matches("DESCRIBE tpch.region"); + assertThat(query("SELECT * FROM test." + biglakeExternalTable)).matches("SELECT * FROM tpch.region"); + + assertUpdate("DROP TABLE test." + biglakeExternalTable); + assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + biglakeExternalTable + "'"); + + // BigLake tables should not run queries, since they are read directly using the storage read API. 
+ assertConsistently( + new Duration(3, SECONDS), + new Duration(500, MILLISECONDS), + () -> assertThat(getTableReferenceCountInJob(biglakeExternalTable)).isEqualTo(0)); + } + finally { + onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + biglakeExternalTable); + } + } + + @Test + public void testExternalObjectTable() + { + String objectExternalTable = "test_object_external" + randomNameSuffix(); + + try { + onBigQuery("CREATE EXTERNAL TABLE test." + objectExternalTable + + " WITH CONNECTION `" + bigQueryConnectionId + "`" + + " OPTIONS (object_metadata = 'SIMPLE', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])"); + assertQuery("SELECT table_type FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + objectExternalTable + "'", "VALUES 'BASE TABLE'"); + + assertThat(query("SELECT uri FROM test." + objectExternalTable)).succeeds(); + + assertUpdate("DROP TABLE test." + objectExternalTable); + assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name LIKE '%" + objectExternalTable + "%'"); + + assertThat(getTableReferenceCountInJob(objectExternalTable)).isEqualTo(1); + } + finally { + onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + objectExternalTable); + } + } + + private long getTableReferenceCountInJob(String tableName) + { + return bigQuerySqlExecutor.executeQuery(""" + SELECT count(*) FROM region-us.INFORMATION_SCHEMA.JOBS WHERE EXISTS( + SELECT * FROM UNNEST(referenced_tables) AS referenced_table + WHERE referenced_table.table_id = '%s') + """.formatted(tableName)).streamValues() + .map(List::getFirst) + .map(FieldValue::getLongValue) + .collect(onlyElement()); + } + @Test public void testQueryLabeling() { @@ -1460,4 +1530,5 @@ private void onBigQuery(@Language("SQL") String sql) { bigQuerySqlExecutor.execute(sql); } + }