From 1fedf11b349c30d9d021b266b64b58b7ff7d4208 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 24 Aug 2022 09:20:26 -0400 Subject: [PATCH 1/2] Fix a few linting issues --- .../pkg/beam/core/runtime/exec/userstate.go | 40 +++++++++---------- sdks/go/test/integration/primitives/state.go | 4 +- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/userstate.go b/sdks/go/pkg/beam/core/runtime/exec/userstate.go index 21fab583b8ec3..05105cda78f68 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/userstate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/userstate.go @@ -162,42 +162,40 @@ func (s *stateProvider) WriteBagState(val state.Transaction) error { func (s *stateProvider) getReader(userStateID string) (io.ReadCloser, error) { if r, ok := s.readersByKey[userStateID]; ok { return r, nil - } else { - r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window) - if err != nil { - return nil, err - } - s.readersByKey[userStateID] = r - return s.readersByKey[userStateID], nil } + r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window) + if err != nil { + return nil, err + } + s.readersByKey[userStateID] = r + return s.readersByKey[userStateID], nil } func (s *stateProvider) getAppender(userStateID string) (io.Writer, error) { if w, ok := s.appendersByKey[userStateID]; ok { return w, nil - } else { - w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window) - if err != nil { - return nil, err - } - s.appendersByKey[userStateID] = w - return s.appendersByKey[userStateID], nil } + w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window) + if err != nil { + return nil, err + } + s.appendersByKey[userStateID] = w + return s.appendersByKey[userStateID], nil } func (s *stateProvider) getClearer(userStateID string) (io.Writer, error) { if w, ok := s.clearersByKey[userStateID]; ok { return w, nil - } else { - w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window) - if err != nil { - return nil, err - } - s.clearersByKey[userStateID] = w - return s.clearersByKey[userStateID], nil } + w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window) + if err != nil { + return nil, err + } + s.clearersByKey[userStateID] = w + return s.clearersByKey[userStateID], nil } +// UserStateAdapter provides a state provider to be used for user state. type UserStateAdapter interface { NewStateProvider(ctx context.Context, reader StateReader, w typex.Window, element interface{}) (stateProvider, error) } diff --git a/sdks/go/test/integration/primitives/state.go b/sdks/go/test/integration/primitives/state.go index ed23f1784c118..d67f5e3605df9 100644 --- a/sdks/go/test/integration/primitives/state.go +++ b/sdks/go/test/integration/primitives/state.go @@ -43,7 +43,7 @@ func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string if !ok { i = 1 } - f.State1.Write(s, i+1) + err = f.State1.Write(s, i+1) if err != nil { panic(err) } @@ -55,7 +55,7 @@ func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string if !ok { j = "I" } - f.State2.Write(s, j+"I") + err = f.State2.Write(s, j+"I") if err != nil { panic(err) } From 5e5b3a1b2f08d4a8789be995c96304f49fab353d Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Wed, 24 Aug 2022 09:38:51 -0400 Subject: [PATCH 2/2] Avoid stuttering names --- sdks/go/pkg/beam/core/graph/fn.go | 2 +- .../pkg/beam/core/runtime/graphx/translate.go | 4 ++-- sdks/go/pkg/beam/core/state/state.go | 22 +++++++++---------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 78aab66de8150..9a9f517862239 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -1274,7 +1274,7 @@ func validateState(fn *DoFn, numIn mainInputs) error { "unique per DoFn", k, orig, s) } t := s.StateType() - if t != state.StateTypeValue && t != state.StateTypeBag { + if t != state.TypeValue && t != state.TypeBag { err := errors.Errorf("Unrecognized state type %v for state %v", t, s) return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+ "type is state.Value and state.Bag", t, s) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 4d113ec2202f7..63972be645a85 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { return handleErr(err) } switch ps.StateType() { - case state.StateTypeValue: + case state.TypeValue: stateSpecs[ps.StateKey()] = &pipepb.StateSpec{ Spec: &pipepb.StateSpec_ReadModifyWriteSpec{ ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{ @@ -482,7 +482,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { Urn: URNBagUserState, }, } - case state.StateTypeBag: + case state.TypeBag: stateSpecs[ps.StateKey()] = &pipepb.StateSpec{ Spec: &pipepb.StateSpec_BagSpec{ BagSpec: &pipepb.BagStateSpec{ diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go index 8877260afb843..d655554e825b7 100644 --- a/sdks/go/pkg/beam/core/state/state.go +++ b/sdks/go/pkg/beam/core/state/state.go @@ -23,8 +23,8 @@ import ( // TransactionTypeEnum represents the type of state transaction (e.g. set, clear) type TransactionTypeEnum int32 -// StateTypeEnum represents the type of a state instance (e.g. value, bag, etc...) -type StateTypeEnum int32 +// TypeEnum represents the type of a state instance (e.g. value, bag, etc...) +type TypeEnum int32 const ( // TransactionTypeSet is the set transaction type @@ -33,10 +33,10 @@ const ( TransactionTypeClear TransactionTypeEnum = 1 // TransactionTypeAppend is the append transaction type TransactionTypeAppend TransactionTypeEnum = 2 - // StateTypeValue represents a value state - StateTypeValue StateTypeEnum = 0 - // StateTypeBag represents a bag state - StateTypeBag StateTypeEnum = 1 + // TypeValue represents a value state + TypeValue TypeEnum = 0 + // TypeBag represents a bag state + TypeBag TypeEnum = 1 ) var ( @@ -70,7 +70,7 @@ type Provider interface { type PipelineState interface { StateKey() string CoderType() reflect.Type - StateType() StateTypeEnum + StateType() TypeEnum } // Value is used to read and write global pipeline state representing a single value. @@ -129,8 +129,8 @@ func (s Value[T]) CoderType() reflect.Type { } // StateType returns the type of the state (in this case always Value). -func (s Value[T]) StateType() StateTypeEnum { - return StateTypeValue +func (s Value[T]) StateType() TypeEnum { + return TypeValue } // MakeValueState is a factory function to create an instance of ValueState with the given key. @@ -199,8 +199,8 @@ func (s Bag[T]) CoderType() reflect.Type { } // StateType returns the type of the state (in this case always Bag). -func (s Bag[T]) StateType() StateTypeEnum { - return StateTypeBag +func (s Bag[T]) StateType() TypeEnum { + return TypeBag } // MakeBagState is a factory function to create an instance of BagState with the given key.