Skip to content

Commit

Permalink
eth: use the last announced finalized block as the sync ancient limit (
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe authored and mmsqe committed Dec 7, 2023
1 parent 1645298 commit 667a450
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 55 deletions.
16 changes: 14 additions & 2 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,27 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
log.Warn("Forkchoice requested unknown head", "hash", update.HeadBlockHash)
return engine.STATUS_SYNCING, nil
}
// If the finalized hash is known, we can direct the downloader to move
// potentially more data to the freezer from the get go.
finalized := api.remoteBlocks.get(update.FinalizedBlockHash)

// Header advertised via a past newPayload request. Start syncing to it.
// Before we do however, make sure any legacy sync in switched off so we
// don't accidentally have 2 cycles running.
if merger := api.eth.Merger(); !merger.TDDReached() {
merger.ReachTTD()
api.eth.Downloader().Cancel()
}
log.Info("Forkchoice requested sync to new head", "number", header.Number, "hash", header.Hash())
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header); err != nil {
context := []interface{}{"number", header.Number, "hash", header.Hash()}
if update.FinalizedBlockHash != (common.Hash{}) {
if finalized == nil {
context = append(context, []interface{}{"finalized", "unknown"}...)
} else {
context = append(context, []interface{}{"finalized", finalized.Number}...)
}
}
log.Info("Forkchoice requested sync to new head", context...)
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header, finalized); err != nil {
return engine.STATUS_SYNCING, err
}
return engine.STATUS_SYNCING, nil
Expand Down
9 changes: 6 additions & 3 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ import (
const maxTrackedPayloads = 10

// maxTrackedHeaders is the maximum number of executed payloads the execution
// engine tracks before evicting old ones. Ideally we should only ever track the
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10
// engine tracks before evicting old ones. These are tracked outside the chain
// during initial sync to allow ForkchoiceUpdate to reference past blocks via
// hashes only. For the sync target it would be enough to track only the latest
// header, but snap sync also needs the latest finalized height for the ancient
// limit.
const maxTrackedHeaders = 96

// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (tester *FullSyncTester) Start() error {
}
// Trigger beacon sync with the provided block header as
// trusted chain head.
err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header())
err := tester.api.eth.Downloader().BeaconSync(downloader.FullSync, tester.block.Header(), nil)
if err != nil {
log.Info("Failed to beacon sync", "err", err)
}
Expand Down
16 changes: 8 additions & 8 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
return d.beaconSync(mode, head, true)
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header, final *types.Header) error {
return d.beaconSync(mode, head, final, true)
}

// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
Expand All @@ -162,7 +162,7 @@ func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
// This is useful if a beacon client is feeding us large chunks of payloads to run,
// but is not setting the head after each.
func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
return d.beaconSync(mode, head, false)
return d.beaconSync(mode, head, nil, false)
}

