Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adaptive for loading journal file or journal kv during loadJournal #2406

Merged
merged 4 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type CacheConfig struct {
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.
JournalFilePath string
JournalFile bool

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -190,6 +191,7 @@ func (c *CacheConfig) triedbConfig() *triedb.Config {
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
JournalFilePath: c.JournalFilePath,
JournalFile: c.JournalFile,
}
}
return config
Expand Down
15 changes: 7 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}

// Assemble the Ethereum object
chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config)
chainDb, err := stack.OpenAndMergeDatabase(ChainData, ChainDBNamespace, false, config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,14 +255,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
journalFilePath string
path string
)
if config.JournalFileEnabled {
if stack.CheckIfMultiDataBase() {
path = ChainData + "/state"
} else {
path = ChainData
}
journalFilePath = stack.ResolvePath(path) + "/" + JournalFileName
if stack.CheckIfMultiDataBase() {
path = ChainData + "/state"
} else {
path = ChainData
}
journalFilePath = stack.ResolvePath(path) + "/" + JournalFileName
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
Expand All @@ -281,6 +279,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateScheme: config.StateScheme,
PathSyncFlush: config.PathSyncFlush,
JournalFilePath: journalFilePath,
JournalFile: config.JournalFileEnabled,
}
)
bcOps := make([]core.BlockChainOption, 0)
Expand Down
53 changes: 39 additions & 14 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ const (
DefaultBatchRedundancyRate = 1.1
)

type JournalType int

const (
JournalKVType JournalType = iota
JournalFileType
)

// layer is the interface implemented by all state layers which includes some
// public methods and some additional methods for internal usage.
type layer interface {
Expand Down Expand Up @@ -92,7 +99,7 @@ type layer interface {
// journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer, journalFile bool) error
journal(w io.Writer, journalType JournalType) error
}

// Config contains the settings for database.
Expand All @@ -104,6 +111,7 @@ type Config struct {
ReadOnly bool // Flag whether the database is opened in read only mode.
NoTries bool
JournalFilePath string
JournalFile bool
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -527,23 +535,40 @@ func (db *Database) GetAllRooHash() [][]string {
return data
}

func (db *Database) IsEnableJournalFile() bool {
return len(db.config.JournalFilePath) != 0
// DetermineJournalTypeForWriter is used when persisting the journal. It determines JournalType based on the config passed in by the Config.
func (db *Database) DetermineJournalTypeForWriter() JournalType {
if db.config.JournalFile {
return JournalFileType
} else {
return JournalKVType
}
}

// DetermineJournalTypeForReader is used when loading the journal. It loads based on whether JournalKV or JournalFile currently exists.
func (db *Database) DetermineJournalTypeForReader() JournalType {
if journal := rawdb.ReadTrieJournal(db.diskdb); len(journal) != 0 {
return JournalKVType
}

if fileInfo, stateErr := os.Stat(db.config.JournalFilePath); stateErr == nil && !fileInfo.IsDir() {
return JournalFileType
}

return JournalKVType
}

func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error {
// To prevent any remnants of old journals after converting from JournalKV to JournalFile or vice versa, all deletions must be completed.
rawdb.DeleteTrieJournal(writer)

// delete from journal file, may not exist
filePath := db.config.JournalFilePath
if len(filePath) == 0 {
rawdb.DeleteTrieJournal(writer)
} else {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return err
}
errRemove := os.Remove(filePath)
if errRemove != nil {
log.Crit("Failed to remove tries journal", "journal path", filePath, "err", err)
}
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil
}
errRemove := os.Remove(filePath)
if errRemove != nil {
log.Crit("Failed to remove tries journal", "journal path", filePath, "err", errRemove)
}
return nil
}
2 changes: 1 addition & 1 deletion triedb/pathdb/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,6 @@ func BenchmarkJournal(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
layer.journal(new(bytes.Buffer), false)
layer.journal(new(bytes.Buffer), JournalKVType)
}
}
34 changes: 18 additions & 16 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (kr *JournalKVReader) Read(p []byte) (n int, err error) {
func (kr *JournalKVReader) Close() {
}

func newJournalWriter(file string, db ethdb.Database) JournalWriter {
if len(file) == 0 {
func newJournalWriter(file string, db ethdb.Database, journalType JournalType) JournalWriter {
log.Info("New journal writer", "path", file, "journalType", journalType)
if journalType == JournalKVType {
return &JournalKVWriter{
diskdb: db,
}
Expand All @@ -167,8 +168,9 @@ func newJournalWriter(file string, db ethdb.Database) JournalWriter {
}
}

func newJournalReader(file string, db ethdb.Database) (JournalReader, error) {
if len(file) == 0 {
func newJournalReader(file string, db ethdb.Database, journalType JournalType) (JournalReader, error) {
log.Info("New journal reader", "path", file, "journalType", journalType)
if journalType == JournalKVType {
journal := rawdb.ReadTrieJournal(db)
if len(journal) == 0 {
return nil, errMissJournal
Expand All @@ -193,7 +195,7 @@ func newJournalReader(file string, db ethdb.Database) (JournalReader, error) {
// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
start := time.Now()
reader, err := newJournalReader(db.config.JournalFilePath, db.diskdb)
reader, err := newJournalReader(db.config.JournalFilePath, db.diskdb, db.DetermineJournalTypeForReader())

if err != nil {
return nil, err
Expand Down Expand Up @@ -267,7 +269,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
}
Expand Down Expand Up @@ -308,7 +310,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}

if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
Expand All @@ -334,7 +336,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
if err := r.Decode(&journalEncodedBuff); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
Expand Down Expand Up @@ -407,7 +409,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
storages[entry.Account] = set
}

if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
Expand All @@ -426,7 +428,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

Expand Down Expand Up @@ -460,7 +462,7 @@ func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
}

// Store the journal buf into w and calculate checksum
if journalFile {
if journalType == JournalFileType {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
Expand All @@ -480,12 +482,12 @@ func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {

// journal implements the layer interface, writing the memory layer contents
// into a buffer to be stored in the database as the layer journal.
func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
func (dl *diffLayer) journal(w io.Writer, journalType JournalType) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

// journal the parent first
if err := dl.parent.journal(w, journalFile); err != nil {
if err := dl.parent.journal(w, journalType); err != nil {
return err
}
// Create a buffer to store encoded data
Expand Down Expand Up @@ -535,7 +537,7 @@ func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
}

// Store the journal buf into w and calculate checksum
if journalFile {
if journalType == JournalFileType {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
Expand Down Expand Up @@ -583,7 +585,7 @@ func (db *Database) Journal(root common.Hash) error {
}
// Firstly write out the metadata of journal
db.DeleteTrieJournal(db.diskdb)
journal := newJournalWriter(db.config.JournalFilePath, db.diskdb)
journal := newJournalWriter(db.config.JournalFilePath, db.diskdb, db.DetermineJournalTypeForWriter())
defer journal.Close()

if err := rlp.Encode(journal, journalVersion); err != nil {
Expand All @@ -600,7 +602,7 @@ func (db *Database) Journal(root common.Hash) error {
return err
}
// Finally write out the journal of each layer in reverse order.
if err := l.journal(journal, db.IsEnableJournalFile()); err != nil {
if err := l.journal(journal, db.DetermineJournalTypeForWriter()); err != nil {
return err
}
// Store the journal into the database and return
Expand Down
Loading