Skip to content

Commit

Permalink
fix: concurrency issues between simulation, grpc queries and ABCI com…
Browse files Browse the repository at this point in the history
…mit flow (#213)

* fix: simulation and grpc query concurrency with ABCI

* table tests for selectHeight

* add table-driven GRPC query tests and fix non-existent heights

* clean up

* clean up grpc query tests

* Apply suggestions from code review

* Update testutil/network/network.go

* fix typo in abci.go

* Update baseapp/abci.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* store last block height in a var

* avoid returning named parameters

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
  • Loading branch information
p0mvn and alexanderbez authored May 2, 2022
1 parent d66c967 commit 18b0d2d
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 73 deletions.
11 changes: 8 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,15 +603,20 @@ func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, e
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(height)
if err != nil {
lastBlockHeight := app.LastBlockHeight()
if height > lastBlockHeight {
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", height, err, app.LastBlockHeight(),
"failed to load state at height %d; (latest height: %d)", height, lastBlockHeight,
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(height)
if err != nil {
return sdk.Context{}, fmt.Errorf("failed to load cache multi store for height %d: %w", height, err)
}

// branch the commit-multistore for safety
ctx := sdk.NewContext(
cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger,
Expand Down
5 changes: 5 additions & 0 deletions client/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package client

var (
SelectHeight = selectHeight
)
58 changes: 38 additions & 20 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"

gogogrpc "github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -51,9 +52,23 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

if ctx.GRPCClient != nil {
// Certain queries must not be be concurrent with ABCI to function correctly.
// As a result, we direct them to the ABCI flow where they get syncronized.
_, isSimulationRequest := req.(*tx.SimulateRequest)
isTendermintQuery := strings.Contains(method, "tendermint")

isGRPCAllowed := !isTendermintQuery && !isSimulationRequest

requestedHeight, err := selectHeight(ctx, grpcCtx)
if err != nil {
return err
}

if ctx.GRPCClient != nil && isGRPCAllowed {
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(requestedHeight, 10))
context := metadata.NewOutgoingContext(grpcCtx, md)
// Case 2-1. Invoke grpc.
return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...)
return ctx.GRPCClient.Invoke(context, method, req, reply, opts...)
}

// Case 2-2. Querying state via abci query.
Expand All @@ -62,26 +77,10 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
}
if height < 0 {
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
Height: requestedHeight,
}

res, err := ctx.QueryABCI(abciReq)
Expand All @@ -99,7 +98,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
md := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
Expand All @@ -120,3 +119,22 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

// selectHeight returns the height chosen from client context and grpc context.
// If exists, height extracted from grpcCtx takes precedence.
func selectHeight(clientContext Context, grpcCtx gocontext.Context) (int64, error) {
var height int64
if clientContext.Height > 0 {
height = clientContext.Height
}

md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
var err error
height, err = strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return 0, err
}
}
return height, nil
}
173 changes: 142 additions & 31 deletions client/grpc_query_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// +build norace

package client_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/testutil/network"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -24,13 +24,30 @@ type IntegrationTestSuite struct {
network *network.Network
}

type testcase struct {
clientContextHeight int64
grpcHeight int64
expectedHeight int64
}

const (
// if clientContextHeight or grpcHeight is set to this flag,
// the test assumes that the respective height is not provided.
heightNotSetFlag = int64(-1)
// given the current block time, this should never be reached by the time
// a test is run.
invalidBeyondLatestHeight = 1_000_000_000
// if this flag is set to expectedHeight, an error is assumed.
errorHeightFlag = int64(-2)
)

func (s *IntegrationTestSuite) SetupSuite() {
s.T().Log("setting up integration test suite")

s.network = network.New(s.T(), network.DefaultConfig())
s.Require().NotNil(s.network)

_, err := s.network.WaitForHeight(2)
_, err := s.network.WaitForHeight(3)
s.Require().NoError(err)
}

Expand All @@ -39,44 +56,138 @@ func (s *IntegrationTestSuite) TearDownSuite() {
s.network.Cleanup()
}

func (s *IntegrationTestSuite) TestGRPCQuery() {
func (s *IntegrationTestSuite) TestGRPCQuery_TestService() {
val0 := s.network.Validators[0]

// gRPC query to test service should work
testClient := testdata.NewQueryClient(val0.ClientCtx)
testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"})
s.Require().NoError(err)
s.Require().Equal("hello", testRes.Message)
}

// gRPC query to bank service should work
denom := fmt.Sprintf("%stoken", val0.Moniker)
bankClient := banktypes.NewQueryClient(val0.ClientCtx)
var header metadata.MD
bankRes, err := bankClient.Balance(
context.Background(),
&banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom},
grpc.Header(&header), // Also fetch grpc header
)
s.Require().NoError(err)
s.Require().Equal(
sdk.NewCoin(denom, s.network.Config.AccountTokens),
*bankRes.GetBalance(),
)
blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().NotEmpty(blockHeight[0]) // Should contain the block height

