From 5d346de9a1b06c9b0b18f8c91970c98f322a87f3 Mon Sep 17 00:00:00 2001 From: krish-z <122767080+krish-nr@users.noreply.github.com> Date: Thu, 1 Feb 2024 16:02:03 +0800 Subject: [PATCH 1/2] fix: ignore errors that caused by gap to keep peer connection (#49) --- eth/downloader/skeleton.go | 18 +++++++++++++----- eth/etherror/errors.go | 12 ++++++++++++ eth/protocols/eth/handler.go | 16 +++++++++++++++- 3 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 eth/etherror/errors.go diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 12eb5700f8..5ad8834d05 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/etherror" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" @@ -73,6 +74,9 @@ var errTerminated = errors.New("terminated") // with a new header, but it does not link up to the existing sync. var errReorgDenied = errors.New("non-forced head reorg denied") +// maxBlockNumGapTolerance is the exclusive upper bound on how many blocks a peer's header batch anchor may lag the requested head and still be tolerated. +var maxBlockNumGapTolerance = uint64(30) + func init() { // Tuning parameters is nice, but the scratch space must be assignable in // full to peers. It's a useless cornercase to support a dangling half-group. 
@@ -786,25 +790,29 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { case len(headers) == 0: // No headers were delivered, reject the response and reschedule peer.log.Debug("No headers delivered") - res.Done <- errors.New("no headers delivered") + res.Done <- etherror.ErrNoHeadersDelivered s.scheduleRevertRequest(req) case headers[0].Number.Uint64() != req.head: // Header batch anchored at non-requested number peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head) - res.Done <- errors.New("invalid header batch anchor") + if req.head-headers[0].Number.Uint64() < maxBlockNumGapTolerance { + res.Done <- etherror.ErrHeaderBatchAnchorLow + } else { + res.Done <- etherror.ErrInvalidHeaderBatchAnchor + } s.scheduleRevertRequest(req) case req.head >= requestHeaders && len(headers) != requestHeaders: // Invalid number of non-genesis headers delivered, reject the response and reschedule peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders) - res.Done <- errors.New("not enough non-genesis headers delivered") + res.Done <- etherror.ErrNotEnoughNonGenesisHeaders s.scheduleRevertRequest(req) case req.head < requestHeaders && uint64(len(headers)) != req.head: // Invalid number of genesis headers delivered, reject the response and reschedule peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) - res.Done <- errors.New("not enough genesis headers delivered") + res.Done <- etherror.ErrNotEnoughGenesisHeaders s.scheduleRevertRequest(req) default: @@ -813,7 +821,7 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { for i := 0; i < len(headers)-1; i++ { if headers[i].ParentHash != headers[i+1].Hash() { peer.log.Debug("Invalid hash progression", "index", i, "wantparenthash", headers[i].ParentHash, "haveparenthash", headers[i+1].Hash()) - res.Done <- errors.New("invalid hash progression") + res.Done <- 
etherror.ErrInvalidHashProgression s.scheduleRevertRequest(req) return } diff --git a/eth/etherror/errors.go b/eth/etherror/errors.go new file mode 100644 index 0000000000..ce14cf22e2 --- /dev/null +++ b/eth/etherror/errors.go @@ -0,0 +1,12 @@ +package etherror + +import "errors" + +var ( + ErrNoHeadersDelivered = errors.New("no headers delivered") + ErrInvalidHeaderBatchAnchor = errors.New("invalid header batch anchor") + ErrNotEnoughNonGenesisHeaders = errors.New("not enough non-genesis headers delivered") + ErrNotEnoughGenesisHeaders = errors.New("not enough genesis headers delivered") + ErrInvalidHashProgression = errors.New("invalid hash progression") + ErrHeaderBatchAnchorLow = errors.New("header batch anchor is lower than requested") +) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 2f2dd1cf6a..5643e6b767 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -17,6 +17,7 @@ package eth import ( + "errors" "fmt" "math/big" "time" @@ -24,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/etherror" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -153,7 +155,19 @@ func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo { // connection is torn down. 
func Handle(backend Backend, peer *Peer) error { for { - if err := handleMessage(backend, peer); err != nil { + err := handleMessage(backend, peer) + switch { + // TODO: the "no headers delivered" error is currently not ignored, as ignoring it may lead to a dead peer not being removed as expected + /* + case errors.Is(err, etherror.ErrNoHeadersDelivered): + // ignore no headers delivered + peer.Log().Warn("Message handling failed with no headers") + */ + case errors.Is(err, etherror.ErrHeaderBatchAnchorLow): + // ignore lower header anchor within tolerance + peer.Log().Warn("Message handling failed with lower batch anchor") + + case err != nil: peer.Log().Debug("Message handling failed in `eth`", "err", err) return err } From 8377b0ec3b8b7260613498f7757450168934f7fe Mon Sep 17 00:00:00 2001 From: krish-z <122767080+krish-nr@users.noreply.github.com> Date: Thu, 1 Feb 2024 16:02:59 +0800 Subject: [PATCH 2/2] fix: fix task stuck and not reassign bug in concurrent-fetch logic (#50) --- eth/downloader/fetchers_concurrent.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 649aa27615..399ce5e806 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -33,6 +33,12 @@ import ( // to each request. Failing to do so is considered a protocol violation. var timeoutGracePeriod = 2 * time.Minute +// peersRetryInterval is the retry interval when all peers cannot get the request data. +var peersRetryInterval = 100 * time.Millisecond + +// maxRetries is the maximum number of retries for an unreserved download task. +var maxRetries = 5 + // typedQueue is an interface defining the adaptor needed to translate the type // specific downloader/queue schedulers into the type-agnostic general concurrent // fetcher algorithm calls. 
@@ -125,6 +131,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Prepare the queue and fetch block parts until the block header fetcher's done finished := false + + requestRetried := 0 for { // Short circuit if we lost all our peers if d.peers.Len() == 0 && !beaconMode { @@ -195,6 +203,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // to the queue, that is async, and we can do better here by // immediately pushing the unfulfilled requests. queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method + // Reset progressed if no request is pending + if len(pending) == 0 { + progressed = false + } continue } pending[peer.id] = req @@ -212,6 +224,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode { return errPeersUnavailable } + // Retry the unreserved task in the next loop iteration + if beaconMode && len(pending) == 0 && queued > 0 && !progressed && !throttled && len(idles) == d.peers.Len() { + log.Warn("All idle peers are not valid for current task, will retry ...") + requestRetried++ + if requestRetried > maxRetries { + log.Info("max retry exceeded, cancel request") + return errCanceled + } + time.Sleep(peersRetryInterval) + continue + } } // Wait for something to happen select {