Skip to content

Commit

Permalink
[occ] Add validation function for transaction state to multiversionst…
Browse files Browse the repository at this point in the history
…ore (#330)

## Describe your changes and provide context
This adds in validation for transaction state to multiversion store, and
implements readset validation for it as well.

## Testing performed to validate your change
Unit Test
  • Loading branch information
udpatil committed Jan 2, 2024
1 parent 03069b6 commit ac4609d
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 76 deletions.
98 changes: 73 additions & 25 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestVersionIndexedStoreGetters(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

Expand All @@ -41,16 +41,25 @@ func TestVersionIndexedStoreGetters(t *testing.T) {
require.Equal(t, []byte("value1"), val3)

// test deleted value written to MVS but not parent store
mvs.Delete(0, 2, []byte("delKey"))
mvs.SetWriteset(0, 2, map[string][]byte{
"delKey": nil,
})
parentKVStore.Set([]byte("delKey"), []byte("value4"))
valDel := vis.Get([]byte("delKey"))
require.Nil(t, valDel)
require.False(t, vis.Has([]byte("delKey")))

// set different key in MVS - for various indices
mvs.Set(0, 2, []byte("key3"), []byte("value3"))
mvs.Set(2, 1, []byte("key3"), []byte("value4"))
mvs.SetEstimate(5, 0, []byte("key3"))
mvs.SetWriteset(0, 2, map[string][]byte{
"delKey": nil,
"key3": []byte("value3"),
})
mvs.SetWriteset(2, 1, map[string][]byte{
"key3": []byte("value4"),
})
mvs.SetEstimatedWriteset(5, 0, map[string][]byte{
"key3": nil,
})

// read the key that falls down to MVS
val4 := vis.Get([]byte("key3"))
Expand Down Expand Up @@ -89,15 +98,17 @@ func TestVersionIndexedStoreGetters(t *testing.T) {
func TestVersionIndexedStoreSetters(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

// test simple set
vis.Set([]byte("key1"), []byte("value1"))
require.Equal(t, []byte("value1"), vis.GetWriteset()["key1"])

mvs.Set(0, 1, []byte("key2"), []byte("value2"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key2": []byte("value2"),
})
vis.Delete([]byte("key2"))
require.Nil(t, vis.Get([]byte("key2")))
// because the delete should be at the writeset level, we should not have populated the readset
Expand All @@ -112,7 +123,7 @@ func TestVersionIndexedStoreSetters(t *testing.T) {
func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

Expand All @@ -129,11 +140,13 @@ func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) {
func TestVersionIndexedStoreWrite(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
Expand All @@ -154,11 +167,13 @@ func TestVersionIndexedStoreWrite(t *testing.T) {
func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
Expand All @@ -179,17 +194,20 @@ func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
func TestVersionIndexedStoreValidation(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
abortC := make(chan scheduler.Abort)
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC)
// set some initial values
parentKVStore.Set([]byte("key4"), []byte("value4"))
parentKVStore.Set([]byte("key5"), []byte("value5"))
parentKVStore.Set([]byte("deletedKey"), []byte("foo"))
mvs.Set(0, 1, []byte("key1"), []byte("value1"))
mvs.Set(0, 1, []byte("key2"), []byte("value2"))
mvs.Delete(0, 1, []byte("deletedKey"))

mvs.SetWriteset(0, 1, map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
"deletedKey": nil,
})

// load those into readset
vis.Get([]byte("key1"))
Expand All @@ -202,32 +220,52 @@ func TestVersionIndexedStoreValidation(t *testing.T) {
// everything checks out, so we should be able to validate successfully
require.True(t, vis.ValidateReadset())
// modify underlying transaction key that is unrelated
mvs.Set(1, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
})
// should still have valid readset
require.True(t, vis.ValidateReadset())

// modify underlying transaction key that is related
mvs.Set(1, 1, []byte("key1"), []byte("value1_b"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1_b"),
})
// should now have invalid readset
require.False(t, vis.ValidateReadset())
// reset so readset is valid again
mvs.Set(1, 1, []byte("key1"), []byte("value1"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
})
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent
mvs.Set(1, 2, []byte("key4"), []byte("value4_b"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4_b"),
})
require.False(t, vis.ValidateReadset())
// reset key
mvs.Set(1, 2, []byte("key4"), []byte("value4"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
})
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent - BUT in a later tx index
mvs.Set(4, 2, []byte("key4"), []byte("value4_c"))
mvs.SetWriteset(4, 2, map[string][]byte{
"key4": []byte("value4_c"),
})
// readset should remain valid
require.True(t, vis.ValidateReadset())

// mvs has an estimate
mvs.SetEstimate(1, 2, []byte("key2"))
mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
"key2": nil,
})
// readset should be invalid now - but via abort channel write
go func() {
vis.ValidateReadset()
Expand All @@ -236,10 +274,20 @@ func TestVersionIndexedStoreValidation(t *testing.T) {
require.Equal(t, 1, abort.DependentTxIdx)

// test key deleted later
mvs.Delete(1, 1, []byte("key2"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
"key2": nil,
})
require.False(t, vis.ValidateReadset())
// reset key2
mvs.Set(1, 1, []byte("key2"), []byte("value2"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
"key2": []byte("value2"),
})

// lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic
parentKVStore.Set([]byte("keyDNE"), []byte("foobar"))
Expand Down
98 changes: 67 additions & 31 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package multiversion

import (
"bytes"
"sort"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
)

type MultiVersionStore interface {
GetLatest(key []byte) (value MultiVersionValueItem)
GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem)
Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets
SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Has(index int, key []byte) bool
WriteLatestToStore(parentStore types.KVStore)
WriteLatestToStore()
SetWriteset(index int, incarnation int, writeset WriteSet)
InvalidateWriteset(index int, incarnation int)
SetEstimatedWriteset(index int, incarnation int, writeset WriteSet)
GetAllWritesetKeys() map[int][]string
SetReadset(index int, readset ReadSet)
GetReadset(index int) ReadSet
ValidateTransactionState(index int) []int
}

type WriteSet map[string][]byte
type ReadSet map[string][]byte

var _ MultiVersionStore = (*Store)(nil)

Expand All @@ -32,12 +36,17 @@ type Store struct {
// TODO: do we need to support iterators as well similar to how cachekv does it - yes

txWritesetKeys map[int][]string // map of tx index -> writeset keys
txReadSets map[int]ReadSet

parentStore types.KVStore
}

func NewMultiVersionStore() *Store {
func NewMultiVersionStore(parentStore types.KVStore) *Store {
return &Store{
multiVersionMap: make(map[string]MultiVersionValue),
txWritesetKeys: make(map[int][]string),
txReadSets: make(map[int]ReadSet),
parentStore: parentStore,
}
}

Expand Down Expand Up @@ -99,16 +108,6 @@ func (s *Store) tryInitMultiVersionItem(keyString string) {
}
}

// Set implements MultiVersionStore.
func (s *Store) Set(index int, incarnation int, key []byte, value []byte) {
s.mtx.Lock()
defer s.mtx.Unlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].Set(index, incarnation, value)
}

func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
writeset := make(map[string][]byte)
if newWriteSet != nil {
Expand All @@ -135,6 +134,7 @@ func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
}

// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store.
// TODO: returns a list of NEW keys added
func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -153,7 +153,7 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
s.multiVersionMap[key].Set(index, incarnation, value)
}
}
sort.Strings(writeSetKeys)
sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice
s.txWritesetKeys[index] = writeSetKeys
}

Expand Down Expand Up @@ -198,27 +198,63 @@ func (s *Store) GetAllWritesetKeys() map[int][]string {
return s.txWritesetKeys
}

// SetEstimate implements MultiVersionStore.
func (s *Store) SetEstimate(index int, incarnation int, key []byte) {
func (s *Store) SetReadset(index int, readset ReadSet) {
s.mtx.Lock()
defer s.mtx.Unlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].SetEstimate(index, incarnation)
s.txReadSets[index] = readset
}

// Delete implements MultiVersionStore.
func (s *Store) Delete(index int, incarnation int, key []byte) {
s.mtx.Lock()
defer s.mtx.Unlock()
func (s *Store) GetReadset(index int) ReadSet {
s.mtx.RLock()
defer s.mtx.RUnlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].Delete(index, incarnation)
return s.txReadSets[index]
}

func (s *Store) ValidateTransactionState(index int) []int {
defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate")
conflictSet := map[int]struct{}{}

// validate readset
readset := s.GetReadset(index)
// iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store
for key, value := range readset {
// get the latest value from the multiversion store
latestValue := s.GetLatestBeforeIndex(index, []byte(key))
if latestValue == nil {
// TODO: maybe we don't even do this check?
parentVal := s.parentStore.Get([]byte(key))
if !bytes.Equal(parentVal, value) {
panic("there shouldn't be readset conflicts with parent kv store, since it shouldn't change")
}
} else {
// if estimate, mark as conflict index
if latestValue.IsEstimate() {
conflictSet[latestValue.Index()] = struct{}{}
} else if latestValue.IsDeleted() {
if value != nil {
// conflict
conflictSet[latestValue.Index()] = struct{}{}
}
} else if !bytes.Equal(latestValue.Value(), value) {
conflictSet[latestValue.Index()] = struct{}{}
}
}
}
// TODO: validate iterateset

// convert conflictset into sorted indices
conflictIndices := make([]int, 0, len(conflictSet))
for index := range conflictSet {
conflictIndices = append(conflictIndices, index)
}

sort.Ints(conflictIndices)
return conflictIndices
}

func (s *Store) WriteLatestToStore(parentStore types.KVStore) {
func (s *Store) WriteLatestToStore() {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -245,11 +281,11 @@ func (s *Store) WriteLatestToStore(parentStore types.KVStore) {
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
parentStore.Delete([]byte(key))
s.parentStore.Delete([]byte(key))
continue
}
if mvValue.Value() != nil {
parentStore.Set([]byte(key), mvValue.Value())
s.parentStore.Set([]byte(key), mvValue.Value())
}
}
}
Loading

0 comments on commit ac4609d

Please sign in to comment.