Skip to content

Commit

Permalink
eth/downloader: remove stale beacon headers as backfilling progresses (
Browse files Browse the repository at this point in the history
…ethereum#24670)

* eth/downloader: remove stale beacon headers as backfilling progresses

* eth/downloader: remove leftover from a previous design

* eth/downloader: do partial beacon cleanups if chain is large

* eth/downloader: linter != heart
  • Loading branch information
karalabe authored and cp-wjhan committed Nov 29, 2022
1 parent 527f83a commit 266db69
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 16 deletions.
15 changes: 12 additions & 3 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type beaconBackfiller struct {
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
filled *types.Header // Last header filled by the last terminated sync loop
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
Expand All @@ -48,16 +49,18 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
}
}

// suspend cancels any background downloader threads.
func (b *beaconBackfiller) suspend() {
// suspend cancels any background downloader threads and returns the last header
// that has been successfully backfilled.
func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
filled := b.filled
started := b.started
b.lock.Unlock()

if !filling {
return
return filled // Return the filled header on the previous sync completion
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
Expand All @@ -69,6 +72,10 @@ func (b *beaconBackfiller) suspend() {
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()

// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
return b.downloader.blockchain.CurrentFastBlock().Header()
}

// resume starts the downloader threads for backfilling state and chain data.
Expand All @@ -81,6 +88,7 @@ func (b *beaconBackfiller) resume() {
return
}
b.filling = true
b.filled = nil
b.started = make(chan struct{})
mode := b.syncMode
b.lock.Unlock()
Expand All @@ -92,6 +100,7 @@ func (b *beaconBackfiller) resume() {
defer func() {
b.lock.Lock()
b.filling = false
b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
Expand Down
142 changes: 132 additions & 10 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package downloader
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sort"
"time"
Expand Down Expand Up @@ -148,11 +149,15 @@ type backfiller interface {
// based on the skeleton chain as it might be invalid. The backfiller should
// gracefully handle multiple consecutive suspends without a resume, even
// on initial sartup.
suspend()
//
// The method should return the last block header that has been successfully
// backfilled, or nil if the backfiller was not resumed.
suspend() *types.Header

// resume requests the backfiller to start running fill or snap sync based on
// the skeleton chain as it has successfully been linked. Appending new heads
// to the end of the chain will not result in suspend/resume cycles.
// leaking too much sync logic out to the filler.
resume()
}

Expand Down Expand Up @@ -358,8 +363,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
if linked {
s.filler.resume()
}
defer s.filler.suspend()

defer func() {
if filled := s.filler.suspend(); filled != nil {
// If something was filled, try to delete stale sync helpers. If
// unsuccessful, warn the user, but not much else we can do (it's
// a programming error, just let users report an issue and don't
// choke in the meantime).
if err := s.cleanStales(filled); err != nil {
log.Error("Failed to clean stale beacon headers", "err", err)
}
}
}()
// Create a set of unique channels for this sync cycle. We need these to be
// ephemeral so a data race doesn't accidentally deliver something stale on
// a persistent channel across syncs (yup, this happened)
Expand Down Expand Up @@ -582,8 +596,16 @@ func (s *skeleton) processNewHead(head *types.Header, force bool) bool {

lastchain := s.progress.Subchains[0]
if lastchain.Tail >= number {
// If the chain is down to a single beacon header, and it is re-announced
// once more, ignore it instead of tearing down sync for a noop.
if lastchain.Head == lastchain.Tail {
if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
return false
}
}
// Not a noop / double head announce, abort with a reorg
if force {
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
}
return true
}
Expand Down Expand Up @@ -943,12 +965,44 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// If the beacon chain was linked to the local chain, completely swap out
// all internal progress and abort header synchronization.
if linked {
// Note, linking into the local chain should also mean that there are
// no leftover subchains, but just in case there's some junk due to
// strange conditions or bugs, clean up all internal state.
if len(s.progress.Subchains) > 1 {
log.Error("Cleaning up leftovers after beacon link")
s.progress.Subchains = s.progress.Subchains[:1]
// Linking into the local chain should also mean that there are no
// leftover subchains, but in the case of importing the blocks via
// the engine API, we will not push the subchains forward. This will
// lead to a gap between an old sync cycle and a future one.
if subchains := len(s.progress.Subchains); subchains > 1 {
switch {
// If there are only 2 subchains - the current one and an older
// one - and the old one consists of a single block, then it's
// the expected new sync cycle after some propagated blocks. Log
// it for debugging purposes, explicitly clean and don't escalate.
case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
s.progress.Subchains = s.progress.Subchains[:1]

// If we have more than one header or more than one leftover chain,
// the syncer's internal state is corrupted. Do try to fix it, but
// be very vocal about the fault.
default:
var context []interface{}

for i := range s.progress.Subchains[1:] {
context = append(context, fmt.Sprintf("stale_head_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Head)
context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Tail)
context = append(context, fmt.Sprintf("stale_next_%d", i+1))
context = append(context, s.progress.Subchains[i+1].Next)
}
log.Error("Cleaning spurious beacon sync leftovers", context...)
s.progress.Subchains = s.progress.Subchains[:1]

// Note, here we didn't actually delete the headers at all,
// just the metadata. We could implement a cleanup mechanism,
// but further modifying corrupted state is kind of asking
// for it. Unless there's a good enough reason to risk it,
// better to live with the small database junk.
}
}
break
}
Expand Down Expand Up @@ -1023,6 +1077,74 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
return linked, merged
}

// cleanStales removes previously synced beacon headers that have become stale
// due to the downloader backfilling past the tracked tail.
func (s *skeleton) cleanStales(filled *types.Header) error {
number := filled.Number.Uint64()
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())

// If the filled header is below the linked subchain, something's
// corrupted internally. Report and error and refuse to do anything.
if number < s.progress.Subchains[0].Tail {
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
}
// Subchain seems trimmable, push the tail forward up to the last
// filled header and delete everything before it - if available. In
// case we filled past the head, recreate the subchain with a new
// head to keep it consistent with the data on disk.
var (
start = s.progress.Subchains[0].Tail // start deleting from the first known header
end = number // delete until the requested threshold
)
s.progress.Subchains[0].Tail = number
s.progress.Subchains[0].Next = filled.ParentHash

if s.progress.Subchains[0].Head < number {
// If more headers were filled than available, push the entire
// subchain forward to keep tracking the node's block imports
end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
s.progress.Subchains[0].Head = number // assign a new head (tail is already assigned to this)
}
// Execute the trimming and the potential rewiring of the progress
batch := s.db.NewBatch()

if end != number {
// The entire original skeleton chain was deleted and a new one
// defined. Make sure the new single-header chain gets pushed to
// disk to keep internal state consistent.
rawdb.WriteSkeletonHeader(batch, filled)
}
s.saveSyncStatus(batch)
for n := start; n < end; n++ {
// If the batch grew too big, flush it and continue with a new batch.
// The catch is that the sync metadata needs to reflect the actually
// flushed state, so temporarily change the subchain progress and
// revert after the flush.
if batch.ValueSize() >= ethdb.IdealBatchSize {
tmpTail := s.progress.Subchains[0].Tail
tmpNext := s.progress.Subchains[0].Next

s.progress.Subchains[0].Tail = n
s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
s.saveSyncStatus(batch)

if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
batch.Reset()

s.progress.Subchains[0].Tail = tmpTail
s.progress.Subchains[0].Next = tmpNext
s.saveSyncStatus(batch)
}
rawdb.DeleteSkeletonHeader(batch, n)
}
if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
return nil
}

// Bounds retrieves the current head and tail tracked by the skeleton syncer.
// This method is used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ func newHookedBackfiller() backfiller {
// based on the skeleton chain as it might be invalid. The backfiller should
// gracefully handle multiple consecutive suspends without a resume, even
// on initial sartup.
func (hf *hookedBackfiller) suspend() {
func (hf *hookedBackfiller) suspend() *types.Header {
if hf.suspendHook != nil {
hf.suspendHook()
}
return nil // we don't really care about header cleanups for now
}

// resume requests the backfiller to start running fill or snap sync based on
Expand Down Expand Up @@ -426,7 +427,6 @@ func TestSkeletonSyncExtend(t *testing.T) {
newstate: []*subchain{
{Head: 49, Tail: 49},
},
err: errReorgDenied,
},
// Initialize a sync and try to extend it with a sibling block.
{
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestSkeletonSyncExtend(t *testing.T) {

<-wait
if err := skeleton.Sync(tt.extend, false); err != tt.err {
t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
}
skeleton.Terminate()

Expand Down

0 comments on commit 266db69

Please sign in to comment.