Skip to content

Commit

Permalink
feat(server/v2/cometbft): optimistic execution (#22560)
Browse files Browse the repository at this point in the history
Co-authored-by: Randy Grok <@faulttolerance.net>
  • Loading branch information
randygrok authored Nov 28, 2024
1 parent ca48cef commit 9caec06
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 46 deletions.
149 changes: 112 additions & 37 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
Expand All @@ -28,6 +28,7 @@ import (
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/oe"
"cosmossdk.io/server/v2/cometbft/types"
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
"cosmossdk.io/server/v2/streaming"
Expand Down Expand Up @@ -77,6 +78,11 @@ type consensus[T transaction.Tx] struct {
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution[T]

addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

Expand Down Expand Up @@ -385,6 +391,14 @@ func (c *consensus[T]) PrepareProposal(
return nil, errors.New("no prepare proposal function was set")
}

// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand Down Expand Up @@ -421,6 +435,16 @@ func (c *consensus[T]) ProcessProposal(
return nil, errors.New("no process proposal function was set")
}

// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand All @@ -436,6 +460,17 @@ func (c *consensus[T]) ProcessProposal(
}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
Expand All @@ -447,46 +482,40 @@ func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, err
}
var (
resp *server.BlockResponse
newState store.WriterMap
decodedTxs []T
err error
)

if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)

// Wait for the OE to finish, regardless of whether it was aborted or not
res, optimistErr := c.optimisticExec.WaitResult()

if !aborted {
if res != nil {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, err
}
if optimistErr != nil {
return nil, optimistErr
}
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
c.optimisticExec.Reset()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
if resp == nil { // if we didn't run OE, run the normal finalize block
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
}

// after we get the changeset we can produce the commit hash,
Expand Down Expand Up @@ -531,6 +560,52 @@ func (c *consensus[T]) FinalizeBlock(
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
}

func (c *consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*server.BlockResponse, store.WriterMap, []T, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, nil, nil, err
}

if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, nil, nil, err
}

// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}

cid, err := c.store.LastCommitID()
if err != nil {
return nil, nil, nil, err
}

blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})

resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq)

return resp, stateChanges, decodedTxs, err
}

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
Expand Down
99 changes: 91 additions & 8 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package cometbft

import (
"context"
"cosmossdk.io/core/server"
"crypto/sha256"
"encoding/json"
"errors"
abci "github.com/cometbft/cometbft/abci/types"
"io"
"strings"
"sync"
"testing"
"time"

"cosmossdk.io/server/v2/cometbft/oe"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
v1 "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -56,10 +60,10 @@ func getQueryRouterBuilder[T any, PT interface {
*T
proto.Message
},
U any, UT interface {
*U
proto.Message
}](
U any, UT interface {
*U
proto.Message
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
Expand All @@ -86,10 +90,10 @@ func getMsgRouterBuilder[T any, PT interface {
*T
transaction.Msg
},
U any, UT interface {
*U
transaction.Msg
}](
U any, UT interface {
*U
transaction.Msg
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
Expand Down Expand Up @@ -514,6 +518,12 @@ func TestConsensus_ProcessProposal(t *testing.T) {
require.Error(t, err)

// NoOp handler
// dummy optimistic execution
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)

c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
_, err = c.ProcessProposal(context.Background(), &abciproto.ProcessProposalRequest{
Height: 1,
Expand Down Expand Up @@ -724,3 +734,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}

func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})

// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler

// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
calledTimes++
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)

_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)

_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)

theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}

// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)

resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]

// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)

// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}
Loading

0 comments on commit 9caec06

Please sign in to comment.