Skip to content

Commit

Permalink
core: adapt some cmds for the block store
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjunLi committed Mar 6, 2024
1 parent 71b1b41 commit 4eb0a22
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 11 deletions.
26 changes: 26 additions & 0 deletions cmd/geth/dbcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,10 @@ func dbStats(ctx *cli.Context) error {
fmt.Println("show stats of state store")
showLeveldbStats(db.StateStore())
}
if db.BlockStore() != db {
fmt.Println("show stats of block store")
showLeveldbStats(db.BlockStore())
}

return nil
}
Expand All @@ -590,6 +594,11 @@ func dbCompact(ctx *cli.Context) error {
showLeveldbStats(statediskdb)
}

if db.BlockStore() != db {
fmt.Println("show stats of block store")
showLeveldbStats(db.BlockStore())
}

log.Info("Triggering compaction")
if err := db.Compact(nil, nil); err != nil {
log.Error("Compact err", "error", err)
Expand All @@ -602,13 +611,23 @@ func dbCompact(ctx *cli.Context) error {
return err
}
}
if db.BlockStore() != db {
if err := db.BlockStore().Compact(nil, nil); err != nil {
log.Error("Compact err", "error", err)
return err
}
}

log.Info("Stats after compaction")
showLeveldbStats(db)
if statediskdb != nil {
fmt.Println("show stats of state store")
showLeveldbStats(statediskdb)
}
if db.BlockStore() != db {
fmt.Println("show stats of block store")
showLeveldbStats(db.BlockStore())
}
return nil
}

Expand Down Expand Up @@ -640,6 +659,13 @@ func dbGet(ctx *cli.Context) error {
return nil
}
}
if db.BlockStore() != db {
if blockdata, dberr := db.BlockStore().Get(key); dberr != nil {
fmt.Printf("key %#x: %#x\n", key, blockdata)
return nil
}
}

