feat(go/adbc/driver/bigquery): Implement GetObjects and get tests passing (apache#2044)

This is a bunch of stuff I've been working on for the bigquery driver, along
with improvements to the shared machinery for implementing drivers (mostly
`GetObjects` here). There's still more work to do on each of these paths, but
it's probably a good idea to get this chunk merged.

### Primary Changes
- Add steps to CI workflows to authenticate with GCP and run BigQuery ADBC driver tests.
  - These currently skip because auth fails; they should start to run and succeed once a valid `GCP_WORKLOAD_IDENTITY_PROVIDER` is added to the runner environment.
- Update the `DbObjectsEnumerator` interface to simplify the implementation of
`GetObjects` for new drivers (interface sketched below, after this list).
- Implement `DbObjectsEnumerator` for bigquery driver.
- Split the bigquery `QueryConfig` (data-only) out from `Query` (stateful)
to better decouple `Connection` and `Statement` state (see the sketch after this list).
- Simplify bigquery client authentication to rely on the environment for
standard auth conventions (example after this list).
- Clean up and expand the bigquery driver tests.
- Improve coverage of various `GetObjects` scenarios in driver tests.
- Rewrite the Snowflake `GetObjects` implementation using a mix of a custom
approach and the new common driverbase machinery.
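
For reference, the enumerator contract can be read off the three methods this commit implements for BigQuery (full bodies in the connection.go diff below). This is a minimal sketch inferred from those signatures, not a verbatim copy of the driverbase declaration:

```go
package driverbase

import "context"

// DbObjectsEnumerator, as implied by this commit's method signatures:
// drivers answer three narrow questions, and the shared driverbase
// machinery assembles the hierarchical GetObjects result from them.
type DbObjectsEnumerator interface {
    GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error)
    GetDBSchemasForCatalog(ctx context.Context, catalog string, schemaFilter *string) ([]string, error)
    GetTablesForDBSchema(ctx context.Context, catalog, schema string, tableFilter, columnFilter *string, includeColumns bool) ([]TableInfo, error)
}
```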
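
The `QueryConfig`/`Query` split mirrors the shape of the BigQuery Go client itself, where `bigquery.Query` embeds a data-only `QueryConfig`. A minimal sketch of the idea; the field and helper names (`queryConfig`, `newQuery`) are illustrative, not the PR's exact code:

```go
// The statement holds only plain configuration data, so no live query
// state is shared with the Connection.
type statement struct {
    cnxn        *connectionImpl
    queryConfig bigquery.QueryConfig // data-only; cheap to copy
}

// A stateful *bigquery.Query is only materialized at execution time.
func (s *statement) newQuery(sql string) *bigquery.Query {
    q := s.cnxn.client.Query(sql)
    q.QueryConfig = s.queryConfig // apply the stored settings
    q.QueryConfig.Q = sql         // restore the SQL text the copy overwrote
    return q
}
```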
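
The "standard auth conventions" here are Google's Application Default Credentials. A minimal sketch, assuming ADC is configured in the environment; `"my-project"` is a placeholder project ID:

```go
package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery"
)

func main() {
    ctx := context.Background()
    // With no explicit credential options, the client resolves auth from
    // the environment: GOOGLE_APPLICATION_CREDENTIALS, gcloud ADC, or the
    // GCE/GKE metadata server.
    client, err := bigquery.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
}
```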

### Reason for Snowflake Changes
After expanding the cases in the `GetObjects` test suite, there were several
that Snowflake was failing on. The logic was getting fairly complicated with
the existing approach, and I wanted to leverage some of the new functionality
added to `driverbase`, so I attempted a rewrite (partly inspired by how
[duckdb](https://github.com/duckdb/duckdb/blob/9ad037f3adfe372f17b5178a449ac4b6f9142240/src/common/adbc/adbc.cpp#L979)
currently handles this). In addition to making the tests pass, the new
approach is more performant and more transparent to users looking through
the query log after using the ADBC driver.

New test cases fail when run on current `main`:
```
--- FAIL: TestValidation (74.65s)
    --- FAIL: TestValidation/TestMetadataGetObjectsColumns (71.83s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_catalog_no_filter (0.46s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_dbSchema_no_filter (1.04s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_table_no_filter (6.10s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_column_no_filter (10.45s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_valid (4.26s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_invalid (0.29s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_valid (8.75s)
        --- FAIL: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_invalid (8.10s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_table_valid (9.07s)
        --- FAIL: TestValidation/TestMetadataGetObjectsColumns/filter_table_invalid (12.04s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_column:_in% (9.20s)
FAIL
FAIL    github.com/apache/arrow-adbc/go/adbc/driver/snowflake   75.545s
```
Same tests, using the new implementation in this PR:
```
--- PASS: TestValidation (56.64s)
    --- PASS: TestValidation/TestMetadataGetObjectsColumns (53.69s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_catalog_no_filter (1.16s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_dbSchema_no_filter (1.19s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_table_no_filter (4.34s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/depth_column_no_filter (6.13s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_valid (5.09s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_catalog_invalid (5.48s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_valid (6.81s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_dbSchema_invalid (5.35s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_table_valid (5.92s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_table_invalid (4.91s)
        --- PASS: TestValidation/TestMetadataGetObjectsColumns/filter_column:_in% (4.88s)
PASS
ok      github.com/apache/arrow-adbc/go/adbc/driver/snowflake   57.253s
```
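
For context on what these cases exercise: each maps to a `GetObjects` call with a particular depth and filter combination. A hedged sketch against the Go `adbc.Connection` interface; `conn` and the filter value are illustrative:

```go
// "depth_*" cases vary the ObjectDepth; "filter_*" cases pass SQL-style
// patterns for catalog, dbSchema, table, or column.
dbSchema := "PUBLIC" // illustrative pattern for a filter_dbSchema case
rdr, err := conn.GetObjects(
    ctx,
    adbc.ObjectDepthColumns, // enumerate all the way down to columns
    nil,                     // catalog: no filter
    &dbSchema,               // dbSchema: filter by pattern
    nil,                     // tableName: no filter
    nil,                     // columnName: no filter
    nil,                     // tableType: no filter
)
if err != nil {
    log.Fatal(err)
}
defer rdr.Release()
```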
joellubi authored Aug 27, 2024
1 parent 4e962c4 commit 7925c42
Showing 20 changed files with 1,882 additions and 1,767 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/native-unix.yml
@@ -394,11 +394,18 @@ jobs:
    strategy:
      matrix:
        os: ["macos-13", "macos-latest", "ubuntu-latest", "windows-latest"]
    permissions:
      contents: 'read'
      id-token: 'write'
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          persist-credentials: false
      - uses: 'google-github-actions/auth@v2'
        continue-on-error: true # if auth fails, bigquery driver tests should skip
        with:
          workload_identity_provider: ${{ secrets.GCP_WORKLOAD_IDENTITY_PROVIDER }}
      - uses: actions/setup-go@v5
        with:
          go-version-file: "go/adbc/go.mod"
@@ -445,11 +452,18 @@ jobs:
        goarch: x64
    env:
      CGO_ENABLED: "1"
    permissions:
      contents: 'read'
      id-token: 'write'
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
          persist-credentials: false
      - uses: 'google-github-actions/auth@v2'
        continue-on-error: true # if auth fails, bigquery driver tests should skip
        with:
          workload_identity_provider: ${{ secrets.GCP_WORKLOAD_IDENTITY_PROVIDER }}
      - name: Get required Go version
        run: |
          (. .env && echo "GO_VERSION=${GO}") >> $GITHUB_ENV
1 change: 1 addition & 0 deletions go/adbc/driver/bigquery/bigquery_database.go
@@ -64,6 +64,7 @@ func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
        WithAutocommitSetter(conn).
        WithCurrentNamespacer(conn).
        WithTableTypeLister(conn).
        WithDbObjectsEnumerator(conn).
        Connection(), nil
}

192 changes: 188 additions & 4 deletions go/adbc/driver/bigquery/connection.go
@@ -33,9 +33,11 @@ import (

"cloud.google.com/go/bigquery"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/internal"
"github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
"github.com/apache/arrow/go/v18/arrow"
"golang.org/x/oauth2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

@@ -61,6 +63,184 @@ type connectionImpl struct {
    client *bigquery.Client
}

func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) {
    catalogPattern, err := internal.PatternToRegexp(catalogFilter)
    if err != nil {
        return nil, err
    }
    if catalogPattern == nil {
        catalogPattern = internal.AcceptAll
    }

    // Connections to BQ are scoped to a particular Project, which corresponds to catalog-level namespacing.
    // TODO: Consider enumerating projects with ResourceManager API, but this may not be "idiomatic" usage.
    project := c.client.Project()
    res := make([]string, 0)
    if catalogPattern.MatchString(project) {
        res = append(res, project)
    }

    return res, nil
}

func (c *connectionImpl) GetDBSchemasForCatalog(ctx context.Context, catalog string, schemaFilter *string) ([]string, error) {
    schemaPattern, err := internal.PatternToRegexp(schemaFilter)
    if err != nil {
        return nil, err
    }
    if schemaPattern == nil {
        schemaPattern = internal.AcceptAll
    }

    it := c.client.Datasets(ctx)
    it.ProjectID = catalog

    res := make([]string, 0)
    for {
        ds, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, err
        }
        if schemaPattern.MatchString(ds.DatasetID) {
            res = append(res, ds.DatasetID)
        }
    }

    return res, nil
}

func (c *connectionImpl) GetTablesForDBSchema(ctx context.Context, catalog string, schema string, tableFilter *string, columnFilter *string, includeColumns bool) ([]driverbase.TableInfo, error) {
    tablePattern, err := internal.PatternToRegexp(tableFilter)
    if err != nil {
        return nil, err
    }
    if tablePattern == nil {
        tablePattern = internal.AcceptAll
    }

    it := c.client.DatasetInProject(catalog, schema).Tables(ctx)

    res := make([]driverbase.TableInfo, 0)
    for {
        table, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, err
        }
        if !tablePattern.MatchString(table.TableID) {
            continue
        }

        md, err := table.Metadata(ctx, bigquery.WithMetadataView(bigquery.BasicMetadataView))
        if err != nil {
            return nil, err
        }

        var constraints []driverbase.ConstraintInfo
        if md.TableConstraints != nil {
            constraints = make([]driverbase.ConstraintInfo, 0)
            if md.TableConstraints.PrimaryKey != nil {
                constraints = append(constraints, driverbase.ConstraintInfo{
                    // BigQuery Primary Keys are unnamed
                    ConstraintType:        internal.PrimaryKey,
                    ConstraintColumnNames: driverbase.RequiredList(md.TableConstraints.PrimaryKey.Columns),
                })
            }

            for _, fk := range md.TableConstraints.ForeignKeys {
                var columnUsage []driverbase.ConstraintColumnUsage
                if len(fk.ColumnReferences) > 0 {
                    columnUsage = make([]driverbase.ConstraintColumnUsage, len(fk.ColumnReferences))
                }
                for i, ref := range fk.ColumnReferences {
                    columnUsage[i] = driverbase.ConstraintColumnUsage{
                        ForeignKeyCatalog:  driverbase.Nullable(fk.ReferencedTable.ProjectID),
                        ForeignKeyDbSchema: driverbase.Nullable(fk.ReferencedTable.DatasetID),
                        ForeignKeyTable:    fk.ReferencedTable.TableID,
                        ForeignKeyColumn:   ref.ReferencedColumn,
                    }
                }
                constraints = append(constraints, driverbase.ConstraintInfo{
                    ConstraintName:        driverbase.Nullable(fk.Name),
                    ConstraintType:        internal.ForeignKey,
                    ConstraintColumnUsage: columnUsage,
                })
            }
        }

        var columns []driverbase.ColumnInfo
        if includeColumns {
            columnPattern, err := internal.PatternToRegexp(columnFilter)
            if err != nil {
                return nil, err
            }
            if columnPattern == nil {
                columnPattern = internal.AcceptAll
            }

            columns = make([]driverbase.ColumnInfo, 0)
            for pos, fieldschema := range md.Schema {
                if columnPattern.MatchString(fieldschema.Name) {
                    xdbcIsNullable := "YES"
                    xdbcNullable := int16(1)
                    if fieldschema.Required {
                        xdbcIsNullable = "NO"
                        xdbcNullable = 0
                    }

                    xdbcColumnSize := fieldschema.MaxLength
                    if xdbcColumnSize == 0 {
                        xdbcColumnSize = fieldschema.Precision
                    }

                    var xdbcCharOctetLength int32
                    if fieldschema.Type == bigquery.BytesFieldType {
                        xdbcCharOctetLength = int32(fieldschema.MaxLength)
                    }

                    field, err := buildField(fieldschema, 0)
                    if err != nil {
                        return nil, err
                    }
                    xdbcDataType := driverbase.ToXdbcDataType(field.Type)

                    columns = append(columns, driverbase.ColumnInfo{
                        ColumnName:          fieldschema.Name,
                        OrdinalPosition:     driverbase.Nullable(int32(pos + 1)),
                        Remarks:             driverbase.Nullable(fieldschema.Description),
                        XdbcDataType:        driverbase.Nullable(int16(field.Type.ID())),
                        XdbcTypeName:        driverbase.Nullable(string(fieldschema.Type)),
                        XdbcNullable:        driverbase.Nullable(xdbcNullable),
                        XdbcSqlDataType:     driverbase.Nullable(int16(xdbcDataType)),
                        XdbcIsNullable:      driverbase.Nullable(xdbcIsNullable),
                        XdbcDecimalDigits:   driverbase.Nullable(int16(fieldschema.Scale)),
                        XdbcColumnSize:      driverbase.Nullable(int32(xdbcColumnSize)),
                        XdbcCharOctetLength: driverbase.Nullable(xdbcCharOctetLength),
                        XdbcScopeCatalog:    driverbase.Nullable(catalog),
                        XdbcScopeSchema:     driverbase.Nullable(schema),
                        XdbcScopeTable:      driverbase.Nullable(table.TableID),
                    })
                }
            }
        }

        res = append(res, driverbase.TableInfo{
            TableName:        table.TableID,
            TableType:        string(md.Type),
            TableConstraints: constraints,
            TableColumns:     columns,
        })
    }

    return res, nil
}

type bigQueryTokenResponse struct {
    AccessToken string `json:"access_token"`
    ExpiresIn   int    `json:"expires_in"`
@@ -255,8 +435,8 @@ func (c *connectionImpl) GetTableSchema(ctx context.Context, catalog *string, db
// NewStatement initializes a new statement object tied to this connection
func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
    return &statement{
-       connectionImpl:         c,
-       query:                  c.client.Query(""),
+       alloc:                  c.Alloc,
+       cnxn:                   c,
        parameterMode:          OptionValueQueryParameterModePositional,
        resultRecordBufferSize: c.resultRecordBufferSize,
        prefetchConcurrency:    c.prefetchConcurrency,
@@ -483,8 +663,12 @@ func buildField(schema *bigquery.FieldSchema, level uint) (arrow.Field, error) {
    metadata["Required"] = strconv.FormatBool(schema.Required)
    field.Nullable = !schema.Required
    metadata["Type"] = string(schema.Type)
-   policyTagList, err := json.Marshal(schema.PolicyTags)
-   if err != nil {
+
+   if schema.PolicyTags != nil {
+       policyTagList, err := json.Marshal(schema.PolicyTags)
+       if err != nil {
+           return arrow.Field{}, err
+       }
+       metadata["PolicyTags"] = string(policyTagList)
+   }
