Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(octane/evmengine): handle engine errors #2469

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions halo/app/prouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"time"

atypes "github.com/omni-network/omni/halo/attest/types"
"github.com/omni-network/omni/lib/errors"
Expand All @@ -16,6 +17,11 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

// processTimeout is the maximum time to process a proposal.
// Timeout results in rejecting the proposal, which could negatively affect liveness.
// But it avoids blocking forever, which also negatively affects liveness.
const processTimeout = time.Minute

// makeProcessProposalRouter creates a new process proposal router that only routes
// expected messages to expected modules.
func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
Expand All @@ -32,6 +38,11 @@ func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
// It also updates some external state.
func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig client.TxConfig) sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
// Only allow 10s to process a proposal. Reject proposal otherwise.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea

timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), processTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

// Ensure the proposal includes quorum vote extensions (unless first block).
if req.Height > 1 {
var totalPower, votedPower int64
Expand Down Expand Up @@ -86,6 +97,7 @@ func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig clien
}
}

//nolint:unparam // Explicitly return nil error
func rejectProposal(ctx context.Context, err error) (*abci.ResponseProcessProposal, error) {
log.Error(ctx, "Rejecting process proposal", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down
56 changes: 2 additions & 54 deletions lib/ethclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestGetPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.PayloadID
fuzzer.Fuzz(&param1)

var resp engine.ExecutionPayloadEnvelope
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.GetPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestGetPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -53,23 +36,6 @@ func TestGetPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ExecutableData
fuzzer.Fuzz(&param1)

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -85,6 +51,7 @@ func TestNewPayloadV3(t *testing.T) {

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)
resp.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV3(ctx, param1, param2, &param3)
Expand All @@ -93,26 +60,6 @@ func TestNewPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1, param2, param3)
}

func TestForkchoiceUpdatedV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ForkchoiceStateV1
fuzzer.Fuzz(&param1)

var param2 engine.PayloadAttributes
fuzzer.Fuzz(&param2)

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV2(ctx, param1, &param2)
}

testEndpoint(t, call, resp, param1, param2)
}

func TestForkchoiceUpdatedV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -125,6 +72,7 @@ func TestForkchoiceUpdatedV3(t *testing.T) {

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)
resp.PayloadStatus.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV3(ctx, param1, &param2)
Expand Down
106 changes: 44 additions & 62 deletions lib/ethclient/engineclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/log"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,25 +31,14 @@ const (
type EngineClient interface {
Client

// NewPayloadV2 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error)
// NewPayloadV3 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash,
beaconRoot *common.Hash) (engine.PayloadStatusV1, error)

// ForkchoiceUpdatedV2 has several responsibilities:
// - It sets the chain the head.
// - And/or it sets the chain's finalized block hash.
// - And/or it starts assembling (async) a block with the payload attributes.
ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error)

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes.
ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error)

// GetPayloadV2 returns a cached payload by id.
GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)
// GetPayloadV3 returns a cached payload by id.
GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)
}
Expand Down Expand Up @@ -77,50 +67,39 @@ func NewAuthClient(ctx context.Context, urlAddr string, jwtSecret []byte) (Engin
}, nil
}

func (c engineClient) NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error) {
const endpoint = "new_payload_v2"
defer latency(c.chain, endpoint)()

var resp engine.PayloadStatusV1
err := c.cl.Client().CallContext(ctx, &resp, newPayloadV2, params)
if err != nil {
incError(c.chain, endpoint)
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v2")
}

return resp, nil
}

func (c engineClient) NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash,
beaconRoot *common.Hash,
) (engine.PayloadStatusV1, error) {
const endpoint = "new_payload_v3"
defer latency(c.chain, endpoint)()

// isStatusOk returns true if the response status is valid.
isStatusOk := func(status engine.PayloadStatusV1) bool {
return map[string]bool{
engine.VALID: true,
engine.INVALID: true,
engine.SYNCING: true,
engine.ACCEPTED: true,
}[status.Status]
}

var resp engine.PayloadStatusV1
err := c.cl.Client().CallContext(ctx, &resp, newPayloadV3, params, versionedHashes, beaconRoot)
if err != nil {
if isStatusOk(resp) {
// Swallow errors when geth returns errors along with proper responses (but at least log it).
if err != nil {
log.Warn(ctx, "Ignoring new_payload_v3 error with proper response", err, "status", resp.Status)
}

return resp, nil
} else if err != nil {
incError(c.chain, endpoint)
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v3")
}

return resp, nil
}

func (c engineClient) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes,
) (engine.ForkChoiceResponse, error) {
const endpoint = "forkchoice_updated_v2"
defer latency(c.chain, endpoint)()
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload")
} /* else err==nil && status!=ok */

var resp engine.ForkChoiceResponse
err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV2, update, payloadAttributes)
if err != nil {
incError(c.chain, endpoint)
return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v2")
}
incError(c.chain, endpoint)

return resp, nil
return engine.PayloadStatusV1{}, errors.New("nil error and unknown status", "status", resp.Status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a [BUG]?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly yeah

}

func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1,
Expand All @@ -129,30 +108,33 @@ func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.For
const endpoint = "forkchoice_updated_v3"
defer latency(c.chain, endpoint)()

// isStatusOk returns true if the response status is valid.
isStatusOk := func(resp engine.ForkChoiceResponse) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about this code style: why do you prefer wrapping a map with a closure over simply using a map given it's used once in the function?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

functions are easy to read and document. Also slightly more concise

