diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go index 7d39aa1ce88d0..788d6d52df67a 100644 --- a/sdks/go/pkg/beam/core/funcx/fn.go +++ b/sdks/go/pkg/beam/core/funcx/fn.go @@ -20,6 +20,7 @@ import ( "reflect" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -82,6 +83,8 @@ const ( FnBundleFinalization FnParamKind = 0x800 // FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator FnWatermarkEstimator FnParamKind = 0x1000 + // FnState indicates a function input parameter that implements state.Provider + FnStateProvider FnParamKind = 0x2000 ) func (k FnParamKind) String() string { @@ -112,6 +115,8 @@ func (k FnParamKind) String() string { return "BundleFinalization" case FnWatermarkEstimator: return "WatermarkEstimator" + case FnStateProvider: + return "StateProvider" default: return fmt.Sprintf("%v", int(k)) } @@ -289,6 +294,17 @@ func (u *Fn) BundleFinalization() (pos int, exists bool) { return -1, false } +// StateProvider returns (index, true) iff the function expects a +// parameter that implements state.Provider. +func (u *Fn) StateProvider() (pos int, exists bool) { + for i, p := range u.Param { + if p.Kind == FnStateProvider { + return i, true + } + } + return -1, false +} + // WatermarkEstimator returns (index, true) iff the function expects a // parameter that implements sdf.WatermarkEstimator. func (u *Fn) WatermarkEstimator() (pos int, exists bool) { @@ -374,6 +390,8 @@ func New(fn reflectx.Func) (*Fn, error) { kind = FnWindow case t == typex.BundleFinalizationType: kind = FnBundleFinalization + case t == state.ProviderType: + kind = FnStateProvider case t == reflectx.Type: kind = FnType case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()): @@ -464,7 +482,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam { } // The order of present parameters and return values must be as follows: -// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) +// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) // where ? indicates 0 or 1, and * indicates any number. // and a SideInput is one of FnValue or FnIter or FnReIter // Note: Fns with inputs must have at least one FnValue as the main input. @@ -496,6 +514,7 @@ var ( errReflectTypePrecedence = errors.New("may only have a single reflect.Type parameter and it must precede the main input parameter") errRTrackerPrecedence = errors.New("may only have a single sdf.RTracker parameter and it must precede the main input parameter") errBundleFinalizationPrecedence = errors.New("may only have a single BundleFinalization parameter and it must precede the main input parameter") + errStateProviderPrecedence = errors.New("may only have a single state.Provider parameter and it must precede the main input parameter") errInputPrecedence = errors.New("inputs parameters must precede emit function parameters") ) @@ -513,6 +532,7 @@ const ( psOutput psRTracker psBundleFinalization + psStateProvider ) func nextParamState(cur paramState, transition FnParamKind) (paramState, error) { @@ -535,6 +555,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psContext: switch transition { @@ -552,6 +574,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psPane: switch transition { @@ -567,6 +591,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psWindow: switch transition { @@ -580,6 +606,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psEventTime: switch transition { @@ -591,6 +619,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psWatermarkEstimator: switch transition { @@ -600,6 +630,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psType: switch transition { @@ -607,13 +639,22 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psBundleFinalization, nil case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psBundleFinalization: switch transition { case FnRTracker: return psRTracker, nil + case FnStateProvider: + return psStateProvider, nil } case psRTracker: + switch transition { + case FnStateProvider: + return psStateProvider, nil + } + case psStateProvider: // Completely handled by the default clause case psInput: switch transition { @@ -644,6 +685,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return -1, errBundleFinalizationPrecedence case FnRTracker: return -1, errRTrackerPrecedence + case FnStateProvider: + return -1, errStateProviderPrecedence case FnIter, FnReIter, FnValue, FnMultiMap: return psInput, nil case FnEmit: diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go index 202de64b19d1e..9eee359c1e539 100644 --- a/sdks/go/pkg/beam/core/funcx/fn_test.go +++ b/sdks/go/pkg/beam/core/funcx/fn_test.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" ) @@ -108,6 +109,11 @@ func TestNew(t *testing.T) { Fn: func(typex.PaneInfo, typex.Window, typex.EventTime, sdf.WatermarkEstimator, reflect.Type, []byte) {}, Param: []FnParamKind{FnPane, FnWindow, FnEventTime, FnWatermarkEstimator, FnType, FnValue}, }, + { + Name: "good10", + Fn: func(sdf.RTracker, state.Provider, []byte) {}, + Param: []FnParamKind{FnRTracker, FnStateProvider, FnValue}, + }, { Name: "good-method", Fn: foo{1}.Do, @@ -211,6 +217,16 @@ func TestNew(t *testing.T) { Fn: func(int, func(int), func() func(*int) bool) {}, Err: errInputPrecedence, }, + { + Name: "errInputPrecedence- StateProvider before RTracker", + Fn: func(state.Provider, sdf.RTracker, int) {}, + Err: errRTrackerPrecedence, + }, + { + Name: "errInputPrecedence- StateProvider after output", + Fn: func(int, state.Provider) {}, + Err: errStateProviderPrecedence, + }, { Name: "errInputPrecedence- input after output", Fn: func(int, func(int), int) {}, diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index 5e18e7559ec67..a9f1c8a092b0b 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -153,14 +153,15 @@ type MultiEdge struct { parent *Scope Op Opcode - DoFn *DoFn // ParDo - RestrictionCoder *coder.Coder // SplittableParDo - CombineFn *CombineFn // Combine - AccumCoder *coder.Coder // Combine - Value []byte // Impulse - External *ExternalTransform // Current External Transforms API - Payload *Payload // Legacy External Transforms API - WindowFn *window.Fn // WindowInto + DoFn *DoFn // ParDo + RestrictionCoder *coder.Coder // SplittableParDo + StateCoders map[string]*coder.Coder // Stateful ParDo + CombineFn *CombineFn // Combine + AccumCoder *coder.Coder // Combine + Value []byte // Impulse + External *ExternalTransform // Current External Transforms API + Payload *Payload // Legacy External Transforms API + WindowFn *window.Fn // WindowInto Input []*Inbound Output []*Outbound diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 6b3656c1c2526..2034349cdf223 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -282,6 +283,27 @@ func (f *DoFn) IsSplittable() bool { return ok } +// PipelineState returns a list of PipelineState objects used to access/mutate global pipeline state (if any). +func (f *DoFn) PipelineState() []state.PipelineState { + var s []state.PipelineState + if f.Recv == nil { + return s + } + + v := reflect.Indirect(reflect.ValueOf(f.Recv)) + + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + if f.CanInterface() { + if ps, ok := f.Interface().(state.PipelineState); ok { + s = append(s, ps) + } + } + } + + return s +} + // SplittableDoFn represents a DoFn implementing SDF methods. type SplittableDoFn DoFn @@ -561,7 +583,14 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) { } } - return (*DoFn)(fn), nil + doFn := (*DoFn)(fn) + + err = validateState(doFn, numMainIn) + if err != nil { + return nil, addContext(err, fn) + } + + return doFn, nil } // validateMainInputs checks that a method has the given number of main inputs @@ -1221,6 +1250,50 @@ func validateStatefulWatermarkSig(fn *Fn, numMainIn int) error { return nil } +func validateState(fn *DoFn, numIn mainInputs) error { + ps := fn.PipelineState() + + if _, hasSp := fn.methods[processElementName].StateProvider(); hasSp { + if numIn == MainSingle { + err := errors.Errorf("ProcessElement uses a StateProvider, but is not keyed") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but is not keyed. "+ + "All stateful DoFns must take a key/value pair as an input.") + } + if len(ps) == 0 { + err := errors.Errorf("ProcessElement uses a StateProvider, but noState structs are attached to the DoFn") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but no State structs are "+ + "attached to the DoFn. Ensure that you are including the State structs you're using to read/write"+ + "global state as public uppercase member variables.") + } + stateKeys := make(map[string]state.PipelineState) + for _, s := range ps { + k := s.StateKey() + if orig, ok := stateKeys[k]; ok { + err := errors.Errorf("Duplicate state key %v", k) + return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+ + "unique per DoFn", k, orig, s) + } else { + stateKeys[k] = s + } + } + + // TODO(#22736) - Remove this once state is fully supported + err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+ + "Please try upgrading to a newer release if one exists or wait for state support to be released.") + } else { + if len(ps) > 0 { + err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+ + "the DoFn: %v", ps) + return errors.SetTopLevelMsgf(err, "ProcessElement doesn't use a StateProvider, but State structs are "+ + "attached to the DoFn: %v\nEnsure that you are using the StateProvider to perform any reads or writes"+ + "of pipeline state.", ps) + } + } + + return nil +} + // CombineFn represents a CombineFn. type CombineFn Fn diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go b/sdks/go/pkg/beam/core/graph/fn_test.go index a1702175f64a7..2117c735dce23 100644 --- a/sdks/go/pkg/beam/core/graph/fn_test.go +++ b/sdks/go/pkg/beam/core/graph/fn_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" ) @@ -52,6 +53,8 @@ func TestNewDoFn(t *testing.T) { {dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)}, {dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)}, {dfn: &GoodDoFnCoGbk1wSide{}, opt: NumMainInputs(MainKv)}, + // TODO(#22736) - Enable this once stateful dofns are fully supported + // {dfn: &GoodStatefulDoFn{State1: state.Value[int](state.MakeValueState[int]("state1"))}, opt: NumMainInputs(MainKv)}, } for _, test := range tests { @@ -70,7 +73,8 @@ func TestNewDoFn(t *testing.T) { }) t.Run("invalid", func(t *testing.T) { tests := []struct { - dfn interface{} + dfn interface{} + numInputs int }{ // Validate main inputs. {dfn: func() int { return 0 }}, // No inputs. @@ -94,26 +98,44 @@ func TestNewDoFn(t *testing.T) { {dfn: &BadDoFnReturnValuesInFinishBundle{}}, {dfn: &BadDoFnReturnValuesInSetup{}}, {dfn: &BadDoFnReturnValuesInTeardown{}}, + {dfn: &BadStatefulDoFnNoStateProvider{State1: state.Value[int](state.MakeValueState[int]("state1"))}}, + {dfn: &BadStatefulDoFnNoStateFields{}}, + {dfn: &BadStatefulDoFnNoKV{State1: state.Value[int](state.MakeValueState[int]("state1"))}, numInputs: 1}, } for _, test := range tests { t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) { - if cfn, err := NewDoFn(test.dfn); err != nil { - t.Logf("NewDoFn failed as expected:\n%v", err) - } else { - t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn) - } - // If validation fails with unknown main inputs, then it should - // always fail for any known number of main inputs, so test them - // all. Error messages won't necessarily match. - if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil { - t.Logf("NewDoFn failed as expected:\n%v", err) - } else { - t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn) - } - if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil { - t.Logf("NewDoFn failed as expected:\n%v", err) - } else { - t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn) + switch test.numInputs { + case 1: + if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil { + t.Logf("NewDoFn failed as expected:\n%v", err) + } else { + t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn) + } + case 2: + if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil { + t.Logf("NewDoFn failed as expected:\n%v", err) + } else { + t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn) + } + default: + if cfn, err := NewDoFn(test.dfn); err != nil { + t.Logf("NewDoFn failed as expected:\n%v", err) + } else { + t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn) + } + // If validation fails with unknown main inputs, then it should + // always fail for any known number of main inputs, so test them + // all. Error messages won't necessarily match. + if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil { + t.Logf("NewDoFn failed as expected:\n%v", err) + } else { + t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn) + } + if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil { + t.Logf("NewDoFn failed as expected:\n%v", err) + } else { + t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn) + } } }) } @@ -1058,6 +1080,14 @@ func (fn *GoodStatefulWatermarkEstimatingKv) WatermarkEstimatorState(estimator * return 0 } +type GoodStatefulDoFn struct { + State1 state.Value[int] +} + +func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, int, int) int { + return 0 +} + // Examples of incorrect SDF signatures. // Examples with missing methods. @@ -1422,6 +1452,29 @@ func (fn *BadManualWatermarkEstimatorMismatched) CreateWatermarkEstimator() *Wat return &WatermarkEstimatorT{} } +type BadStatefulDoFnNoStateProvider struct { + State1 state.Value[int] +} + +func (fn *BadStatefulDoFnNoStateProvider) ProcessElement(int, int) int { + return 0 +} + +type BadStatefulDoFnNoStateFields struct { +} + +func (fn *BadStatefulDoFnNoStateFields) ProcessElement(state.Provider, int) int { + return 0 +} + +type BadStatefulDoFnNoKV struct { + State1 state.Value[int] +} + +func (fn *BadStatefulDoFnNoKV) ProcessElement(state.Provider, int, int) int { + return 0 +} + // Examples of correct CombineFn signatures type MyAccum struct{} diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 48387f78c346d..51f3296ca44d8 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -26,6 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -68,6 +69,21 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) { } } +type stateProvider struct { +} + +// ReadValueState reads a value state from the State API +func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) { + // TODO(#22736) - read from the state api. + return nil, nil, errors.New("Stateful DoFns are not supported yet.") +} + +// WriteValueState writes a value state to the State API +func (s *stateProvider) WriteValueState(val state.Transaction) error { + // TODO(#22736) - read from the state api. + return errors.New("Stateful DoFns are not supported yet.") +} + // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, extra ...interface{}) (*FullValue, error) { @@ -92,10 +108,11 @@ func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, b type invoker struct { fn *funcx.Fn args []interface{} + sp *stateProvider // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. - ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx int // specialized input indexes - outEtIdx, outPcIdx, outErrIdx int // specialized output indexes - in, out []int // general indexes + ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int // specialized input indexes + outEtIdx, outPcIdx, outErrIdx int // specialized output indexes + in, out []int // general indexes ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue. elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types. @@ -125,6 +142,9 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.weIdx, ok = fn.WatermarkEstimator(); !ok { n.weIdx = -1 } + if n.spIdx, ok = fn.StateProvider(); !ok { + n.spIdx = -1 + } if n.outEtIdx, ok = fn.OutEventTime(); !ok { n.outEtIdx = -1 } @@ -188,6 +208,12 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind args[n.weIdx] = we } + if n.spIdx >= 0 { + // TODO(#22736) - provide this with the variable access it needs to talk to the state api. + n.sp = &stateProvider{} + args[n.spIdx] = n.sp + } + // (2) Main input from value, if any. i := 0 if opt != nil { diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index d9d56ddf65df4..5e26155320e8d 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -404,6 +404,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { urnTruncateSizedRestrictions: var data string var sides map[string]*pipepb.SideInput + var userState map[string]*pipepb.StateSpec switch urn { case graphx.URNParDo, urnPairWithRestriction, @@ -416,6 +417,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { } data = string(pardo.GetDoFn().GetPayload()) sides = pardo.GetSideInputs() + userState = pardo.GetStateSpecs() case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert: var cmb pipepb.CombinePayload if err := proto.Unmarshal(payload, &cmb); err != nil { @@ -462,6 +464,20 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { n.PID = transform.GetUniqueName() input := unmarshalKeyedValues(transform.GetInputs()) + + if len(userState) > 0 { + stateIdToCoder := make(map[string]*coder.Coder) + for key, spec := range userState { + // TODO(#22736) - this will eventually need to be aware of which type of state its modifying to support non-Value state types. + cID := spec.GetReadModifyWriteSpec().CoderId + c, err := b.coders.Coder(cID) + if err != nil { + return nil, err + } + stateIdToCoder[key] = c + } + } + for i := 1; i < len(input); i++ { // TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index 75ee58484fd18..4fe1a9e1616df 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" @@ -517,6 +518,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, bool) { return v1pb.Type_WINDOW, true case typex.BundleFinalizationType: return v1pb.Type_BUNDLEFINALIZATION, true + case state.ProviderType: + return v1pb.Type_STATEPROVIDER, true case typex.KVType: return v1pb.Type_KV, true case typex.CoGBKType: @@ -681,6 +684,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, error) { return typex.WindowType, nil case v1pb.Type_BUNDLEFINALIZATION: return typex.BundleFinalizationType, nil + case v1pb.Type_STATEPROVIDER: + return state.ProviderType, nil case v1pb.Type_KV: return typex.KVType, nil case v1pb.Type_COGBK: diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 77797d756f162..653dacce7a36a 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -70,6 +70,7 @@ const ( URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1" URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1" + URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1" URNTruncate = "beam:transform:sdf_truncate_sized_restrictions:v1" // Deprecated: Determine worker binary based on GoWorkerBinary Role instead. @@ -457,6 +458,29 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok { m.requirements[URNRequiresBundleFinalization] = true } + if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok { + m.requirements[URNRequiresStatefulProcessing] = true + stateSpecs := make(map[string]*pipepb.StateSpec) + for _, ps := range edge.Edge.DoFn.PipelineState() { + coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()]) + if err != nil { + return handleErr(err) + } + stateSpecs[ps.StateKey()] = &pipepb.StateSpec{ + // TODO (#22736) - make spec type and protocol conditional on type of State. Right now, assumes ValueState. + // See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635 + Spec: &pipepb.StateSpec_ReadModifyWriteSpec{ + ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{ + CoderId: coderId, + }, + }, + Protocol: &pipepb.FunctionSpec{ + Urn: "beam:user_state:bag:v1", + }, + } + } + payload.StateSpecs = stateSpecs + } spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)} annotations = edge.Edge.DoFn.Annotations() diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go index 413aa0f4d8c47..e2e1e78c06187 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go @@ -24,7 +24,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.19.4 +// protoc v3.14.0 // source: go/pkg/beam/core/runtime/graphx/v1/v1.proto package v1 @@ -218,6 +218,7 @@ const ( Type_COGBK Type_Special = 13 Type_WINDOWEDVALUE Type_Special = 14 Type_BUNDLEFINALIZATION Type_Special = 23 + Type_STATEPROVIDER Type_Special = 24 Type_T Type_Special = 15 Type_U Type_Special = 16 Type_V Type_Special = 17 @@ -240,6 +241,7 @@ var ( 13: "COGBK", 14: "WINDOWEDVALUE", 23: "BUNDLEFINALIZATION", + 24: "STATEPROVIDER", 15: "T", 16: "U", 17: "V", @@ -259,6 +261,7 @@ var ( "COGBK": 13, "WINDOWEDVALUE": 14, "BUNDLEFINALIZATION": 23, + "STATEPROVIDER": 24, "T": 15, "U": 16, "V": 17, @@ -1372,7 +1375,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{ 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, - 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xcb, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xde, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x56, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, @@ -1453,7 +1456,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{ 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x1a, 0x22, 0x27, 0x0a, 0x07, 0x43, 0x68, 0x61, 0x6e, 0x44, 0x69, 0x72, 0x12, 0x08, 0x0a, 0x04, 0x52, 0x45, 0x43, 0x56, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x45, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x42, 0x4f, 0x54, 0x48, 0x10, 0x02, 0x22, - 0xc2, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49, + 0xd5, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4c, 0x4c, 0x45, 0x47, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x59, 0x50, 0x45, 0x10, 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x45, 0x56, @@ -1462,175 +1465,176 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{ 0x05, 0x43, 0x4f, 0x47, 0x42, 0x4b, 0x10, 0x0d, 0x12, 0x11, 0x0a, 0x0d, 0x57, 0x49, 0x4e, 0x44, 0x4f, 0x57, 0x45, 0x44, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x10, 0x0e, 0x12, 0x16, 0x0a, 0x12, 0x42, 0x55, 0x4e, 0x44, 0x4c, 0x45, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x41, 0x54, 0x49, 0x4f, - 0x4e, 0x10, 0x17, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a, 0x01, 0x55, 0x10, - 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57, 0x10, 0x12, 0x12, - 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14, 0x12, 0x05, 0x0a, - 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, - 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, - 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, - 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, - 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, + 0x4e, 0x10, 0x17, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x50, 0x52, 0x4f, 0x56, + 0x49, 0x44, 0x45, 0x52, 0x10, 0x18, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a, + 0x01, 0x55, 0x10, 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57, + 0x10, 0x12, 0x12, 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14, + 0x12, 0x05, 0x0a, 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, + 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, + 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, + 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, + 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, + 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, + 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, + 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, + 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, + 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, + 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73, + 0x65, 0x72, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, + 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, + 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, + 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05, + 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, + 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, + 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, + 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, + 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x12, 0x10, 0x0a, 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67, + 0x65, 0x6e, 0x22, 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, + 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, + 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, + 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, - 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x63, 0x6f, 0x6d, - 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x46, - 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, - 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, - 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, - 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05, 0x44, 0x79, 0x6e, - 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x6f, 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12, + 0x54, 0x0a, 0x05, 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e, + 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, + 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, + 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, + 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05, + 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, + 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b, + 0x0a, 0x09, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x08, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67, + 0x61, 0x70, 0x5f, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70, + 0x4d, 0x73, 0x22, 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64, + 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, - 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67, 0x65, 0x6e, 0x22, - 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, - 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, - 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, - 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, - 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, + 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, + 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, + 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, + 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03, + 0x64, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, + 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, + 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, + 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, + 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22, + 0xba, 0x06, 0x0a, 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a, + 0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, + 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, + 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, + 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, + 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70, + 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, + 0x64, 0x65, 0x12, 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x70, - 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12, 0x54, 0x0a, 0x05, - 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e, 0x2e, 0x6f, 0x72, + 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, + 0x46, 0x6e, 0x12, 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, + 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, + 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, + 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, + 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62, + 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67, + 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, + 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, + 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, + 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f, + 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, + 0x64, 0x1a, 0xb5, 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a, + 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, - 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05, 0x64, 0x79, 0x6e, - 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12, 0x12, - 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, - 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x70, - 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, - 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67, 0x61, 0x70, 0x5f, - 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70, 0x4d, 0x73, 0x22, - 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x12, - 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, - 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, - 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, - 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, + 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, + 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, + 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, + 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, + 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69, + 0x0a, 0x09, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49, + 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e, + 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10, + 0x02, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, + 0x4d, 0x41, 0x50, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41, + 0x50, 0x10, 0x05, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a, + 0x06, 0x52, 0x45, 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74, + 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, - 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, - 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03, 0x64, 0x65, 0x63, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, - 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, - 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, - 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22, 0xba, 0x06, 0x0a, - 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a, 0x02, 0x66, 0x6e, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, - 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, - 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, - 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, - 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12, - 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, - 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, - 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, - 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x69, 0x6e, - 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12, - 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, - 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, - 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, - 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, - 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x07, 0x69, 0x6e, - 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, - 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, - 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, - 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, - 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, - 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f, 0x75, 0x74, 0x62, - 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x1a, 0xb5, - 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a, 0x04, 0x6b, 0x69, - 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, + 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, + 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d, + 0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a, + 0x01, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10, + 0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x19, 0x0a, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e, + 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, + 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, + 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, + 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, + 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x52, 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x73, 0x1a, 0x40, 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, + 0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64, + 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, - 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, - 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, - 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64, + 0x67, 0x65, 0x12, 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, - 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, - 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69, 0x0a, 0x09, 0x49, - 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, - 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e, 0x10, 0x01, 0x12, - 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10, 0x02, 0x12, 0x09, - 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, 0x4d, 0x41, 0x50, - 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41, 0x50, 0x10, 0x05, - 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a, 0x06, 0x52, 0x45, - 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, - 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, - 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, - 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, - 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, - 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d, 0x49, 0x6e, 0x6a, - 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a, 0x01, 0x6e, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x73, - 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x19, 0x0a, - 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x64, - 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, - 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, - 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, - 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x68, - 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x43, 0x6f, 0x64, - 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, - 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x1a, 0x40, - 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, - 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x50, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64, 0x67, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, + 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a, + 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65, + 0x63, 0x74, 0x12, 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, - 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64, 0x67, 0x65, 0x12, - 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, - 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, - 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, - 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74, - 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x12, - 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, - 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, - 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, - 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, - 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x09, 0x72, - 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x62, 0x65, - 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b, - 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x75, 0x6e, 0x74, - 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x52, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f, + 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72, + 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31, + 0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto index ef84c20887a2d..5046994707bff 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto +++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto @@ -114,6 +114,8 @@ message Type { BUNDLEFINALIZATION = 23; + STATEPROVIDER = 24; + T = 15; U = 16; V = 17; diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go new file mode 100644 index 0000000000000..62fd45099368d --- /dev/null +++ b/sdks/go/pkg/beam/core/state/state.go @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package state contains structs for reading and manipulating pipeline state. +package state + +import ( + "reflect" +) + +type TransactionType_Enum int32 + +const ( + TransactionType_Set TransactionType_Enum = 0 + TransactionType_Clear TransactionType_Enum = 1 +) + +var ( + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +// TODO(#20510) - add other forms of state (MapState, BagState, CombiningState), prefetch, and clear. + +// Transaction is used to represent a pending state transaction. This should not be manipulated directly; +// it is primarily used for implementations of the Provider interface to talk to the various State objects. +type Transaction struct { + Key string + Type TransactionType_Enum + Val interface{} +} + +// Provider represents the DoFn parameter used to get and manipulate pipeline state +// stored as key value pairs (https://beam.apache.org/documentation/programming-guide/#state-and-timers). +// This should not be manipulated directly. Instead it should be used as a parameter +// to functions on State objects like state.Value. +type Provider interface { + ReadValueState(id string) (interface{}, []Transaction, error) + WriteValueState(val Transaction) error +} + +// PipelineState is an interface representing different kinds of PipelineState (currently just state.Value). +// It is primarily meant for Beam packages to use and is probably not useful for most pipeline authors. +type PipelineState interface { + StateKey() string + CoderType() reflect.Type +} + +// Value is used to read and write global pipeline state representing a single value. +// Key represents the key used to lookup this state. +type Value[T any] struct { + Key string +} + +// Write is used to write this instance of global pipeline state representing a single value. +func (s *Value[T]) Write(p Provider, val T) error { + return p.WriteValueState(Transaction{ + Key: s.Key, + Type: TransactionType_Set, + Val: val, + }) +} + +// Read is used to read this instance of global pipeline state representing a single value. +// When a value is not found, returns the 0 value and false. +func (s *Value[T]) Read(p Provider) (T, bool, error) { + // This replays any writes that have happened to this value since we last read + // For more detail, see "State Transactionality" below for buffered transactions + cur, bufferedTransactions, err := p.ReadValueState(s.Key) + if err != nil { + var val T + return val, false, err + } + for _, t := range bufferedTransactions { + switch t.Type { + case TransactionType_Set: + cur = t.Val + case TransactionType_Clear: + cur = nil + } + } + if cur == nil { + var val T + return val, false, nil + } + return cur.(T), true, nil +} + +// StateKey returns the key for this pipeline state entry. +func (s Value[T]) StateKey() string { + if s.Key == "" { + // TODO(#22736) - infer the state from the member variable name during pipeline construction. + panic("Value state exists on struct but has not been initialized with a key.") + } + return s.Key +} + +// CoderType returns the type of the value state which should be used for a coder. +func (s Value[T]) CoderType() reflect.Type { + var t T + return reflect.TypeOf(t) +} + +// MakeValueState is a factory function to create an instance of ValueState with the given key. +func MakeValueState[T any](k string) Value[T] { + return Value[T]{ + Key: k, + } +} diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go new file mode 100644 index 0000000000000..b1297891c1c10 --- /dev/null +++ b/sdks/go/pkg/beam/core/state/state_test.go @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package state + +import ( + "errors" + "testing" +) + +var ( + errFake = errors.New("fake error") +) + +type fakeProvider struct { + initialState map[string]interface{} + transactions map[string][]Transaction + err map[string]error +} + +func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) { + if err, ok := s.err[userStateId]; ok { + return nil, nil, err + } + base := s.initialState[userStateId] + trans, ok := s.transactions[userStateId] + if !ok { + trans = []Transaction{} + } + return base, trans, nil +} + +func (s *fakeProvider) WriteValueState(val Transaction) error { + if transactions, ok := s.transactions[val.Key]; ok { + s.transactions[val.Key] = append(transactions, val) + } else { + s.transactions[val.Key] = []Transaction{val} + } + return nil +} + +func TestValueRead(t *testing.T) { + is := make(map[string]interface{}) + ts := make(map[string][]Transaction) + es := make(map[string]error) + is["no_transactions"] = 1 + ts["no_transactions"] = nil + is["basic_set"] = 1 + ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}} + is["basic_clear"] = 1 + ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}} + is["set_then_clear"] = 1 + ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}} + is["set_then_clear_then_set"] = 1 + ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}} + is["err"] = 1 + ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}} + es["err"] = errFake + + f := fakeProvider{ + initialState: is, + transactions: ts, + err: es, + } + + var tests = []struct { + vs Value[int] + val int + ok bool + err error + }{ + {MakeValueState[int]("no_transactions"), 1, true, nil}, + {MakeValueState[int]("basic_set"), 3, true, nil}, + {MakeValueState[int]("basic_clear"), 0, false, nil}, + {MakeValueState[int]("set_then_clear"), 0, false, nil}, + {MakeValueState[int]("set_then_clear_then_set"), 4, true, nil}, + {MakeValueState[int]("err"), 0, false, errFake}, + } + + for _, tt := range tests { + val, ok, err := tt.vs.Read(&f) + if err != nil && tt.err == nil { + t.Errorf("Value.Read() returned error %v for state key %v when it shouldn't have", err, tt.vs.Key) + } else if err == nil && tt.err != nil { + t.Errorf("Value.Read() returned no error for state key %v when it should have returned %v", tt.vs.Key, err) + } else if ok && !tt.ok { + t.Errorf("Value.Read() returned a value %v for state key %v when it shouldn't have", val, tt.vs.Key) + } else if !ok && tt.ok { + t.Errorf("Value.Read() didn't return a value for state key %v when it should have returned %v", tt.vs.Key, tt.val) + } else if val != tt.val { + t.Errorf("Value.Read()=%v, want %v for state key %v", val, tt.val, tt.vs.Key) + } + } +} + +func TestValueWrite(t *testing.T) { + var tests = []struct { + writes []int + val int + ok bool + }{ + {[]int{}, 0, false}, + {[]int{3}, 3, true}, + {[]int{1, 5}, 5, true}, + } + + for _, tt := range tests { + f := fakeProvider{ + initialState: make(map[string]interface{}), + transactions: make(map[string][]Transaction), + err: make(map[string]error), + } + vs := MakeValueState[int]("vs") + for _, val := range tt.writes { + vs.Write(&f, val) + } + val, ok, err := vs.Read(&f) + if err != nil { + t.Errorf("Value.Read() returned error %v when it shouldn't have after writing: %v", err, tt.writes) + } else if ok && !tt.ok { + t.Errorf("Value.Read() returned a value %v when it shouldn't have after writing: %v", val, tt.writes) + } else if !ok && tt.ok { + t.Errorf("Value.Read() didn't return a value when it should have returned %v after writing: %v", tt.val, tt.writes) + } else if val != tt.val { + t.Errorf("Value.Read()=%v, want %v after writing: %v", val, tt.val, tt.writes) + } + } +} diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index 66fca00838dc4..dcdb3e74d1e44 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -92,6 +92,19 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo return nil, addParDoCtx(err, s) } + pipelineState := fn.PipelineState() + if len(pipelineState) > 0 { + edge.StateCoders = make(map[string]*coder.Coder) + for _, ps := range pipelineState { + sT := typex.New(ps.CoderType()) + c, err := inferCoder(sT) + if err != nil { + return nil, addParDoCtx(err, s) + } + edge.StateCoders[ps.StateKey()] = c + } + } + var ret []PCollection for _, out := range edge.Output { c := PCollection{out.To}