Skip to content

Commit

Permalink
Automatically infer state keys from their field name (#22922)
Browse files Browse the repository at this point in the history
* Automatically infer state keys from their field name

* Exercise with units

* Merge in master and fix the StateKey race
  • Loading branch information
damccorm authored Aug 29, 2022
1 parent 45cf3c4 commit a60105a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 24 deletions.
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,21 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) {
return nil, addContext(err, fn)
}

// Make sure that all state entries have keys. If they don't set them to the struct field name.
if fn.Recv != nil {
v := reflect.Indirect(reflect.ValueOf(fn.Recv))
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
if f.CanInterface() {
if ps, ok := f.Interface().(state.PipelineState); ok {
if ps.StateKey() == "" {
f.FieldByName("Key").SetString(v.Type().Field(i).Name)
}
}
}
}
}

// Validate ProcessElement has correct number of main inputs (as indicated by
// numMainIn), and that main inputs are before side inputs.
processFn := fn.methods[processElementName]
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/graph/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func TestNewDoFn(t *testing.T) {
{dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)},
{dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)},
{dfn: &GoodDoFnCoGbk1wSide{}, opt: NumMainInputs(MainKv)},
{dfn: &GoodStatefulDoFn{State1: state.MakeValueState[int]("state1")}, opt: NumMainInputs(MainKv)},
{dfn: &GoodStatefulDoFn2{State1: state.MakeBagState[int]("state1")}, opt: NumMainInputs(MainKv)},
{dfn: &GoodStatefulDoFn{}, opt: NumMainInputs(MainKv)},
{dfn: &GoodStatefulDoFn2{}, opt: NumMainInputs(MainKv)},
{dfn: &GoodStatefulDoFn3{State1: state.MakeCombiningState[int, int, int]("state1", func(a, b int) int {
return a * b
})}, opt: NumMainInputs(MainKv)},
Expand Down
20 changes: 0 additions & 20 deletions sdks/go/pkg/beam/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,6 @@ func (s *Value[T]) Clear(p Provider) error {

// StateKey returns the key for this pipeline state entry.
func (s Value[T]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

Expand Down Expand Up @@ -232,10 +228,6 @@ func (s *Bag[T]) Clear(p Provider) error {

// StateKey returns the key for this pipeline state entry.
func (s Bag[T]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

Expand Down Expand Up @@ -381,10 +373,6 @@ func (s *Combining[T1, T2, T3]) readAccumulator(p Provider) (interface{}, bool,

// StateKey returns the key for this pipeline state entry.
func (s Combining[T1, T2, T3]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

Expand Down Expand Up @@ -515,10 +503,6 @@ func (s *Map[K, V]) Get(p Provider, key K) (V, bool, error) {

// StateKey returns the key for this pipeline state entry.
func (s Map[K, V]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

Expand Down Expand Up @@ -638,10 +622,6 @@ func (s *Set[K]) Contains(p Provider, key K) (bool, error) {

// StateKey returns the key for this pipeline state entry.
func (s Set[K]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

Expand Down
4 changes: 2 additions & 2 deletions sdks/go/test/integration/primitives/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func ValueStateParDo() *beam.Pipeline {
keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
emit(w, 1)
}, in)
counts := beam.ParDo(s, &valueStateFn{State1: state.MakeValueState[int]("key1"), State2: state.MakeValueState[string]("key2")}, keyed)
counts := beam.ParDo(s, &valueStateFn{}, keyed)
passert.Equals(s, counts, "apple: 1, I", "pear: 1, I", "peach: 1, I", "apple: 2, II", "apple: 3, III", "pear: 2, II")

return p
Expand Down Expand Up @@ -184,7 +184,7 @@ func BagStateParDo() *beam.Pipeline {
keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
emit(w, 1)
}, in)
counts := beam.ParDo(s, &bagStateFn{State1: state.MakeBagState[int]("key1"), State2: state.MakeBagState[string]("key2")}, keyed)
counts := beam.ParDo(s, &bagStateFn{}, keyed)
passert.Equals(s, counts, "apple: 0, ", "pear: 0, ", "peach: 0, ", "apple: 1, I", "apple: 2, I,I", "pear: 1, I")

return p
Expand Down

0 comments on commit a60105a

Please sign in to comment.