Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: reduce peak memory usage during reorg #30600

Merged
merged 7 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,8 +2185,8 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
// externally.
func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
newChain []*types.Header
oldChain []*types.Header
commonBlock *types.Block

deletedTxs []common.Hash
Expand All @@ -2202,15 +2202,15 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
if oldBlock.NumberU64() > newBlock.NumberU64() {
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
oldChain = append(oldChain, oldBlock.Header())
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
}
} else {
// New chain is longer, stash all blocks away for subsequent insertion
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
newChain = append(newChain, newBlock.Header())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this aprt will nt be correct. The headBlock is not part of the chain yet I think at this point, it's written later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, doc says head block is not processsed here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this codepath, it looks to me like it is written

func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
	if err := bc.writeBlockWithState(block, receipts, state); err != nil {
		return NonStatTy, err
	}
	currentBlock := bc.CurrentBlock()

	// Reorganise the chain if the parent is not the head block
	if block.ParentHash() != currentBlock.Hash() {
		if err := bc.reorg(currentBlock, block); err != nil {
			return NonStatTy, err
		}
	}

	// Set new head.
	bc.writeHeadBlock(block)

}
}
if oldBlock == nil {
Expand All @@ -2228,11 +2228,11 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
break
}
// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock)
oldChain = append(oldChain, oldBlock.Header())
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}
newChain = append(newChain, newBlock)
newChain = append(newChain, newBlock.Header())

// Step back with both chains
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
Expand Down Expand Up @@ -2261,31 +2261,49 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
} else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
// len(newChain) == 0 && len(oldChain) > 0
// rewind the canonical chain to a lower point.
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "oldblocks", len(oldChain), "newnum", newBlock.Number(), "newhash", newBlock.Hash(), "newblocks", len(newChain))
}
// Acquire the tx-lookup lock before mutation. This step is essential
// as the txlookups should be changed atomically, and all subsequent
// reads should be blocked until the mutation is complete.
bc.txLookupLock.Lock()

//var stats runtime.MemStats
//runtime.ReadMemStats(&stats)
//panic(stats.HeapInuse)
karalabe marked this conversation as resolved.
Show resolved Hide resolved

// Insert the new chain segment in incremental order, from the old
// to the new. The new chain head (newChain[0]) is not inserted here,
// as it will be handled separately outside of this function
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i])
newBlock = bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This begins at the latest in the new chain.

Q1: Is the latest block in the new chain already retrievable via bc.GetBlock? I think the answer is Yes.

Q2: Isn't it the same as the incoming parameter newHead? If so, we can use that. Most calls to reorg will only be 1 block, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karalabe I thought you were commenting on this comment, but it's not quite the same, is it?

bc.writeHeadBlock(newBlock)

// Collect the new added transactions.
for _, tx := range newChain[i].Transactions() {
for _, tx := range newBlock.Transactions() {
addedTxs = append(addedTxs, tx.Hash())
}
// Collect the logs and send them in batches of 512.
if logs := bc.collectLogs(newBlock, false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this block. The previous code sent the log removals first, and then the log additions. You changes it so now it sends events for new logs first and them removes old logs.

}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}

// Acquire the tx-lookup lock before mutation. This step is essential
// as the txlookups should be changed atomically, and all subsequent
// reads should be blocked until the mutation is complete.
bc.txLookupLock.Lock()

// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
var (
Expand All @@ -2300,7 +2318,7 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// markers greater than or equal to new chain head should be deleted.
number := commonBlock.NumberU64()
if len(newChain) > 1 {
number = newChain[1].NumberU64()
number = newChain[1].Number.Uint64()
}
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
Expand All @@ -2318,15 +2336,16 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
// Release the tx-lookup lock after mutation.
bc.txLookupLock.Unlock()

// Send out events for logs from the old canon chain, and 'reborn'
// logs from the new canon chain. The number of logs can be very
// high, so the events are sent in batches of size around 512.
// Send out events for logs from the old canon chain.
// The number of logs can be very high,
// so the events are sent in batches of size around 512.

// Deleted logs + blocks:
var deletedLogs []*types.Log
for i := len(oldChain) - 1; i >= 0; i-- {
oldBlock = bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
// Collect deleted logs for notification
if logs := bc.collectLogs(oldChain[i], true); len(logs) > 0 {
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
deletedLogs = append(deletedLogs, logs...)
}
if len(deletedLogs) > 512 {
Expand All @@ -2337,21 +2356,6 @@ func (bc *BlockChain) reorg(oldHead *types.Header, newHead *types.Block) error {
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}

// New logs:
var rebirthLogs []*types.Log
for i := len(newChain) - 1; i >= 1; i-- {
if logs := bc.collectLogs(newChain[i], false); len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs...)
}
if len(rebirthLogs) > 512 {
bc.logsFeed.Send(rebirthLogs)
rebirthLogs = nil
}
}
if len(rebirthLogs) > 0 {
bc.logsFeed.Send(rebirthLogs)
}
return nil
}

Expand Down
33 changes: 33 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4231,3 +4231,36 @@ func TestPragueRequests(t *testing.T) {
t.Fatalf("block %d: failed to insert into chain: %v", n, err)
}
}

func BenchmarkReorg(b *testing.B) {
chainLength := b.N

dir := b.TempDir()
db, err := rawdb.NewLevelDBDatabase(dir, 128, 128, "", false)
if err != nil {
b.Fatalf("cannot create temporary database: %v", err)
}
defer db.Close()
gspec := &Genesis{
Config: params.TestChainConfig,
Alloc: types.GenesisAlloc{benchRootAddr: {Balance: math.BigPow(2, 254)}},
}
blockchain, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer blockchain.Stop()

// Insert an easy and a difficult chain afterwards
easyBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
diffBlocks, _ := GenerateChain(params.TestChainConfig, blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()), ethash.NewFaker(), db, chainLength, genValueTx(50000))
Comment on lines +4251 to +4253
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me what "easy" and "difficult" means here. These chains are of the same length and form.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original meaning was probably that the 'difficult' chain was heavier, and should take precedence over the 'easy'?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, its copied from another test, will rename


if _, err := blockchain.InsertChain(easyBlocks); err != nil {
b.Fatalf("failed to insert easy chain: %v", err)
}
b.ResetTimer()
if _, err := blockchain.InsertChain(diffBlocks); err != nil {
b.Fatalf("failed to insert difficult chain: %v", err)
}
}

// Master: BenchmarkReorg-8 10000 899591 ns/op 820154 B/op 1440 allocs/op 1549443072 bytes of heap used
// WithoutOldChain: BenchmarkReorg-8 10000 1147281 ns/op 943163 B/op 1564 allocs/op 1163870208 bytes of heap used
// WithoutNewChain: BenchmarkReorg-8 10000 1018922 ns/op 943580 B/op 1564 allocs/op 1171890176 bytes of heap used