Skip to content

Commit

Permalink
Use BigQuery storage read API when reading external BigLake tables
Browse files Browse the repository at this point in the history
The storage APIs support reading BigLake external tables (ie external
tables with a connection). But the current implementation uses views
which can be expensive, because it requires a query. This PR adds
support to read BigLake tables directly using the storage API.

There are no behavior changes for external tables and BQ native tables -
they use the view and storage APIs respectively.

Added a new test for BigLake tables.

Co-authored-by: Marcin Rusek <marcin.rusek@starburstdata.com>
  • Loading branch information
2 people authored and ebyhr committed Dec 12, 2024
1 parent 9b24b74 commit 4718011
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 10 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,13 @@ jobs:
env:
BIGQUERY_CREDENTIALS_KEY: ${{ secrets.BIGQUERY_CREDENTIALS_KEY }}
GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }}
BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID: ${{ vars.BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID }}
if: matrix.modules == 'plugin/trino-bigquery' && !contains(matrix.profile, 'cloud-tests-2') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.BIGQUERY_CREDENTIALS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-1 \
-Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}" \
-Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}"
-Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" \
-Dtesting.bigquery-connection-id="${BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID}"
- name: Cloud BigQuery Smoke Tests
id: tests-bq-smoke
env:
Expand Down
11 changes: 9 additions & 2 deletions plugin/trino-bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ You can follow the steps below to be able to run the integration tests locally.
**BigQuery Admin** role assigned.
* Get the base64 encoded text of the service account credentials file using `base64
/path/to/service_account_credentials.json`.
* Create a new BigQuery `CLOUD_RESOURCE` connection and grant the connection service account GCS permissions.
[Documentation](https://cloud.google.com/bigquery/docs/create-cloud-resource-connection).
* `bq mk --connection --location=us --project_id=$PROJECT_ID --connection_type=CLOUD_RESOURCE $CONNECTION_ID`
* Now we need to grant the new connection's service account the GCS permissions.
* To do this, run `bq show --connection $PROJECT_ID.us.$CONNECTION_ID`, which will display the service account ID.
* Grant the service account GCS Object Viewer permissions. e.g.: `gsutil iam ch serviceAccount:bqcx-xxxx@gcp-sa-bigquery-condel.iam.gserviceaccount.com:objectViewer gs://DESTINATION_BUCKET_NAME`

* The `TestBigQueryWithDifferentProjectIdConnectorSmokeTest` requires an alternate project ID which is different from the
project ID attached to the service account but the service account still has access to.
* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, and `testing.alternate-bq-project-id` in the IntelliJ "Run Configuration"
* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, `testing.alternate-bq-project-id`, and `testing.bigquery-connection-id` in the IntelliJ "Run Configuration"
(or on the CLI if using Maven directly). It should look something like
`-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate`.
`-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate -Dtesting.bigquery-connection-id=my_project.us.connection-id`.
* Run any test of your choice.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
Expand Down Expand Up @@ -119,6 +120,7 @@
import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -341,7 +343,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
return null;
}

boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType());
boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition());
ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi));
Optional<BigQueryPartitionType> partitionType = getPartitionType(tableInfo.get().getDefinition());
Expand All @@ -362,14 +364,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
.withProjectedColumns(columns.build());
}

