Skip to content

Commit

Permalink
feat(bigtable): Add UpdateFamily to allow updating a family type (goo…
Browse files Browse the repository at this point in the history
…gleapis#10759)

* feat(bigtable): Add UpdateFamily to allow updating a family type

* fix API diff

* fix bug

* fix field mask

* add integration test

* fix vet

* fix vet

* fix vet
  • Loading branch information
ron-gal committed Aug 23, 2024
1 parent 7713371 commit ec0cbb2
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 25 deletions.
71 changes: 50 additions & 21 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,60 +667,89 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
return ti, nil
}

type gcPolicySettings struct {
type updateFamilyOption struct {
ignoreWarnings bool
}

// GCPolicyOption is the interface to change GC policy settings
// GCPolicyOption is deprecated, kept for backwards compatibility, use UpdateFamilyOption in new code
type GCPolicyOption interface {
apply(s *gcPolicySettings)
apply(s *updateFamilyOption)
}

// UpdateFamilyOption is the interface to update family settings
type UpdateFamilyOption GCPolicyOption

type ignoreWarnings bool

func (w ignoreWarnings) apply(s *gcPolicySettings) {
func (w ignoreWarnings) apply(s *updateFamilyOption) {
s.ignoreWarnings = bool(w)
}

// IgnoreWarnings returns a gcPolicyOption that ignores safety checks when modifying the column families
// IgnoreWarnings returns a updateFamilyOption that ignores safety checks when modifying the column families
func IgnoreWarnings() GCPolicyOption {
return ignoreWarnings(true)
}

func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error {
// SetGCPolicy specifies which cells in a column family should be garbage collected.
// GC executes opportunistically in the background; table reads may return data
// matching the GC policy.
func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
return ac.UpdateFamily(ctx, table, family, Family{GCPolicy: policy})
}

// SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options
func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error {
familyOpts := []UpdateFamilyOption{}
for _, opt := range opts {
if opt != nil {
familyOpts = append(familyOpts, opt.(UpdateFamilyOption))
}
}
return ac.UpdateFamily(ctx, table, family, Family{GCPolicy: policy}, familyOpts...)
}

// UpdateFamily updates column families' garbage colleciton policies and value type.
func (ac *AdminClient) UpdateFamily(ctx context.Context, table, familyName string, family Family, opts ...UpdateFamilyOption) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

s := gcPolicySettings{}
s := updateFamilyOption{}
for _, opt := range opts {
if opt != nil {
opt.apply(&s)
}
}

cf := &btapb.ColumnFamily{}
mask := &field_mask.FieldMask{}
if family.GCPolicy != nil {
cf.GcRule = family.GCPolicy.proto()
mask.Paths = append(mask.Paths, "gc_rule")

}
if family.ValueType != nil {
cf.ValueType = family.ValueType.proto()
mask.Paths = append(mask.Paths, "value_type")
}

// No update
if len(mask.Paths) == 0 {
return nil
}

req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: family,
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{GcRule: policy.proto()}},
Id: familyName,
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: cf},
UpdateMask: mask,
}},
IgnoreWarnings: s.ignoreWarnings,
}
_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
return err
}

// SetGCPolicy specifies which cells in a column family should be garbage collected.
// GC executes opportunistically in the background; table reads may return data
// matching the GC policy.
func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
return ac.SetGCPolicyWithOptions(ctx, table, family, policy)
}

// SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options
func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error {
return ac.setGCPolicy(ctx, table, family, policy, opts...)
}

// DropRowRange permanently deletes a row range from the specified table.
func (ac *AdminClient) DropRowRange(ctx context.Context, table, rowKeyPrefix string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
Expand Down
2 changes: 1 addition & 1 deletion bigtable/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func TestTableAdmin_UpdateTableDisableChangeStream(t *testing.T) {
func TestTableAdmin_SetGcPolicy(t *testing.T) {
for _, test := range []struct {
desc string
opts GCPolicyOption
opts UpdateFamilyOption
want bool
}{
{
Expand Down
9 changes: 7 additions & 2 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,15 @@ func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColu
return true
})
} else if modify := mod.GetUpdate(); modify != nil {
if _, ok := tbl.families[mod.Id]; !ok {
newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify)
cf, ok := tbl.families[mod.Id]
if !ok {
return nil, fmt.Errorf("no such family %q", mod.Id)
}
newcf := newColumnFamily(req.Name+"/columnFamilies/"+mod.Id, 0, modify)
if cf.valueType != newcf.valueType {
return nil, status.Errorf(codes.InvalidArgument, "Immutable fields 'value_type' cannot be updated")
}

// assume that we ALWAYS want to replace by the new setting
// we may need partial update through
tbl.families[mod.Id] = newcf
Expand Down
14 changes: 13 additions & 1 deletion bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestIntegration_ReadRowList(t *testing.T) {

func TestIntegration_Aggregates(t *testing.T) {
ctx := context.Background()
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
_, _, ac, table, tableName, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -346,6 +346,18 @@ func TestIntegration_Aggregates(t *testing.T) {
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}

err = ac.UpdateFamily(ctx, tableName, family, Family{ValueType: AggregateType{
Input: Int64Type{},
Aggregator: MinAggregator{},
}})
if err == nil {
t.Fatalf("Expected UpdateFamily to fail, but it didn't")
}
wantError := "Immutable fields 'value_type' cannot be updated"
if !strings.Contains(err.Error(), wantError) {
t.Errorf("Wrong error. Expected to containt %q but was %v", wantError, err)
}
}

func TestIntegration_ReadRowListReverse(t *testing.T) {
Expand Down

0 comments on commit ec0cbb2

Please sign in to comment.