From 8a048dc5fba8658101916d2dab00356406298339 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 29 Mar 2022 18:56:46 +0200 Subject: [PATCH] core: make reorg use less memory --- core/blockchain.go | 53 ++++++++++++++++++++++++++------------- core/types/transaction.go | 18 +++++++++++++ 2 files changed, 53 insertions(+), 18 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 5ac12303cf73..d83f9678ae21 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1960,13 +1960,18 @@ func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log { // Note the new head block won't be processed here, callers need to handle it // externally. func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { + type block struct { + hash common.Hash + number uint64 + } + var ( - newChain types.Blocks + newChain []block oldChain types.Blocks commonBlock *types.Block - deletedTxs types.Transactions - addedTxs types.Transactions + deletedTxs []common.Hash + addedTxs []common.Hash deletedLogs [][]*types.Log rebirthLogs [][]*types.Log @@ -1976,7 +1981,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // Old chain is longer, gather all transactions and logs as deleted ones for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) { oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) + for _, tx := range oldBlock.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } // Collect deleted logs for notification logs := bc.collectLogs(oldBlock.Hash(), true) @@ -1987,7 +1994,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } else { // New chain is longer, stash all blocks away for subsequent insertion for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) { - newChain = append(newChain, newBlock) + newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) } } if oldBlock == nil { @@ -2006,14 +2013,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } // Remove an old block as well as stash away a new block oldChain = append(oldChain, oldBlock) - deletedTxs = append(deletedTxs, oldBlock.Transactions()...) + for _, tx := range oldBlock.Transactions() { + deletedTxs = append(deletedTxs, tx.Hash()) + } // Collect deleted logs for notification logs := bc.collectLogs(oldBlock.Hash(), true) if len(logs) > 0 { deletedLogs = append(deletedLogs, logs) } - newChain = append(newChain, newBlock) + newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()}) // Step back with both chains oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) @@ -2034,14 +2043,14 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { logFn = log.Warn } logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(), - "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash()) + "drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].hash) blockReorgAddMeter.Mark(int64(len(newChain))) blockReorgDropMeter.Mark(int64(len(oldChain))) blockReorgMeter.Mark(1) } else if len(newChain) > 0 { // Special case happens in the post merge stage that current head is // the ancestor of new head while these two blocks are not consecutive - log.Info("Extend chain", "add", len(newChain), "number", newChain[0].NumberU64(), "hash", newChain[0].Hash()) + log.Info("Extend chain", "add", len(newChain), "number", newChain[0].number, "hash", newChain[0].hash) blockReorgAddMeter.Mark(int64(len(newChain))) } else { // len(newChain) == 0 && len(oldChain) > 0 @@ -2052,21 +2061,20 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 1; i-- { // Insert the block in the canonical way, re-writing history - bc.writeHeadBlock(newChain[i]) + block := bc.GetBlock(newChain[i].hash, newChain[i].number) - // Collect reborn logs due to chain reorg - logs := bc.collectLogs(newChain[i].Hash(), false) - if len(logs) > 0 { - rebirthLogs = append(rebirthLogs, logs) - } + bc.writeHeadBlock(block) // Collect the new added transactions. - addedTxs = append(addedTxs, newChain[i].Transactions()...) + for _, tx := range block.Transactions() { + addedTxs = append(addedTxs, tx.Hash()) + } } + // Delete useless indexes right now which includes the non-canonical // transaction indexes, canonical chain indexes which above the head. indexesBatch := bc.db.NewBatch() - for _, tx := range types.TxDifference(deletedTxs, addedTxs) { - rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) + for _, tx := range types.TxDifferenceHash(deletedTxs, addedTxs) { + rawdb.DeleteTxLookupEntry(indexesBatch, tx) } // Delete any canonical number assignments above the new head number := bc.CurrentBlock().NumberU64() @@ -2080,6 +2088,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { if err := indexesBatch.Write(); err != nil { log.Crit("Failed to delete useless indexes", "err", err) } + + // Collect the logs + for i := len(newChain) - 1; i >= 1; i-- { + // Collect reborn logs due to chain reorg + logs := bc.collectLogs(newChain[i].hash, false) + if len(logs) > 0 { + rebirthLogs = append(rebirthLogs, logs) + } + } // If any logs need to be fired, do it now. In theory we could avoid creating // this goroutine if there are no events to fire, but realistcally that only // ever happens if we're reorging empty blocks, which will only happen on idle diff --git a/core/types/transaction.go b/core/types/transaction.go index 29820a0d785f..2fb3da729798 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -432,6 +432,24 @@ func TxDifference(a, b Transactions) Transactions { return keep } +// TxDifferenceHash returns a new set which is the difference between a and b. +func TxDifferenceHash(a, b []common.Hash) []common.Hash { + keep := make([]common.Hash, 0, len(a)) + + remove := make(map[common.Hash]struct{}) + for _, tx := range b { + remove[tx] = struct{}{} + } + + for _, tx := range a { + if _, ok := remove[tx]; !ok { + keep = append(keep, tx) + } + } + + return keep +} + // TxByNonce implements the sort interface to allow sorting a list of transactions // by their nonces. This is usually only useful for sorting transactions from a // single account, otherwise a nonce comparison doesn't make much sense.