Skip to content

Commit

Permalink
refactor(server/v2/cometbft): add codec.Codec and clean-up APIs (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt authored Nov 20, 2024
1 parent d810b77 commit efc05e8
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 147 deletions.
96 changes: 24 additions & 72 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
Expand All @@ -37,6 +33,11 @@ import (
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
consensustypes "cosmossdk.io/x/consensus/types"

"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)

const (
Expand All @@ -45,22 +46,24 @@ const (
QueryPathStore = "store"
)

var _ abci.Application = (*Consensus[transaction.Tx])(nil)
var _ abci.Application = (*consensus[transaction.Tx])(nil)

type Consensus[T transaction.Tx] struct {
// consensus contains the implementation of the ABCI interface for CometBFT.
type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
appCodec codec.Codec
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
indexedEvents map[string]struct{}
chainID string
indexedEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand All @@ -81,60 +84,9 @@ type Consensus[T transaction.Tx] struct {
getProtoRegistry func() (*protoregistry.Files, error)
}

func NewConsensus[T transaction.Tx](
logger log.Logger,
appName string,
app appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
queryHandlersMap map[string]appmodulev2.Handler,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
chainId string,
) *Consensus[T] {
return &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: cfg,
store: store,
logger: logger,
txCodec: txCodec,
streaming: streaming.Manager{},
snapshotManager: nil,
mempool: mp,
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: nil,
processProposalHandler: nil,
verifyVoteExt: nil,
extendVote: nil,
chainID: chainId,
indexedEvents: indexedEvents,
initialHeight: 0,
queryHandlersMap: queryHandlersMap,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
}
}

// SetStreamingManager sets the streaming manager for the consensus module.
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
return fmt.Errorf("failed to register snapshot extensions: %w", err)
}

return nil
}

// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
}

// Info implements types.Application.
func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
version, _, err := c.store.StateLatest()
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
if isGRPC {
return resp, err
Expand All @@ -227,7 +179,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (

switch path[0] {
case QueryPathApp:
resp, err = c.handlerQueryApp(ctx, path, req)
resp, err = c.handleQueryApp(ctx, path, req)

case QueryPathStore:
resp, err = c.handleQueryStore(path, req)
Expand All @@ -246,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, nil
}

func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
Expand Down Expand Up @@ -288,7 +240,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq

txResult, _, err := c.app.Simulate(ctx, tx)
if err != nil {
return nil, true, fmt.Errorf("%v with gas used: '%d'", err, txResult.GasUsed)
return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err)
}

msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp))
Expand Down Expand Up @@ -337,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
}

// InitChain implements types.Application.
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)

// store chainID to be used later on in execution
Expand Down Expand Up @@ -421,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

