diff --git a/db/db_versioned.go b/db/db_versioned.go index 0d356b9b0d..dc07e89e88 100644 --- a/db/db_versioned.go +++ b/db/db_versioned.go @@ -11,12 +11,15 @@ import ( "context" "fmt" "math" + "syscall" "github.com/pkg/errors" bolt "go.etcd.io/bbolt" + "go.uber.org/zap" "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" + "github.com/iotexproject/iotex-core/v2/pkg/log" "github.com/iotexproject/iotex-core/v2/pkg/util/byteutil" ) @@ -43,6 +46,9 @@ type ( // Version returns the key's most recent version Version(string, []byte) (uint64, error) + + // CommitToDB writes a batch to the underlying DB + CommitToDB(uint64, map[string]bool, batch.KVStoreBatch) error } // BoltDBVersioned is KvVersioned implementation based on bolt DB @@ -196,6 +202,241 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) { return last, err } +// CommitToDB write a batch to DB, where the batch can contain keys for +// both versioned and non-versioned namespace +func (b *BoltDBVersioned) CommitToDB(version uint64, vns map[string]bool, kvsb batch.KVStoreBatch) error { + vnsize, ve, nve, err := dedup(vns, kvsb) + if err != nil { + return errors.Wrapf(err, "BoltDBVersioned failed to write batch") + } + return b.commitToDB(version, vnsize, ve, nve) +} + +func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error { + var ( + err error + nonDBErr bool + ) + for c := uint8(0); c < b.db.config.NumRetries; c++ { + buckets := make(map[string]*bolt.Bucket) + if err = b.db.db.Update(func(tx *bolt.Tx) error { + // create/check metadata of all namespaces + for ns, size := range vnsize { + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + var vn *versionedNamespace + if val := bucket.Get(_minKey); val == nil { + // namespace not created yet + vn = &versionedNamespace{ + keyLen: uint32(size), + } + ve = append(ve, batch.NewWriteInfo( + batch.Put, ns, _minKey, vn.serialize(), + fmt.Sprintf("failed to create metadata for namespace %s", ns), + )) + } else { + if vn, err = deserializeVersionedNamespace(val); err != nil { + nonDBErr = true + return errors.Wrapf(err, "failed to get metadata of bucket %s", ns) + } + if vn.keyLen != uint32(size) { + nonDBErr = true + return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size) + } + } + } + // keep order of the writes same as the original batch + for i := len(ve) - 1; i >= 0; i-- { + var ( + write = ve[i] + ns = write.Namespace() + key = write.Key() + val = write.Value() + ) + // get bucket + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + // check key's last version + var ( + last uint64 + notexist, isDelete bool + actualKey = keyForWrite(key, version) + ) + c := bucket.Cursor() + k, _ := c.Seek(actualKey) + if k == nil || bytes.Compare(k, actualKey) == 1 { + k, _ = c.Prev() + if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 { + // cursor is at the beginning/end of the bucket or smaller than minimum key + notexist = true + } + } + if !notexist { + isDelete, last = parseKey(k) + } + switch write.WriteType() { + case batch.Put: + if bytes.Equal(key, _minKey) { + // create namespace + if err = bucket.Put(key, val); err != nil { + return errors.Wrap(err, write.Error()) + } + } else { + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) + } + if isDelete && version <= last { + // not allowed to perform write on an earlier version + nonDBErr = true + return ErrInvalid + } + if err = bucket.Put(keyForWrite(key, version), val); err != nil { + return errors.Wrap(err, write.Error()) + } + } + case batch.Delete: + if notexist { + continue + } + // wrong-size key should be caught in dedup(), but check anyway + if vnsize[ns] != len(key) { + panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key))) + } + if version < last { + // not allowed to perform delete on an earlier version + nonDBErr = true + return ErrInvalid + } + if err = bucket.Put(keyForDelete(key, version), nil); err != nil { + return errors.Wrap(err, write.Error()) + } + if err = bucket.Delete(keyForWrite(key, version)); err != nil { + return errors.Wrap(err, write.Error()) + } + } + } + // write non-versioned keys + for i := len(nve) - 1; i >= 0; i-- { + var ( + write = nve[i] + ns = write.Namespace() + ) + switch write.WriteType() { + case batch.Put: + // get bucket + bucket, ok := buckets[ns] + if !ok { + bucket, err = tx.CreateBucketIfNotExists([]byte(ns)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", ns) + } + buckets[ns] = bucket + } + if err = bucket.Put(write.Key(), write.Value()); err != nil { + return errors.Wrap(err, write.Error()) + } + case batch.Delete: + bucket := tx.Bucket([]byte(ns)) + if bucket == nil { + continue + } + if err = bucket.Delete(write.Key()); err != nil { + return errors.Wrap(err, write.Error()) + } + } + } + return nil + }); err == nil || nonDBErr { + break + } + } + if nonDBErr { + return err + } + if err != nil { + if errors.Is(err, syscall.ENOSPC) { + log.L().Fatal("BoltDBVersioned failed to write batch", zap.Error(err)) + } + return errors.Wrap(ErrIO, err.Error()) + } + return nil +} + +// dedup does 3 things: +// 1. deduplicate entries in the batch, only keep the last write for each key +// 2. splits entries into 2 slices according to the input namespace map +// 3. return a map of input namespace's keyLength +func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*batch.WriteInfo, []*batch.WriteInfo, error) { + kvsb.Lock() + defer kvsb.Unlock() + + type doubleKey struct { + ns string + key string + } + + var ( + entryKeySet = make(map[doubleKey]bool) + nsKeyLen = make(map[string]int) + nsInMap = make([]*batch.WriteInfo, 0) + other = make([]*batch.WriteInfo, 0) + pickAll = len(vns) == 0 + ) + for i := kvsb.Size() - 1; i >= 0; i-- { + write, e := kvsb.Entry(i) + if e != nil { + return nil, nil, nil, e + } + // only handle Put and Delete + var ( + writeType = write.WriteType() + ns = write.Namespace() + key = write.Key() + ) + if writeType != batch.Put && writeType != batch.Delete { + continue + } + k := doubleKey{ns: ns, key: string(key)} + if entryKeySet[k] { + continue + } + if writeType == batch.Put { + // for a later DELETE, we want to capture the earlier PUT + // otherwise, the DELETE might return not-exist + entryKeySet[k] = true + } + if pickAll || vns[k.ns] { + nsInMap = append(nsInMap, write) + } else { + other = append(other, write) + } + // check key size + if pickAll || vns[k.ns] { + if n, ok := nsKeyLen[k.ns]; !ok { + nsKeyLen[k.ns] = len(write.Key()) + } else { + if n != len(write.Key()) { + return nil, nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key())) + } + } + } + } + return nsKeyLen, nsInMap, other, nil +} + func isNotExist(err error) bool { return err == ErrNotExist || err == ErrBucketNotExist } diff --git a/db/db_versioned_test.go b/db/db_versioned_test.go index d13e9afec0..3cedc9edb5 100644 --- a/db/db_versioned_test.go +++ b/db/db_versioned_test.go @@ -8,14 +8,21 @@ package db import ( "context" + "math" "testing" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/testutil" ) +var ( + _k5 = []byte("key_5") + _k10 = []byte("key_10") +) + type versionTest struct { ns string k, v []byte @@ -50,10 +57,6 @@ func TestVersionedDB(t *testing.T) { r.NoError(err) r.EqualValues(len(_k2), vn.keyLen) // check more Put/Get - var ( - _k5 = []byte("key_5") - _k10 = []byte("key_10") - ) err = db.Put(1, _bucket1, _k10, _v1) r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) r.NoError(db.Put(1, _bucket1, _k2, _v1)) @@ -239,6 +242,173 @@ func TestVersionedDB(t *testing.T) { } func TestMultipleWriteDelete(t *testing.T) { + r := require.New(t) + for i := 0; i < 2; i++ { + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewBoltDBVersioned(cfg) + ctx := context.Background() + r.NoError(db.Start(ctx)) + + if i == 0 { + // multiple writes and deletes + r.NoError(db.Put(1, _bucket1, _k2, _v1)) + r.NoError(db.Put(3, _bucket1, _k2, _v3)) + v, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(3, v) + r.NoError(db.Delete(7, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(10, _bucket1, _k2, _v2)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(10, v) + r.NoError(db.Delete(15, _bucket1, _k2)) + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) + r.NoError(db.Put(18, _bucket1, _k2, _v3)) + r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write + _, err = db.Version(_bucket1, _k2) + r.Equal(ErrDeleted, errors.Cause(err)) + r.NoError(db.Put(21, _bucket1, _k2, _v4)) + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(21, v) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) + r.NoError(db.Delete(25, _bucket1, _k2)) + r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete + v, err = db.Version(_bucket1, _k2) + r.NoError(err) + r.EqualValues(25, v) + } else { + // multiple writes and deletes using commitToDB + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, nil, 7, ErrDeleted}, + {_bucket1, _k2, _v2, 10, nil}, + {_bucket1, _k2, nil, 15, ErrDeleted}, + {_bucket1, _k2, _v3, 18, ErrDeleted}, // delete-after-write + {_bucket1, _k2, _v4, 21, nil}, + {_bucket1, _k2, _k2, 25, nil}, // write-after-delete + } { + if e.height == 7 || e.height == 15 { + b.Delete(e.ns, e.k, "test") + } else if e.height == 18 { + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + } else if e.height == 25 { + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + b.Delete(e.ns, e.k, "test") + b.Put(e.ns, e.k, e.v, "test") + } else { + b.Put(e.ns, e.k, e.v, "test") + } + r.NoError(db.CommitToDB(e.height, nil, b)) + b.Clear() + v, err := db.Version(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + if err == nil { + r.EqualValues(e.height, v) + } + } + } + for _, e := range []versionTest{ + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 6, nil}, + {_bucket1, _k2, nil, 7, ErrDeleted}, + {_bucket1, _k2, nil, 9, ErrDeleted}, + {_bucket1, _k2, _v2, 10, nil}, + {_bucket1, _k2, _v2, 14, nil}, + {_bucket1, _k2, nil, 15, ErrDeleted}, + {_bucket1, _k2, nil, 17, ErrDeleted}, + {_bucket1, _k2, nil, 18, ErrDeleted}, + {_bucket1, _k2, nil, 20, ErrDeleted}, + {_bucket1, _k2, _v4, 21, nil}, + {_bucket1, _k2, _v4, 22, nil}, + {_bucket1, _k2, _v4, 24, nil}, + {_bucket1, _k2, _k2, 25, nil}, + {_bucket1, _k2, _k2, 26, nil}, + {_bucket1, _k2, _k2, 25000, nil}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + r.NoError(db.Stop(ctx)) + } +} + +func TestDedup(t *testing.T) { + r := require.New(t) + + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _v2, 0, nil}, + {_bucket2, _v2, _v3, 9, nil}, + {_bucket2, _v3, _v4, 3, nil}, + {_bucket2, _v4, _v1, 1, nil}, + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v2, 9, nil}, + {_bucket1, _k3, _v3, 3, nil}, + {_bucket1, _k4, _v4, 1, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + keySize, ve, ce, err := dedup(nil, b) + r.NoError(err) + r.Equal(2, len(keySize)) + r.Equal(5, keySize[_bucket1]) + r.Equal(7, keySize[_bucket2]) + r.Equal(8, len(ve)) + r.Zero(len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ve[i].Key()) + } + // put a key with diff length into _bucket2 + b.Put(_bucket2, _k1, _v1, "test") + // treat _bucket1 as versioned namespace still OK + keySize, ve, ce, err = dedup(map[string]bool{ + _bucket1: true, + }, b) + r.NoError(err) + r.Equal(1, len(keySize)) + r.Equal(5, keySize[_bucket1]) + r.Equal(4, len(ve)) + r.Equal(5, len(ce)) + for i, v := range [][]byte{_k4, _k3, _k2, _k1} { + r.Equal(v, ve[i].Key()) + } + for i, v := range [][]byte{_k1, _v4, _v3, _v2, _v1} { + r.Equal(v, ce[i].Key()) + } + // treat _bucket2 (or both buckets) as versioned namespace hits error due to diff key size + for _, v := range []map[string]bool{ + {_bucket2: true}, nil, + } { + _, _, _, err = dedup(v, b) + r.Equal("invalid key length, expecting 5, got 7: invalid input", err.Error()) + } +} + +func TestCommitToDB(t *testing.T) { r := require.New(t) testPath, err := testutil.PathOfTempFile("test-version") r.NoError(err) @@ -255,68 +425,109 @@ func TestMultipleWriteDelete(t *testing.T) { db.Stop(ctx) }() - // multiple writes and deletes - r.NoError(db.Put(1, _bucket1, _k2, _v1)) - r.NoError(db.Put(3, _bucket1, _k2, _v3)) - v, err := db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(3, v) - r.NoError(db.Delete(7, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(10, _bucket1, _k2, _v2)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(10, v) - r.NoError(db.Delete(15, _bucket1, _k2)) - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(18, _bucket1, _k2, _v3)) // write again - value, err := db.Get(18, _bucket1, _k2) - r.NoError(err) - r.Equal(_v3, value) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(18, v) - r.NoError(db.Delete(18, _bucket1, _k2)) // delete-after-write - _, err = db.Version(_bucket1, _k2) - r.Equal(ErrDeleted, errors.Cause(err)) - r.NoError(db.Put(21, _bucket1, _k2, _v4)) - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(21, v) - r.NoError(db.Delete(25, _bucket1, _k2)) - r.NoError(db.Put(25, _bucket1, _k2, _k2)) // write-after-delete - v, err = db.Version(_bucket1, _k2) - r.NoError(err) - r.EqualValues(25, v) + b := batch.NewBatch() for _, e := range []versionTest{ + {_bucket2, _v1, _k1, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k3, 3, nil}, + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v2, 9, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + + r.NoError(db.CommitToDB(1, nil, b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v1, _k1, 2, nil}, + {_bucket2, _v2, _k2, 2, nil}, + {_bucket2, _v3, _k3, 2, nil}, + {_bucket1, _k1, nil, 0, ErrNotExist}, {_bucket1, _k2, nil, 0, ErrNotExist}, - {_bucket1, _k2, _v1, 1, nil}, - {_bucket1, _k2, _v1, 2, nil}, - {_bucket1, _k2, _v3, 3, nil}, - {_bucket1, _k2, _v3, 6, nil}, - {_bucket1, _k2, nil, 7, ErrDeleted}, - {_bucket1, _k2, nil, 9, ErrDeleted}, - {_bucket1, _k2, _v2, 10, nil}, - {_bucket1, _k2, _v2, 14, nil}, - {_bucket1, _k2, nil, 15, ErrDeleted}, - {_bucket1, _k2, nil, 17, ErrDeleted}, - {_bucket1, _k2, nil, 18, ErrDeleted}, - {_bucket1, _k2, nil, 20, ErrDeleted}, - {_bucket1, _k2, _v4, 21, nil}, - {_bucket1, _k2, _v4, 22, nil}, - {_bucket1, _k2, _v4, 24, nil}, - {_bucket1, _k2, _k2, 25, nil}, - {_bucket1, _k2, _k2, 26, nil}, - {_bucket1, _k2, _k2, 25000, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k1, _v1, 3, nil}, + {_bucket1, _k2, _v2, 3, nil}, + } { + value, err := db.Get(e.height, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + + // batch with wrong key length would fail + b.Put(_bucket1, _v1, _k1, "test") + r.Equal(ErrInvalid, errors.Cause(db.CommitToDB(3, nil, b))) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v3, 9, nil}, + {_bucket1, _k3, _v1, 3, nil}, + {_bucket1, _k4, _v2, 1, nil}, + {_bucket2, _v1, _k3, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k1, 3, nil}, + {_bucket2, _v4, _k4, 1, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + b.Delete(_bucket1, _k3, "test") + b.Delete(_bucket2, _v3, "test") + + r.NoError(db.CommitToDB(5, nil, b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, } { value, err := db.Get(e.height, e.ns, e.k) r.Equal(e.err, errors.Cause(err)) r.Equal(e.v, value) } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k3, nil, 1, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v4, nil, 1, ErrNotExist}, + } { + for _, h := range []uint64{1, 2, 3, 4} { + value, err := db.Get(h, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 5, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k3, nil, 5, ErrDeleted}, + {_bucket1, _k4, _v2, 5, nil}, + {_bucket2, _v1, _k3, 5, nil}, + {_bucket2, _v2, _k2, 5, nil}, + {_bucket2, _v3, nil, 5, ErrDeleted}, + {_bucket2, _v4, _k4, 5, nil}, + } { + for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { + value, err := db.Get(h, e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } } diff --git a/db/kvstore_versioned.go b/db/kvstore_versioned.go index e1e85b01b3..a041830b8e 100644 --- a/db/kvstore_versioned.go +++ b/db/kvstore_versioned.go @@ -7,6 +7,11 @@ package db import ( + "context" + + "github.com/pkg/errors" + + "github.com/iotexproject/iotex-core/v2/db/batch" "github.com/iotexproject/iotex-core/v2/pkg/lifecycle" ) @@ -47,4 +52,107 @@ type ( // SetVersion sets the version, and returns a KVStore to call Put()/Get() SetVersion(uint64) KVStore } + + // KvWithVersion wraps the versioned DB implementation with a certain version + KvWithVersion struct { + db VersionedDB + kvBase KVStore + versioned map[string]bool // map of versioned buckets + version uint64 // the current version + } ) + +// Option sets an option +type Option func(*KvWithVersion) + +func VersionedNamespaceOption(ns ...string) Option { + return func(k *KvWithVersion) { + k.versioned = make(map[string]bool) + for _, ns := range ns { + k.versioned[ns] = true + } + } +} + +// NewKVStoreWithVersion implements a KVStore that can handle both versioned +// and non-versioned namespace +func NewKVStoreWithVersion(cfg Config, opts ...Option) *KvWithVersion { + db := NewBoltDBVersioned(cfg) + kv := KvWithVersion{ + db: db, + kvBase: db.Base(), + } + for _, opt := range opts { + opt(&kv) + } + return &kv +} + +// Start starts the DB +func (b *KvWithVersion) Start(ctx context.Context) error { + return b.kvBase.Start(ctx) +} + +// Stop stops the DB +func (b *KvWithVersion) Stop(ctx context.Context) error { + return b.kvBase.Stop(ctx) +} + +// Put writes a record +func (b *KvWithVersion) Put(ns string, key, value []byte) error { + if b.versioned[ns] { + return b.db.Put(b.version, ns, key, value) + } + return b.kvBase.Put(ns, key, value) +} + +// Get retrieves a key's value +func (b *KvWithVersion) Get(ns string, key []byte) ([]byte, error) { + if b.versioned[ns] { + return b.db.Get(b.version, ns, key) + } + return b.kvBase.Get(ns, key) +} + +// Delete deletes a key +func (b *KvWithVersion) Delete(ns string, key []byte) error { + if b.versioned[ns] { + return b.db.Delete(b.version, ns, key) + } + return b.kvBase.Delete(ns, key) +} + +// Filter returns pair in a bucket that meet the condition +func (b *KvWithVersion) Filter(ns string, cond Condition, minKey, maxKey []byte) ([][]byte, [][]byte, error) { + if b.versioned[ns] { + panic("Filter not supported for versioned DB") + } + return b.kvBase.Filter(ns, cond, minKey, maxKey) +} + +// WriteBatch commits a batch +func (b *KvWithVersion) WriteBatch(kvsb batch.KVStoreBatch) error { + return b.db.CommitToDB(b.version, b.versioned, kvsb) +} + +// Version returns the key's most recent version +func (b *KvWithVersion) Version(ns string, key []byte) (uint64, error) { + if b.versioned[ns] { + return b.db.Version(ns, key) + } + return 0, errors.Errorf("namespace %s is non-versioned", ns) +} + +// SetVersion sets the version, and returns a KVStore to call Put()/Get() +func (b *KvWithVersion) SetVersion(v uint64) KVStore { + kv := KvWithVersion{ + db: b.db, + kvBase: b.kvBase, + versioned: make(map[string]bool), + version: v, + } + for k := range b.versioned { + kv.versioned[k] = true + } + return &kv +} diff --git a/db/kvstore_versioned_test.go b/db/kvstore_versioned_test.go new file mode 100644 index 0000000000..19639c6f31 --- /dev/null +++ b/db/kvstore_versioned_test.go @@ -0,0 +1,494 @@ +// Copyright (c) 2021 IoTeX Foundation +// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no +// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent +// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache +// License 2.0 that can be found in the LICENSE file. + +package db + +import ( + "context" + "math" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/v2/db/batch" + "github.com/iotexproject/iotex-core/v2/testutil" +) + +var ( + _ns = "ns" + _nerr = errors.New("namespace ns is non-versioned") +) + +func TestKVStoreWithVersion(t *testing.T) { + r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-kversion") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewKVStoreWithVersion(cfg, VersionedNamespaceOption(_bucket1, _bucket2)) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() + + // write first key + r.NoError(db.Put(_bucket1, _k2, _v2)) + v, err := db.Get(_bucket1, _k2) + r.NoError(err) + r.Equal(_v2, v) + n, err := db.Version(_bucket1, _k2) + r.NoError(err) + r.Zero(n) + // check more Put/Get + err = db.SetVersion(1).Put(_bucket1, _k10, _v1) + r.Equal("invalid key length, expecting 5, got 6: invalid input", err.Error()) + r.NoError(db.SetVersion(1).Put(_bucket1, _k2, _v1)) + r.NoError(db.SetVersion(1).Put(_bucket2, _k2, _v2)) + r.NoError(db.SetVersion(3).Put(_bucket1, _k2, _v3)) + r.NoError(db.SetVersion(3).Put(_bucket2, _k2, _v1)) + r.NoError(db.SetVersion(6).Put(_bucket1, _k2, _v2)) + r.NoError(db.SetVersion(6).Put(_bucket2, _k2, _v3)) + r.NoError(db.SetVersion(2).Put(_bucket1, _k4, _v2)) + r.NoError(db.SetVersion(4).Put(_bucket1, _k4, _v1)) + r.NoError(db.SetVersion(7).Put(_bucket1, _k4, _v3)) + // non-versioned namespace + r.NoError(db.Put(_ns, _k1, _v1)) + r.NoError(db.Put(_ns, _k2, _v2)) + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v2, 6, nil}, + {_bucket1, _k2, _v2, 7, nil}, // after last write version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v3, 7, nil}, + {_bucket1, _k4, _v3, 8, nil}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 0, ErrNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, nil}, + {_bucket2, _k2, _v2, 2, nil}, + {_bucket2, _k2, _v1, 3, nil}, + {_bucket2, _k2, _v1, 5, nil}, + {_bucket2, _k2, _v3, 6, nil}, + {_bucket2, _k2, _v3, 7, nil}, // after last write version + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 1, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, _v1, 0, nil}, + {_ns, _k2, _v2, 0, nil}, + {_ns, _k3, nil, 0, ErrNotExist}, + {_ns, _k4, nil, 0, ErrNotExist}, + {_ns, _k5, nil, 0, ErrNotExist}, + {_ns, _k10, nil, 0, ErrNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // overwrite the same height again + r.NoError(db.SetVersion(6).Put(_bucket1, _k2, _v4)) + r.NoError(db.SetVersion(6).Put(_bucket2, _k2, _v4)) + r.NoError(db.SetVersion(7).Put(_bucket1, _k4, _v4)) + // write to earlier version again is invalid + r.Equal(ErrInvalid, db.SetVersion(3).Put(_bucket1, _k2, _v4)) + r.Equal(ErrInvalid, db.SetVersion(4).Put(_bucket1, _k4, _v4)) + // write with same value + r.NoError(db.SetVersion(9).Put(_bucket1, _k2, _v4)) + r.NoError(db.SetVersion(10).Put(_bucket1, _k4, _v4)) + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 9, nil}, + {_bucket1, _k2, _v4, 10, nil}, // after last write version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 9, nil}, + {_bucket1, _k4, _v4, 10, nil}, + {_bucket1, _k4, _v4, 11, nil}, // larger than last key in bucket + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 0, ErrNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, nil}, + {_bucket2, _k2, _v2, 2, nil}, + {_bucket2, _k2, _v1, 3, nil}, + {_bucket2, _k2, _v1, 5, nil}, + {_bucket2, _k2, _v4, 6, nil}, + {_bucket2, _k2, _v4, 7, nil}, // after last write version + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 1, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, _v1, 0, nil}, + {_ns, _k2, _v2, 0, nil}, + {_ns, _k3, nil, 0, ErrNotExist}, + {_ns, _k4, nil, 0, ErrNotExist}, + {_ns, _k5, nil, 0, ErrNotExist}, + {_ns, _k10, nil, 0, ErrNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // check version + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 9, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 10, nil}, + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 6, nil}, + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 0, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, nil, 0, _nerr}, + {_ns, _k2, nil, 0, _nerr}, + {_ns, _k3, nil, 0, _nerr}, + {_ns, _k4, nil, 0, _nerr}, + {_ns, _k5, nil, 0, _nerr}, + {_ns, _k10, nil, 0, _nerr}, + } { + value, err := db.Version(e.ns, e.k) + if e.ns == _ns { + r.Equal(e.err.Error(), err.Error()) + } else { + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.height, value) + } + } + // test delete + kv := db.SetVersion(10) + r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket2, _k1))) + for _, k := range [][]byte{_k2, _k4} { + r.NoError(kv.Delete(_bucket1, k)) + } + for _, k := range [][]byte{_k1, _k3, _k5} { + r.Equal(ErrNotExist, errors.Cause(kv.Delete(_bucket1, k))) + } + r.Equal(ErrInvalid, errors.Cause(kv.Delete(_bucket1, _k10))) + // key still can be read before delete version + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 9, nil}, // before delete version + {_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 9, nil}, // before delete version + {_bucket1, _k4, nil, 10, ErrDeleted}, // after delete version + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 0, ErrNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, nil}, + {_bucket2, _k2, _v2, 2, nil}, + {_bucket2, _k2, _v1, 3, nil}, + {_bucket2, _k2, _v1, 5, nil}, + {_bucket2, _k2, _v4, 6, nil}, + {_bucket2, _k2, _v4, 7, nil}, // after last write version + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 1, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, _v1, 0, nil}, + {_ns, _k2, _v2, 0, nil}, + {_ns, _k3, nil, 0, ErrNotExist}, + {_ns, _k4, nil, 0, ErrNotExist}, + {_ns, _k5, nil, 0, ErrNotExist}, + {_ns, _k10, nil, 0, ErrNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // write before delete version is invalid + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(9).Put(_bucket1, _k2, _k2))) + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(9).Put(_bucket1, _k4, _k4))) + for _, e := range []versionTest{ + {_bucket1, _k2, _v4, 9, nil}, // before delete version + {_bucket1, _k2, nil, 10, ErrDeleted}, // after delete version + {_bucket1, _k4, _v4, 9, nil}, // before delete version + {_bucket1, _k4, nil, 10, ErrDeleted}, // after delete version + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // write after delete version + r.NoError(db.SetVersion(10).Put(_bucket1, _k2, _k2)) + r.NoError(db.SetVersion(10).Put(_bucket1, _k4, _k4)) + for _, e := range []versionTest{ + {_bucket2, _k1, nil, 0, ErrNotExist}, // bucket not exist + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _v2, 0, nil}, + {_bucket1, _k2, _v1, 1, nil}, + {_bucket1, _k2, _v1, 2, nil}, + {_bucket1, _k2, _v3, 3, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k2, _v4, 6, nil}, + {_bucket1, _k2, _v4, 8, nil}, + {_bucket1, _k2, _v4, 9, nil}, // before delete version + {_bucket1, _k2, _k2, 10, nil}, // after delete version + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, // before first write version + {_bucket1, _k4, _v2, 2, nil}, + {_bucket1, _k4, _v2, 3, nil}, + {_bucket1, _k4, _v1, 4, nil}, + {_bucket1, _k4, _v1, 6, nil}, + {_bucket1, _k4, _v4, 7, nil}, + {_bucket1, _k4, _v4, 9, nil}, // before delete version + {_bucket1, _k4, _k4, 10, nil}, // after delete version + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 0, ErrNotExist}, // before first write version + {_bucket2, _k2, _v2, 1, nil}, + {_bucket2, _k2, _v2, 2, nil}, + {_bucket2, _k2, _v1, 3, nil}, + {_bucket2, _k2, _v1, 5, nil}, + {_bucket2, _k2, _v4, 6, nil}, + {_bucket2, _k2, _v4, 7, nil}, // after last write version + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 1, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, _v1, 0, nil}, + {_ns, _k2, _v2, 0, nil}, + {_ns, _k3, nil, 0, ErrNotExist}, + {_ns, _k4, nil, 0, ErrNotExist}, + {_ns, _k5, nil, 0, ErrNotExist}, + {_ns, _k10, nil, 0, ErrNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + // check version + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, _k2, 10, nil}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, _k4, 10, nil}, + {_bucket1, _k5, nil, 0, ErrNotExist}, + {_bucket1, _k10, nil, 0, ErrInvalid}, + // bucket2 + {_bucket2, _k1, nil, 0, ErrNotExist}, + {_bucket2, _k2, nil, 6, nil}, + {_bucket2, _k3, nil, 0, ErrNotExist}, + {_bucket2, _k4, nil, 0, ErrNotExist}, + {_bucket2, _k5, nil, 0, ErrNotExist}, + {_bucket2, _k10, nil, 0, ErrInvalid}, + // non-versioned namespace + {_ns, _k1, nil, 0, _nerr}, + {_ns, _k2, nil, 0, _nerr}, + {_ns, _k3, nil, 0, _nerr}, + {_ns, _k4, nil, 0, _nerr}, + {_ns, _k5, nil, 0, _nerr}, + {_ns, _k10, nil, 0, _nerr}, + } { + value, err := db.Version(e.ns, e.k) + if e.ns == _ns { + r.Equal(e.err.Error(), err.Error()) + } else { + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.height, value) + } + } +} + +func TestWriteBatch(t *testing.T) { + r := require.New(t) + testPath, err := testutil.PathOfTempFile("test-version") + r.NoError(err) + defer func() { + testutil.CleanupPath(testPath) + }() + + cfg := DefaultConfig + cfg.DbPath = testPath + db := NewKVStoreWithVersion(cfg, VersionedNamespaceOption(_bucket1, _bucket2)) + ctx := context.Background() + r.NoError(db.Start(ctx)) + defer func() { + db.Stop(ctx) + }() + + b := batch.NewBatch() + for _, e := range []versionTest{ + {_bucket2, _v1, _k1, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k3, 3, nil}, + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v2, 9, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + + r.NoError(db.SetVersion(1).WriteBatch(b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v1, _k1, 2, nil}, + {_bucket2, _v2, _k2, 2, nil}, + {_bucket2, _v3, _k3, 2, nil}, + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k1, _v1, 3, nil}, + {_bucket1, _k2, _v2, 3, nil}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + + // batch with wrong key length would fail + b.Put(_bucket1, _v1, _k1, "test") + r.Equal(ErrInvalid, errors.Cause(db.SetVersion(3).WriteBatch(b))) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 0, nil}, + {_bucket1, _k2, _v3, 9, nil}, + {_bucket1, _k3, _v1, 3, nil}, + {_bucket1, _k4, _v2, 1, nil}, + {_bucket2, _v1, _k3, 0, nil}, + {_bucket2, _v2, _k2, 9, nil}, + {_bucket2, _v3, _k1, 3, nil}, + {_bucket2, _v4, _k4, 1, nil}, + // non-versioned namespace + {_ns, _k1, _v1, 1, nil}, + {_ns, _k2, _v2, 1, nil}, + {_ns, _v3, _k3, 1, nil}, + {_ns, _v4, _k4, 1, nil}, + } { + b.Put(e.ns, e.k, e.v, "test") + } + b.Delete(_bucket1, _k3, "test") + b.Delete(_bucket2, _v3, "test") + b.Delete(_ns, _v3, "test") + + r.NoError(db.SetVersion(5).WriteBatch(b)) + b.Clear() + for _, e := range []versionTest{ + {_bucket1, _k1, nil, 0, ErrNotExist}, + {_bucket1, _k2, nil, 0, ErrNotExist}, + {_bucket1, _k3, nil, 0, ErrNotExist}, + {_bucket1, _k4, nil, 0, ErrNotExist}, + {_bucket2, _v1, nil, 0, ErrNotExist}, + {_bucket2, _v2, nil, 0, ErrNotExist}, + {_bucket2, _v3, nil, 0, ErrNotExist}, + {_bucket2, _v4, nil, 0, ErrNotExist}, + } { + value, err := db.SetVersion(e.height).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 1, nil}, + {_bucket1, _k2, _v2, 1, nil}, + {_bucket1, _k3, nil, 1, ErrNotExist}, + {_bucket1, _k4, nil, 1, ErrNotExist}, + {_bucket2, _v1, _k1, 1, nil}, + {_bucket2, _v2, _k2, 1, nil}, + {_bucket2, _v3, _k3, 1, nil}, + {_bucket2, _v4, nil, 1, ErrNotExist}, + } { + for _, h := range []uint64{1, 2, 3, 4} { + value, err := db.SetVersion(h).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } + for _, e := range []versionTest{ + {_bucket1, _k1, _v1, 5, nil}, + {_bucket1, _k2, _v3, 5, nil}, + {_bucket1, _k3, nil, 5, ErrDeleted}, + {_bucket1, _k4, _v2, 5, nil}, + {_bucket2, _v1, _k3, 5, nil}, + {_bucket2, _v2, _k2, 5, nil}, + {_bucket2, _v3, nil, 5, ErrDeleted}, + {_bucket2, _v4, _k4, 5, nil}, + } { + for _, h := range []uint64{5, 16, 64, 3000, math.MaxUint64} { + value, err := db.SetVersion(h).Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } + } + // non-versioned namespace + for _, e := range []versionTest{ + {_ns, _k1, _v1, 1, nil}, + {_ns, _k2, _v2, 1, nil}, + {_ns, _v3, nil, 1, ErrNotExist}, + {_ns, _v4, _k4, 1, nil}, + } { + value, err := db.Get(e.ns, e.k) + r.Equal(e.err, errors.Cause(err)) + r.Equal(e.v, value) + } +}