Skip to content

Commit

Permalink
Occ iterator fix (#389)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This change serves to improve the way we track the values of the keys we
iterate over when running iterators. Previously, the iterateset would
only track the keys that were iterated, and the behavior of the iterator
was thought to not include keys that didn't have values present, OR that
the readset would be appropriately updated when reading the value from
the iterateset. (I'm not yet 100% sure that updating readset WITHIN the
tracked iterator is fully necessary, since it may be the case that the
readset modifications may have been sufficient to mitigate this issue,
but the change is currently in the PR since this is the version of code
running on the loadtest cluster for stability testing.

However, in cases when an earlier transaction was writing to the range
that would be iterated, it was possible that the stale value was read by
the transaction handler, BUT the value that got into the readset was the
newer one.

I believe this has to do with the readset updating based on directly
querying values from underlying stores, and overwriting the prior
readset value that indicated that the transaction used a stale value.
The fix I have made is that during tx execution, the cache memiterator
now reads directly form MVKV instead of individually reading from
underlying stores. The key difference here is that IF the key is already
in the readset, it will serve that STALE value instead of reading into
the underlying store where the value may have since mutated. As a
result, the behavior we now expect is that one a key is read, ONLY that
value that was read will be utilized for the duration of the
transaction. This way, we won't potentially mutate the readset by
overwriting the key entry with the later value only to have it
incorrectly pass validation.

Additionally, to more rigorously enforce this behavior, updating the
readset now will only update the map IFF the key doesnt already exist in
the readset. This should provide better guarantees around catching any
stale reads that occur over the lifespan of the transacation execution.

## Testing performed to validate your change
Running a lot of iterator heavy workloads on a loadtest cluster to
verify that no nondeterminism remains in the iterator workflow
  • Loading branch information
udpatil authored Jan 2, 2024
1 parent 2b2fb0e commit 98222ef
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 61 deletions.
86 changes: 44 additions & 42 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@ import (
// Implements Iterator.
type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset WriteSet
index int
abortChannel chan occtypes.Abort
ReadsetHandler
mvkv *VersionIndexedStore
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
readsetHandler ReadsetHandler,
) *memIterator {
var iter types.Iterator
var err error
Expand All @@ -43,40 +37,25 @@ func (store *VersionIndexedStore) newMemIterator(
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
ReadsetHandler: readsetHandler,
Iterator: iter,
mvkv: store,
}
}

// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator
// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent
func (mi *memIterator) Value() []byte {
key := mi.Iterator.Key()
// TODO: verify that this is correct
return mi.mvkv.Get(key)
}

// try fetch from writeset - return if exists
if val, ok := mi.writeset[string(key)]; ok {
return val
}

// get the value from the multiversion store
val := mi.mvStore.GetLatestBeforeIndex(mi.index, key)

// if we have an estiamte, write to abort channel
if val.IsEstimate() {
mi.abortChannel <- occtypes.NewEstimateAbort(val.Index())
}
type validationIterator struct {
types.Iterator

// need to update readset
// if we have a deleted value, return nil
if val.IsDeleted() {
defer mi.ReadsetHandler.UpdateReadSet(key, nil)
return nil
}
defer mi.ReadsetHandler.UpdateReadSet(key, val.Value())
return val.Value()
mvStore MultiVersionStore
writeset WriteSet
index int
abortChannel chan occtypes.Abort
}

func (store *Store) newMVSValidationIterator(
Expand All @@ -86,7 +65,7 @@ func (store *Store) newMVSValidationIterator(
ascending bool,
writeset WriteSet,
abortChannel chan occtypes.Abort,
) *memIterator {
) *validationIterator {
var iter types.Iterator
var err error

Expand All @@ -103,12 +82,35 @@ func (store *Store) newMVSValidationIterator(
panic(err)
}

return &memIterator{
Iterator: iter,
mvStore: store,
index: index,
abortChannel: abortChannel,
ReadsetHandler: NoOpHandler{},
writeset: writeset,
return &validationIterator{
Iterator: iter,
mvStore: store,
index: index,
abortChannel: abortChannel,
writeset: writeset,
}
}

// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator
func (vi *validationIterator) Value() []byte {
key := vi.Iterator.Key()

// try fetch from writeset - return if exists
if val, ok := vi.writeset[string(key)]; ok {
return val
}

// get the value from the multiversion store
val := vi.mvStore.GetLatestBeforeIndex(vi.index, key)

// if we have an estimate, write to abort channel
if val.IsEstimate() {
vi.abortChannel <- occtypes.NewEstimateAbort(val.Index())
}

// if we have a deleted value, return nil
if val.IsDeleted() {
return nil
}
return val.Value()
}
4 changes: 4 additions & 0 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (iter *mvsMergeIterator) Value() []byte {
// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
// add values read from parent to readset
iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value)
return value
}

Expand All @@ -148,6 +150,8 @@ func (iter *mvsMergeIterator) Value() []byte {
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
// add values read from parent to readset
iter.ReadsetHandler.UpdateReadSet(iter.parent.Key(), value)
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
Expand Down
27 changes: 13 additions & 14 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ type VersionIndexedStore struct {
iterateset Iterateset
// TODO: need to add iterateset here as well

// dirty keys that haven't been sorted yet for iteration
dirtySet map[string]struct{}
// used for iterators - populated at the time of iterator instantiation
// TODO: when we want to perform iteration, we need to move all the dirty keys (writeset and readset) into the sortedTree and then combine with the iterators for the underlying stores
sortedStore *dbm.MemDB // always ascending sorted
Expand All @@ -102,7 +100,6 @@ func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersion
readset: make(map[string][]byte),
writeset: make(map[string][]byte),
iterateset: []iterationTracker{},
dirtySet: make(map[string]struct{}),
sortedStore: dbm.NewMemDB(),
parent: parent,
multiVersionStore: multiVersionStore,
Expand Down Expand Up @@ -231,7 +228,7 @@ func (store *VersionIndexedStore) Delete(key []byte) {
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete")

types.AssertValidKey(key)
store.setValue(key, nil, true, true)
store.setValue(key, nil)
}

// Has implements types.KVStore.
Expand All @@ -248,7 +245,7 @@ func (store *VersionIndexedStore) Set(key []byte, value []byte) {
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set")

types.AssertValidKey(key)
store.setValue(key, value, false, true)
store.setValue(key, value)
}

// Iterator implements types.KVStore.
Expand Down Expand Up @@ -278,11 +275,15 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b
for key := range store.writeset {
memDB.Set([]byte(key), []byte{})
}
// also add readset elements such that they fetch from readset instead of parent
for key := range store.readset {
memDB.Set([]byte(key), []byte{})
}

var parent, memIterator types.Iterator

// make a memIterator
memIterator = store.newMemIterator(start, end, memDB, ascending, store)
memIterator = store.newMemIterator(start, end, memDB, ascending)

if ascending {
parent = store.parent.Iterator(start, end)
Expand All @@ -293,7 +294,7 @@ func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending b
mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store)

iterationTracker := NewIterationTracker(start, end, ascending, store.writeset)
trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store)
trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store, store)

// mergeIterator
return trackedIterator
Expand Down Expand Up @@ -326,14 +327,11 @@ func (v *VersionIndexedStore) GetWorkingHash() ([]byte, error) {
}

// Only entrypoint to mutate writeset
func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirty bool) {
func (store *VersionIndexedStore) setValue(key, value []byte) {
types.AssertValidKey(key)

keyStr := string(key)
store.writeset[keyStr] = value
if dirty {
store.dirtySet[keyStr] = struct{}{}
}
}

func (store *VersionIndexedStore) WriteToMultiVersionStore() {
Expand All @@ -358,9 +356,10 @@ func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
// add to readset
keyStr := string(key)
store.readset[keyStr] = value
// add to dirty set
store.dirtySet[keyStr] = struct{}{}
// TODO: maybe only add if not already existing?
if _, ok := store.readset[keyStr]; !ok {
store.readset[keyStr] = value
}
}

// Write implements types.CacheWrap so this store can exist on the cache multi store
Expand Down
8 changes: 5 additions & 3 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,15 +375,17 @@ func TestIterator(t *testing.T) {
mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
"key2": []byte("value1_b"),
})

// need to reset readset
abortC2 := make(chan scheduler.Abort)
visNew := multiversion.NewVersionIndexedStore(parentKVStore, mvs, 2, 3, abortC2)
go func() {
// new iter
iter4 := vis.Iterator([]byte("000"), []byte("key5"))
iter4 := visNew.Iterator([]byte("000"), []byte("key5"))
defer iter4.Close()
for ; iter4.Valid(); iter4.Next() {
}
}()
abort := <-abortC // read the abort from the channel
abort := <-abortC2 // read the abort from the channel
require.Equal(t, 1, abort.DependentTxIdx)

}
7 changes: 5 additions & 2 deletions store/multiversion/trackediterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ type trackedIterator struct {
types.Iterator

iterateset iterationTracker
ReadsetHandler
IterateSetHandler
}

// TODO: test

func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler) *trackedIterator {
func NewTrackedIterator(iter types.Iterator, iterationTracker iterationTracker, iterateSetHandler IterateSetHandler, readSetHandler ReadsetHandler) *trackedIterator {
return &trackedIterator{
Iterator: iter,
iterateset: iterationTracker,
IterateSetHandler: iterateSetHandler,
ReadsetHandler: readSetHandler,
}
}

Expand All @@ -43,9 +45,10 @@ func (ti *trackedIterator) Key() []byte {
// Value calls the iterator.Key() and adds the key to the iterateset, then returns the value from the iterator
func (ti *trackedIterator) Value() []byte {
key := ti.Iterator.Key()
val := ti.Iterator.Value()
// add key to the tracker
ti.iterateset.AddKey(key)
return ti.Iterator.Value()
return val
}

func (ti *trackedIterator) Next() {
Expand Down

0 comments on commit 98222ef

Please sign in to comment.