// PrepareProposal implements types.Application.
// It is called by cometbft to prepare a proposal block.
func (c *Consensus[T]) PrepareProposal(
func (c *consensus[T]) PrepareProposal(
ctx context.Context,
req *abciproto.PrepareProposalRequest,
) (resp *abciproto.PrepareProposalResponse, err error) {
Expand Down Expand Up @@ -457,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal(

// ProcessProposal implements types.Application.
// It is called by cometbft to process/verify a proposal block.
func (c *Consensus[T]) ProcessProposal(
func (c *consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
Expand Down Expand Up @@ -491,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal(

// FinalizeBlock implements types.Application.
// It is called by cometbft to finalize a block.
func (c *Consensus[T]) FinalizeBlock(
func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
Expand Down Expand Up @@ -581,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock(

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
lastCommittedHeight := c.lastCommittedHeight.Load()

c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight)
Expand All @@ -599,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (
// Vote extensions

// VerifyVoteExtension implements types.Application.
func (c *Consensus[T]) VerifyVoteExtension(
func (c *consensus[T]) VerifyVoteExtension(
ctx context.Context,
req *abciproto.VerifyVoteExtensionRequest,
) (*abciproto.VerifyVoteExtensionResponse, error) {
Expand Down Expand Up @@ -641,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension(
}

// ExtendVote implements types.Application.
func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
// If vote extensions are not enabled, as a safety precaution, we return an
// error.
cp, err := c.GetConsensusParams(ctx)
Expand Down
17 changes: 13 additions & 4 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -637,7 +638,7 @@ func TestConsensus_Query(t *testing.T) {
require.Equal(t, res.Value, []byte(nil))
}

func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] {
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] {
t.Helper()

msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) {
Expand Down Expand Up @@ -699,9 +700,17 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
nil,
)

return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am,
mempool, map[string]struct{}{}, nil, mockStore,
Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
mempool: mempool,
store: mockStore,
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
txCodec: mock.TxCodec{},
chainID: "test",
getProtoRegistry: sync.OnceValues(proto.MergedRegistry),
}
}

// Check target version same with store's latest version
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s *CometBFTServer[T]) BootstrapStateCmd() *cobra.Command {
return err
}
if height == 0 {
height, err = s.Consensus.store.GetLatestVersion()
height, err = s.store.GetLatestVersion()
if err != nil {
return err
}
Expand Down
41 changes: 26 additions & 15 deletions server/v2/cometbft/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package cometbft
import (
"context"

v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -12,8 +13,8 @@ import (
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
cmtv1beta1 "cosmossdk.io/api/cosmos/base/tendermint/v1beta1"
"cosmossdk.io/core/server"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
Expand All @@ -23,17 +24,25 @@ import (
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
)

// GRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
type appSimulator[T transaction.Tx] interface {
Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error)
}

// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
// Those services are defined for backward compatibility.
// Eventually, they will be removed in favor of the new gRPC services.
func (c *Consensus[T]) GRPCServiceRegistrar(
func gRPCServiceRegistrar[T transaction.Tx](
clientCtx client.Context,
cfg server.ConfigMap,
cometBFTAppConfig *AppTomlConfig,
txCodec transaction.Codec[T],
consensus abci.Application,
app appSimulator[T],
) func(srv *grpc.Server) error {
return func(srv *grpc.Server) error {
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, c.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, c})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, c})
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec))
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app})
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus})

return nil
}
Expand Down Expand Up @@ -86,7 +95,8 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{

type txServer[T transaction.Tx] struct {
clientCtx client.Context
consensus *Consensus[T]
txCodec transaction.Codec[T]
app appSimulator[T]
}

// BroadcastTx implements tx.ServiceServer.
Expand Down Expand Up @@ -132,12 +142,12 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
return nil, status.Errorf(codes.InvalidArgument, "empty txBytes is not allowed")
}

tx, err := t.consensus.txCodec.Decode(txBytes)
tx, err := t.txCodec.Decode(txBytes)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to decode tx")
return nil, status.Errorf(codes.InvalidArgument, "failed to decode tx: %v", err)
}

txResult, _, err := t.consensus.app.Simulate(ctx, tx)
txResult, _, err := t.app.Simulate(ctx, tx)
if err != nil {
return nil, status.Errorf(codes.Unknown, "%v with gas used: '%d'", err, txResult.GasUsed)
}
Expand Down Expand Up @@ -186,8 +196,9 @@ func (t txServer[T]) TxEncodeAmino(context.Context, *txtypes.TxEncodeAminoReques
var _ txtypes.ServiceServer = txServer[transaction.Tx]{}

type nodeServer[T transaction.Tx] struct {
cfg server.ConfigMap
consensus *Consensus[T]
cfg server.ConfigMap
cometBFTAppConfig *AppTomlConfig
consensus abci.Application
}

func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) (*nodeservice.ConfigResponse, error) {
Expand All @@ -201,12 +212,12 @@ func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest)
MinimumGasPrice: minGasPricesStr,
PruningKeepRecent: "ambiguous in v2",
PruningInterval: "ambiguous in v2",
HaltHeight: s.consensus.cfg.AppTomlConfig.HaltHeight,
HaltHeight: s.cometBFTAppConfig.HaltHeight,
}, nil
}

func (s nodeServer[T]) Status(ctx context.Context, _ *nodeservice.StatusRequest) (*nodeservice.StatusResponse, error) {
nodeInfo, err := s.consensus.Info(ctx, &v1.InfoRequest{})
nodeInfo, err := s.consensus.Info(ctx, &abciproto.InfoRequest{})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit efc05e8

Please sign in to comment.