Skip to content

Commit

Permalink
[occ] Add optimizations for multiversion and mvkv (#361)
Browse files Browse the repository at this point in the history
Add optimizations to reduce mutex lock contention and refactor with sync
Maps. This also removes telemetry that was added liberally, and we can
later add in telemetry more mindfully and feature flagged.

loadtest chain testing
  • Loading branch information
udpatil committed Jan 25, 2024
1 parent 02aa486 commit 48bc98a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 146 deletions.
14 changes: 7 additions & 7 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,13 +841,13 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {

defer telemetry.MeasureThroughputSinceWithLabels(
telemetry.TxCount,
[]metrics.Label{
telemetry.NewLabel("mode", modeKeyToString[mode]),
},
time.Now(),
)
// defer telemetry.MeasureThroughputSinceWithLabels(
// telemetry.TxCount,
// []metrics.Label{
// telemetry.NewLabel("mode", modeKeyToString[mode]),
// },
// time.Now(),
// )

// Reset events after each checkTx or simulateTx or recheckTx
// DeliverTx is garbage collected after FinalizeBlocker
Expand Down
2 changes: 0 additions & 2 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
defer store.mtx.Unlock()
// TODO: (occ) Note that for iterators, we'll need to have special handling (discussed in RFC) to ensure proper validation

// TODO: (occ) Note that for iterators, we'll need to have special handling (discussed in RFC) to ensure proper validation

var parent, cache types.Iterator

if ascending {
Expand Down
53 changes: 29 additions & 24 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ package multiversion
import (
"io"
"sort"
"sync"
"time"

abci "github.com/tendermint/tendermint/abci/types"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
scheduler "github.com/cosmos/cosmos-sdk/types/occ"
dbm "github.com/tendermint/tm-db"
)
Expand Down Expand Up @@ -72,7 +69,8 @@ func (item *iterationTracker) SetEarlyStopKey(key []byte) {

// Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index
type VersionIndexedStore struct {
mtx sync.Mutex
// TODO: this shouldnt NEED a mutex because its used within single transaction execution, therefore no concurrency
// mtx sync.Mutex
// used for tracking reads and writes for eventual validation + persistence into multi-version store
// TODO: does this need sync.Map?
readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
Expand Down Expand Up @@ -130,9 +128,10 @@ func (store *VersionIndexedStore) Get(key []byte) []byte {
// if the key is in the cache, return it

// don't have RW mutex because we have to update readset
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "get")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "get")

types.AssertValidKey(key)
strKey := string(key)
Expand Down Expand Up @@ -176,9 +175,10 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV

// This function iterates over the readset, validating that the values in the readset are consistent with the values in the multiversion store and underlying parent store, and returns a boolean indicating validity
func (store *VersionIndexedStore) ValidateReadset() bool {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "validate_readset")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "validate_readset")

// sort the readset keys - this is so we have consistent behavior when theres varying conflicts within the readset (eg. read conflict vs estimate)
readsetKeys := make([]string, 0, len(store.readset))
Expand Down Expand Up @@ -225,9 +225,10 @@ func (store *VersionIndexedStore) ValidateReadset() bool {

// Delete implements types.KVStore.
func (store *VersionIndexedStore) Delete(key []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "delete")

types.AssertValidKey(key)
store.setValue(key, nil, true, true)
Expand All @@ -241,9 +242,10 @@ func (store *VersionIndexedStore) Has(key []byte) bool {

// Set implements types.KVStore.
func (store *VersionIndexedStore) Set(key []byte, value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "set")

types.AssertValidKey(key)
store.setValue(key, value, false, true)
Expand All @@ -262,8 +264,9 @@ func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iter
// TODO: still needs iterateset tracking
// Iterator implements types.KVStore.
func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()

// get the sorted keys from MVS
// TODO: ideally we take advantage of mvs keys already being sorted
Expand Down Expand Up @@ -334,18 +337,20 @@ func (store *VersionIndexedStore) setValue(key, value []byte, deleted bool, dirt
}

func (store *VersionIndexedStore) WriteToMultiVersionStore() {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset)
store.multiVersionStore.SetReadset(store.transactionIndex, store.readset)
store.multiVersionStore.SetIterateset(store.transactionIndex, store.iterateset)
}

func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
// TODO: remove?
// store.mtx.Lock()
// defer store.mtx.Unlock()
// defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset)
// TODO: do we need to write readset and iterateset in this case? I don't think so since if this is called it means we aren't doing validation
}
Expand Down
Loading

0 comments on commit 48bc98a

Please sign in to comment.