log.Info("Get operation failed", "key", fmt.Sprintf("%#x", key), "error", err)
return err
}
Expand Down
15 changes: 15 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,10 @@ func MakeChainDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFree
statediskdb := MakeStateDataBase(ctx, stack, readonly, false)
chainDb.SetStateStore(statediskdb)
}
if stack.HasSeparateBlockDir() && err == nil {
blockdb := MakeBlockDatabase(ctx, stack, readonly, false)
chainDb.SetBlockStore(blockdb)
}
}
if err != nil {
Fatalf("Could not open database: %v", err)
Expand All @@ -2351,6 +2355,17 @@ func MakeStateDataBase(ctx *cli.Context, stack *node.Node, readonly, disableFree
return statediskdb
}

// MakeBlockDatabase opens the standalone block database using the flags
// passed to the client. It hard-crashes the process (via Fatalf) if the
// database cannot be opened.
func MakeBlockDatabase(ctx *cli.Context, stack *node.Node, readonly, disableFreeze bool) ethdb.Database {
	var (
		// Block store gets a tenth of the file-descriptor quota and the
		// configured share of the cache budget, mirroring the state store.
		fdQuota   = MakeDatabaseHandles(ctx.Int(FDLimitFlag.Name)) / 10
		cacheSize = ctx.Int(CacheFlag.Name) * ctx.Int(CacheDatabaseFlag.Name) / 100
	)
	db, err := stack.OpenDatabaseWithFreezer("chaindata", cacheSize, fdQuota, "", "", readonly, disableFreeze, false, false, false, true)
	if err != nil {
		Fatalf("Failed to open separate block database: %v", err)
	}
	return db
}

// tryMakeReadOnlyDatabase try to open the chain database in read-only mode,
// or fallback to write mode if the database is not initialized.
//
Expand Down
20 changes: 10 additions & 10 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error {
// pile them onto the existing chain. Otherwise, do the necessary
// reorgs.
var (
first = headers[0]
last = headers[len(headers)-1]
batch = hc.chainDb.BlockStore().NewBatch()
first = headers[0]
last = headers[len(headers)-1]
blockBatch = hc.chainDb.BlockStore().NewBatch()
)
if first.ParentHash != hc.currentHeaderHash {
// Delete any canonical number assignments above the new head
Expand All @@ -183,7 +183,7 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error {
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(batch, i)
rawdb.DeleteCanonicalHash(blockBatch, i)
}
// Overwrite any stale canonical number assignments, going
// backwards from the first header in this import until the
Expand All @@ -194,7 +194,7 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error {
headHash = header.Hash()
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(batch, headHash, headNumber)
rawdb.WriteCanonicalHash(blockBatch, headHash, headNumber)
if headNumber == 0 {
break // It shouldn't be reached
}
Expand All @@ -209,16 +209,16 @@ func (hc *HeaderChain) Reorg(headers []*types.Header) error {
for i := 0; i < len(headers)-1; i++ {
hash := headers[i+1].ParentHash // Save some extra hashing
num := headers[i].Number.Uint64()
rawdb.WriteCanonicalHash(batch, hash, num)
rawdb.WriteHeadHeaderHash(batch, hash)
rawdb.WriteCanonicalHash(blockBatch, hash, num)
rawdb.WriteHeadHeaderHash(blockBatch, hash)
}
// Write the last header
hash := headers[len(headers)-1].Hash()
num := headers[len(headers)-1].Number.Uint64()
rawdb.WriteCanonicalHash(batch, hash, num)
rawdb.WriteHeadHeaderHash(batch, hash)
rawdb.WriteCanonicalHash(blockBatch, hash, num)
rawdb.WriteHeadHeaderHash(blockBatch, hash)

if err := batch.Write(); err != nil {
if err := blockBatch.Write(); err != nil {
return err
}
// Last step update all in-memory head header markers
Expand Down
57 changes: 56 additions & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,15 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
defer it.Release()

var trieIter ethdb.Iterator
var blockIter ethdb.Iterator
if db.StateStore() != nil {
trieIter = db.StateStore().NewIterator(keyPrefix, nil)
defer trieIter.Release()
}
if db.BlockStore() != db {
blockIter = db.BlockStore().NewIterator(keyPrefix, nil)
defer blockIter.Release()
}
var (
count int64
start = time.Now()
Expand Down Expand Up @@ -810,6 +815,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
}
if !accounted {
unaccounted.Add(size)
log.Info("unaccounted 000", "key", string(key), "size", size)
}
}
count++
Expand Down Expand Up @@ -841,14 +847,16 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
default:
var accounted bool
for _, meta := range [][]byte{
fastTrieProgressKey, persistentStateIDKey, trieJournalKey} {
fastTrieProgressKey, persistentStateIDKey, trieJournalKey, snapSyncStatusFlagKey} {
if bytes.Equal(key, meta) {
metadata.Add(size)
accounted = true
break
}
}
if !accounted {
unaccounted.Add(size)
log.Info("unaccounted 111", "key", string(key), "size", size)
}
}
count++
Expand All @@ -858,6 +866,53 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
}
}
}
// inspect separate block db
if blockIter != nil {
count = 0
logged = time.Now()
log.Info("Inspecting separate state database", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))

for blockIter.Next() {
var (
key = blockIter.Key()
value = blockIter.Value()
size = common.StorageSize(len(key) + len(value))
)

switch {
case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength):
headers.Add(size)
case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength):
bodies.Add(size)
case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength):
receipts.Add(size)
case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix):
tds.Add(size)
case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix):
numHashPairings.Add(size)
case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength):
hashNumPairings.Add(size)
default:
var accounted bool
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
if bytes.Equal(key, meta) {
metadata.Add(size)
accounted = true
break
}
}
if !accounted {
unaccounted.Add(size)
log.Info("unaccounted 222", "key", string(key), "size", size)
}
}
count++
if count%1000 == 0 && time.Since(logged) > 8*time.Second {
log.Info("Inspecting separate block database", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
logged = time.Now()
}
}
}
// Display the database statistic of key-value store.
stats := [][]string{
{"Key-Value store", "Headers", headers.Size(), headers.Count()},
Expand Down
10 changes: 10 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,16 @@ func (n *Node) HasSeparateTrieDir() bool {
return fileInfo.IsDir()
}

// HasSeparateBlockDir reports whether the chaindata database has a separate
// "block" subdirectory, i.e. whether a split block store is present on disk.
func (n *Node) HasSeparateBlockDir() bool {
	separateDir := filepath.Join(n.ResolvePath("chaindata"), "block")
	fileInfo, err := os.Stat(separateDir)
	if err != nil {
		// Treat ANY stat failure (not just os.IsNotExist — e.g. permission
		// errors) as "no separate dir": on error fileInfo is nil and the
		// original IsDir() call would panic with a nil dereference.
		return false
	}
	return fileInfo.IsDir()
}

func (n *Node) OpenDiffDatabase(name string, handles int, diff, namespace string, readonly bool) (*leveldb.Database, error) {
n.lock.Lock()
defer n.lock.Unlock()
Expand Down

0 comments on commit 4eb0a22

Please sign in to comment.