Skip to content

Commit

Permalink
fix(octane/evmengine): handle engine errors (#2469)
Browse files Browse the repository at this point in the history
Improve EngineAPI error handling:
- Geth returns `status:INVALID` AND errors, so swallow errors if known
status is returned.
- Remaining errors (with unknown status) should only be temporary
networking (or some unexpected geth error).
- Timeout PrepareProposal after 10s, proposing an empty consensus block
rather (other validators probably already moved on in any case.)
 - Timeout ProcessProposal after 1min, prevent blocking forever

issue: #2461
  • Loading branch information
corverroos authored Nov 13, 2024
1 parent 39262ac commit c5dc5fc
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 160 deletions.
12 changes: 12 additions & 0 deletions halo/app/prouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"time"

atypes "github.com/omni-network/omni/halo/attest/types"
"github.com/omni-network/omni/lib/errors"
Expand All @@ -16,6 +17,11 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

// processTimeout is the maximum time to process a proposal.
// Timeout results in rejecting the proposal, which could negatively affect liveness.
// But it avoids blocking forever, which also negatively affects liveness.
const processTimeout = time.Minute

// makeProcessProposalRouter creates a new process proposal router that only routes
// expected messages to expected modules.
func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
Expand All @@ -32,6 +38,11 @@ func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
// It also updates some external state.
func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig client.TxConfig) sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
// Only allow 10s to process a proposal. Reject proposal otherwise.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), processTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

// Ensure the proposal includes quorum vote extensions (unless first block).
if req.Height > 1 {
var totalPower, votedPower int64
Expand Down Expand Up @@ -86,6 +97,7 @@ func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig clien
}
}

//nolint:unparam // Explicitly return nil error
func rejectProposal(ctx context.Context, err error) (*abci.ResponseProcessProposal, error) {
log.Error(ctx, "Rejecting process proposal", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down
56 changes: 2 additions & 54 deletions lib/ethclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestGetPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.PayloadID
fuzzer.Fuzz(&param1)

var resp engine.ExecutionPayloadEnvelope
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.GetPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestGetPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -53,23 +36,6 @@ func TestGetPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ExecutableData
fuzzer.Fuzz(&param1)

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -85,6 +51,7 @@ func TestNewPayloadV3(t *testing.T) {

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)
resp.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV3(ctx, param1, param2, &param3)
Expand All @@ -93,26 +60,6 @@ func TestNewPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1, param2, param3)
}

func TestForkchoiceUpdatedV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ForkchoiceStateV1
fuzzer.Fuzz(&param1)

var param2 engine.PayloadAttributes
fuzzer.Fuzz(&param2)

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV2(ctx, param1, &param2)
}

testEndpoint(t, call, resp, param1, param2)
}

func TestForkchoiceUpdatedV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -125,6 +72,7 @@ func TestForkchoiceUpdatedV3(t *testing.T) {

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)
resp.PayloadStatus.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV3(ctx, param1, &param2)
Expand Down
106 changes: 44 additions & 62 deletions lib/ethclient/engineclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/log"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -30,25 +31,14 @@ const (
type EngineClient interface {
Client

// NewPayloadV2 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error)
// NewPayloadV3 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash,
beaconRoot *common.Hash) (engine.PayloadStatusV1, error)

// ForkchoiceUpdatedV2 has several responsibilities:
// - It sets the chain the head.
// - And/or it sets the chain's finalized block hash.
// - And/or it starts assembling (async) a block with the payload attributes.
ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error)

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes.
ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error)

// GetPayloadV2 returns a cached payload by id.
GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)
// GetPayloadV3 returns a cached payload by id.
GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error)
}
Expand Down Expand Up @@ -77,50 +67,39 @@ func NewAuthClient(ctx context.Context, urlAddr string, jwtSecret []byte) (Engin
}, nil
}

func (c engineClient) NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error) {
const endpoint = "new_payload_v2"
defer latency(c.chain, endpoint)()

var resp engine.PayloadStatusV1
err := c.cl.Client().CallContext(ctx, &resp, newPayloadV2, params)
if err != nil {
incError(c.chain, endpoint)
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v2")
}

return resp, nil
}

