Skip to content

Commit

Permalink
Set state integration test (#22935)
Browse files Browse the repository at this point in the history
* WIP: Set state integration test

* Update sdks/go/test/integration/primitives/state.go

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>

Co-authored-by: Ritesh Ghorse <riteshghorse@gmail.com>
  • Loading branch information
damccorm and riteshghorse authored Aug 30, 2022
1 parent 37ac00b commit 3ede5b7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
3 changes: 3 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var directFilters = []string{
"TestBagState_Clear",
"TestCombiningState",
"TestMapState",
"TestSetState",
}

var portableFilters = []string{
Expand Down Expand Up @@ -125,6 +126,7 @@ var portableFilters = []string{
"TestBagState_Clear",
"TestCombiningState",
"TestMapState",
"TestSetState",
}

var flinkFilters = []string{
Expand Down Expand Up @@ -174,6 +176,7 @@ var samzaFilters = []string{
"TestBagState_Clear",
"TestCombiningState",
"TestMapState",
"TestSetState",
}

var sparkFilters = []string{
Expand Down
45 changes: 45 additions & 0 deletions sdks/go/test/integration/primitives/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {
register.DoFn3x1[state.Provider, string, int, string](&bagStateClearFn{})
register.DoFn3x1[state.Provider, string, int, string](&combiningStateFn{})
register.DoFn3x1[state.Provider, string, int, string](&mapStateFn{})
register.DoFn3x1[state.Provider, string, int, string](&setStateFn{})
register.Emitter2[string, int]()
register.Combiner1[int](&combine1{})
register.Combiner2[string, int](&combine2{})
Expand Down Expand Up @@ -400,3 +401,47 @@ func MapStateParDo() *beam.Pipeline {

return p
}

type setStateFn struct {
State1 state.Set[string]
}

func (f *setStateFn) ProcessElement(s state.Provider, w string, c int) string {
ok, err := f.State1.Contains(s, w)
if err != nil {
panic(err)
}
err = f.State1.Add(s, w)
if err != nil {
panic(err)
}
if ok {
err = f.State1.Add(s, fmt.Sprintf("%v%v", w, 1))
if err != nil {
panic(err)
}
}

keys, _, err := f.State1.Keys(s)
if err != nil {
panic(err)
}

sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

return fmt.Sprintf("%v: %v, keys: %v", w, ok, keys)
}

// SetStateParDo tests a DoFn that uses set state.
func SetStateParDo() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()

in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
emit(w, 1)
}, in)
counts := beam.ParDo(s, &setStateFn{State1: state.MakeSetState[string]("key1")}, keyed)
passert.Equals(s, counts, "apple: false, keys: [apple]", "pear: false, keys: [pear]", "peach: false, keys: [peach]", "apple: true, keys: [apple apple1]", "apple: true, keys: [apple apple1]", "pear: true, keys: [pear pear1]")

return p
}
5 changes: 5 additions & 0 deletions sdks/go/test/integration/primitives/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func TestMapState(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, MapStateParDo())
}

func TestSetState(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, SetStateParDo())
}

0 comments on commit 3ede5b7

Please sign in to comment.