if isStatusOk(resp) {

vs

if okStatuses[resp.PayloadStatus.Status] {

return map[string]bool{
engine.VALID: true,
engine.INVALID: true,
engine.SYNCING: true,
engine.ACCEPTED: false, // Unexpected in ForkchoiceUpdated
}[resp.PayloadStatus.Status]
}

var resp engine.ForkChoiceResponse
err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV3, update, payloadAttributes)
if err != nil {
if isStatusOk(resp) {
// Swallow errors when geth returns errors along with proper responses (but at least log it).
if err != nil {
log.Warn(ctx, "Ignoring forkchoice_updated_v3 error with proper response", err, "status", resp.PayloadStatus.Status)
}

return resp, nil
} else if err != nil {
incError(c.chain, endpoint)
return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v3")
}

return resp, nil
}

func (c engineClient) GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) (
*engine.ExecutionPayloadEnvelope, error,
) {
const endpoint = "get_payload_v2"
defer latency(c.chain, endpoint)()
} /* else err==nil && status!=ok */

var resp engine.ExecutionPayloadEnvelope
err := c.cl.Client().CallContext(ctx, &resp, getPayloadV2, payloadID)
if err != nil {
incError(c.chain, endpoint)
return nil, errors.Wrap(err, "rpc get payload v2")
}
incError(c.chain, endpoint)

return &resp, nil
return engine.ForkChoiceResponse{}, errors.New("nil error and unknown status", "status", resp.PayloadStatus.Status)
}

func (c engineClient) GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) (
Expand Down
32 changes: 17 additions & 15 deletions octane/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package keeper
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"time"

Expand All @@ -25,18 +23,20 @@ import (
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
)

// prepareTimeout is the maximum time to prepare a proposal.
// Timeout results in proposing an empty consensus block.
const prepareTimeout = time.Second * 10

// PrepareProposal returns a proposal for the next block.
// Note returning an error results in a panic cometbft and CONSENSUS_FAILURE log.
// Note returning an error results proposing an empty block.
func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPrepareProposal) (
*abci.ResponsePrepareProposal, error,
) {
defer func() {
if r := recover(); r != nil {
log.Error(ctx, "PrepareProposal panic", nil, "recover", r)
fmt.Println("panic stacktrace: \n" + string(debug.Stack())) //nolint:forbidigo // Print stacktrace
panic(r)
}
}()
// Only allow 10s to prepare a proposal. Propose empty block otherwise.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), prepareTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

if len(req.Txs) > 0 {
return nil, errors.New("unexpected transactions in proposal")
} else if req.MaxTxBytes < cmttypes.MaxBlockSizeBytes*9/10 {
Expand Down Expand Up @@ -68,11 +68,13 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
if err != nil {
log.Warn(ctx, "Preparing proposal failed: build new evm payload (will retry)", err)
return false, nil // Retry
} else if fcr.PayloadStatus.Status != engine.VALID {
return false, errors.New("status not valid") // Abort, don't retry
} else if isSyncing(fcr.PayloadStatus) {
return false, errors.New("evm unexpectedly syncing") // Abort, don't retry
} else if invalid, err := isInvalid(fcr.PayloadStatus); invalid {
return false, errors.Wrap(err, "proposed invalid payload") // Abort, don't retry
} else if fcr.PayloadID == nil {
return false, errors.New("missing payload ID [BUG]") // Abort, don't retry
}
} /* else isValid(status) */

payloadID = *fcr.PayloadID

Expand Down Expand Up @@ -193,7 +195,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {

// No need to wrap this in retryForever since this is a best-effort optimisation, if it fails, just skip it.
fcr, err := k.startBuild(ctx, appHash, timestamp)
if err != nil || isUnknown(fcr.PayloadStatus) {
if err != nil {
log.Warn(ctx, "Starting optimistic build failed", err, logAttr)
return nil
} else if isSyncing(fcr.PayloadStatus) {
Expand All @@ -205,7 +207,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {
} else if fcr.PayloadID == nil {
log.Error(ctx, "Starting optimistic build failed; missing payload ID [BUG]", nil, logAttr)
return nil
}
} /* else isValid(status) */

k.setOptimisticPayload(*fcr.PayloadID, uint64(nextHeight))

Expand Down
8 changes: 2 additions & 6 deletions octane/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: true,
},
{
name: "forkchoiceUpdateV2 not valid",
name: "forkchoiceUpdateV3 not valid",
mockEngine: mockEngineAPI{
headerByTypeFunc: func(context.Context, ethclient.HeadType) (*types.Header, error) {
fuzzer := ethclient.NewFuzzer(0)
Expand Down Expand Up @@ -502,10 +502,6 @@ func (m *mockEngineAPI) HeaderByType(ctx context.Context, typ ethclient.HeadType
return m.mock.HeaderByType(ctx, typ)
}

func (m *mockEngineAPI) NewPayloadV2(ctx context.Context, params eengine.ExecutableData) (eengine.PayloadStatusV1, error) {
return m.mock.NewPayloadV2(ctx, params)
}

//nolint:nonamedreturns // Required for defer
func (m *mockEngineAPI) NewPayloadV3(ctx context.Context, params eengine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (resp eengine.PayloadStatusV1, err error) {
if status, ok := m.maybeSync(); ok {
Expand Down Expand Up @@ -540,7 +536,7 @@ func (m *mockEngineAPI) GetPayloadV3(ctx context.Context, payloadID eengine.Payl
return m.mock.GetPayloadV3(ctx, payloadID)
}

// pushPayload - invokes the ForkchoiceUpdatedV2 method on the mock engine and returns the payload ID.
// pushPayload - invokes the ForkchoiceUpdatedV3 method on the mock engine and returns the payload ID.
func (m *mockEngineAPI) pushPayload(t *testing.T, ctx context.Context, feeRecipient common.Address, blockHash common.Hash, ts time.Time, appHash common.Hash) *eengine.PayloadID {
t.Helper()
state := eengine.ForkchoiceStateV1{
Expand Down
Loading
Loading