Skip to content

Commit

Permalink
Make gRPC requests go through tendermint Query (#8549)
Browse files Browse the repository at this point in the history
* Make gRPC requests go through tendermint Query

* Remove commented code

* Dry run in InitChain?

* Save type of first run

* Add metadata in repsonse

* Factorize some code

* Fix lint

* Update comments

* Fix md test

* Fix test expected

* Don't put RunGRPCQuery as clientCtx method

* Update baseapp/grpcserver.go

Co-authored-by: Robert Zaremba <robert@zaremba.ch>

* Address review comments

Co-authored-by: Robert Zaremba <robert@zaremba.ch>
  • Loading branch information
amaury1093 and robert-zaremba committed Feb 15, 2021
1 parent d9d078f commit e306e5b
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 93 deletions.
34 changes: 32 additions & 2 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"fmt"
"reflect"

gogogrpc "github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types"
Expand All @@ -12,13 +13,19 @@ import (
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

var protoCodec = encoding.GetCodec(proto.Name)

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
routes map[string]GRPCQueryHandler
// returnTypes is a map of FQ method name => its return type. It is used
// for cache purposes: the first time a method handler is run, we save its
// return type in this map. Then, on subsequent method handler calls, we
// decode the ABCI response bytes using the cached return type.
returnTypes map[string]reflect.Type
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}
Expand All @@ -34,7 +41,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
returnTypes: map[string]reflect.Type{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -89,8 +97,17 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
if qrt.interfaceRegistry != nil {
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
}

return nil
}, nil)

// If it's the first time we call this handler, then we save
// the return type of the handler in the `returnTypes` map.
// The return type will be used for decoding subsequent requests.
if _, found := qrt.returnTypes[fqName]; !found {
qrt.returnTypes[fqName] = reflect.TypeOf(res)
}

if err != nil {
return abci.ResponseQuery{}, err
}
Expand Down Expand Up @@ -127,3 +144,16 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
reflection.NewReflectionServiceServer(interfaceRegistry),
)
}

// returnTypeOf returns the return type of a gRPC method handler. With the way the
// `returnTypes` cache map is set up, the return type of a method handler is
// guaranteed to be found if it's retrieved **after** the method handler ran at
// least once. If not, then a logic error is return.
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
returnType, found := qrt.returnTypes[method]
if !found {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
}

return returnType, nil
}
83 changes: 47 additions & 36 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,78 @@ package baseapp

import (
"context"
"strconv"
"reflect"

gogogrpc "github.com/gogo/protobuf/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/client"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will route
// the query through the `clientCtx`, which itself queries Tendermint.
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
if err := checkNegativeHeight(height); err != nil {
return nil, err
// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}

return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
}

// Case 2. Querying state.
inMd, _ := metadata.FromIncomingContext(grpcCtx)
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
if err != nil {
return nil, err
}

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
// We need to know the return type of the grpc method for
// unmarshalling abciRes.Value.
//
// When we call each method handler for the first time, we save its
// return type in the `returnTypes` map (see the method handler in
// `grpcrouter.go`). By this time, the method handler has already run
// at least once (in the RunGRPCQuery call), so we're sure the
// returnType maps is populated for this method. We're retrieving it
// for decoding.
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
if err != nil {
return nil, err
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)
// returnType is a pointer to a struct. Here, we're creating res which
// is a new pointer to the underlying struct.
res := reflect.New(returnType.Elem()).Interface()

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
err = protoCodec.Unmarshal(abciRes.Value, res)
if err != nil {
return nil, err
}

// Send the metadata header back. The metadata currently includes:
// - block height.
err = grpc.SendHeader(grpcCtx, outMd)
if err != nil {
return nil, err
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
return res, nil
}

// Loop through all services and methods, add the interceptor, and register
Expand Down
99 changes: 54 additions & 45 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,100 +24,109 @@ var _ gogogrpc.ClientConn = Context{}
var protoCodec = encoding.GetCodec(proto.Name)

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(args).IsNil() {
// In both cases, we don't allow empty request req (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if isBroadcast(method) {
req, ok := args.(*tx.BroadcastTxRequest)
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}
res, ok := reply.(*tx.BroadcastTxResponse)
resProto, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
if err != nil {
return err
}
*res = *broadcastRes
*resProto = *broadcastRes

return err
}

// Case 2. Querying state.
reqBz, err := protoCodec.Marshal(args)
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
if err != nil {
return err
}

err = protoCodec.Unmarshal(abciRes.Value, reply)
if err != nil {
return err
}

for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = outMd
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
// arguments for the gRPC method, and returns the ABCI response. It is used
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
// gRPC handlers.
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
reqBz, err := protoCodec.Marshal(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
return abci.ResponseQuery{}, nil, err
}
if height < 0 {
return sdkerrors.Wrapf(
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

req := abci.RequestQuery{
abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
}

res, err := ctx.QueryABCI(req)
abciRes, err := ctx.QueryABCI(abciReq)
if err != nil {
return err
}

err = protoCodec.Unmarshal(res.Value, reply)
if err != nil {
return err
return abci.ResponseQuery{}, nil, err
}

// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = md
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))

func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
return abciRes, md, nil
}
5 changes: 3 additions & 2 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server/types"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)
app.RegisterGRPCServer(clientCtx, grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
Expand Down
8 changes: 3 additions & 5 deletions server/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
grpc.Header(&header),
)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{"1"}, blockHeight)
s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height.
}

func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
Expand Down Expand Up @@ -124,8 +124,6 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
Events: []string{"message.action=send"},
},
)
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
// should not error anymore.
s.Require().NoError(err)
}

Expand Down Expand Up @@ -185,9 +183,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() {
value string
wantErr string
}{
{"-1", "height < 0"},
{"-1", "\"x-cosmos-block-height\" must be >= 0"},
{"9223372036854775808", "value out of range"}, // > max(int64) by 1
{"-10", "height < 0"},
{"-10", "\"x-cosmos-block-height\" must be >= 0"},
{"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64)
{"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64
}
Expand Down
2 changes: 1 addition & 1 deletion server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
grpcWebSrv *http.Server
)
if config.GRPC.Enable {
grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address)
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit e306e5b

Please sign in to comment.