Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[occ] Add validation function for transaction state to multiversionstore #330

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 73 additions & 25 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestVersionIndexedStoreGetters(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

Expand All @@ -41,16 +41,25 @@ func TestVersionIndexedStoreGetters(t *testing.T) {
require.Equal(t, []byte("value1"), val3)

// test deleted value written to MVS but not parent store
mvs.Delete(0, 2, []byte("delKey"))
mvs.SetWriteset(0, 2, map[string][]byte{
"delKey": nil,
})
parentKVStore.Set([]byte("delKey"), []byte("value4"))
valDel := vis.Get([]byte("delKey"))
require.Nil(t, valDel)
require.False(t, vis.Has([]byte("delKey")))

// set different key in MVS - for various indices
mvs.Set(0, 2, []byte("key3"), []byte("value3"))
mvs.Set(2, 1, []byte("key3"), []byte("value4"))
mvs.SetEstimate(5, 0, []byte("key3"))
mvs.SetWriteset(0, 2, map[string][]byte{
"delKey": nil,
"key3": []byte("value3"),
})
mvs.SetWriteset(2, 1, map[string][]byte{
"key3": []byte("value4"),
})
mvs.SetEstimatedWriteset(5, 0, map[string][]byte{
"key3": nil,
})

// read the key that falls down to MVS
val4 := vis.Get([]byte("key3"))
Expand Down Expand Up @@ -89,15 +98,17 @@ func TestVersionIndexedStoreGetters(t *testing.T) {
func TestVersionIndexedStoreSetters(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

// test simple set
vis.Set([]byte("key1"), []byte("value1"))
require.Equal(t, []byte("value1"), vis.GetWriteset()["key1"])

mvs.Set(0, 1, []byte("key2"), []byte("value2"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key2": []byte("value2"),
})
vis.Delete([]byte("key2"))
require.Nil(t, vis.Get([]byte("key2")))
// because the delete should be at the writeset level, we should not have populated the readset
Expand All @@ -112,7 +123,7 @@ func TestVersionIndexedStoreSetters(t *testing.T) {
func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

Expand All @@ -129,11 +140,13 @@ func TestVersionIndexedStoreBoilerplateFunctions(t *testing.T) {
func TestVersionIndexedStoreWrite(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
Expand All @@ -154,11 +167,13 @@ func TestVersionIndexedStoreWrite(t *testing.T) {
func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 1, 2, make(chan scheduler.Abort))

mvs.Set(0, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(0, 1, map[string][]byte{
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
Expand All @@ -179,17 +194,20 @@ func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
func TestVersionIndexedStoreValidation(t *testing.T) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
parentKVStore := cachekv.NewStore(mem, types.NewKVStoreKey("mock"), 1000)
mvs := multiversion.NewMultiVersionStore()
mvs := multiversion.NewMultiVersionStore(parentKVStore)
// initialize a new VersionIndexedStore
abortC := make(chan scheduler.Abort)
vis := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 2, abortC)
// set some initial values
parentKVStore.Set([]byte("key4"), []byte("value4"))
parentKVStore.Set([]byte("key5"), []byte("value5"))
parentKVStore.Set([]byte("deletedKey"), []byte("foo"))
mvs.Set(0, 1, []byte("key1"), []byte("value1"))
mvs.Set(0, 1, []byte("key2"), []byte("value2"))
mvs.Delete(0, 1, []byte("deletedKey"))

mvs.SetWriteset(0, 1, map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
"deletedKey": nil,
})

// load those into readset
vis.Get([]byte("key1"))
Expand All @@ -202,32 +220,52 @@ func TestVersionIndexedStoreValidation(t *testing.T) {
// everything checks out, so we should be able to validate successfully
require.True(t, vis.ValidateReadset())
// modify underlying transaction key that is unrelated
mvs.Set(1, 1, []byte("key3"), []byte("value3"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
})
// should still have valid readset
require.True(t, vis.ValidateReadset())

// modify underlying transaction key that is related
mvs.Set(1, 1, []byte("key1"), []byte("value1_b"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1_b"),
})
// should now have invalid readset
require.False(t, vis.ValidateReadset())
// reset so readset is valid again
mvs.Set(1, 1, []byte("key1"), []byte("value1"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
})
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent
mvs.Set(1, 2, []byte("key4"), []byte("value4_b"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4_b"),
})
require.False(t, vis.ValidateReadset())
// reset key
mvs.Set(1, 2, []byte("key4"), []byte("value4"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
})
require.True(t, vis.ValidateReadset())

// mvs has a value that was initially read from parent - BUT in a later tx index
mvs.Set(4, 2, []byte("key4"), []byte("value4_c"))
mvs.SetWriteset(4, 2, map[string][]byte{
"key4": []byte("value4_c"),
})
// readset should remain valid
require.True(t, vis.ValidateReadset())

// mvs has an estimate
mvs.SetEstimate(1, 2, []byte("key2"))
mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
"key2": nil,
})
// readset should be invalid now - but via abort channel write
go func() {
vis.ValidateReadset()
Expand All @@ -236,10 +274,20 @@ func TestVersionIndexedStoreValidation(t *testing.T) {
require.Equal(t, 1, abort.DependentTxIdx)

// test key deleted later
mvs.Delete(1, 1, []byte("key2"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
"key2": nil,
})
require.False(t, vis.ValidateReadset())
// reset key2
mvs.Set(1, 1, []byte("key2"), []byte("value2"))
mvs.SetWriteset(1, 1, map[string][]byte{
"key3": []byte("value3"),
"key1": []byte("value1"),
"key4": []byte("value4"),
"key2": []byte("value2"),
})

// lastly verify panic if parent kvstore has a conflict - this shouldn't happen but lets assert that it would panic
parentKVStore.Set([]byte("keyDNE"), []byte("foobar"))
Expand Down
98 changes: 67 additions & 31 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package multiversion

import (
"bytes"
"sort"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
)

type MultiVersionStore interface {
GetLatest(key []byte) (value MultiVersionValueItem)
GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem)
Set(index int, incarnation int, key []byte, value []byte) // TODO: maybe we don't need these if all writes are coming from writesets
SetEstimate(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Delete(index int, incarnation int, key []byte) // TODO: maybe we don't need these if all writes are coming from writesets
Has(index int, key []byte) bool
WriteLatestToStore(parentStore types.KVStore)
WriteLatestToStore()
SetWriteset(index int, incarnation int, writeset WriteSet)
InvalidateWriteset(index int, incarnation int)
SetEstimatedWriteset(index int, incarnation int, writeset WriteSet)
GetAllWritesetKeys() map[int][]string
SetReadset(index int, readset ReadSet)
GetReadset(index int) ReadSet
ValidateTransactionState(index int) []int
}

type WriteSet map[string][]byte
type ReadSet map[string][]byte

var _ MultiVersionStore = (*Store)(nil)

Expand All @@ -32,12 +36,17 @@ type Store struct {
// TODO: do we need to support iterators as well similar to how cachekv does it - yes

txWritesetKeys map[int][]string // map of tx index -> writeset keys
txReadSets map[int]ReadSet

parentStore types.KVStore
}

func NewMultiVersionStore() *Store {
func NewMultiVersionStore(parentStore types.KVStore) *Store {
return &Store{
multiVersionMap: make(map[string]MultiVersionValue),
txWritesetKeys: make(map[int][]string),
txReadSets: make(map[int]ReadSet),
parentStore: parentStore,
}
}

Expand Down Expand Up @@ -99,16 +108,6 @@ func (s *Store) tryInitMultiVersionItem(keyString string) {
}
}

// Set implements MultiVersionStore.
func (s *Store) Set(index int, incarnation int, key []byte, value []byte) {
s.mtx.Lock()
defer s.mtx.Unlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].Set(index, incarnation, value)
}

func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
writeset := make(map[string][]byte)
if newWriteSet != nil {
Expand All @@ -135,6 +134,7 @@ func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
}

// SetWriteset sets a writeset for a transaction index, and also writes all of the multiversion items in the writeset to the multiversion store.
// TODO: returns a list of NEW keys added
func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -153,7 +153,7 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
s.multiVersionMap[key].Set(index, incarnation, value)
}
}
sort.Strings(writeSetKeys)
sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice
s.txWritesetKeys[index] = writeSetKeys
}

Expand Down Expand Up @@ -198,27 +198,63 @@ func (s *Store) GetAllWritesetKeys() map[int][]string {
return s.txWritesetKeys
}

// SetEstimate implements MultiVersionStore.
func (s *Store) SetEstimate(index int, incarnation int, key []byte) {
func (s *Store) SetReadset(index int, readset ReadSet) {
s.mtx.Lock()
defer s.mtx.Unlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].SetEstimate(index, incarnation)
s.txReadSets[index] = readset
}

// Delete implements MultiVersionStore.
func (s *Store) Delete(index int, incarnation int, key []byte) {
s.mtx.Lock()
defer s.mtx.Unlock()
func (s *Store) GetReadset(index int) ReadSet {
s.mtx.RLock()
defer s.mtx.RUnlock()

keyString := string(key)
s.tryInitMultiVersionItem(keyString)
s.multiVersionMap[keyString].Delete(index, incarnation)
return s.txReadSets[index]
}

func (s *Store) ValidateTransactionState(index int) []int {
defer telemetry.MeasureSince(time.Now(), "store", "mvs", "validate")
conflictSet := map[int]struct{}{}

// validate readset
readset := s.GetReadset(index)
// iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store
for key, value := range readset {
// get the latest value from the multiversion store
latestValue := s.GetLatestBeforeIndex(index, []byte(key))
if latestValue == nil {
// TODO: maybe we don't even do this check?
parentVal := s.parentStore.Get([]byte(key))
if !bytes.Equal(parentVal, value) {
panic("there shouldn't be readset conflicts with parent kv store, since it shouldn't change")
}
} else {
// if estimate, mark as conflict index
if latestValue.IsEstimate() {
conflictSet[latestValue.Index()] = struct{}{}
} else if latestValue.IsDeleted() {
if value != nil {
// conflict
conflictSet[latestValue.Index()] = struct{}{}
}
} else if !bytes.Equal(latestValue.Value(), value) {
conflictSet[latestValue.Index()] = struct{}{}
}
}
}
// TODO: validate iterateset

// convert conflictset into sorted indices
conflictIndices := make([]int, 0, len(conflictSet))
for index := range conflictSet {
conflictIndices = append(conflictIndices, index)
}

sort.Ints(conflictIndices)
return conflictIndices
}

func (s *Store) WriteLatestToStore(parentStore types.KVStore) {
func (s *Store) WriteLatestToStore() {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -245,11 +281,11 @@ func (s *Store) WriteLatestToStore(parentStore types.KVStore) {
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
parentStore.Delete([]byte(key))
s.parentStore.Delete([]byte(key))
continue
}
if mvValue.Value() != nil {
parentStore.Set([]byte(key), mvValue.Value())
s.parentStore.Set([]byte(key), mvValue.Value())
}
}
}
Loading
Loading