diff --git a/store/multiversion/data_structures.go b/store/multiversion/data_structures.go index c4ca7b995..cba10d0f4 100644 --- a/store/multiversion/data_structures.go +++ b/store/multiversion/data_structures.go @@ -14,10 +14,12 @@ const ( type MultiVersionValue interface { GetLatest() (value MultiVersionValueItem, found bool) + GetLatestNonEstimate() (value MultiVersionValueItem, found bool) GetLatestBeforeIndex(index int) (value MultiVersionValueItem, found bool) Set(index int, incarnation int, value []byte) SetEstimate(index int, incarnation int) Delete(index int, incarnation int) + Remove(index int) } type MultiVersionValueItem interface { @@ -42,8 +44,6 @@ func NewMultiVersionItem() *multiVersionItem { } // GetLatest returns the latest written value to the btree, and returns a boolean indicating whether it was found. -// -// A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { item.mtx.RLock() defer item.mtx.RUnlock() @@ -56,6 +56,29 @@ func (item *multiVersionItem) GetLatest() (MultiVersionValueItem, bool) { return valueItem, true } +// GetLatestNonEstimate returns the latest written value that isn't an ESTIMATE and returns a boolean indicating whether it was found. +// This can be used when we want to write finalized values, since ESTIMATEs can be considered to be irrelevant at that point +func (item *multiVersionItem) GetLatestNonEstimate() (MultiVersionValueItem, bool) { + item.mtx.RLock() + defer item.mtx.RUnlock() + + var vItem *valueItem + var found bool + item.valueTree.Descend(func(bTreeItem btree.Item) bool { + // only return if non-estimate + item := bTreeItem.(*valueItem) + if item.IsEstimate() { + // if estimate, continue + return true + } + // else we want to return + vItem = item + found = true + return false + }) + return vItem, found +} + // GetLatest returns the latest written value to the btree prior to the index passed in, and returns a boolean indicating whether it was found. // // A `nil` value along with `found=true` indicates a deletion that has occurred and the underlying parent store doesn't need to be hit. @@ -95,6 +118,13 @@ func (item *multiVersionItem) Delete(index int, incarnation int) { item.valueTree.ReplaceOrInsert(deletedItem) } +func (item *multiVersionItem) Remove(index int) { + item.mtx.Lock() + defer item.mtx.Unlock() + + item.valueTree.Delete(&valueItem{index: index}) +} + func (item *multiVersionItem) SetEstimate(index int, incarnation int) { item.mtx.Lock() defer item.mtx.Unlock() diff --git a/store/multiversion/data_structures_test.go b/store/multiversion/data_structures_test.go index 31696d366..fccc26a8b 100644 --- a/store/multiversion/data_structures_test.go +++ b/store/multiversion/data_structures_test.go @@ -198,3 +198,31 @@ func TestMultiversionItemEstimate(t *testing.T) { require.True(t, found) require.Equal(t, one, value.Value()) } + +func TestMultiversionItemRemove(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.Set(1, 0, []byte("one")) + mvItem.Set(2, 0, []byte("two")) + + mvItem.Remove(2) + value, found := mvItem.GetLatest() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) +} + +func TestMultiversionItemGetLatestNonEstimate(t *testing.T) { + mvItem := mv.NewMultiVersionItem() + + mvItem.SetEstimate(3, 0) + + value, found := mvItem.GetLatestNonEstimate() + require.False(t, found) + require.Nil(t, value) + + mvItem.Set(1, 0, []byte("one")) + value, found = mvItem.GetLatestNonEstimate() + require.True(t, found) + require.Equal(t, []byte("one"), value.Value()) + +} diff --git a/store/multiversion/store.go b/store/multiversion/store.go index b52c6af1a..3aa4800f3 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -1,31 +1,43 @@ package multiversion import ( + "sort" "sync" + + "github.com/cosmos/cosmos-sdk/store/types" ) type MultiVersionStore interface { GetLatest(key []byte) (value MultiVersionValueItem) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) - Set(index int, incarnation int, key []byte, value []byte) - SetEstimate(index int, incarnation int, key []byte) - Delete(index int, incarnation int, key []byte) + Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets + SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets + Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets Has(index int, key []byte) bool - // TODO: do we want to add helper functions for validations with readsets / applying writesets ? + WriteLatestToStore(parentStore types.KVStore) + SetWriteset(index int, incarnation int, writeset WriteSet) + InvalidateWriteset(index int, incarnation int) + SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) + GetAllWritesetKeys() map[int][]string } +type WriteSet map[string][]byte + +var _ MultiVersionStore = (*Store)(nil) + type Store struct { mtx sync.RWMutex // map that stores the key -> MultiVersionValue mapping for accessing from a given key multiVersionMap map[string]MultiVersionValue - // TODO: do we need to add something here to persist readsets for later validation - // TODO: we need to support iterators as well similar to how cachekv does it - // TODO: do we need secondary indexing on index -> keys - this way if we need to abort we can replace those keys with ESTIMATE values? - maybe this just means storing writeset + // TODO: do we need to support iterators as well similar to how cachekv does it - yes + + txWritesetKeys map[int][]string // map of tx index -> writeset keys } func NewMultiVersionStore() *Store { return &Store{ multiVersionMap: make(map[string]MultiVersionValue), + txWritesetKeys: make(map[int][]string), } } @@ -41,7 +53,7 @@ func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) { } val, found := s.multiVersionMap[keyString].GetLatest() if !found { - return nil // this shouldn't be possible + return nil // this is possible IF there is are writeset that are then removed for that key } return val } @@ -97,6 +109,95 @@ func (s *Store) Set(index int, incarnation int, key []byte, value []byte) { s.multiVersionMap[keyString].Set(index, incarnation, value) } +func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) { + writeset := make(map[string][]byte) + if newWriteSet != nil { + // if non-nil writeset passed in, we can use that to optimize removals + writeset = newWriteSet + } + // if there is already a writeset existing, we should remove that fully + if keys, ok := s.txWritesetKeys[index]; ok { + // we need to delete all of the keys in the writeset from the multiversion store + for _, key := range keys { + // small optimization to check if the new writeset is going to write this key, if so, we can leave it behind + if _, ok := writeset[key]; ok { + // we don't need to remove this key because it will be overwritten anyways - saves the operation of removing + rebalancing underlying btree + continue + } + // remove from the appropriate item if present in multiVersionMap + if val, ok := s.multiVersionMap[key]; ok { + val.Remove(index) + } + } + } + // unset the writesetKeys for this index + delete(s.txWritesetKeys, index) +} + +// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store. +func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) + for key, value := range writeset { + writeSetKeys = append(writeSetKeys, key) + s.tryInitMultiVersionItem(key) + if value == nil { + // delete if nil value + s.multiVersionMap[key].Delete(index, incarnation) + } else { + s.multiVersionMap[key].Set(index, incarnation, value) + } + } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys +} + +// InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs +func (s *Store) InvalidateWriteset(index int, incarnation int) { + s.mtx.Lock() + defer s.mtx.Unlock() + + if keys, ok := s.txWritesetKeys[index]; ok { + for _, key := range keys { + // invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level + s.tryInitMultiVersionItem(key) // this SHOULD no-op because we're invalidating existing keys + s.multiVersionMap[key].SetEstimate(index, incarnation) + } + } + // we leave the writeset in place because we'll need it for key removal later if/when we replace with a new writeset +} + +// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating +func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteSet) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // remove old writeset if it exists + s.removeOldWriteset(index, writeset) + + writeSetKeys := make([]string, 0, len(writeset)) + // still need to save the writeset so we can remove the elements later: + for key := range writeset { + writeSetKeys = append(writeSetKeys, key) + s.tryInitMultiVersionItem(key) + s.multiVersionMap[key].SetEstimate(index, incarnation) + } + sort.Strings(writeSetKeys) + s.txWritesetKeys[index] = writeSetKeys +} + +// GetWritesetKeys implements MultiVersionStore. +func (s *Store) GetAllWritesetKeys() map[int][]string { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.txWritesetKeys +} + // SetEstimate implements MultiVersionStore. func (s *Store) SetEstimate(index int, incarnation int, key []byte) { s.mtx.Lock() @@ -117,4 +218,38 @@ func (s *Store) Delete(index int, incarnation int, key []byte) { s.multiVersionMap[keyString].Delete(index, incarnation) } -var _ MultiVersionStore = (*Store)(nil) +func (s *Store) WriteLatestToStore(parentStore types.KVStore) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // sort the keys + keys := make([]string, 0, len(s.multiVersionMap)) + for key := range s.multiVersionMap { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + mvValue, found := s.multiVersionMap[key].GetLatestNonEstimate() + if !found { + // this means that at some point, there was an estimate, but we have since removed it so there isn't anything writeable at the key, so we can skip + continue + } + // we shouldn't have any ESTIMATE values when performing the write, because we read the latest non-estimate values only + if mvValue.IsEstimate() { + panic("should not have any estimate values when writing to parent store") + } + // if the value is deleted, then delete it from the parent store + if mvValue.IsDeleted() { + // We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot + // be sure if the underlying store might do a save with the byteslice or + // not. Once we get confirmation that .Delete is guaranteed not to + // save the byteslice, then we can assume only a read-only copy is sufficient. + parentStore.Delete([]byte(key)) + continue + } + if mvValue.Value() != nil { + parentStore.Set([]byte(key), mvValue.Value()) + } + } +} diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 91465c435..732a5a6ba 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -3,8 +3,10 @@ package multiversion_test import ( "testing" + "github.com/cosmos/cosmos-sdk/store/dbadapter" "github.com/cosmos/cosmos-sdk/store/multiversion" "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" ) func TestMultiVersionStore(t *testing.T) { @@ -52,3 +54,89 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) { require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1"))) require.False(t, store.Has(0, []byte("key1"))) } + +func TestMultiVersionStoreWriteToParent(t *testing.T) { + // initialize cachekv store + parentKVStore := dbadapter.Store{DB: dbm.NewMemDB()} + mvs := multiversion.NewMultiVersionStore() + + parentKVStore.Set([]byte("key2"), []byte("value0")) + parentKVStore.Set([]byte("key4"), []byte("value4")) + + mvs.Set(1, 1, []byte("key1"), []byte("value1")) + mvs.Set(2, 1, []byte("key1"), []byte("value2")) + mvs.Set(3, 1, []byte("key2"), []byte("value3")) + mvs.Delete(1, 1, []byte("key3")) + mvs.Delete(1, 1, []byte("key4")) + + mvs.WriteLatestToStore(parentKVStore) + + // assert state in parent store + require.Equal(t, []byte("value2"), parentKVStore.Get([]byte("key1"))) + require.Equal(t, []byte("value3"), parentKVStore.Get([]byte("key2"))) + require.False(t, parentKVStore.Has([]byte("key3"))) + require.False(t, parentKVStore.Has([]byte("key4"))) + + // verify no-op if mvs contains ESTIMATE + mvs.SetEstimate(1, 2, []byte("key5")) + mvs.WriteLatestToStore(parentKVStore) + require.False(t, parentKVStore.Has([]byte("key5"))) +} + +func TestMultiVersionStoreWritesetSetAndInvalidate(t *testing.T) { + mvs := multiversion.NewMultiVersionStore() + + writeset := make(map[string][]byte) + writeset["key1"] = []byte("value1") + writeset["key2"] = []byte("value2") + writeset["key3"] = nil + + mvs.SetWriteset(1, 2, writeset) + require.Equal(t, []byte("value1"), mvs.GetLatest([]byte("key1")).Value()) + require.Equal(t, []byte("value2"), mvs.GetLatest([]byte("key2")).Value()) + require.True(t, mvs.GetLatest([]byte("key3")).IsDeleted()) + + writeset2 := make(map[string][]byte) + writeset2["key1"] = []byte("value3") + + mvs.SetWriteset(2, 1, writeset2) + require.Equal(t, []byte("value3"), mvs.GetLatest([]byte("key1")).Value()) + + // invalidate writeset1 + mvs.InvalidateWriteset(1, 2) + + // verify estimates + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key1")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key2")).IsEstimate()) + require.True(t, mvs.GetLatestBeforeIndex(2, []byte("key3")).IsEstimate()) + + // third writeset + writeset3 := make(map[string][]byte) + writeset3["key4"] = []byte("foo") + writeset3["key5"] = nil + + // write the writeset directly as estimate + mvs.SetEstimatedWriteset(3, 1, writeset3) + + require.True(t, mvs.GetLatest([]byte("key4")).IsEstimate()) + require.True(t, mvs.GetLatest([]byte("key5")).IsEstimate()) + + // try replacing writeset1 to verify old keys removed + writeset1_b := make(map[string][]byte) + writeset1_b["key1"] = []byte("value4") + + mvs.SetWriteset(1, 2, writeset1_b) + require.Equal(t, []byte("value4"), mvs.GetLatestBeforeIndex(2, []byte("key1")).Value()) + require.Nil(t, mvs.GetLatestBeforeIndex(2, []byte("key2"))) + // verify that GetLatest for key3 returns nil - because of removal from writeset + require.Nil(t, mvs.GetLatest([]byte("key3"))) + + // verify output for GetAllWritesetKeys + writesetKeys := mvs.GetAllWritesetKeys() + // we have 3 writesets + require.Equal(t, 3, len(writesetKeys)) + require.Equal(t, []string{"key1"}, writesetKeys[1]) + require.Equal(t, []string{"key1"}, writesetKeys[2]) + require.Equal(t, []string{"key4", "key5"}, writesetKeys[3]) + +}