Skip to content

Commit

Permalink
Go Map State integration test (apache#22898)
Browse files Browse the repository at this point in the history
* Add map state in the Go Sdk

* Remove unused function for now

* WIP: MapState integration test

* WIP: MapState integration test

* fix bad merge
  • Loading branch information
damccorm authored and Kanishk Karanawat committed Sep 29, 2022
1 parent 4046777 commit 78895ac
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
6 changes: 5 additions & 1 deletion sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
// Running integration tests can be done with a go test call with any flags that
// are required by the test pipelines, such as --runner or --endpoint.
// Example:
// go test -v ./sdks/go/test/integration/... --runner=portable --endpoint=localhost:8099
//
// go test -v ./sdks/go/test/integration/... --runner=portable --endpoint=localhost:8099
//
// Alternatively, tests can be executed by running the
// run_validatesrunner_tests.sh script, which also performs much of the
Expand Down Expand Up @@ -92,6 +93,7 @@ var directFilters = []string{
"TestValueState",
"TestBagState",
"TestCombiningState",
"TestMapState",
}

var portableFilters = []string{
Expand All @@ -116,6 +118,7 @@ var portableFilters = []string{
"TestValueState",
"TestBagState",
"TestCombiningState",
"TestMapState",
}

var flinkFilters = []string{
Expand Down Expand Up @@ -161,6 +164,7 @@ var samzaFilters = []string{
"TestValueState",
"TestBagState",
"TestCombiningState",
"TestMapState",
}

var sparkFilters = []string{
Expand Down
52 changes: 52 additions & 0 deletions sdks/go/test/integration/primitives/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package primitives

import (
"fmt"
"sort"
"strconv"
"strings"

Expand All @@ -30,6 +31,7 @@ func init() {
register.DoFn3x1[state.Provider, string, int, string](&valueStateFn{})
register.DoFn3x1[state.Provider, string, int, string](&bagStateFn{})
register.DoFn3x1[state.Provider, string, int, string](&combiningStateFn{})
register.DoFn3x1[state.Provider, string, int, string](&mapStateFn{})
register.Emitter2[string, int]()
register.Combiner1[int](&combine1{})
register.Combiner2[string, int](&combine2{})
Expand Down Expand Up @@ -252,3 +254,53 @@ func CombiningStateParDo() *beam.Pipeline {

return p
}

type mapStateFn struct {
State1 state.Map[string, int]
}

func (f *mapStateFn) ProcessElement(s state.Provider, w string, c int) string {
i, _, err := f.State1.Get(s, w)
if err != nil {
panic(err)
}
i++
err = f.State1.Put(s, w, i)
if err != nil {
panic(err)
}
err = f.State1.Put(s, fmt.Sprintf("%v%v", w, i), i)
if err != nil {
panic(err)
}
j, _, err := f.State1.Get(s, w)
if err != nil {
panic(err)
}
if i != j {
panic(fmt.Sprintf("Reading state multiple times for %v produced different results: %v != %v", w, i, j))
}

keys, _, err := f.State1.Keys(s)
if err != nil {
panic(err)
}

sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

return fmt.Sprintf("%v: %v, keys: %v", w, i, keys)
}

// MapStateParDo tests a DoFn that uses value state.
func MapStateParDo() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()

in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
emit(w, 1)
}, in)
counts := beam.ParDo(s, &mapStateFn{State1: state.MakeMapState[string, int]("key1")}, keyed)
passert.Equals(s, counts, "apple: 1, keys: [apple apple1]", "pear: 1, keys: [pear pear1]", "peach: 1, keys: [peach peach1]", "apple: 2, keys: [apple apple1 apple2]", "apple: 3, keys: [apple apple1 apple2 apple3]", "pear: 2, keys: [pear pear1 pear2]")

return p
}
5 changes: 5 additions & 0 deletions sdks/go/test/integration/primitives/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ func TestCombiningState(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, CombiningStateParDo())
}

func TestMapState(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, MapStateParDo())
}

0 comments on commit 78895ac

Please sign in to comment.