Skip to content

Commit

Permalink
[occ] Add scheduler logic for validation (#336)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- This was copied from #332 which became unwieldy due to commit history
(merges/rebases)
- Adds scheduler logic for validation
- In this initial version it completes all executions then performs
validations (which feed retries)
- Once we start benchmarking we can make performance improvements to
this
- Retries tasks that fail validation and have no dependencies

## Testing performed to validate your change
- Scheduler Test verifies multi-worker with conflicts
  • Loading branch information
stevenlanders authored Oct 19, 2023
1 parent b34d61c commit 0aebbc9
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 62 deletions.
4 changes: 0 additions & 4 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,7 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}

// DeliverTxBatch executes multiple txs
// TODO: support occ logic with scheduling
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
//TODO: inject multiversion store without import cycle (figure out right place for this)
// ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore())

reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries))
for _, tx := range req.TxEntries {
reqList = append(reqList, tx.Request)
Expand Down
131 changes: 131 additions & 0 deletions baseapp/deliver_tx_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package baseapp

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
)

func toInt(b []byte) int {
r, _ := strconv.Atoi(string(b))
return r
}

func toByteArr(i int) []byte {
return []byte(fmt.Sprintf("%d", i))
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
ctx = ctx.WithEventManager(sdk.NewEventManager())
res := &sdk.Result{}

// Extract the unique ID from the message (assuming you have added this)
txIndex := ctx.TxIndex()

// Use the unique ID to get a specific key for this transaction
sharedKey := []byte(fmt.Sprintf("shared"))
txKey := []byte(fmt.Sprintf("tx-%d", txIndex))

// Similar steps as before: Get the store, retrieve a value, increment it, store back, emit an event
// Get the store
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := toInt(store.Get(txKey))
store.Set(txKey, toByteArr(val+1))

// increment shared key
sharedVal := toInt(store.Get(sharedKey))
store.Set(sharedKey, toByteArr(sharedVal+1))

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
sdk.NewEvent(sdk.EventTypeMessage,
sdk.NewAttribute("shared-val", fmt.Sprintf("%d", sharedVal+1)),
sdk.NewAttribute("tx-val", fmt.Sprintf("%d", val+1)),
sdk.NewAttribute("tx-id", fmt.Sprintf("%d", txIndex)),
),
)

res.Events = ctx.EventManager().Events().ToABCIEvents()
return res, nil
}
}

func requireAttribute(t *testing.T, evts []abci.Event, name string, val string) {
for _, evt := range evts {
for _, att := range evt.Attributes {
if string(att.Key) == name {
require.Equal(t, val, string(att.Value))
return
}
}
}
require.Fail(t, fmt.Sprintf("attribute %s not found via value %s", name, val))
}

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
//anteKey := []byte("ante-key")
anteOpt := func(bapp *BaseApp) {}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
r := sdk.NewRoute(routeMsgCounter, handlerKVStore(capKey1))
bapp.Router().AddRoute(r)
}

app := setupBaseApp(t, anteOpt, routerOpt)
app.InitChain(context.Background(), &abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.setDeliverState(header)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())
app.BeginBlock(app.deliverState.ctx, abci.RequestBeginBlock{Header: header})

var requests []*sdk.DeliverTxEntry
for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
requests = append(requests, &sdk.DeliverTxEntry{
Request: abci.RequestDeliverTx{Tx: txBytes},
})
}

responses := app.DeliverTxBatch(app.deliverState.ctx, sdk.DeliverTxBatchRequest{TxEntries: requests})
require.Len(t, responses.Results, txPerHeight)

for idx, deliverTxRes := range responses.Results {
res := deliverTxRes.Response
require.Equal(t, abci.CodeTypeOK, res.Code)
requireAttribute(t, res.Events, "tx-id", fmt.Sprintf("%d", idx))
requireAttribute(t, res.Events, "tx-val", fmt.Sprintf("%d", blockN+1))
requireAttribute(t, res.Events, "shared-val", fmt.Sprintf("%d", blockN*txPerHeight+idx+1))
}

