Skip to content

Commit 06a5c96

Browse files
committed
feat: add memiavl changeset
1 parent a2ef3db commit 06a5c96

File tree

5 files changed

+448
-8
lines changed

5 files changed

+448
-8
lines changed

memiavl/db.go

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func Load(dir string, opts Options, chainId string) (*DB, error) {
250250
triggerStateSyncExport: opts.TriggerStateSyncExport,
251251
snapshotWriterPool: workerPool,
252252
}
253+
db.attachTraverseStateChanges()
253254

254255
if !db.readOnly && db.Version() == 0 && len(opts.InitialStores) > 0 {
255256
// do the initial upgrade with the `opts.InitialStores`
@@ -326,6 +327,7 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
326327
}
327328

328329
db.pendingLog.Upgrades = append(db.pendingLog.Upgrades, upgrades...)
330+
db.attachTraverseStateChanges()
329331
return nil
330332
}
331333

@@ -661,12 +663,14 @@ func (db *DB) Copy() *DB {
661663
func (db *DB) copy(cacheSize int) *DB {
662664
mtree := db.MultiTree.Copy(cacheSize)
663665

664-
return &DB{
666+
cloned := &DB{
665667
MultiTree: *mtree,
666668
logger: db.logger,
667669
dir: db.dir,
668670
snapshotWriterPool: db.snapshotWriterPool,
669671
}
672+
cloned.attachTraverseStateChanges()
673+
return cloned
670674
}
671675

672676
// RewriteSnapshot writes the current version of memiavl into a snapshot, and update the `current` symlink.
@@ -717,7 +721,11 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error {
717721

718722
db.MultiTree = *mtree
719723
// catch-up the pending changes
720-
return db.applyWALEntry(db.pendingLog)
724+
if err := db.applyWALEntry(db.pendingLog); err != nil {
725+
return err
726+
}
727+
db.attachTraverseStateChanges()
728+
return nil
721729
}
722730

723731
// rewriteIfApplicable execute the snapshot rewrite strategy according to current height
@@ -862,6 +870,114 @@ func (db *DB) WorkingCommitInfo() *CommitInfo {
862870
return db.MultiTree.WorkingCommitInfo()
863871
}
864872

873+
// FirstVersion returns the earliest version that still has WAL entries on disk.
874+
func (db *DB) FirstVersion() (int64, error) {
875+
walLog, initialVersion, _, err := db.walStateForRead()
876+
if err != nil {
877+
return 0, err
878+
}
879+
880+
firstIndex, err := walLog.FirstIndex()
881+
if err != nil {
882+
return 0, err
883+
}
884+
if firstIndex == 0 {
885+
return 0, nil
886+
}
887+
return walVersion(firstIndex, initialVersion), nil
888+
}
889+
890+
// FirstStoreVersions returns the first version each store appears in the WAL.
891+
func (db *DB) FirstStoreVersions(stores []string) (map[string]int64, error) {
892+
result := make(map[string]int64, len(stores))
893+
if len(stores) == 0 {
894+
return result, nil
895+
}
896+
897+
walLog, initialVersion, lastVersion, err := db.walStateForRead()
898+
if err != nil {
899+
return nil, err
900+
}
901+
902+
if err := waitForWALVersion(walLog, initialVersion, lastVersion); err != nil {
903+
return nil, err
904+
}
905+
906+
firstIndex, err := walLog.FirstIndex()
907+
if err != nil {
908+
return nil, err
909+
}
910+
lastIndex, err := walLog.LastIndex()
911+
if err != nil {
912+
return nil, err
913+
}
914+
if firstIndex == 0 || lastIndex == 0 {
915+
return result, nil
916+
}
917+
918+
targets := make(map[string]struct{}, len(stores))
919+
for _, store := range stores {
920+
if _, ok := targets[store]; !ok {
921+
targets[store] = struct{}{}
922+
}
923+
}
924+
925+
for idx := firstIndex; idx <= lastIndex; idx++ {
926+
if len(result) == len(targets) {
927+
break
928+
}
929+
930+
data, err := walLog.Read(idx)
931+
if err != nil {
932+
return nil, err
933+
}
934+
var entry WALEntry
935+
if err := entry.Unmarshal(data); err != nil {
936+
return nil, err
937+
}
938+
version := walVersion(idx, initialVersion)
939+
for _, changeset := range entry.Changesets {
940+
if _, ok := targets[changeset.Name]; !ok {
941+
continue
942+
}
943+
if _, recorded := result[changeset.Name]; recorded {
944+
continue
945+
}
946+
result[changeset.Name] = version
947+
}
948+
}
949+
950+
return result, nil
951+
}
952+
953+
func (db *DB) walStateForRead() (*wal.Log, uint32, int64, error) {
954+
db.mtx.Lock()
955+
defer db.mtx.Unlock()
956+
957+
if db.wal == nil {
958+
return nil, 0, 0, fmt.Errorf("wal is not initialized")
959+
}
960+
return db.wal, db.initialVersion, db.lastCommitInfo.Version, nil
961+
}
962+
963+
func waitForWALVersion(walLog *wal.Log, initialVersion uint32, targetVersion int64) error {
964+
if targetVersion <= 0 {
965+
return nil
966+
}
967+
968+
targetIndex := walIndex(targetVersion, initialVersion)
969+
for {
970+
lastIndex, err := walLog.LastIndex()
971+
if err != nil {
972+
return err
973+
}
974+
if lastIndex >= targetIndex {
975+
return nil
976+
}
977+
time.Sleep(time.Millisecond)
978+
}
979+
}
980+
865981
// UpdateCommitInfo wraps MultiTree.UpdateCommitInfo to add a lock.
866982
func (db *DB) UpdateCommitInfo() {
867983
db.mtx.Lock()

memiavl/db_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,72 @@ func TestLoadVersion(t *testing.T) {
294294
}
295295
}
296296

297+
func TestTreeTraverseStateChanges(t *testing.T) {
298+
dir := t.TempDir()
299+
db, err := Load(dir, Options{
300+
CreateIfMissing: true,
301+
InitialStores: []string{"test", "other"},
302+
}, TestAppChainID)
303+
require.NoError(t, err)
304+
defer func() { require.NoError(t, db.Close()) }()
305+
306+
applyAndCommit := func(changeSets []*NamedChangeSet) {
307+
require.NoError(t, db.ApplyChangeSets(changeSets))
308+
_, err := db.Commit()
309+
require.NoError(t, err)
310+
}
311+
312+
applyAndCommit([]*NamedChangeSet{
313+
{Name: "test", Changeset: ChangeSet{Pairs: mockKVPairs("foo", "bar")}},
314+
})
315+
applyAndCommit([]*NamedChangeSet{
316+
{Name: "other", Changeset: ChangeSet{Pairs: mockKVPairs("baz", "qux")}},
317+
})
318+
applyAndCommit([]*NamedChangeSet{
319+
{Name: "test", Changeset: ChangeSet{Pairs: mockKVPairs("foo", "baz")}},
320+
})
321+
322+
firstVersion, err := db.FirstVersion()
323+
require.NoError(t, err)
324+
require.EqualValues(t, 1, firstVersion)
325+
starts, err := db.FirstStoreVersions([]string{"test", "other"})
326+
require.NoError(t, err)
327+
require.EqualValues(t, 1, starts["test"])
328+
require.EqualValues(t, 2, starts["other"])
329+
330+
tree := db.TreeByName("test")
331+
require.NotNil(t, tree)
332+
333+
var versions []int64
334+
var changeSets []ChangeSet
335+
require.NoError(t, tree.TraverseStateChanges(0, 10, func(version int64, cs *ChangeSet) error {
336+
versions = append(versions, version)
337+
copied := ChangeSet{}
338+
for _, pair := range cs.Pairs {
339+
cp := &KVPair{Delete: pair.Delete}
340+
if len(pair.Key) > 0 {
341+
cp.Key = append([]byte(nil), pair.Key...)
342+
}
343+
if len(pair.Value) > 0 {
344+
cp.Value = append([]byte(nil), pair.Value...)
345+
}
346+
copied.Pairs = append(copied.Pairs, cp)
347+
}
348+
changeSets = append(changeSets, copied)
349+
return nil
350+
}))
351+
352+
require.Equal(t, []int64{1, 2, 3}, versions)
353+
require.Len(t, changeSets, 3)
354+
require.Len(t, changeSets[0].Pairs, 1)
355+
require.Equal(t, []byte("foo"), changeSets[0].Pairs[0].Key)
356+
require.Equal(t, []byte("bar"), changeSets[0].Pairs[0].Value)
357+
require.Len(t, changeSets[1].Pairs, 0)
358+
require.Len(t, changeSets[2].Pairs, 1)
359+
require.Equal(t, []byte("foo"), changeSets[2].Pairs[0].Key)
360+
require.Equal(t, []byte("baz"), changeSets[2].Pairs[0].Value)
361+
}
362+
297363
func TestZeroCopy(t *testing.T) {
298364
db, err := Load(t.TempDir(), Options{InitialStores: []string{"test", "test2"}, CreateIfMissing: true, ZeroCopy: true}, TestAppChainID)
299365
require.NoError(t, err)

memiavl/tree.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ type Tree struct {
3030

3131
// when true, the get and iterator methods could return a slice pointing to mmaped blob files.
3232
zeroCopy bool
33+
34+
traverseStateChanges traverseStateChangesFn
3335
}
3436

37+
type traverseStateChangesFn func(startVersion, endVersion int64, fn func(version int64, changeSet *ChangeSet) error) error
38+
3539
type cacheNode struct {
3640
key, value []byte
3741
}
@@ -239,6 +243,14 @@ func (t *Tree) Iterator(start, end []byte, ascending bool) *Iterator {
239243
return NewIterator(start, end, ascending, t.root, t.zeroCopy)
240244
}
241245

246+
// TraverseStateChanges iterates the change sets between the given versions (inclusive).
247+
func (t *Tree) TraverseStateChanges(startVersion, endVersion int64, fn func(version int64, changeSet *ChangeSet) error) error {
248+
if t.traverseStateChanges == nil {
249+
return fmt.Errorf("TraverseStateChanges not supported")
250+
}
251+
return t.traverseStateChanges(startVersion, endVersion, fn)
252+
}
253+
242254
// ScanPostOrder scans the tree in post-order, and call the callback function on each node.
243255
// If the callback function returns false, the scan will be stopped.
244256
func (t *Tree) ScanPostOrder(callback func(node Node) bool) {
@@ -295,5 +307,85 @@ func (t *Tree) Close() error {
295307
t.snapshot = nil
296308
}
297309
t.root = nil
310+
t.traverseStateChanges = nil
298311
return err
299312
}
313+
314+
func (t *Tree) setTraverseStateChanges(fn traverseStateChangesFn) {
315+
t.traverseStateChanges = fn
316+
}
317+
318+
func (db *DB) attachTraverseStateChanges() {
319+
for i := range db.trees {
320+
t := db.trees[i]
321+
if t.Tree == nil {
322+
continue
323+
}
324+
storeName := t.Name
325+
t.Tree.setTraverseStateChanges(func(startVersion, endVersion int64, fn func(int64, *ChangeSet) error) error {
326+
return db.traverseStateChanges(storeName, startVersion, endVersion, fn)
327+
})
328+
}
329+
}
330+
331+
func (db *DB) traverseStateChanges(store string, startVersion, endVersion int64, fn func(int64, *ChangeSet) error) error {
332+
walLog, initialVersion, lastVersion, err := db.walStateForRead()
333+
if err != nil {
334+
return err
335+
}
336+
if err := waitForWALVersion(walLog, initialVersion, lastVersion); err != nil {
337+
return err
338+
}
339+
if endVersion < startVersion {
340+
return nil
341+
}
342+
firstIndex, err := walLog.FirstIndex()
343+
if err != nil {
344+
return err
345+
}
346+
lastIndex, err := walLog.LastIndex()
347+
if err != nil {
348+
return err
349+
}
350+
if firstIndex == 0 || lastIndex == 0 {
351+
return nil
352+
}
353+
firstAvailable := walVersion(firstIndex, initialVersion)
354+
if startVersion < firstAvailable {
355+
startVersion = firstAvailable
356+
}
357+
lastAvailable := walVersion(lastIndex, initialVersion)
358+
if endVersion <= 0 || endVersion > lastAvailable {
359+
endVersion = lastAvailable
360+
}
361+
if endVersion < startVersion {
362+
return nil
363+
}
364+
startIndex := walIndex(startVersion, initialVersion)
365+
endIndex := walIndex(endVersion, initialVersion)
366+
for idx := startIndex; idx <= endIndex; idx++ {
367+
data, err := walLog.Read(idx)
368+
if err != nil {
369+
return err
370+
}
371+
var entry WALEntry
372+
if err := entry.Unmarshal(data); err != nil {
373+
return err
374+
}
375+
version := walVersion(idx, initialVersion)
376+
var changeSet *ChangeSet
377+
for j := range entry.Changesets {
378+
if entry.Changesets[j].Name == store {
379+
changeSet = &entry.Changesets[j].Changeset
380+
break
381+
}
382+
}
383+
if changeSet == nil {
384+
changeSet = &ChangeSet{}
385+
}
386+
if err := fn(version, changeSet); err != nil {
387+
return err
388+
}
389+
}
390+
return nil
391+
}

versiondb/client/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
2121
cmd.AddCommand(
2222
ListDefaultStoresCmd(opts.DefaultStores),
2323
DumpChangeSetCmd(opts),
24+
DumpMemiavlChangeSetCmd(opts),
2425
PrintChangeSetCmd(),
2526
VerifyChangeSetCmd(opts.DefaultStores),
2627
BuildVersionDBSSTCmd(opts.DefaultStores),

0 commit comments

Comments
 (0)