Skip to content

Commit

Permalink
Fix a few linting issues (#22842)
Browse files Browse the repository at this point in the history
* Fix a few linting issues

* Avoid stuttering names
  • Loading branch information
damccorm authored Aug 24, 2022
1 parent 702ce76 commit 80ced0f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ func validateState(fn *DoFn, numIn mainInputs) error {
"unique per DoFn", k, orig, s)
}
t := s.StateType()
if t != state.StateTypeValue && t != state.StateTypeBag {
if t != state.TypeValue && t != state.TypeBag {
err := errors.Errorf("Unrecognized state type %v for state %v", t, s)
return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+
"type is state.Value and state.Bag", t, s)
Expand Down
40 changes: 19 additions & 21 deletions sdks/go/pkg/beam/core/runtime/exec/userstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,42 +162,40 @@ func (s *stateProvider) WriteBagState(val state.Transaction) error {
func (s *stateProvider) getReader(userStateID string) (io.ReadCloser, error) {
if r, ok := s.readersByKey[userStateID]; ok {
return r, nil
} else {
r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.readersByKey[userStateID] = r
return s.readersByKey[userStateID], nil
}
r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.readersByKey[userStateID] = r
return s.readersByKey[userStateID], nil
}

func (s *stateProvider) getAppender(userStateID string) (io.Writer, error) {
if w, ok := s.appendersByKey[userStateID]; ok {
return w, nil
} else {
w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.appendersByKey[userStateID] = w
return s.appendersByKey[userStateID], nil
}
w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.appendersByKey[userStateID] = w
return s.appendersByKey[userStateID], nil
}

func (s *stateProvider) getClearer(userStateID string) (io.Writer, error) {
if w, ok := s.clearersByKey[userStateID]; ok {
return w, nil
} else {
w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.clearersByKey[userStateID] = w
return s.clearersByKey[userStateID], nil
}
w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.clearersByKey[userStateID] = w
return s.clearersByKey[userStateID], nil
}

// UserStateAdapter provides a state provider to be used for user state.
type UserStateAdapter interface {
NewStateProvider(ctx context.Context, reader StateReader, w typex.Window, element interface{}) (stateProvider, error)
}
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
return handleErr(err)
}
switch ps.StateType() {
case state.StateTypeValue:
case state.TypeValue:
stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
Expand All @@ -482,7 +482,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
Urn: URNBagUserState,
},
}
case state.StateTypeBag:
case state.TypeBag:
stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
Spec: &pipepb.StateSpec_BagSpec{
BagSpec: &pipepb.BagStateSpec{
Expand Down
22 changes: 11 additions & 11 deletions sdks/go/pkg/beam/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
// TransactionTypeEnum represents the type of state transaction (e.g. set, clear)
type TransactionTypeEnum int32

// StateTypeEnum represents the type of a state instance (e.g. value, bag, etc...)
type StateTypeEnum int32
// TypeEnum represents the type of a state instance (e.g. value, bag, etc...)
type TypeEnum int32

const (
// TransactionTypeSet is the set transaction type
Expand All @@ -33,10 +33,10 @@ const (
TransactionTypeClear TransactionTypeEnum = 1
// TransactionTypeAppend is the append transaction type
TransactionTypeAppend TransactionTypeEnum = 2
// StateTypeValue represents a value state
StateTypeValue StateTypeEnum = 0
// StateTypeBag represents a bag state
StateTypeBag StateTypeEnum = 1
// TypeValue represents a value state
TypeValue TypeEnum = 0
// TypeBag represents a bag state
TypeBag TypeEnum = 1
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ type Provider interface {
type PipelineState interface {
StateKey() string
CoderType() reflect.Type
StateType() StateTypeEnum
StateType() TypeEnum
}

// Value is used to read and write global pipeline state representing a single value.
Expand Down Expand Up @@ -129,8 +129,8 @@ func (s Value[T]) CoderType() reflect.Type {
}

// StateType returns the type of the state (in this case always Value).
func (s Value[T]) StateType() StateTypeEnum {
return StateTypeValue
func (s Value[T]) StateType() TypeEnum {
return TypeValue
}

// MakeValueState is a factory function to create an instance of ValueState with the given key.
Expand Down Expand Up @@ -199,8 +199,8 @@ func (s Bag[T]) CoderType() reflect.Type {
}

// StateType returns the type of the state (in this case always Bag).
func (s Bag[T]) StateType() StateTypeEnum {
return StateTypeBag
func (s Bag[T]) StateType() TypeEnum {
return TypeBag
}

// MakeBagState is a factory function to create an instance of BagState with the given key.
Expand Down

0 comments on commit 80ced0f

Please sign in to comment.