diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 05c8bc4c7c3d..01ac3e5225d2 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -458,7 +458,7 @@ func importHistory(ctx *cli.Context) error { network = networks[0] } - if err := utils.ImportHistory(chain, db, dir, network); err != nil { + if err := utils.ImportHistory(chain, dir, network); err != nil { return err } fmt.Printf("Import done in %v\n", time.Since(start)) diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index d91d6ca5a807..d34e15ebc083 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -246,8 +246,9 @@ func readList(filename string) ([]string, error) { } // ImportHistory imports Era1 files containing historical block information, -// starting from genesis. -func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, network string) error { +// starting from genesis. The assumption is held that the provided chain +// segment in Era1 file should all be canonical and verified. +func ImportHistory(chain *core.BlockChain, dir string, network string) error { if chain.CurrentSnapBlock().Number.BitLen() != 0 { return errors.New("history import only supported when starting from genesis") } @@ -308,11 +309,6 @@ func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, networ if err != nil { return fmt.Errorf("error reading receipts %d: %w", it.Number(), err) } - if status, err := chain.HeaderChain().InsertHeaderChain([]*types.Header{block.Header()}, start); err != nil { - return fmt.Errorf("error inserting header %d: %w", it.Number(), err) - } else if status != core.CanonStatTy { - return fmt.Errorf("error inserting header %d, not canon: %v", it.Number(), status) - } if _, err := chain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{receipts}, 2^64-1); err != nil { return fmt.Errorf("error inserting body %d: %w", it.Number(), err) } diff --git a/cmd/utils/history_test.go b/cmd/utils/history_test.go index 8654c454f953..d3c6bda1c506 100644 --- a/cmd/utils/history_test.go +++ b/cmd/utils/history_test.go @@ -171,7 +171,7 @@ func TestHistoryImportAndExport(t *testing.T) { if err != nil { t.Fatalf("unable to initialize chain: %v", err) } - if err := ImportHistory(imported, db2, dir, "mainnet"); err != nil { + if err := ImportHistory(imported, dir, "mainnet"); err != nil { t.Fatalf("failed to import chain: %v", err) } if have, want := imported.CurrentHeader(), chain.CurrentHeader(); have.Hash() != want.Hash() { diff --git a/core/blockchain.go b/core/blockchain.go index d80236c90229..d56996dadbee 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -24,6 +24,7 @@ import ( "math/big" "runtime" "slices" + "sort" "strings" "sync" "sync/atomic" @@ -157,7 +158,8 @@ type CacheConfig struct { // This defines the cutoff block for history expiry. // Blocks before this number may be unavailable in the chain database. - HistoryPruningCutoff uint64 + HistoryPruningCutoffNumber uint64 + HistoryPruningCutoffHash common.Hash } // triedbConfig derives the configures for trie database. @@ -262,7 +264,6 @@ type BlockChain struct { txLookupLock sync.RWMutex txLookupCache *lru.Cache[common.Hash, txLookup] - wg sync.WaitGroup quit chan struct{} // shutdown signal, closed in Stop. stopping atomic.Bool // false if chain is running, true when stopped procInterrupt atomic.Bool // interrupt signaler for block processing @@ -333,10 +334,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.processor = NewStateProcessor(chainConfig, bc.hc) genesisHeader := bc.GetHeaderByNumber(0) - bc.genesisBlock = types.NewBlockWithHeader(genesisHeader) - if bc.genesisBlock == nil { + if genesisHeader == nil { return nil, ErrNoGenesis } + bc.genesisBlock = types.NewBlockWithHeader(genesisHeader) bc.currentBlock.Store(nil) bc.currentSnapBlock.Store(nil) @@ -1110,7 +1111,6 @@ func (bc *BlockChain) stopWithoutSaving() { // the mutex should become available quickly. It cannot be taken again after Close has // returned. bc.chainmu.Close() - bc.wg.Wait() } // Stop stops the blockchain service. If any imports are currently in progress @@ -1197,79 +1197,64 @@ const ( SideStatTy ) -// InsertReceiptChain attempts to complete an already existing header chain with -// transaction and receipt data. +// InsertReceiptChain inserts a batch of blocks along with their receipts into +// the database. Unlike InsertChain, this function does not verify the state root +// in the blocks. It is used exclusively for snap sync. All the inserted blocks +// will be regarded as canonical, chain reorg is not supported. +// +// The optional ancientLimit can also be specified and chain segment before that +// will be directly stored in the ancient, getting rid of the chain migration. func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { - // We don't require the chainMu here since we want to maximize the - // concurrency of header insertion and receipt insertion. - bc.wg.Add(1) - defer bc.wg.Done() - - var ( - ancientBlocks, liveBlocks types.Blocks - ancientReceipts, liveReceipts []types.Receipts - ) - // Do a sanity check that the provided chain is actually ordered and linked - for i, block := range blockChain { - if i != 0 { - prev := blockChain[i-1] - if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() { - log.Error("Non contiguous receipt insert", - "number", block.Number(), "hash", block.Hash(), "parent", block.ParentHash(), - "prevnumber", prev.Number(), "prevhash", prev.Hash()) - return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", - i-1, prev.NumberU64(), prev.Hash().Bytes()[:4], - i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) - } - } - if block.NumberU64() <= ancientLimit { - ancientBlocks, ancientReceipts = append(ancientBlocks, block), append(ancientReceipts, receiptChain[i]) - } else { - liveBlocks, liveReceipts = append(liveBlocks, block), append(liveReceipts, receiptChain[i]) - } - - // Here we also validate that blob transactions in the block do not contain a sidecar. - // While the sidecar does not affect the block hash / tx hash, sending blobs within a block is not allowed. + // Verify the supplied headers before insertion without lock + var headers []*types.Header + for _, block := range blockChain { + headers = append(headers, block.Header()) + + // Here we also validate that blob transactions in the block do not + // contain a sidecar. While the sidecar does not affect the block hash + // or tx hash, sending blobs within a block is not allowed. for txIndex, tx := range block.Transactions() { if tx.Type() == types.BlobTxType && tx.BlobTxSidecar() != nil { return 0, fmt.Errorf("block #%d contains unexpected blob sidecar in tx at index %d", block.NumberU64(), txIndex) } } } + if n, err := bc.hc.ValidateHeaderChain(headers); err != nil { + return n, err + } + // Hold the mutation lock + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() var ( stats = struct{ processed, ignored int32 }{} start = time.Now() size = int64(0) ) - - // updateHead updates the head snap sync block if the inserted blocks are better - // and returns an indicator whether the inserted blocks are canonical. - updateHead := func(head *types.Block) bool { - if !bc.chainmu.TryLock() { - return false - } - defer bc.chainmu.Unlock() - - // Rewind may have occurred, skip in that case. - if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 { - rawdb.WriteHeadFastBlockHash(bc.db, head.Hash()) - bc.currentSnapBlock.Store(head.Header()) - headFastBlockGauge.Update(int64(head.NumberU64())) - return true + // updateHead updates the head header and head snap block flags. + updateHead := func(header *types.Header) error { + batch := bc.db.NewBatch() + hash := header.Hash() + rawdb.WriteHeadHeaderHash(batch, hash) + rawdb.WriteHeadFastBlockHash(batch, hash) + if err := batch.Write(); err != nil { + return err } - return false + bc.hc.currentHeader.Store(header) + bc.currentSnapBlock.Store(header) + headHeaderGauge.Update(header.Number.Int64()) + headFastBlockGauge.Update(header.Number.Int64()) + return nil } // writeAncient writes blockchain and corresponding receipt chain into ancient store. // // this function only accepts canonical chain data. All side chain will be reverted // eventually. writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { - first := blockChain[0] - last := blockChain[len(blockChain)-1] - - // Ensure genesis is in ancients. - if first.NumberU64() == 1 { + // Ensure genesis is in the ancient store + if blockChain[0].NumberU64() == 1 { if frozen, _ := bc.db.Ancients(); frozen == 0 { writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}) if err != nil { @@ -1280,12 +1265,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ log.Info("Wrote genesis to ancients") } } - // Before writing the blocks to the ancients, we need to ensure that - // they correspond to the what the headerchain 'expects'. - // We only check the last block/header, since it's a contiguous chain. - if !bc.HasHeader(last.Hash(), last.NumberU64()) { - return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4]) - } // Write all chain data to ancients. writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain) if err != nil { @@ -1298,44 +1277,28 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if err := bc.db.Sync(); err != nil { return 0, err } - // Update the current snap block because all block data is now present in DB. - previousSnapBlock := bc.CurrentSnapBlock().Number.Uint64() - if !updateHead(blockChain[len(blockChain)-1]) { - // We end up here if the header chain has reorg'ed, and the blocks/receipts - // don't match the canonical chain. - if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil { - log.Error("Can't truncate ancient store after failed insert", "err", err) - } - return 0, errSideChainReceipts - } - - // Delete block data from the main database. - var ( - batch = bc.db.NewBatch() - canonHashes = make(map[common.Hash]struct{}, len(blockChain)) - ) + // Write hash to number mappings + batch := bc.db.NewBatch() for _, block := range blockChain { - canonHashes[block.Hash()] = struct{}{} - if block.NumberU64() == 0 { - continue - } - rawdb.DeleteCanonicalHash(batch, block.NumberU64()) - rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) - } - // Delete side chain hash-to-number mappings. - for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { - if _, canon := canonHashes[nh.Hash]; !canon { - rawdb.DeleteHeader(batch, nh.Hash, nh.Number) - } + rawdb.WriteHeaderNumber(batch, block.Hash(), block.NumberU64()) } if err := batch.Write(); err != nil { return 0, err } + // Update the current snap block because all block data is now present in DB. + if err := updateHead(blockChain[len(blockChain)-1].Header()); err != nil { + return 0, err + } stats.processed += int32(len(blockChain)) return 0, nil } - // writeLive writes blockchain and corresponding receipt chain into active store. + // writeLive writes the blockchain and corresponding receipt chain to the active store. + // + // Notably, in different snap sync cycles, the supplied chain may partially reorganize + // existing local chain segments (reorg around the chain tip). The reorganized part + // will be included in the provided chain segment, and stale canonical markers will be + // silently rewritten. Therefore, no explicit reorg logic is needed. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { var ( skipPresenceCheck = false @@ -1346,10 +1309,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if bc.insertStopped() { return 0, errInsertionInterrupted } - // Short circuit if the owner header is unknown - if !bc.HasHeader(block.Hash(), block.NumberU64()) { - return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4]) - } if !skipPresenceCheck { // Ignore if the entire data is already known if bc.HasBlock(block.Hash(), block.NumberU64()) { @@ -1363,7 +1322,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } } // Write all the data out into the database - rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) + rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) + rawdb.WriteBlock(batch, block) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) // Write everything belongs to the blocks into the database. So that @@ -1387,21 +1347,27 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, err } } - updateHead(blockChain[len(blockChain)-1]) + if err := updateHead(blockChain[len(blockChain)-1].Header()); err != nil { + return 0, err + } return 0, nil } - // Write downloaded chain data and corresponding receipt chain data - if len(ancientBlocks) > 0 { - if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { + // Split the supplied blocks into two groups, according to the + // given ancient limit. + index := sort.Search(len(blockChain), func(i int) bool { + return blockChain[i].NumberU64() >= ancientLimit + }) + if index > 0 { + if n, err := writeAncient(blockChain[:index], receiptChain[:index]); err != nil { if err == errInsertionInterrupted { return 0, nil } return n, err } } - if len(liveBlocks) > 0 { - if n, err := writeLive(liveBlocks, liveReceipts); err != nil { + if index != len(blockChain) { + if n, err := writeLive(blockChain[index:], receiptChain[index:]); err != nil { if err == errInsertionInterrupted { return 0, nil } @@ -1420,7 +1386,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ context = append(context, []interface{}{"ignored", stats.ignored}...) } log.Debug("Imported new block receipts", context...) - return 0, nil } @@ -2505,15 +2470,83 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) { if i, err := bc.hc.ValidateHeaderChain(chain); err != nil { return i, err } - if !bc.chainmu.TryLock() { return 0, errChainStopped } defer bc.chainmu.Unlock() + _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } +// InsertHeadersBeforeCutoff inserts the given headers into the ancient store +// as they are claimed older than the configured chain cutoff point. All the +// inserted headers are regarded as canonical and chain reorg is not supported. +func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, error) { + if len(headers) == 0 { + return 0, nil + } + // TODO(rjl493456442): Headers before the configured cutoff have already + // been verified by the hash of cutoff header. Theoretically, header validation + // could be skipped here. + if n, err := bc.hc.ValidateHeaderChain(headers); err != nil { + return n, err + } + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } + defer bc.chainmu.Unlock() + + // Initialize the ancient store with genesis block if it's empty. + var ( + frozen, _ = bc.db.Ancients() + first = headers[0].Number.Uint64() + ) + if first == 1 && frozen == 0 { + _, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}) + if err != nil { + log.Error("Error writing genesis to ancients", "err", err) + return 0, err + } + log.Info("Wrote genesis to ancient store") + } else if frozen != first { + return 0, fmt.Errorf("headers are gapped with the ancient store, first: %d, ancient: %d", first, frozen) + } + + // Write headers to the ancient store, with block bodies and receipts set to nil + // to ensure consistency across tables in the freezer. + _, err := rawdb.WriteAncientHeaderChain(bc.db, headers) + if err != nil { + return 0, err + } + if err := bc.db.Sync(); err != nil { + return 0, err + } + // Write hash to number mappings + batch := bc.db.NewBatch() + for _, header := range headers { + rawdb.WriteHeaderNumber(batch, header.Hash(), header.Number.Uint64()) + } + // Write head header and head snap block flags + last := headers[len(headers)-1] + rawdb.WriteHeadHeaderHash(batch, last.Hash()) + rawdb.WriteHeadFastBlockHash(batch, last.Hash()) + if err := batch.Write(); err != nil { + return 0, err + } + // Truncate the useless chain segment (zero bodies and receipts) in the + // ancient store. + if _, err := bc.db.TruncateTail(last.Number.Uint64() + 1); err != nil { + return 0, err + } + // Last step update all in-memory markers + bc.hc.currentHeader.Store(last) + bc.currentSnapBlock.Store(last) + headHeaderGauge.Update(last.Number.Int64()) + headFastBlockGauge.Update(last.Number.Int64()) + return 0, nil +} + // SetBlockValidatorAndProcessorForTesting sets the current validator and processor. // This method can be used to force an invalid blockchain to be verified for tests. // This method is unsafe and should only be used before block import starts. @@ -2533,9 +2566,3 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { func (bc *BlockChain) GetTrieFlushInterval() time.Duration { return time.Duration(bc.flushInterval.Load()) } - -// HistoryPruningCutoff returns the configured history pruning point. -// Blocks before this might not be available in the database. -func (bc *BlockChain) HistoryPruningCutoff() uint64 { - return bc.cacheConfig.HistoryPruningCutoff -} diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 025b912cebac..2caabe07106e 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -402,6 +402,12 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) { return bc.txIndexer.txIndexProgress() } +// HistoryPruningCutoff returns the configured history pruning point. +// Blocks before this might not be available in the database. +func (bc *BlockChain) HistoryPruningCutoff() (uint64, common.Hash) { + return bc.cacheConfig.HistoryPruningCutoffNumber, bc.cacheConfig.HistoryPruningCutoffHash +} + // TrieDB retrieves the low level trie database used for data storage. func (bc *BlockChain) TrieDB() *triedb.Database { return bc.triedb diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 04dfa87b8b88..8f5a64e20668 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -733,13 +733,6 @@ func testFastVsFullChains(t *testing.T, scheme string) { fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) defer fast.Stop() - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := fast.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } @@ -753,9 +746,6 @@ func testFastVsFullChains(t *testing.T, scheme string) { ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) defer ancient.Stop() - if n, err := ancient.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } @@ -880,13 +870,6 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) { fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) defer fast.Stop() - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := fast.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } @@ -900,9 +883,6 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) { ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) defer ancient.Stop() - if n, err := ancient.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } @@ -916,6 +896,11 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) { // Import the chain as a light node and ensure all pointers are updated lightDb := makeDb() defer lightDb.Close() + + headers := make([]*types.Header, len(blocks)) + for i, block := range blocks { + headers[i] = block.Header() + } light, _ := NewBlockChain(lightDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) if n, err := light.InsertHeaderChain(headers); err != nil { t.Fatalf("failed to insert header %d: %v", n, err) @@ -1710,13 +1695,6 @@ func testBlockchainRecovery(t *testing.T, scheme string) { defer ancientDb.Close() ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := ancient.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil { t.Fatalf("failed to insert receipt %d: %v", n, err) } @@ -1741,82 +1719,6 @@ func testBlockchainRecovery(t *testing.T, scheme string) { } } -// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain. -func TestInsertReceiptChainRollback(t *testing.T) { - testInsertReceiptChainRollback(t, rawdb.HashScheme) - testInsertReceiptChainRollback(t, rawdb.PathScheme) -} - -func testInsertReceiptChainRollback(t *testing.T, scheme string) { - // Generate forked chain. The returned BlockChain object is used to process the side chain blocks. - tmpChain, sideblocks, canonblocks, gspec, err := getLongAndShortChains(scheme) - if err != nil { - t.Fatal(err) - } - defer tmpChain.Stop() - // Get the side chain receipts. - if _, err := tmpChain.InsertChain(sideblocks); err != nil { - t.Fatal("processing side chain failed:", err) - } - t.Log("sidechain head:", tmpChain.CurrentBlock().Number, tmpChain.CurrentBlock().Hash()) - sidechainReceipts := make([]types.Receipts, len(sideblocks)) - for i, block := range sideblocks { - sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) - } - // Get the canon chain receipts. - if _, err := tmpChain.InsertChain(canonblocks); err != nil { - t.Fatal("processing canon chain failed:", err) - } - t.Log("canon head:", tmpChain.CurrentBlock().Number, tmpChain.CurrentBlock().Hash()) - canonReceipts := make([]types.Receipts, len(canonblocks)) - for i, block := range canonblocks { - canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash()) - } - - // Set up a BlockChain that uses the ancient store. - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } - defer ancientDb.Close() - - ancientChain, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil) - defer ancientChain.Stop() - - // Import the canonical header chain. - canonHeaders := make([]*types.Header, len(canonblocks)) - for i, block := range canonblocks { - canonHeaders[i] = block.Header() - } - if _, err = ancientChain.InsertHeaderChain(canonHeaders); err != nil { - t.Fatal("can't import canon headers:", err) - } - - // Try to insert blocks/receipts of the side chain. - _, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks))) - if err == nil { - t.Fatal("expected error from InsertReceiptChain.") - } - if ancientChain.CurrentSnapBlock().Number.Uint64() != 0 { - t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentSnapBlock().Number) - } - if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 { - t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen) - } - - // Insert blocks/receipts of the canonical chain. - _, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks))) - if err != nil { - t.Fatalf("can't import canon chain receipts: %v", err) - } - if ancientChain.CurrentSnapBlock().Number.Uint64() != canonblocks[len(canonblocks)-1].NumberU64() { - t.Fatalf("failed to insert ancient recept chain after rollback") - } - if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 { - t.Fatalf("wrong ancients count %d", frozen) - } -} - // Tests that importing a very large side fork, which is larger than the canon chain, // but where the difficulty per block is kept low: this means that it will not // overtake the 'canon' chain until after it's passed canon by about 200 blocks. @@ -2088,14 +1990,6 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) { } } else if typ == "receipts" { inserter = func(blocks []*types.Block, receipts []types.Receipts) error { - headers := make([]*types.Header, 0, len(blocks)) - for _, block := range blocks { - headers = append(headers, block.Header()) - } - _, err := chain.InsertHeaderChain(headers) - if err != nil { - return err - } _, err = chain.InsertReceiptChain(blocks, receipts, 0) return err } @@ -2262,14 +2156,6 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i } } else if typ == "receipts" { inserter = func(blocks []*types.Block, receipts []types.Receipts) error { - headers := make([]*types.Header, 0, len(blocks)) - for _, block := range blocks { - headers = append(headers, block.Header()) - } - i, err := chain.InsertHeaderChain(headers) - if err != nil { - return fmt.Errorf("index %d: %w", i, err) - } _, err = chain.InsertReceiptChain(blocks, receipts, 0) return err } @@ -4265,3 +4151,220 @@ func TestEIP7702(t *testing.T) { t.Fatalf("addr2 storage wrong: expected %d, got %d", fortyTwo, actual) } } + +// Tests the scenario that the synchronization target in snap sync has been changed +// with a chain reorg at the tip. In this case the reorg'd segment should be unmarked +// with canonical flags. +func TestChainReorgSnapSync(t *testing.T) { + testChainReorgSnapSync(t, 0) + testChainReorgSnapSync(t, 32) + testChainReorgSnapSync(t, gomath.MaxUint64) +} + +func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) { + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + + // Configure and generate a sample block chain + var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + gspec = &Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{address: {Balance: funds}}, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer = types.LatestSigner(gspec.Config) + engine = beacon.New(ethash.NewFaker()) + ) + genDb, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 32, func(i int, block *BlockGen) { + block.SetCoinbase(common.Address{0x00}) + + // If the block number is multiple of 3, send a few bonus transactions to the miner + if i%3 == 2 { + for j := 0; j < i%4+1; j++ { + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + } + }) + chainA, receiptsA := GenerateChain(gspec.Config, blocks[len(blocks)-1], engine, genDb, 16, func(i int, gen *BlockGen) { + gen.SetCoinbase(common.Address{0: byte(0xa), 19: byte(i)}) + }) + chainB, receiptsB := GenerateChain(gspec.Config, blocks[len(blocks)-1], engine, genDb, 20, func(i int, gen *BlockGen) { + gen.SetCoinbase(common.Address{0: byte(0xb), 19: byte(i)}) + }) + + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) + defer db.Close() + + chain, _ := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil) + defer chain.Stop() + + if n, err := chain.InsertReceiptChain(blocks, receipts, ancientLimit); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + if n, err := chain.InsertReceiptChain(chainA, receiptsA, ancientLimit); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + // If the common ancestor is below the ancient limit, rewind the chain head. + // It's aligned with the behavior in the snap sync + ancestor := blocks[len(blocks)-1].NumberU64() + if ancestor < ancientLimit { + rawdb.WriteLastPivotNumber(db, ancestor) + chain.SetHead(ancestor) + } + if n, err := chain.InsertReceiptChain(chainB, receiptsB, ancientLimit); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + head := chain.CurrentSnapBlock() + if head.Hash() != chainB[len(chainB)-1].Hash() { + t.Errorf("head snap block #%d: header mismatch: want: %v, got: %v", head.Number, chainB[len(chainB)-1].Hash(), head.Hash()) + } + + // Iterate over all chain data components, and cross reference + for i := 0; i < len(blocks); i++ { + num, hash := blocks[i].NumberU64(), blocks[i].Hash() + header := chain.GetHeaderByNumber(num) + if header.Hash() != hash { + t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash()) + } + } + for i := 0; i < len(chainA); i++ { + num, hash := chainA[i].NumberU64(), chainA[i].Hash() + header := chain.GetHeaderByNumber(num) + if header == nil { + continue + } + if header.Hash() == hash { + t.Errorf("block #%d: unexpected canonical header: %v", num, hash) + } + } + for i := 0; i < len(chainB); i++ { + num, hash := chainB[i].NumberU64(), chainB[i].Hash() + header := chain.GetHeaderByNumber(num) + if header.Hash() != hash { + t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash()) + } + } +} + +// Tests the scenario that all the inserted chain segment are with the configured +// chain cutoff point. In this case the chain segment before the cutoff should +// be persisted without the receipts and bodies; chain after should be persisted +// normally. +func TestInsertChainWithCutoff(t *testing.T) { + testInsertChainWithCutoff(t, 32, 32) // cutoff = 32, ancientLimit = 32 + testInsertChainWithCutoff(t, 32, 64) // cutoff = 32, ancientLimit = 64 (entire chain in ancient) + testInsertChainWithCutoff(t, 32, 65) // cutoff = 32, ancientLimit = 65 (64 blocks in ancient, 1 block in live) +} + +func testInsertChainWithCutoff(t *testing.T, cutoff uint64, ancientLimit uint64) { + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + + // Configure and generate a sample block chain + var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + gspec = &Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{address: {Balance: funds}}, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer = types.LatestSigner(gspec.Config) + engine = beacon.New(ethash.NewFaker()) + ) + _, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(2*cutoff), func(i int, block *BlockGen) { + block.SetCoinbase(common.Address{0x00}) + + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key) + if err != nil { + panic(err) + } + block.AddTx(tx) + }) + db, _ := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false) + defer db.Close() + + cutoffBlock := blocks[cutoff-1] + config := DefaultCacheConfigWithScheme(rawdb.PathScheme) + config.HistoryPruningCutoffNumber = cutoffBlock.NumberU64() + config.HistoryPruningCutoffHash = cutoffBlock.Hash() + + chain, _ := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil) + defer chain.Stop() + + var ( + headersBefore []*types.Header + blocksAfter []*types.Block + receiptsAfter []types.Receipts + ) + for i, b := range blocks { + if b.NumberU64() < cutoffBlock.NumberU64() { + headersBefore = append(headersBefore, b.Header()) + } else { + blocksAfter = append(blocksAfter, b) + receiptsAfter = append(receiptsAfter, receipts[i]) + } + } + if n, err := chain.InsertHeadersBeforeCutoff(headersBefore); err != nil { + t.Fatalf("failed to insert headers before cutoff %d: %v", n, err) + } + if n, err := chain.InsertReceiptChain(blocksAfter, receiptsAfter, ancientLimit); err != nil { + t.Fatalf("failed to insert receipt %d: %v", n, err) + } + headSnap := chain.CurrentSnapBlock() + if headSnap.Hash() != blocks[len(blocks)-1].Hash() { + t.Errorf("head snap block #%d: header mismatch: want: %v, got: %v", headSnap.Number, blocks[len(blocks)-1].Hash(), headSnap.Hash()) + } + headHeader := chain.CurrentHeader() + if headHeader.Hash() != blocks[len(blocks)-1].Hash() { + t.Errorf("head header #%d: header mismatch: want: %v, got: %v", headHeader.Number, blocks[len(blocks)-1].Hash(), headHeader.Hash()) + } + headBlock := chain.CurrentBlock() + if headBlock.Hash() != gspec.ToBlock().Hash() { + t.Errorf("head block #%d: header mismatch: want: %v, got: %v", headBlock.Number, gspec.ToBlock().Hash(), headBlock.Hash()) + } + + // Iterate over all chain data components, and cross reference + for i := 0; i < len(blocks); i++ { + num, hash := blocks[i].NumberU64(), blocks[i].Hash() + + // Canonical headers should be visible regardless of cutoff + header := chain.GetHeaderByNumber(num) + if header.Hash() != hash { + t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash()) + } + tail, err := db.Tail() + if err != nil { + t.Fatalf("Failed to get chain tail, %v", err) + } + if tail != cutoffBlock.NumberU64() { + t.Fatalf("Unexpected chain tail, want: %d, got: %d", cutoffBlock.NumberU64(), tail) + } + // Block bodies and receipts before the cutoff should be non-existent + if num < cutoffBlock.NumberU64() { + body := chain.GetBody(hash) + if body != nil { + t.Fatalf("Unexpected block body: %d, cutoff: %d", num, cutoffBlock.NumberU64()) + } + receipts := chain.GetReceiptsByHash(hash) + if receipts != nil { + t.Fatalf("Unexpected block receipts: %d, cutoff: %d", num, cutoffBlock.NumberU64()) + } + } else { + body := chain.GetBody(hash) + if body == nil || len(body.Transactions) != 1 { + t.Fatalf("Missed block body: %d, cutoff: %d", num, cutoffBlock.NumberU64()) + } + receipts := chain.GetReceiptsByHash(hash) + if receipts == nil || len(receipts) != 1 { + t.Fatalf("Missed block receipts: %d, cutoff: %d", num, cutoffBlock.NumberU64()) + } + } + } +} diff --git a/core/error.go b/core/error.go index ce3bbb788886..de95e6463620 100644 --- a/core/error.go +++ b/core/error.go @@ -28,8 +28,6 @@ var ( // ErrNoGenesis is returned when there is no Genesis Block. ErrNoGenesis = errors.New("genesis not found in chain") - - errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data") ) // List of evm-call-message pre-checking errors. All state transition messages will diff --git a/core/headerchain.go b/core/headerchain.go index cb707a152f53..f7acc49bef4f 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -237,7 +237,8 @@ func (hc *HeaderChain) WriteHeaders(headers []*types.Header) (int, error) { } // writeHeadersAndSetHead writes a batch of block headers and applies the last -// header as the chain head if the fork choicer says it's ok to update the chain. +// header as the chain head. +// // Note: This method is not concurrent-safe with inserting blocks simultaneously // into the chain, as side effects caused by reorganisations cannot be emulated // without the real blocks. Hence, writing headers directly should only be done @@ -272,12 +273,14 @@ func (hc *HeaderChain) writeHeadersAndSetHead(headers []*types.Header) (*headerW return result, nil } +// ValidateHeaderChain verifies that the supplied header chain is contiguous +// and conforms to consensus rules. func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header) (int, error) { // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 { - hash := chain[i].Hash() - parentHash := chain[i-1].Hash() + hash, parentHash := chain[i].Hash(), chain[i-1].Hash() + // Chain broke ancestry, log a message (programming error) and skip insertion log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", hash, "parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", parentHash) @@ -302,7 +305,6 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header) (int, error) { return i, err } } - return 0, nil } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 020d35619e78..2f62d86e4b77 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -737,6 +737,30 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type return nil } +// WriteAncientHeaderChain writes the supplied headers along with nil block +// bodies and receipts into the ancient store. It's supposed to be used for +// storing chain segment before the chain cutoff. +func WriteAncientHeaderChain(db ethdb.AncientWriter, headers []*types.Header) (int64, error) { + return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + for _, header := range headers { + num := header.Number.Uint64() + if err := op.AppendRaw(ChainFreezerHashTable, num, header.Hash().Bytes()); err != nil { + return fmt.Errorf("can't add block %d hash: %v", num, err) + } + if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil { + return fmt.Errorf("can't append block header %d: %v", num, err) + } + if err := op.AppendRaw(ChainFreezerBodiesTable, num, nil); err != nil { + return fmt.Errorf("can't append block body %d: %v", num, err) + } + if err := op.AppendRaw(ChainFreezerReceiptTable, num, nil); err != nil { + return fmt.Errorf("can't append block %d receipts: %v", num, err) + } + } + return nil + }) +} + // DeleteBlock removes all block data associated with a hash. func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { DeleteReceipts(db, hash, number) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index efd16d5fa7d1..247e27758290 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -464,6 +464,48 @@ func TestAncientStorage(t *testing.T) { } } +func TestWriteAncientHeaderChain(t *testing.T) { + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), t.TempDir(), "", false) + if err != nil { + t.Fatalf("failed to create database with ancient backend") + } + defer db.Close() + + // Create a test block + var headers []*types.Header + headers = append(headers, &types.Header{ + Number: big.NewInt(0), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + }) + headers = append(headers, &types.Header{ + Number: big.NewInt(1), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + }) + // Write and verify the header in the database + WriteAncientHeaderChain(db, headers) + + for _, header := range headers { + if blob := ReadHeaderRLP(db, header.Hash(), header.Number.Uint64()); len(blob) == 0 { + t.Fatalf("no header returned") + } + if h := ReadCanonicalHash(db, header.Number.Uint64()); h != header.Hash() { + t.Fatalf("no canonical hash returned") + } + if blob := ReadBodyRLP(db, header.Hash(), header.Number.Uint64()); len(blob) != 0 { + t.Fatalf("unexpected body returned") + } + if blob := ReadReceiptsRLP(db, header.Hash(), header.Number.Uint64()); len(blob) != 0 { + t.Fatalf("unexpected body returned") + } + } +} + func TestCanonicalHashIteration(t *testing.T) { var cases = []struct { from, to uint64 diff --git a/core/txindexer.go b/core/txindexer.go index d0fce302f3eb..31f069995bc3 100644 --- a/core/txindexer.go +++ b/core/txindexer.go @@ -58,9 +58,10 @@ type txIndexer struct { // newTxIndexer initializes the transaction indexer. func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer { + cutoff, _ := chain.HistoryPruningCutoff() indexer := &txIndexer{ limit: limit, - cutoff: chain.HistoryPruningCutoff(), + cutoff: cutoff, db: chain.db, progress: make(chan chan TxIndexProgress), term: make(chan chan struct{}), diff --git a/eth/backend.go b/eth/backend.go index 909d153a2b59..d17d0106d323 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -154,13 +154,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } // Validate history pruning configuration. - var historyPruningCutoff uint64 + var ( + cutoffNumber uint64 + cutoffHash common.Hash + ) if config.HistoryMode == ethconfig.PostMergeHistory { prunecfg, ok := ethconfig.HistoryPrunePoints[genesisHash] if !ok { return nil, fmt.Errorf("no history pruning point is defined for genesis %x", genesisHash) } - historyPruningCutoff = prunecfg.BlockNumber + cutoffNumber = prunecfg.BlockNumber + cutoffHash = prunecfg.BlockHash + log.Info("Chain cutoff configured", "number", cutoffNumber, "hash", cutoffHash) } // Set networkID to chainID by default. @@ -204,16 +209,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EnablePreimageRecording: config.EnablePreimageRecording, } cacheConfig = &core.CacheConfig{ - TrieCleanLimit: config.TrieCleanCache, - TrieCleanNoPrefetch: config.NoPrefetch, - TrieDirtyLimit: config.TrieDirtyCache, - TrieDirtyDisabled: config.NoPruning, - TrieTimeLimit: config.TrieTimeout, - SnapshotLimit: config.SnapshotCache, - Preimages: config.Preimages, - StateHistory: config.StateHistory, - StateScheme: scheme, - HistoryPruningCutoff: historyPruningCutoff, + TrieCleanLimit: config.TrieCleanCache, + TrieCleanNoPrefetch: config.NoPrefetch, + TrieDirtyLimit: config.TrieDirtyCache, + TrieDirtyDisabled: config.NoPruning, + TrieTimeLimit: config.TrieTimeout, + SnapshotLimit: config.SnapshotCache, + Preimages: config.Preimages, + StateHistory: config.StateHistory, + StateScheme: scheme, + HistoryPruningCutoffNumber: cutoffNumber, + HistoryPruningCutoffHash: cutoffHash, } ) if config.VMTrace != "" { @@ -241,7 +247,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints} chainView := eth.newChainView(eth.blockchain.CurrentBlock()) - historyCutoff := eth.blockchain.HistoryPruningCutoff() + historyCutoff, _ := eth.blockchain.HistoryPruningCutoff() var finalBlock uint64 if fb := eth.blockchain.CurrentFinalBlock(); fb != nil { finalBlock = fb.Number.Uint64() @@ -438,7 +444,7 @@ func (s *Ethereum) updateFilterMapsHeads() { if head == nil || newHead.Hash() != head.Hash() { head = newHead chainView := s.newChainView(head) - historyCutoff := s.blockchain.HistoryPruningCutoff() + historyCutoff, _ := s.blockchain.HistoryPruningCutoff() var finalBlock uint64 if fb := s.blockchain.CurrentFinalBlock(); fb != nil { finalBlock = fb.Number.Uint64() diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index c142ea7435e3..33ad0f897142 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -273,8 +273,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) { // fetchHeaders feeds skeleton headers to the downloader queue for scheduling // until sync errors or is finished. func (d *Downloader) fetchHeaders(from uint64) error { - var head *types.Header - _, tail, _, err := d.skeleton.Bounds() + head, tail, _, err := d.skeleton.Bounds() if err != nil { return err } @@ -294,6 +293,27 @@ func (d *Downloader) fetchHeaders(from uint64) error { fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck) defer fsHeaderContCheckTimer.Stop() + // Verify the header at configured chain cutoff, ensuring it's matched with + // the configured hash. Skip the check if the configured cutoff is even higher + // than the sync target, which is definitely not a common case. + if d.chainCutoffNumber != 0 && d.chainCutoffNumber >= from && d.chainCutoffNumber <= head.Number.Uint64() { + h := d.skeleton.Header(d.chainCutoffNumber) + if h == nil { + if d.chainCutoffNumber < tail.Number.Uint64() { + dist := tail.Number.Uint64() - d.chainCutoffNumber + if len(localHeaders) >= int(dist) { + h = localHeaders[dist-1] + } + } + } + if h == nil { + return fmt.Errorf("header at chain cutoff is not available, cutoff: %d", d.chainCutoffNumber) + } + if h.Hash() != d.chainCutoffHash { + return fmt.Errorf("header at chain cutoff mismatched, want: %v, got: %v", d.chainCutoffHash, h.Hash()) + } + } + for { // Some beacon headers might have appeared since the last cycle, make // sure we're always syncing to all available ones diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 3f3f9b7f0ca8..4d13ae304c2c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -20,6 +20,7 @@ package downloader import ( "errors" "fmt" + "sort" "sync" "sync/atomic" "time" @@ -120,6 +121,12 @@ type Downloader struct { committed atomic.Bool ancientLimit uint64 // The maximum block number which can be regarded as ancient data. + // The cutoff block number and hash before which chain segments (bodies + // and receipts) are skipped during synchronization. 0 means the entire + // chain segment is aimed for synchronization. + chainCutoffNumber uint64 + chainCutoffHash common.Hash + // Channels headerProcCh chan *headerTask // Channel to feed the header processor new tasks @@ -163,9 +170,6 @@ type BlockChain interface { // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header) (int, error) - // SetHead rewinds the local chain to a new head. SetHead(uint64) error @@ -187,10 +191,17 @@ type BlockChain interface { // SnapSyncCommitHead directly commits the head block to a certain entity. SnapSyncCommitHead(common.Hash) error + // InsertHeadersBeforeCutoff inserts a batch of headers before the configured + // chain cutoff into the ancient store. + InsertHeadersBeforeCutoff([]*types.Header) (int, error) + // InsertChain inserts a batch of blocks into the local chain. InsertChain(types.Blocks) (int, error) - // InsertReceiptChain inserts a batch of receipts into the local chain. + // InsertReceiptChain inserts a batch of blocks along with their receipts + // into the local chain. Blocks older than the specified `ancientLimit` + // are stored directly in the ancient store, while newer blocks are stored + // in the live key-value store. InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error) // Snapshots returns the blockchain snapshot tree to paused it during sync. @@ -199,22 +210,29 @@ type BlockChain interface { // TrieDB retrieves the low level trie database used for interacting // with trie nodes. TrieDB() *triedb.Database + + // HistoryPruningCutoff returns the configured history pruning point. + // Block bodies along with the receipts will be skipped for synchronization. + HistoryPruningCutoff() (uint64, common.Hash) } // New creates a new downloader to fetch hashes and blocks from remote peers. func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader { + cutoffNumber, cutoffHash := chain.HistoryPruningCutoff() dl := &Downloader{ - stateDB: stateDb, - mux: mux, - queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), - peers: newPeerSet(), - blockchain: chain, - dropPeer: dropPeer, - headerProcCh: make(chan *headerTask, 1), - quitCh: make(chan struct{}), - SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), - stateSyncStart: make(chan *stateSync), - syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), + stateDB: stateDb, + mux: mux, + queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), + peers: newPeerSet(), + blockchain: chain, + chainCutoffNumber: cutoffNumber, + chainCutoffHash: cutoffHash, + dropPeer: dropPeer, + headerProcCh: make(chan *headerTask, 1), + quitCh: make(chan struct{}), + SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), + stateSyncStart: make(chan *stateSync), + syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), } // Create the post-merge skeleton syncer and start the process dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) @@ -503,6 +521,12 @@ func (d *Downloader) syncToHead() (err error) { } else { d.ancientLimit = 0 } + // Extend the ancient chain segment range if the ancient limit is even + // below the pre-configured chain cutoff. + if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.ancientLimit { + d.ancientLimit = d.chainCutoffNumber + log.Info("Extend the ancient range with configured cutoff", "cutoff", d.chainCutoffNumber) + } frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here. // If a part of blockchain data has already been written into active store, @@ -521,14 +545,23 @@ func (d *Downloader) syncToHead() (err error) { log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin) } } + // Skip ancient chain segments if Geth is running with a configured chain cutoff. + // These segments are not guaranteed to be available in the network. + chainOffset := origin + 1 + if mode == ethconfig.SnapSync && d.chainCutoffNumber != 0 { + if chainOffset < d.chainCutoffNumber { + chainOffset = d.chainCutoffNumber + log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber) + } + } // Initiate the sync using a concurrent header and content retrieval algorithm - d.queue.Prepare(origin+1, mode) + d.queue.Prepare(chainOffset, mode) // In beacon mode, headers are served by the skeleton syncer fetchers := []func() error{ - func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync + func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved + func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync func() error { return d.processHeaders(origin + 1) }, } if mode == ethconfig.SnapSync { @@ -666,7 +699,7 @@ func (d *Downloader) processHeaders(origin uint64) error { return nil } // Otherwise split the chunk of headers into batches and process them - headers, hashes := task.headers, task.hashes + headers, hashes, scheduled := task.headers, task.hashes, false for len(headers) > 0 { // Terminate if something failed in between processing chunks @@ -683,17 +716,21 @@ func (d *Downloader) processHeaders(origin uint64) error { chunkHeaders := headers[:limit] chunkHashes := hashes[:limit] - // In case of header only syncing, validate the chunk immediately - if mode == ethconfig.SnapSync { - // Although the received headers might be all valid, a legacy - // PoW/PoA sync must not accept post-merge headers. Make sure - // that any transition is rejected at this point. - if len(chunkHeaders) > 0 { - if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil { - log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) - return fmt.Errorf("%w: %v", errInvalidChain, err) - } + // Split the headers around the chain cutoff + var cutoff int + if mode == ethconfig.SnapSync && d.chainCutoffNumber != 0 { + cutoff = sort.Search(len(chunkHeaders), func(i int) bool { + return chunkHeaders[i].Number.Uint64() >= d.chainCutoffNumber + }) + } + // Insert the header chain into the ancient store (with block bodies and + // receipts set to nil) if they fall before the cutoff. + if mode == ethconfig.SnapSync && cutoff != 0 { + if n, err := d.blockchain.InsertHeadersBeforeCutoff(chunkHeaders[:cutoff]); err != nil { + log.Warn("Failed to insert ancient header chain", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err) + return fmt.Errorf("%w: %v", errInvalidChain, err) } + log.Debug("Inserted headers before cutoff", "number", chunkHeaders[cutoff-1].Number, "hash", chunkHashes[cutoff-1]) } // If we've reached the allowed number of pending headers, stall a bit for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders { @@ -704,12 +741,21 @@ func (d *Downloader) processHeaders(origin uint64) error { case <-timer.C: } } - // Otherwise insert the headers for content retrieval - inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin) - if len(inserts) != len(chunkHeaders) { - return fmt.Errorf("%w: stale headers", errBadPeer) + // Otherwise, schedule the headers for content retrieval (block bodies and + // potentially receipts in snap sync). + // + // Skip the bodies/receipts retrieval scheduling before the cutoff in snap + // sync if chain pruning is configured. + if mode == ethconfig.SnapSync && cutoff != 0 { + chunkHeaders = chunkHeaders[cutoff:] + chunkHashes = chunkHashes[cutoff:] + } + if len(chunkHeaders) > 0 { + scheduled = true + if d.queue.Schedule(chunkHeaders, chunkHashes, origin+uint64(cutoff)) != len(chunkHeaders) { + return fmt.Errorf("%w: stale headers", errBadPeer) + } } - headers = headers[limit:] hashes = hashes[limit:] origin += uint64(limit) @@ -721,11 +767,13 @@ func (d *Downloader) processHeaders(origin uint64) error { } d.syncStatsLock.Unlock() - // Signal the content downloaders of the availability of new tasks - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { - select { - case ch <- true: - default: + // Signal the downloader of the availability of new tasks + if scheduled { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { + select { + case ch <- true: + default: + } } } } @@ -1085,10 +1133,20 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { header = d.blockchain.CurrentHeader() block = d.blockchain.CurrentSnapBlock() ) - syncedBlocks := block.Number.Uint64() - d.syncStartBlock - if syncedBlocks == 0 { + // Prevent reporting if nothing has been synchronized yet + if block.Number.Uint64() <= d.syncStartBlock { + return + } + // Prevent reporting noise if the actual chain synchronization (headers + // and bodies) hasn't started yet. Inserting the ancient header chain is + // fast enough and would introduce significant bias if included in the count. + if d.chainCutoffNumber != 0 && block.Number.Uint64() <= d.chainCutoffNumber { return } + fetchedBlocks := block.Number.Uint64() - d.syncStartBlock + if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.syncStartBlock { + fetchedBlocks = block.Number.Uint64() - d.chainCutoffNumber + } // Retrieve the current chain head and calculate the ETA latest, _, _, err := d.skeleton.Bounds() if err != nil { @@ -1103,7 +1161,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { } var ( left = latest.Number.Uint64() - block.Number.Uint64() - eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left) + eta = time.Since(d.syncStartTime) / time.Duration(fetchedBlocks) * time.Duration(left) progress = fmt.Sprintf("%.2f%%", float64(block.Number.Uint64())*100/float64(latest.Number.Uint64())) headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString()) diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 23c033a8ad12..bfe80ddbf1cb 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -25,7 +25,6 @@ import ( var ( headerInMeter = metrics.NewRegisteredMeter("eth/downloader/headers/in", nil) headerReqTimer = metrics.NewRegisteredTimer("eth/downloader/headers/req", nil) - headerDropMeter = metrics.NewRegisteredMeter("eth/downloader/headers/drop", nil) headerTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/headers/timeout", nil) bodyInMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/in", nil) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index a1c114f05785..000ad97ca9c3 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -73,7 +73,7 @@ type fetchResult struct { Withdrawals types.Withdrawals } -func newFetchResult(header *types.Header, fastSync bool) *fetchResult { +func newFetchResult(header *types.Header, snapSync bool) *fetchResult { item := &fetchResult{ Header: header, } @@ -82,7 +82,7 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult { } else if header.WithdrawalsHash != nil { item.Withdrawals = make(types.Withdrawals, 0) } - if fastSync && !header.EmptyReceipts() { + if snapSync && !header.EmptyReceipts() { item.pending.Store(item.pending.Load() | (1 << receiptType)) } return item @@ -124,19 +124,8 @@ func (f *fetchResult) Done(kind uint) bool { // queue represents hashes that are either need fetching or are being fetched type queue struct { - mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching - - // Headers are "special", they download in batches, supported by a skeleton chain - headerHead common.Hash // Hash of the last queued header to verify order - headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for - headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable - headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations - headerResults []*types.Header // Result cache accumulating the completed headers - headerHashes []common.Hash // Result cache accumulating the completed header hashes - headerProced int // Number of headers already processed from the results - headerOffset uint64 // Number of the first header in the result cache - headerContCh chan bool // Channel to notify when header download finishes + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching + headerHead common.Hash // Hash of the last queued header to verify order // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers @@ -163,7 +152,6 @@ type queue struct { func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ - headerContCh: make(chan bool, 1), blockTaskQueue: prque.New[int64, *types.Header](nil), blockWakeCh: make(chan bool, 1), receiptTaskQueue: prque.New[int64, *types.Header](nil), @@ -182,9 +170,7 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.closed = false q.mode = ethconfig.FullSync - q.headerHead = common.Hash{} - q.headerPendPool = make(map[string]*fetchRequest) q.blockTaskPool = make(map[common.Hash]*types.Header) q.blockTaskQueue.Reset() @@ -207,14 +193,6 @@ func (q *queue) Close() { q.lock.Unlock() } -// PendingHeaders retrieves the number of header requests pending for retrieval. -func (q *queue) PendingHeaders() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.headerTaskQueue.Size() -} - // PendingBodies retrieves the number of block body requests pending for retrieval. func (q *queue) PendingBodies() int { q.lock.Lock() @@ -260,54 +238,14 @@ func (q *queue) Idle() bool { return (queued + pending) == 0 } -// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill -// up an already retrieved header skeleton. -func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { - q.lock.Lock() - defer q.lock.Unlock() - - // No skeleton retrieval can be in progress, fail hard if so (huge implementation bug) - if q.headerResults != nil { - panic("skeleton assembly already in progress") - } - // Schedule all the header retrieval tasks for the skeleton assembly - q.headerTaskPool = make(map[uint64]*types.Header) - q.headerTaskQueue = prque.New[int64, uint64](nil) - q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains - q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) - q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch) - q.headerProced = 0 - q.headerOffset = from - q.headerContCh = make(chan bool, 1) - - for i, header := range skeleton { - index := from + uint64(i*MaxHeaderFetch) - - q.headerTaskPool[index] = header - q.headerTaskQueue.Push(index, -int64(index)) - } -} - -// RetrieveHeaders retrieves the header chain assemble based on the scheduled -// skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) { - q.lock.Lock() - defer q.lock.Unlock() - - headers, hashes, proced := q.headerResults, q.headerHashes, q.headerProced - q.headerResults, q.headerHashes, q.headerProced = nil, nil, 0 - - return headers, hashes, proced -} - // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) int { q.lock.Lock() defer q.lock.Unlock() // Insert all the headers prioritised by the contained block number - inserts := make([]*types.Header, 0, len(headers)) + var inserts int for i, header := range headers { // Make sure chain order is honoured and preserved throughout hash := hashes[i] @@ -337,7 +275,7 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } } - inserts = append(inserts, header) + inserts++ q.headerHead = hash from++ } @@ -390,7 +328,7 @@ func (q *queue) Results(block bool) []*fetchResult { q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize } - // Using the newly calibrated resultsize, figure out the new throttle limit + // Using the newly calibrated result size, figure out the new throttle limit // on the result cache throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) @@ -428,46 +366,6 @@ func (q *queue) stats() []interface{} { } } -// ReserveHeaders reserves a set of headers for the given peer, skipping any -// previously failed batches. -func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the peer's already downloading something (sanity check to - // not corrupt state) - if _, ok := q.headerPendPool[p.id]; ok { - return nil - } - // Retrieve a batch of hashes, skipping previously failed ones - send, skip := uint64(0), []uint64{} - for send == 0 && !q.headerTaskQueue.Empty() { - from, _ := q.headerTaskQueue.Pop() - if q.headerPeerMiss[p.id] != nil { - if _, ok := q.headerPeerMiss[p.id][from]; ok { - skip = append(skip, from) - continue - } - } - send = from - } - // Merge all the skipped batches back - for _, from := range skip { - q.headerTaskQueue.Push(from, -int64(from)) - } - // Assemble and return the block download request - if send == 0 { - return nil - } - request := &fetchRequest{ - Peer: p, - From: send, - Time: time.Now(), - } - q.headerPendPool[p.id] = request - return request -} - // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. @@ -594,10 +492,6 @@ func (q *queue) Revoke(peerID string) { q.lock.Lock() defer q.lock.Unlock() - if request, ok := q.headerPendPool[peerID]; ok { - q.headerTaskQueue.Push(request.From, -int64(request.From)) - delete(q.headerPendPool, peerID) - } if request, ok := q.blockPendPool[peerID]; ok { for _, header := range request.Headers { q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) @@ -612,16 +506,6 @@ func (q *queue) Revoke(peerID string) { } } -// ExpireHeaders cancels a request that timed out and moves the pending fetch -// task back into the queue for rescheduling. -func (q *queue) ExpireHeaders(peer string) int { - q.lock.Lock() - defer q.lock.Unlock() - - headerTimeoutMeter.Mark(1) - return q.expire(peer, q.headerPendPool, q.headerTaskQueue) -} - // ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireBodies(peer string) int { @@ -670,116 +554,6 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue return len(req.Headers) } -// DeliverHeaders injects a header retrieval response into the header results -// cache. This method either accepts all headers it received, or none of them -// if they do not map correctly to the skeleton. -// -// If the headers are accepted, the method makes an attempt to deliver the set -// of ready headers to the processor to keep the pipeline full. However, it will -// not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []common.Hash, headerProcCh chan *headerTask) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - var logger log.Logger - if len(id) < 16 { - // Tests use short IDs, don't choke on them - logger = log.New("peer", id) - } else { - logger = log.New("peer", id[:16]) - } - // Short circuit if the data was never requested - request := q.headerPendPool[id] - if request == nil { - headerDropMeter.Mark(int64(len(headers))) - return 0, errNoFetchesPending - } - delete(q.headerPendPool, id) - - headerReqTimer.UpdateSince(request.Time) - headerInMeter.Mark(int64(len(headers))) - - // Ensure headers can be mapped onto the skeleton chain - target := q.headerTaskPool[request.From].Hash() - - accepted := len(headers) == MaxHeaderFetch - if accepted { - if headers[0].Number.Uint64() != request.From { - logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", hashes[0], "expected", request.From) - accepted = false - } else if hashes[len(headers)-1] != target { - logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", hashes[len(headers)-1], "expected", target) - accepted = false - } - } - if accepted { - parentHash := hashes[0] - for i, header := range headers[1:] { - hash := hashes[i+1] - if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { - logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want) - accepted = false - break - } - if parentHash != header.ParentHash { - logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash) - accepted = false - break - } - // Set-up parent hash for next round - parentHash = hash - } - } - // If the batch of headers wasn't accepted, mark as unavailable - if !accepted { - logger.Trace("Skeleton filling not accepted", "from", request.From) - headerDropMeter.Mark(int64(len(headers))) - - miss := q.headerPeerMiss[id] - if miss == nil { - q.headerPeerMiss[id] = make(map[uint64]struct{}) - miss = q.headerPeerMiss[id] - } - miss[request.From] = struct{}{} - - q.headerTaskQueue.Push(request.From, -int64(request.From)) - return 0, errors.New("delivery not accepted") - } - // Clean up a successful fetch and try to deliver any sub-results - copy(q.headerResults[request.From-q.headerOffset:], headers) - copy(q.headerHashes[request.From-q.headerOffset:], hashes) - - delete(q.headerTaskPool, request.From) - - ready := 0 - for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil { - ready += MaxHeaderFetch - } - if ready > 0 { - // Headers are ready for delivery, gather them and push forward (non blocking) - processHeaders := make([]*types.Header, ready) - copy(processHeaders, q.headerResults[q.headerProced:q.headerProced+ready]) - - processHashes := make([]common.Hash, ready) - copy(processHashes, q.headerHashes[q.headerProced:q.headerProced+ready]) - - select { - case headerProcCh <- &headerTask{ - headers: processHeaders, - hashes: processHashes, - }: - logger.Trace("Pre-scheduled new headers", "count", len(processHeaders), "from", processHeaders[0].Number) - q.headerProced += len(processHeaders) - default: - } - } - // Check for termination and return - if len(q.headerTaskPool) == 0 { - q.headerContCh <- false - } - return len(headers), nil -} - // DeliverBodies injects a block body retrieval response into the results queue. // The method returns the number of blocks bodies accepted from the delivery and // also wakes any threads waiting for data delivery. diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index e4323c04ebc5..36c382fefcc6 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { // throttled - if true, the store is at capacity, this particular header is not prio now // item - the result to store data into // err - any error that occurred -func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) { +func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) { r.lock.Lock() defer r.lock.Unlock() @@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro return stale, throttled, item, err } if item == nil { - item = newFetchResult(header, fastSync) + item = newFetchResult(header, snapSync) r.items[index] = item } return stale, throttled, item, err