Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ledger] Replace LRU cache with a FIFO queue (circular buffer) #2893

1 change: 0 additions & 1 deletion ledger/complete/checkpoint_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func BenchmarkLoadCheckpointAndWALs(b *testing.B) {
return err
},
func(rootHash ledger.RootHash) error {
forest.RemoveTrie(rootHash)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With removals gone, maybe we can get rid of this completely: remove WALDelete (and WALUpdate, since we now have only one operation) and all related code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need it for reading old WALs when we spork; we could get rid of it after the next spork.

return nil
},
)
Expand Down
26 changes: 4 additions & 22 deletions ledger/complete/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const defaultTrieUpdateChanSize = 500
// Ledger is fork-aware which means any update can be applied at any previous state which forms a tree of tries (forest).
// The forest is in memory but all changes (e.g. register updates) are captured inside write-ahead-logs for crash recovery reasons.
// In order to limit the memory usage and maintain the performance storage only keeps a limited number of
// tries and purge the old ones (LRU-based); in other words, Ledger is not designed to be used
// tries and purge the old ones (FIFO-based); in other words, Ledger is not designed to be used
// for archival usage but make it possible for other software components to reconstruct very old tries using write-ahead logs.
type Ledger struct {
forest *mtrie.Forest
Expand All @@ -53,12 +53,7 @@ func NewLedger(

logger := log.With().Str("ledger", "complete").Logger()

forest, err := mtrie.NewForest(capacity, metrics, func(evictedTrie *trie.MTrie) {
err := wal.RecordDelete(evictedTrie.RootHash())
if err != nil {
logger.Error().Err(err).Msg("failed to save delete record in wal")
}
})
forest, err := mtrie.NewForest(capacity, metrics, nil)
if err != nil {
return nil, fmt.Errorf("cannot create forest: %w", err)
}
Expand Down Expand Up @@ -354,7 +349,7 @@ func (l *Ledger) ExportCheckpointAt(
Str("hash", rh.String()).
Msgf("Most recently touched root hash.")
return ledger.State(hash.DummyHash),
fmt.Errorf("cannot get try at the given state commitment: %w", err)
fmt.Errorf("cannot get trie at the given state commitment: %w", err)
}

// clean up tries to release memory
Expand Down Expand Up @@ -501,20 +496,7 @@ func (l *Ledger) keepOnlyOneTrie(state ledger.State) error {
// don't write things to WALs
l.wal.PauseRecord()
defer l.wal.UnpauseRecord()

allTries, err := l.forest.GetTries()
if err != nil {
return err
}

targetRootHash := ledger.RootHash(state)
for _, trie := range allTries {
trieRootHash := trie.RootHash()
if trieRootHash != targetRootHash {
l.forest.RemoveTrie(trieRootHash)
}
}
return nil
return l.forest.PurgeCacheExcept(ledger.RootHash(state))
}

func runReport(r ledger.Reporter, p []ledger.Payload, commit ledger.State, l zerolog.Logger) error {
Expand Down
5 changes: 0 additions & 5 deletions ledger/complete/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ func Test_WAL(t *testing.T) {
assert.NoError(t, err)
state, _, err = led.Set(update)
require.NoError(t, err)
fmt.Printf("Updated with %x\n", state)

data := make(map[string]ledger.Value, len(keys))
for j, key := range keys {
Expand Down Expand Up @@ -581,10 +580,6 @@ func Test_WAL(t *testing.T) {
}
}

// test deletion
s := led2.ForestSize()
assert.Equal(t, s, size)

Comment on lines -584 to -587
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do removals anymore, so this part is no longer relevant.

<-led2.Done()
<-compactor2.Done()
})
Expand Down
96 changes: 27 additions & 69 deletions ledger/complete/mtrie/forest.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package mtrie

import (
"errors"
"fmt"

lru "github.com/hashicorp/golang-lru"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/hash"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
Expand All @@ -27,7 +24,7 @@ type Forest struct {
// tries stores all MTries in the forest. It is NOT a CACHE in the conventional sense:
// there is no mechanism to load a trie from disk in case of a cache miss. Missing a
// needed trie in the forest might cause a fatal application logic error.
tries *lru.Cache
tries *TrieCache
forestCapacity int
onTreeEvicted func(tree *trie.MTrie)
metrics module.LedgerMetrics
Expand All @@ -36,38 +33,19 @@ type Forest struct {
// NewForest returns a new instance of memory forest.
//
// CAUTION on forestCapacity: the specified capacity MUST be SUFFICIENT to store all needed MTries in the forest.
// If more tries are added than the capacity, the Least Recently Used trie is removed (evicted) from the Forest.
// THIS IS A ROUGH HEURISTIC as it might evict tries that are still needed.
// If more tries are added than the capacity, the Least Recently Added trie is removed (evicted) from the Forest (FIFO queue).
// Make sure you choose a sufficiently large forestCapacity, such that, when reaching the capacity, the
// Least Recently Used trie will never be needed again.
// Least Recently Added trie will never be needed again.
func NewForest(forestCapacity int, metrics module.LedgerMetrics, onTreeEvicted func(tree *trie.MTrie)) (*Forest, error) {
// init LRU cache as a SHORTCUT for a usage-related storage eviction policy
var cache *lru.Cache
var err error
if onTreeEvicted != nil {
cache, err = lru.NewWithEvict(forestCapacity, func(key interface{}, value interface{}) {
trie, ok := value.(*trie.MTrie)
if !ok {
panic(fmt.Sprintf("cache contains item of type %T", value))
}
onTreeEvicted(trie)
})
} else {
cache, err = lru.New(forestCapacity)
}
if err != nil {
return nil, fmt.Errorf("cannot create forest cache: %w", err)
}

forest := &Forest{tries: cache,
forest := &Forest{tries: NewTrieCache(uint(forestCapacity), onTreeEvicted),
forestCapacity: forestCapacity,
onTreeEvicted: onTreeEvicted,
metrics: metrics,
}

// add trie with no allocated registers
emptyTrie := trie.NewEmptyMTrie()
err = forest.AddTrie(emptyTrie)
err := forest.AddTrie(emptyTrie)
if err != nil {
return nil, fmt.Errorf("adding empty trie to forest failed: %w", err)
}
Expand Down Expand Up @@ -333,33 +311,15 @@ func (f *Forest) HasTrie(rootHash ledger.RootHash) bool {
// warning, use this function for read-only operation
func (f *Forest) GetTrie(rootHash ledger.RootHash) (*trie.MTrie, error) {
// if in memory
if ent, found := f.tries.Get(rootHash); found {
trie, ok := ent.(*trie.MTrie)
if !ok {
return nil, fmt.Errorf("forest contains an element of a wrong type")
}
if trie, found := f.tries.Get(rootHash); found {
return trie, nil
}
return nil, fmt.Errorf("trie with the given rootHash %s not found", rootHash)
}

// GetTries returns list of currently cached tree root hashes
func (f *Forest) GetTries() ([]*trie.MTrie, error) {
// ToDo needs concurrency safety
keys := f.tries.Keys()
tries := make([]*trie.MTrie, len(keys))
for i, key := range keys {
t, ok := f.tries.Get(key)
if !ok {
return nil, errors.New("concurrent Forest modification")
}
trie, ok := t.(*trie.MTrie)
if !ok {
return nil, errors.New("forest contains an element of a wrong type")
}
tries[i] = trie
}
return tries, nil
return f.tries.Tries(), nil
}

// AddTrie adds a trie to the forest
Expand All @@ -381,44 +341,42 @@ func (f *Forest) AddTrie(newTrie *trie.MTrie) error {

// TODO: check Thread safety
rootHash := newTrie.RootHash()
if storedTrie, found := f.tries.Get(rootHash); found {
trie, ok := storedTrie.(*trie.MTrie)
if !ok {
return fmt.Errorf("forest contains an element of a wrong type")
}
if trie.Equals(newTrie) {
return nil
}
return fmt.Errorf("forest already contains a tree with same root hash but other properties")
if _, found := f.tries.Get(rootHash); found {
// do no op
return nil
}
f.tries.Add(rootHash, newTrie)
f.metrics.ForestNumberOfTrees(uint64(f.tries.Len()))
f.tries.Push(newTrie)
f.metrics.ForestNumberOfTrees(uint64(f.tries.Count()))

return nil
}

// RemoveTrie removes the trie with the given root hash from the forest
// and reports the resulting number of tries to the metrics collector.
func (f *Forest) RemoveTrie(rootHash ledger.RootHash) {
// TODO remove from the file as well
f.tries.Remove(rootHash)
f.metrics.ForestNumberOfTrees(uint64(f.tries.Len()))
}

// GetEmptyRootHash returns the root hash of the empty trie.
// The value is obtained from the trie package and does not depend on the forest's contents.
func (f *Forest) GetEmptyRootHash() ledger.RootHash {
	emptyRootHash := trie.EmptyTrieRootHash()
	return emptyRootHash
}

// MostRecentTouchedRootHash returns the rootHash of the most recently touched trie
func (f *Forest) MostRecentTouchedRootHash() (ledger.RootHash, error) {
keys := f.tries.Keys()
if len(keys) > 0 {
return keys[len(keys)-1].(ledger.RootHash), nil
trie := f.tries.LastAddedTrie()
if trie != nil {
return trie.RootHash(), nil
}
return ledger.RootHash(hash.DummyHash), fmt.Errorf("no trie is stored in the forest")
}

// PurgeCacheExcept removes all tries from the in-memory forest except the one with the given root hash.
//
// It returns an error, and leaves the forest untouched, if no trie with the given
// root hash is currently stored in the forest.
func (f *Forest) PurgeCacheExcept(rootHash ledger.RootHash) error {
	// named keptTrie (not trie) so the imported trie package is not shadowed
	keptTrie, found := f.tries.Get(rootHash)
	if !found {
		return fmt.Errorf("trie with the given root hash %s not found", rootHash)
	}

	// drop everything, then re-insert the single trie that must survive
	f.tries.Purge()
	f.tries.Push(keptTrie)

	// keep the reported forest size in sync, as AddTrie does after every insertion
	f.metrics.ForestNumberOfTrees(uint64(f.tries.Count()))
	return nil
}

// Size returns the number of active tries in this store
func (f *Forest) Size() int {
return f.tries.Len()
return f.tries.Count()
}
52 changes: 48 additions & 4 deletions ledger/complete/mtrie/forest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func TestTrieOperations(t *testing.T) {
require.NoError(t, err)
require.Equal(t, retnt.RootHash(), updatedTrie.RootHash())
require.Equal(t, 2, forest.Size())

// Remove trie
forest.RemoveTrie(updatedTrie.RootHash())
require.Equal(t, 1, forest.Size())
}

// TestTrieUpdate updates the empty trie with some values and verifies that the
Expand Down Expand Up @@ -1072,3 +1068,51 @@ func TestNow(t *testing.T) {
require.Equal(t, updatedRoot, updatedRoot2)
require.Equal(t, 2, size)
}

func TestPurgeCacheExcept(t *testing.T) {
ramtinms marked this conversation as resolved.
Show resolved Hide resolved
forest, err := NewForest(5, &metrics.NoopCollector{}, nil)
require.NoError(t, err)

nt := trie.NewEmptyMTrie()
p1 := pathByUint8s([]uint8{uint8(53), uint8(74)})
v1 := payloadBySlices([]byte{'A'}, []byte{'A'})

updatedTrie1, _, err := trie.NewTrieWithUpdatedRegisters(nt, []ledger.Path{p1}, []ledger.Payload{*v1}, true)
require.NoError(t, err)

err = forest.AddTrie(updatedTrie1)
require.NoError(t, err)

p2 := pathByUint8s([]uint8{uint8(12), uint8(34)})
v2 := payloadBySlices([]byte{'B'}, []byte{'B'})

updatedTrie2, _, err := trie.NewTrieWithUpdatedRegisters(nt, []ledger.Path{p2}, []ledger.Payload{*v2}, true)
require.NoError(t, err)

err = forest.AddTrie(updatedTrie2)
require.NoError(t, err)

require.Equal(t, 3, forest.tries.Count())
forest.PurgeCacheExcept(updatedTrie2.RootHash())

require.Equal(t, 1, forest.tries.Count())
ret, err := forest.GetTrie(updatedTrie2.RootHash())
require.NoError(t, err)
require.Equal(t, ret, updatedTrie2)

_, err = forest.GetTrie(updatedTrie1.RootHash())
require.Error(t, err)

// test purge when only a single target trie exist there
forest.PurgeCacheExcept(updatedTrie1.RootHash())
ret, err = forest.GetTrie(updatedTrie2.RootHash())
require.NoError(t, err)
require.Equal(t, ret, updatedTrie2)

_, err = forest.GetTrie(updatedTrie1.RootHash())
require.Error(t, err)

// purge with non existing trie
forest.PurgeCacheExcept(updatedTrie2.RootHash())
require.Error(t, err)
}
Loading