Skip to content

Commit

Permalink
core: reduce peak memory usage during reorg (#30600)
Browse files Browse the repository at this point in the history
~~Opening this as a draft to have a discussion.~~ Pressed the wrong
button
I had [a previous PR
](#24616 long time ago
which reduced the peak memory used during reorgs by not accumulating all
transactions and logs.
This PR reduces the peak memory further by not storing the blocks in
memory.
However this means we need to pull the blocks back up from storage
multiple times during the reorg.
I collected the following numbers on peak memory usage: 

// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440
allocs/op 1549443072 bytes of heap used
// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op
1564 allocs/op 1163870208 bytes of heap used
// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op
1564 allocs/op 1171890176 bytes of heap used

Each block contains a transaction with ~50k bytes and we're doing a 10k
block reorg, so the chain should be ~500MB in size

---------

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
  • Loading branch information
MariusVanDerWijden and karalabe authored Oct 16, 2024
1 parent 368e16f commit 18a5918
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 97 deletions.
203 changes: 106 additions & 97 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"math/big"
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1435,7 +1436,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
current := bc.CurrentBlock()
if block.ParentHash() != current.Hash() {
if err := bc.reorg(current, block); err != nil {
if err := bc.reorg(current, block.Header()); err != nil {
return err
}
}
Expand Down Expand Up @@ -1541,7 +1542,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types

// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
if err := bc.reorg(currentBlock, block.Header()); err != nil {
return NonStatTy, err
}
}
Expand Down Expand Up @@ -2154,8 +2155,8 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
return block.Hash(), nil
}

// collectLogs collects the logs that were generated or removed during
// the processing of a block. These logs are later announced as deleted or reborn.
// collectLogs collects the logs that were generated or removed during the
// processing of a block. These logs are later announced as deleted or reborn.
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
var blobGasPrice *big.Int
excessBlobGas := b.ExcessBlobGas()
Expand All @@ -2181,70 +2182,55 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
//
// Note the new head block won't be processed here, callers need to handle it
// externally.
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block

deletedTxs []common.Hash
addedTxs []common.Hash
newChain []*types.Header
oldChain []*types.Header
commonBlock *types.Header
)
oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
if oldBlock == nil {
return errors.New("current head block missing")
}
newBlock := newHead

// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
oldChain = append(oldChain, oldHead)
}
} else {
// New chain is longer, stash all blocks away for subsequent insertion
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
newChain = append(newChain, newHead)
}
}
if oldBlock == nil {
if oldHead == nil {
return errInvalidOldChain
}
if newBlock == nil {
if newHead == nil {
return errInvalidNewChain
}
// Both sides of the reorg are at the same number, reduce both until the common
// ancestor is found
for {
// If the common ancestor was found, bail out
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
if oldHead.Hash() == newHead.Hash() {
commonBlock = oldHead
break
}
// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
newChain = append(newChain, newBlock)
oldChain = append(oldChain, oldHead)
newChain = append(newChain, newHead)

// Step back with both chains
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
if oldBlock == nil {
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
if oldHead == nil {
return errInvalidOldChain
}
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if newBlock == nil {
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
if newHead == nil {
return errInvalidNewChain
}
}

// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info
Expand All @@ -2253,63 +2239,120 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
msg = "Large chain reorg detected"
logFn = log.Warn
}
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
blockReorgDropMeter.Mark(int64(len(oldChain)))
blockReorgMeter.Mark(1)
} else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
// len(newChain) == 0 && len(oldChain) > 0
// rewind the canonical chain to a lower point.
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain))
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
}
// Acquire the tx-lookup lock before mutation. This step is essential
// as the txlookups should be changed atomically, and all subsequent
// reads should be blocked until the mutation is complete.
bc.txLookupLock.Lock()

