fix(bigquery): fetch dst table for jobs when reading with Storage API (#7325)

Pay the cost of re-fetching job information when using the Storage API, to guarantee availability of the destination table. Some cases were not covered before, such as when the job takes a long time to run.

Fixes #7322
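
To illustrate the scenario this change targets, here is a hedged usage sketch (the project ID and query are placeholders): with the storage read client enabled, Query.Read may serve rows through the Storage API, which requires knowing the job's destination table even when the job takes a while to finish.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "my-project") // "my-project" is a placeholder
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Opt in to the Storage Read API for large result sets.
	if err := client.EnableStorageReadClient(ctx); err != nil {
		log.Fatal(err)
	}

	q := client.Query("SELECT repository_url, repository_forks FROM `bigquery-public-data.samples.github_timeline`")
	// If the job runs long or the result spans multiple pages, rows are fetched via the
	// Storage API, which needs the job's destination table (what this commit re-fetches).
	it, err := q.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for {
		var row []bigquery.Value
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(row)
	}
}
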
alvarowolfx authored Jan 30, 2023
1 parent 2f45776 commit 0bf80d7
Showing 6 changed files with 29 additions and 16 deletions.
8 changes: 6 additions & 2 deletions bigquery/bigquery.go
@@ -102,7 +102,7 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
// large datasets from tables, jobs or queries.
// Calling this method twice will return an error.
func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
- if c.rc != nil {
+ if c.isStorageReadAvailable() {
return fmt.Errorf("failed: storage read client already set up")
}
rc, err := newReadClient(ctx, c.projectID, opts...)
@@ -113,6 +113,10 @@ func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.Cli
return nil
}

+ func (c *Client) isStorageReadAvailable() bool {
+ return c.rc != nil
+ }

// Project returns the project ID or number for this instance of the client, which may have
// either been explicitly specified or autodetected.
func (c *Client) Project() string {
@@ -123,7 +127,7 @@ func (c *Client) Project() string {
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
func (c *Client) Close() error {
- if c.rc != nil {
+ if c.isStorageReadAvailable() {
err := c.rc.close()
if err != nil {
return err
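
The new isStorageReadAvailable helper backs the existing guard in EnableStorageReadClient. A minimal sketch of the observable behavior, assuming a client built as in the example above:

// First call succeeds and wires up the read client.
if err := client.EnableStorageReadClient(ctx); err != nil {
	log.Fatal(err)
}
// A second call is rejected because the read client is already set.
if err := client.EnableStorageReadClient(ctx); err != nil {
	fmt.Println("expected error:", err) // "failed: storage read client already set up"
}
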
2 changes: 1 addition & 1 deletion bigquery/job.go
@@ -322,7 +322,7 @@ func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, strin
return nil, err
}
var it *RowIterator
- if j.c.rc != nil {
+ if j.c.isStorageReadAvailable() {
it, err = newStorageRowIteratorFromJob(ctx, j)
if err != nil {
it = nil
11 changes: 3 additions & 8 deletions bigquery/query.go
@@ -402,13 +402,8 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {

if resp.JobComplete {
// If more pages are available, discard and use the Storage API instead
- if resp.PageToken != "" && q.client.rc != nil {
- // Needed to fetch destination table
- job, err := q.client.JobFromID(ctx, resp.JobReference.JobId)
- if err != nil {
- return nil, err
- }
- it, err = newStorageRowIteratorFromJob(ctx, job)
+ if resp.PageToken != "" && q.client.isStorageReadAvailable() {
+ it, err = newStorageRowIteratorFromJob(ctx, minimalJob)
if err == nil {
return it, nil
}
@@ -439,7 +434,7 @@ func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
// user's Query configuration. If all the options set on the job are supported on the
// faster query path, this method returns a QueryRequest suitable for execution.
func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
- if q.forceStorageAPI && q.client.rc != nil {
+ if q.forceStorageAPI && q.client.isStorageReadAvailable() {
return nil, fmt.Errorf("force Storage API usage")
}
// This is a denylist of settings which prevent us from composing an equivalent
15 changes: 12 additions & 3 deletions bigquery/storage_integration_test.go
@@ -272,6 +272,7 @@ func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
// Don't forceStorageAPI usage and still see internally Storage API is selected
q := storageOptimizedClient.Query(sql)
+ q.DisableQueryCache = true
it, err := q.Read(ctx)
if err != nil {
t.Fatal(err)
@@ -304,12 +305,13 @@ func TestIntegration_StorageReadCancel(t *testing.T) {
t.Skip("Integration tests skipped")
}
ctx := context.Background()
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
table := "`bigquery-public-data.samples.github_timeline`"
sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
storageOptimizedClient.rc.settings.maxWorkerCount = 1
q := storageOptimizedClient.Query(sql)
+ q.DisableQueryCache = true
q.forceStorageAPI = true
it, err := q.Read(ctx)
if err != nil {
@@ -319,21 +321,28 @@ func TestIntegration_StorageReadCancel(t *testing.T) {
t.Fatal("expected query to use Storage API")
}

+ // Cancel the read after reading 1000 rows
+ rowsRead := 0
for {
var dst []Value
err := it.Next(&dst)
if err == iterator.Done {
break
}
if err != nil {
- if errors.Is(err, context.DeadlineExceeded) {
+ if errors.Is(err, context.DeadlineExceeded) ||
+ errors.Is(err, context.Canceled) {
break
}
t.Fatalf("failed to fetch via storage API: %v", err)
}
+ rowsRead++
+ if rowsRead > 1000 {
+ cancel()
+ }
}
// resources are cleaned asynchronously
- time.Sleep(500 * time.Millisecond)
+ time.Sleep(time.Second)
if !it.arrowIterator.isDone() {
t.Fatal("expected stream to be done")
}
7 changes: 6 additions & 1 deletion bigquery/storage_iterator.go
@@ -64,7 +64,12 @@ func newStorageRowIteratorFromTable(ctx context.Context, table *Table, ordered b
return it, nil
}

- func newStorageRowIteratorFromJob(ctx context.Context, job *Job) (*RowIterator, error) {
+ func newStorageRowIteratorFromJob(ctx context.Context, j *Job) (*RowIterator, error) {
+ // Needed to fetch destination table
+ job, err := j.c.JobFromID(ctx, j.jobID)
+ if err != nil {
+ return nil, err
+ }
cfg, err := job.Config()
if err != nil {
return nil, err
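
For context, a hedged sketch of the lookup the reworked constructor now performs, written against the exported API (a client and a jobID string are assumed): the job is re-fetched so its configuration, including the destination table, is populated.

job, err := client.JobFromID(ctx, jobID) // re-fetching populates the job's configuration
if err != nil {
	log.Fatal(err)
}
cfg, err := job.Config()
if err != nil {
	log.Fatal(err)
}
qcfg, ok := cfg.(*bigquery.QueryConfig)
if !ok {
	log.Fatal("not a query job")
}
// For a completed query job the server reports the (possibly anonymous)
// destination table, which backs the Storage API read session.
fmt.Println(qcfg.Dst.DatasetID, qcfg.Dst.TableID)
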
2 changes: 1 addition & 1 deletion bigquery/table.go
@@ -813,7 +813,7 @@ func (t *Table) Read(ctx context.Context) *RowIterator {
}

func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
- if t.c.rc != nil {
+ if t.c.isStorageReadAvailable() {
it, err := newStorageRowIteratorFromTable(ctx, t, false)
if err == nil {
return it
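
Table.Read takes the same branch: when the storage read client is available, table scans go through the Storage API. A brief usage sketch, assuming the client from the first example (project, dataset, and table identifiers are illustrative):

it := client.DatasetInProject("bigquery-public-data", "samples").Table("github_timeline").Read(ctx)
for {
	var row []bigquery.Value
	err := it.Next(&row)
	if err == iterator.Done {
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	_ = row // process the row
}
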
