From 9535023a2b9edf30cc36f10120f6b2976cedf7ed Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 10 Dec 2024 15:49:59 +0100 Subject: [PATCH 01/10] chore: add ParamsQuerier interface --- pkg/client/interface.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index cd637be2e..e588efcdd 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -380,3 +380,18 @@ type HistoricalQueryCache[T any] interface { // SetAtHeight adds or updates a value at a specific height SetAtHeight(key string, value T, height int64) error } + +// ParamsQuerier represents a generic querier for module parameters. +// This interface should be implemented by any module-specific querier +// that needs to access and cache on-chain parameters. +// +// DEV_NOTE: Can't use cosmostypes.Msg instead of any because M +// would be a pointer but Keeper#GetParams() returns a value. 🙄 +type ParamsQuerier[P any] interface { + // GetParams queries the chain for the current module parameters, where + // P is the params type of a given module (e.g. sharedtypes.Params). + GetParams(ctx context.Context) (P, error) + // GetParamsAtHeight returns the parameters as they were at the specified + // height, where M is the params type of a given module (e.g. sharedtypes.Params). + GetParamsAtHeight(ctx context.Context, height int64) (P, error) +} From ba61d1446d89720dec7571c4a7d39ea90e614036 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 10 Dec 2024 15:44:51 +0100 Subject: [PATCH 02/10] feat: add ParamsQuerier --- pkg/client/interface.go | 6 +- pkg/client/query/options.go | 52 +++++++++++ pkg/client/query/paramsquerier.go | 143 ++++++++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 pkg/client/query/options.go create mode 100644 pkg/client/query/paramsquerier.go diff --git a/pkg/client/interface.go b/pkg/client/interface.go index e588efcdd..079083240 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination=../../testutil/mockclient/grpc_conn_mock.go -package=mockclient github.com/cosmos/gogoproto/grpc ClientConn //go:generate mockgen -destination=../../testutil/mockclient/events_query_client_mock.go -package=mockclient . Dialer,Connection,EventsQueryClient //go:generate mockgen -destination=../../testutil/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient //go:generate mockgen -destination=../../testutil/mockclient/delegation_client_mock.go -package=mockclient . DelegationClient @@ -384,10 +385,7 @@ type HistoricalQueryCache[T any] interface { // ParamsQuerier represents a generic querier for module parameters. // This interface should be implemented by any module-specific querier // that needs to access and cache on-chain parameters. -// -// DEV_NOTE: Can't use cosmostypes.Msg instead of any because M -// would be a pointer but Keeper#GetParams() returns a value. 🙄 -type ParamsQuerier[P any] interface { +type ParamsQuerier[P cosmostypes.Msg] interface { // GetParams queries the chain for the current module parameters, where // P is the params type of a given module (e.g. sharedtypes.Params). GetParams(ctx context.Context) (P, error) diff --git a/pkg/client/query/options.go b/pkg/client/query/options.go new file mode 100644 index 000000000..4437fa0dc --- /dev/null +++ b/pkg/client/query/options.go @@ -0,0 +1,52 @@ +package query + +import ( + sdkerrors "cosmossdk.io/errors" + + "github.com/pokt-network/poktroll/pkg/client/query/cache" +) + +const ( + defaultPruneOlderThan = 100 + defaultMaxKeys = 1000 +) + +// paramsQuerierConfig is the configuration for parameter queriers. It is intended +// to be configured via ParamsQuerierOptionFn functions. +type paramsQuerierConfig struct { + // CacheOpts are the options passed to create the params cache + CacheOpts []cache.QueryCacheOptionFn + // ModuleName is used for logging and error context + ModuleName string + // ModuleParamError is the base error type for parameter query errors + ModuleParamError *sdkerrors.Error +} + +// ParamsQuerierOptionFn is a function which receives a paramsQuerierConfig for configuration. +type ParamsQuerierOptionFn func(*paramsQuerierConfig) + +// DefaultParamsQuerierConfig returns the default configuration for parameter queriers +func DefaultParamsQuerierConfig() *paramsQuerierConfig { + return ¶msQuerierConfig{ + CacheOpts: []cache.QueryCacheOptionFn{ + cache.WithHistoricalMode(defaultPruneOlderThan), + cache.WithMaxKeys(defaultMaxKeys), + cache.WithEvictionPolicy(cache.FirstInFirstOut), + }, + } +} + +// WithModuleInfo sets the module name and param error for the querier. +func WithModuleInfo(moduleName string, moduleParamError *sdkerrors.Error) ParamsQuerierOptionFn { + return func(cfg *paramsQuerierConfig) { + cfg.ModuleName = moduleName + cfg.ModuleParamError = moduleParamError + } +} + +// WithQueryCacheOptions is used to configure the params HistoricalQueryCache. +func WithQueryCacheOptions(opts ...cache.QueryCacheOptionFn) ParamsQuerierOptionFn { + return func(cfg *paramsQuerierConfig) { + cfg.CacheOpts = append(cfg.CacheOpts, opts...) + } +} diff --git a/pkg/client/query/paramsquerier.go b/pkg/client/query/paramsquerier.go new file mode 100644 index 000000000..78943cb77 --- /dev/null +++ b/pkg/client/query/paramsquerier.go @@ -0,0 +1,143 @@ +package query + +import ( + "context" + "errors" + + "cosmossdk.io/depinject" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + gogogrpc "github.com/cosmos/gogoproto/grpc" + + "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/query/cache" + "github.com/pokt-network/poktroll/pkg/polylog" +) + +// abstractParamsQuerier is NOT intended to be used for anything except the +// compile-time interface compliance assertion that immediately follows. +type abstractParamsQuerier = cachedParamsQuerier[cosmostypes.Msg, paramsQuerierIface[cosmostypes.Msg]] + +var _ client.ParamsQuerier[cosmostypes.Msg] = (*abstractParamsQuerier)(nil) + +// paramsQuerierIface is an interface which generated query clients MUST implement +// to be compatible with the cachedParamsQuerier. +// +// DEV_NOTE: It is mainly required due to syntactic constraints imposed by the generics +// (i.e. otherwise, P here MUST be a value type, and there's no way to express that Q +// (below) SHOULD be in terms of the concrete type of P in NewCachedParamsQuerier). +type paramsQuerierIface[P cosmostypes.Msg] interface { + GetParams(context.Context) (P, error) +} + +// NewCachedParamsQuerier creates a new, generic, params querier with the given +// concrete query client constructor and the configuration which results from +// applying the given options. +func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( + deps depinject.Config, + queryClientConstructor func(conn gogogrpc.ClientConn) Q, + opts ...ParamsQuerierOptionFn, +) (_ client.ParamsQuerier[P], err error) { + cfg := DefaultParamsQuerierConfig() + for _, opt := range opts { + opt(cfg) + } + + querier := &cachedParamsQuerier[P, Q]{ + config: cfg, + paramsCache: cache.NewInMemoryCache[P](cfg.CacheOpts...), + } + + if err = depinject.Inject( + deps, + &querier.clientConn, + ); err != nil { + return nil, err + } + + querier.queryClient = queryClientConstructor(querier.clientConn) + + return querier, nil +} + +// cachedParamsQuerier provides a generic implementation of cached param querying. +// It handles parameter caching and chain querying in a generic way, where +// P is a pointer type of the parameters, and Q is the interface type of the +// corresponding query client. +type cachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]] struct { + clientConn gogogrpc.ClientConn + queryClient Q + paramsCache client.HistoricalQueryCache[P] + config *paramsQuerierConfig +} + +// GetParams returns the latest cached params, if any; otherwise, it queries the +// current on-chain params and caches them. +func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { + logger := polylog.Ctx(ctx).With( + "module", bq.config.ModuleName, + "method", "GetParams", + ) + + // Check the cache first. + var paramsZero P + cached, err := bq.paramsCache.Get("params") + switch { + case err == nil: + logger.Debug().Msgf("params cache hit") + return cached, nil + case !errors.Is(err, cache.ErrCacheMiss): + return paramsZero, err + } + + logger.Debug().Msgf("%s", err) + + // Query on-chain on cache miss. + params, err := bq.queryClient.GetParams(ctx) + if err != nil { + if bq.config.ModuleParamError != nil { + return paramsZero, bq.config.ModuleParamError.Wrap(err.Error()) + } + return paramsZero, err + } + + // Update the cache. + if err = bq.paramsCache.Set("params", params); err != nil { + return paramsZero, err + } + + return params, nil +} + +// GetParamsAtHeight returns parameters as they were as of the given height, **if +// that height is present in the cache**. Otherwise, it queries the current params +// and returns them. +// +// TODO_MAINNET(@bryanchriswhite): Once on-chain historical data is available, +// update this to query for the historical params, rather than returning the +// current params, if the case of a cache miss. +func (bq *cachedParamsQuerier[P, Q]) GetParamsAtHeight(ctx context.Context, height int64) (P, error) { + logger := polylog.Ctx(ctx).With( + "module", bq.config.ModuleName, + "method", "GetParamsAtHeight", + "height", height, + ) + + // Try to get from cache at specific height + cached, err := bq.paramsCache.GetAtHeight("params", height) + switch { + case err == nil: + logger.Debug().Msg("params cache hit") + return cached, nil + case !errors.Is(err, cache.ErrCacheMiss): + return cached, err + } + + logger.Debug().Msgf("%s", err) + + // TODO_MAINNET(@bryanchriswhite): Implement querying historical params from chain + err = cache.ErrCacheMiss.Wrapf("TODO: on-chain historical data not implemented") + logger.Error().Msgf("%s", err) + + // Meanwhile, return current params as fallback. 😬 + return bq.GetParams(ctx) +} From 6bcaa7390dc82cd6a31b08a918534c180a35d43d Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 16 Dec 2024 14:25:10 +0100 Subject: [PATCH 03/10] fix: failing build --- pkg/client/query/paramsquerier.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/client/query/paramsquerier.go b/pkg/client/query/paramsquerier.go index 78943cb77..1a5007757 100644 --- a/pkg/client/query/paramsquerier.go +++ b/pkg/client/query/paramsquerier.go @@ -42,9 +42,14 @@ func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( opt(cfg) } + paramsCache, err := cache.NewInMemoryCache[P](cfg.CacheOpts...) + if err != nil { + return nil, err + } + querier := &cachedParamsQuerier[P, Q]{ config: cfg, - paramsCache: cache.NewInMemoryCache[P](cfg.CacheOpts...), + paramsCache: paramsCache, } if err = depinject.Inject( From afebf13fb53db6d8ddebfc84bfa8e1299b8b24b4 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 18 Dec 2024 17:31:17 +0100 Subject: [PATCH 04/10] chore: review feedback improvements Co-authored-by: Redouane Lakrache --- pkg/client/interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 03d7f7923..31cea8f75 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -390,6 +390,6 @@ type ParamsQuerier[P cosmostypes.Msg] interface { // P is the params type of a given module (e.g. sharedtypes.Params). GetParams(ctx context.Context) (P, error) // GetParamsAtHeight returns the parameters as they were at the specified - // height, where M is the params type of a given module (e.g. sharedtypes.Params). + // height, where P is the params type of a given module (e.g. sharedtypes.Params). GetParamsAtHeight(ctx context.Context, height int64) (P, error) } From 3abc4b5c51bcbb9a7229b65ada79e0602d1c4f27 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 18 Dec 2024 17:48:56 +0100 Subject: [PATCH 05/10] chore: review feedback improvements --- api/poktroll/tokenomics/types.pulsar.go | 4 +- app/app_config.go | 2 +- pkg/client/query/options.go | 37 +++++++++++++------ pkg/client/query/paramsquerier.go | 15 +++----- .../keeper/token_logic_modules_test.go | 4 +- 5 files changed, 37 insertions(+), 25 deletions(-) diff --git a/api/poktroll/tokenomics/types.pulsar.go b/api/poktroll/tokenomics/types.pulsar.go index e48600e95..b6cd8ff4d 100644 --- a/api/poktroll/tokenomics/types.pulsar.go +++ b/api/poktroll/tokenomics/types.pulsar.go @@ -3,11 +3,11 @@ package tokenomics import ( v1beta1 "cosmossdk.io/api/cosmos/base/v1beta1" - proof "github.com/pokt-network/poktroll/api/poktroll/proof" fmt "fmt" _ "github.com/cosmos/cosmos-proto" runtime "github.com/cosmos/cosmos-proto/runtime" _ "github.com/cosmos/gogoproto/gogoproto" + proof "github.com/pokt-network/poktroll/api/poktroll/proof" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoiface "google.golang.org/protobuf/runtime/protoiface" protoimpl "google.golang.org/protobuf/runtime/protoimpl" @@ -3067,7 +3067,7 @@ type ModToModTransfer struct { OpReason SettlementOpReason `protobuf:"varint,1,opt,name=op_reason,json=opReason,proto3,enum=poktroll.tokenomics.SettlementOpReason" json:"op_reason,omitempty"` SenderModule string `protobuf:"bytes,2,opt,name=SenderModule,proto3" json:"SenderModule,omitempty"` - RecipientModule string `protobuf:"bytes,3,opt,name=RecipientModule,proto3" json:"RecipientModule,omitempty"` // This the semantic module named that can be found by searching for "ModuleName =" in the codebase + RecipientModule string `protobuf:"bytes,3,opt,name=RecipientModule,proto3" json:"RecipientModule,omitempty"` // This the semantic module named that can be found by searching for "moduleName =" in the codebase Coin *v1beta1.Coin `protobuf:"bytes,4,opt,name=coin,proto3" json:"coin,omitempty"` } diff --git a/app/app_config.go b/app/app_config.go index 76cc7b5b2..e2e3096cf 100644 --- a/app/app_config.go +++ b/app/app_config.go @@ -240,7 +240,7 @@ var ( stakingtypes.BondedPoolName, stakingtypes.NotBondedPoolName, // We allow the following module accounts to receive funds: - // govtypes.ModuleName + // govtypes.moduleName } // appConfig application configuration (used by depinject) diff --git a/pkg/client/query/options.go b/pkg/client/query/options.go index 4437fa0dc..a335a68a3 100644 --- a/pkg/client/query/options.go +++ b/pkg/client/query/options.go @@ -1,9 +1,12 @@ package query import ( + "context" + sdkerrors "cosmossdk.io/errors" "github.com/pokt-network/poktroll/pkg/client/query/cache" + "github.com/pokt-network/poktroll/pkg/polylog" ) const ( @@ -14,12 +17,13 @@ const ( // paramsQuerierConfig is the configuration for parameter queriers. It is intended // to be configured via ParamsQuerierOptionFn functions. type paramsQuerierConfig struct { - // CacheOpts are the options passed to create the params cache - CacheOpts []cache.QueryCacheOptionFn - // ModuleName is used for logging and error context - ModuleName string - // ModuleParamError is the base error type for parameter query errors - ModuleParamError *sdkerrors.Error + logger polylog.Logger + // cacheOpts are the options passed to create the params cache + cacheOpts []cache.QueryCacheOptionFn + // moduleName is used for logging and error context + moduleName string + // moduleParamError is the base error type for parameter query errors + moduleParamError *sdkerrors.Error } // ParamsQuerierOptionFn is a function which receives a paramsQuerierConfig for configuration. @@ -28,7 +32,7 @@ type ParamsQuerierOptionFn func(*paramsQuerierConfig) // DefaultParamsQuerierConfig returns the default configuration for parameter queriers func DefaultParamsQuerierConfig() *paramsQuerierConfig { return ¶msQuerierConfig{ - CacheOpts: []cache.QueryCacheOptionFn{ + cacheOpts: []cache.QueryCacheOptionFn{ cache.WithHistoricalMode(defaultPruneOlderThan), cache.WithMaxKeys(defaultMaxKeys), cache.WithEvictionPolicy(cache.FirstInFirstOut), @@ -37,16 +41,27 @@ func DefaultParamsQuerierConfig() *paramsQuerierConfig { } // WithModuleInfo sets the module name and param error for the querier. -func WithModuleInfo(moduleName string, moduleParamError *sdkerrors.Error) ParamsQuerierOptionFn { +func WithModuleInfo(ctx context.Context, moduleName string, moduleParamError *sdkerrors.Error) ParamsQuerierOptionFn { + logger := polylog.Ctx(ctx).With( + "module_params_querier", moduleName, + ) return func(cfg *paramsQuerierConfig) { - cfg.ModuleName = moduleName - cfg.ModuleParamError = moduleParamError + cfg.logger = logger + cfg.moduleName = moduleName + cfg.moduleParamError = moduleParamError } } // WithQueryCacheOptions is used to configure the params HistoricalQueryCache. func WithQueryCacheOptions(opts ...cache.QueryCacheOptionFn) ParamsQuerierOptionFn { return func(cfg *paramsQuerierConfig) { - cfg.CacheOpts = append(cfg.CacheOpts, opts...) + cfg.cacheOpts = append(cfg.cacheOpts, opts...) + } +} + +// WithLogger sets the logger for the querier. +func WithLogger(logger polylog.Logger) ParamsQuerierOptionFn { + return func(cfg *paramsQuerierConfig) { + cfg.logger = logger } } diff --git a/pkg/client/query/paramsquerier.go b/pkg/client/query/paramsquerier.go index 1a5007757..c8161e496 100644 --- a/pkg/client/query/paramsquerier.go +++ b/pkg/client/query/paramsquerier.go @@ -10,7 +10,6 @@ import ( "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/query/cache" - "github.com/pokt-network/poktroll/pkg/polylog" ) // abstractParamsQuerier is NOT intended to be used for anything except the @@ -42,7 +41,7 @@ func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( opt(cfg) } - paramsCache, err := cache.NewInMemoryCache[P](cfg.CacheOpts...) + paramsCache, err := cache.NewInMemoryCache[P](cfg.cacheOpts...) if err != nil { return nil, err } @@ -78,8 +77,7 @@ type cachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]] struct { // GetParams returns the latest cached params, if any; otherwise, it queries the // current on-chain params and caches them. func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { - logger := polylog.Ctx(ctx).With( - "module", bq.config.ModuleName, + logger := bq.config.logger.With( "method", "GetParams", ) @@ -99,8 +97,8 @@ func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { // Query on-chain on cache miss. params, err := bq.queryClient.GetParams(ctx) if err != nil { - if bq.config.ModuleParamError != nil { - return paramsZero, bq.config.ModuleParamError.Wrap(err.Error()) + if bq.config.moduleParamError != nil { + return paramsZero, bq.config.moduleParamError.Wrap(err.Error()) } return paramsZero, err } @@ -121,14 +119,13 @@ func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { // update this to query for the historical params, rather than returning the // current params, if the case of a cache miss. func (bq *cachedParamsQuerier[P, Q]) GetParamsAtHeight(ctx context.Context, height int64) (P, error) { - logger := polylog.Ctx(ctx).With( - "module", bq.config.ModuleName, + logger := bq.config.logger.With( "method", "GetParamsAtHeight", "height", height, ) // Try to get from cache at specific height - cached, err := bq.paramsCache.GetAtHeight("params", height) + cached, err := bq.paramsCache.GetAsOfVersion("params", height) switch { case err == nil: logger.Debug().Msg("params cache hit") diff --git a/x/tokenomics/keeper/token_logic_modules_test.go b/x/tokenomics/keeper/token_logic_modules_test.go index b70d32608..8c312efc2 100644 --- a/x/tokenomics/keeper/token_logic_modules_test.go +++ b/x/tokenomics/keeper/token_logic_modules_test.go @@ -160,7 +160,7 @@ func TestProcessTokenLogicModules_TLMBurnEqualsMint_Valid(t *testing.T) { require.True(t, supplierIsFound) require.Equal(t, &supplierStake, supplier.GetStake()) - // Assert that `suppliertypes.ModuleName` account module balance is *unchanged* + // Assert that `suppliertypes.moduleName` account module balance is *unchanged* // NB: Supplier rewards are minted to the supplier module account but then immediately // distributed to the supplier accounts which provided service in a given session. supplierModuleEndBalance := getBalance(t, ctx, keepers, supplierModuleAddress) @@ -305,7 +305,7 @@ func TestProcessTokenLogicModules_TLMBurnEqualsMint_Valid_SupplierExceedsMaxClai require.True(t, supplierIsFound) require.Equal(t, &supplierStake, supplier.GetStake()) - // Assert that `suppliertypes.ModuleName` account module balance is *unchanged* + // Assert that `suppliertypes.moduleName` account module balance is *unchanged* // NB: Supplier rewards are minted to the supplier module account but then immediately // distributed to the supplier accounts which provided service in a given session. supplierModuleEndBalance := getBalance(t, ctx, keepers, supplierModuleAddress) From afcde67bfa6978b59d6c0bfdcbfc5c1f1349d1fa Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 18 Dec 2024 17:54:06 +0100 Subject: [PATCH 06/10] fix: typo --- pkg/client/query/cache/config.go | 4 ++-- pkg/client/query/options.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index 3881bbf5e..77fae86b3 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -67,10 +67,10 @@ func (cfg *queryCacheConfig) Validate() error { // WithHistoricalMode enables historical caching with the given maxVersionAge // configuration; if 0, no historical pruning is performed. -func WithHistoricalMode(numRetainedVersions int64) QueryCacheOptionFn { +func WithHistoricalMode(maxVersionAge int64) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { cfg.historical = true - cfg.maxVersionAge = numRetainedVersions + cfg.maxVersionAge = maxVersionAge } } diff --git a/pkg/client/query/options.go b/pkg/client/query/options.go index a335a68a3..8e5781d19 100644 --- a/pkg/client/query/options.go +++ b/pkg/client/query/options.go @@ -10,8 +10,8 @@ import ( ) const ( - defaultPruneOlderThan = 100 - defaultMaxKeys = 1000 + defaultMaxVersionAge = 100 + defaultMaxKeys = 1000 ) // paramsQuerierConfig is the configuration for parameter queriers. It is intended @@ -33,7 +33,7 @@ type ParamsQuerierOptionFn func(*paramsQuerierConfig) func DefaultParamsQuerierConfig() *paramsQuerierConfig { return ¶msQuerierConfig{ cacheOpts: []cache.QueryCacheOptionFn{ - cache.WithHistoricalMode(defaultPruneOlderThan), + cache.WithHistoricalMode(defaultMaxVersionAge), cache.WithMaxKeys(defaultMaxKeys), cache.WithEvictionPolicy(cache.FirstInFirstOut), }, From ae9bd25704e1869373fda2d0979a9f1b0d4d0761 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 16 Jan 2025 11:53:46 +0100 Subject: [PATCH 07/10] wip: stash --- pkg/client/events/query_client.go | 2 +- pkg/client/interface.go | 4 +- pkg/client/query/cache/memory.go | 2 +- pkg/client/query/options.go | 22 +- pkg/client/query/paramsquerier.go | 81 +++++- pkg/client/query/sharedquerier.go | 2 + pkg/crypto/rings/client.go | 2 +- .../integration/client/params_querier_test.go | 261 ++++++++++++++++++ x/proof/types/shared_query_client.go | 4 +- x/shared/keeper/params_query_client.go | 54 ++++ x/shared/types/query_client.go | 36 +++ 11 files changed, 449 insertions(+), 21 deletions(-) create mode 100644 tests/integration/client/params_querier_test.go create mode 100644 x/shared/keeper/params_query_client.go create mode 100644 x/shared/types/query_client.go diff --git a/pkg/client/events/query_client.go b/pkg/client/events/query_client.go index e04731f8e..d64ecd77f 100644 --- a/pkg/client/events/query_client.go +++ b/pkg/client/events/query_client.go @@ -189,7 +189,7 @@ func (eqc *eventsQueryClient) openEventsBytesAndConn( } // Send the event subscription request on the connection. - if err := conn.Send(req); err != nil { + if err = conn.Send(req); err != nil { subscribeErr := ErrEventsSubscribe.Wrapf("%s", err) // assume the connection is bad closeErr := conn.Close() diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 31cea8f75..a95a0d5ed 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -300,8 +300,8 @@ type SessionQueryClient interface { // SharedQueryClient defines an interface that enables the querying of the // on-chain shared module params. type SharedQueryClient interface { - // GetParams queries the chain for the current shared module parameters. - GetParams(ctx context.Context) (*sharedtypes.Params, error) + ParamsQuerier[*sharedtypes.Params] + // GetSessionGracePeriodEndHeight returns the block height at which the grace period // for the session that includes queryHeight elapses. // The grace period is the number of blocks after the session ends during which relays diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index 088f3f5d1..0e67b9439 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -58,7 +58,7 @@ type cacheValueHistory[T any] struct { // NewInMemoryCache creates a new inMemoryCache with the configuration generated // by the given option functions. -func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (*inMemoryCache[T], error) { +func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (client.QueryCache[T], error) { config := DefaultQueryCacheConfig for _, opt := range opts { diff --git a/pkg/client/query/options.go b/pkg/client/query/options.go index 8e5781d19..deafeb606 100644 --- a/pkg/client/query/options.go +++ b/pkg/client/query/options.go @@ -2,6 +2,7 @@ package query import ( "context" + "fmt" sdkerrors "cosmossdk.io/errors" @@ -29,6 +30,17 @@ type paramsQuerierConfig struct { // ParamsQuerierOptionFn is a function which receives a paramsQuerierConfig for configuration. type ParamsQuerierOptionFn func(*paramsQuerierConfig) +// TODO_IN_THIS_COMMIT: godoc... +func (pqc *paramsQuerierConfig) Validate() error { + if pqc.moduleName == "" { + return fmt.Errorf("moduleName must be set") + } + if pqc.moduleParamError == nil { + return fmt.Errorf("moduleParamError must be set") + } + return nil +} + // DefaultParamsQuerierConfig returns the default configuration for parameter queriers func DefaultParamsQuerierConfig() *paramsQuerierConfig { return ¶msQuerierConfig{ @@ -41,7 +53,11 @@ func DefaultParamsQuerierConfig() *paramsQuerierConfig { } // WithModuleInfo sets the module name and param error for the querier. -func WithModuleInfo(ctx context.Context, moduleName string, moduleParamError *sdkerrors.Error) ParamsQuerierOptionFn { +func WithModuleInfo( + ctx context.Context, + moduleName string, + moduleParamError *sdkerrors.Error, +) ParamsQuerierOptionFn { logger := polylog.Ctx(ctx).With( "module_params_querier", moduleName, ) @@ -53,7 +69,9 @@ func WithModuleInfo(ctx context.Context, moduleName string, moduleParamError *sd } // WithQueryCacheOptions is used to configure the params HistoricalQueryCache. -func WithQueryCacheOptions(opts ...cache.QueryCacheOptionFn) ParamsQuerierOptionFn { +func WithQueryCacheOptions( + opts ...cache.QueryCacheOptionFn, +) ParamsQuerierOptionFn { return func(cfg *paramsQuerierConfig) { cfg.cacheOpts = append(cfg.cacheOpts, opts...) } diff --git a/pkg/client/query/paramsquerier.go b/pkg/client/query/paramsquerier.go index c8161e496..bd5369836 100644 --- a/pkg/client/query/paramsquerier.go +++ b/pkg/client/query/paramsquerier.go @@ -2,14 +2,22 @@ package query import ( "context" + "encoding/json" "errors" + "fmt" "cosmossdk.io/depinject" + abcitypes "github.com/cometbft/cometbft/abci/types" cosmostypes "github.com/cosmos/cosmos-sdk/types" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" gogogrpc "github.com/cosmos/gogoproto/grpc" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query/cache" + "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/observable/channel" ) // abstractParamsQuerier is NOT intended to be used for anything except the @@ -32,6 +40,7 @@ type paramsQuerierIface[P cosmostypes.Msg] interface { // concrete query client constructor and the configuration which results from // applying the given options. func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( + ctx context.Context, deps depinject.Config, queryClientConstructor func(conn gogogrpc.ClientConn) Q, opts ...ParamsQuerierOptionFn, @@ -41,25 +50,50 @@ func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( opt(cfg) } - paramsCache, err := cache.NewInMemoryCache[P](cfg.cacheOpts...) - if err != nil { + if err = cfg.Validate(); err != nil { return nil, err } + //paramsCache, err := cache.NewInMemoryCache[P](cfg.cacheOpts...) + //if err != nil { + // return nil, err + //} + querier := &cachedParamsQuerier[P, Q]{ - config: cfg, - paramsCache: paramsCache, + config: cfg, + //paramsCache: paramsCache, } if err = depinject.Inject( deps, &querier.clientConn, + &querier.paramsCache, + &querier.blockClient, ); err != nil { return nil, err } + // Construct the module-specific query client. querier.queryClient = queryClientConstructor(querier.clientConn) + // Construct an events replay client which is notified about txs which were + // signed by the governance module account; this includes all parameter update + // messages for all modules while excluding almost all other events, reducing + // bandwidth utilization. + query := fmt.Sprintf(govAccountTxQueryFmt, authtypes.NewModuleAddress(govtypes.ModuleName)) + querier.eventsReplayClient, err = events.NewEventsReplayClient(ctx, deps, query, tx.UnmarshalTxResult, 1) + if err != nil { + return + } + + // Prime the cache by querying for the current params. + if _, err = querier.GetParams(ctx); err != nil { + return nil, err + } + + // Subscribe to asynchronous events to keep the cache up-to-date. + go querier.goSubscribeToParamUpdates(ctx) + return querier, nil } @@ -68,10 +102,12 @@ func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( // P is a pointer type of the parameters, and Q is the interface type of the // corresponding query client. type cachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]] struct { - clientConn gogogrpc.ClientConn - queryClient Q - paramsCache client.HistoricalQueryCache[P] - config *paramsQuerierConfig + clientConn gogogrpc.ClientConn + queryClient Q + eventsReplayClient client.EventsReplayClient[*abcitypes.TxResult] + blockClient client.BlockClient + paramsCache client.HistoricalQueryCache[P] + config *paramsQuerierConfig } // GetParams returns the latest cached params, if any; otherwise, it queries the @@ -103,11 +139,6 @@ func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { return paramsZero, err } - // Update the cache. - if err = bq.paramsCache.Set("params", params); err != nil { - return paramsZero, err - } - return params, nil } @@ -143,3 +174,27 @@ func (bq *cachedParamsQuerier[P, Q]) GetParamsAtHeight(ctx context.Context, heig // Meanwhile, return current params as fallback. 😬 return bq.GetParams(ctx) } + +// TODO_IN_THIS_COMMIT: godoc... +var govAccountTxQueryFmt = "tm.event='Tx' AND message.sender='%s'" + +// TODO_IN_THIS_COMMIT: godoc... +func (bq *cachedParamsQuerier[P, Q]) goSubscribeToParamUpdates(ctx context.Context) { + govSignedTxResultsObs := bq.eventsReplayClient.EventsSequence(ctx) + channel.ForEach[*abcitypes.TxResult]( + ctx, govSignedTxResultsObs, + func(ctx context.Context, txEvent *abcitypes.TxResult) { + txEventJSON, err := json.MarshalIndent(txEvent, "", " ") + if err != nil { + panic(err) + } + fmt.Printf(">>> event: %s\n", txEventJSON) + }, + ) + + //govSignedTxResultsCh := govSignedTxResultsObs.Subscribe(ctx).Ch() + + //for govSignedTxResult := range govSignedTxResultsCh { + // // Ignore any message that is NOT a param update + //} +} diff --git a/pkg/client/query/sharedquerier.go b/pkg/client/query/sharedquerier.go index 06e0ed90a..c74f6a4f6 100644 --- a/pkg/client/query/sharedquerier.go +++ b/pkg/client/query/sharedquerier.go @@ -16,6 +16,8 @@ var _ client.SharedQueryClient = (*sharedQuerier)(nil) // querying of on-chain shared information through a single exposed method // which returns an sharedtypes.Session struct type sharedQuerier struct { + client.ParamsQuerier[*sharedtypes.Params] + clientConn grpc.ClientConn sharedQuerier sharedtypes.QueryClient blockQuerier client.BlockQueryClient diff --git a/pkg/crypto/rings/client.go b/pkg/crypto/rings/client.go index 8373be876..e2edc5554 100644 --- a/pkg/crypto/rings/client.go +++ b/pkg/crypto/rings/client.go @@ -49,7 +49,7 @@ type ringClient struct { func NewRingClient(deps depinject.Config) (_ crypto.RingClient, err error) { rc := new(ringClient) - if err := depinject.Inject( + if err = depinject.Inject( deps, &rc.logger, &rc.accountQuerier, diff --git a/tests/integration/client/params_querier_test.go b/tests/integration/client/params_querier_test.go new file mode 100644 index 000000000..e913d6033 --- /dev/null +++ b/tests/integration/client/params_querier_test.go @@ -0,0 +1,261 @@ +package client + +import ( + "bytes" + "context" + "strings" + "testing" + "time" + + "cosmossdk.io/depinject" + comethttp "github.com/cometbft/cometbft/rpc/client/http" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/query" + "github.com/pokt-network/poktroll/pkg/client/query/cache" + "github.com/pokt-network/poktroll/pkg/polylog/polyzero" + _ "github.com/pokt-network/poktroll/pkg/polylog/polyzero" + "github.com/pokt-network/poktroll/testutil/e2e" + "github.com/pokt-network/poktroll/testutil/testclient" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +func TestSanity3(t *testing.T) { + ctx := context.Background() + //app := e2e.NewE2EApp(t) + //t.Cleanup(func() { app.Close() }) + + //clientConn := app.QueryHelper() + //require.NotNil(t, clientConn) + clientConn := testclient.NewLocalnetClientCtx(t, testclient.NewLocalnetFlagSet(t)) + + sharedQueryClient := sharedtypes.NewSharedQueryClient(clientConn) + params, err := sharedQueryClient.GetParams(ctx) + require.NoError(t, err) + require.NotNil(t, params) + + eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:26657/websocket") + eventsBzObs, err := eventsQueryClient.EventsBytes(ctx, "tm.event='Tx'") + require.NoError(t, err) + + t.Log("starting goroutine") + + errCh := make(chan error, 1) + go func() { + t.Log("in goroutine") + + eitherEventsBzCh := eventsBzObs.Subscribe(ctx).Ch() + first := true + for eitherEventBz := range eitherEventsBzCh { + eventBz, err := eitherEventBz.ValueOrError() + if err != nil { + errCh <- err + return + } + + if eventBz == nil || first { + first = false + continue + } + + t.Logf(">>> eventsBz: %s", string(eventBz)) + //t.Logf(">>> eventsBz(hex): %x", eventBz) + break + } + + close(errCh) + }() + + t.Log("goroutine started") + + select { + // TODO_IN_THIS_CASE: extract to testTimeoutDuration const. + case <-time.After(15 * time.Second): + t.Log("timeout") + t.Fatalf("timed out waiting for events bytest observable to receive") + case err = <-errCh: + t.Log("done") + require.NoError(t, err) + } +} + +func TestSanity2(t *testing.T) { + ctx := context.Background() + app := e2e.NewE2EApp(t) + t.Cleanup(func() { app.Close() }) + + clientConn, err := app.GetClientConn() + require.NoError(t, err) + require.NotNil(t, clientConn) + + sharedQueryClient := sharedtypes.NewSharedQueryClient(clientConn) + params, err := sharedQueryClient.GetParams(ctx) + require.NoError(t, err) + require.NotNil(t, params) + + eventsQueryClient := events.NewEventsQueryClient(app.GetWSEndpoint()) + eventsBzObs, err := eventsQueryClient.EventsBytes(ctx, "tm.event='Tx'") + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + eitherEventsBzCh := eventsBzObs.Subscribe(ctx).Ch() + for eitherEventBz := range eitherEventsBzCh { + eventBz, err := eitherEventBz.ValueOrError() + if err != nil { + errCh <- err + return + } + + if eventBz == nil { + continue + } + + t.Logf(">>> eventsBz: %s", string(eventBz)) + t.Logf(">>> eventsBz(hex): %x", eventBz) + break + } + + close(errCh) + }() + + //defaultCUTTM := sharedtypes.DefaultParams().ComputeUnitsToTokensMultiplier + expectedCUTTM := uint64(99) + + //sharedParams, err := sharedQueryClient.GetParams(ctx) + //require.NoError(t, err) + //require.Equal(t, defaultCUTTM, sharedParams.ComputeUnitsToTokensMultiplier) + + paramUpdateMsg := &sharedtypes.MsgUpdateParam{ + Authority: authtypes.NewModuleAddress(govtypes.ModuleName).String(), + Name: "compute_units_to_tokens_multiplier", + AsType: &sharedtypes.MsgUpdateParam_AsUint64{AsUint64: expectedCUTTM}, + } + + _, err = app.RunMsg(t, paramUpdateMsg) + require.NoError(t, err) + + time.Sleep(3 * time.Second) + + select { + // TODO_IN_THIS_CASE: extract to testTimeoutDuration const. + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for events bytest observable to receive") + case err = <-errCh: + require.NoError(t, err) + } +} + +// TODO_IN_THIS_COMMIT: godoc... +type MockParamsCache[P any] struct { + mock.Mock +} + +func (m *MockParamsCache[P]) GetLatestVersion(key string) (P, error) { + args := m.Called(key) + return args.Get(0).(P), args.Error(1) +} + +func (m *MockParamsCache[P]) GetVersion(key string, version int64) (P, error) { + args := m.Called(key, version) + return args.Get(0).(P), args.Error(1) +} + +func (m *MockParamsCache[P]) SetVersion(key string, value P, version int64) error { + args := m.Called(key, value, version) + return args.Error(0) +} + +func TestSanity(t *testing.T) { + ctx := context.Background() + app := e2e.NewE2EApp(t) + t.Cleanup(func() { app.Close() }) + + clientConn, err := app.GetClientConn() + require.NoError(t, err) + require.NotNil(t, clientConn) + + eventsQueryClient := events.NewEventsQueryClient(app.GetWSEndpoint()) + require.NotNil(t, eventsQueryClient) + + // TODO_IN_THIS_COMMIT: add E2EApp#GetGRPCEndpoint() method. + blockQueryClient, err := comethttp.New("tcp://127.0.0.1:42070", "/websocket") + require.NoError(t, err) + + deps := depinject.Supply( + eventsQueryClient, + blockQueryClient, + ) + blockClient, err := block.NewBlockClient(ctx, deps) + require.NoError(t, err) + + logBuffer := new(bytes.Buffer) + logger := polyzero.NewLogger( + polyzero.WithLevel(polyzero.DebugLevel), + polyzero.WithOutput(logBuffer), + ) + + paramsCache, err := cache.NewHistoricalInMemoryCache[*sharedtypes.Params]() + require.NoError(t, err) + + // TODO_IN_THIS_COMMIT: replace polylog.Ctx with logger arg... + ctx = logger.WithContext(ctx) + deps = depinject.Configs( + deps, + depinject.Supply( + logger, + clientConn, + //new(MockParamsCache[*sharedtypes.Params]), + paramsCache, + blockClient, + ), + ) + + moduleInfoOpt := query.WithModuleInfo(ctx, sharedtypes.ModuleName, sharedtypes.ErrSharedParamInvalid) + paramsQuerier, err := query.NewCachedParamsQuerier[*sharedtypes.Params, sharedtypes.SharedQueryClient]( + ctx, deps, + sharedtypes.NewSharedQueryClient, + moduleInfoOpt, + ) + + require.NoError(t, err) + + defaultCUTTM := sharedtypes.DefaultParams().ComputeUnitsToTokensMultiplier + expectedCUTTM := uint64(99) + + sharedParams, err := paramsQuerier.GetParams(ctx) + require.NoError(t, err) + require.Equal(t, defaultCUTTM, sharedParams.ComputeUnitsToTokensMultiplier) + + paramUpdateMsg := &sharedtypes.MsgUpdateParam{ + Authority: authtypes.NewModuleAddress(govtypes.ModuleName).String(), + Name: "compute_units_to_tokens_multiplier", + AsType: &sharedtypes.MsgUpdateParam_AsUint64{AsUint64: expectedCUTTM}, + } + + res, err := app.RunMsg(t, paramUpdateMsg) + require.NoError(t, err) + + t.Logf("res: %+v", res) + + sharedParams, err = paramsQuerier.GetParams(ctx) + require.NoError(t, err) + require.Equal(t, int64(expectedCUTTM), int64(sharedParams.ComputeUnitsToTokensMultiplier)) + + // Wait a tick to ensure the events query client observed the param update tx result. + time.Sleep(100 * time.Millisecond) + + // TODO_IN_THIS_COMMIT: find a better way to assert that the cache was updated... + // Consider mocking the cache implementation... + t.Logf("\n%s", logBuffer.String()) + logLines := strings.Split(strings.Trim(logBuffer.String(), "\n"), "\n") + require.Equal(t, 3, len(logLines)) + require.Contains(t, logLines[0], "cache miss") + require.Contains(t, logLines[1], "cache hit") + require.Contains(t, logLines[2], "cache hit") +} diff --git a/x/proof/types/shared_query_client.go b/x/proof/types/shared_query_client.go index 574735e7e..94b221f20 100644 --- a/x/proof/types/shared_query_client.go +++ b/x/proof/types/shared_query_client.go @@ -7,12 +7,14 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) -var _ client.SharedQueryClient = (*SharedKeeperQueryClient)(nil) +//var _ client.SharedQueryClient = (*SharedKeeperQueryClient)(nil) // SharedKeeperQueryClient is a thin wrapper around the SharedKeeper. // It does not rely on the QueryClient, and therefore does not make any // network requests as in the off-chain implementation. type SharedKeeperQueryClient struct { + client.ParamsQuerier[*sharedtypes.Params] + sharedKeeper SharedKeeper sessionKeeper SessionKeeper } diff --git a/x/shared/keeper/params_query_client.go b/x/shared/keeper/params_query_client.go new file mode 100644 index 000000000..311844f8c --- /dev/null +++ b/x/shared/keeper/params_query_client.go @@ -0,0 +1,54 @@ +package keeper + +import ( + "context" + "fmt" + + "github.com/pokt-network/poktroll/pkg/client" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +var _ client.ParamsQuerier[*sharedtypes.Params] = (*keeperParamsQuerier[sharedtypes.Params, Keeper])(nil) + +// DEV_NOTE: Can't use cosmostypes.Msg instead of any because P +// would be a pointer but GetParams() returns a value. 🙄 +type paramsKeeperIface[P any] interface { + GetParams(context.Context) P +} + +// keeperParamsQuerier provides a base implementation of ParamsQuerier for keeper-based clients +type keeperParamsQuerier[P any, K paramsKeeperIface[P]] struct { + keeper K +} + +// NewKeeperParamsQuerier creates a new keeperParamsQuerier instance +func NewKeeperParamsQuerier[P any, K paramsKeeperIface[P]]( + keeper K, +) (*keeperParamsQuerier[P, K], error) { + return &keeperParamsQuerier[P, K]{ + keeper: keeper, + }, nil +} + +// GetParams retrieves current parameters from the keeper +func (kpq *keeperParamsQuerier[P, K]) GetParams(ctx context.Context) (*P, error) { + params := kpq.keeper.GetParams(ctx) + return ¶ms, nil +} + +// GetParamsAtHeight retrieves parameters as they were at a specific height +// +// TODO_MAINNET(@bryanchriswhite, #931): Integrate with indexer module/mixin once available. +// Currently, this method is (and MUST) NEVER called on-chain and only exists to satisfy the +// client.ParamsQuerier interface. However, it will be needed as part of #931 to support +// querying for params at historical heights, so it's short-circuited for now to always +// return an error. +func (kpq *keeperParamsQuerier[P, K]) GetParamsAtHeight(_ context.Context, _ int64) (*P, error) { + return nil, fmt.Errorf("TODO(#931): Support on-chain historical queries") +} + +// TODO_IN_THIS_COMMIT: godoc... +func (kpq *keeperParamsQuerier[P, K]) SetParamsAtHeight(_ context.Context, height int64, params *P) error { + // TODO_IN_THIS_COMMIT: this will be called on-chain once we have on-chain historical data but it will reference the historical keeper/mix-in method(s). + return fmt.Errorf("TODO(#931): Support on-chain historical caching") +} diff --git a/x/shared/types/query_client.go b/x/shared/types/query_client.go new file mode 100644 index 000000000..9f3841ca7 --- /dev/null +++ b/x/shared/types/query_client.go @@ -0,0 +1,36 @@ +package types + +import ( + "context" + + gogogrpc "github.com/cosmos/gogoproto/grpc" +) + +var _ SharedQueryClient = (*queryClient)(nil) + +// SharedQueryClient is an interface which adapts generated (concrete) shared query client +// to paramsQuerierIface (see: pkg/client/query/paramsquerier.go) such that implementors +// (i.e. the generated shared query client) is compliant with client.ParamsQuerier for the +// shared module's params type. This is required to resolve generic type constraints. +type SharedQueryClient interface { + QueryClient + GetParams(context.Context) (*Params, error) +} + +// NewSharedQueryClient is a wrapper for the shared query client constructor which +// returns a new shared query client as a SharedQueryClient interface type. +func NewSharedQueryClient(conn gogogrpc.ClientConn) SharedQueryClient { + return NewQueryClient(conn).(SharedQueryClient) +} + +// GetParams returns the shared module's params as a pointer, which is critical to +// resolve related generic type constraints between client.ParamsQuerier and it's usages. +func (c *queryClient) GetParams(ctx context.Context) (*Params, error) { + res, err := c.Params(ctx, &QueryParamsRequest{}) + if err != nil { + return nil, err + } + + params := res.GetParams() + return ¶ms, nil +} From e715ab252bfbf52cf128f27a1c2aeee8032c4a67 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 16 Jan 2025 11:54:13 +0100 Subject: [PATCH 08/10] wip: failing for the right reason (no event-driven cache warming)! --- pkg/client/interface.go | 15 ++- pkg/client/query/cache/config.go | 14 ++- pkg/client/query/cache/memory.go | 41 ++++++-- pkg/client/query/cache/memory_test.go | 133 +++++++++++--------------- pkg/client/query/options.go | 2 +- pkg/client/query/paramsquerier.go | 23 ++++- 6 files changed, 132 insertions(+), 96 deletions(-) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index a95a0d5ed..294a0751c 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -375,11 +375,12 @@ type QueryCache[T any] interface { // HistoricalQueryCache extends QueryCache to support getting and setting values // at multiple heights for a given key. type HistoricalQueryCache[T any] interface { - QueryCache[T] - // GetAsOfVersion retrieves the nearest value <= the specified version number. - GetAsOfVersion(key string, version int64) (T, error) - // SetAsOfVersion adds or updates a value at a specific version number. - SetAsOfVersion(key string, value T, version int64) error + // GetLatestVersion retrieves the historical value with the highest version number. + GetLatestVersion(key string) (T, error) + // GetVersion retrieves the nearest value <= the specified version number. + GetVersion(key string, version int64) (T, error) + // SetVersion adds or updates a value at a specific version number. + SetVersion(key string, value T, version int64) error } // ParamsQuerier represents a generic querier for module parameters. @@ -392,4 +393,8 @@ type ParamsQuerier[P cosmostypes.Msg] interface { // GetParamsAtHeight returns the parameters as they were at the specified // height, where P is the params type of a given module (e.g. sharedtypes.Params). GetParamsAtHeight(ctx context.Context, height int64) (P, error) + + // SetParamsAtHeight sets the parameters at the specified height, where + // P is the params type of a given module (e.g. sharedtypes.Params). + SetParamsAtHeight(ctx context.Context, height int64, params P) error } diff --git a/pkg/client/query/cache/config.go b/pkg/client/query/cache/config.go index 77fae86b3..599a5d857 100644 --- a/pkg/client/query/cache/config.go +++ b/pkg/client/query/cache/config.go @@ -13,6 +13,12 @@ const ( LeastFrequentlyUsed ) +var EvictionPolicies = map[EvictionPolicy]string{ + FirstInFirstOut: "FirstInFirstOut", + LeastRecentlyUsed: "LeastRecentlyUsed", + LeastFrequentlyUsed: "LeastFrequentlyUsed", +} + // queryCacheConfig is the configuration for query caches. // It is intended to be configured via QueryCacheOptionFn functions. type queryCacheConfig struct { @@ -51,7 +57,7 @@ func (cfg *queryCacheConfig) Validate() error { case FirstInFirstOut: // TODO_IMPROVE: support LeastRecentlyUsed and LeastFrequentlyUsed policies. default: - return ErrQueryCacheConfigValidation.Wrapf("eviction policy %d not imlemented", cfg.evictionPolicy) + return ErrQueryCacheConfigValidation.Wrapf("eviction policy %s not imlemented", EvictionPolicies[cfg.evictionPolicy]) } if cfg.maxVersionAge > 0 && !cfg.historical { @@ -65,11 +71,11 @@ func (cfg *queryCacheConfig) Validate() error { return nil } -// WithHistoricalMode enables historical caching with the given maxVersionAge +// TODO_IN_THIS_COMMIT: update comment... +// WithMaxVersionAge enables historical caching with the given maxVersionAge // configuration; if 0, no historical pruning is performed. -func WithHistoricalMode(maxVersionAge int64) QueryCacheOptionFn { +func WithMaxVersionAge(maxVersionAge int64) QueryCacheOptionFn { return func(cfg *queryCacheConfig) { - cfg.historical = true cfg.maxVersionAge = maxVersionAge } } diff --git a/pkg/client/query/cache/memory.go b/pkg/client/query/cache/memory.go index 0e67b9439..3b34ce5cf 100644 --- a/pkg/client/query/cache/memory.go +++ b/pkg/client/query/cache/memory.go @@ -76,13 +76,33 @@ func NewInMemoryCache[T any](opts ...QueryCacheOptionFn) (client.QueryCache[T], }, nil } +// TODO_IN_THIS_COMMIT: godoc... +func NewHistoricalInMemoryCache[T any](opts ...QueryCacheOptionFn) (client.HistoricalQueryCache[T], error) { + config := DefaultQueryCacheConfig + config.historical = true + + for _, opt := range opts { + opt(&config) + } + + if err := config.Validate(); err != nil { + return nil, err + } + + return &inMemoryCache[T]{ + values: make(map[string]cacheValue[T]), + valueHistories: make(map[string]cacheValueHistory[T]), + config: config, + }, nil +} + // Get retrieves the value from the cache with the given key. If the cache is // configured for historical mode, it will return the value at the latest **known** -// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// version, which is only updated on calls to SetVersion, and therefore is not // guaranteed to be the current version w.r.t the blockchain. func (c *inMemoryCache[T]) Get(key string) (T, error) { if c.config.historical { - return c.GetAsOfVersion(key, c.latestVersion.Load()) + return c.GetVersion(key, c.latestVersion.Load()) } c.valuesMu.RLock() @@ -109,11 +129,16 @@ func (c *inMemoryCache[T]) Get(key string) (T, error) { return cachedValue.value, nil } -// GetAsOfVersion retrieves the value from the cache with the given key, as of the +// TODO_IN_THIS_COMMIT: godoc... +func (c *inMemoryCache[T]) GetLatestVersion(key string) (T, error) { + return c.GetVersion(key, c.latestVersion.Load()) +} + +// GetVersion retrieves the value from the cache with the given key, as of the // given version. If a value is not found for that version, the value at the nearest // previous version is returned. If the cache is not configured for historical mode, // it returns an error. -func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) { +func (c *inMemoryCache[T]) GetVersion(key string, version int64) (T, error) { var zero T if !c.config.historical { @@ -155,7 +180,7 @@ func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) if isTTLEnabled && isCacheValueExpired { // DEV_NOTE: Intentionally not pruning here to improve concurrent speed; // otherwise, the read lock would be insufficient. The value will be pruned - // in the subsequent call to SetAsOfVersion() after c.config.maxVersionAge + // in the subsequent call to SetVersion() after c.config.maxVersionAge // blocks have elapsed. If usage is such that historical values aren't being // subsequently set, numHistoricalBlocks (if configured) will eventually // cause the pruning of historical values with expired TTLs. @@ -167,7 +192,7 @@ func (c *inMemoryCache[T]) GetAsOfVersion(key string, version int64) (T, error) // Set adds or updates the value in the cache for the given key. If the cache is // configured for historical mode, it will store the value at the latest **known** -// version, which is only updated on calls to SetAsOfVersion, and therefore is not +// version, which is only updated on calls to SetVersion, and therefore is not // guaranteed to be the current version w.r.t. the blockchain. func (c *inMemoryCache[T]) Set(key string, value T) error { if c.config.historical { @@ -193,10 +218,10 @@ func (c *inMemoryCache[T]) Set(key string, value T) error { return nil } -// SetAsOfVersion adds or updates the historical value in the cache for the given key, +// SetVersion adds or updates the historical value in the cache for the given key, // and at the version number. If the cache is not configured for historical mode, it // returns an error. -func (c *inMemoryCache[T]) SetAsOfVersion(key string, value T, version int64) error { +func (c *inMemoryCache[T]) SetVersion(key string, value T, version int64) error { if !c.config.historical { return ErrHistoricalModeNotEnabled } diff --git a/pkg/client/query/cache/memory_test.go b/pkg/client/query/cache/memory_test.go index ac5a742ad..896ef9d01 100644 --- a/pkg/client/query/cache/memory_test.go +++ b/pkg/client/query/cache/memory_test.go @@ -2,6 +2,7 @@ package cache import ( "context" + "fmt" "sync" "testing" "time" @@ -9,8 +10,31 @@ import ( "github.com/stretchr/testify/require" ) -// TestInMemoryCache_NonHistorical tests the basic cache functionality without historical mode -func TestInMemoryCache_NonHistorical(t *testing.T) { +func TestInMemoryCacheConfigValidation(t *testing.T) { + t.Run("non-historical and maxVersionAge > 0", func(t *testing.T) { + _, err := NewInMemoryCache[string]( + WithMaxVersionAge(100), + ) + require.ErrorContains(t, err, "maxVersionAge > 0 requires historical mode to be enabled") + }) + + t.Run("historical and maxVersionAge < 0", func(t *testing.T) { + _, err := NewHistoricalInMemoryCache[string]( + WithMaxVersionAge(-1), + ) + require.ErrorContains(t, err, "maxVersionAge MUST be >= 0") + }) + + t.Run("unsupported eviction policy", func(t *testing.T) { + _, err := NewInMemoryCache[string]( + WithEvictionPolicy(LeastFrequentlyUsed), + ) + require.ErrorContains(t, err, fmt.Sprintf("eviction policy %s not imlemented", EvictionPolicies[LeastFrequentlyUsed])) + }) +} + +// TestInMemoryCache tests basic functionality in non-historical mode +func TestInMemoryCache(t *testing.T) { t.Run("basic operations", func(t *testing.T) { cache, err := NewInMemoryCache[string]() require.NoError(t, err) @@ -93,60 +117,60 @@ func TestInMemoryCache_NonHistorical(t *testing.T) { }) } -// TestInMemoryCache_Historical tests the historical mode functionality +// TestHistoricalInMemoryCache tests the historical mode functionality func TestInMemoryCache_Historical(t *testing.T) { t.Run("basic historical operations", func(t *testing.T) { - cache, err := NewInMemoryCache[string]( - WithHistoricalMode(100), + cache, err := NewHistoricalInMemoryCache[string]( + WithMaxVersionAge(100), ) require.NoError(t, err) - // Test SetAsOfVersion and GetAsOfVersion - err = cache.SetAsOfVersion("key", "value1", 10) + // Test SetVersion and GetVersion + err = cache.SetVersion("key", "value1", 10) require.NoError(t, err) - err = cache.SetAsOfVersion("key", "value2", 20) + err = cache.SetVersion("key", "value2", 20) require.NoError(t, err) // Test getting exact versions - val, err := cache.GetAsOfVersion("key", 10) + val, err := cache.GetVersion("key", 10) require.NoError(t, err) require.Equal(t, "value1", val) - val, err = cache.GetAsOfVersion("key", 20) + val, err = cache.GetVersion("key", 20) require.NoError(t, err) require.Equal(t, "value2", val) // Test getting intermediate version (should return nearest lower version) - val, err = cache.GetAsOfVersion("key", 15) + val, err = cache.GetVersion("key", 15) require.NoError(t, err) require.Equal(t, "value1", val) // Test getting version before first entry - _, err = cache.GetAsOfVersion("key", 5) + _, err = cache.GetVersion("key", 5) require.ErrorIs(t, err, ErrCacheMiss) // Test getting version after last entry - val, err = cache.GetAsOfVersion("key", 25) + val, err = cache.GetVersion("key", 25) require.NoError(t, err) require.Equal(t, "value2", val) // Test getting a version for a key that isn't cached - _, err = cache.GetAsOfVersion("key2", 20) + _, err = cache.GetVersion("key2", 20) require.ErrorIs(t, err, ErrCacheMiss) }) t.Run("historical TTL expiration", func(t *testing.T) { - cache, err := NewInMemoryCache[string]( - WithHistoricalMode(100), + cache, err := NewHistoricalInMemoryCache[string]( + WithMaxVersionAge(100), WithTTL(100*time.Millisecond), ) require.NoError(t, err) - err = cache.SetAsOfVersion("key", "value1", 10) + err = cache.SetVersion("key", "value1", 10) require.NoError(t, err) // Value should be available immediately - val, err := cache.GetAsOfVersion("key", 10) + val, err := cache.GetVersion("key", 10) require.NoError(t, err) require.Equal(t, "value1", val) @@ -154,90 +178,47 @@ func TestInMemoryCache_Historical(t *testing.T) { time.Sleep(150 * time.Millisecond) // Value should now be expired - _, err = cache.GetAsOfVersion("key", 10) + _, err = cache.GetVersion("key", 10) require.ErrorIs(t, err, ErrCacheMiss) }) t.Run("pruning old versions", func(t *testing.T) { - cache, err := NewInMemoryCache[string]( - WithHistoricalMode(10), // Prune entries older than 10 blocks + cache, err := NewHistoricalInMemoryCache[string]( + WithMaxVersionAge(10), // Prune entries older than 10 blocks ) require.NoError(t, err) // Add entries at different versions - err = cache.SetAsOfVersion("key", "value1", 10) + err = cache.SetVersion("key", "value1", 10) require.NoError(t, err) - err = cache.SetAsOfVersion("key", "value2", 20) + err = cache.SetVersion("key", "value2", 20) require.NoError(t, err) - err = cache.SetAsOfVersion("key", "value3", 30) + err = cache.SetVersion("key", "value3", 30) require.NoError(t, err) // Add a new entry that should trigger pruning - err = cache.SetAsOfVersion("key", "value4", 40) + err = cache.SetVersion("key", "value4", 40) require.NoError(t, err) // Entries more than 10 blocks old should be pruned - _, err = cache.GetAsOfVersion("key", 10) + _, err = cache.GetVersion("key", 10) require.ErrorIs(t, err, ErrCacheMiss) - _, err = cache.GetAsOfVersion("key", 20) + _, err = cache.GetVersion("key", 20) require.ErrorIs(t, err, ErrCacheMiss) // Recent entries should still be available - val, err := cache.GetAsOfVersion("key", 30) + val, err := cache.GetVersion("key", 30) require.NoError(t, err) require.Equal(t, "value3", val) - val, err = cache.GetAsOfVersion("key", 40) + val, err = cache.GetVersion("key", 40) require.NoError(t, err) require.Equal(t, "value4", val) }) - - t.Run("non-historical operations on historical cache", func(t *testing.T) { - cache, err := NewInMemoryCache[string]( - WithHistoricalMode(100), - ) - require.NoError(t, err) - - // Set some historical values - err = cache.SetAsOfVersion("key", "value1", 10) - require.NoError(t, err) - err = cache.SetAsOfVersion("key", "value2", 20) - require.NoError(t, err) - - // Regular Set should work with latest version - err = cache.Set("key", "value3") - require.ErrorIs(t, err, ErrUnsupportedHistoricalModeOp) - - // Regular Get should return the latest value - val, err := cache.Get("key") - require.NoError(t, err) - require.Equal(t, "value2", val) - - // Delete should remove all historical values - cache.Delete("key") - _, err = cache.GetAsOfVersion("key", 10) - require.ErrorIs(t, err, ErrCacheMiss) - _, err = cache.GetAsOfVersion("key", 20) - require.ErrorIs(t, err, ErrCacheMiss) - _, err = cache.Get("key") - require.ErrorIs(t, err, ErrCacheMiss) - }) } // TestInMemoryCache_ErrorCases tests various error conditions func TestInMemoryCache_ErrorCases(t *testing.T) { - t.Run("historical operations on non-historical cache", func(t *testing.T) { - cache, err := NewInMemoryCache[string]() - require.NoError(t, err) - - // Attempting historical operations should return error - err = cache.SetAsOfVersion("key", "value", 10) - require.ErrorIs(t, err, ErrHistoricalModeNotEnabled) - - _, err = cache.GetAsOfVersion("key", 10) - require.ErrorIs(t, err, ErrHistoricalModeNotEnabled) - }) - t.Run("zero values", func(t *testing.T) { cache, err := NewInMemoryCache[string]() require.NoError(t, err) @@ -307,8 +288,8 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { }) t.Run("concurrent access historical", func(t *testing.T) { - cache, err := NewInMemoryCache[int]( - WithHistoricalMode(100), + cache, err := NewHistoricalInMemoryCache[int]( + WithMaxVersionAge(100), ) require.NoError(t, err) @@ -331,9 +312,9 @@ func TestInMemoryCache_ConcurrentAccess(t *testing.T) { return default: key := "key" - err := cache.SetAsOfVersion(key, j, int64(j)) + err := cache.SetVersion(key, j, int64(j)) require.NoError(t, err) - _, _ = cache.GetAsOfVersion(key, int64(j)) + _, _ = cache.GetVersion(key, int64(j)) } } }() diff --git a/pkg/client/query/options.go b/pkg/client/query/options.go index deafeb606..03547df6c 100644 --- a/pkg/client/query/options.go +++ b/pkg/client/query/options.go @@ -45,7 +45,7 @@ func (pqc *paramsQuerierConfig) Validate() error { func DefaultParamsQuerierConfig() *paramsQuerierConfig { return ¶msQuerierConfig{ cacheOpts: []cache.QueryCacheOptionFn{ - cache.WithHistoricalMode(defaultMaxVersionAge), + cache.WithMaxVersionAge(defaultMaxVersionAge), cache.WithMaxKeys(defaultMaxKeys), cache.WithEvictionPolicy(cache.FirstInFirstOut), }, diff --git a/pkg/client/query/paramsquerier.go b/pkg/client/query/paramsquerier.go index bd5369836..42a7431a3 100644 --- a/pkg/client/query/paramsquerier.go +++ b/pkg/client/query/paramsquerier.go @@ -39,6 +39,9 @@ type paramsQuerierIface[P cosmostypes.Msg] interface { // NewCachedParamsQuerier creates a new, generic, params querier with the given // concrete query client constructor and the configuration which results from // applying the given options. +// - gogogrpc.ClientConn +// - client.HistoricalQueryCache[P] +// - client.BlockQueryClient func NewCachedParamsQuerier[P cosmostypes.Msg, Q paramsQuerierIface[P]]( ctx context.Context, deps depinject.Config, @@ -119,7 +122,7 @@ func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { // Check the cache first. var paramsZero P - cached, err := bq.paramsCache.Get("params") + cached, err := bq.paramsCache.GetLatestVersion("params") switch { case err == nil: logger.Debug().Msgf("params cache hit") @@ -139,6 +142,12 @@ func (bq *cachedParamsQuerier[P, Q]) GetParams(ctx context.Context) (P, error) { return paramsZero, err } + // Set the params at the current height in the cache. + currentHeight := bq.blockClient.LastBlock(ctx).Height() + if err = bq.SetParamsAtHeight(ctx, currentHeight, params); err != nil { + return paramsZero, err + } + return params, nil } @@ -156,7 +165,7 @@ func (bq *cachedParamsQuerier[P, Q]) GetParamsAtHeight(ctx context.Context, heig ) // Try to get from cache at specific height - cached, err := bq.paramsCache.GetAsOfVersion("params", height) + cached, err := bq.paramsCache.GetVersion("params", height) switch { case err == nil: logger.Debug().Msg("params cache hit") @@ -175,6 +184,16 @@ func (bq *cachedParamsQuerier[P, Q]) GetParamsAtHeight(ctx context.Context, heig return bq.GetParams(ctx) } +// TODO_IN_THIS_COMMIT: godoc... +func (bq *cachedParamsQuerier[P, Q]) SetParamsAtHeight( + _ context.Context, + height int64, + params P, +) error { + // TODO_IN_THIS_COMMIT: extract "params" to a constant & reference in other usages. + return bq.paramsCache.SetVersion("params", params, height) +} + // TODO_IN_THIS_COMMIT: godoc... var govAccountTxQueryFmt = "tm.event='Tx' AND message.sender='%s'" From a2ee4141b81d4d7ad81e14062008372192ce473b Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 16 Jan 2025 14:54:24 +0100 Subject: [PATCH 09/10] wip --- pkg/client/interface.go | 1 - .../integration/client/params_querier_test.go | 94 ++++++++++++++----- testutil/integration/app.go | 13 ++- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 294a0751c..8990fb622 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -393,7 +393,6 @@ type ParamsQuerier[P cosmostypes.Msg] interface { // GetParamsAtHeight returns the parameters as they were at the specified // height, where P is the params type of a given module (e.g. sharedtypes.Params). GetParamsAtHeight(ctx context.Context, height int64) (P, error) - // SetParamsAtHeight sets the parameters at the specified height, where // P is the params type of a given module (e.g. sharedtypes.Params). SetParamsAtHeight(ctx context.Context, height int64, params P) error diff --git a/tests/integration/client/params_querier_test.go b/tests/integration/client/params_querier_test.go index e913d6033..2ea7acb89 100644 --- a/tests/integration/client/params_querier_test.go +++ b/tests/integration/client/params_querier_test.go @@ -9,18 +9,26 @@ import ( "cosmossdk.io/depinject" comethttp "github.com/cometbft/cometbft/rpc/client/http" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cosmostypes "github.com/cosmos/cosmos-sdk/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/app/volatile" "github.com/pokt-network/poktroll/pkg/client/block" "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" "github.com/pokt-network/poktroll/pkg/client/query/cache" + "github.com/pokt-network/poktroll/pkg/client/tx" + txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" "github.com/pokt-network/poktroll/pkg/polylog/polyzero" _ "github.com/pokt-network/poktroll/pkg/polylog/polyzero" "github.com/pokt-network/poktroll/testutil/e2e" + "github.com/pokt-network/poktroll/testutil/integration" "github.com/pokt-network/poktroll/testutil/testclient" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) @@ -151,31 +159,23 @@ func TestSanity2(t *testing.T) { } } -// TODO_IN_THIS_COMMIT: godoc... -type MockParamsCache[P any] struct { - mock.Mock -} - -func (m *MockParamsCache[P]) GetLatestVersion(key string) (P, error) { - args := m.Called(key) - return args.Get(0).(P), args.Error(1) -} - -func (m *MockParamsCache[P]) GetVersion(key string, version int64) (P, error) { - args := m.Called(key, version) - return args.Get(0).(P), args.Error(1) -} - -func (m *MockParamsCache[P]) SetVersion(key string, value P, version int64) error { - args := m.Called(key, value, version) - return args.Error(0) -} - func TestSanity(t *testing.T) { ctx := context.Background() - app := e2e.NewE2EApp(t) + app := e2e.NewE2EApp(t, integration.WithAuthorityAddress("pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw")) t.Cleanup(func() { app.Close() }) + keyRing := keyring.NewInMemory(app.GetCodec()) + rec, err := keyRing.NewAccount( + "pnf", + "crumble shrimp south strategy speed kick green topic stool seminar track stand rhythm almost bubble pet knock steel pull flag weekend country major blade", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + pnfAddr, err := rec.GetAddress() + require.NoError(t, err) + clientConn, err := app.GetClientConn() require.NoError(t, err) require.NotNil(t, clientConn) @@ -194,6 +194,14 @@ func TestSanity(t *testing.T) { blockClient, err := block.NewBlockClient(ctx, deps) require.NoError(t, err) + // Fund gateway2 account. + _, err = app.RunMsg(t, &banktypes.MsgSend{ + FromAddress: app.GetFaucetBech32(), + ToAddress: pnfAddr.String(), + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), + }) + require.NoError(t, err) + logBuffer := new(bytes.Buffer) logger := polyzero.NewLogger( polyzero.WithLevel(polyzero.DebugLevel), @@ -210,7 +218,6 @@ func TestSanity(t *testing.T) { depinject.Supply( logger, clientConn, - //new(MockParamsCache[*sharedtypes.Params]), paramsCache, blockClient, ), @@ -233,15 +240,50 @@ func TestSanity(t *testing.T) { require.Equal(t, defaultCUTTM, sharedParams.ComputeUnitsToTokensMultiplier) paramUpdateMsg := &sharedtypes.MsgUpdateParam{ - Authority: authtypes.NewModuleAddress(govtypes.ModuleName).String(), + //Authority: authtypes.NewModuleAddress(govtypes.ModuleName).String(), + Authority: pnfAddr.String(), Name: "compute_units_to_tokens_multiplier", AsType: &sharedtypes.MsgUpdateParam_AsUint64{AsUint64: expectedCUTTM}, } - res, err := app.RunMsg(t, paramUpdateMsg) + // TODO_IN_THIS_COMMIT: investigate why app.RunMsg doesn't seem to update the state in the subsequent block... + //res, err := app.RunMsg(t, paramUpdateMsg) + //require.NoError(t, err) + + //gprcClientConn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) + //require.NoError(t, err) + + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + // DEV_NOTE: DO NOT use the clientCtx as a grpc.ClientConn as it bypasses E2EApp integrations. + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + + txContext, err := tx.NewTxContext(deps) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txContext)) + txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("pnf")) + require.NoError(t, err) + + eitherErr := txClient.SignAndBroadcast(app.GetSdkCtx(), paramUpdateMsg) + err, errCh := eitherErr.SyncOrAsyncError() require.NoError(t, err) - t.Logf("res: %+v", res) + select { + // TODO_IN_THIS_COMMIT: ... + case <-time.After(time.Second): + t.Fatal("timeout waiting for tx to be committed") + case err = <-errCh: + if err != nil { + t.Fatal(err) + } + } + + //t.Logf("res: %+v", res) sharedParams, err = paramsQuerier.GetParams(ctx) require.NoError(t, err) diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 715c0135b..66fe6b4ba 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -238,10 +238,20 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap cfg := &IntegrationAppConfig{ TokenLogicModules: tlm.NewDefaultTokenLogicModules(), } + + // Set the default authority address; it may be overridden by config options. + authority := authtypes.NewModuleAddress(govtypes.ModuleName) + for _, opt := range opts { opt(cfg) } + var err error + if cfg.authorityAddress != "" { + authority, err = sdk.AccAddressFromBech32(cfg.authorityAddress) + require.NoError(t, err) + } + // Prepare & register the codec for all the interfaces sdkCfg := cosmostypes.GetConfig() addrCodec := addresscodec.NewBech32Codec(sdkCfg.GetBech32AccountAddrPrefix()) @@ -310,9 +320,6 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap Height: 1, }) - // Get the authority address - authority := authtypes.NewModuleAddress(govtypes.ModuleName) - // Prepare the account keeper dependencies macPerms := map[string][]string{ banktypes.ModuleName: {authtypes.Minter, authtypes.Burner}, From 69dd853eab7993ae2801c6b9fe9d0674f1177aaa Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 17 Jan 2025 14:16:28 +0100 Subject: [PATCH 10/10] cherry-pick: wip E2EApp testutil --- testutil/e2e/app.go | 164 ++++++++++++++++++++ testutil/e2e/app_test.go | 110 +++++++++++++ testutil/e2e/comet.go | 266 ++++++++++++++++++++++++++++++++ testutil/e2e/grpc_server.go | 1 + testutil/e2e/ws_server.go | 243 +++++++++++++++++++++++++++++ testutil/integration/app.go | 148 ++++++++++-------- testutil/integration/options.go | 18 +++ testutil/testclient/localnet.go | 17 +- 8 files changed, 892 insertions(+), 75 deletions(-) create mode 100644 testutil/e2e/app.go create mode 100644 testutil/e2e/app_test.go create mode 100644 testutil/e2e/comet.go create mode 100644 testutil/e2e/grpc_server.go create mode 100644 testutil/e2e/ws_server.go diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go new file mode 100644 index 000000000..65471f3c4 --- /dev/null +++ b/testutil/e2e/app.go @@ -0,0 +1,164 @@ +package e2e + +import ( + "context" + "errors" + "net" + "net/http" + "sync" + "testing" + + comettypes "github.com/cometbft/cometbft/types" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cosmos/cosmos-sdk/types/module" + "github.com/gorilla/websocket" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + + "github.com/pokt-network/poktroll/testutil/integration" + "github.com/pokt-network/poktroll/testutil/testclient" +) + +// E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing +type E2EApp struct { + *integration.App + grpcServer *grpc.Server + grpcListener net.Listener + wsServer *http.Server + wsListener net.Listener + wsUpgrader websocket.Upgrader + wsConnMutex sync.RWMutex + wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries + resultEventChan chan *coretypes.ResultEvent +} + +// NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers +func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { + t.Helper() + ctx := context.Background() + + // Initialize and start gRPC server + creds := insecure.NewCredentials() + grpcServer := grpc.NewServer(grpc.Creds(creds)) + mux := runtime.NewServeMux() + + rootPattern, err := runtime.NewPattern( + 1, + []int{int(utilities.OpLitPush), int(utilities.OpNop)}, + []string{""}, + "", + ) + require.NoError(t, err) + + // Create the integration app + opts = append(opts, integration.WithGRPCServer(grpcServer)) + app := integration.NewCompleteIntegrationApp(t, opts...) + app.RegisterGRPCServer(grpcServer) + + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + keyRing := keyring.NewInMemory(app.GetCodec()) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + // Register the handler with the mux + client, err := grpc.NewClient("127.0.0.1:42069", grpc.WithInsecure()) + require.NoError(t, err) + + for _, mod := range app.GetModuleManager().Modules { + mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) + } + + // Create listeners for gRPC, WebSocket, and HTTP + grpcListener, err := net.Listen("tcp", "127.0.0.1:42069") + require.NoError(t, err, "failed to create gRPC listener") + + wsListener, err := net.Listen("tcp", "127.0.0.1:6969") + require.NoError(t, err, "failed to create WebSocket listener") + + e2eApp := &E2EApp{ + App: app, + grpcListener: grpcListener, + grpcServer: grpcServer, + wsListener: wsListener, + wsConnections: make(map[*websocket.Conn]map[string]struct{}), + wsUpgrader: websocket.Upgrader{}, + resultEventChan: make(chan *coretypes.ResultEvent), + } + + mux.Handle(http.MethodPost, rootPattern, newPostHandler(ctx, client, e2eApp)) + + go func() { + if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + } + }() + + // Initialize and start WebSocket server + e2eApp.wsServer = newWebSocketServer(e2eApp) + go func() { + if err := e2eApp.wsServer.Serve(wsListener); err != nil && errors.Is(err, http.ErrServerClosed) { + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + } + }() + + // Initialize and start HTTP server + go func() { + if err := http.ListenAndServe("127.0.0.1:42070", mux); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + } + }() + + // Start event handling + go e2eApp.handleResultEvents(t) + + return e2eApp +} + +// Close gracefully shuts down the E2EApp and its servers +func (app *E2EApp) Close() error { + app.grpcServer.GracefulStop() + if err := app.wsServer.Close(); err != nil { + return err + } + + close(app.resultEventChan) + + return nil +} + +// GetClientConn returns a gRPC client connection to the E2EApp's gRPC server. +func (app *E2EApp) GetClientConn() (*grpc.ClientConn, error) { + return grpc.NewClient( + app.grpcListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) +} + +// GetWSEndpoint returns the WebSocket endpoint URL +func (app *E2EApp) GetWSEndpoint() string { + return "ws://" + app.wsListener.Addr().String() + "/websocket" +} + +// TODO_IN_THIS_COMMIT: godoc & move... +func (app *E2EApp) GetCometBlockID() comettypes.BlockID { + lastBlockID := app.GetSdkCtx().BlockHeader().LastBlockId + partSetHeader := lastBlockID.GetPartSetHeader() + + return comettypes.BlockID{ + Hash: lastBlockID.GetHash(), + PartSetHeader: comettypes.PartSetHeader{ + Total: partSetHeader.GetTotal(), + Hash: partSetHeader.GetHash(), + }, + } +} diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go new file mode 100644 index 000000000..2702cc8db --- /dev/null +++ b/testutil/e2e/app_test.go @@ -0,0 +1,110 @@ +package e2e + +import ( + "testing" + + "cosmossdk.io/depinject" + "cosmossdk.io/math" + comethttp "github.com/cometbft/cometbft/rpc/client/http" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/app/volatile" + "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/query" + "github.com/pokt-network/poktroll/pkg/client/tx" + txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" + "github.com/pokt-network/poktroll/testutil/testclient" + gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" + sharedtypes "github.com/pokt-network/poktroll/x/shared/types" +) + +func TestNewE2EApp(t *testing.T) { + app := NewE2EApp(t) + + // Construct dependencies... + keyRing := keyring.NewInMemory(app.GetCodec()) + rec, err := keyRing.NewAccount( + "gateway2", + "suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + + gateway2Addr, err := rec.GetAddress() + require.NoError(t, err) + + blockQueryClient, err := comethttp.New("tcp://127.0.0.1:42070", "/websocket") + require.NoError(t, err) + + grpcConn, err := app.GetClientConn() + require.NoError(t, err) + + deps := depinject.Supply(grpcConn, blockQueryClient) + + sharedQueryClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) + + sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) + require.NoError(t, err) + require.Equal(t, sharedtypes.DefaultParams(), *sharedParams) + + eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") + deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) + require.NoError(t, err) + + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + // DEV_NOTE: DO NOT use the clientCtx as a grpc.ClientConn as it bypasses E2EApp integrations. + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + + txContext, err := tx.NewTxContext(deps) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) + txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) + require.NoError(t, err) + + // Assert that no gateways are staked. + gatewayQueryClient := gatewaytypes.NewQueryClient(grpcConn) + allGatewaysRes, err := gatewayQueryClient.AllGateways(app.GetSdkCtx(), &gatewaytypes.QueryAllGatewaysRequest{}) + require.Equal(t, 0, len(allGatewaysRes.Gateways)) + + // Fund gateway2 account. + _, err = app.RunMsg(t, &banktypes.MsgSend{ + FromAddress: app.GetFaucetBech32(), + ToAddress: gateway2Addr.String(), + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 10000000000)), + }) + require.NoError(t, err) + + // Stake gateway2. + eitherErr := txClient.SignAndBroadcast( + app.GetSdkCtx(), + gatewaytypes.NewMsgStakeGateway( + "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000001)), + ), + ) + + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + require.NoError(t, <-errCh) + + // Assert that only gateway2 is staked. + allGatewaysRes, err = gatewayQueryClient.AllGateways(app.GetSdkCtx(), &gatewaytypes.QueryAllGatewaysRequest{}) + require.Equal(t, 1, len(allGatewaysRes.Gateways)) + require.Equal(t, "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", allGatewaysRes.Gateways[0].Address) +} diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go new file mode 100644 index 000000000..66eaa414f --- /dev/null +++ b/testutil/e2e/comet.go @@ -0,0 +1,266 @@ +package e2e + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/cometbft/cometbft/abci/types" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + comettypes "github.com/cometbft/cometbft/types" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + gogogrpc "github.com/cosmos/gogoproto/grpc" + "github.com/grpc-ecosystem/grpc-gateway/runtime" +) + +// TODO_IN_THIS_COMMIT: godoc... +type CometBFTMethod string + +// TODO_IN_THIS_COMMIT: godoc... +type ServiceMethodUri string + +const ( + abciQueryMethod = CometBFTMethod("abci_query") + broadcastTxSyncMethod = CometBFTMethod("broadcast_tx_sync") + broadcastTxAsyncMethod = CometBFTMethod("broadcast_tx_async") + broadcastTxCommitMethod = CometBFTMethod("broadcast_tx_commit") + blockMethod = CometBFTMethod("block") + + authAccountQueryUri = ServiceMethodUri("/cosmos.auth.v1beta1.Query/Account") +) + +// handleABCIQuery handles the actual ABCI query logic +func newPostHandler( + ctx context.Context, + client gogogrpc.ClientConn, + app *E2EApp, +) runtime.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { + // DEV_NOTE: http.Error() automatically sets the Content-Type header to "text/plain". + w.Header().Set("Content-Type", "application/json") + + // Read and log request body + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Error reading request body", http.StatusBadRequest) + return + } + defer r.Body.Close() + + // Parse JSON-RPC request + var req rpctypes.RPCRequest + if err = json.Unmarshal(body, &req); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + params := make(map[string]json.RawMessage) + if err = json.Unmarshal(req.Params, ¶ms); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + response := new(rpctypes.RPCResponse) + switch CometBFTMethod(req.Method) { + case abciQueryMethod: + response, err = app.handleAbciQuery(ctx, client, req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") + } + case broadcastTxSyncMethod, broadcastTxAsyncMethod, broadcastTxCommitMethod: + response, err = app.handleBroadcastTx(req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") + } + case blockMethod: + response, err = app.handleBlock(ctx, client, req, params) + if err != nil { + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, err.Error(), "") + } + default: + *response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) + } + + if err = json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) handleAbciQuery( + ctx context.Context, + client gogogrpc.ClientConn, + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + var ( + resData []byte + height int64 + ) + + pathRaw, hasPath := params["path"] + if !hasPath { + return nil, fmt.Errorf("missing path param: %s", string(req.Params)) + } + + var path string + if err := json.Unmarshal(pathRaw, &path); err != nil { + return nil, err + } + + switch ServiceMethodUri(path) { + case authAccountQueryUri: + dataRaw, hasData := params["data"] + if !hasData { + return nil, fmt.Errorf("missing data param: %s", string(req.Params)) + } + + data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) + if err != nil { + return nil, err + } + + queryReq := new(authtypes.QueryAccountRequest) + if err = queryReq.Unmarshal(data); err != nil { + return nil, err + } + + var height int64 + heightRaw, hasHeight := params["height"] + if hasHeight { + if err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height); err != nil { + return nil, err + } + } + + queryRes := new(authtypes.QueryAccountResponse) + if err = client.Invoke(ctx, path, queryReq, queryRes); err != nil { + return nil, err + } + + resData, err = queryRes.Marshal() + if err != nil { + return nil, err + } + } + + abciQueryRes := coretypes.ResultABCIQuery{ + Response: types.ResponseQuery{ + Value: resData, + Height: height, + }, + } + + res := rpctypes.NewRPCSuccessResponse(req.ID, abciQueryRes) + return &res, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) handleBroadcastTx( + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + var txBz []byte + txRaw, hasTx := params["tx"] + if !hasTx { + return nil, fmt.Errorf("missing tx param: %s", string(req.Params)) + } + if err := json.Unmarshal(txRaw, &txBz); err != nil { + return nil, err + } + + // TODO_CONSIDERATION: more correct implementation of the different + // broadcast_tx methods (i.e. sync, async, commit) is a matter of + // the sequencing of the following: + // - calling the finalize block ABCI method + // - returning the JSON-RPC response + // - emitting websocket event + + _, finalizeBlockRes, err := app.RunTx(nil, txBz) + if err != nil { + return nil, err + } + + go func() { + // Simulate 1 second block production delay. + time.Sleep(time.Second * 1) + + // TODO_IMPROVE: If we want/need to support multiple txs per + // block in the future, this will have to be refactored. + app.EmitWSEvents(finalizeBlockRes, txBz) + }() + + // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as + // we're finalizing one tx at a time (single tx blocks). + txRes := finalizeBlockRes.GetTxResults()[0] + + bcastTxRes := coretypes.ResultBroadcastTx{ + Code: txRes.GetCode(), + Data: txRes.GetData(), + Log: txRes.GetLog(), + Codespace: txRes.GetCodespace(), + Hash: comettypes.Tx(txBz).Hash(), + } + + res := rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) + return &res, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) handleBlock( + ctx context.Context, + client gogogrpc.ClientConn, + req rpctypes.RPCRequest, + params map[string]json.RawMessage, +) (*rpctypes.RPCResponse, error) { + resultBlock := coretypes.ResultBlock{ + BlockID: app.GetCometBlockID(), + Block: &comettypes.Block{ + Header: comettypes.Header{ + //Version: version.Consensus{}, + ChainID: "poktroll-test", + Height: app.GetSdkCtx().BlockHeight(), + Time: time.Now(), + LastBlockID: app.GetCometBlockID(), + //LastCommitHash: nil, + //DataHash: nil, + //ValidatorsHash: nil, + //NextValidatorsHash: nil, + //ConsensusHash: nil, + //AppHash: nil, + //LastResultsHash: nil, + //EvidenceHash: nil, + //ProposerAddress: nil, + }, + //Data: comettypes.Data{}, + //Evidence: comettypes.EvidenceData{}, + //LastCommit: nil, + }, + } + res := rpctypes.NewRPCSuccessResponse(req.ID, resultBlock) + return &res, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func writeErrorResponseFromErr(w http.ResponseWriter, req rpctypes.RPCRequest, err error) { + var errMsg string + if err != nil { + errMsg = err.Error() + } + writeErrorResponse(w, req, errMsg, "") +} + +// TODO_IN_THIS_COMMIT: godoc... +func writeErrorResponse(w http.ResponseWriter, req rpctypes.RPCRequest, msg, data string) { + errRes := rpctypes.NewRPCErrorResponse(req.ID, 500, msg, data) + if err := json.NewEncoder(w).Encode(errRes); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} diff --git a/testutil/e2e/grpc_server.go b/testutil/e2e/grpc_server.go new file mode 100644 index 000000000..df8caf702 --- /dev/null +++ b/testutil/e2e/grpc_server.go @@ -0,0 +1 @@ +package e2e diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go new file mode 100644 index 000000000..84d10c0f8 --- /dev/null +++ b/testutil/e2e/ws_server.go @@ -0,0 +1,243 @@ +package e2e + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "testing" + "time" + + abci "github.com/cometbft/cometbft/abci/types" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + comettypes "github.com/cometbft/cometbft/types" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +// newWebSocketServer creates and configures a new WebSocket server for the E2EApp +func newWebSocketServer(app *E2EApp) *http.Server { + mux := http.NewServeMux() + mux.HandleFunc("/websocket", app.handleWebSocket) + return &http.Server{Handler: mux} +} + +// handleWebSocket handles incoming WebSocket connections and subscriptions +func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { + conn, err := app.wsUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + app.wsConnMutex.Lock() + app.wsConnections[conn] = make(map[string]struct{}) + app.wsConnMutex.Unlock() + + go app.handleWebSocketConnection(conn) +} + +// handleWebSocketConnection handles messages from a specific WebSocket connection +func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { + logger := app.Logger().With("method", "handleWebSocketConnection") + + defer func() { + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + conn.Close() + }() + + for { + _, message, err := conn.ReadMessage() + if err != nil { + return + } + + var req rpctypes.RPCRequest + if err = json.Unmarshal(message, &req); err != nil { + continue + } + + // Handle subscription requests. + if req.Method == "subscribe" { + var params struct { + Query string `json:"query"` + } + if err = json.Unmarshal(req.Params, ¶ms); err != nil { + continue + } + + app.wsConnMutex.Lock() + app.wsConnections[conn][params.Query] = struct{}{} + app.wsConnMutex.Unlock() + + // Send initial subscription response + resp := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + // DEV_NOTE: Query subscription responses are initially empty; data is sent as subsequent events occur. + Result: json.RawMessage("{}"), + } + if err = conn.WriteJSON(resp); err != nil { + logger.Error(fmt.Sprintf("writing JSON-RPC response: %s", err)) + } + } + } +} + +// handleResultEvents coordinates block finalization with WebSocket event broadcasting +func (app *E2EApp) handleResultEvents(t *testing.T) { + t.Helper() + + for event := range app.resultEventChan { + app.wsConnMutex.RLock() + for conn, queries := range app.wsConnections { + // Check if connection is subscribed to this event type + for query := range queries { + queryParts := strings.Split(query, " AND ") + queryPartPairs := make(map[string]string) + for _, queryPart := range queryParts { + queryPartPair := strings.Split(queryPart, "=") + require.Equal(t, 2, len(queryPartPair)) + + queryPartKey := strings.Trim(queryPartPair[0], `" `) + queryPartValue := strings.Trim(queryPartPair[1], `" `) + queryPartPairs[queryPartKey] = queryPartValue + } + + fmt.Printf(">>> checking query: %s\n", query) + + // TODO_IN_THIS_COMMIT: determine if the request ID is needed!!! + //response := rpctypes.NewRPCSuccessResponse(req.ID, event) + response := rpctypes.NewRPCSuccessResponse(nil, event) + //response := rpctypes.RPCResponse{ + // JSONRPC: "2.0", + // ID: nil, // Events don't have an ID + // // TODO_IN_THIS_COMMIT: make this dynamic! + // //Result: json.RawMessage(mockBlockResultJSON), + // Result: json.RawMessage(eventJSON), + //} + + if err := conn.WriteJSON(response); err != nil { + app.wsConnMutex.RUnlock() + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + app.wsConnMutex.RLock() + continue + } + } + } + app.wsConnMutex.RUnlock() + } +} + +// TODO_IN_THIS_COMMIT: godoc and move... +func parseQuery(t *testing.T, query string) map[string]string { + t.Helper() + + queryParts := strings.Split(query, " AND ") + queryPartPairs := make(map[string]string) + for _, queryPart := range queryParts { + queryPartPair := strings.Split(queryPart, "=") + require.Equal(t, 2, len(queryPartPair)) + + queryPartKey := strings.Trim(queryPartPair[0], `" `) + queryPartValue := strings.Trim(queryPartPair[1], `" `) + queryPartPairs[queryPartKey] = queryPartValue + } + + return queryPartPairs +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *E2EApp) EmitWSEvents(finalizeBlockRes *abci.ResponseFinalizeBlock, txBz []byte) { + events := validateAndStringifyEvents(finalizeBlockRes.GetEvents()) + // DEV_NOTE: see https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L138 + events[comettypes.EventTypeKey] = append(events[comettypes.EventTypeKey], comettypes.EventNewBlock) + + evtDataNewBlock := comettypes.EventDataNewBlock{ + Block: &comettypes.Block{ + Header: comettypes.Header{ + //Version: version.Consensus{}, + ChainID: "poktroll-test", + Height: app.GetSdkCtx().BlockHeight(), + Time: time.Now(), + LastBlockID: app.GetCometBlockID(), + //LastCommitHash: nil, + //DataHash: nil, + //ValidatorsHash: nil, + //NextValidatorsHash: nil, + //ConsensusHash: nil, + //AppHash: nil, + //LastResultsHash: nil, + //EvidenceHash: nil, + //ProposerAddress: nil, + }, + //Data: comettypes.Data{}, + //Evidence: comettypes.EvidenceData{}, + //LastCommit: nil, + }, + BlockID: app.GetCometBlockID(), + ResultFinalizeBlock: *finalizeBlockRes, + } + + // TODO_IN_THIS_COMMIT: comment... + resultEvent := &coretypes.ResultEvent{ + Query: comettypes.EventQueryNewBlock.String(), + Data: evtDataNewBlock, + Events: events, + } + + app.resultEventChan <- resultEvent + + // TODO_IN_THIS_COMMIT: comment... + for idx, txResult := range finalizeBlockRes.GetTxResults() { + events = validateAndStringifyEvents(txResult.GetEvents()) + // DEV_NOTE: see https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L180 + events[comettypes.EventTypeKey] = append(events[comettypes.EventTypeKey], comettypes.EventTx) + events[comettypes.TxHashKey] = append(events[comettypes.TxHashKey], fmt.Sprintf("%X", comettypes.Tx(txBz).Hash())) + events[comettypes.TxHeightKey] = append(events[comettypes.TxHeightKey], fmt.Sprintf("%d", app.GetSdkCtx().BlockHeight())) + + evtDataTx := comettypes.EventDataTx{ + TxResult: abci.TxResult{ + Height: app.GetSdkCtx().BlockHeight(), + Index: uint32(idx), + Tx: txBz, + Result: *txResult, + }, + } + + resultEvent = &coretypes.ResultEvent{ + Query: comettypes.EventQueryTx.String(), + Data: evtDataTx, + Events: events, + } + + app.resultEventChan <- resultEvent + } + + // TODO_IMPROVE: emit individual finalize block & tx result events? +} + +// TODO_IN_THIS_COMMIT: godoc... see: https://github.com/cometbft/cometbft/blob/v0.38.10/types/event_bus.go#L112 +func validateAndStringifyEvents(events []abci.Event) map[string][]string { + result := make(map[string][]string) + for _, event := range events { + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) + result[compositeTag] = append(result[compositeTag], attr.Value) + } + } + + return result +} diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 66fe6b4ba..a3cc986e0 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -103,7 +103,7 @@ type App struct { txCfg client.TxConfig authority sdk.AccAddress moduleManager module.Manager - queryHelper *baseapp.QueryServiceTestHelper + queryHelper *baseapp.GRPCQueryRouter keyRing keyring.Keyring ringClient crypto.RingClient preGeneratedAccts *testkeyring.PreGeneratedAccountIterator @@ -139,7 +139,7 @@ func NewIntegrationApp( modules map[string]appmodule.AppModule, keys map[string]*storetypes.KVStoreKey, msgRouter *baseapp.MsgServiceRouter, - queryHelper *baseapp.QueryServiceTestHelper, + queryHelper *baseapp.GRPCQueryRouter, opts ...IntegrationAppOptionFn, ) *App { t.Helper() @@ -520,8 +520,8 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap ) // Prepare the message & query routers - msgRouter := baseapp.NewMsgServiceRouter() - queryHelper := baseapp.NewQueryServerTestHelper(sdkCtx, registry) + msgRouter := bApp.MsgServiceRouter() + queryHelper := bApp.GRPCQueryRouter() // Prepare the authz keeper and module authzKeeper := authzkeeper.NewKeeper( @@ -570,32 +570,10 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap opts..., ) - // Register the message servers - banktypes.RegisterMsgServer(msgRouter, bankkeeper.NewMsgServerImpl(bankKeeper)) - tokenomicstypes.RegisterMsgServer(msgRouter, tokenomicskeeper.NewMsgServerImpl(tokenomicsKeeper)) - servicetypes.RegisterMsgServer(msgRouter, servicekeeper.NewMsgServerImpl(serviceKeeper)) - sharedtypes.RegisterMsgServer(msgRouter, sharedkeeper.NewMsgServerImpl(sharedKeeper)) - gatewaytypes.RegisterMsgServer(msgRouter, gatewaykeeper.NewMsgServerImpl(gatewayKeeper)) - apptypes.RegisterMsgServer(msgRouter, appkeeper.NewMsgServerImpl(applicationKeeper)) - suppliertypes.RegisterMsgServer(msgRouter, supplierkeeper.NewMsgServerImpl(supplierKeeper)) - prooftypes.RegisterMsgServer(msgRouter, proofkeeper.NewMsgServerImpl(proofKeeper)) - authtypes.RegisterMsgServer(msgRouter, authkeeper.NewMsgServerImpl(accountKeeper)) - sessiontypes.RegisterMsgServer(msgRouter, sessionkeeper.NewMsgServerImpl(sessionKeeper)) - authz.RegisterMsgServer(msgRouter, authzKeeper) - - // Register query servers - banktypes.RegisterQueryServer(queryHelper, bankKeeper) - authz.RegisterQueryServer(queryHelper, authzKeeper) - tokenomicstypes.RegisterQueryServer(queryHelper, tokenomicsKeeper) - servicetypes.RegisterQueryServer(queryHelper, serviceKeeper) - sharedtypes.RegisterQueryServer(queryHelper, sharedKeeper) - gatewaytypes.RegisterQueryServer(queryHelper, gatewayKeeper) - apptypes.RegisterQueryServer(queryHelper, applicationKeeper) - suppliertypes.RegisterQueryServer(queryHelper, supplierKeeper) - prooftypes.RegisterQueryServer(queryHelper, proofKeeper) - // TODO_TECHDEBT: What is the query server for authtypes? - // authtypes.RegisterQueryServer(queryHelper, accountKeeper) - sessiontypes.RegisterQueryServer(queryHelper, sessionKeeper) + configurator := module.NewConfigurator(cdc, msgRouter, queryHelper) + for _, mod := range integrationApp.GetModuleManager().Modules { + mod.(module.HasServices).RegisterServices(configurator) + } // Need to go to the next block to finalize the genesis and setup. // This has to be after the params are set, as the params are stored in the @@ -668,8 +646,7 @@ func (app *App) GetPreGeneratedAccounts() *testkeyring.PreGeneratedAccountIterat // QueryHelper returns the query helper used by the application that can be // used to submit queries to the application. -func (app *App) QueryHelper() *baseapp.QueryServiceTestHelper { - app.queryHelper.Ctx = *app.sdkCtx +func (app *App) QueryHelper() *baseapp.GRPCQueryRouter { return app.queryHelper } @@ -702,23 +679,9 @@ func (app *App) GetFaucetBech32() string { // returned. In order to run a message, the application must have a handler for it. // These handlers are registered on the application message service router. func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgResponse, err error) { - t.Helper() - - // Commit the updated state after the message has been handled. - var finalizeBlockRes *abci.ResponseFinalizeBlock - defer func() { - if _, commitErr := app.Commit(); commitErr != nil { - err = fmt.Errorf("committing state: %w", commitErr) - return - } - - app.nextBlockUpdateCtx() - - // Emit events MUST happen AFTER the context has been updated so that - // events are available on the context for the block after their actions - // were committed (e.g. msgs, begin/end block trigger). - app.emitEvents(t, finalizeBlockRes) - }() + if t != nil { + t.Helper() + } // Package the message into a transaction. txBuilder := app.txCfg.NewTxBuilder() @@ -735,6 +698,40 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo app.logger.Info("Running msg", "msg", msg.String()) } + txMsgResps, _, err = app.RunTx(t, txBz) + if err != nil { + // DEV_NOTE: Intentionally returning and not asserting nil error to improve reusability. + return nil, err + } + + return txMsgResps, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *App) RunTx(t *testing.T, txBz []byte) ( + txMsgResps []tx.MsgResponse, + finalizeBlockRes *abci.ResponseFinalizeBlock, + err error, +) { + if t != nil { + t.Helper() + } + + // Commit the updated state after the message has been handled. + defer func() { + if _, commitErr := app.Commit(); commitErr != nil { + err = fmt.Errorf("committing state: %w", commitErr) + return + } + + app.nextBlockUpdateCtx() + + // Emit events MUST happen AFTER the context has been updated so that + // events are available on the context for the block after their actions + // were committed (e.g. msgs, begin/end block trigger). + app.emitEvents(t, finalizeBlockRes) + }() + // Finalize the block with the transaction. finalizeBlockReq := &cmtabcitypes.RequestFinalizeBlock{ Height: app.LastBlockHeight() + 1, @@ -748,12 +745,14 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo finalizeBlockRes, err = app.FinalizeBlock(finalizeBlockReq) if err != nil { - return nil, fmt.Errorf("finalizing block: %w", err) + return nil, nil, fmt.Errorf("finalizing block: %w", err) } - // NB: We're batching the messages in a single transaction, so we expect - // a single transaction result. - require.Equal(t, 1, len(finalizeBlockRes.TxResults)) + if t != nil { + // NB: We're batching the messages in a single transaction, so we expect + // a single transaction result. + require.Equal(t, 1, len(finalizeBlockRes.TxResults)) + } // Collect the message responses. Accumulate errors related to message handling // failure. If any message fails, an error will be returned. @@ -766,24 +765,29 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo } txMsgDataBz := txResult.GetData() - require.NotNil(t, txMsgDataBz) + if t != nil { + require.NotNil(t, txMsgDataBz) + } txMsgData := new(cosmostypes.TxMsgData) err = app.GetCodec().Unmarshal(txMsgDataBz, txMsgData) - require.NoError(t, err) + if t != nil { + require.NoError(t, err) + } var txMsgRes tx.MsgResponse err = app.GetCodec().UnpackAny(txMsgData.MsgResponses[0], &txMsgRes) - require.NoError(t, err) - require.NotNil(t, txMsgRes) + if t != nil { + require.NoError(t, err) + require.NotNil(t, txMsgRes) + } else { + return nil, finalizeBlockRes, err + } txMsgResps = append(txMsgResps, txMsgRes) } - if txResultErrs != nil { - return nil, err - } - return txMsgResps, nil + return txMsgResps, finalizeBlockRes, nil } // NextBlocks calls NextBlock numBlocks times @@ -798,13 +802,17 @@ func (app *App) NextBlocks(t *testing.T, numBlocks int) { // emitEvents emits the events from the finalized block such that they are available // via the current context's event manager (i.e. app.GetSdkCtx().EventManager.Events()). func (app *App) emitEvents(t *testing.T, res *abci.ResponseFinalizeBlock) { - t.Helper() + if t != nil { + t.Helper() + } // Emit begin/end blocker events. - for _, event := range res.Events { - testutilevents.QuoteEventMode(&event) - abciEvent := cosmostypes.Event(event) - app.sdkCtx.EventManager().EmitEvent(abciEvent) + if res.Events != nil { + for _, event := range res.Events { + testutilevents.QuoteEventMode(&event) + abciEvent := cosmostypes.Event(event) + app.sdkCtx.EventManager().EmitEvent(abciEvent) + } } // Emit txResult events. @@ -826,6 +834,9 @@ func (app *App) NextBlock(t *testing.T) { Time: app.sdkCtx.BlockTime(), // Randomize the proposer address for each block. ProposerAddress: sample.ConsAddress().Bytes(), + DecidedLastCommit: cmtabcitypes.CommitInfo{ + Votes: []cmtabcitypes.VoteInfo{{}}, + }, }) require.NoError(t, err) @@ -970,6 +981,11 @@ func (app *App) setupDefaultActorsState( app.NextBlock(t) } +// TODO_IN_THIS_COMMIT: godoc... +func (app *App) GetModuleManager() module.Manager { + return app.moduleManager +} + // fundAccount mints and sends amountUpokt tokens to the given recipientAddr. // // TODO_IMPROVE: Eliminate usage of and remove this function in favor of diff --git a/testutil/integration/options.go b/testutil/integration/options.go index 1695002cf..14605d379 100644 --- a/testutil/integration/options.go +++ b/testutil/integration/options.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" + gogogrpc "github.com/cosmos/gogoproto/grpc" tlm "github.com/pokt-network/poktroll/x/tokenomics/token_logic_module" ) @@ -17,6 +18,9 @@ type IntegrationAppConfig struct { // InitChainer function. InitChainerModuleFns []InitChainerModuleFn TokenLogicModules []tlm.TokenLogicModule + + grpcServer gogogrpc.Server + authorityAddress string } // IntegrationAppOptionFn is a function that receives and has the opportunity to @@ -70,3 +74,17 @@ func WithTokenLogicModules(tokenLogicModules []tlm.TokenLogicModule) Integration config.TokenLogicModules = tokenLogicModules } } + +// TODO_IN_THIS_COMMIT: godoc... +func WithGRPCServer(grpcServer gogogrpc.Server) IntegrationAppOptionFn { + return func(config *IntegrationAppConfig) { + config.grpcServer = grpcServer + } +} + +// TODO_IN_THIS_COMMIT: godoc... +func WithAuthorityAddress(bech32 string) IntegrationAppOptionFn { + return func(config *IntegrationAppConfig) { + config.authorityAddress = bech32 + } +} diff --git a/testutil/testclient/localnet.go b/testutil/testclient/localnet.go index 354a1dd28..cc3d944ea 100644 --- a/testutil/testclient/localnet.go +++ b/testutil/testclient/localnet.go @@ -91,19 +91,18 @@ func NewLocalnetClientCtx(t gocuke.TestingT, flagSet *pflag.FlagSet) *client.Con // NewLocalnetFlagSet creates a set of predefined flags suitable for a localnet // testing environment. -// -// Parameters: -// - t: The testing.T instance used for the current test. -// -// Returns: -// - A flag set populated with flags tailored for localnet environments. func NewLocalnetFlagSet(t gocuke.TestingT) *pflag.FlagSet { t.Helper() + return NewFlagSet(t, CometLocalTCPURL) +} + +// NewFlagSet creates a set of predefined flags suitable for use with the given cometbft endpoint. +func NewFlagSet(t gocuke.TestingT, cometTCPURL string) *pflag.FlagSet { + t.Helper() + mockFlagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) - // TODO_IMPROVE: It would be nice if the value could be set correctly based - // on whether the test using it is running in tilt or not. - mockFlagSet.String(flags.FlagNode, CometLocalTCPURL, "use localnet poktrolld node") + mockFlagSet.String(flags.FlagNode, cometTCPURL, "use localnet poktrolld node") mockFlagSet.String(flags.FlagHome, "", "use localnet poktrolld node") mockFlagSet.String(flags.FlagKeyringBackend, "test", "use test keyring") mockFlagSet.String(flags.FlagChainID, app.Name, "use poktroll chain-id")