Skip to content

Commit

Permalink
eth, trie: sync with upstream v1.10.26 to solve snap sync issues (eth…
Browse files Browse the repository at this point in the history
…ereum#1226)

* eth: fix a rare datarace on CHT challenge reply / shutdown

* trie: check childrens' existence concurrently for snap heal

* eth/protocols/snap: fix problems due to idle-but-busy peers

* eth/filters: change filter block to be by-ref (ethereum#26054)

This PR changes the block field in the filter to be a pointer, to disambiguate between empty hash and no hash

* rpc: handle wrong HTTP batch response length (ethereum#26064)

* eth/protocols/snap: throttle trie heal requests when peers DoS us (ethereum#25666)

* eth/protocols/snap: throttle trie heal requests when peers DoS us

* eth/protocols/snap: lower heal throttle log to debug

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth/protocols/snap: fix comment

Co-authored-by: Martin Holst Swende <martin@swende.se>

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Jordan Krage <jmank88@gmail.com>
  • Loading branch information
4 people authored Dec 13, 2022
1 parent 8894b65 commit c5b7d74
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 68 deletions.
10 changes: 5 additions & 5 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash

block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher

Expand Down Expand Up @@ -106,7 +106,7 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(backend, addresses, topics)
filter.block = block
filter.block = &block
return filter
}

Expand All @@ -125,8 +125,8 @@ func newFilter(backend Backend, addresses []common.Address, topics [][]common.Ha
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.backend.HeaderByHash(ctx, f.block)
if f.block != nil {
header, err := f.backend.HeaderByHash(ctx, *f.block)
if err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -446,10 +451,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit whitelist block hashes, request them
for number, hash := range h.whitelist {
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err
}
go func(number uint64, hash common.Hash) {
go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -478,7 +488,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Whitelist challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID())
}
}(number, hash)
}(number, hash, req)
}
// Handle incoming messages until the connection is torn down
return handler(peer)
Expand Down
197 changes: 155 additions & 42 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,6 +81,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512

// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005

// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1

// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount

// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33

// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)

var (
Expand Down Expand Up @@ -432,6 +457,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running

trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated

trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
Expand Down Expand Up @@ -477,9 +507,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),

trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
stateWriter: db.NewBatch(),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),

extProgress: new(SyncProgress),
}
Expand Down Expand Up @@ -1324,6 +1355,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]trie.SyncPath, 0, cap)
Expand Down Expand Up @@ -2094,6 +2129,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes {
node := res.nodes[i]

Expand All @@ -2102,6 +2141,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[hash] = res.paths[i]
continue
}
fills++

// Push the trie node into the state syncer
s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node))
Expand All @@ -2125,6 +2166,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))

// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))

// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate

pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()

log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
}

// processBytecodeHealResponse integrates an already validated bytecode response
Expand Down Expand Up @@ -2252,14 +2337,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.accountReqs[id]
if !ok {
Expand Down Expand Up @@ -2364,14 +2453,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeReqs[id]
if !ok {
Expand Down Expand Up @@ -2473,14 +2566,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.storageReqs[id]
if !ok {
Expand Down Expand Up @@ -2600,14 +2697,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.trienodeHealReqs[id]
if !ok {
Expand Down Expand Up @@ -2643,10 +2744,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error

// Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash := make([]byte, 32)

nodes := make([][]byte, len(req.hashes))
var (
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils
hasher.Reset()
Expand All @@ -2658,16 +2761,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
}
if j < len(req.hashes) {
nodes[j] = trienodes[i]
fills++
j++
continue
}
// We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)

// Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode")
}
// Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{
task: req.task,
hashes: req.hashes,
Expand Down Expand Up @@ -2695,14 +2804,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeHealReqs[id]
if !ok {
Expand Down
Loading

0 comments on commit c5b7d74

Please sign in to comment.