Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb, cmd, ethdb, eth: implement freezer tail deletion #23954

Merged
merged 10 commits into from
Mar 10, 2022
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if num+1 <= frozen {
// Truncate all relative data(header, total difficulty, body, receipt
// and canonical hash) from ancient store.
if err := bc.db.TruncateAncients(num); err != nil {
if err := bc.db.TruncateHead(num); err != nil {
log.Crit("Failed to truncate ancient data", "number", num, "err", err)
}
// Remove the hash <-> number mapping from the active store.
Expand Down Expand Up @@ -991,7 +991,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
if err := bc.db.TruncateHead(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
Expand All @@ -1009,7 +1009,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type NumberHash struct {
Hash common.Hash
}

// ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights,
// both canonical and reorged forks included.
// ReadAllHashesInRange retrieves all the hashes assigned to blocks at certain
// heights, both canonical and reorged forks included.
// This method considers both limits to be _inclusive_.
func ReadAllHashesInRange(db ethdb.Iteratee, first, last uint64) []*NumberHash {
var (
Expand Down Expand Up @@ -776,7 +776,7 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}

// WriteAncientBlock writes entire block data into ancient store and returns the total written size.
// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
Expand Down
16 changes: 13 additions & 3 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}

// Tail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Tail() (uint64, error) {
return 0, errNotSupported
}

// AncientSize returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
Expand All @@ -109,8 +114,13 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}

// TruncateAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateAncients(items uint64) error {
// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) error {
return errNotSupported
}

// TruncateTail returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
}

Expand Down Expand Up @@ -211,7 +221,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// Block #1 is still in the database, we're allowed to init a new feezer
}
// Otherwise, the head header is still the genesis, we're allowed to init a new
// feezer.
// freezer.
}
}
// Freezer is consistent with the key-value database, permit combining the two
Expand Down
56 changes: 46 additions & 10 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
freezerTableSize = 2 * 1000 * 1000 * 1000
)

// freezer is an memory mapped append-only database to store immutable chain data
// freezer is a memory mapped append-only database to store immutable chain data
// into flat files:
//
// - The append only nature ensures that disk writes are minimized.
Expand All @@ -78,6 +78,7 @@ type freezer struct {
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

// This lock synchronizes writers and the truncate operation, as well as
Expand Down Expand Up @@ -226,6 +227,11 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}

// Tail returns the number of first stored item in the freezer.
func (f *freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
}

// AncientSize returns the ancient size of the specified category.
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
Expand Down Expand Up @@ -261,7 +267,7 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncate(prevItem)
err := table.truncateHead(prevItem)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err)
}
Expand All @@ -281,8 +287,8 @@ func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
return writeSize, nil
}

// TruncateAncients discards any recent data above the provided threshold number.
func (f *freezer) TruncateAncients(items uint64) error {
// TruncateHead discards any recent data above the provided threshold number.
func (f *freezer) TruncateHead(items uint64) error {
if f.readonly {
return errReadOnly
}
Expand All @@ -293,14 +299,34 @@ func (f *freezer) TruncateAncients(items uint64) error {
return nil
}
for _, table := range f.tables {
if err := table.truncate(items); err != nil {
if err := table.truncateHead(items); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, items)
return nil
}

// TruncateTail discards any recent data below the provided threshold number.
func (f *freezer) TruncateTail(tail uint64) error {
if f.readonly {
return errReadOnly
}
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.tail) >= tail {
return nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.tail, tail)
return nil
}

// Sync flushes all data tables to disk.
func (f *freezer) Sync() error {
var errs []error
Expand Down Expand Up @@ -344,19 +370,29 @@ func (f *freezer) validate() error {

// repair truncates all data tables to the same length.
func (f *freezer) repair() error {
min := uint64(math.MaxUint64)
var (
head = uint64(math.MaxUint64)
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if min > items {
min = items
if head > items {
head = items
}
if table.tail() > tail {
tail = table.tail()
}
}
for _, table := range f.tables {
if err := table.truncate(min); err != nil {
if err := table.truncateHead(head); err != nil {
return err
}
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, min)
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]

// Write index.
// Write indices.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
Expand Down
Loading