From ebcc55754237aa7f38c94d74950a88cc887ae9fa Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Mon, 20 May 2024 17:13:03 +0300 Subject: [PATCH] chore: bump bigquery to v1.61.0 --- go.mod | 7 ++- go.sum | 6 +-- sqlconnect/internal/bigquery/db.go | 3 -- .../internal/bigquery/driver/connection.go | 29 ----------- .../internal/bigquery/driver/connector.go | 9 +--- .../internal/bigquery/driver/driver_test.go | 48 +------------------ .../internal/bigquery/driver/statement.go | 4 +- 7 files changed, 9 insertions(+), 97 deletions(-) diff --git a/go.mod b/go.mod index 7273c8b..a400a8f 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,12 @@ toolchain go1.22.2 require ( cloud.google.com/go v0.113.0 - cloud.google.com/go/bigquery v1.60.0 + cloud.google.com/go/bigquery v1.61.0 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/aws/aws-sdk-go-v2 v1.27.0 github.com/aws/aws-sdk-go-v2/config v1.27.11 github.com/aws/aws-sdk-go-v2/credentials v1.17.11 github.com/aws/aws-sdk-go-v2/service/redshiftdata v1.25.7 - github.com/cenkalti/backoff/v4 v4.3.0 github.com/databricks/databricks-sql-go v1.5.5 github.com/dlclark/regexp2 v1.11.0 github.com/gliderlabs/ssh v0.3.7 @@ -30,7 +29,6 @@ require ( github.com/tidwall/sjson v1.2.5 github.com/trinodb/trino-go-client v0.315.0 golang.org/x/crypto v0.23.0 - golang.org/x/sync v0.7.0 google.golang.org/api v0.181.0 ) @@ -51,7 +49,6 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect - github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/apache/arrow/go/v15 v15.0.2 // indirect github.com/apache/arrow/go/v16 v16.0.0 // indirect github.com/apache/thrift v0.19.0 // indirect @@ -71,6 +68,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/aws/smithy-go v1.20.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect github.com/coreos/go-oidc/v3 v3.5.0 // indirect github.com/danieljoos/wincred v1.1.2 // indirect @@ -138,6 +136,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/term v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index 992e25c..80572a4 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg= cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= -cloud.google.com/go/bigquery v1.60.0 h1:kA96WfgvCbkqfLnr7xI5uEfJ4h4FrnkdEb0yty0KSZo= -cloud.google.com/go/bigquery v1.60.0/go.mod h1:Clwk2OeC0ZU5G5LDg7mo+h8U7KlAa5v06z5rptKdM3g= +cloud.google.com/go/bigquery v1.61.0 h1:w2Goy9n6gh91LVi6B2Sc+HpBl8WbWhIyzdvVvrAuEIw= +cloud.google.com/go/bigquery v1.61.0/go.mod h1:PjZUje0IocbuTOdq4DBOJLNYB0WF3pAKBHzAYyxCwFo= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= @@ -51,8 +51,6 @@ github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26 h1:3YVZUqkoev4mL+aCwVO github.com/ahmetb/dlog v0.0.0-20170105205344-4fb5f8204f26/go.mod h1:ymXt5bw5uSNu4jveerFxE0vNYxF8ncqbptntMaFMg3k= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= -github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= -github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= github.com/apache/arrow/go/v16 v16.0.0 h1:qRLbJRPj4zaseZrjbDHa7mUoZDDIU+4pu+mE2Lucs5g= diff --git a/sqlconnect/internal/bigquery/db.go b/sqlconnect/internal/bigquery/db.go index 61acd02..2231201 100644 --- a/sqlconnect/internal/bigquery/db.go +++ b/sqlconnect/internal/bigquery/db.go @@ -30,9 +30,6 @@ func NewDB(configJSON json.RawMessage) (*DB, error) { db := sql.OpenDB(driver.NewConnector( config.ProjectID, - driver.Config{ - JobRateLimitExceededRetryEnabled: true, - }, option.WithCredentialsJSON([]byte(config.CredentialsJSON))), ) diff --git a/sqlconnect/internal/bigquery/driver/connection.go b/sqlconnect/internal/bigquery/driver/connection.go index 24d502a..932a5c4 100644 --- a/sqlconnect/internal/bigquery/driver/connection.go +++ b/sqlconnect/internal/bigquery/driver/connection.go @@ -5,16 +5,12 @@ import ( "database/sql/driver" "errors" "fmt" - "strings" - "time" "cloud.google.com/go/bigquery" - "github.com/cenkalti/backoff/v4" "google.golang.org/api/iterator" ) type bigQueryConnection struct { - config Config ctx context.Context client *bigquery.Client closed bool @@ -80,28 +76,3 @@ func (bigQueryConnection) CheckNamedValue(*driver.NamedValue) error { func (connection *bigQueryConnection) BigqueryClient() *bigquery.Client { return connection.client } - -// readWithBackoff will retry the read operation if the error is [jobRateLimitExceeded] and the config is set to retry rate limit errors -// -// TODO: this should no longer be needed once this fix is released: -// https://github.com/googleapis/google-cloud-go/pull/9726 -func (connection *bigQueryConnection) readWithBackoff(ctx context.Context, query *bigquery.Query) (it *bigquery.RowIterator, err error) { - if !connection.config.JobRateLimitExceededRetryEnabled { - return query.Read(ctx) - } - // mimicking google's own retry backoff settings - // https://github.com/googleapis/google-cloud-go/blob/b2e704d9d287445304d2b6030b6e35a4eb8be80a/bigquery/bigquery.go#L236 - retry := backoff.WithContext(backoff.NewExponentialBackOff( - backoff.WithInitialInterval(1*time.Second), - backoff.WithMaxInterval(32*time.Second), - backoff.WithMultiplier(2), - ), ctx) - _ = backoff.Retry(func() error { - it, err = query.Read(ctx) - if err != nil && (strings.Contains(err.Error(), "jobRateLimitExceeded")) { - return err - } - return nil - }, retry) - return it, err -} diff --git a/sqlconnect/internal/bigquery/driver/connector.go b/sqlconnect/internal/bigquery/driver/connector.go index 01a8dc6..a45b7ba 100644 --- a/sqlconnect/internal/bigquery/driver/connector.go +++ b/sqlconnect/internal/bigquery/driver/connector.go @@ -8,21 +8,15 @@ import ( "google.golang.org/api/option" ) -type Config struct { - JobRateLimitExceededRetryEnabled bool // Enable jobRateLimitExceeded retries: default false -} - -func NewConnector(projectID string, config Config, opts ...option.ClientOption) driver.Connector { +func NewConnector(projectID string, opts ...option.ClientOption) driver.Connector { return &bigQueryConnector{ projectID: projectID, - config: config, opts: opts, } } type bigQueryConnector struct { projectID string - config Config opts []option.ClientOption } @@ -33,7 +27,6 @@ func (c *bigQueryConnector) Connect(ctx context.Context) (driver.Conn, error) { } return &bigQueryConnection{ - config: c.config, ctx: ctx, client: client, }, nil diff --git a/sqlconnect/internal/bigquery/driver/driver_test.go b/sqlconnect/internal/bigquery/driver/driver_test.go index 9470c13..5f0c137 100644 --- a/sqlconnect/internal/bigquery/driver/driver_test.go +++ b/sqlconnect/internal/bigquery/driver/driver_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "google.golang.org/api/option" "github.com/rudderlabs/rudder-go-kit/testhelper/rand" @@ -34,7 +33,7 @@ func TestBigqueryDriver(t *testing.T) { require.NoError(t, json.Unmarshal([]byte(configJSON), &c)) t.Run("OpenDB", func(t *testing.T) { - db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) + db := sql.OpenDB(driver.NewConnector(c.ProjectID, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) t.Cleanup(func() { require.NoError(t, db.Close(), "it should be able to close the database connection") }) @@ -224,51 +223,6 @@ func TestBigqueryDriver(t *testing.T) { require.NoError(t, rows.Err()) }) }) - - // TODO: this test should start failing once this fix is released: - // https://github.com/googleapis/google-cloud-go/pull/9726 - t.Run("jobRateLimitExceeded", func(t *testing.T) { - t.Run("selecting from information schema from multiple go-routines should lead to a jobRateLimitExceeded error", func(t *testing.T) { - db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) - t.Cleanup(func() { - require.NoError(t, db.Close(), "it should be able to close the database connection") - }) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(10) - for i := 0; i < 200; i++ { - g.Go(func() error { - _, err := db.ExecContext(ctx, "SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'invalid'") - return err - }) - } - err := g.Wait() - require.Error(t, err, "it should return an error") - require.Contains(t, err.Error(), "jobRateLimitExceeded", "it should return a jobRateLimitExceeded error") - }) - - t.Run("selecting from information schema from multiple go-routines and having rate limit retries enabled, shoudln't lead to a jobRateLimitExceeded error", func(t *testing.T) { - db := sql.OpenDB(driver.NewConnector(c.ProjectID, driver.Config{JobRateLimitExceededRetryEnabled: true}, option.WithCredentialsJSON([]byte(c.CredentialsJSON)))) - t.Cleanup(func() { - require.NoError(t, db.Close(), "it should be able to close the database connection") - }) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(10) - for i := 0; i < 50; i++ { - g.Go(func() error { - _, err := db.ExecContext(ctx, "SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'invalid'") - return err - }) - } - err := g.Wait() - require.NoError(t, err, "it shouldn't return an error") - }) - }) } type config struct { diff --git a/sqlconnect/internal/bigquery/driver/statement.go b/sqlconnect/internal/bigquery/driver/statement.go index ae5c18e..5bfde9e 100644 --- a/sqlconnect/internal/bigquery/driver/statement.go +++ b/sqlconnect/internal/bigquery/driver/statement.go @@ -41,7 +41,7 @@ func (statement *bigQueryStatement) ExecContext(ctx context.Context, args []driv return nil, err } - rowIterator, err := statement.connection.readWithBackoff(ctx, query) + rowIterator, err := query.Read(ctx) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (statement *bigQueryStatement) QueryContext(ctx context.Context, args []dri return nil, err } - rowIterator, err := statement.connection.readWithBackoff(ctx, query) + rowIterator, err := query.Read(ctx) if err != nil { return nil, err }