app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{})
require.Empty(t, app.deliverState.ctx.MultiStore().GetEvents())
app.SetDeliverStateToCommit()
app.Commit(context.Background())
}
}
8 changes: 8 additions & 0 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,11 @@ func (kv kvStore) ReverseSubspaceIterator(prefix []byte) sdk.Iterator {
func NewCommitMultiStore() sdk.CommitMultiStore {
return multiStore{kv: make(map[sdk.StoreKey]kvStore)}
}

func (ms multiStore) SetKVStores(handler func(key store.StoreKey, s sdk.KVStore) store.CacheWrap) store.MultiStore {
panic("not implemented")
}

func (ms multiStore) StoreKeys() []sdk.StoreKey {
panic("not implemented")
}
17 changes: 17 additions & 0 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,20 @@ func (cms Store) GetKVStore(key types.StoreKey) types.KVStore {
func (cms Store) GetWorkingHash() ([]byte, error) {
panic("should never attempt to get working hash from cache multi store")
}

// StoreKeys returns a list of all store keys
func (cms Store) StoreKeys() []types.StoreKey {
keys := make([]types.StoreKey, 0, len(cms.stores))
for _, key := range cms.keys {
keys = append(keys, key)
}
return keys
}

// SetKVStores sets the underlying KVStores via a handler for each key
func (cms Store) SetKVStores(handler func(sk types.StoreKey, s types.KVStore) types.CacheWrap) types.MultiStore {
for k, s := range cms.stores {
cms.stores[k] = handler(k, s.(types.KVStore))
}
return cms
}
17 changes: 17 additions & 0 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

abci "github.com/tendermint/tendermint/abci/types"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
scheduler "github.com/cosmos/cosmos-sdk/types/occ"
Expand Down Expand Up @@ -356,6 +358,21 @@ func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
store.dirtySet[keyStr] = struct{}{}
}

// Write implements types.CacheWrap so this store can exist on the cache multi store
func (store *VersionIndexedStore) Write() {
panic("not implemented")
}

// GetEvents implements types.CacheWrap so this store can exist on the cache multi store
func (store *VersionIndexedStore) GetEvents() []abci.Event {
panic("not implemented")
}

// ResetEvents implements types.CacheWrap so this store can exist on the cache multi store
func (store *VersionIndexedStore) ResetEvents() {
panic("not implemented")
}

func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) {
// append to iterateset
store.iterateset = append(store.iterateset, iterationTracker)
Expand Down
7 changes: 7 additions & 0 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/cosmos/cosmos-sdk/types/occ"
occtypes "github.com/cosmos/cosmos-sdk/types/occ"
db "github.com/tendermint/tm-db"
)
Expand All @@ -24,6 +25,7 @@ type MultiVersionStore interface {
CollectIteratorItems(index int) *db.MemDB
SetReadset(index int, readset ReadSet)
GetReadset(index int) ReadSet
VersionedIndexedStore(index int, incarnation int, abortChannel chan occ.Abort) *VersionIndexedStore
SetIterateset(index int, iterateset Iterateset)
GetIterateset(index int) Iterateset
ValidateTransactionState(index int) (bool, []int)
Expand Down Expand Up @@ -58,6 +60,11 @@ func NewMultiVersionStore(parentStore types.KVStore) *Store {
}
}

// VersionedIndexedStore creates a new versioned index store for a given incarnation and transaction index
func (s *Store) VersionedIndexedStore(index int, incarnation int, abortChannel chan occ.Abort) *VersionIndexedStore {
return NewVersionIndexedStore(s.parentStore, s, index, incarnation, abortChannel)
}

// GetLatest implements MultiVersionStore.
func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) {
s.mtx.RLock()
Expand Down
12 changes: 12 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,3 +1197,15 @@ func flushPruningHeights(batch dbm.Batch, pruneHeights []int64) {

batch.Set([]byte(pruneHeightsKey), bz)
}

func (rs *Store) SetKVStores(handler func(key types.StoreKey, s types.KVStore) types.CacheWrap) types.MultiStore {
panic("SetKVStores is not implemented for rootmulti")
}

func (rs *Store) StoreKeys() []types.StoreKey {
res := make([]types.StoreKey, len(rs.keysByName))
for _, sk := range rs.keysByName {
res = append(res, sk)
}
return res
}
6 changes: 6 additions & 0 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ type MultiStore interface {

// Resets the tracked event list
ResetEvents()

// SetKVStores is a generalized wrapper method
SetKVStores(handler func(key StoreKey, s KVStore) CacheWrap) MultiStore

// StoreKeys returns a list of store keys
StoreKeys() []StoreKey
}

// From MultiStore.CacheMultiStore()....
Expand Down
Loading

0 comments on commit 0aebbc9

Please sign in to comment.