Skip to content

Commit

Permalink
refactor(server/v2): kill viper from server components (#21663)
Browse files Browse the repository at this point in the history
(cherry picked from commit 88cfebe)

# Conflicts:
#	server/v2/api/grpc/server.go
#	server/v2/api/grpcgateway/server.go
#	server/v2/commands.go
#	server/v2/config.go
#	server/v2/config_test.go
#	server/v2/server.go
#	server/v2/server_mock_test.go
#	server/v2/server_test.go
#	server/v2/store/server.go
  • Loading branch information
julienrbrt authored and mergify[bot] committed Sep 12, 2024
1 parent e7724c6 commit 88f5f98
Show file tree
Hide file tree
Showing 12 changed files with 1,193 additions and 22 deletions.
203 changes: 203 additions & 0 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package grpc

import (
"context"
"errors"
"fmt"
"io"
"maps"
"net"
"slices"
"strconv"

"github.com/cosmos/gogoproto/proto"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"

Check failure on line 22 in server/v2/api/grpc/server.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/server/v2; to add it:
"cosmossdk.io/server/v2/api/grpc/gogoreflection"

Check failure on line 23 in server/v2/api/grpc/server.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/server/v2/api/grpc/gogoreflection; to add it:
)

const (
ServerName = "grpc"

BlockHeightHeader = "x-cosmos-block-height"
)

type Server[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption

grpcSrv *grpc.Server
}

// New creates a new grpc server.
func New[T transaction.Tx](cfgOptions ...CfgOption) *Server[T] {
return &Server[T]{
cfgOptions: cfgOptions,
}
}

// Init returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
methodsMap := appI.GetGPRCMethodsToMessageMap()

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
grpc.MaxSendMsgSize(serverCfg.MaxSendMsgSize),
grpc.MaxRecvMsgSize(serverCfg.MaxRecvMsgSize),
grpc.UnknownServiceHandler(
makeUnknownServiceHandler(methodsMap, appI.GetAppManager()),
),
)

// Reflection allows external clients to see what services and methods the gRPC server exposes.
gogoreflection.Register(grpcSrv, slices.Collect(maps.Keys(methodsMap)), logger.With("sub-module", "grpc-reflection"))

s.grpcSrv = grpcSrv
s.config = serverCfg
s.logger = logger.With(log.ModuleKey, s.Name())

return nil
}

func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
flags := pflag.NewFlagSet(s.Name(), pflag.ExitOnError)
flags.String(FlagAddress, "localhost:9090", "Listen address")
return flags
}

func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface {
Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error)
},
) grpc.StreamHandler {
return func(srv any, stream grpc.ServerStream) error {
method, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Error(codes.InvalidArgument, "unable to get method")
}
makeMsg, exists := messageMap[method]
if !exists {
return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method)
}
for {
req := makeMsg()
err := stream.RecvMsg(req)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}

// extract height header
ctx := stream.Context()
height, err := getHeightFromCtx(ctx)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid get height from context: %v", err)
}
resp, err := querier.Query(ctx, height, req)
if err != nil {
return err
}
err = stream.SendMsg(resp)
if err != nil {
return err
}
}
}
}

func getHeightFromCtx(ctx context.Context) (uint64, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return 0, nil
}
values := md.Get(BlockHeightHeader)
if len(values) == 0 {
return 0, nil
}
if len(values) != 1 {
return 0, fmt.Errorf("gRPC height metadata must be of length 1, got: %d", len(values))
}

heightStr := values[0]
height, err := strconv.ParseUint(heightStr, 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse height string from gRPC metadata %s: %w", heightStr, err)
}

return height, nil
}

func (s *Server[T]) Name() string {
return ServerName
}

func (s *Server[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
opt(cfg)
}

return cfg
}

return s.config
}

func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}

listener, err := net.Listen("tcp", s.config.Address)
if err != nil {
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
s.logger.Info("starting gRPC server...", "address", s.config.Address)
errCh <- s.grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
err = <-errCh
if err != nil {
s.logger.Error("failed to start gRPC server", "err", err)
}

return err
}

func (s *Server[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}

s.logger.Info("stopping gRPC server...", "address", s.config.Address)
s.grpcSrv.GracefulStop()

return nil
}
144 changes: 144 additions & 0 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package grpcgateway

import (
"context"
"fmt"
"net/http"
"strings"

gateway "github.com/cosmos/gogogateway"
"github.com/cosmos/gogoproto/jsonpb"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
)

var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)

const (
ServerName = "grpc-gateway"

// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)

type GRPCGatewayServer[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption

GRPCSrv *grpc.Server
GRPCGatewayRouter *runtime.ServeMux
}

// New creates a new gRPC-gateway server.
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
EmitDefaults: true,
Indent: "",
OrigName: true,
AnyResolver: ir,
}

return &GRPCGatewayServer[T]{
GRPCSrv: grpcSrv,
GRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),

// This is necessary to get error details properly
// marshaled in unary requests.
runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),

// Custom header matcher for mapping request headers to
// GRPC metadata
runtime.WithIncomingHeaderMatcher(CustomGRPCHeaderMatcher),
),
cfgOptions: cfgOptions,
}
}

func (g *GRPCGatewayServer[T]) Name() string {
return ServerName
}

func (s *GRPCGatewayServer[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
opt(cfg)
}

return cfg
}

return s.config
}

func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}

// Register the gRPC-Gateway server.
// appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv)

s.logger = logger
s.config = serverCfg

return nil
}

func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}

// TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98)

return nil
}

func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}

return nil
}

// Register implements registers a grpc-gateway server
func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// configure grpc-gatway server
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
s.GRPCGatewayRouter.ServeHTTP(w, req)
}))

return nil
}

// CustomGRPCHeaderMatcher for mapping request headers to
// GRPC metadata.
// HTTP headers that start with 'Grpc-Metadata-' are automatically mapped to
// gRPC metadata after removing prefix 'Grpc-Metadata-'. We can use this
// CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-`
func CustomGRPCHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case GRPCBlockHeightHeader:
return GRPCBlockHeightHeader, true

default:
return runtime.DefaultHeaderMatcher(key)
}
}
14 changes: 0 additions & 14 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package cometbft

import (
cmtcfg "github.com/cometbft/cometbft/config"
"github.com/spf13/viper"

serverv2 "cosmossdk.io/server/v2"
)

// Config is the configuration for the CometBFT application
Expand Down Expand Up @@ -53,14 +50,3 @@ func OverwriteDefaultAppTomlConfig(newCfg *AppTomlConfig) CfgOption {
cfg.AppTomlConfig = newCfg
}
}

func getConfigTomlFromViper(v *viper.Viper) *cmtcfg.Config {
rootDir := v.GetString(serverv2.FlagHome)

conf := cmtcfg.DefaultConfig()
if err := v.Unmarshal(conf); err != nil {
return cmtcfg.DefaultConfig().SetRoot(rootDir)
}

return conf.SetRoot(rootDir)
}
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117
google.golang.org/grpc v1.66.0
Expand Down Expand Up @@ -147,6 +146,7 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/viper v1.19.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
Expand Down
Loading

0 comments on commit 88f5f98

Please sign in to comment.