Skip to content

Commit 15ed4e3

Browse files
MariusVanDerWijden authored and karalabe committed
core: reduce peak memory usage during reorg (#30600)
~~Opening this as a draft to have a discussion.~~ Pressed the wrong button. I had [a previous PR](ethereum/go-ethereum#24616) a long time ago which reduced the peak memory used during reorgs by not accumulating all transactions and logs. This PR reduces the peak memory further by not storing the blocks in memory. However this means we need to pull the blocks back up from storage multiple times during the reorg. I collected the following numbers on peak memory usage: // Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used // WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used // WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used Each block contains a transaction with ~50k bytes and we're doing a 10k block reorg, so the chain should be ~500MB in size --------- Co-authored-by: Péter Szilágyi <peterke@gmail.com>
1 parent 501984e commit 15ed4e3

File tree

3 files changed

+143
-102
lines changed

3 files changed

+143
-102
lines changed

core/blockchain.go

Lines changed: 105 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,7 +1475,7 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
14751475
func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
14761476
current := bc.CurrentBlock()
14771477
if block.ParentHash() != current.Hash() {
1478-
if err := bc.reorg(current, block); err != nil {
1478+
if err := bc.reorg(current, block.Header()); err != nil {
14791479
return err
14801480
}
14811481
}
@@ -1596,7 +1596,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
15961596
if reorg {
15971597
// Reorganise the chain if the parent is not the head block
15981598
if block.ParentHash() != currentBlock.Hash() {
1599-
if err := bc.reorg(currentBlock, block); err != nil {
1599+
if err := bc.reorg(currentBlock, block.Header()); err != nil {
16001600
return NonStatTy, err
16011601
}
16021602
}
@@ -2300,8 +2300,8 @@ func (bc *BlockChain) recoverAncestors(block *types.Block, makeWitness bool) (co
23002300
return block.Hash(), nil
23012301
}
23022302

2303-
// collectLogs collects the logs that were generated or removed during
2304-
// the processing of a block. These logs are later announced as deleted or reborn.
2303+
// collectLogs collects the logs that were generated or removed during the
2304+
// processing of a block. These logs are later announced as deleted or reborn.
23052305
func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
23062306
var blobGasPrice *big.Int
23072307
excessBlobGas := b.ExcessBlobGas()
@@ -2327,70 +2327,55 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
23272327
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
23282328
// blocks and inserts them to be part of the new canonical chain and accumulates
23292329
// potential missing transactions and post an event about them.
2330+
//
23302331
// Note the new head block won't be processed here, callers need to handle it
23312332
// externally.
2332-
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
2333+
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Header) error {
23332334
var (
2334-
newChain types.Blocks
2335-
oldChain types.Blocks
2336-
commonBlock *types.Block
2337-
2338-
deletedTxs []common.Hash
2339-
addedTxs []common.Hash
2335+
newChain []*types.Header
2336+
oldChain []*types.Header
2337+
commonBlock *types.Header
23402338
)
2341-
oldBlock := bc.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
2342-
if oldBlock == nil {
2343-
return errors.New("current head block missing")
2344-
}
2345-
newBlock := newHead
2346-
23472339
// Reduce the longer chain to the same number as the shorter one
2348-
if oldBlock.NumberU64() > newBlock.NumberU64() {
2340+
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
23492341
// Old chain is longer, gather all transactions and logs as deleted ones
2350-
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
2351-
oldChain = append(oldChain, oldBlock)
2352-
for _, tx := range oldBlock.Transactions() {
2353-
deletedTxs = append(deletedTxs, tx.Hash())
2354-
}
2342+
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
2343+
oldChain = append(oldChain, oldHead)
23552344
}
23562345
} else {
23572346
// New chain is longer, stash all blocks away for subsequent insertion
2358-
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
2359-
newChain = append(newChain, newBlock)
2347+
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
2348+
newChain = append(newChain, newHead)
23602349
}
23612350
}
2362-
if oldBlock == nil {
2351+
if oldHead == nil {
23632352
return errInvalidOldChain
23642353
}
2365-
if newBlock == nil {
2354+
if newHead == nil {
23662355
return errInvalidNewChain
23672356
}
23682357
// Both sides of the reorg are at the same number, reduce both until the common
23692358
// ancestor is found
23702359
for {
23712360
// If the common ancestor was found, bail out
2372-
if oldBlock.Hash() == newBlock.Hash() {
2373-
commonBlock = oldBlock
2361+
if oldHead.Hash() == newHead.Hash() {
2362+
commonBlock = oldHead
23742363
break
23752364
}
23762365
// Remove an old block as well as stash away a new block
2377-
oldChain = append(oldChain, oldBlock)
2378-
for _, tx := range oldBlock.Transactions() {
2379-
deletedTxs = append(deletedTxs, tx.Hash())
2380-
}
2381-
newChain = append(newChain, newBlock)
2366+
oldChain = append(oldChain, oldHead)
2367+
newChain = append(newChain, newHead)
23822368

23832369
// Step back with both chains
2384-
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
2385-
if oldBlock == nil {
2370+
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
2371+
if oldHead == nil {
23862372
return errInvalidOldChain
23872373
}
2388-
newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
2389-
if newBlock == nil {
2374+
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
2375+
if newHead == nil {
23902376
return errInvalidNewChain
23912377
}
23922378
}
2393-
23942379
// Ensure the user sees large reorgs
23952380
if len(oldChain) > 0 && len(newChain) > 0 {
23962381
logFn := log.Info
@@ -2399,63 +2384,120 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
23992384
msg = "Large chain reorg detected"
24002385
logFn = log.Warn
24012386
}
2402-
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
2387+
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash(),
24032388
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
24042389
blockReorgAddMeter.Mark(int64(len(newChain)))
24052390
blockReorgDropMeter.Mark(int64(len(oldChain)))
24062391
blockReorgMeter.Mark(1)
24072392
} else if len(newChain) > 0 {
24082393
// Special case happens in the post merge stage that current head is
24092394
// the ancestor of new head while these two blocks are not consecutive
2410-
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
2395+
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
24112396
blockReorgAddMeter.Mark(int64(len(newChain)))
24122397
} else {
24132398
// len(newChain) == 0 && len(oldChain) > 0
24142399
// rewind the canonical chain to a lower point.
2415-
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain))
2400+
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
24162401
}
24172402
// Acquire the tx-lookup lock before mutation. This step is essential
24182403
// as the txlookups should be changed atomically, and all subsequent
24192404
// reads should be blocked until the mutation is complete.
24202405
bc.txLookupLock.Lock()
24212406

2422-
// Insert the new chain segment in incremental order, from the old
2423-
// to the new. The new chain head (newChain[0]) is not inserted here,
2424-
// as it will be handled separately outside of this function
2425-
for i := len(newChain) - 1; i >= 1; i-- {
2426-
// Insert the block in the canonical way, re-writing history
2427-
bc.writeHeadBlock(newChain[i])
2407+
// Reorg can be executed, start reducing the chain's old blocks and appending
2408+
// the new blocks
2409+
var (
2410+
deletedTxs []common.Hash
2411+
rebirthTxs []common.Hash
24282412

2429-
// Collect the new added transactions.
2430-
for _, tx := range newChain[i].Transactions() {
2431-
addedTxs = append(addedTxs, tx.Hash())
2413+
deletedLogs []*types.Log
2414+
rebirthLogs []*types.Log
2415+
)
2416+
// Deleted log emission on the API uses forward order, which is borked, but
2417+
// we'll leave it in for legacy reasons.
2418+
//
2419+
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
2420+
{
2421+
for i := len(oldChain) - 1; i >= 0; i-- {
2422+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2423+
if block == nil {
2424+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2425+
}
2426+
if logs := bc.collectLogs(block, true); len(logs) > 0 {
2427+
deletedLogs = append(deletedLogs, logs...)
2428+
}
2429+
if len(deletedLogs) > 512 {
2430+
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2431+
deletedLogs = nil
2432+
}
2433+
}
2434+
if len(deletedLogs) > 0 {
2435+
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
24322436
}
24332437
}
2438+
// Undo old blocks in reverse order
2439+
for i := 0; i < len(oldChain); i++ {
2440+
// Collect all the deleted transactions
2441+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2442+
if block == nil {
2443+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2444+
}
2445+
for _, tx := range block.Transactions() {
2446+
deletedTxs = append(deletedTxs, tx.Hash())
2447+
}
2448+
// Collect deleted logs and emit them for new integrations
2449+
if logs := bc.collectLogs(block, true); len(logs) > 0 {
2450+
// Emit revertals latest first, older then
2451+
slices.Reverse(logs)
24342452

2453+
// TODO(karalabe): Hook into the reverse emission part
2454+
}
2455+
}
2456+
// Apply new blocks in forward order
2457+
for i := len(newChain) - 1; i >= 1; i-- {
2458+
// Collect all the included transactions
2459+
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
2460+
if block == nil {
2461+
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
2462+
}
2463+
for _, tx := range block.Transactions() {
2464+
rebirthTxs = append(rebirthTxs, tx.Hash())
2465+
}
2466+
// Collect inserted logs and emit them
2467+
if logs := bc.collectLogs(block, false); len(logs) > 0 {
2468+
rebirthLogs = append(rebirthLogs, logs...)
2469+
}
2470+
if len(rebirthLogs) > 512 {
2471+
bc.logsFeed.Send(rebirthLogs)
2472+
rebirthLogs = nil
2473+
}
2474+
// Update the head block
2475+
bc.writeHeadBlock(block)
2476+
}
2477+
if len(rebirthLogs) > 0 {
2478+
bc.logsFeed.Send(rebirthLogs)
2479+
}
24352480
// Delete useless indexes right now which includes the non-canonical
24362481
// transaction indexes, canonical chain indexes which above the head.
2437-
var (
2438-
indexesBatch = bc.db.NewBatch()
2439-
diffs = types.HashDifference(deletedTxs, addedTxs)
2440-
)
2441-
for _, tx := range diffs {
2442-
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
2482+
batch := bc.db.NewBatch()
2483+
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
2484+
rawdb.DeleteTxLookupEntry(batch, tx)
24432485
}
24442486
// Delete all hash markers that are not part of the new canonical chain.
24452487
// Because the reorg function does not handle new chain head, all hash
24462488
// markers greater than or equal to new chain head should be deleted.
2447-
number := commonBlock.NumberU64()
2489+
number := commonBlock.Number
24482490
if len(newChain) > 1 {
2449-
number = newChain[1].NumberU64()
2491+
number = newChain[1].Number
24502492
}
2451-
for i := number + 1; ; i++ {
2493+
for i := number.Uint64() + 1; ; i++ {
24522494
hash := rawdb.ReadCanonicalHash(bc.db, i)
24532495
if hash == (common.Hash{}) {
24542496
break
24552497
}
2456-
rawdb.DeleteCanonicalHash(indexesBatch, i)
2498+
rawdb.DeleteCanonicalHash(batch, i)
24572499
}
2458-
if err := indexesBatch.Write(); err != nil {
2500+
if err := batch.Write(); err != nil {
24592501
log.Crit("Failed to delete useless indexes", "err", err)
24602502
}
24612503
// Reset the tx lookup cache to clear stale txlookup cache.
@@ -2464,40 +2506,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
24642506
// Release the tx-lookup lock after mutation.
24652507
bc.txLookupLock.Unlock()
24662508

2467-
// Send out events for logs from the old canon chain, and 'reborn'
2468-
// logs from the new canon chain. The number of logs can be very
2469-
// high, so the events are sent in batches of size around 512.
2470-
2471-
// Deleted logs + blocks:
2472-
var deletedLogs []*types.Log
2473-
for i := len(oldChain) - 1; i >= 0; i-- {
2474-
// Collect deleted logs for notification
2475-
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
2476-
deletedLogs = append(deletedLogs, logs...)
2477-
}
2478-
if len(deletedLogs) > 512 {
2479-
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2480-
deletedLogs = nil
2481-
}
2482-
}
2483-
if len(deletedLogs) > 0 {
2484-
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2485-
}
2486-
2487-
// New logs:
2488-
var rebirthLogs []*types.Log
2489-
for i := len(newChain) - 1; i >= 1; i-- {
2490-
if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 {
2491-
rebirthLogs = append(rebirthLogs, logs...)
2492-
}
2493-
if len(rebirthLogs) > 512 {
2494-
bc.logsFeed.Send(rebirthLogs)
2495-
rebirthLogs = nil
2496-
}
2497-
}
2498-
if len(rebirthLogs) > 0 {
2499-
bc.logsFeed.Send(rebirthLogs)
2500-
}
25012509
return nil
25022510
}
25032511

@@ -2535,7 +2543,7 @@ func (bc *BlockChain) SetCanonical(head *types.Block) (common.Hash, error) {
25352543
// Run the reorg if necessary and set the given block as new head.
25362544
start := time.Now()
25372545
if head.ParentHash() != bc.CurrentBlock().Hash() {
2538-
if err := bc.reorg(bc.CurrentBlock(), head); err != nil {
2546+
if err := bc.reorg(bc.CurrentBlock(), head.Header()); err != nil {
25392547
return common.Hash{}, err
25402548
}
25412549
}

core/blockchain_test.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2674,7 +2674,7 @@ func testSideImportPrunedBlocks(t *testing.T, scheme string) {
26742674
}
26752675
defer db.Close()
26762676

2677-
chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil)
2677+
chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), genesis, nil, engine, vm.Config{}, nil, nil)
26782678
if err != nil {
26792679
t.Fatalf("failed to create tester chain: %v", err)
26802680
}
@@ -4226,7 +4226,7 @@ func TestPragueRequests(t *testing.T) {
42264226
}
42274227

42284228
// Insert block to check validation.
4229-
chain, err := NewBlockChain(rawdb.NewMemoryDatabase(), nil, gspec, nil, engine, vm.Config{}, nil)
4229+
chain, err := NewBlockChain(rawdb.NewMemoryDatabase(), nil, gspec, nil, engine, vm.Config{}, nil, nil)
42304230
if err != nil {
42314231
t.Fatalf("failed to create tester chain: %v", err)
42324232
}
@@ -4235,3 +4235,36 @@ func TestPragueRequests(t *testing.T) {
42354235
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
42364236
}
42374237
}
4238+
4239+
func BenchmarkReorg(b *testing.B) {
4240+
chainLength := b.N
4241+
4242+
dir := b.TempDir()
4243+
db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false)
4244+
if err != nil {
4245+
b.Fatalf("cannot create temporary database: %v", err)
4246+
}
4247+
defer db.Close()
4248+
gspec := &Genesis{
4249+
Config: params.TestChainConfig,
4250+
Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}},
4251+
}
4252+
blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
4253+
defer blockchain.Stop()
4254+
4255+
// Insert an easy and a difficult chain afterwards
4256+
easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
4257+
diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
4258+
4259+
if _, err := blockchain.InsertChain(easyBlocks); err != nil {
4260+
b.Fatalf("failed to insert easy chain: %v", err)
4261+
}
4262+
b.ResetTimer()
4263+
if _, err := blockchain.InsertChain(diffBlocks); err != nil {
4264+
b.Fatalf("failed to insert difficult chain: %v", err)
4265+
}
4266+
}
4267+
4268+
// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used
4269+
// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used
4270+
// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used

miner/worker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
145145
default:
146146
t.Fatalf("unexpected consensus engine type: %T", engine)
147147
}
148-
chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vm.Config{}, nil)
148+
chain, err := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec, nil, engine, vm.Config{}, nil, nil)
149149
if err != nil {
150150
t.Fatalf("core.NewBlockChain failed: %v", err)
151151
}
@@ -195,7 +195,7 @@ func TestGenerateAndImportBlock(t *testing.T) {
195195
defer w.close()
196196

197197
// This test chain imports the mined blocks.
198-
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, b.genesis, nil, engine, vm.Config{}, nil)
198+
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, b.genesis, nil, engine, vm.Config{}, nil, nil)
199199
defer chain.Stop()
200200

201201
// Ignore empty commit here for less noise.
@@ -240,7 +240,7 @@ func TestGenerateAndImportBlockDBFT(t *testing.T) {
240240
defer w.close()
241241

242242
// This test chain imports the mined blocks.
243-
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, b.genesis, nil, engine, vm.Config{}, nil)
243+
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, b.genesis, nil, engine, vm.Config{}, nil, nil)
244244
defer chain.Stop()
245245

246246
// Ignore empty commit here for less noise.

0 commit comments

Comments
 (0)