func (c engineClient) NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash,
beaconRoot *common.Hash,
) (engine.PayloadStatusV1, error) {
const endpoint = "new_payload_v3"
defer latency(c.chain, endpoint)()

// isStatusOk returns true if the response status is valid.
isStatusOk := func(status engine.PayloadStatusV1) bool {
return map[string]bool{
engine.VALID: true,
engine.INVALID: true,
engine.SYNCING: true,
engine.ACCEPTED: true,
}[status.Status]
}

var resp engine.PayloadStatusV1
err := c.cl.Client().CallContext(ctx, &resp, newPayloadV3, params, versionedHashes, beaconRoot)
if err != nil {
if isStatusOk(resp) {
// Swallow errors when geth returns errors along with proper responses (but at least log it).
if err != nil {
log.Warn(ctx, "Ignoring new_payload_v3 error with proper response", err, "status", resp.Status)
}

return resp, nil
} else if err != nil {
incError(c.chain, endpoint)
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v3")
}

return resp, nil
}

func (c engineClient) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1,
payloadAttributes *engine.PayloadAttributes,
) (engine.ForkChoiceResponse, error) {
const endpoint = "forkchoice_updated_v2"
defer latency(c.chain, endpoint)()
return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload")
} /* else err==nil && status!=ok */

var resp engine.ForkChoiceResponse
err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV2, update, payloadAttributes)
if err != nil {
incError(c.chain, endpoint)
return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v2")
}
incError(c.chain, endpoint)

return resp, nil
return engine.PayloadStatusV1{}, errors.New("nil error and unknown status", "status", resp.Status)
}

func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1,
Expand All @@ -129,30 +108,33 @@ func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.For
const endpoint = "forkchoice_updated_v3"
defer latency(c.chain, endpoint)()

// isStatusOk returns true if the response status is valid.
isStatusOk := func(resp engine.ForkChoiceResponse) bool {
return map[string]bool{
engine.VALID: true,
engine.INVALID: true,
engine.SYNCING: true,
engine.ACCEPTED: false, // Unexpected in ForkchoiceUpdated
}[resp.PayloadStatus.Status]
}

var resp engine.ForkChoiceResponse
err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV3, update, payloadAttributes)
if err != nil {
if isStatusOk(resp) {
// Swallow errors when geth returns errors along with proper responses (but at least log it).
if err != nil {
log.Warn(ctx, "Ignoring forkchoice_updated_v3 error with proper response", err, "status", resp.PayloadStatus.Status)
}

return resp, nil
} else if err != nil {
incError(c.chain, endpoint)
return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v3")
}

return resp, nil
}

func (c engineClient) GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) (
*engine.ExecutionPayloadEnvelope, error,
) {
const endpoint = "get_payload_v2"
defer latency(c.chain, endpoint)()
} /* else err==nil && status!=ok */

var resp engine.ExecutionPayloadEnvelope
err := c.cl.Client().CallContext(ctx, &resp, getPayloadV2, payloadID)
if err != nil {
incError(c.chain, endpoint)
return nil, errors.Wrap(err, "rpc get payload v2")
}
incError(c.chain, endpoint)

return &resp, nil
return engine.ForkChoiceResponse{}, errors.New("nil error and unknown status", "status", resp.PayloadStatus.Status)
}