// Request metadata should work
val0.ClientCtx = val0.ClientCtx.WithHeight(1) // We set clientCtx to height 1
bankClient = banktypes.NewQueryClient(val0.ClientCtx)
bankRes, err = bankClient.Balance(
context.Background(),
&banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom},
grpc.Header(&header),
)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{"1"}, blockHeight)
func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() {
val0 := s.network.Validators[0]

const method = "/cosmos.bank.v1beta1.Query/Balance"

testcases := map[string]testcase{
"clientContextHeight 1; grpcHeight not set - clientContextHeight selected": {
clientContextHeight: 1, // chosen
grpcHeight: heightNotSetFlag,
expectedHeight: 1,
},
"clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": {
clientContextHeight: heightNotSetFlag,
grpcHeight: 2, // chosen
expectedHeight: 2,
},
"both not set - 0 returned": {
clientContextHeight: heightNotSetFlag,
grpcHeight: heightNotSetFlag,
expectedHeight: 3, // latest height
},
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": {
clientContextHeight: 1,
grpcHeight: 0, // chosen
expectedHeight: 3, // latest height
},
"clientContextHeight 3; grpcHeight is 3 - 3 is returned": {
clientContextHeight: 3,
grpcHeight: 3,
expectedHeight: 3,
},
"clientContextHeight is 1_000_000_000; grpcHeight is 1_000_000_000 - requested beyond latest height - error": {
clientContextHeight: invalidBeyondLatestHeight,
grpcHeight: invalidBeyondLatestHeight,
expectedHeight: errorHeightFlag,
},
}

for name, tc := range testcases {
s.T().Run(name, func(t *testing.T) {
// Setup
clientCtx := val0.ClientCtx
clientCtx.Height = 0

if tc.clientContextHeight != heightNotSetFlag {
clientCtx = clientCtx.WithHeight(tc.clientContextHeight)
}

grpcContext := context.Background()
if tc.grpcHeight != heightNotSetFlag {
header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight))
grpcContext = metadata.NewOutgoingContext(grpcContext, header)
}

// Test
var header metadata.MD
denom := fmt.Sprintf("%stoken", val0.Moniker)
request := &banktypes.QueryBalanceRequest{Address: val0.Address.String(), Denom: denom}
response := &banktypes.QueryBalanceResponse{}
err := clientCtx.Invoke(grpcContext, method, request, response, grpc.Header(&header))

// Assert results
if tc.expectedHeight == errorHeightFlag {
s.Require().Error(err)
return
}

s.Require().NoError(err)
s.Require().Equal(
sdk.NewCoin(denom, s.network.Config.AccountTokens),
*response.GetBalance(),
)
blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{fmt.Sprintf("%d", tc.expectedHeight)}, blockHeight)
})
}
}

func TestIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(IntegrationTestSuite))
}

func TestSelectHeight(t *testing.T) {
testcases := map[string]testcase{
"clientContextHeight 1; grpcHeight not set - clientContextHeight selected": {
clientContextHeight: 1,
grpcHeight: heightNotSetFlag,
expectedHeight: 1,
},
"clientContextHeight not set; grpcHeight is 2 - grpcHeight is chosen": {
clientContextHeight: heightNotSetFlag,
grpcHeight: 2,
expectedHeight: 2,
},
"both not set - 0 returned": {
clientContextHeight: heightNotSetFlag,
grpcHeight: heightNotSetFlag,
expectedHeight: 0,
},
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": {
clientContextHeight: 3,
grpcHeight: 0,
expectedHeight: 0,
},
}

for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
clientCtx := client.Context{}
if tc.clientContextHeight != heightNotSetFlag {
clientCtx = clientCtx.WithHeight(tc.clientContextHeight)
}

grpcContxt := context.Background()
if tc.grpcHeight != heightNotSetFlag {
header := metadata.Pairs(grpctypes.GRPCBlockHeightHeader, fmt.Sprintf("%d", tc.grpcHeight))
grpcContxt = metadata.NewOutgoingContext(grpcContxt, header)
}

height, err := client.SelectHeight(clientCtx, grpcContxt)
require.NoError(t, err)
require.Equal(t, tc.expectedHeight, height)
})
}
}
19 changes: 19 additions & 0 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/tendermint/tendermint/rpc/client/local"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"

"github.com/cosmos/cosmos-sdk/server/api"
servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
Expand Down Expand Up @@ -105,6 +108,22 @@ func startInProcess(cfg Config, val *Validator) error {

val.grpc = grpcSrv

// If grpc is enabled, configure grpc client.
grpcClient, err := grpc.Dial(
val.AppConfig.GRPC.Address,
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(encoding.GetCodec(proto.Name)),
grpc.MaxCallRecvMsgSize(val.AppConfig.GRPC.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(val.AppConfig.GRPC.MaxSendMsgSize),
),
)
if err != nil {
return err
}

val.ClientCtx = val.ClientCtx.WithGRPCClient(grpcClient)

if val.AppConfig.GRPCWeb.Enable {
val.grpcWeb, err = servergrpc.StartGRPCWeb(grpcSrv, *val.AppConfig)
if err != nil {
Expand Down
Loading

0 comments on commit 18b0d2d

Please sign in to comment.