Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine API: NewPayload fails with a "context canceled" error in Current/GetHeader (#9786) #10180

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion erigon-lib/common/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,27 @@

package common

import "errors"
import (
"errors"

"golang.org/x/net/context"
)

var ErrStopped = errors.New("stopped")
var ErrUnwind = errors.New("unwound")

// FastContextErr is faster than ctx.Err() because usually it doesn't lock an internal mutex.
// It locks it only if the context is done and at the first call.
// See implementation of cancelCtx in context/context.go.
func FastContextErr(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

func Stopped(ch <-chan struct{}) error {
if ch == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/net v0.24.0
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
db.trackTxEnd()
return nil, semErr
return nil, fmt.Errorf("mdbx.MdbxKV.BeginRo: roTxsLimiter error %w", semErr)
}

defer func() {
Expand Down
7 changes: 4 additions & 3 deletions erigon-lib/kv/remotedb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"runtime"
"unsafe"

"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (db *DB) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
}

if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
return nil, fmt.Errorf("remotedb.DB.BeginRo: roTxsLimiter error %w", semErr)
}

defer func() {
Expand Down
13 changes: 12 additions & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (

"github.com/golang-jwt/jwt/v4"
jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
)

const (
Expand Down Expand Up @@ -237,6 +239,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// until EOF, writes the response to w, and orders the server to process a
// single request.
ctx := r.Context()

// The context might be cancelled if the client's connection was closed while waiting for ServeHTTP.
if libcommon.FastContextErr(ctx) != nil {
// TODO: introduce an log message for all possible cases
// s.logger.Warn("rpc.Server.ServeHTTP: client connection was lost. Check if the server is able to keep up with the request rate.", "url", r.URL.String())
w.WriteHeader(http.StatusServiceUnavailable)
return
}

ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
ctx = context.WithValue(ctx, "scheme", r.Proto)
ctx = context.WithValue(ctx, "local", r.Host)
Expand Down
2 changes: 2 additions & 0 deletions turbo/execution/eth1/block_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (e *EthereumExecutionModule) evictOldBuilders() {
// Missing: NewPayload, AssembleBlock
func (e *EthereumExecutionModule) AssembleBlock(ctx context.Context, req *execution.AssembleBlockRequest) (*execution.AssembleBlockResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.AssembleBlock: ExecutionStatus_Busy")
return &execution.AssembleBlockResponse{
Id: 0,
Busy: true,
Expand Down Expand Up @@ -108,6 +109,7 @@ func blockValue(br *types.BlockWithReceipts, baseFee *uint256.Int) *uint256.Int

func (e *EthereumExecutionModule) GetAssembledBlock(ctx context.Context, req *execution.GetAssembledBlockRequest) (*execution.GetAssembledBlockResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.GetAssembledBlock: ExecutionStatus_Busy")
return &execution.GetAssembledBlockResponse{
Busy: true,
}, nil
Expand Down
48 changes: 13 additions & 35 deletions turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (c ChainReaderWriterEth1) FrozenBlocks(ctx context.Context) uint64 {
return ret.FrozenBlocks
}

const retryTimeout = 10 * time.Millisecond

func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks []*types.Block) error {
request := &execution.InsertBlocksRequest{
Blocks: eth1_utils.ConvertBlocksToRPC(blocks),
Expand All @@ -281,22 +279,26 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks [
if err != nil {
return err
}
retryInterval := time.NewTicker(retryTimeout)
defer retryInterval.Stop()

// limit the number of retries
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

for response.Result == execution.ExecutionStatus_Busy {
const retryDelay = 100 * time.Millisecond
select {
case <-retryInterval.C:
response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
case <-time.After(retryDelay):
case <-ctx.Done():
return ctx.Err()
}

response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
}
if response.Result != execution.ExecutionStatus_Success {
return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String())
return fmt.Errorf("InsertBlocksAndWait: executionModule.InsertBlocks ExecutionStatus = %s", response.Result.String())
}
return nil
}
Expand All @@ -321,31 +323,7 @@ func (c ChainReaderWriterEth1) InsertBlocks(ctx context.Context, blocks []*types

func (c ChainReaderWriterEth1) InsertBlockAndWait(ctx context.Context, block *types.Block) error {
blocks := []*types.Block{block}
request := &execution.InsertBlocksRequest{
Blocks: eth1_utils.ConvertBlocksToRPC(blocks),
}

response, err := c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
retryInterval := time.NewTicker(retryTimeout)
defer retryInterval.Stop()
for response.Result == execution.ExecutionStatus_Busy {
select {
case <-retryInterval.C:
response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
case <-ctx.Done():
return context.Canceled
}
}
if response.Result != execution.ExecutionStatus_Success {
return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String())
}
return c.InsertBlocksAndWait(ctx, []*types.Block{block})
return c.InsertBlocksAndWait(ctx, blocks)
}

func (c ChainReaderWriterEth1) ValidateChain(ctx context.Context, hash libcommon.Hash, number uint64) (execution.ExecutionStatus, *string, libcommon.Hash, error) {
Expand Down
9 changes: 6 additions & 3 deletions turbo/execution/eth1/ethereum_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"errors"
"math/big"

"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/math"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (e *EthereumExecutionModule) canonicalHash(ctx context.Context, tx kv.Tx, b

func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execution.ValidationRequest) (*execution.ValidationReceipt, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.ValidateChain: ExecutionStatus_Busy")
return &execution.ValidationReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
ValidationStatus: execution.ExecutionStatus_Busy,
Expand Down Expand Up @@ -258,6 +260,7 @@ func (e *EthereumExecutionModule) Start(ctx context.Context) {

func (e *EthereumExecutionModule) Ready(context.Context, *emptypb.Empty) (*execution.ReadyResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.Ready: ExecutionStatus_Busy")
return &execution.ReadyResponse{Ready: false}, nil
}
defer e.semaphore.Release(1)
Expand Down
1 change: 1 addition & 0 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func writeForkChoiceHashes(tx kv.RwTx, blockHash, safeHash, finalizedHash common

func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, originalBlockHash, safeHash, finalizedHash common.Hash, outcomeCh chan forkchoiceOutcome) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.updateForkChoice: ExecutionStatus_Busy")
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
Status: execution.ExecutionStatus_Busy,
Expand Down
Loading
Loading