Skip to content

Commit

Permalink
eth: pre-process downloader responses on the peer reader thread
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Dec 1, 2021
1 parent 721c572 commit c893488
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 85 deletions.
72 changes: 45 additions & 27 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// headerTask is a set of downloaded headers to queue along with their precomputed
// hashes to avoid constant rehashing.
type headerTask struct {
headers []*types.Header
hashes []common.Hash
}

type Downloader struct {
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events
Expand Down Expand Up @@ -116,7 +123,7 @@ type Downloader struct {
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.

// Channels
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks
headerProcCh chan *headerTask // Channel to feed the header processor new tasks

// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
Expand Down Expand Up @@ -210,7 +217,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan []*types.Header, 1),
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
Expand Down Expand Up @@ -626,7 +633,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
if mode == SnapSync {
fetch = 2 // head + pivot headers
}
headers, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true)
headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -645,7 +652,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0])
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
Expand Down Expand Up @@ -784,7 +791,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)

p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
headers, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false)
headers, hashes, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false)
if err != nil {
return 0, err
}
Expand All @@ -811,7 +818,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
continue
}
// Otherwise check if we already know the header or not
h := headers[i].Hash()
h := hashes[i]
n := headers[i].Number.Uint64()

var known bool
Expand Down Expand Up @@ -854,7 +861,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2

headers, err := d.fetchHeadersByNumber(p, check, 1, 0, false)
headers, hashes, err := d.fetchHeadersByNumber(p, check, 1, 0, false)
if err != nil {
return 0, err
}
Expand All @@ -864,7 +871,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
// Modify the search interval based on the response
h := headers[0].Hash()
h := hashes[0]
n := headers[0].Number.Uint64()

var known bool
Expand Down Expand Up @@ -923,6 +930,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// - Full header retrieval if we're near the chain head
var (
headers []*types.Header
hashes []common.Hash
err error
)
switch {
Expand All @@ -932,15 +940,15 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
d.pivotLock.RUnlock()

p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
headers, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep

case skeleton:
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
headers, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
headers, hashes, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)

default:
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
headers, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false)
headers, hashes, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false)
}
switch err {
case nil:
Expand Down Expand Up @@ -1038,12 +1046,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// If we received a skeleton batch, resolve internals concurrently
var progressed bool
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
filled, hashset, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.log.Debug("Skeleton chain invalid", "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
headers = filled[proced:]
hashes = hashset[proced:]

progressed = proced > 0
from += uint64(proced)
} else {
Expand Down Expand Up @@ -1079,6 +1089,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
delay = n
}
headers = headers[:n-delay]
hashes = hashes[:n-delay]
}
}
}
Expand All @@ -1098,7 +1109,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
if len(headers) > 0 {
p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case d.headerProcCh <- &headerTask{
headers: headers,
hashes: hashes,
}:
case <-d.cancelCh:
return errCanceled
}
Expand All @@ -1121,19 +1135,19 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, []common.Hash, int, error) {
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)

err := d.concurrentFetch((*headerQueue)(d))
if err != nil {
log.Debug("Skeleton fill failed", "err", err)
}
filled, proced := d.queue.RetrieveHeaders()
filled, hashes, proced := d.queue.RetrieveHeaders()
if err == nil {
log.Debug("Skeleton fill succeeded", "filled", len(filled), "processed", proced)
}
return filled, proced, err
return filled, hashes, proced, err
}

// fetchBodies iteratively downloads the scheduled block bodies, taking any
Expand Down Expand Up @@ -1199,9 +1213,9 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollbackErr = errCanceled
return errCanceled

case headers := <-d.headerProcCh:
case task := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
if task == nil || len(task.headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
Expand Down Expand Up @@ -1245,6 +1259,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
return nil
}
// Otherwise split the chunk of headers into batches and process them
headers, hashes := task.headers, task.hashes

gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
Expand All @@ -1259,7 +1275,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]
chunkHeaders := headers[:limit]
chunkHashes := hashes[:limit]

// In case of header only syncing, validate the chunk immediately
if mode == SnapSync || mode == LightSync {
Expand All @@ -1273,22 +1290,22 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
d.pivotLock.RUnlock()

frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
if chunkHeaders[len(chunkHeaders)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders, frequency); err != nil {
rollbackErr = err

// If some headers were inserted, track them as uncertain
if (mode == SnapSync || frequency > 1) && n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
rollback = chunkHeaders[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the alloted limits
if mode == SnapSync {
head := chunk[len(chunk)-1].Number.Uint64()
head := chunkHeaders[len(chunkHeaders)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
Expand All @@ -1308,13 +1325,14 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]
hashes = hashes[limit:]
origin += uint64(limit)
}
// Update the highest block number we know if a higher one is found.
Expand Down
26 changes: 26 additions & 0 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,18 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
}
}
}
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand Down Expand Up @@ -216,13 +221,18 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
}
}
}
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand All @@ -243,12 +253,22 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
bodies[i] = new(eth.BlockBody)
rlp.DecodeBytes(blob, bodies[i])
}
var (
txsHashes = make([]common.Hash, len(bodies))
uncleHashes = make([]common.Hash, len(bodies))
)
hasher := trie.NewStackTrie(nil)
for i, body := range bodies {
txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
}
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockBodiesPacket)(&bodies),
Meta: [][]common.Hash{txsHashes, uncleHashes},
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand All @@ -268,12 +288,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *
for i, blob := range blobs {
rlp.DecodeBytes(blob, &receipts[i])
}
hasher := trie.NewStackTrie(nil)
hashes = make([]common.Hash, len(receipts))
for i, receipt := range receipts {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
}
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.ReceiptsPacket)(&receipts),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand Down
Loading

0 comments on commit c893488

Please sign in to comment.