From 86f33db442fcbb9bcd966aba39263ce9d4d8d142 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 29 Aug 2022 15:06:56 -0400 Subject: [PATCH 1/5] WIP: Add ability to remove/clear map and set state --- .../pkg/beam/core/runtime/exec/userstate.go | 77 +++++-- sdks/go/pkg/beam/core/state/state.go | 36 ++++ sdks/go/pkg/beam/core/state/state_test.go | 192 ++++++++++++++++++ sdks/go/test/integration/integration.go | 3 + sdks/go/test/integration/primitives/state.go | 61 ++++++ .../test/integration/primitives/state_test.go | 5 + 6 files changed, 361 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/userstate.go b/sdks/go/pkg/beam/core/runtime/exec/userstate.go index e089d843f67f2..b946e67b63d9c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/userstate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/userstate.go @@ -83,14 +83,16 @@ func (s *stateProvider) WriteValueState(val state.Transaction) error { if err != nil { return err } - cl.Write([]byte{}) + _, err = cl.Write([]byte{}) + if err != nil { + return err + } ap, err := s.getBagAppender(val.Key) if err != nil { return err } fv := FullValue{Elm: val.Val} - // TODO(#22736) - consider caching this a proprty of stateProvider enc := MakeElementEncoder(coder.SkipW(s.codersByKey[val.Key])) err = enc.Encode(&fv, ap) if err != nil { @@ -109,7 +111,10 @@ func (s *stateProvider) ClearValueState(val state.Transaction) error { if err != nil { return err } - cl.Write([]byte{}) + _, err = cl.Write([]byte{}) + if err != nil { + return err + } // Any transactions before a clear don't matter s.transactionsByKey[val.Key] = []state.Transaction{val} @@ -153,7 +158,10 @@ func (s *stateProvider) ClearBagState(val state.Transaction) error { if err != nil { return err } - cl.Write([]byte{}) + _, err = cl.Write([]byte{}) + if err != nil { + return err + } // Any transactions before a clear don't matter s.transactionsByKey[val.Key] = []state.Transaction{val} @@ -168,14 +176,12 @@ func (s *stateProvider) WriteBagState(val state.Transaction) error { return err } fv := FullValue{Elm: val.Val} - // TODO(#22736) - consider caching this a proprty of stateProvider enc := MakeElementEncoder(coder.SkipW(s.codersByKey[val.Key])) err = enc.Encode(&fv, ap) if err != nil { return err } - // TODO(#22736) - optimize this a bit once all state types are added. if transactions, ok := s.transactionsByKey[val.Key]; ok { transactions = append(transactions, val) s.transactionsByKey[val.Key] = transactions @@ -254,27 +260,26 @@ func (s *stateProvider) ReadMapStateKeys(userStateID string) ([]interface{}, []s // WriteMapState writes a key value pair to the global map state. func (s *stateProvider) WriteMapState(val state.Transaction) error { - cl, err := s.getMultiMapClearer(val.Key, val.MapKey) + cl, err := s.getMultiMapKeyClearer(val.Key, val.MapKey) + if err != nil { + return err + } + _, err = cl.Write([]byte{}) if err != nil { return err } - cl.Write([]byte{}) ap, err := s.getMultiMapAppender(val.Key, val.MapKey) if err != nil { return err } fv := FullValue{Elm: val.Val} - // TODO(#22736) - consider caching this a proprty of stateProvider enc := MakeElementEncoder(coder.SkipW(s.codersByKey[val.Key])) err = enc.Encode(&fv, ap) if err != nil { return err } - // TODO(#22736) - optimize this a bit once all state types are added. In the case of sets/clears, - // we can remove the transactions. We can also consider combining other transactions on read (or sooner) - // so that we don't need to use as much memory/time replaying transactions. if transactions, ok := s.transactionsByKey[val.Key]; ok { transactions = append(transactions, val) s.transactionsByKey[val.Key] = transactions @@ -285,6 +290,44 @@ func (s *stateProvider) WriteMapState(val state.Transaction) error { return nil } +// ClearMapStateKey deletes a key value pair from the global map state. +func (s *stateProvider) ClearMapStateKey(val state.Transaction) error { + cl, err := s.getMultiMapKeyClearer(val.Key, val.MapKey) + if err != nil { + return err + } + _, err = cl.Write([]byte{}) + if err != nil { + return err + } + + if transactions, ok := s.transactionsByKey[val.Key]; ok { + transactions = append(transactions, val) + s.transactionsByKey[val.Key] = transactions + } else { + s.transactionsByKey[val.Key] = []state.Transaction{val} + } + + return nil +} + +// ClearMapState deletes all key value pairs from the global map state. +func (s *stateProvider) ClearMapState(val state.Transaction) error { + cl, err := s.getMultiMapClearer(val.Key) + if err != nil { + return err + } + _, err = cl.Write([]byte{}) + if err != nil { + return err + } + + // Any transactions before a clear don't matter + s.transactionsByKey[val.Key] = []state.Transaction{val} + + return nil +} + func (s *stateProvider) CreateAccumulatorFn(userStateID string) reflectx.Func { a := s.combineFnsByKey[userStateID] if ca := a.CreateAccumulatorFn(); ca != nil { @@ -379,7 +422,7 @@ func (s *stateProvider) getMultiMapAppender(userStateID string, key interface{}) return w, nil } -func (s *stateProvider) getMultiMapClearer(userStateID string, key interface{}) (io.Writer, error) { +func (s *stateProvider) getMultiMapKeyClearer(userStateID string, key interface{}) (io.Writer, error) { ek, err := s.encodeKey(userStateID, key) if err != nil { return nil, err @@ -391,6 +434,14 @@ func (s *stateProvider) getMultiMapClearer(userStateID string, key interface{}) return w, nil } +func (s *stateProvider) getMultiMapClearer(userStateID string) (io.Writer, error) { + w, err := s.sr.OpenMultimapKeysUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window) + if err != nil { + return nil, err + } + return w, nil +} + func (s *stateProvider) getMultiMapKeyReader(userStateID string) (io.ReadCloser, error) { if r, ok := s.readersByKey[userStateID]; ok { return r, nil diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go index 44b7a193b7566..4aae60e3bfe3f 100644 --- a/sdks/go/pkg/beam/core/state/state.go +++ b/sdks/go/pkg/beam/core/state/state.go @@ -82,6 +82,8 @@ type Provider interface { ReadMapStateValue(userStateID string, key interface{}) (interface{}, []Transaction, error) ReadMapStateKeys(userStateID string) ([]interface{}, []Transaction, error) WriteMapState(val Transaction) error + ClearMapStateKey(val Transaction) error + ClearMapState(val Transaction) error } // PipelineState is an interface representing different kinds of PipelineState (currently just state.Value). @@ -513,6 +515,23 @@ func (s *Map[K, V]) Get(p Provider, key K) (V, bool, error) { return cur.(V), true, nil } +// Remove deletes an entry from this instance of map state. +func (s *Map[K, V]) Remove(p Provider, key K) error { + return p.ClearMapStateKey(Transaction{ + Key: s.Key, + Type: TransactionTypeClear, + MapKey: key, + }) +} + +// Clear deletes all entries from this instance of map state. +func (s *Map[K, V]) Clear(p Provider) error { + return p.ClearMapState(Transaction{ + Key: s.Key, + Type: TransactionTypeClear, + }) +} + // StateKey returns the key for this pipeline state entry. func (s Map[K, V]) StateKey() string { if s.Key == "" { @@ -636,6 +655,23 @@ func (s *Set[K]) Contains(p Provider, key K) (bool, error) { return true, nil } +// Remove deletes an entry from this instance of set state. +func (s Set[K]) Remove(p Provider, key K) error { + return p.ClearMapStateKey(Transaction{ + Key: s.Key, + Type: TransactionTypeClear, + MapKey: key, + }) +} + +// Clear deletes all entries from this instance of set state. +func (s Set[K]) Clear(p Provider) error { + return p.ClearMapState(Transaction{ + Key: s.Key, + Type: TransactionTypeClear, + }) +} + // StateKey returns the key for this pipeline state entry. func (s Set[K]) StateKey() string { if s.Key == "" { diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go index 555e0adbe7b21..4103b8897a8c6 100644 --- a/sdks/go/pkg/beam/core/state/state_test.go +++ b/sdks/go/pkg/beam/core/state/state_test.go @@ -163,6 +163,20 @@ func (s *fakeProvider) WriteMapState(val Transaction) error { return nil } +func (s *fakeProvider) ClearMapStateKey(val Transaction) error { + if transactions, ok := s.transactions[val.Key]; ok { + s.transactions[val.Key] = append(transactions, val) + } else { + s.transactions[val.Key] = []Transaction{val} + } + return nil +} + +func (s *fakeProvider) ClearMapState(val Transaction) error { + s.transactions[val.Key] = []Transaction{val} + return nil +} + func TestValueRead(t *testing.T) { is := make(map[string]interface{}) ts := make(map[string][]Transaction) @@ -841,6 +855,95 @@ func TestMapPut(t *testing.T) { } } +func TestMapRemove(t *testing.T) { + var tests = []struct { + writes [][]string + removes []string + keys []string + }{ + {[][]string{}, []string{}, []string{}}, + {[][]string{{"foo", "bar"}, {"foo2", "bar2"}, {"foo3", "bar3"}}, []string{"foo", "foo2"}, []string{"foo3"}}, + {[][]string{{"foo", "bar"}, {"foo2", "bar2"}, {"foo3", "bar3"}}, []string{"foo", "foo2", "foo"}, []string{"foo3"}}, + {[][]string{{"foo", "bar"}, {"foo2", "bar2"}, {"foo3", "bar3"}}, []string{"foo", "foo2", "foo3"}, []string{}}, + } + + for _, tt := range tests { + f := fakeProvider{ + initialState: make(map[string]interface{}), + transactions: make(map[string][]Transaction), + err: make(map[string]error), + } + vs := MakeMapState[string, string]("vs") + for _, val := range tt.writes { + vs.Put(&f, val[0], val[1]) + } + for _, val := range tt.removes { + err := vs.Remove(&f, val) + if err != nil { + t.Errorf("vs.Remove(%v) returned error %v", val, err) + } + } + val, _, err := vs.Keys(&f) + if err != nil { + t.Errorf("Map.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + } else if len(val) != len(tt.keys) { + t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } else { + eq := true + for idx, v := range val { + if v != tt.keys[idx] { + eq = false + } + } + if !eq { + t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } + } + } +} + +func TestMapClear(t *testing.T) { + var tests = []struct { + writes [][]string + keys []string + }{ + {[][]string{}, []string{}}, + {[][]string{{"foo", "bar"}, {"foo2", "bar2"}, {"foo3", "bar3"}}, []string{}}, + } + + for _, tt := range tests { + f := fakeProvider{ + initialState: make(map[string]interface{}), + transactions: make(map[string][]Transaction), + err: make(map[string]error), + } + vs := MakeMapState[string, string]("vs") + for _, val := range tt.writes { + vs.Put(&f, val[0], val[1]) + } + err := vs.Clear(&f) + if err != nil { + t.Errorf("vs.Clear() returned error %v", err) + } + val, _, err := vs.Keys(&f) + if err != nil { + t.Errorf("Map.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + } else if len(val) != len(tt.keys) { + t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } else { + eq := true + for idx, v := range val { + if v != tt.keys[idx] { + eq = false + } + } + if !eq { + t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } + } + } +} + func TestSetContains(t *testing.T) { is := make(map[string]interface{}) im := make(map[string]map[string]interface{}) @@ -1008,3 +1111,92 @@ func TestSetAdd(t *testing.T) { } } } + +func TestSetRemove(t *testing.T) { + var tests = []struct { + writes []string + removes []string + keys []string + }{ + {[]string{}, []string{}, []string{}}, + {[]string{"foo", "foo2", "foo3"}, []string{"foo", "foo2"}, []string{"foo3"}}, + {[]string{"foo", "foo2", "foo3"}, []string{"foo", "foo2", "foo"}, []string{"foo3"}}, + {[]string{"foo", "foo2", "foo3"}, []string{"foo", "foo2", "foo3"}, []string{}}, + } + + for _, tt := range tests { + f := fakeProvider{ + initialState: make(map[string]interface{}), + transactions: make(map[string][]Transaction), + err: make(map[string]error), + } + vs := MakeSetState[string]("vs") + for _, val := range tt.writes { + vs.Add(&f, val) + } + for _, val := range tt.removes { + err := vs.Remove(&f, val) + if err != nil { + t.Errorf("vs.Remove(%v) returned error %v", val, err) + } + } + val, _, err := vs.Keys(&f) + if err != nil { + t.Errorf("Set.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + } else if len(val) != len(tt.keys) { + t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } else { + eq := true + for idx, v := range val { + if v != tt.keys[idx] { + eq = false + } + } + if !eq { + t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } + } + } +} + +func TestSetClear(t *testing.T) { + var tests = []struct { + writes []string + keys []string + }{ + {[]string{}, []string{}}, + {[]string{"foo", "foo2", "foo3"}, []string{}}, + } + + for _, tt := range tests { + f := fakeProvider{ + initialState: make(map[string]interface{}), + transactions: make(map[string][]Transaction), + err: make(map[string]error), + } + vs := MakeSetState[string]("vs") + for _, val := range tt.writes { + vs.Add(&f, val) + } + err := vs.Clear(&f) + if err != nil { + t.Errorf("vs.Clear() returned error %v", err) + } + val, _, err := vs.Keys(&f) + if err != nil { + t.Errorf("Set.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + } else if len(val) != len(tt.keys) { + t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } else { + eq := true + for idx, v := range val { + if v != tt.keys[idx] { + eq = false + } + } + if !eq { + t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) + } + } + } +} diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 504cf3e58f2d7..3156abb3ac579 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -96,6 +96,7 @@ var directFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", + "TestMapState_Clear", } var portableFilters = []string{ @@ -123,6 +124,7 @@ var portableFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", + "TestMapState_Clear", } var flinkFilters = []string{ @@ -171,6 +173,7 @@ var samzaFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", + "TestMapState_Clear", } var sparkFilters = []string{ diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go index dd6a0ccf49b25..1ad39579647f2 100644 --- a/sdks/go/test/integration/primitives/state.go +++ b/sdks/go/test/integration/primitives/state.go @@ -34,6 +34,7 @@ func init() { register.DoFn3x1[state.Provider, string, int, string](&bagStateClearFn{}) register.DoFn3x1[state.Provider, string, int, string](&combiningStateFn{}) register.DoFn3x1[state.Provider, string, int, string](&mapStateFn{}) + register.DoFn3x1[state.Provider, string, int, string](&mapStateClearFn{}) register.Emitter2[string, int]() register.Combiner1[int](&combine1{}) register.Combiner2[string, int](&combine2{}) @@ -385,3 +386,63 @@ func MapStateParDo() *beam.Pipeline { return p } + +type mapStateClearFn struct { + State1 state.Map[string, int] +} + +func (f *mapStateClearFn) ProcessElement(s state.Provider, w string, c int) string { + _, ok, err := f.State1.Get(s, w) + if err != nil { + panic(err) + } + if ok { + f.State1.Remove(s, w) + f.State1.Put(s, fmt.Sprintf("%v%v", w, 1), 1) + f.State1.Put(s, fmt.Sprintf("%v%v", w, 2), 1) + f.State1.Put(s, fmt.Sprintf("%v%v", w, 3), 1) + } else { + _, ok, err := f.State1.Get(s, fmt.Sprintf("%v%v", w, 1)) + if err != nil { + panic(err) + } + if ok { + f.State1.Clear(s) + } else { + f.State1.Put(s, w, 1) + } + } + + keys, _, err := f.State1.Keys(s) + if err != nil { + panic(err) + } + + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + for _, k := range keys { + _, ok, err = f.State1.Get(s, k) + if err != nil { + panic(err) + } + if !ok { + panic(fmt.Sprintf("%v is present in keys, but not in the map", k)) + } + } + + return fmt.Sprintf("%v: %v", w, keys) +} + +// MapStateParDo_Clear tests clearing and removing from a DoFn that uses map state. +func MapStateParDo_Clear() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + + in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear") + keyed := beam.ParDo(s, func(w string, emit func(string, int)) { + emit(w, 1) + }, in) + counts := beam.ParDo(s, &mapStateFn{State1: state.MakeMapState[string, int]("key1")}, keyed) + passert.Equals(s, counts, "apple: [apple]", "pear: [pear]", "peach: [peach]", "apple: [apple1, apple2, apple3]", "apple: []", "pear: [pear1 pear2 pear3]") + + return p +} diff --git a/sdks/go/test/integration/primitives/state_test.go b/sdks/go/test/integration/primitives/state_test.go index 23d389ba1f17a..6077ae9b734b9 100644 --- a/sdks/go/test/integration/primitives/state_test.go +++ b/sdks/go/test/integration/primitives/state_test.go @@ -51,3 +51,8 @@ func TestMapState(t *testing.T) { integration.CheckFilters(t) ptest.RunAndValidate(t, MapStateParDo()) } + +func TestMapState_Clear(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, MapStateParDo_Clear()) +} From 4a7202f0f887121e5936472a24e7fc96a0236939 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 29 Aug 2022 15:18:37 -0400 Subject: [PATCH 2/5] Add set test + fix map test --- sdks/go/test/integration/primitives/state.go | 65 +++++++++++++++++++- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go index 1ad39579647f2..700ee065d17d6 100644 --- a/sdks/go/test/integration/primitives/state.go +++ b/sdks/go/test/integration/primitives/state.go @@ -35,6 +35,7 @@ func init() { register.DoFn3x1[state.Provider, string, int, string](&combiningStateFn{}) register.DoFn3x1[state.Provider, string, int, string](&mapStateFn{}) register.DoFn3x1[state.Provider, string, int, string](&mapStateClearFn{}) + register.DoFn3x1[state.Provider, string, int, string](&setStateClearFn{}) register.Emitter2[string, int]() register.Combiner1[int](&combine1{}) register.Combiner2[string, int](&combine2{}) @@ -441,8 +442,68 @@ func MapStateParDo_Clear() *beam.Pipeline { keyed := beam.ParDo(s, func(w string, emit func(string, int)) { emit(w, 1) }, in) - counts := beam.ParDo(s, &mapStateFn{State1: state.MakeMapState[string, int]("key1")}, keyed) - passert.Equals(s, counts, "apple: [apple]", "pear: [pear]", "peach: [peach]", "apple: [apple1, apple2, apple3]", "apple: []", "pear: [pear1 pear2 pear3]") + counts := beam.ParDo(s, &mapStateClearFn{State1: state.MakeMapState[string, int]("key1")}, keyed) + passert.Equals(s, counts, "apple: [apple]", "pear: [pear]", "peach: [peach]", "apple: [apple1 apple2 apple3]", "apple: []", "pear: [pear1 pear2 pear3]") + + return p +} + +type setStateClearFn struct { + State1 state.Set[string] +} + +func (f *setStateClearFn) ProcessElement(s state.Provider, w string, c int) string { + ok, err := f.State1.Contains(s, w) + if err != nil { + panic(err) + } + if ok { + f.State1.Remove(s, w) + f.State1.Add(s, fmt.Sprintf("%v%v", w, 1)) + f.State1.Add(s, fmt.Sprintf("%v%v", w, 2)) + f.State1.Add(s, fmt.Sprintf("%v%v", w, 3)) + } else { + ok, err := f.State1.Contains(s, fmt.Sprintf("%v%v", w, 1)) + if err != nil { + panic(err) + } + if ok { + f.State1.Clear(s) + } else { + f.State1.Add(s, w) + } + } + + keys, _, err := f.State1.Keys(s) + if err != nil { + panic(err) + } + + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + for _, k := range keys { + ok, err = f.State1.Contains(s, k) + if err != nil { + panic(err) + } + if !ok { + panic(fmt.Sprintf("%v is present in keys, but not in the map", k)) + } + } + + return fmt.Sprintf("%v: %v", w, keys) +} + +// SetStateParDo_Clear tests clearing and removing from a DoFn that uses set state. +func SetStateParDo_Clear() *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + + in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear") + keyed := beam.ParDo(s, func(w string, emit func(string, int)) { + emit(w, 1) + }, in) + counts := beam.ParDo(s, &setStateClearFn{State1: state.MakeSetState[string]("key1")}, keyed) + passert.Equals(s, counts, "apple: [apple]", "pear: [pear]", "peach: [peach]", "apple: [apple1 apple2 apple3]", "apple: []", "pear: [pear1 pear2 pear3]") return p } From b134540cba5da27afe1763fefa76de4f0402d875 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 29 Aug 2022 15:38:23 -0400 Subject: [PATCH 3/5] String fix --- sdks/go/pkg/beam/core/state/state_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go index 4103b8897a8c6..a43d0c612ab2f 100644 --- a/sdks/go/pkg/beam/core/state/state_test.go +++ b/sdks/go/pkg/beam/core/state/state_test.go @@ -885,7 +885,7 @@ func TestMapRemove(t *testing.T) { } val, _, err := vs.Keys(&f) if err != nil { - t.Errorf("Map.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + t.Errorf("Map.Keys() returned error %v when it shouldn't have after writing: %v", err, tt.writes) } else if len(val) != len(tt.keys) { t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) } else { @@ -927,7 +927,7 @@ func TestMapClear(t *testing.T) { } val, _, err := vs.Keys(&f) if err != nil { - t.Errorf("Map.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + t.Errorf("Map.Keys() returned error %v when it shouldn't have after writing: %v", err, tt.writes) } else if len(val) != len(tt.keys) { t.Errorf("Map.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) } else { @@ -1142,7 +1142,7 @@ func TestSetRemove(t *testing.T) { } val, _, err := vs.Keys(&f) if err != nil { - t.Errorf("Set.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + t.Errorf("Set.Keys() returned error %v when it shouldn't have after writing: %v", err, tt.writes) } else if len(val) != len(tt.keys) { t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) } else { @@ -1184,7 +1184,7 @@ func TestSetClear(t *testing.T) { } val, _, err := vs.Keys(&f) if err != nil { - t.Errorf("Set.Get(\"foo\") returned error %v when it shouldn't have after writing: %v", err, tt.writes) + t.Errorf("Set.Keys() returned error %v when it shouldn't have after writing: %v", err, tt.writes) } else if len(val) != len(tt.keys) { t.Errorf("Set.Keys()=%v, want %v for state key %v", val, tt.keys, vs.Key) } else { From 1919ddfb4189e0b69352e61d607521b81c7c22b9 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 30 Aug 2022 07:38:07 -0400 Subject: [PATCH 4/5] Add missing set state test --- sdks/go/test/integration/integration.go | 9 ++++++--- sdks/go/test/integration/primitives/state.go | 8 ++++---- sdks/go/test/integration/primitives/state_test.go | 9 +++++++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7ed690fd968a4..d5418fb70a3bd 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -97,7 +97,8 @@ var directFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", - "TestMapState_Clear", + "TestMapStateClear", + "TestSetStateClear", } var portableFilters = []string{ @@ -126,7 +127,8 @@ var portableFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", - "TestMapState_Clear", + "TestMapStateClear", + "TestSetStateClear", } var flinkFilters = []string{ @@ -176,7 +178,8 @@ var samzaFilters = []string{ "TestBagState_Clear", "TestCombiningState", "TestMapState", - "TestMapState_Clear", + "TestMapStateClear", + "TestSetStateClear", } var sparkFilters = []string{ diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go index 5761086757612..aba54e97ad33e 100644 --- a/sdks/go/test/integration/primitives/state.go +++ b/sdks/go/test/integration/primitives/state.go @@ -449,8 +449,8 @@ func (f *mapStateClearFn) ProcessElement(s state.Provider, w string, c int) stri return fmt.Sprintf("%v: %v", w, keys) } -// MapStateParDo_Clear tests clearing and removing from a DoFn that uses map state. -func MapStateParDo_Clear() *beam.Pipeline { +// MapStateParDoClear tests clearing and removing from a DoFn that uses map state. +func MapStateParDoClear() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear") @@ -509,8 +509,8 @@ func (f *setStateClearFn) ProcessElement(s state.Provider, w string, c int) stri return fmt.Sprintf("%v: %v", w, keys) } -// SetStateParDo_Clear tests clearing and removing from a DoFn that uses set state. -func SetStateParDo_Clear() *beam.Pipeline { +// SetStateParDoClear tests clearing and removing from a DoFn that uses set state. +func SetStateParDoClear() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear") diff --git a/sdks/go/test/integration/primitives/state_test.go b/sdks/go/test/integration/primitives/state_test.go index 74c08670497bc..dd5a96fd18084 100644 --- a/sdks/go/test/integration/primitives/state_test.go +++ b/sdks/go/test/integration/primitives/state_test.go @@ -57,7 +57,12 @@ func TestMapState(t *testing.T) { ptest.RunAndValidate(t, MapStateParDo()) } -func TestMapState_Clear(t *testing.T) { +func TestMapStateClear(t *testing.T) { integration.CheckFilters(t) - ptest.RunAndValidate(t, MapStateParDo_Clear()) + ptest.RunAndValidate(t, MapStateParDoClear()) +} + +func TestSetStateClear(t *testing.T) { + integration.CheckFilters(t) + ptest.RunAndValidate(t, SetStateParDoClear()) } From 3c80a2df3f871484f63f22d1aa403533952bc821 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 30 Aug 2022 14:45:05 -0400 Subject: [PATCH 5/5] go fmt --- sdks/go/test/integration/primitives/state.go | 2 +- sdks/go/test/integration/primitives/state_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go index 3b61d6a06b4ae..5d5d8afa3b265 100644 --- a/sdks/go/test/integration/primitives/state.go +++ b/sdks/go/test/integration/primitives/state.go @@ -566,4 +566,4 @@ func SetStateParDoClear() *beam.Pipeline { passert.Equals(s, counts, "apple: [apple]", "pear: [pear]", "peach: [peach]", "apple: [apple1 apple2 apple3]", "apple: []", "pear: [pear1 pear2 pear3]") return p -} \ No newline at end of file +} diff --git a/sdks/go/test/integration/primitives/state_test.go b/sdks/go/test/integration/primitives/state_test.go index f21955ee19abf..37f45c2c0c2bf 100644 --- a/sdks/go/test/integration/primitives/state_test.go +++ b/sdks/go/test/integration/primitives/state_test.go @@ -70,4 +70,4 @@ func TestSetState(t *testing.T) { func TestSetStateClear(t *testing.T) { integration.CheckFilters(t) ptest.RunAndValidate(t, SetStateParDoClear()) -} \ No newline at end of file +}