Skip to content

Commit

Permalink
fix(evmengine): handle engine errors (#372)
Browse files Browse the repository at this point in the history
This PR is the commit from Omni to handle errors of engineAPI
(omni-network/omni#2469).

issue: none
  • Loading branch information
0xHansLee authored Nov 28, 2024
1 parent ea5775e commit 45f1946
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 204 deletions.
8 changes: 6 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@
{
"path": "detect_secrets.filters.allowlist.is_line_allowlisted"
},
{
"path": "detect_secrets.filters.common.is_baseline_file",
"filename": ".secrets.baseline"
},
{
"path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies",
"min_level": 2
Expand Down Expand Up @@ -302,7 +306,7 @@
"filename": "lib/ethclient/client_test.go",
"hashed_secret": "e5e9fa1ba31ecd1ae84f75caaa474f3a663f05f4",
"is_verified": false,
"line_number": 141
"line_number": 89
}
],
"lib/ethclient/ethbackend/backend.go": [
Expand Down Expand Up @@ -590,5 +594,5 @@
}
]
},
"generated_at": "2024-08-04T14:14:14Z"
"generated_at": "2024-11-21T08:57:28Z"
}
12 changes: 12 additions & 0 deletions client/app/prouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"time"

abci "github.com/cometbft/cometbft/abci/types"
cmttypes "github.com/cometbft/cometbft/proto/tendermint/types"
Expand All @@ -14,6 +15,11 @@ import (
"github.com/piplabs/story/lib/log"
)

// processTimeout is the maximum time to process a proposal.
// Timeout results in rejecting the proposal, which could negatively affect liveness.
// But it avoids blocking forever, which also negatively affects liveness.
const processTimeout = time.Second * 10

// makeProcessProposalRouter creates a new process proposal router that only routes
// expected messages to expected modules.
func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
Expand All @@ -29,6 +35,11 @@ func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter {
// It also updates some external state.
func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig client.TxConfig) sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
// Only allow 10s to process a proposal. Reject proposal otherwise.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), processTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

// Ensure the proposal includes quorum vote extensions (unless first block).
if req.Height > 1 {
var totalPower, votedPower int64
Expand Down Expand Up @@ -82,6 +93,7 @@ func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig clien
}
}

