Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
- [#1](https://github.com/crypto-org-chain/cronos-store/pull/1) feature: add store, memiavl, versiondb.
- [#3](https://github.com/crypto-org-chain/cronos-store/pull/3) feat(memiavl): MultiTree add chainId.
- [#4](https://github.com/crypto-org-chain/cronos-store/pull/4) feat(versiondb/client): add dump versiondb changeset cmd.
- [#7](https://github.com/crypto-org-chain/cronos-store/pull/7) fix: memiavl WriteSnapshotWithContext cancel using wrong ctx.
- [#7](https://github.com/crypto-org-chain/cronos-store/pull/7) fix: memiavl WriteSnapshotWithContext cancel using wrong ctx.
- [#24] (https://github.com/crypto-org-chain/cronos-store/pull/24) feat(memiavl): add memiavl changeset.
120 changes: 118 additions & 2 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func Load(dir string, opts Options, chainId string) (*DB, error) {
triggerStateSyncExport: opts.TriggerStateSyncExport,
snapshotWriterPool: workerPool,
}
db.attachTraverseStateChanges()

if !db.readOnly && db.Version() == 0 && len(opts.InitialStores) > 0 {
// do the initial upgrade with the `opts.InitialStores`
Expand Down Expand Up @@ -326,6 +327,7 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
}

db.pendingLog.Upgrades = append(db.pendingLog.Upgrades, upgrades...)
db.attachTraverseStateChanges()
return nil
}

Expand Down Expand Up @@ -661,12 +663,14 @@ func (db *DB) Copy() *DB {
func (db *DB) copy(cacheSize int) *DB {
mtree := db.MultiTree.Copy(cacheSize)

return &DB{
cloned := &DB{
MultiTree: *mtree,
logger: db.logger,
dir: db.dir,
snapshotWriterPool: db.snapshotWriterPool,
}
cloned.attachTraverseStateChanges()
return cloned
}

// RewriteSnapshot writes the current version of memiavl into a snapshot, and update the `current` symlink.
Expand Down Expand Up @@ -717,7 +721,11 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error {

db.MultiTree = *mtree
// catch-up the pending changes
return db.applyWALEntry(db.pendingLog)
if err := db.applyWALEntry(db.pendingLog); err != nil {
return err
}
db.attachTraverseStateChanges()
return nil
}

// rewriteIfApplicable execute the snapshot rewrite strategy according to current height
Expand Down Expand Up @@ -862,6 +870,114 @@ func (db *DB) WorkingCommitInfo() *CommitInfo {
return db.MultiTree.WorkingCommitInfo()
}

// FirstVersion returns the earliest version that still has WAL entries on disk.
func (db *DB) FirstVersion() (int64, error) {
walLog, initialVersion, _, err := db.walStateForRead()
if err != nil {
return 0, err
}

firstIndex, err := walLog.FirstIndex()
if err != nil {
return 0, err
}
if firstIndex == 0 {
return 0, nil
}
return walVersion(firstIndex, initialVersion), nil
}

// FirstStoreVersions returns the first version each store appears in the WAL.
func (db *DB) FirstStoreVersions(stores []string) (map[string]int64, error) {
result := make(map[string]int64, len(stores))
if len(stores) == 0 {
return result, nil
}

walLog, initialVersion, lastVersion, err := db.walStateForRead()
if err != nil {
return nil, err
}

if err := waitForWALVersion(walLog, initialVersion, lastVersion); err != nil {
return nil, err
}

firstIndex, err := walLog.FirstIndex()
if err != nil {
return nil, err
}
lastIndex, err := walLog.LastIndex()
if err != nil {
return nil, err
}
if firstIndex == 0 || lastIndex == 0 {
return result, nil
}

targets := make(map[string]struct{}, len(stores))
for _, store := range stores {
if _, ok := targets[store]; !ok {
targets[store] = struct{}{}
}
}

for idx := firstIndex; idx <= lastIndex; idx++ {
if len(result) == len(targets) {
break
}

data, err := walLog.Read(idx)
if err != nil {
return nil, err
}
var entry WALEntry
if err := entry.Unmarshal(data); err != nil {
return nil, err
}
version := walVersion(idx, initialVersion)
for _, changeset := range entry.Changesets {
if _, ok := targets[changeset.Name]; !ok {
continue
}
if _, recorded := result[changeset.Name]; recorded {
continue
}
result[changeset.Name] = version
}
}

return result, nil
}

func (db *DB) walStateForRead() (*wal.Log, uint32, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.wal == nil {
return nil, 0, 0, fmt.Errorf("wal is not initialized")
}
return db.wal, db.initialVersion, db.lastCommitInfo.Version, nil
}

func waitForWALVersion(walLog *wal.Log, initialVersion uint32, targetVersion int64) error {
if targetVersion <= 0 {
return nil
}

targetIndex := walIndex(targetVersion, initialVersion)
for {
lastIndex, err := walLog.LastIndex()
if err != nil {
return err
}
if lastIndex >= targetIndex {
return nil
}
time.Sleep(time.Millisecond)
}
}

// UpdateCommitInfo wraps MultiTree.UpdateCommitInfo to add a lock.
func (db *DB) UpdateCommitInfo() {
db.mtx.Lock()
Expand Down
66 changes: 66 additions & 0 deletions memiavl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,72 @@ func TestLoadVersion(t *testing.T) {
}
}

func TestTreeTraverseStateChanges(t *testing.T) {
dir := t.TempDir()
db, err := Load(dir, Options{
CreateIfMissing: true,
InitialStores: []string{"test", "other"},
}, TestAppChainID)
require.NoError(t, err)
defer func() { require.NoError(t, db.Close()) }()

applyAndCommit := func(changeSets []*NamedChangeSet) {
require.NoError(t, db.ApplyChangeSets(changeSets))
_, err := db.Commit()
require.NoError(t, err)
}

applyAndCommit([]*NamedChangeSet{
{Name: "test", Changeset: ChangeSet{Pairs: mockKVPairs("foo", "bar")}},
})
applyAndCommit([]*NamedChangeSet{
{Name: "other", Changeset: ChangeSet{Pairs: mockKVPairs("baz", "qux")}},
})
applyAndCommit([]*NamedChangeSet{
{Name: "test", Changeset: ChangeSet{Pairs: mockKVPairs("foo", "baz")}},
})

firstVersion, err := db.FirstVersion()
require.NoError(t, err)
require.EqualValues(t, 1, firstVersion)
starts, err := db.FirstStoreVersions([]string{"test", "other"})
require.NoError(t, err)
require.EqualValues(t, 1, starts["test"])
require.EqualValues(t, 2, starts["other"])

tree := db.TreeByName("test")
require.NotNil(t, tree)

var versions []int64
var changeSets []ChangeSet
require.NoError(t, tree.TraverseStateChanges(0, 10, func(version int64, cs *ChangeSet) error {
versions = append(versions, version)
copied := ChangeSet{}
for _, pair := range cs.Pairs {
cp := &KVPair{Delete: pair.Delete}
if len(pair.Key) > 0 {
cp.Key = append([]byte(nil), pair.Key...)
}
if len(pair.Value) > 0 {
cp.Value = append([]byte(nil), pair.Value...)
}
copied.Pairs = append(copied.Pairs, cp)
}
changeSets = append(changeSets, copied)
return nil
}))

require.Equal(t, []int64{1, 2, 3}, versions)
require.Len(t, changeSets, 3)
require.Len(t, changeSets[0].Pairs, 1)
require.Equal(t, []byte("foo"), changeSets[0].Pairs[0].Key)
require.Equal(t, []byte("bar"), changeSets[0].Pairs[0].Value)
require.Len(t, changeSets[1].Pairs, 0)
require.Len(t, changeSets[2].Pairs, 1)
require.Equal(t, []byte("foo"), changeSets[2].Pairs[0].Key)
require.Equal(t, []byte("baz"), changeSets[2].Pairs[0].Value)
}

func TestZeroCopy(t *testing.T) {
db, err := Load(t.TempDir(), Options{InitialStores: []string{"test", "test2"}, CreateIfMissing: true, ZeroCopy: true}, TestAppChainID)
require.NoError(t, err)
Expand Down
92 changes: 92 additions & 0 deletions memiavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ type Tree struct {

// when true, the get and iterator methods could return a slice pointing to mmaped blob files.
zeroCopy bool

traverseStateChanges traverseStateChangesFn
}

type traverseStateChangesFn func(startVersion, endVersion int64, fn func(version int64, changeSet *ChangeSet) error) error

type cacheNode struct {
key, value []byte
}
Expand Down Expand Up @@ -239,6 +243,14 @@ func (t *Tree) Iterator(start, end []byte, ascending bool) *Iterator {
return NewIterator(start, end, ascending, t.root, t.zeroCopy)
}

// TraverseStateChanges iterates the change sets between the given versions (inclusive).
func (t *Tree) TraverseStateChanges(startVersion, endVersion int64, fn func(version int64, changeSet *ChangeSet) error) error {
if t.traverseStateChanges == nil {
return fmt.Errorf("TraverseStateChanges not supported")
}
return t.traverseStateChanges(startVersion, endVersion, fn)
}

// ScanPostOrder scans the tree in post-order, and call the callback function on each node.
// If the callback function returns false, the scan will be stopped.
func (t *Tree) ScanPostOrder(callback func(node Node) bool) {
Expand Down Expand Up @@ -295,5 +307,85 @@ func (t *Tree) Close() error {
t.snapshot = nil
}
t.root = nil
t.traverseStateChanges = nil
return err
}

func (t *Tree) setTraverseStateChanges(fn traverseStateChangesFn) {
t.traverseStateChanges = fn
}

func (db *DB) attachTraverseStateChanges() {
for i := range db.trees {
t := db.trees[i]
if t.Tree == nil {
continue
}
storeName := t.Name
t.setTraverseStateChanges(func(startVersion, endVersion int64, fn func(int64, *ChangeSet) error) error {
return db.traverseStateChanges(storeName, startVersion, endVersion, fn)
})
}
}

func (db *DB) traverseStateChanges(store string, startVersion, endVersion int64, fn func(int64, *ChangeSet) error) error {
walLog, initialVersion, lastVersion, err := db.walStateForRead()
if err != nil {
return err
}
if err := waitForWALVersion(walLog, initialVersion, lastVersion); err != nil {
return err
}
if endVersion < startVersion {
return nil
}
firstIndex, err := walLog.FirstIndex()
if err != nil {
return err
}
lastIndex, err := walLog.LastIndex()
if err != nil {
return err
}
if firstIndex == 0 || lastIndex == 0 {
return nil
}
firstAvailable := walVersion(firstIndex, initialVersion)
if startVersion < firstAvailable {
startVersion = firstAvailable
}
lastAvailable := walVersion(lastIndex, initialVersion)
if endVersion <= 0 || endVersion > lastAvailable {
endVersion = lastAvailable
}
if endVersion < startVersion {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impossible condition?

return nil
}
startIndex := walIndex(startVersion, initialVersion)
endIndex := walIndex(endVersion, initialVersion)
for idx := startIndex; idx <= endIndex; idx++ {
data, err := walLog.Read(idx)
if err != nil {
return err
}
var entry WALEntry
if err := entry.Unmarshal(data); err != nil {
return err
}
version := walVersion(idx, initialVersion)
var changeSet *ChangeSet
for j := range entry.Changesets {
if entry.Changesets[j].Name == store {
changeSet = &entry.Changesets[j].Changeset
break
}
}
if changeSet == nil {
changeSet = &ChangeSet{}
}
if err := fn(version, changeSet); err != nil {
return err
}
}
return nil
}
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
cmd.AddCommand(
ListDefaultStoresCmd(opts.DefaultStores),
DumpChangeSetCmd(opts),
DumpMemiavlChangeSetCmd(opts),
PrintChangeSetCmd(),
VerifyChangeSetCmd(opts.DefaultStores),
BuildVersionDBSSTCmd(opts.DefaultStores),
Expand Down
Loading
Loading