private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition tableDefinition)
{
TableDefinition.Type type = tableDefinition.getType();
if (isWildcardTable(type, tableName)) {
// Storage API doesn't support reading wildcard tables
return false;
}
if (type == EXTERNAL) {
// Storage API doesn't support reading external tables
if (type == EXTERNAL && !isBigLakeTable(tableDefinition)) {
// Storage API doesn't support reading external tables except BigLake tables
return false;
}
if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
Expand All @@ -378,6 +381,18 @@ private static boolean useStorageApi(ConnectorSession session, String tableName,
return true;
}

private static boolean isBigLakeTable(TableDefinition tableDefinition)
{
if (tableDefinition instanceof ExternalTableDefinition externalTableDefinition) {
//BigLake tables are external with connectionId that don't have objectMetadata (ObjectTable discriminator) and their uri starts with gs:// (OMNI table discriminator)
List<String> sourceUris = externalTableDefinition.getSourceUris();
return !isNullOrEmpty(externalTableDefinition.getConnectionId()) &&
isNullOrEmpty(externalTableDefinition.getObjectMetadata()) &&
(sourceUris != null && sourceUris.stream().allMatch(uri -> uri.startsWith("gs://")));
}
return false;
}

private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
Expand Down Expand Up @@ -472,7 +487,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return tableInfos.stream()
.collect(toImmutableMap(
table -> new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()),
table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream()
table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition())).stream()
.map(BigQueryColumnHandle::getColumnMetadata)
.collect(toImmutableList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Optional;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
Expand Down Expand Up @@ -140,7 +141,7 @@ private TableInfo getActualTable(
{
TableDefinition tableDefinition = remoteTable.getDefinition();
TableDefinition.Type tableType = tableDefinition.getType();
if (tableType == TABLE || tableType == SNAPSHOT) {
if (tableType == TABLE || tableType == SNAPSHOT || tableType == EXTERNAL) {
return remoteTable;
}
if (tableType == VIEW || tableType == MATERIALIZED_VIEW) {
Expand All @@ -152,7 +153,7 @@ private TableInfo getActualTable(
// get it from the view
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter);
}
// Storage API doesn't support reading other table types (materialized views, external)
// Storage API doesn't support reading other table types (materialized views, non-biglake external tables)
throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
tableType, remoteTable.getTableId().getDataset(), remoteTable.getTableId().getTable()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.FieldValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
Expand Down Expand Up @@ -50,6 +51,7 @@

import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.TEST_SCHEMA;
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
Expand All @@ -62,9 +64,12 @@
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty;
import static io.trino.testing.assertions.Assert.assertConsistently;
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.abort;
Expand All @@ -78,13 +83,15 @@ public abstract class BaseBigQueryConnectorTest
{
protected BigQuerySqlExecutor bigQuerySqlExecutor;
private String gcpStorageBucket;
private String bigQueryConnectionId;

@BeforeAll
public void initBigQueryExecutor()
{
this.bigQuerySqlExecutor = new BigQuerySqlExecutor();
// Prerequisite: upload region.csv in resources directory to gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv
this.gcpStorageBucket = requiredNonEmptySystemProperty("testing.gcp-storage-bucket");
this.bigQueryConnectionId = requiredNonEmptySystemProperty("testing.bigquery-connection-id");
}

@Override
Expand Down Expand Up @@ -774,12 +781,75 @@ public void testBigQueryExternalTable()

assertUpdate("DROP TABLE test." + externalTable);
assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + externalTable + "'");

assertThat(getTableReferenceCountInJob(externalTable)).isEqualTo(1);
}
finally {
onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + externalTable);
}
}

@Test
public void testBigLakeExternalTable()
{
String biglakeExternalTable = "test_biglake_external" + randomNameSuffix();
try {
onBigQuery("CREATE EXTERNAL TABLE test." + biglakeExternalTable +
" WITH CONNECTION `" + bigQueryConnectionId + "`" +
" OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");

assertThat(query("DESCRIBE test." + biglakeExternalTable)).matches("DESCRIBE tpch.region");
assertThat(query("SELECT * FROM test." + biglakeExternalTable)).matches("SELECT * FROM tpch.region");

assertUpdate("DROP TABLE test." + biglakeExternalTable);
assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + biglakeExternalTable + "'");

// BigLake tables should not run queries, since they are read directly using the storage read API.
assertConsistently(
new Duration(3, SECONDS),
new Duration(500, MILLISECONDS),
() -> assertThat(getTableReferenceCountInJob(biglakeExternalTable)).isEqualTo(0));
}
finally {
onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + biglakeExternalTable);
}
}

@Test
public void testExternalObjectTable()
{
String objectExternalTable = "test_object_external" + randomNameSuffix();

try {
onBigQuery("CREATE EXTERNAL TABLE test." + objectExternalTable +
" WITH CONNECTION `" + bigQueryConnectionId + "`" +
" OPTIONS (object_metadata = 'SIMPLE', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");
assertQuery("SELECT table_type FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + objectExternalTable + "'", "VALUES 'BASE TABLE'");

assertThat(query("SELECT uri FROM test." + objectExternalTable)).succeeds();

assertUpdate("DROP TABLE test." + objectExternalTable);
assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name LIKE '%" + objectExternalTable + "%'");

assertThat(getTableReferenceCountInJob(objectExternalTable)).isEqualTo(1);
}
finally {
onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + objectExternalTable);
}
}

private long getTableReferenceCountInJob(String tableName)
{
return bigQuerySqlExecutor.executeQuery("""
SELECT count(*) FROM region-us.INFORMATION_SCHEMA.JOBS WHERE EXISTS(
SELECT * FROM UNNEST(referenced_tables) AS referenced_table
WHERE referenced_table.table_id = '%s')
""".formatted(tableName)).streamValues()
.map(List::getFirst)
.map(FieldValue::getLongValue)
.collect(onlyElement());
}

@Test
public void testQueryLabeling()
{
Expand Down Expand Up @@ -1460,4 +1530,5 @@ private void onBigQuery(@Language("SQL") String sql)
{
bigQuerySqlExecutor.execute(sql);
}

}

0 comments on commit 4718011

Please sign in to comment.