diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go new file mode 100644 index 000000000..697561355 --- /dev/null +++ b/store/multiversion/mvkv.go @@ -0,0 +1,268 @@ +package multiversion + +import ( + "io" + "sort" + "sync" + "time" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" + dbm "github.com/tendermint/tm-db" +) + +// Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index +type VersionIndexedStore struct { + mtx sync.Mutex + // used for tracking reads and writes for eventual validation + persistence into multi-version store + readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store) + writeset map[string][]byte // contains the key -> value mapping for all keys written to the store + // TODO: need to add iterateset here as well + + // TODO: do we need this? - I think so? / maybe we just treat `nil` value in the writeset as a delete + deleted *sync.Map + // dirty keys that haven't been sorted yet for iteration + dirtySet map[string]struct{} + // used for iterators - populated at the time of iterator instantiation + // TODO: when we want to perform iteration, we need to move all the dirty keys (writeset and readset) into the sortedTree and then combine with the iterators for the underlying stores + sortedStore *dbm.MemDB // always ascending sorted + // parent stores (both multiversion and underlying parent store) + multiVersionStore MultiVersionStore + parent types.KVStore + // transaction metadata for versioned operations + transactionIndex int + incarnation int + // have abort channel here for aborting transactions + abortChannel chan scheduler.Abort +} + +var _ types.KVStore = (*VersionIndexedStore)(nil) + +func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore { + return &VersionIndexedStore{ + readset: make(map[string][]byte), + writeset: make(map[string][]byte), + deleted: &sync.Map{}, + dirtySet: make(map[string]struct{}), + sortedStore: dbm.NewMemDB(), + parent: parent, + multiVersionStore: multiVersionStore, + transactionIndex: transactionIndex, + incarnation: incarnation, + abortChannel: abortChannel, + } +} + +// GetReadset returns the readset +func (store *VersionIndexedStore) GetReadset() map[string][]byte { + return store.readset +} + +// GetWriteset returns the writeset +func (store *VersionIndexedStore) GetWriteset() map[string][]byte { + return store.writeset +} + +// Get implements types.KVStore. +func (store *VersionIndexedStore) Get(key []byte) []byte { + // first try to get from writeset cache, if cache miss, then try to get from multiversion store, if that misses, then get from parent store + // if the key is in the cache, return it + + // don't have RW mutex because we have to update readset + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "get") + + types.AssertValidKey(key) + strKey := string(key) + // first check the MVKV writeset, and return that value if present + cacheValue, ok := store.writeset[strKey] + if ok { + // return the value from the cache, no need to update any readset stuff + return cacheValue + } + // read the readset to see if the value exists - and return if applicable + if readsetVal, ok := store.readset[strKey]; ok { + return readsetVal + } + + // if we didn't find it, then we want to check the multivalue store + add to readset if applicable + mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key) + if mvsValue != nil { + if mvsValue.IsEstimate() { + store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index()) + return nil + } else { + // This handles both detecting readset conflicts and updating readset if applicable + return store.parseValueAndUpdateReadset(strKey, mvsValue) + } + } + // if we didn't find it in the multiversion store, then we want to check the parent store + add to readset + parentValue := store.parent.Get(key) + store.updateReadSet(key, parentValue) + return parentValue +} + +// This functions handles reads with deleted items and values and verifies that the data is consistent to what we currently have in the readset (IF we have a readset value for that key) +func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsValue MultiVersionValueItem) []byte { + value := mvsValue.Value() + if mvsValue.IsDeleted() { + value = nil + } + store.updateReadSet([]byte(strKey), value) + return value +} + +// This function iterates over the readset, validating that the values in the readset are consistent with the values in the multiversion store and underlying parent store, and returns a boolean indicating validity +func (store *VersionIndexedStore) ValidateReadset() bool { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "validate_readset") + + // sort the readset keys - this is so we have consistent behavior when theres varying conflicts within the readset (eg. read conflict vs estimate) + readsetKeys := make([]string, 0, len(store.readset)) + for key := range store.readset { + readsetKeys = append(readsetKeys, key) + } + sort.Strings(readsetKeys) + + // iterate over readset keys and values + for _, strKey := range readsetKeys { + key := []byte(strKey) + value := store.readset[strKey] + mvsValue := store.multiVersionStore.GetLatestBeforeIndex(store.transactionIndex, key) + if mvsValue != nil { + if mvsValue.IsEstimate() { + // if we see an estimate, that means that we need to abort and rerun + store.abortChannel <- scheduler.NewEstimateAbort(mvsValue.Index()) + return false + } else { + if mvsValue.IsDeleted() { + // check for `nil` + if value != nil { + return false + } + } else { + // check for equality + if string(value) != string(mvsValue.Value()) { + return false + } + } + } + continue // value is valid, continue to next key + } + + parentValue := store.parent.Get(key) + if string(parentValue) != string(value) { + // this shouldnt happen because if we have a conflict it should always happen within multiversion store + panic("we shouldn't ever have a readset conflict in parent store") + } + // value was correct, we can continue to the next value + } + return true +} + +// Delete implements types.KVStore. +func (store *VersionIndexedStore) Delete(key []byte) { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete") + + types.AssertValidKey(key) + store.setValue(key, nil, true, true) +} + +// Has implements types.KVStore. +func (store *VersionIndexedStore) Has(key []byte) bool { + // necessary locking happens within store.Get + return store.Get(key) != nil +} + +// Set implements types.KVStore. +func (store *VersionIndexedStore) Set(key []byte, value []byte) { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set") + + types.AssertValidKey(key) + store.setValue(key, value, false, true) +} + +// Iterator implements types.KVStore. +func (v *VersionIndexedStore) Iterator(start []byte, end []byte) dbm.Iterator { + panic("unimplemented") +} + +// ReverseIterator implements types.KVStore. +func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iterator { + panic("unimplemented") +} + +// GetStoreType implements types.KVStore. +func (v *VersionIndexedStore) GetStoreType() types.StoreType { + return v.parent.GetStoreType() +} + +// CacheWrap implements types.KVStore. +func (*VersionIndexedStore) CacheWrap(storeKey types.StoreKey) types.CacheWrap { + panic("CacheWrap not supported for version indexed store") +} + +// CacheWrapWithListeners implements types.KVStore. +func (*VersionIndexedStore) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + panic("CacheWrapWithListeners not supported for version indexed store") +} + +// CacheWrapWithTrace implements types.KVStore. +func (*VersionIndexedStore) CacheWrapWithTrace(storeKey types.StoreKey, w io.Writer, tc types.TraceContext) types.CacheWrap { + panic("CacheWrapWithTrace not supported for version indexed store") +} + +// GetWorkingHash implements types.KVStore. +func (v *VersionIndexedStore) GetWorkingHash() ([]byte, error) { + panic("should never attempt to get working hash from version indexed store") +} + +// Only entrypoint to mutate writeset +func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirty bool) { + types.AssertValidKey(key) + + keyStr := string(key) + store.writeset[keyStr] = value + if deleted { + store.deleted.Store(keyStr, struct{}{}) + } else { + store.deleted.Delete(keyStr) + } + if dirty { + store.dirtySet[keyStr] = struct{}{} + } +} + +func (store *VersionIndexedStore) WriteToMultiVersionStore() { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") + store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset) +} + +func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() { + store.mtx.Lock() + defer store.mtx.Unlock() + defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs") + store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset) +} + +func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) { + // add to readset + keyStr := string(key) + store.readset[keyStr] = value + // add to dirty set + store.dirtySet[keyStr] = struct{}{} +} + +func (store *VersionIndexedStore) isDeleted(key string) bool { + _, ok := store.deleted.Load(key) + return ok +} diff --git a/store/multiversion/mvkv_test.go b/store/multiversion/mvkv_test.go new file mode 100644 index 000000000..df1692d1f --- /dev/null +++ b/store/multiversion/mvkv_test.go @@ -0,0 +1,250 @@ +package multiversion_test + +import ( + "testing" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" + "github.com/cosmos/cosmos-sdk/store/types" + scheduler "github.com/cosmos/cosmos-sdk/types/occ" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +func TestVersionIndexedStoreGetters(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // mock a value in the parent store + parentKVStore.Set([]byte("key1"), []byte("value1")) + + // read key that doesn't exist + val := vis.Get([]byte("key2")) + require.Nil(t, val) + require.False(t, vis.Has([]byte("key2"))) + + // read key that falls down to parent store + val2 := vis.Get([]byte("key1")) + require.Equal(t, []byte("value1"), val2) + require.True(t, vis.Has([]byte("key1"))) + // verify value now in readset + require.Equal(t, []byte("value1"), vis.GetReadset()["key1"]) + + // read the same key that should now be served from the readset (can be verified by setting a different value for the key in the parent store) + parentKVStore.Set([]byte("key1"), []byte("value2")) // realistically shouldn't happen, modifying to verify readset access + val3 := vis.Get([]byte("key1")) + require.True(t, vis.Has([]byte("key1"))) + require.Equal(t, []byte("value1"), val3) + + // test deleted value written to MVS but not parent store + mvs.Delete(0, 2, []byte("delKey")) + parentKVStore.Set([]byte("delKey"), []byte("value4")) + valDel := vis.Get([]byte("delKey")) + require.Nil(t, valDel) + require.False(t, vis.Has([]byte("delKey"))) + + // set different key in MVS - for various indices + mvs.Set(0, 2, []byte("key3"), []byte("value3")) + mvs.Set(2, 1, []byte("key3"), []byte("value4")) + mvs.SetEstimate(5, 0, []byte("key3")) + + // read the key that falls down to MVS + val4 := vis.Get([]byte("key3")) + // should equal value3 because value4 is later than the key in question + require.Equal(t, []byte("value3"), val4) + require.True(t, vis.Has([]byte("key3"))) + + // try a read that falls through to MVS with a later tx index + vis2 := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 3, 2, make(chan scheduler.Abort)) + val5 := vis2.Get([]byte("key3")) + // should equal value3 because value4 is later than the key in question + require.Equal(t, []byte("value4"), val5) + require.True(t, vis2.Has([]byte("key3"))) + + // test estimate values writing to abortChannel + abortChannel := make(chan scheduler.Abort) + vis3 := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 6, 2, abortChannel) + go func() { + vis3.Get([]byte("key3")) + }() + abort := <-abortChannel // read the abort from the channel + require.Equal(t, 5, abort.DependentTxIdx) + require.Equal(t, scheduler.ErrReadEstimate, abort.Err) + + vis.Set([]byte("key4"), []byte("value4")) + // verify proper response for GET + val6 := vis.Get([]byte("key4")) + require.True(t, vis.Has([]byte("key4"))) + require.Equal(t, []byte("value4"), val6) + // verify that its in the writeset + require.Equal(t, []byte("value4"), vis.GetWriteset()["key4"]) + // verify that its not in the readset + require.Nil(t, vis.GetReadset()["key4"]) +} + +func TestVersionIndexedStoreSetters(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // test simple set + vis.Set([]byte("key1"), []byte("value1")) + require.Equal(t, []byte("value1"), vis.GetWriteset()["key1"]) + + mvs.Set(0, 1, []byte("key2"), []byte("value2")) + vis.Delete([]byte("key2")) + require.Nil(t, vis.Get([]byte("key2"))) + // because the delete should be at the writeset level, we should not have populated the readset + require.Zero(t, len(vis.GetReadset())) + + // try setting the value again, and then read + vis.Set([]byte("key2"), []byte("value3")) + require.Equal(t, []byte("value3"), vis.Get([]byte("key2"))) + require.Zero(t, len(vis.GetReadset())) +} + +func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + // asserts panics where appropriate + require.Panics(t, func() { vis.CacheWrap(types.NewKVStoreKey("mock")) }) + require.Panics(t, func() { vis.CacheWrapWithListeners(types.NewKVStoreKey("mock"), nil) }) + require.Panics(t, func() { vis.CacheWrapWithTrace(types.NewKVStoreKey("mock"), nil, nil) }) + require.Panics(t, func() { vis.GetWorkingHash() }) + + // assert properly returns store type + require.Equal(t, types.StoreTypeDB, vis.GetStoreType()) +} + +func TestVersionIndexedStoreWrite(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + mvs.Set(0, 1, []byte("key3"), []byte("value3")) + + require.False(t, mvs.Has(3, []byte("key1"))) + require.False(t, mvs.Has(3, []byte("key2"))) + require.True(t, mvs.Has(3, []byte("key3"))) + + // write some keys + vis.Set([]byte("key1"), []byte("value1")) + vis.Set([]byte("key2"), []byte("value2")) + vis.Delete([]byte("key3")) + + vis.WriteToMultiVersionStore() + + require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value()) + require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value()) + require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted()) +} + +func TestVersionIndexedStoreWriteEstimates(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort)) + + mvs.Set(0, 1, []byte("key3"), []byte("value3")) + + require.False(t, mvs.Has(3, []byte("key1"))) + require.False(t, mvs.Has(3, []byte("key2"))) + require.True(t, mvs.Has(3, []byte("key3"))) + + // write some keys + vis.Set([]byte("key1"), []byte("value1")) + vis.Set([]byte("key2"), []byte("value2")) + vis.Delete([]byte("key3")) + + vis.WriteEstimatesToMultiVersionStore() + + require.True(t, mvs.GetLatest([]byte("key1")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key2")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key3")).IsEstimate()) +} + +func TestVersionIndexedStoreValidation(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000) + mvs := multiversion.NewMultiVersionStore() + // initialize a new VersionIndexedStore + abortC := make(chan scheduler.Abort) + vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC) + // set some initial values + parentKVStore.Set([]byte("key4"), []byte("value4")) + parentKVStore.Set([]byte("key5"), []byte("value5")) + parentKVStore.Set([]byte("deletedKey"), []byte("foo")) + mvs.Set(0, 1, []byte("key1"), []byte("value1")) + mvs.Set(0, 1, []byte("key2"), []byte("value2")) + mvs.Delete(0, 1, []byte("deletedKey")) + + // load those into readset + vis.Get([]byte("key1")) + vis.Get([]byte("key2")) + vis.Get([]byte("key4")) + vis.Get([]byte("key5")) + vis.Get([]byte("keyDNE")) + vis.Get([]byte("deletedKey")) + + // everything checks out, so we should be able to validate successfully + require.True(t, vis.ValidateReadset()) + // modify underlying transaction key that is unrelated + mvs.Set(1, 1, []byte("key3"), []byte("value3")) + // should still have valid readset + require.True(t, vis.ValidateReadset()) + + // modify underlying transaction key that is related + mvs.Set(1, 1, []byte("key1"), []byte("value1_b")) + // should now have invalid readset + require.False(t, vis.ValidateReadset()) + // reset so readset is valid again + mvs.Set(1, 1, []byte("key1"), []byte("value1")) + require.True(t, vis.ValidateReadset()) + + // mvs has a value that was initially read from parent + mvs.Set(1, 2, []byte("key4"), []byte("value4_b")) + require.False(t, vis.ValidateReadset()) + // reset key + mvs.Set(1, 2, []byte("key4"), []byte("value4")) + require.True(t, vis.ValidateReadset()) + + // mvs has a value that was initially read from parent - BUT in a later tx index + mvs.Set(4, 2, []byte("key4"), []byte("value4_c")) + // readset should remain valid + require.True(t, vis.ValidateReadset()) + + // mvs has an estimate + mvs.SetEstimate(1, 2, []byte("key2")) + // readset should be invalid now - but via abort channel write + go func() { + vis.ValidateReadset() + }() + abort := <-abortC // read the abort from the channel + require.Equal(t, 1, abort.DependentTxIdx) + + // test key deleted later + mvs.Delete(1, 1, []byte("key2")) + require.False(t, vis.ValidateReadset()) + // reset key2 + mvs.Set(1, 1, []byte("key2"), []byte("value2")) + + // lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic + parentKVStore.Set([]byte("keyDNE"), []byte("foobar")) + require.Equal(t, []byte("foobar"), parentKVStore.Get([]byte("keyDNE"))) + require.Panics(t, func() { + vis.ValidateReadset() + }) +} diff --git a/types/occ/scheduler.go b/types/occ/scheduler.go new file mode 100644 index 000000000..3905be395 --- /dev/null +++ b/types/occ/scheduler.go @@ -0,0 +1,20 @@ +package scheduler + +import "errors" + +var ( + ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting") +) + +// define the return struct for abort due to conflict +type Abort struct { + DependentTxIdx int + Err error +} + +func NewEstimateAbort(dependentTxIdx int) Abort { + return Abort{ + DependentTxIdx: dependentTxIdx, + Err: ErrReadEstimate, + } +}