func (c engineClient) GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) (
Expand Down
32 changes: 17 additions & 15 deletions octane/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package keeper
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"time"

Expand All @@ -25,18 +23,20 @@ import (
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
)

// prepareTimeout is the maximum time to prepare a proposal.
// Timeout results in proposing an empty consensus block.
const prepareTimeout = time.Second * 10

// PrepareProposal returns a proposal for the next block.
// Note returning an error results in a panic cometbft and CONSENSUS_FAILURE log.
// Note returning an error results proposing an empty block.
func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPrepareProposal) (
*abci.ResponsePrepareProposal, error,
) {
defer func() {
if r := recover(); r != nil {
log.Error(ctx, "PrepareProposal panic", nil, "recover", r)
fmt.Println("panic stacktrace: \n" + string(debug.Stack())) //nolint:forbidigo // Print stacktrace
panic(r)
}
}()
// Only allow 10s to prepare a proposal. Propose empty block otherwise.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), prepareTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

if len(req.Txs) > 0 {
return nil, errors.New("unexpected transactions in proposal")
} else if req.MaxTxBytes < cmttypes.MaxBlockSizeBytes*9/10 {
Expand Down Expand Up @@ -68,11 +68,13 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
if err != nil {
log.Warn(ctx, "Preparing proposal failed: build new evm payload (will retry)", err)
return false, nil // Retry
} else if fcr.PayloadStatus.Status != engine.VALID {
return false, errors.New("status not valid") // Abort, don't retry
} else if isSyncing(fcr.PayloadStatus) {
return false, errors.New("evm unexpectedly syncing") // Abort, don't retry
} else if invalid, err := isInvalid(fcr.PayloadStatus); invalid {
return false, errors.Wrap(err, "proposed invalid payload") // Abort, don't retry
} else if fcr.PayloadID == nil {
return false, errors.New("missing payload ID [BUG]") // Abort, don't retry
}
} /* else isValid(status) */

payloadID = *fcr.PayloadID

Expand Down Expand Up @@ -193,7 +195,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {

// No need to wrap this in retryForever since this is a best-effort optimisation, if it fails, just skip it.
fcr, err := k.startBuild(ctx, appHash, timestamp)
if err != nil || isUnknown(fcr.PayloadStatus) {
if err != nil {
log.Warn(ctx, "Starting optimistic build failed", err, logAttr)
return nil
} else if isSyncing(fcr.PayloadStatus) {
Expand All @@ -205,7 +207,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {
} else if fcr.PayloadID == nil {
log.Error(ctx, "Starting optimistic build failed; missing payload ID [BUG]", nil, logAttr)
return nil
}
} /* else isValid(status) */

k.setOptimisticPayload(*fcr.PayloadID, uint64(nextHeight))

Expand Down
8 changes: 2 additions & 6 deletions octane/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: true,
},
{
name: "forkchoiceUpdateV2 not valid",
name: "forkchoiceUpdateV3 not valid",
mockEngine: mockEngineAPI{
headerByTypeFunc: func(context.Context, ethclient.HeadType) (*types.Header, error) {
fuzzer := ethclient.NewFuzzer(0)
Expand Down Expand Up @@ -502,10 +502,6 @@ func (m *mockEngineAPI) HeaderByType(ctx context.Context, typ ethclient.HeadType
return m.mock.HeaderByType(ctx, typ)
}

func (m *mockEngineAPI) NewPayloadV2(ctx context.Context, params eengine.ExecutableData) (eengine.PayloadStatusV1, error) {
return m.mock.NewPayloadV2(ctx, params)
}

//nolint:nonamedreturns // Required for defer
func (m *mockEngineAPI) NewPayloadV3(ctx context.Context, params eengine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (resp eengine.PayloadStatusV1, err error) {
if status, ok := m.maybeSync(); ok {
Expand Down Expand Up @@ -540,7 +536,7 @@ func (m *mockEngineAPI) GetPayloadV3(ctx context.Context, payloadID eengine.Payl
return m.mock.GetPayloadV3(ctx, payloadID)
}

// pushPayload - invokes the ForkchoiceUpdatedV2 method on the mock engine and returns the payload ID.
// pushPayload - invokes the ForkchoiceUpdatedV3 method on the mock engine and returns the payload ID.
func (m *mockEngineAPI) pushPayload(t *testing.T, ctx context.Context, feeRecipient common.Address, blockHash common.Hash, ts time.Time, appHash common.Hash) *eengine.PayloadID {
t.Helper()
state := eengine.ForkchoiceStateV1{
Expand Down
Loading

0 comments on commit c5dc5fc

Please sign in to comment.