// beaconSync is the post-merge version of the chain synchronization, where the
Expand All @@ -171,7 +171,7 @@ func (d *Downloader) BeaconExtend(mode SyncMode, head *types.Header) error {
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) error {
func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, final *types.Header, force bool) error {
// When the downloader starts a sync cycle, it needs to be aware of the sync
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
// mode into the backfiller directly.
Expand All @@ -181,7 +181,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e
d.skeleton.filler.(*beaconBackfiller).setMode(mode)

// Signal the skeleton sync to switch to a new head, however it wants
if err := d.skeleton.Sync(head, force); err != nil {
if err := d.skeleton.Sync(head, final, force); err != nil {
return err
}
return nil
Expand All @@ -207,7 +207,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
number := chainHead.Number.Uint64()

// Retrieve the skeleton bounds and ensure they are linked to the local chain
beaconHead, beaconTail, err := d.skeleton.Bounds()
beaconHead, beaconTail, _, err := d.skeleton.Bounds()
if err != nil {
// This is a programming error. The chain backfiller was called with an
// invalid beacon sync state. Ideally we would panic here, but erroring
Expand Down Expand Up @@ -272,7 +272,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
// until sync errors or is finished.
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
var head *types.Header
_, tail, err := d.skeleton.Bounds()
_, tail, _, err := d.skeleton.Bounds()
if err != nil {
return err
}
Expand All @@ -292,7 +292,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
head, _, err = d.skeleton.Bounds()
head, _, _, err = d.skeleton.Bounds()
if err != nil {
return err
}
Expand Down
59 changes: 37 additions & 22 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
}(time.Now())

// Look up the sync boundaries: the common ancestor and the target block
var latest, pivot *types.Header
var latest, pivot, final *types.Header
if !beaconMode {
// In legacy mode, use the master peer to retrieve the headers from
latest, pivot, err = d.fetchHead(p)
Expand All @@ -489,7 +489,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
}
} else {
// In beacon mode, use the skeleton chain to retrieve the headers from
latest, _, err = d.skeleton.Bounds()
latest, _, final, err = d.skeleton.Bounds()
if err != nil {
return err
}
Expand All @@ -499,7 +499,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
// Retrieve the pivot header from the skeleton chain segment but
// fallback to local chain if it's not found in skeleton space.
if pivot = d.skeleton.Header(number); pivot == nil {
_, oldest, _ := d.skeleton.Bounds() // error is already checked
_, oldest, _, _ := d.skeleton.Bounds() // error is already checked
if number < oldest.Number.Uint64() {
count := int(oldest.Number.Uint64() - number) // it's capped by fsMinFullBlocks
headers := d.readHeaderRange(oldest, count)
Expand Down Expand Up @@ -567,26 +567,41 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
d.committed = 0
}
if mode == SnapSync {
// Set the ancient data limitation.
// If we are running snap sync, all block data older than ancientLimit will be
// written to the ancient store. More recent data will be written to the active
// database and will wait for the freezer to migrate.
// Set the ancient data limitation. If we are running snap sync, all block
// data older than ancientLimit will be written to the ancient store. More
// recent data will be written to the active database and will wait for the
// freezer to migrate.
//
// If there is a checkpoint available, then calculate the ancientLimit through
// that. Otherwise calculate the ancient limit through the advertised height
// of the remote peer.
// If the network is post-merge, use either the last announced finalized
// block as the ancient limit, or if we haven't yet received one, the head-
// a max fork ancestry limit. One quirky case if we've already passed the
// finalized block, in which case the skeleton.Bounds will return nil and
// we'll revert to head - 90K. That's fine, we're finishing sync anyway.
//
// The reason for picking checkpoint first is that a malicious peer can give us
// a fake (very high) height, forcing the ancient limit to also be very high.
// The peer would start to feed us valid blocks until head, resulting in all of
// the blocks might be written into the ancient store. A following mini-reorg
// could cause issues.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
// For non-merged networks, if there is a checkpoint available, then calculate
// the ancientLimit through that. Otherwise calculate the ancient limit through
// the advertised height of the remote peer. This most is mostly a fallback for
// legacy networks, but should eventually be droppped. TODO(karalabe).
if beaconMode {
// Beacon sync, use the latest finalized block as the ancient limit
// or a reasonable height if no finalized block is yet announced.
if final != nil {
d.ancientLimit = final.Number.Uint64()
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
} else {
d.ancientLimit = 0
// Legacy sync, use any hardcoded checkpoints or the best announcement
// we have from the remote peer. TODO(karalabe): Drop this pathway.
if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
d.ancientLimit = d.checkpoint
} else if height > fullMaxForkAncestry+1 {
d.ancientLimit = height - fullMaxForkAncestry - 1
} else {
d.ancientLimit = 0
}
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.

Expand Down Expand Up @@ -1566,7 +1581,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {

// In post-merge, notify the engine API of encountered bad chains
if d.badBlock != nil {
head, _, err := d.skeleton.Bounds()
head, _, _, err := d.skeleton.Bounds()
if err != nil {
log.Error("Failed to retrieve beacon bounds for bad block reporting", "err", err)
} else {
Expand Down Expand Up @@ -1860,7 +1875,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
return
}
// Retrieve the current chain head and calculate the ETA
latest, _, err := d.skeleton.Bounds()
latest, _, _, err := d.skeleton.Bounds()
if err != nil {
// We're going to cheat for non-merged networks, but that's fine
latest = d.pivotHeader
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
if c.local > 0 {
tester.chain.InsertChain(chain.blocks[1 : c.local+1])
}
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header()); err != nil {
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
t.Fatalf("Failed to beacon sync chain %v %v", c.name, err)
}
select {
Expand Down
45 changes: 32 additions & 13 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,15 @@ type subchain struct {
// suspended skeleton sync without prior knowledge of all prior suspension points.
type skeletonProgress struct {
Subchains []*subchain // Disjoint subchains downloaded until now
Finalized *uint64 // Last known finalized block number
}

// headUpdate is a notification that the beacon sync should switch to a new target.
// The update might request whether to forcefully change the target, or only try to
// extend it and fail if it's not possible.
type headUpdate struct {
header *types.Header // Header to update the sync target to
final *types.Header // Finalized header to use as thresholds
force bool // Whether to force the update or only extend if possible
errc chan error // Channel to signal acceptance of the new head
}
Expand Down Expand Up @@ -321,12 +323,12 @@ func (s *skeleton) Terminate() error {
//
// This method does not block, rather it just waits until the syncer receives the
// fed header. What the syncer does with it is the syncer's problem.
func (s *skeleton) Sync(head *types.Header, force bool) error {
func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) error {
log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash(), "force", force)
errc := make(chan error)

select {
case s.headEvents <- &headUpdate{header: head, force: force, errc: errc}:
case s.headEvents <- &headUpdate{header: head, final: final, force: force, errc: errc}:
return <-errc
case <-s.terminated:
return errTerminated
Expand Down Expand Up @@ -437,7 +439,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// we don't seamlessly integrate reorgs to keep things simple. If the
// network starts doing many mini reorgs, it might be worthwhile handling
// a limited depth without an error.
if reorged := s.processNewHead(event.header, event.force); reorged {
if reorged := s.processNewHead(event.header, event.final, event.force); reorged {
// If a reorg is needed, and we're forcing the new head, signal
// the syncer to tear down and start over. Otherwise, drop the
// non-force reorg.
Expand Down Expand Up @@ -590,7 +592,17 @@ func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
// accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
// the syncer will tear itself down and restart with a fresh head. It is simpler
// to reconstruct the sync state than to mutate it and hope for the best.
func (s *skeleton) processNewHead(head *types.Header, force bool) bool {
func (s *skeleton) processNewHead(head *types.Header, final *types.Header, force bool) bool {
// If a new finalized block was announced, update the sync process independent
// of what happens with the sync head below
if final != nil {
if number := final.Number.Uint64(); s.progress.Finalized == nil || *s.progress.Finalized != number {
s.progress.Finalized = new(uint64)
*s.progress.Finalized = final.Number.Uint64()

s.saveSyncStatus(s.db)
}
}
// If the header cannot be inserted without interruption, return an error for
// the outer loop to tear down the skeleton sync and restart it
number := head.Number.Uint64()
Expand Down Expand Up @@ -1150,38 +1162,45 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
return nil
}

// Bounds retrieves the current head and tail tracked by the skeleton syncer.
// This method is used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
// Bounds retrieves the current head and tail tracked by the skeleton syncer
// and optionally the last known finalized header if any was announced and if
// it is still in the sync range. This method is used by the backfiller, whose
// life cycle is controlled by the skeleton syncer.
//
// Note, the method will not use the internal state of the skeleton, but will
// rather blindly pull stuff from the database. This is fine, because the back-
// filler will only run when the skeleton chain is fully downloaded and stable.
// There might be new heads appended, but those are atomic from the perspective
// of this method. Any head reorg will first tear down the backfiller and only
// then make the modification.
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, final *types.Header, err error) {
// Read the current sync progress from disk and figure out the current head.
// Although there's a lot of error handling here, these are mostly as sanity
// checks to avoid crashing if a programming error happens. These should not
// happen in live code.
status := rawdb.ReadSkeletonSyncStatus(s.db)
if len(status) == 0 {
return nil, nil, errors.New("beacon sync not yet started")
return nil, nil, nil, errors.New("beacon sync not yet started")
}
progress := new(skeletonProgress)
if err := json.Unmarshal(status, progress); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
if head == nil {
return nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
return nil, nil, nil, fmt.Errorf("head skeleton header %d is missing", progress.Subchains[0].Head)
}
tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)
if tail == nil {
return nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
return nil, nil, nil, fmt.Errorf("tail skeleton header %d is missing", progress.Subchains[0].Tail)
}
if progress.Finalized != nil && tail.Number.Uint64() <= *progress.Finalized && *progress.Finalized <= head.Number.Uint64() {
final = rawdb.ReadSkeletonHeader(s.db, *progress.Finalized)
if final == nil {
return nil, nil, nil, fmt.Errorf("finalized skeleton header %d is missing", *progress.Finalized)
}
}
return head, tail, nil
return head, tail, final, nil
}

// Header retrieves a specific header tracked by the skeleton syncer. This method
Expand Down
10 changes: 5 additions & 5 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func TestSkeletonSyncInit(t *testing.T) {

skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, true)
skeleton.Sync(tt.head, nil, true)

<-wait
skeleton.Terminate()
Expand Down Expand Up @@ -484,10 +484,10 @@ func TestSkeletonSyncExtend(t *testing.T) {

skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, true)
skeleton.Sync(tt.head, nil, true)

<-wait
if err := skeleton.Sync(tt.extend, false); err != tt.err {
if err := skeleton.Sync(tt.extend, nil, false); err != tt.err {
t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
}
skeleton.Terminate()
Expand Down Expand Up @@ -859,7 +859,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, filler)
skeleton.Sync(tt.head, true)
skeleton.Sync(tt.head, nil, true)

var progress skeletonProgress
// Wait a bit (bleah) for the initial sync loop to go to idle. This might
Expand Down Expand Up @@ -910,7 +910,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
// Apply the post-init events if there's any
if tt.newHead != nil {
skeleton.Sync(tt.newHead, true)
skeleton.Sync(tt.newHead, nil, true)
}
if tt.newPeer != nil {
if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil {
Expand Down

0 comments on commit 667a450

Please sign in to comment.