Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): improve error communication (#6360
Browse files Browse the repository at this point in the history
)

* feat(bigquery/storage/managedwriter): improve error communication

Fixes: #6321
  • Loading branch information
shollyman authored Aug 11, 2022
1 parent d03c3e1 commit b30d89d
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 4 deletions.
4 changes: 2 additions & 2 deletions bigquery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ require (
cloud.google.com/go/storage v1.23.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.8
github.com/googleapis/gax-go/v2 v2.4.0
github.com/googleapis/gax-go/v2 v2.5.1
go.opencensus.io v0.23.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f
google.golang.org/api v0.90.0
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
)

require (
Expand Down
7 changes: 5 additions & 2 deletions bigquery/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/OthfcblKl4IGNaM=
github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99EXz9pXxye9YM=
github.com/googleapis/gax-go/v2 v2.4.0 h1:dS9eYAjhrE2RjmzYw2XAPvcXfmcQLtFEQWn0CR82awk=
github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c=
github.com/googleapis/gax-go/v2 v2.5.1 h1:kBRZU0PSuI7PspsSb/ChWoVResUcwNVIdpB049pKTiw=
github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo=
github.com/googleapis/go-type-adapters v1.0.0 h1:9XdMn+d/G57qq1s8dNc5IesGCXHf6V2HZ2JwRxfA2tA=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down Expand Up @@ -604,6 +605,7 @@ google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljW
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE=
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 h1:QntLWYqZeuBtJkth3m/6DLznnI0AHJr+AgJXvVh/izw=
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down Expand Up @@ -653,8 +655,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
Expand Down
29 changes: 29 additions & 0 deletions bigquery/storage/managedwriter/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,34 @@ have been finalized, meaning they'll no longer allow further data writes.
// Using the client, we can commit data from multple streams to the same
// table atomically.
resp, err := client.BatchCommitWriteStreams(ctx, req)
# Error Handling
Like other Google Cloud services, this API relies on common components that can provide an
enhanced set of errors when communicating about the results of API interactions.
Specifically, the apierror package (https://pkg.go.dev/github.com/googleapis/gax-go/v2/apierror)
provides convenience methods for extracting structured information about errors.
The BigQuery Storage API service augments applicable errors with service-specific details in
the form of a StorageError message. The StorageError message is accessed via the ExtractProtoMessage
method in the apierror package. Note that the StorageError messsage does not implement Go's error
interface.
An example of accessing the structured error details:
// By way of example, let's assume the response from an append call returns an error.
_, err := result.GetResult(ctx)
if err != nil {
if apiErr, ok := apierror.FromError(err); ok {
// We now have an instance of APIError, which directly exposes more specific
// details about multiple failure conditions include transport-level errors.
storageErr := &storagepb.StorageError{}
if e := apiErr.Details().ExtractProtoMessage(storageErr); e != nil {
// storageErr now contains service-specific information about the error.
log.Printf("Received service-specific error code %s", storageErr.GetCode().String())
}
}
}
*/
package managedwriter
122 changes: 122 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
})
t.Run("ErrorBehaviors", func(t *testing.T) {
t.Parallel()
testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
})
t.Run("BufferedStream", func(t *testing.T) {
t.Parallel()
testBufferedStream(ctx, t, mwClient, bqClient, dataset)
Expand Down Expand Up @@ -404,6 +408,124 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
withExactRowCount(int64(len(testSimpleData))))
}

// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.SimpleMessageProto2{}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// setup a new stream.
ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(CommittedStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

data := make([][]byte, len(testSimpleData))
for k, mesg := range testSimpleData {
b, err := proto.Marshal(mesg)
if err != nil {
t.Errorf("failed to marshal message %d: %v", k, err)
}
data[k] = b
}

// Send an append at an invalid offset.
result, err := ms.AppendRows(ctx, data, WithOffset(99))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
//
off, err := result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}

apiErr, ok := apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se := &storagepb.StorageError{}
e := apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Send "real" append to advance the offset.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err != nil {
t.Errorf("expected offset, got error %v", err)
}
wantOffset := int64(0)
if off != wantOffset {
t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
}
// Now, send at the start offset again.
result, err = ms.AppendRows(ctx, data, WithOffset(0))
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
apiErr, ok = apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se = &storagepb.StorageError{}
e = apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
// Finalize the stream.
if _, err := ms.Finalize(ctx); err != nil {
t.Errorf("Finalize had error: %v", err)
}
// Send another append, which is disallowed for finalized streams.
result, err = ms.AppendRows(ctx, data)
if err != nil {
t.Errorf("failed to send append: %v", err)
}
off, err = result.GetResult(ctx)
if err == nil {
t.Errorf("expected error, got offset %d", off)
}
apiErr, ok = apierror.FromError(err)
if !ok {
t.Errorf("expected apierror, got %T: %v", err, err)
}
se = &storagepb.StorageError{}
e = apiErr.Details().ExtractProtoMessage(se)
if e != nil {
t.Errorf("expected storage error, but extraction failed: %v", e)
}
wantCode = storagepb.StorageError_STREAM_FINALIZED
if se.GetCode() != wantCode {
t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
}
}

func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
Expand Down

0 comments on commit b30d89d

Please sign in to comment.