Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie/triedb/pathdb: improve dirty node flushing trigger #28426

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions trie/triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,15 @@ type tester struct {
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte
}

func newTester(t *testing.T) *tester {
func newTester(t *testing.T, historyLimit uint64) *tester {
var (
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{CleanCacheSize: 256 * 1024, DirtyCacheSize: 256 * 1024})
obj = &tester{
db = New(disk, &Config{
StateHistory: historyLimit,
CleanCacheSize: 256 * 1024,
DirtyCacheSize: 256 * 1024,
})
obj = &tester{
db: db,
preimages: make(map[common.Hash]common.Address),
accounts: make(map[common.Hash][]byte),
Expand Down Expand Up @@ -376,7 +380,7 @@ func (t *tester) bottomIndex() int {

func TestDatabaseRollback(t *testing.T) {
// Verify state histories
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.verifyHistory(); err != nil {
Expand All @@ -402,7 +406,7 @@ func TestDatabaseRollback(t *testing.T) {

func TestDatabaseRecoverable(t *testing.T) {
var (
tester = newTester(t)
tester = newTester(t, 0)
index = tester.bottomIndex()
)
defer tester.release()
Expand Down Expand Up @@ -440,7 +444,7 @@ func TestDatabaseRecoverable(t *testing.T) {
}

func TestDisable(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

_, stored := rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)
Expand Down Expand Up @@ -476,7 +480,7 @@ func TestDisable(t *testing.T) {
}

func TestCommit(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Commit(tester.lastHash(), false); err != nil {
Expand All @@ -500,7 +504,7 @@ func TestCommit(t *testing.T) {
}

func TestJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand All @@ -524,7 +528,7 @@ func TestJournal(t *testing.T) {
}

func TestCorruptedJournal(t *testing.T) {
tester := newTester(t)
tester := newTester(t, 0)
defer tester.release()

if err := tester.db.Journal(tester.lastHash()); err != nil {
Expand Down Expand Up @@ -553,6 +557,35 @@ func TestCorruptedJournal(t *testing.T) {
}
}

// TestTailTruncateHistory function is designed to test a specific edge case where,
// when history objects are removed from the end, it should trigger a state flush
// if the ID of the new tail object is even higher than the persisted state ID.
//
// For example, let's say the ID of the persistent state is 10, and the current
// history objects range from ID(5) to ID(15). As we accumulate six more objects,
// the history will expand to cover ID(11) to ID(21). ID(11) then becomes the
// oldest history object, and its ID is even higher than the stored state.
//
// In this scenario, it is mandatory to update the persistent state before
// truncating the tail histories. This ensures that the ID of the persistent state
// always falls within the range of [oldest-history-id, latest-history-id].
func TestTailTruncateHistory(t *testing.T) {
tester := newTester(t, 10)
defer tester.release()

tester.db.Close()
tester.db = New(tester.db.diskdb, &Config{StateHistory: 10})

head, err := tester.db.freezer.Ancients()
if err != nil {
t.Fatalf("Failed to obtain freezer head")
}
stored := rawdb.ReadPersistentStateID(tester.db.diskdb)
if head != stored {
t.Fatalf("Failed to truncate excess history object above, stored: %d, head: %d", stored, head)
}
}

// copyAccounts returns a deep-copied account set of the provided one.
func copyAccounts(set map[common.Hash][]byte) map[common.Hash][]byte {
copied := make(map[common.Hash][]byte, len(set))
Expand Down
58 changes: 43 additions & 15 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,37 +172,65 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
dl.lock.Lock()
defer dl.lock.Unlock()

// Construct and store the state history first. If crash happens
// after storing the state history but without flushing the
// corresponding states(journal), the stored state history will
// be truncated in the next restart.
// Construct and store the state history first. If crash happens after storing
// the state history but without flushing the corresponding states(journal),
// the stored state history will be truncated from head in the next restart.
var (
overflow bool
oldest uint64
)
if dl.db.freezer != nil {
err := writeHistory(dl.db.diskdb, dl.db.freezer, bottom, dl.db.config.StateHistory)
err := writeHistory(dl.db.freezer, bottom)
if err != nil {
return nil, err
}
// Determine if the persisted history object has exceeded the configured
// limitation, set the overflow as true if so.
tail, err := dl.db.freezer.Tail()
if err != nil {
return nil, err
}
limit := dl.db.config.StateHistory
if limit != 0 && bottom.stateID()-tail > limit {
overflow = true
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
}
}
// Mark the diskLayer as stale before applying any mutations on top.
dl.stale = true

// Store the root->id lookup afterwards. All stored lookups are
// identified by the **unique** state root. It's impossible that
// in the same chain blocks are not adjacent but have the same
// root.
// Store the root->id lookup afterwards. All stored lookups are identified
// by the **unique** state root. It's impossible that in the same chain
// blocks are not adjacent but have the same root.
if dl.id == 0 {
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
}
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())

// Construct a new disk layer by merging the nodes from the provided
// diff layer, and flush the content in disk layer if there are too
// many nodes cached. The clean cache is inherited from the original
// disk layer for reusing.
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force)
if err != nil {

// In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting implicit thing here is that nodebuffer.flush() will "surely" push the disk layer beyond oldest-1. This is kind of true, most of the time, since only the 128 diff layers re main after the flush and limit in theory is more like 90K.

Thus two questions/requests:

  • Would be nice to maybe mention this fact in the comments.
  • What happens if limit is configured to be 64? Perhaps we should forbid the limit being below the diff layer count (apart from 0 meaning infinite)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, no oldest = bottom.stateID() - limit + 1, so flushing everything will move the disk layer to bottom.stateID().

In that case oldest - 1 will be bottom.stateID() - limit + 1 - 1 == bottom.stateID() - limit. So anything above 0 limit should be ok.

if overflow {
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.freezer, oldest-1)
if err != nil {
return nil, err
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}
return ndl, nil
}

Expand Down
26 changes: 8 additions & 18 deletions trie/triedb/pathdb/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,38 +512,28 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
return &dec, nil
}

// writeHistory writes the state history with provided state set. After
// storing the corresponding state history, it will also prune the stale
// histories from the disk with the given threshold.
func writeHistory(db ethdb.KeyValueStore, freezer *rawdb.ResettableFreezer, dl *diffLayer, limit uint64) error {
// writeHistory persists the state history with the provided state set.
func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
// Short circuit if state set is not available.
if dl.states == nil {
return errors.New("state change set is not available")
}
var (
err error
n int
start = time.Now()
h = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
start = time.Now()
history = newHistory(dl.rootHash(), dl.parentLayer().rootHash(), dl.block, dl.states)
)
accountData, storageData, accountIndex, storageIndex := h.encode()
accountData, storageData, accountIndex, storageIndex := history.encode()
dataSize := common.StorageSize(len(accountData) + len(storageData))
indexSize := common.StorageSize(len(accountIndex) + len(storageIndex))

// Write history data into five freezer table respectively.
rawdb.WriteStateHistory(freezer, dl.stateID(), h.meta.encode(), accountIndex, storageIndex, accountData, storageData)
rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData)

// Prune stale state histories based on the config.
if limit != 0 && dl.stateID() > limit {
n, err = truncateFromTail(db, freezer, dl.stateID()-limit)
if err != nil {
return err
}
}
historyDataBytesMeter.Mark(int64(dataSize))
historyIndexBytesMeter.Mark(int64(indexSize))
historyBuildTimeMeter.UpdateSince(start)
log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "pruned", n, "elapsed", common.PrettyDuration(time.Since(start)))
log.Debug("Stored state history", "id", dl.stateID(), "block", dl.block, "data", dataSize, "index", indexSize, "elapsed", common.PrettyDuration(time.Since(start)))

return nil
}

Expand Down