From 5831ca02b81faf00240bc8b89df667acd4fd3532 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 2 May 2022 16:34:19 -0400 Subject: [PATCH] fix: concurrency issues between simulation, grpc queries and ABCI commit flow (#213) (#216) --- baseapp/abci.go | 11 ++- client/export_test.go | 5 ++ client/grpc_query.go | 58 ++++++++----- client/grpc_query_test.go | 173 +++++++++++++++++++++++++++++++------- testutil/network/util.go | 19 +++++ x/auth/tx/service_test.go | 41 ++++----- 6 files changed, 234 insertions(+), 73 deletions(-) create mode 100644 client/export_test.go diff --git a/baseapp/abci.go b/baseapp/abci.go index fb51a0dc6275..1809bd70b5be 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -603,15 +603,20 @@ func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, e ) } - cacheMS, err := app.cms.CacheMultiStoreWithVersion(height) - if err != nil { + lastBlockHeight := app.LastBlockHeight() + if height > lastBlockHeight { return sdk.Context{}, sdkerrors.Wrapf( sdkerrors.ErrInvalidRequest, - "failed to load state at height %d; %s (latest height: %d)", height, err, app.LastBlockHeight(), + "failed to load state at height %d; (latest height: %d)", height, lastBlockHeight, ) } + cacheMS, err := app.cms.CacheMultiStoreWithVersion(height) + if err != nil { + return sdk.Context{}, fmt.Errorf("failed to load cache multi store for height %d: %w", height, err) + } + // branch the commit-multistore for safety ctx := sdk.NewContext( cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger, diff --git a/client/export_test.go b/client/export_test.go new file mode 100644 index 000000000000..722f53f33fd9 --- /dev/null +++ b/client/export_test.go @@ -0,0 +1,5 @@ +package client + +var ( + SelectHeight = selectHeight +) diff --git a/client/grpc_query.go b/client/grpc_query.go index ef9e1377ba7a..540403a0c445 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strconv" + "strings" gogogrpc "github.com/gogo/protobuf/grpc" abci "github.com/tendermint/tendermint/abci/types" @@ -51,9 +52,23 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i return err } - if ctx.GRPCClient != nil { + // Certain queries must not be be concurrent with ABCI to function correctly. + // As a result, we direct them to the ABCI flow where they get syncronized. + _, isSimulationRequest := req.(*tx.SimulateRequest) + isTendermintQuery := strings.Contains(method, "tendermint") + + isGRPCAllowed := !isTendermintQuery && !isSimulationRequest + + requestedHeight, err := selectHeight(ctx, grpcCtx) + if err != nil { + return err + } + + if ctx.GRPCClient != nil && isGRPCAllowed { + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(requestedHeight, 10)) + context := metadata.NewOutgoingContext(grpcCtx, md) // Case 2-1. Invoke grpc. - return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...) + return ctx.GRPCClient.Invoke(context, method, req, reply, opts...) } // Case 2-2. Querying state via abci query. @@ -62,26 +77,10 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i return err } - // parse height header - md, _ := metadata.FromOutgoingContext(grpcCtx) - if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { - height, err := strconv.ParseInt(heights[0], 10, 64) - if err != nil { - return err - } - if height < 0 { - return sdkerrors.Wrapf( - sdkerrors.ErrInvalidRequest, - "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) - } - - ctx = ctx.WithHeight(height) - } - abciReq := abci.RequestQuery{ Path: method, Data: reqBz, - Height: ctx.Height, + Height: requestedHeight, } res, err := ctx.QueryABCI(abciReq) @@ -99,7 +98,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i // We then parse all the call options, if the call option is a // HeaderCallOption, then we manually set the value of that header to the // metadata. - md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) + md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) for _, callOpt := range opts { header, ok := callOpt.(grpc.HeaderCallOption) if !ok { @@ -120,3 +119,22 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { return nil, fmt.Errorf("streaming rpc not supported") } + +// selectHeight returns the height chosen from client context and grpc context. +// If exists, height extracted from grpcCtx takes precedence. +func selectHeight(clientContext Context, grpcCtx gocontext.Context) (int64, error) { + var height int64 + if clientContext.Height > 0 { + height = clientContext.Height + } + + md, _ := metadata.FromOutgoingContext(grpcCtx) + if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { + var err error + height, err = strconv.ParseInt(heights[0], 10, 64) + if err != nil { + return 0, err + } + } + return height, nil +} diff --git a/client/grpc_query_test.go b/client/grpc_query_test.go index c4d2f024a07b..06a1eb2038be 100644 --- a/client/grpc_query_test.go +++ b/client/grpc_query_test.go @@ -1,5 +1,3 @@ -// +build norace - package client_test import ( @@ -7,10 +5,12 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/testutil/network" "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" @@ -24,13 +24,30 @@ type IntegrationTestSuite struct { network *network.Network } +type testcase struct { + clientContextHeight int64 + grpcHeight int64 + expectedHeight int64 +} + +const ( + // if clientContextHeight or grpcHeight is set to this flag, + // the test assumes that the respective height is not provided. + heightNotSetFlag = int64(-1) + // given the current block time, this should never be reached by the time + // a test is run. + invalidBeyondLatestHeight = 1_000_000_000 + // if this flag is set to expectedHeight, an error is assumed. + errorHeightFlag = int64(-2) +) + func (s *IntegrationTestSuite) SetupSuite() { s.T().Log("setting up integration test suite") s.network = network.New(s.T(), network.DefaultConfig()) s.Require().NotNil(s.network) - _, err := s.network.WaitForHeight(2) + _, err := s.network.WaitForHeight(3) s.Require().NoError(err) } @@ -39,7 +56,7 @@ func (s *IntegrationTestSuite) TearDownSuite() { s.network.Cleanup() } -func (s *IntegrationTestSuite) TestGRPCQuery() { +func (s *IntegrationTestSuite) TestGRPCQuery_TestService() { val0 := s.network.Validators[0] // gRPC query to test service should work @@ -47,36 +64,130 @@ func (s *IntegrationTestSuite) TestGRPCQuery() { testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) s.Require().NoError(err) s.Require().Equal("hello", testRes.Message) +} - // gRPC query to bank service should work - denom := fmt.Sprintf("%stoken", val0.Moniker) - bankClient := banktypes.NewQueryClient(val0.ClientCtx) - var header metadata.MD - bankRes, err := bankClient.Balance( - context.Background(), - &banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom}, - grpc.Header(&header), // Also fetch grpc header - ) - s.Require().NoError(err) - s.Require().Equal( - sdk.NewCoin(denom, s.network.Config.AccountTokens), - *bankRes.GetBalance(), - ) - blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader) - s.Require().NotEmpty(blockHeight[0]) // Should contain the block height - - // Request metadata should work - val0.ClientCtx = val0.ClientCtx.WithHeight(1) // We set clientCtx to height 1 - bankClient = banktypes.NewQueryClient(val0.ClientCtx) - bankRes, err = bankClient.Balance( - context.Background(), - &banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom}, - grpc.Header(&header), - ) - blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader) - s.Require().Equal([]string{"1"}, blockHeight) +func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() { + val0 := s.network.Validators[0] + + const method = "/cosmos.bank.v1beta1.Query/Balance" + + testcases := map[string]testcase{ + "clientContextHeight 1; grpcHeight not set - clientContextHeight selected": { + clientContextHeight: 1, // chosen + grpcHeight: heightNotSetFlag, + expectedHeight: 1, + }, + "clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": { + clientContextHeight: heightNotSetFlag, + grpcHeight: 2, // chosen + expectedHeight: 2, + }, + "both not set - 0 returned": { + clientContextHeight: heightNotSetFlag, + grpcHeight: heightNotSetFlag, + expectedHeight: 3, // latest height + }, + "clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": { + clientContextHeight: 1, + grpcHeight: 0, // chosen + expectedHeight: 3, // latest height + }, + "clientContextHeight 3; grpcHeight is 3 - 3 is returned": { + clientContextHeight: 3, + grpcHeight: 3, + expectedHeight: 3, + }, + "clientContextHeight is 1_000_000_000; grpcHeight is 1_000_000_000 - requested beyond latest height - error": { + clientContextHeight: invalidBeyondLatestHeight, + grpcHeight: invalidBeyondLatestHeight, + expectedHeight: errorHeightFlag, + }, + } + + for name, tc := range testcases { + s.T().Run(name, func(t *testing.T) { + // Setup + clientCtx := val0.ClientCtx + clientCtx.Height = 0 + + if tc.clientContextHeight != heightNotSetFlag { + clientCtx = clientCtx.WithHeight(tc.clientContextHeight) + } + + grpcContext := context.Background() + if tc.grpcHeight != heightNotSetFlag { + header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight)) + grpcContext = metadata.NewOutgoingContext(grpcContext, header) + } + + // Test + var header metadata.MD + denom := fmt.Sprintf("%stoken", val0.Moniker) + request := &banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom} + response := &banktypes.QueryBalanceResponse{} + err := clientCtx.Invoke(grpcContext, method, request, response, grpc.Header(&header)) + + // Assert results + if tc.expectedHeight == errorHeightFlag { + s.Require().Error(err) + return + } + + s.Require().NoError(err) + s.Require().Equal( + sdk.NewCoin(denom, s.network.Config.AccountTokens), + *response.GetBalance(), + ) + blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader) + s.Require().Equal([]string{fmt.Sprintf("%d", tc.expectedHeight)}, blockHeight) + }) + } } func TestIntegrationTestSuite(t *testing.T) { suite.Run(t, new(IntegrationTestSuite)) } + +func TestSelectHeight(t *testing.T) { + testcases := map[string]testcase{ + "clientContextHeight 1; grpcHeight not set - clientContextHeight selected": { + clientContextHeight: 1, + grpcHeight: heightNotSetFlag, + expectedHeight: 1, + }, + "clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": { + clientContextHeight: heightNotSetFlag, + grpcHeight: 2, + expectedHeight: 2, + }, + "both not set - 0 returned": { + clientContextHeight: heightNotSetFlag, + grpcHeight: heightNotSetFlag, + expectedHeight: 0, + }, + "clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": { + clientContextHeight: 3, + grpcHeight: 0, + expectedHeight: 0, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + clientCtx := client.Context{} + if tc.clientContextHeight != heightNotSetFlag { + clientCtx = clientCtx.WithHeight(tc.clientContextHeight) + } + + grpcContxt := context.Background() + if tc.grpcHeight != heightNotSetFlag { + header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight)) + grpcContxt = metadata.NewOutgoingContext(grpcContxt, header) + } + + height, err := client.SelectHeight(clientCtx, grpcContxt) + require.NoError(t, err) + require.Equal(t, tc.expectedHeight, height) + }) + } +} diff --git a/testutil/network/util.go b/testutil/network/util.go index 97d957e88686..e8e7beb06bef 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -13,6 +13,9 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/proto" "github.com/cosmos/cosmos-sdk/server/api" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" @@ -105,6 +108,22 @@ func startInProcess(cfg Config, val *Validator) error { val.grpc = grpcSrv + // If grpc is enabled, configure grpc client. + grpcClient, err := grpc.Dial( + val.AppConfig.GRPC.Address, + grpc.WithInsecure(), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(encoding.GetCodec(proto.Name)), + grpc.MaxCallRecvMsgSize(val.AppConfig.GRPC.MaxRecvMsgSize), + grpc.MaxCallSendMsgSize(val.AppConfig.GRPC.MaxSendMsgSize), + ), + ) + if err != nil { + return err + } + + val.ClientCtx = val.ClientCtx.WithGRPCClient(grpcClient) + if val.AppConfig.GRPCWeb.Enable { val.grpcWeb, err = servergrpc.StartGRPCWeb(grpcSrv, *val.AppConfig) if err != nil { diff --git a/x/auth/tx/service_test.go b/x/auth/tx/service_test.go index 9a4bc1d40d4b..a24809ef1b36 100644 --- a/x/auth/tx/service_test.go +++ b/x/auth/tx/service_test.go @@ -98,31 +98,34 @@ func (s IntegrationTestSuite) TestSimulateTx_GRPC() { s.Require().NoError(err) testCases := []struct { - name string - req *tx.SimulateRequest - expErr bool - expErrMsg string + name string + req *tx.SimulateRequest + expErr bool + expErrMsg string + repeatCount int }{ - {"nil request", nil, true, "request cannot be nil"}, - {"empty request", &tx.SimulateRequest{}, true, "empty txBytes is not allowed"}, - {"valid request with proto tx (deprecated)", &tx.SimulateRequest{Tx: protoTx}, false, ""}, - {"valid request with tx_bytes", &tx.SimulateRequest{TxBytes: txBytes}, false, ""}, + {"nil request", nil, true, "request cannot be nil", 1}, + {"empty request", &tx.SimulateRequest{}, true, "empty txBytes is not allowed", 1}, + {"valid request with proto tx (deprecated)", &tx.SimulateRequest{Tx: protoTx}, false, "", 1}, + {"valid request with tx_bytes", &tx.SimulateRequest{TxBytes: txBytes}, false, "", 1000}, } for _, tc := range testCases { tc := tc s.Run(tc.name, func() { - // Broadcast the tx via gRPC via the validator's clientCtx (which goes - // through Tendermint). - res, err := s.queryClient.Simulate(context.Background(), tc.req) - if tc.expErr { - s.Require().Error(err) - s.Require().Contains(err.Error(), tc.expErrMsg) - } else { - s.Require().NoError(err) - // Check the result and gas used are correct. - s.Require().Equal(len(res.GetResult().GetEvents()), 6) // 1 coin recv 1 coin spent, 1 transfer, 3 messages. - s.Require().True(res.GetGasInfo().GetGasUsed() > 0) // Gas used sometimes change, just check it's not empty. + for i := 0; i < tc.repeatCount; i++ { + // Broadcast the tx via gRPC via the validator's clientCtx (which goes + // through Tendermint). + res, err := s.queryClient.Simulate(context.Background(), tc.req) + if tc.expErr { + s.Require().Error(err) + s.Require().Contains(err.Error(), tc.expErrMsg) + } else { + s.Require().NoError(err) + // Check the result and gas used are correct. + s.Require().Equal(len(res.GetResult().GetEvents()), 6) // 1 coin recv 1 coin spent, 1 transfer, 3 messages. + s.Require().True(res.GetGasInfo().GetGasUsed() > 0) // Gas used sometimes change, just check it's not empty. + } } }) }