Skip to content

Commit

Permalink
refactor: augment private getWriteStream with view support (#7196)
Browse files Browse the repository at this point in the history
This PR augments the base client with fuller support for view-based
resolution of GetWriteStream metadata.  This PR also adds an integration
test that compares behaviors between different stream types (default vs
explicitly created).

Towards: #7103
  • Loading branch information
shollyman authored Dec 29, 2022
1 parent 3592917 commit 92d9d4c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 3 deletions.
7 changes: 5 additions & 2 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
}
if ms.streamSettings.streamID != "" {
// User supplied a stream, we need to verify it exists.
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false)
if err != nil {
return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
}
Expand Down Expand Up @@ -210,10 +210,13 @@ func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWri
// getWriteStream returns information about a given write stream.
//
// It's primarily used for setup validation, and not exposed directly to end users.
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) {
req := &storagepb.GetWriteStreamRequest{
Name: streamName,
}
if fullView {
req.View = storagepb.WriteStreamView_FULL
}
return c.rawClient.GetWriteStream(ctx, req)
}

Expand Down
92 changes: 91 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2/apierror"
"go.opencensus.io/stats/view"
"google.golang.org/api/option"
Expand All @@ -36,6 +37,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -112,6 +114,94 @@ func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect
return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}

func TestIntegration_ClientGetWriteStream(t *testing.T) {
ctx := context.Background()
mwClient, bqClient := getTestClients(ctx, t)
defer mwClient.Close()
defer bqClient.Close()

wantLocation := "us-east1"
dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
if err != nil {
t.Fatalf("failed to init test dataset: %v", err)
}
defer cleanup()

testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
}

apiSchema, _ := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema)
parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)
explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: parent,
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_PENDING,
},
})
if err != nil {
t.Fatalf("CreateWriteStream: %v", err)
}

testCases := []struct {
description string
isDefault bool
streamID string
wantType storagepb.WriteStream_Type
}{
{
description: "default",
isDefault: true,
streamID: fmt.Sprintf("%s/streams/_default", parent),
wantType: storagepb.WriteStream_COMMITTED,
},
{
description: "explicit pending",
streamID: explicitStream.Name,
wantType: storagepb.WriteStream_PENDING,
},
}

for _, tc := range testCases {
for _, fullView := range []bool{false, true} {
info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView)
if err != nil {
t.Errorf("%s (%T): getWriteStream failed: %v", tc.description, fullView, err)
}
if info.GetType() != tc.wantType {
t.Errorf("%s (%T): got type %d, want type %d", tc.description, fullView, info.GetType(), tc.wantType)
}
if info.GetLocation() != wantLocation {
t.Errorf("%s (%T) view: got location %s, want location %s", tc.description, fullView, info.GetLocation(), wantLocation)
}
if info.GetCommitTime() != nil {
t.Errorf("%s (%T)expected empty commit time, got %v", tc.description, fullView, info.GetCommitTime())
}

if !tc.isDefault {
if info.GetCreateTime() == nil {
t.Errorf("%s (%T): expected create time, was empty", tc.description, fullView)
}
} else {
if info.GetCreateTime() != nil {
t.Errorf("%s (%T): expected empty time, got %v", tc.description, fullView, info.GetCreateTime())
}
}

if !fullView {
if info.GetTableSchema() != nil {
t.Errorf("%s (%T) basic view: expected no schema, was populated", tc.description, fullView)
}
} else {
if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" {
t.Errorf("%s (%T) schema mismatch: -got, +want:\n%s", tc.description, fullView, diff)
}
}
}
}
}

func TestIntegration_ManagedWriter(t *testing.T) {
mwClient, bqClient := getTestClients(context.Background(), t)
defer mwClient.Close()
Expand Down Expand Up @@ -326,7 +416,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
t.Fatalf("NewManagedStream: %v", err)
}

info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID)
info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false)
if err != nil {
t.Errorf("couldn't get stream info: %v", err)
}
Expand Down

0 comments on commit 92d9d4c

Please sign in to comment.