Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a few linting issues #22842

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/graph/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ func validateState(fn *DoFn, numIn mainInputs) error {
"unique per DoFn", k, orig, s)
}
t := s.StateType()
if t != state.StateTypeValue && t != state.StateTypeBag {
if t != state.TypeValue && t != state.TypeBag {
err := errors.Errorf("Unrecognized state type %v for state %v", t, s)
return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+
"type is state.Value and state.Bag", t, s)
Expand Down
40 changes: 19 additions & 21 deletions sdks/go/pkg/beam/core/runtime/exec/userstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,42 +162,40 @@ func (s *stateProvider) WriteBagState(val state.Transaction) error {
func (s *stateProvider) getReader(userStateID string) (io.ReadCloser, error) {
if r, ok := s.readersByKey[userStateID]; ok {
return r, nil
} else {
r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.readersByKey[userStateID] = r
return s.readersByKey[userStateID], nil
}
r, err := s.sr.OpenBagUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.readersByKey[userStateID] = r
return s.readersByKey[userStateID], nil
}

func (s *stateProvider) getAppender(userStateID string) (io.Writer, error) {
if w, ok := s.appendersByKey[userStateID]; ok {
return w, nil
} else {
w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.appendersByKey[userStateID] = w
return s.appendersByKey[userStateID], nil
}
w, err := s.sr.OpenBagUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.appendersByKey[userStateID] = w
return s.appendersByKey[userStateID], nil
}

func (s *stateProvider) getClearer(userStateID string) (io.Writer, error) {
if w, ok := s.clearersByKey[userStateID]; ok {
return w, nil
} else {
w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.clearersByKey[userStateID] = w
return s.clearersByKey[userStateID], nil
}
w, err := s.sr.OpenBagUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window)
if err != nil {
return nil, err
}
s.clearersByKey[userStateID] = w
return s.clearersByKey[userStateID], nil
}

// UserStateAdapter provides a state provider to be used for user state.
type UserStateAdapter interface {
NewStateProvider(ctx context.Context, reader StateReader, w typex.Window, element interface{}) (stateProvider, error)
}
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
return handleErr(err)
}
switch ps.StateType() {
case state.StateTypeValue:
case state.TypeValue:
stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
Expand All @@ -482,7 +482,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
Urn: URNBagUserState,
},
}
case state.StateTypeBag:
case state.TypeBag:
stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
Spec: &pipepb.StateSpec_BagSpec{
BagSpec: &pipepb.BagStateSpec{
Expand Down
22 changes: 11 additions & 11 deletions sdks/go/pkg/beam/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
// TransactionTypeEnum represents the type of state transaction (e.g. set, clear)
type TransactionTypeEnum int32

// StateTypeEnum represents the type of a state instance (e.g. value, bag, etc...)
type StateTypeEnum int32
// TypeEnum represents the type of a state instance (e.g. value, bag, etc...)
type TypeEnum int32

const (
// TransactionTypeSet is the set transaction type
Expand All @@ -33,10 +33,10 @@ const (
TransactionTypeClear TransactionTypeEnum = 1
// TransactionTypeAppend is the append transaction type
TransactionTypeAppend TransactionTypeEnum = 2
// StateTypeValue represents a value state
StateTypeValue StateTypeEnum = 0
// StateTypeBag represents a bag state
StateTypeBag StateTypeEnum = 1
// TypeValue represents a value state
TypeValue TypeEnum = 0
// TypeBag represents a bag state
TypeBag TypeEnum = 1
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ type Provider interface {
type PipelineState interface {
StateKey() string
CoderType() reflect.Type
StateType() StateTypeEnum
StateType() TypeEnum
}

// Value is used to read and write global pipeline state representing a single value.
Expand Down Expand Up @@ -129,8 +129,8 @@ func (s Value[T]) CoderType() reflect.Type {
}

// StateType returns the type of the state (in this case always Value).
func (s Value[T]) StateType() StateTypeEnum {
return StateTypeValue
func (s Value[T]) StateType() TypeEnum {
return TypeValue
}

// MakeValueState is a factory function to create an instance of ValueState with the given key.
Expand Down Expand Up @@ -199,8 +199,8 @@ func (s Bag[T]) CoderType() reflect.Type {
}

// StateType returns the type of the state (in this case always Bag).
func (s Bag[T]) StateType() StateTypeEnum {
return StateTypeBag
func (s Bag[T]) StateType() TypeEnum {
return TypeBag
}

// MakeBagState is a factory function to create an instance of BagState with the given key.
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/test/integration/primitives/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string
if !ok {
i = 1
}
f.State1.Write(s, i+1)
err = f.State1.Write(s, i+1)
if err != nil {
panic(err)
}
Expand All @@ -55,7 +55,7 @@ func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string
if !ok {
j = "I"
}
f.State2.Write(s, j+"I")
err = f.State2.Write(s, j+"I")
if err != nil {
panic(err)
}
Expand Down