refactor: make cachekv store thread-safe again (backport #14378) #14818

Merged
3 commits merged on Jan 31, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### State Machine Breaking

* (baseapp, x/auth/posthandler) [#13940](https://github.com/cosmos/cosmos-sdk/pull/13940) Update `PostHandler` to receive the `runTx` success boolean.
* (store) [#14378](https://github.com/cosmos/cosmos-sdk/pull/14378) The `CacheKV` store is thread-safe again; the change also improves its iteration and deletion logic. Iteration now operates on a strictly isolated view of the store, which is a breaking change from the previous behavior.

### API Breaking Changes

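To make the changelog entry above concrete, here is a minimal sketch of the new isolated-iteration behavior. It is not part of the PR: it assumes a `cachekv` store wrapped around an in-memory parent via `dbadapter.Store` and tm-db's `MemDB`, and exact import paths differ between SDK versions.

package main

import (
	"fmt"

	dbm "github.com/tendermint/tm-db" // in-memory backing store; path varies by SDK version

	"github.com/cosmos/cosmos-sdk/store/cachekv"
	"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func main() {
	// Wrap an in-memory parent KVStore with the cachekv store.
	parent := dbadapter.Store{DB: dbm.NewMemDB()}
	store := cachekv.NewStore(parent)

	store.Set([]byte("a"), []byte("1"))

	// The iterator is built on an isolated view taken at creation time.
	it := store.Iterator(nil, nil)
	defer it.Close()

	// Mutations made after the iterator was created no longer leak into it.
	store.Delete([]byte("a"))

	for ; it.Valid(); it.Next() {
		fmt.Printf("%s=%s\n", it.Key(), it.Value()) // still prints a=1
	}
}

Under the pre-#14378 behavior, a delete like the one above could be observed by an already-open iterator through the shared `deleted` map; with the snapshot-based iterator it cannot.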
38 changes: 24 additions & 14 deletions store/cachekv/internal/btree.go
@@ -4,6 +4,7 @@ import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/tidwall/btree"
)

@@ -21,46 +22,55 @@ var errKeyEmpty = errors.New("key cannot be empty")
//
// We choose tidwall/btree over google/btree here because it provides an API for implementing a step iterator directly.
type BTree struct {
tree btree.BTreeG[item]
tree *btree.BTreeG[item]
}

// NewBTree creates a wrapper around `btree.BTreeG`.
func NewBTree() *BTree {
return &BTree{tree: *btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
// Contract: cachekv store must not be called concurrently
NoLocks: true,
})}
func NewBTree() BTree {
return BTree{
tree: btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
NoLocks: false,
}),
}
}

func (bt *BTree) Set(key, value []byte) {
func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
}

func (bt *BTree) Get(key []byte) []byte {
func (bt BTree) Get(key []byte) []byte {
i, found := bt.tree.Get(newItem(key, nil))
if !found {
return nil
}
return i.value
}

func (bt *BTree) Delete(key []byte) {
func (bt BTree) Delete(key []byte) {
bt.tree.Delete(newItem(key, nil))
}

func (bt *BTree) Iterator(start, end []byte) (*memIterator, error) {
func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), true), nil
return newMemIterator(start, end, bt, true), nil
}

func (bt *BTree) ReverseIterator(start, end []byte) (*memIterator, error) {
func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), false), nil
return newMemIterator(start, end, bt, false), nil
}

// Copy the tree. This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (bt BTree) Copy() BTree {
return BTree{
tree: bt.tree.Copy(),
}
}

// item is a btree item with byte slices as keys and values
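For context on the new `Copy` method: tidwall/btree copies are copy-on-write, so taking a snapshot is cheap and later writes to the original tree do not show up in the copy. Below is a small standalone sketch against the tidwall/btree API; the `item` type, `byKeys` comparator, and the degree value are simplified stand-ins for the definitions in this package.

package main

import (
	"bytes"
	"fmt"

	"github.com/tidwall/btree"
)

// item mirrors the package's key/value btree item (simplified).
type item struct {
	key, value []byte
}

// byKeys orders items by their byte-slice keys.
func byKeys(a, b item) bool {
	return bytes.Compare(a.key, b.key) == -1
}

func main() {
	tree := btree.NewBTreeGOptions(byKeys, btree.Options{
		Degree:  32,
		NoLocks: false, // keep internal locking on, matching the change above
	})
	tree.Set(item{key: []byte("a"), value: []byte("1")})

	// Copy shares nodes with the original until either side mutates,
	// so the snapshot is cheap and unaffected by later writes.
	snapshot := tree.Copy()
	tree.Set(item{key: []byte("b"), value: []byte("2")})

	fmt.Println(tree.Len(), snapshot.Len()) // prints: 2 1
}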
3 changes: 2 additions & 1 deletion store/cachekv/internal/btree_test.go
@@ -3,6 +3,7 @@ package internal
import (
"testing"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)
@@ -181,7 +182,7 @@ func TestDBIterator(t *testing.T) {
verifyIterator(t, ritr, nil, "reverse iterator with empty db")
}

func verifyIterator(t *testing.T, itr *memIterator, expected []int64, msg string) {
func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) {
i := 0
for itr.Valid() {
key := itr.Key()
24 changes: 3 additions & 21 deletions store/cachekv/internal/memiterator.go
@@ -11,20 +11,18 @@ import (
var _ types.Iterator = (*memIterator)(nil)

// memIterator iterates over iterKVCache items.
// if key is nil, means it was deleted.
// if value is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
iter btree.GenericIter[item]

start []byte
end []byte
ascending bool
lastKey []byte
deleted map[string]struct{}
valid bool
}

func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}, ascending bool) *memIterator {
func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator {
iter := items.tree.Iter()
var valid bool
if ascending {
@@ -52,8 +50,6 @@ func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}
start: start,
end: end,
ascending: ascending,
lastKey: nil,
deleted: deleted,
valid: valid,
}

@@ -113,21 +109,7 @@ func (mi *memIterator) Key() []byte {
}

func (mi *memIterator) Value() []byte {
item := mi.iter.Item()
key := item.key
// We need to handle the case where deleted is modified and includes our current key
// We handle this by maintaining a lastKey object in the iterator.
// If the current key is the same as the last key (and last key is not nil / the start)
// then we are calling value on the same thing as last time.
// Therefore we don't check the mi.deleted to see if this key is included in there.
if _, ok := mi.deleted[string(key)]; ok {
if mi.lastKey == nil || !bytes.Equal(key, mi.lastKey) {
// not re-calling on old last key
return nil
}
}
mi.lastKey = key
return item.value
return mi.iter.Item().value
}

func (mi *memIterator) assertValid() {
71 changes: 27 additions & 44 deletions store/cachekv/store.go
@@ -24,14 +24,11 @@ type cValue struct {
}

// Store wraps an in-memory cache around an underlying types.KVStore.
// If a cached value is nil but deleted is defined for the corresponding key,
// it means the parent doesn't have the key. (No need to delete upon Write())
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
deleted map[string]struct{}
unsortedCache map[string]struct{}
sortedCache *internal.BTree // always ascending sorted
sortedCache internal.BTree // always ascending sorted
parent types.KVStore
}

@@ -41,7 +38,6 @@ var _ types.CacheKVStore = (*Store)(nil)
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
deleted: make(map[string]struct{}),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
parent: parent,
@@ -63,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) {
cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)]
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
store.setCacheValue(key, value, false)
} else {
value = cacheValue.value
}
@@ -79,7 +75,7 @@ func (store *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)

store.setCacheValue(key, value, false, true)
store.setCacheValue(key, value, true)
}

// Has implements types.KVStore.
Expand All @@ -94,15 +90,15 @@ func (store *Store) Delete(key []byte) {
defer store.mtx.Unlock()

types.AssertValidKey(key)
store.setCacheValue(key, nil, true, true)
store.setCacheValue(key, nil, true)
}

// Implements Cachetypes.KVStore.
func (store *Store) Write() {
store.mtx.Lock()
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.deleted) == 0 && len(store.unsortedCache) == 0 {
if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
return
}
@@ -122,19 +118,16 @@ func (store *Store) Write() {
// TODO: Consider allowing usage of Batch, which would allow the write to
// at least happen atomically.
for _, key := range keys {
if store.isDeleted(key) {
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
store.parent.Delete([]byte(key))
continue
}

// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
cacheValue := store.cache[key]
if cacheValue.value != nil {
// It already exists in the parent, hence delete it.
// It already exists in the parent, hence update it.
store.parent.Set([]byte(key), cacheValue.value)
} else {
store.parent.Delete([]byte(key))
}
}

@@ -144,9 +137,6 @@ func (store *Store) Write() {
for key := range store.cache {
delete(store.cache, key)
}
for key := range store.deleted {
delete(store.deleted, key)
}
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
@@ -180,16 +170,24 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()

var parent, cache types.Iterator
store.dirtyItems(start, end)
isoSortedCache := store.sortedCache.Copy()

var (
err error
parent, cache types.Iterator
)

if ascending {
parent = store.parent.Iterator(start, end)
cache, err = isoSortedCache.Iterator(start, end)
} else {
parent = store.parent.ReverseIterator(start, end)
cache, err = isoSortedCache.ReverseIterator(start, end)
}
if err != nil {
panic(err)
}

store.dirtyItems(start, end)
cache = internal.NewMemIterator(start, end, store.sortedCache, store.deleted, ascending)

return internal.NewCacheMergeIterator(parent, cache, ascending)
}
@@ -370,13 +368,7 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
}

for _, item := range unsorted {
if item.Value == nil {
// deleted element, tracked by store.deleted
// setting arbitrary value
store.sortedCache.Set(item.Key, []byte{})
continue
}

// sortedCache is able to store `nil` value to represent deleted items.
store.sortedCache.Set(item.Key, item.Value)
}
}
@@ -385,23 +377,14 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
// etc

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
// A `nil` value means a deletion.
func (store *Store) setCacheValue(key, value []byte, dirty bool) {
keyStr := conv.UnsafeBytesToStr(key)
store.cache[keyStr] = &cValue{
value: value,
dirty: dirty,
}
if deleted {
store.deleted[keyStr] = struct{}{}
} else {
delete(store.deleted, keyStr)
}
if dirty {
store.unsortedCache[keyStr] = struct{}{}
}
}

func (store *Store) isDeleted(key string) bool {
_, ok := store.deleted[key]
return ok
}
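
Taken together, the mutex held by `Store`, `NoLocks: false` on the underlying btree, and the copy-on-write snapshot taken in `iterator` are what make concurrent use of the cachekv store safe again. A rough usage sketch, not a test from this PR, with the same assumed `dbadapter`/MemDB wiring as in the earlier example:

package main

import (
	"sync"

	dbm "github.com/tendermint/tm-db"

	"github.com/cosmos/cosmos-sdk/store/cachekv"
	"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func main() {
	store := cachekv.NewStore(dbadapter.Store{DB: dbm.NewMemDB()})

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n byte) {
			defer wg.Done()
			key := []byte{'a' + n}

			store.Set(key, key) // guarded by the store's mutex
			_ = store.Get(key)

			// Each iterator operates on its own isolated snapshot.
			it := store.Iterator(nil, nil)
			for ; it.Valid(); it.Next() {
				_ = it.Key()
			}
			it.Close()
		}(byte(i))
	}
	wg.Wait()
}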