// Insert the new chain segment in incremental order, from the old
// to the new. The new chain head (newChain[0]) is not inserted here,
// as it will be handled separately outside of this function
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i])
// Reorg can be executed, start reducing the chain's old blocks and appending
// the new blocks
var (
deletedTxs []common.Hash
rebirthTxs []common.Hash

// Collect the new added transactions.
for _, tx := range newChain[i].Transactions() {
addedTxs = append(addedTxs, tx.Hash())
deletedLogs []*types.Log
rebirthLogs []*types.Log
)
// Deleted log emission on the API uses forward order, which is borked, but
// we'll leave it in for legacy reasons.
//
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
{
for i := len(oldChain) - 1; i >= 0; i-- {
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
if block == nil {
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
}
if logs := bc.collectLogs(block, true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
}
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
}
// Undo old blocks in reverse order
for i := 0; i < len(oldChain); i++ {
// Collect all the deleted transactions
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
if block == nil {
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
}
for _, tx := range block.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
// Collect deleted logs and emit them for new integrations
if logs := bc.collectLogs(block, true); len(logs) > 0 {
// Emit revertals latest first, older then
slices.Reverse(logs)

// TODO(karalabe): Hook into the reverse emission part
}
}
// Apply new blocks in forward order
for i := len(newChain) - 1; i >= 1; i-- {
// Collect all the included transactions
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
if block == nil {
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
}
for _, tx := range block.Transactions() {
rebirthTxs = append(rebirthTxs, tx.Hash())
}
// Collect inserted logs and emit them
if logs := bc.collectLogs(block, false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
// Update the head block
bc.writeHeadBlock(block)
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
var (
indexesBatch = bc.db.NewBatch()
diffs = types.HashDifference(deletedTxs, addedTxs)
)
for _, tx := range diffs {
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
batch := bc.db.NewBatch()
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
rawdb.DeleteTxLookupEntry(batch, tx)
}
// Delete all hash markers that are not part of the new canonical chain.
// Because the reorg function does not handle new chain head, all hash
// markers greater than or equal to new chain head should be deleted.
number := commonBlock.NumberU64()
number := commonBlock.Number
if len(newChain) > 1 {
number = newChain[1].NumberU64()
number = newChain[1].Number
}
for i := number + 1; ; i++ {
for i := number.Uint64() + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(indexesBatch, i)
rawdb.DeleteCanonicalHash(batch, i)
}
if err := indexesBatch.Write(); err != nil {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}
// Reset the tx lookup cache to clear stale txlookup cache.
Expand All @@ -2318,40 +2361,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// Release the tx-lookup lock after mutation.
bc.txLookupLock.Unlock()

// Send out events for logs from the old canon chain, and 'reborn'
// logs from the new canon chain. The number of logs can be very
// high, so the events are sent in batches of size around 512.

// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
deletedLogs = nil
}
}
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}

// New logs:
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
return nil
}

Expand Down Expand Up @@ -2389,7 +2398,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
// Run the reorg if necessary and set the given block as new head.
start := time.Now()
if head.ParentHash() != bc.CurrentBlock().Hash() {
if err := bc.reorg(bc.CurrentBlock(), head); err != nil {
if err := bc.reorg(bc.CurrentBlock(), head.Header()); err != nil {
return common.Hash{}, err
}
}
Expand Down
33 changes: 33 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4231,3 +4231,36 @@ func TestPragueRequests(t *testing.T) {
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}
}

func BenchmarkReorg(b *testing.B) {
chainLength := b.N

dir := b.TempDir()
db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
defer db.Close()
gspec := &Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}},
}
blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()

// Insert an easy and a difficult chain afterwards
easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))

if _, err := blockchain.InsertChain(easyBlocks); err != nil {
b.Fatalf("failed to insert easy chain: %v", err)
}
b.ResetTimer()
if _, err := blockchain.InsertChain(diffBlocks); err != nil {
b.Fatalf("failed to insert difficult chain: %v", err)
}
}

// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used
// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used
// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used

0 comments on commit 18a5918

Please sign in to comment.