From 93ec87cce00e68ad45e8615b59301e5310fc9559 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Thu, 6 Jun 2024 19:14:56 +0200 Subject: [PATCH 1/9] port stf --- server/v2/stf/branch/writer_map.go | 40 +++++++++++++++++++++++----- server/v2/stf/core_router_service.go | 24 +++++++++++------ server/v2/stf/stf.go | 26 +++++++++++++----- 3 files changed, 68 insertions(+), 22 deletions(-) diff --git a/server/v2/stf/branch/writer_map.go b/server/v2/stf/branch/writer_map.go index b624a4d2532c..8e7edf0e4104 100644 --- a/server/v2/stf/branch/writer_map.go +++ b/server/v2/stf/branch/writer_map.go @@ -53,21 +53,47 @@ func (b WriterMap) ApplyStateChanges(stateChanges []store.StateChanges) error { return nil } +// GetStateChanges returns the state changes for all actors in the WriterMap, including all direct +// ancesotors from which this WriterMap was derived. +// See WriterMap.recurseStateChanges for more details. +// Subject to possible renaming to ensure a developer can retrieve only changes in *this* branch +// context (not ancestors) if that is desired. +// see: https://github.com/cosmos/cosmos-sdk/pull/20412#discussion_r1618771230 func (b WriterMap) GetStateChanges() ([]store.StateChanges, error) { - sc := make([]store.StateChanges, len(b.branchedWriterState)) - for account, stateChange := range b.branchedWriterState { - kvChanges, err := stateChange.ChangeSets() - if err != nil { - return nil, err - } + var ( + changes = make(map[string][]store.KVPair) + sc []store.StateChanges + ) + if err := b.recurseStateChanges(changes); err != nil { + return nil, err + } + + for account, kvPairs := range changes { sc = append(sc, store.StateChanges{ Actor: []byte(account), - StateChanges: kvChanges, + StateChanges: kvPairs, }) } return sc, nil } +func (b WriterMap) recurseStateChanges(changes map[string][]store.KVPair) error { + // depth first + if wr, ok := b.state.(WriterMap); ok { + if err := wr.recurseStateChanges(changes); err != nil { + return err + } + } + for account, stateChange := range b.branchedWriterState { + kvChanges, err := stateChange.ChangeSets() + if err != nil { + return err + } + changes[account] = append(changes[account], kvChanges...) + } + return nil +} + func (b WriterMap) applyStateChange(sc store.StateChanges) error { writableState, err := b.GetWriter(sc.Actor) if err != nil { diff --git a/server/v2/stf/core_router_service.go b/server/v2/stf/core_router_service.go index f64e90cc285d..15da47e87cdc 100644 --- a/server/v2/stf/core_router_service.go +++ b/server/v2/stf/core_router_service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "strings" "google.golang.org/protobuf/runtime/protoiface" @@ -61,14 +62,8 @@ func (m *msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.Mes // NewQueryRouterService implements router.Service. func NewQueryRouterService(queryRouterBuilder *MsgRouterBuilder) router.Service { - queryRouter, err := queryRouterBuilder.Build() - if err != nil { - panic(fmt.Errorf("cannot create queryRouter: %w", err)) - } - return &queryRouterService{ builder: queryRouterBuilder, - handler: queryRouter, } } @@ -100,8 +95,21 @@ func (m *queryRouterService) InvokeTyped( ctx context.Context, req, resp protoiface.MessageV1, ) error { - // see https://github.com/cosmos/cosmos-sdk/pull/20349 - panic("not implemented") + // TODO lazy initialization is ugly and not thread safe. we don't want to check a mutex on every InvokeTyped either. + if m.handler == nil { + var err error + m.handler, err = m.builder.Build() + if err != nil { + return fmt.Errorf("cannot create queryRouter: %w", err) + } + } + // reflection is required, see https://github.com/cosmos/cosmos-sdk/pull/20349 + res, err := m.handler(ctx, req) + if err != nil { + return err + } + reflect.Indirect(reflect.ValueOf(resp)).Set(reflect.Indirect(reflect.ValueOf(res))) + return nil } // InvokeUntyped execute a message and returns a response. diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index 4848a8c765ab..fd6d434c91c6 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -7,6 +7,7 @@ import ( appmanager "cosmossdk.io/core/app" appmodulev2 "cosmossdk.io/core/appmodule/v2" + corecontext "cosmossdk.io/core/context" "cosmossdk.io/core/event" "cosmossdk.io/core/gas" "cosmossdk.io/core/header" @@ -17,6 +18,9 @@ import ( "cosmossdk.io/server/v2/stf/internal" ) +// Identity defines STF's bytes identity and it's used by STF to store things in its own state. +var Identity = []byte("stf") + // STF is a struct that manages the state transition component of the app. type STF[T transaction.Tx] struct { logger log.Logger @@ -108,10 +112,15 @@ func (s STF[T]) DeliverBlock( // reset events exCtx.events = make([]event.Event, 0) + // begin block - beginBlockEvents, err := s.beginBlock(exCtx) - if err != nil { - return nil, nil, err + var beginBlockEvents []event.Event + if !block.IsGenesis { + // begin block + beginBlockEvents, err = s.beginBlock(exCtx) + if err != nil { + return nil, nil, err + } } // check if we need to return early @@ -401,11 +410,13 @@ func (s STF[T]) validatorUpdates( return ctx.events, valSetUpdates, nil } -const headerInfoPrefix = 0x0 +const headerInfoPrefix = 0x37 // setHeaderInfo sets the header info in the state to be used by queries in the future. func (s STF[T]) setHeaderInfo(state store.WriterMap, headerInfo header.Info) error { - runtimeStore, err := state.GetWriter(appmanager.RuntimeIdentity) + // TODO storing header info is too low level here, stf should be stateless. + // We should have a keeper that does this. + runtimeStore, err := state.GetWriter(Identity) if err != nil { return err } @@ -422,7 +433,7 @@ func (s STF[T]) setHeaderInfo(state store.WriterMap, headerInfo header.Info) err // getHeaderInfo gets the header info from the state. It should only be used for queries func (s STF[T]) getHeaderInfo(state store.WriterMap) (i header.Info, err error) { - runtimeStore, err := state.GetWriter(appmanager.RuntimeIdentity) + runtimeStore, err := state.GetWriter(Identity) if err != nil { return header.Info{}, err } @@ -579,11 +590,12 @@ func (s STF[T]) makeContext( store store.WriterMap, execMode transaction.ExecMode, ) *executionContext { + valuedCtx := context.WithValue(ctx, corecontext.ExecModeKey, execMode) return newExecutionContext( s.makeGasMeter, s.makeGasMeteredState, s.branchFn, - ctx, + valuedCtx, sender, store, execMode, From ed0415f8a0c0926b72f8527badd17e223b031edf Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Mon, 17 Jun 2024 11:00:17 +0200 Subject: [PATCH 2/9] apply comment from matt --- runtime/v2/module.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/v2/module.go b/runtime/v2/module.go index 042f4cadf7d6..4030f2050afd 100644 --- a/runtime/v2/module.go +++ b/runtime/v2/module.go @@ -161,11 +161,10 @@ func SetupAppBuilder(inputs AppInputs) { app.moduleManager.RegisterInterfaces(inputs.InterfaceRegistrar) app.moduleManager.RegisterLegacyAminoCodec(inputs.LegacyAmino) - // TODO: this is a bit of a hack, but it's the only way to get the store keys into the app - // registerStoreKey could instead set this on StoreOptions directly if inputs.StoreOptions != nil { inputs.AppBuilder.storeOptions = inputs.StoreOptions inputs.AppBuilder.storeOptions.StoreKeys = inputs.AppBuilder.app.storeKeys + inputs.AppBuilder.storeOptions.StoreKeys = append(inputs.AppBuilder.storeOptions.StoreKeys, "stf") } } From a457284eed989f978a7dc17ad8b6aab95c977a82 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Tue, 18 Jun 2024 08:52:42 +0200 Subject: [PATCH 3/9] godoc --- server/v2/stf/branch/writer_map.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/v2/stf/branch/writer_map.go b/server/v2/stf/branch/writer_map.go index 8e7edf0e4104..ebe02f4e7b0b 100644 --- a/server/v2/stf/branch/writer_map.go +++ b/server/v2/stf/branch/writer_map.go @@ -77,6 +77,8 @@ func (b WriterMap) GetStateChanges() ([]store.StateChanges, error) { return sc, nil } +// recurseStateChanges will recursively collect state changes from the tree of +// WriterMap's and write them to the `changes` map. func (b WriterMap) recurseStateChanges(changes map[string][]store.KVPair) error { // depth first if wr, ok := b.state.(WriterMap); ok { From b92eb8c307fc2fc6d7abf20911110f4d44ba356b Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Tue, 18 Jun 2024 23:13:32 +0200 Subject: [PATCH 4/9] fix stf --- runtime/v2/builder.go | 21 ++---- runtime/v2/module.go | 4 +- server/v2/stf/core_branch_service_test.go | 13 ++-- server/v2/stf/core_router_service.go | 92 ++++++----------------- server/v2/stf/stf.go | 51 ++++++++----- server/v2/stf/stf_router.go | 47 +++++++++--- server/v2/stf/stf_test.go | 57 +++++++++++--- 7 files changed, 153 insertions(+), 132 deletions(-) diff --git a/runtime/v2/builder.go b/runtime/v2/builder.go index 0898f29bed40..42d17f779e2f 100644 --- a/runtime/v2/builder.go +++ b/runtime/v2/builder.go @@ -96,21 +96,11 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) { return nil, err } - stfMsgHandler, err := a.app.msgRouterBuilder.Build() - if err != nil { - return nil, fmt.Errorf("failed to build STF message handler: %w", err) - } - - stfQueryHandler, err := a.app.queryRouterBuilder.Build() - if err != nil { - return nil, fmt.Errorf("failed to build query handler: %w", err) - } - endBlocker, valUpdate := a.app.moduleManager.EndBlock() - a.app.stf = stf.NewSTF[transaction.Tx]( - stfMsgHandler, - stfQueryHandler, + stf, err := stf.NewSTF[transaction.Tx]( + a.app.msgRouterBuilder, + a.app.queryRouterBuilder, a.app.moduleManager.PreBlocker(), a.app.moduleManager.BeginBlock(), endBlocker, @@ -119,6 +109,11 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) { a.postTxExec, a.branch, ) + if err != nil { + return nil, fmt.Errorf("failed to create STF: %w", err) + } + + a.app.stf = stf rs, err := rootstore.CreateRootStore(a.storeOptions) if err != nil { diff --git a/runtime/v2/module.go b/runtime/v2/module.go index 4030f2050afd..e9d88ea5aa2a 100644 --- a/runtime/v2/module.go +++ b/runtime/v2/module.go @@ -211,8 +211,8 @@ func ProvideEnvironment(logger log.Logger, config *runtimev2.Module, key depinje EventService: stf.NewEventService(), GasService: stf.NewGasMeterService(), HeaderService: stf.HeaderService{}, - QueryRouterService: stf.NewQueryRouterService(appBuilder.app.queryRouterBuilder), - MsgRouterService: stf.NewMsgRouterService(appBuilder.app.msgRouterBuilder), + QueryRouterService: stf.NewQueryRouterService(), + MsgRouterService: stf.NewMsgRouterService([]byte(key.Name())), TransactionService: services.NewContextAwareTransactionService(), KVStoreService: kvService, MemStoreService: memKvService, diff --git a/server/v2/stf/core_branch_service_test.go b/server/v2/stf/core_branch_service_test.go index 722f2ac7f314..ce63c984cc70 100644 --- a/server/v2/stf/core_branch_service_test.go +++ b/server/v2/stf/core_branch_service_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/wrapperspb" appmodulev2 "cosmossdk.io/core/appmodule/v2" - "cosmossdk.io/core/transaction" "cosmossdk.io/server/v2/stf/branch" "cosmossdk.io/server/v2/stf/gas" "cosmossdk.io/server/v2/stf/mock" @@ -16,12 +16,7 @@ import ( func TestBranchService(t *testing.T) { s := &STF[mock.Tx]{ - handleMsg: func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) { - kvSet(t, ctx, "exec") - return nil, nil - }, - handleQuery: nil, - doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil }, + doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil }, doBeginBlock: func(ctx context.Context) error { kvSet(t, ctx, "begin-block") return nil @@ -43,6 +38,10 @@ func TestBranchService(t *testing.T) { makeGasMeter: gas.DefaultGasMeter, makeGasMeteredState: gas.DefaultWrapWithGasMeter, } + addMsgHandlerToSTF(t, s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) { + kvSet(t, ctx, "exec") + return nil, nil + }) makeContext := func() *executionContext { state := mock.DB() diff --git a/server/v2/stf/core_router_service.go b/server/v2/stf/core_router_service.go index 15da47e87cdc..b627f3d09f9a 100644 --- a/server/v2/stf/core_router_service.go +++ b/server/v2/stf/core_router_service.go @@ -2,120 +2,72 @@ package stf import ( "context" - "errors" - "fmt" - "reflect" - "strings" + "cosmossdk.io/core/transaction" "google.golang.org/protobuf/runtime/protoiface" - appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/router" ) // NewMsgRouterService implements router.Service. -func NewMsgRouterService(msgRouterBuilder *MsgRouterBuilder) router.Service { - msgRouter, err := msgRouterBuilder.Build() - if err != nil { - panic(fmt.Errorf("cannot create msgRouter: %w", err)) - } - - return &msgRouterService{ - builder: msgRouterBuilder, - handler: msgRouter, - } +func NewMsgRouterService(identity transaction.Identity) router.Service { + return msgRouterService{identity: identity} } var _ router.Service = (*msgRouterService)(nil) type msgRouterService struct { - builder *MsgRouterBuilder - handler appmodulev2.Handler + // TODO(tip): the identity sits here for the purpose of disallowing modules to impersonate others (sudo). + // right now this is not used, but it serves the reminder of something that we should be eventually + // looking into. + identity []byte } // CanInvoke returns an error if the given message cannot be invoked. -func (m *msgRouterService) CanInvoke(ctx context.Context, typeURL string) error { - if typeURL == "" { - return errors.New("missing type url") - } - - typeURL = strings.TrimPrefix(typeURL, "/") - if exists := m.builder.HandlerExists(typeURL); exists { - return fmt.Errorf("unknown request: %s", typeURL) - } - - return nil +func (m msgRouterService) CanInvoke(ctx context.Context, typeURL string) error { + return ctx.(*executionContext).msgRouter.CanInvoke(ctx, typeURL) } // InvokeTyped execute a message and fill-in a response. // The response must be known and passed as a parameter. // Use InvokeUntyped if the response type is not known. -func (m *msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error { - // see https://github.com/cosmos/cosmos-sdk/pull/20349 - panic("not implemented") +func (m msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error { + return ctx.(*executionContext).msgRouter.InvokeTyped(ctx, msg, resp) } // InvokeUntyped execute a message and returns a response. -func (m *msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) { - return m.handler(ctx, msg) +func (m msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) { + return ctx.(*executionContext).msgRouter.InvokeUntyped(ctx, msg) } // NewQueryRouterService implements router.Service. -func NewQueryRouterService(queryRouterBuilder *MsgRouterBuilder) router.Service { - return &queryRouterService{ - builder: queryRouterBuilder, - } +func NewQueryRouterService() router.Service { + return queryRouterService{} } var _ router.Service = (*queryRouterService)(nil) -type queryRouterService struct { - builder *MsgRouterBuilder - handler appmodulev2.Handler -} +type queryRouterService struct{} // CanInvoke returns an error if the given request cannot be invoked. -func (m *queryRouterService) CanInvoke(ctx context.Context, typeURL string) error { - if typeURL == "" { - return errors.New("missing type url") - } - - typeURL = strings.TrimPrefix(typeURL, "/") - if exists := m.builder.HandlerExists(typeURL); exists { - return fmt.Errorf("unknown request: %s", typeURL) - } - - return nil +func (m queryRouterService) CanInvoke(ctx context.Context, typeURL string) error { + return ctx.(*executionContext).queryRouter.CanInvoke(ctx, typeURL) } // InvokeTyped execute a message and fill-in a response. // The response must be known and passed as a parameter. // Use InvokeUntyped if the response type is not known. -func (m *queryRouterService) InvokeTyped( +func (m queryRouterService) InvokeTyped( ctx context.Context, req, resp protoiface.MessageV1, ) error { - // TODO lazy initialization is ugly and not thread safe. we don't want to check a mutex on every InvokeTyped either. - if m.handler == nil { - var err error - m.handler, err = m.builder.Build() - if err != nil { - return fmt.Errorf("cannot create queryRouter: %w", err) - } - } - // reflection is required, see https://github.com/cosmos/cosmos-sdk/pull/20349 - res, err := m.handler(ctx, req) - if err != nil { - return err - } - reflect.Indirect(reflect.ValueOf(resp)).Set(reflect.Indirect(reflect.ValueOf(res))) - return nil + return ctx.(*executionContext).queryRouter.InvokeTyped(ctx, req, resp) } // InvokeUntyped execute a message and returns a response. -func (m *queryRouterService) InvokeUntyped( +func (m queryRouterService) InvokeUntyped( ctx context.Context, req protoiface.MessageV1, ) (protoiface.MessageV1, error) { - return m.handler(ctx, req) + return ctx.(*executionContext).queryRouter.InvokeUntyped(ctx, req) } diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index fd6d434c91c6..ef0258a59c56 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/core/gas" "cosmossdk.io/core/header" "cosmossdk.io/core/log" + "cosmossdk.io/core/router" "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" stfgas "cosmossdk.io/server/v2/stf/gas" @@ -23,9 +24,10 @@ var Identity = []byte("stf") // STF is a struct that manages the state transition component of the app. type STF[T transaction.Tx] struct { - logger log.Logger - handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error) - handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error) + logger log.Logger + + msgRouter Router + queryRouter Router doPreBlock func(ctx context.Context, txs []T) error doBeginBlock func(ctx context.Context) error @@ -42,8 +44,8 @@ type STF[T transaction.Tx] struct { // NewSTF returns a new STF instance. func NewSTF[T transaction.Tx]( - handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error), - handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error), + msgRouterBuilder *MsgRouterBuilder, + queryRouterBuilder *MsgRouterBuilder, doPreBlock func(ctx context.Context, txs []T) error, doBeginBlock func(ctx context.Context) error, doEndBlock func(ctx context.Context) error, @@ -51,20 +53,31 @@ func NewSTF[T transaction.Tx]( doValidatorUpdate func(ctx context.Context) ([]appmodulev2.ValidatorUpdate, error), postTxExec func(ctx context.Context, tx T, success bool) error, branch func(store store.ReaderMap) store.WriterMap, -) *STF[T] { +) (*STF[T], error) { + + msgRouter, err := msgRouterBuilder.Build() + if err != nil { + return nil, fmt.Errorf("build msg router: %w", err) + } + queryRouter, err := queryRouterBuilder.Build() + if err != nil { + return nil, fmt.Errorf("build query router: %w", err) + } + return &STF[T]{ - handleMsg: handleMsg, - handleQuery: handleQuery, + logger: nil, + msgRouter: msgRouter, + queryRouter: queryRouter, doPreBlock: doPreBlock, doBeginBlock: doBeginBlock, doEndBlock: doEndBlock, - doTxValidation: doTxValidation, doValidatorUpdate: doValidatorUpdate, + doTxValidation: doTxValidation, postTxExec: postTxExec, // TODO branchFn: branch, makeGasMeter: stfgas.DefaultGasMeter, makeGasMeteredState: stfgas.DefaultWrapWithGasMeter, - } + }, nil } // DeliverBlock is our state transition function. @@ -310,7 +323,7 @@ func (s STF[T]) runTxMsgs( execCtx.setGasLimit(gasLimit) for i, msg := range msgs { execCtx.sender = txSenders[i] - resp, err := s.handleMsg(execCtx, msg) + resp, err := s.msgRouter.InvokeUntyped(execCtx, msg) if err != nil { return nil, 0, nil, fmt.Errorf("message execution at index %d failed: %w", i, err) } @@ -346,7 +359,7 @@ func (s STF[T]) runConsensusMessages( ) ([]transaction.Msg, error) { responses := make([]transaction.Msg, len(messages)) for i := range messages { - resp, err := s.handleMsg(ctx, messages[i]) + resp, err := s.msgRouter.InvokeUntyped(ctx, messages[i]) if err != nil { return nil, err } @@ -498,11 +511,7 @@ func (s STF[T]) Query( queryCtx := s.makeContext(ctx, nil, queryState, internal.ExecModeSimulate) queryCtx.setHeaderInfo(hi) queryCtx.setGasLimit(gasLimit) - return s.handleQuery(queryCtx, req) -} - -func (s STF[T]) Message(ctx context.Context, msg transaction.Msg) (response transaction.Msg, err error) { - return s.handleMsg(ctx, msg) + return s.queryRouter.InvokeUntyped(queryCtx, req) } // RunWithCtx is made to support genesis, if genesis was just the execution of messages instead @@ -521,8 +530,9 @@ func (s STF[T]) RunWithCtx( // clone clones STF. func (s STF[T]) clone() STF[T] { return STF[T]{ - handleMsg: s.handleMsg, - handleQuery: s.handleQuery, + logger: s.logger, + msgRouter: s.msgRouter, + queryRouter: s.queryRouter, doPreBlock: s.doPreBlock, doBeginBlock: s.doBeginBlock, doEndBlock: s.doEndBlock, @@ -558,6 +568,9 @@ type executionContext struct { branchFn branchFn makeGasMeter makeGasMeterFn makeGasMeteredStore makeGasMeteredStateFn + + msgRouter router.Service + queryRouter router.Service } // setHeaderInfo sets the header info in the state to be used by queries in the future. diff --git a/server/v2/stf/stf_router.go b/server/v2/stf/stf_router.go index 57e8fbfb9ede..dc005b154ef2 100644 --- a/server/v2/stf/stf_router.go +++ b/server/v2/stf/stf_router.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" + "cosmossdk.io/core/router" gogoproto "github.com/cosmos/gogoproto/proto" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/runtime/protoiface" appmodulev2 "cosmossdk.io/core/appmodule/v2" ) @@ -60,7 +62,7 @@ func (b *MsgRouterBuilder) HandlerExists(msgType string) bool { return ok } -func (b *MsgRouterBuilder) Build() (appmodulev2.Handler, error) { +func (b *MsgRouterBuilder) Build() (Router, error) { handlers := make(map[string]appmodulev2.Handler) globalPreHandler := func(ctx context.Context, msg appmodulev2.Message) error { @@ -92,14 +94,8 @@ func (b *MsgRouterBuilder) Build() (appmodulev2.Handler, error) { handlers[msgType] = buildHandler(handler, preHandlers, globalPreHandler, postHandlers, globalPostHandler) } - // return handler as function - return func(ctx context.Context, msg appmodulev2.Message) (appmodulev2.Message, error) { - typeName := msgTypeURL(msg) - handler, exists := handlers[typeName] - if !exists { - return nil, fmt.Errorf("%w: %s", ErrNoHandler, typeName) - } - return handler(ctx, msg) + return Router{ + handlers: handlers, }, nil } @@ -147,3 +143,36 @@ func msgTypeURL(msg gogoproto.Message) string { return gogoproto.MessageName(msg) } + +var _ router.Service = (*Router)(nil) + +// Router implements the STF router for msg and query handlers. +type Router struct { + handlers map[string]appmodulev2.Handler +} + +func (r Router) CanInvoke(_ context.Context, typeURL string) error { + _, exists := r.handlers[typeURL] + if !exists { + return fmt.Errorf("%w: %s", ErrNoHandler, typeURL) + } + return nil +} + +func (r Router) InvokeTyped(ctx context.Context, req, resp protoiface.MessageV1) error { + handlerResp, err := r.InvokeUntyped(ctx, req) + if err != nil { + return err + } + gogoproto.Merge(resp, handlerResp) + return nil +} + +func (r Router) InvokeUntyped(ctx context.Context, req protoiface.MessageV1) (res protoiface.MessageV1, err error) { + typeName := msgTypeURL(req) + handler, exists := r.handlers[typeName] + if !exists { + return nil, fmt.Errorf("%w: %s", ErrNoHandler, typeName) + } + return handler(ctx, req) +} diff --git a/server/v2/stf/stf_test.go b/server/v2/stf/stf_test.go index 9e030dd52c0a..bdd057313bd1 100644 --- a/server/v2/stf/stf_test.go +++ b/server/v2/stf/stf_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/cosmos/gogoproto/proto" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/wrapperspb" @@ -14,29 +15,56 @@ import ( appmodulev2 "cosmossdk.io/core/appmodule/v2" coregas "cosmossdk.io/core/gas" "cosmossdk.io/core/store" - "cosmossdk.io/core/transaction" "cosmossdk.io/server/v2/stf/branch" "cosmossdk.io/server/v2/stf/gas" "cosmossdk.io/server/v2/stf/mock" ) +func addMsgHandlerToSTF[T any, PT interface { + *T + proto.Message +}, + U any, UT interface { + *U + proto.Message + }]( + t *testing.T, + stf *STF[mock.Tx], + handler func(ctx context.Context, msg PT) (UT, error), +) { + t.Helper() + msgRouterBuilder := NewMsgRouterBuilder() + err := msgRouterBuilder.RegisterHandler( + msgTypeURL(PT(new(T))), + func(ctx context.Context, msg appmodulev2.Message) (msgResp appmodulev2.Message, err error) { + typedReq := msg.(PT) + typedResp, err := handler(ctx, typedReq) + if err != nil { + return nil, err + } + + return typedResp, nil + }, + ) + require.NoError(t, err) + + msgRouter, err := msgRouterBuilder.Build() + require.NoError(t, err) + stf.msgRouter = msgRouter +} + func TestSTF(t *testing.T) { state := mock.DB() mockTx := mock.Tx{ Sender: []byte("sender"), - Msg: wrapperspb.Bool(true), // msg does not matter at all because our handler does nothing. + Msg: wrapperspb.Bool(true), GasLimit: 100_000, } sum := sha256.Sum256([]byte("test-hash")) s := &STF[mock.Tx]{ - handleMsg: func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) { - kvSet(t, ctx, "exec") - return nil, nil - }, - handleQuery: nil, - doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil }, + doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil }, doBeginBlock: func(ctx context.Context) error { kvSet(t, ctx, "begin-block") return nil @@ -59,6 +87,11 @@ func TestSTF(t *testing.T) { makeGasMeteredState: gas.DefaultWrapWithGasMeter, } + addMsgHandlerToSTF(t, s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) { + kvSet(t, ctx, "exec") + return nil, nil + }) + t.Run("begin and end block", func(t *testing.T) { _, newState, err := s.DeliverBlock(context.Background(), &appmanager.BlockRequest[mock.Tx]{ Height: uint64(1), @@ -124,9 +157,9 @@ func TestSTF(t *testing.T) { t.Run("fail exec tx", func(t *testing.T) { // update the stf to fail on the handler s := s.clone() - s.handleMsg = func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) { + addMsgHandlerToSTF(t, &s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) { return nil, fmt.Errorf("failure") - } + }) blockResult, newState, err := s.DeliverBlock(context.Background(), &appmanager.BlockRequest[mock.Tx]{ Height: uint64(1), @@ -167,9 +200,9 @@ func TestSTF(t *testing.T) { t.Run("tx failed and post tx failed", func(t *testing.T) { s := s.clone() - s.handleMsg = func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) { + addMsgHandlerToSTF(t, &s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) { return nil, fmt.Errorf("exec failure") - } + }) s.postTxExec = func(ctx context.Context, tx mock.Tx, success bool) error { return fmt.Errorf("post tx failure") } blockResult, newState, err := s.DeliverBlock(context.Background(), &appmanager.BlockRequest[mock.Tx]{ Height: uint64(1), From 8aac86fa6f2b07c01df7e85baa432fc94b69a0d1 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Tue, 18 Jun 2024 23:26:34 +0200 Subject: [PATCH 5/9] lint gods --- server/v2/stf/core_router_service.go | 2 +- server/v2/stf/stf.go | 7 ++++++- server/v2/stf/stf_router.go | 2 +- store/rootmulti/store_test.go | 1 - 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/v2/stf/core_router_service.go b/server/v2/stf/core_router_service.go index b627f3d09f9a..dd41469f01c2 100644 --- a/server/v2/stf/core_router_service.go +++ b/server/v2/stf/core_router_service.go @@ -3,10 +3,10 @@ package stf import ( "context" - "cosmossdk.io/core/transaction" "google.golang.org/protobuf/runtime/protoiface" "cosmossdk.io/core/router" + "cosmossdk.io/core/transaction" ) // NewMsgRouterService implements router.Service. diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index ef0258a59c56..94153005d9a1 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -54,7 +54,6 @@ func NewSTF[T transaction.Tx]( postTxExec func(ctx context.Context, tx T, success bool) error, branch func(store store.ReaderMap) store.WriterMap, ) (*STF[T], error) { - msgRouter, err := msgRouterBuilder.Build() if err != nil { return nil, fmt.Errorf("build msg router: %w", err) @@ -612,6 +611,8 @@ func (s STF[T]) makeContext( sender, store, execMode, + s.msgRouter, + s.queryRouter, ) } @@ -623,6 +624,8 @@ func newExecutionContext( sender transaction.Identity, state store.WriterMap, execMode transaction.ExecMode, + msgRouter Router, + queryRouter Router, ) *executionContext { meter := makeGasMeterFn(gas.NoGasLimit) meteredState := makeGasMeteredStoreFn(meter, state) @@ -639,6 +642,8 @@ func newExecutionContext( branchFn: branchFn, makeGasMeter: makeGasMeterFn, makeGasMeteredStore: makeGasMeteredStoreFn, + msgRouter: msgRouter, + queryRouter: queryRouter, } } diff --git a/server/v2/stf/stf_router.go b/server/v2/stf/stf_router.go index dc005b154ef2..cb25c6e2bd2b 100644 --- a/server/v2/stf/stf_router.go +++ b/server/v2/stf/stf_router.go @@ -5,12 +5,12 @@ import ( "errors" "fmt" - "cosmossdk.io/core/router" gogoproto "github.com/cosmos/gogoproto/proto" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/runtime/protoiface" appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/router" ) var ErrNoHandler = errors.New("no handler") diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index 0e305eeefffe..baa24a0625d9 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -547,7 +547,6 @@ func TestMultiStore_Pruning(t *testing.T) { _, err := ms.CacheMultiStoreWithVersion(v) require.NoError(t, err, "expected no error when loading height: %d", v) } - }) } } From b75456ed1bd2e5d0df42a4f2c219fa08238f8e93 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Wed, 19 Jun 2024 17:38:17 +0200 Subject: [PATCH 6/9] separate merge logic --- server/v2/stf/stf_router.go | 12 ++++++------ server/v2/stf/stf_router_test.go | 6 ++++++ 2 files changed, 12 insertions(+), 6 deletions(-) create mode 100644 server/v2/stf/stf_router_test.go diff --git a/server/v2/stf/stf_router.go b/server/v2/stf/stf_router.go index cb25c6e2bd2b..f01efdfc2c96 100644 --- a/server/v2/stf/stf_router.go +++ b/server/v2/stf/stf_router.go @@ -6,7 +6,7 @@ import ( "fmt" gogoproto "github.com/cosmos/gogoproto/proto" - "google.golang.org/protobuf/proto" + "github.com/golang/protobuf/proto" "google.golang.org/protobuf/runtime/protoiface" appmodulev2 "cosmossdk.io/core/appmodule/v2" @@ -137,10 +137,6 @@ func buildHandler( // msgTypeURL returns the TypeURL of a proto message. func msgTypeURL(msg gogoproto.Message) string { - if m, ok := msg.(proto.Message); ok { - return string(m.ProtoReflect().Descriptor().FullName()) - } - return gogoproto.MessageName(msg) } @@ -164,10 +160,14 @@ func (r Router) InvokeTyped(ctx context.Context, req, resp protoiface.MessageV1) if err != nil { return err } - gogoproto.Merge(resp, handlerResp) + merge(resp, handlerResp) return nil } +func merge(src protoiface.MessageV1, dst protoiface.MessageV1) { + proto.Merge(src, dst) +} + func (r Router) InvokeUntyped(ctx context.Context, req protoiface.MessageV1) (res protoiface.MessageV1, err error) { typeName := msgTypeURL(req) handler, exists := r.handlers[typeName] diff --git a/server/v2/stf/stf_router_test.go b/server/v2/stf/stf_router_test.go new file mode 100644 index 000000000000..4b9014cc1281 --- /dev/null +++ b/server/v2/stf/stf_router_test.go @@ -0,0 +1,6 @@ +package stf + +import "testing" + +func TestMerge(t *testing.T) { +} From abedc73a14fc9e1fd9a9474b5d38058c4a455202 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Thu, 20 Jun 2024 07:58:17 +0200 Subject: [PATCH 7/9] merge fix --- server/v2/stf/stf_router.go | 4 ++-- server/v2/stf/stf_router_test.go | 6 ------ 2 files changed, 2 insertions(+), 8 deletions(-) delete mode 100644 server/v2/stf/stf_router_test.go diff --git a/server/v2/stf/stf_router.go b/server/v2/stf/stf_router.go index f01efdfc2c96..4f08fef68104 100644 --- a/server/v2/stf/stf_router.go +++ b/server/v2/stf/stf_router.go @@ -4,9 +4,9 @@ import ( "context" "errors" "fmt" + "reflect" gogoproto "github.com/cosmos/gogoproto/proto" - "github.com/golang/protobuf/proto" "google.golang.org/protobuf/runtime/protoiface" appmodulev2 "cosmossdk.io/core/appmodule/v2" @@ -165,7 +165,7 @@ func (r Router) InvokeTyped(ctx context.Context, req, resp protoiface.MessageV1) } func merge(src protoiface.MessageV1, dst protoiface.MessageV1) { - proto.Merge(src, dst) + reflect.Indirect(reflect.ValueOf(dst)).Set(reflect.Indirect(reflect.ValueOf(src))) } func (r Router) InvokeUntyped(ctx context.Context, req protoiface.MessageV1) (res protoiface.MessageV1, err error) { diff --git a/server/v2/stf/stf_router_test.go b/server/v2/stf/stf_router_test.go deleted file mode 100644 index 4b9014cc1281..000000000000 --- a/server/v2/stf/stf_router_test.go +++ /dev/null @@ -1,6 +0,0 @@ -package stf - -import "testing" - -func TestMerge(t *testing.T) { -} From a5444e92dd9143ddae04baa18ae53ceeeaa0bc31 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Thu, 20 Jun 2024 08:00:24 +0200 Subject: [PATCH 8/9] lint --- server/v2/stf/stf_router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/stf/stf_router.go b/server/v2/stf/stf_router.go index 4f08fef68104..8489e16a261e 100644 --- a/server/v2/stf/stf_router.go +++ b/server/v2/stf/stf_router.go @@ -164,7 +164,7 @@ func (r Router) InvokeTyped(ctx context.Context, req, resp protoiface.MessageV1) return nil } -func merge(src protoiface.MessageV1, dst protoiface.MessageV1) { +func merge(src, dst protoiface.MessageV1) { reflect.Indirect(reflect.ValueOf(dst)).Set(reflect.Indirect(reflect.ValueOf(src))) } From 24bcce594a3517609233ab3d736c2d3c1f1d0302 Mon Sep 17 00:00:00 2001 From: unknown unknown Date: Thu, 20 Jun 2024 08:02:38 +0200 Subject: [PATCH 9/9] fix logger --- runtime/v2/builder.go | 1 + server/v2/stf/stf.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/runtime/v2/builder.go b/runtime/v2/builder.go index 42d17f779e2f..e3728a2ec0cb 100644 --- a/runtime/v2/builder.go +++ b/runtime/v2/builder.go @@ -99,6 +99,7 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) { endBlocker, valUpdate := a.app.moduleManager.EndBlock() stf, err := stf.NewSTF[transaction.Tx]( + a.app.logger.With("module", "stf"), a.app.msgRouterBuilder, a.app.queryRouterBuilder, a.app.moduleManager.PreBlocker(), diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index 94153005d9a1..6707e1d8826d 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -44,6 +44,7 @@ type STF[T transaction.Tx] struct { // NewSTF returns a new STF instance. func NewSTF[T transaction.Tx]( + logger log.Logger, msgRouterBuilder *MsgRouterBuilder, queryRouterBuilder *MsgRouterBuilder, doPreBlock func(ctx context.Context, txs []T) error, @@ -64,7 +65,7 @@ func NewSTF[T transaction.Tx]( } return &STF[T]{ - logger: nil, + logger: logger, msgRouter: msgRouter, queryRouter: queryRouter, doPreBlock: doPreBlock,