//nolint:unparam // Explicitly return nil error
func rejectProposal(ctx context.Context, err error) (*abci.ResponseProcessProposal, error) {
log.Error(ctx, "Rejecting process proposal", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down
34 changes: 18 additions & 16 deletions client/x/evmengine/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package keeper
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"runtime/debug"
"strings"
"time"

Expand All @@ -22,18 +20,20 @@ import (
"github.com/piplabs/story/lib/log"
)

// prepareTimeout is the maximum time to prepare a proposal.
// Timeout results in proposing an empty consensus block.
const prepareTimeout = time.Second * 10

// PrepareProposal returns a proposal for the next block.
// Note returning an error results in a panic cometbft and CONSENSUS_FAILURE log.
// Note returning an error results proposing an empty block.
func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPrepareProposal) (
*abci.ResponsePrepareProposal, error,
) {
defer func() {
if r := recover(); r != nil {
log.Error(ctx, "PrepareProposal panic", nil, "recover", r)
fmt.Println("panic stacktrace: \n" + string(debug.Stack()))
panic(r)
}
}()
// Only allow 10s to prepare a proposal. Propose empty block otherwise.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), prepareTimeout)
defer timeoutCancel()
ctx = ctx.WithContext(timeoutCtx)

if len(req.Txs) > 0 {
return nil, errors.New("unexpected transactions in proposal")
} else if req.MaxTxBytes < cmttypes.MaxBlockSizeBytes*9/10 {
Expand Down Expand Up @@ -77,11 +77,13 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos
if err != nil {
log.Warn(ctx, "Preparing proposal failed: build new evm payload (will retry)", err)
return false, nil
} else if fcr.PayloadStatus.Status != engine.VALID {
return false, errors.New("status not valid")
} else if isSyncing(fcr.PayloadStatus) {
return false, errors.New("evm unexpectedly syncing") // Abort, don't retry
} else if invalid, err := isInvalid(fcr.PayloadStatus); invalid {
return false, errors.Wrap(err, "proposed invalid payload") // Abort, don't retry
} else if fcr.PayloadID == nil {
return false, errors.New("missing payload ID")
}
return false, errors.New("missing payload ID [BUG]")
} /* else isValid(status) */

payloadID = *fcr.PayloadID

Expand Down Expand Up @@ -223,7 +225,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {
withdrawals = append(withdrawals, rewardWithdrawals...)

fcr, err := k.startBuild(ctx, k.validatorAddr, withdrawals, appHash, timestamp)
if err != nil || isUnknown(fcr.PayloadStatus) {
if err != nil {
log.Warn(ctx, "Starting optimistic build failed", err, logAttr)
return nil
} else if isSyncing(fcr.PayloadStatus) {
Expand All @@ -235,7 +237,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error {
} else if fcr.PayloadID == nil {
log.Error(ctx, "Starting optimistic build failed; missing payload ID [BUG]", nil, logAttr)
return nil
}
} /* else isValid(status) */

k.setOptimisticPayload(*fcr.PayloadID, uint64(nextHeight))

Expand Down
31 changes: 1 addition & 30 deletions client/x/evmengine/keeper/abci_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestKeeper_PrepareProposal(t *testing.T) {
wantErr: true,
},
{
name: "forkchoiceUpdateV2 not valid",
name: "forkchoiceUpdateV3 not valid",
mockEngine: mockEngineAPI{
headerByTypeFunc: func(context.Context, ethclient.HeadType) (*types.Header, error) {
fuzzer := ethclient.NewFuzzer(0)
Expand Down Expand Up @@ -407,31 +407,6 @@ func TestKeeper_PostFinalize(t *testing.T) {
},
postStateCheck: payloadFailedToSet,
},
{
name: "fail: unknown status from EL",
mockEngine: mockEngineAPI{
forkchoiceUpdatedV3Func: func(ctx context.Context, update eengine.ForkchoiceStateV1,
payloadAttributes *eengine.PayloadAttributes) (eengine.ForkChoiceResponse, error) {
return eengine.ForkChoiceResponse{
PayloadStatus: eengine.PayloadStatusV1{
Status: "unknown status",
LatestValidHash: nil,
ValidationError: nil,
},
PayloadID: &payloadID,
}, nil
},
},
mockClient: mock.MockClient{},
wantErr: false,
enableOptimistic: true,
setupMocks: func(esk *moduletestutil.MockEvmStakingKeeper) {
esk.EXPECT().MaxWithdrawalPerBlock(gomock.Any()).Return(uint32(0), nil)
esk.EXPECT().PeekEligibleWithdrawals(gomock.Any(), gomock.Any()).Return(nil, nil)
esk.EXPECT().PeekEligibleRewardWithdrawals(gomock.Any(), gomock.Any()).Return(nil, nil)
},
postStateCheck: payloadFailedToSet,
},
{
name: "pass",
mockEngine: mockEngineAPI{
Expand Down Expand Up @@ -695,10 +670,6 @@ func (m *mockEngineAPI) HeaderByType(ctx context.Context, typ ethclient.HeadType
return m.mock.HeaderByType(ctx, typ)
}

func (m *mockEngineAPI) NewPayloadV2(ctx context.Context, params eengine.ExecutableData) (eengine.PayloadStatusV1, error) {
return m.mock.NewPayloadV2(ctx, params)
}

//nolint:nonamedreturns // Required for defer
func (m *mockEngineAPI) NewPayloadV3(ctx context.Context, params eengine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (resp eengine.PayloadStatusV1, err error) {
if m.forceInvalidNewPayloadV3 {
Expand Down
25 changes: 6 additions & 19 deletions client/x/evmengine/keeper/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,9 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution

err = retryForever(ctx, func(ctx context.Context) (bool, error) {
status, err := pushPayload(ctx, s.engineCl, payload)
if err != nil || isUnknown(status) {
if err != nil {
// We need to retry forever on networking errors, but can't easily identify them, so retry all errors.
log.Warn(ctx, "Processing finalized payload failed: push new payload to evm (will retry)", err,
"status", status.Status)
log.Warn(ctx, "Processing finalized payload failed: push new payload to evm (will retry)", err)

return false, nil // Retry
} else if invalid, err := isInvalid(status); invalid {
Expand All @@ -110,7 +109,7 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution
return false, err // Don't retry, error out.
} else if isSyncing(status) {
log.Warn(ctx, "Processing finalized payload; evm syncing", nil)
}
} /* else isValid(status) */

return true, nil // We are done, don't retry
})
Expand All @@ -127,10 +126,9 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution

err = retryForever(ctx, func(ctx context.Context) (bool, error) {
fcr, err := s.engineCl.ForkchoiceUpdatedV3(ctx, fcs, nil)
if err != nil || isUnknown(fcr.PayloadStatus) {
if err != nil {
// We need to retry forever on networking errors, but can't easily identify them, so retry all errors.
log.Warn(ctx, "Processing finalized payload failed: evm fork choice update (will retry)", err,
"status", fcr.PayloadStatus.Status)
log.Warn(ctx, "Processing finalized payload failed: evm fork choice update (will retry)", err)

return false, nil // Retry
} else if isSyncing(fcr.PayloadStatus) {
Expand All @@ -143,7 +141,7 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution
"payload_height", payload.Number)

return false, err // Don't retry
}
} /* else isValid(status) */

return true, nil
})
Expand Down Expand Up @@ -190,17 +188,6 @@ func pushPayload(ctx context.Context, engineCl ethclient.EngineClient, payload e

var _ types.MsgServiceServer = msgServer{}

func isUnknown(status engine.PayloadStatusV1) bool {
if status.Status == engine.VALID ||
status.Status == engine.INVALID ||
status.Status == engine.SYNCING ||
status.Status == engine.ACCEPTED {
return false
}

return true
}

func isSyncing(status engine.PayloadStatusV1) bool {
return status.Status == engine.SYNCING || status.Status == engine.ACCEPTED
}
Expand Down
7 changes: 3 additions & 4 deletions client/x/evmengine/keeper/proposal_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ func (s proposalServer) ExecutionPayload(ctx context.Context, msg *types.MsgExec
// Push the payload to the EVM.
err = retryForever(ctx, func(ctx context.Context) (bool, error) {
status, err := pushPayload(ctx, s.engineCl, payload)
if err != nil || isUnknown(status) {
if err != nil {
// We need to retry forever on networking errors, but can't easily identify them, so retry all errors.
log.Warn(ctx, "Verifying proposal failed: push new payload to evm (will retry)", err,
"status", status.Status)
log.Warn(ctx, "Verifying proposal failed: push new payload to evm (will retry)", err)

return false, nil // Retry
} else if invalid, err := isInvalid(status); invalid {
Expand All @@ -44,7 +43,7 @@ func (s proposalServer) ExecutionPayload(ctx context.Context, msg *types.MsgExec
// If this is initial sync, we need to continue and set a target head to sync to, so don't retry.
log.Warn(ctx, "Can't properly verifying proposal: evm syncing", err,
"payload_height", payload.Number)
}
} /* else isValid(status) */

return true, nil // We are done, don't retry.
})
Expand Down
56 changes: 2 additions & 54 deletions lib/ethclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,6 @@ import (
"github.com/piplabs/story/lib/ethclient"
)

func TestGetPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.PayloadID
fuzzer.Fuzz(&param1)

var resp engine.ExecutionPayloadEnvelope
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.GetPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestGetPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -53,23 +36,6 @@ func TestGetPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ExecutableData
fuzzer.Fuzz(&param1)

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV2(ctx, param1)
}

testEndpoint(t, call, resp, param1)
}

func TestNewPayloadV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -85,6 +51,7 @@ func TestNewPayloadV3(t *testing.T) {

var resp engine.PayloadStatusV1
fuzzer.Fuzz(&resp)
resp.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.NewPayloadV3(ctx, param1, param2, &param3)
Expand All @@ -93,26 +60,6 @@ func TestNewPayloadV3(t *testing.T) {
testEndpoint(t, call, resp, param1, param2, param3)
}

func TestForkchoiceUpdatedV2(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)

var param1 engine.ForkchoiceStateV1
fuzzer.Fuzz(&param1)

var param2 engine.PayloadAttributes
fuzzer.Fuzz(&param2)

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV2(ctx, param1, &param2)
}

testEndpoint(t, call, resp, param1, param2)
}

func TestForkchoiceUpdatedV3(t *testing.T) {
t.Parallel()
fuzzer := fuzz.New().NilChance(0)
Expand All @@ -125,6 +72,7 @@ func TestForkchoiceUpdatedV3(t *testing.T) {

var resp engine.ForkChoiceResponse
fuzzer.Fuzz(&resp)
resp.PayloadStatus.Status = engine.VALID

call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) {
return engineCl.ForkchoiceUpdatedV3(ctx, param1, &param2)
Expand Down
Loading

0 comments on commit 45f1946

Please sign in to comment.