From 0aebbc95121e77a94b32f4c62af9f9346d12b81f Mon Sep 17 00:00:00 2001
From: Steven Landers
Date: Thu, 19 Oct 2023 13:41:46 -0400
Subject: [PATCH] [occ] Add scheduler logic for validation (#336)

## Describe your changes and provide context

- This was copied from #332, which became unwieldy due to commit history (merges/rebases)
- Adds scheduler logic for validation
- In this initial version, it completes all executions, then performs validations (which feed retries)
- Once we start benchmarking, we can make performance improvements to this
- Retries tasks that fail validation and have no dependencies

## Testing performed to validate your change

- Scheduler test verifies multi-worker execution with conflicting transactions

---
 baseapp/abci.go                  |   4 -
 baseapp/deliver_tx_batch_test.go | 131 ++++++++++++++++++++
 server/mock/store.go             |   8 ++
 store/cachemulti/store.go        |  17 +++
 store/multiversion/mvkv.go       |  17 +++
 store/multiversion/store.go      |   7 ++
 store/rootmulti/store.go         |  12 ++
 store/types/store.go             |   6 +
 tasks/scheduler.go               | 198 +++++++++++++++++++++++++------
 tasks/scheduler_test.go          |  88 ++++++++++----
 10 files changed, 426 insertions(+), 62 deletions(-)
 create mode 100644 baseapp/deliver_tx_batch_test.go

diff --git a/baseapp/abci.go b/baseapp/abci.go
index 586162ca3..52661b30a 100644
--- a/baseapp/abci.go
+++ b/baseapp/abci.go
@@ -236,11 +236,7 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
 }
 
 // DeliverTxBatch executes multiple txs
-// TODO: support occ logic with scheduling
 func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
-	//TODO: inject multiversion store without import cycle (figure out right place for this)
-	// ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore())
-
 	reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries))
 	for _, tx := range req.TxEntries {
 		reqList = append(reqList, tx.Request)
diff --git a/baseapp/deliver_tx_batch_test.go b/baseapp/deliver_tx_batch_test.go
new file mode 100644
index 000000000..13cd9fd60
--- /dev/null
+++ b/baseapp/deliver_tx_batch_test.go
@@ -0,0 +1,131 @@
+package baseapp
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	abci "github.com/tendermint/tendermint/abci/types"
+	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
+
+	"github.com/cosmos/cosmos-sdk/codec"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+)
+
+func toInt(b []byte) int {
+	r, _ := strconv.Atoi(string(b))
+	return r
+}
+
+func toByteArr(i int) []byte {
+	return []byte(fmt.Sprintf("%d", i))
+}
+
+func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
+	return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
+		ctx = ctx.WithEventManager(sdk.NewEventManager())
+		res := &sdk.Result{}
+
+		// the tx index serves as a unique ID for this transaction
+		txIndex := ctx.TxIndex()
+
+		// derive a per-tx key and a key shared by all txs
+		sharedKey := []byte("shared")
+		txKey := []byte(fmt.Sprintf("tx-%d", txIndex))
+
+		// get the store for this handler's key
+		store := ctx.KVStore(capKey)
+
+		// increment per-tx key (no conflict)
+		val := toInt(store.Get(txKey))
+		store.Set(txKey, toByteArr(val+1))
+
+		// increment shared key
+		sharedVal := toInt(store.Get(sharedKey))
+		store.Set(sharedKey, toByteArr(sharedVal+1))
+
+		// emit an event with the incremented values and the tx index
+		ctx.EventManager().EmitEvent(
+			sdk.NewEvent(sdk.EventTypeMessage,
+				sdk.NewAttribute("shared-val", fmt.Sprintf("%d", sharedVal+1)),
+				sdk.NewAttribute("tx-val", fmt.Sprintf("%d", val+1)),
+				sdk.NewAttribute("tx-id", fmt.Sprintf("%d", txIndex)),
+			),
+		)
+
+		res.Events = ctx.EventManager().Events().ToABCIEvents()
+		return res, nil
+	}
+}
+
+func requireAttribute(t *testing.T, evts []abci.Event, name string, val string) {
+	for _, evt := range evts {
+		for _, att := range evt.Attributes {
+			if string(att.Key) == name {
+				require.Equal(t, val, string(att.Value))
+				return
+			}
+		}
+	}
+	require.Fail(t, fmt.Sprintf("attribute %s with value %s not found", name, val))
+}
+
+func TestDeliverTxBatch(t *testing.T) {
+	// no-op ante handler; all increments happen in the message handler
+	anteOpt := func(bapp *BaseApp) {}
+
+	// test increments in the handler
+	routerOpt := func(bapp *BaseApp) {
+		r := sdk.NewRoute(routeMsgCounter, handlerKVStore(capKey1))
+		bapp.Router().AddRoute(r)
+	}
+
+	app := setupBaseApp(t, anteOpt, routerOpt)
+	app.InitChain(context.Background(), &abci.RequestInitChain{})
+
+	// create the same codec used in txDecoder
+	codec := codec.NewLegacyAmino()
+	registerTestCodec(codec)
+
+	nBlocks := 3
+	txPerHeight := 5
+
+	for blockN := 0; blockN < nBlocks; blockN++ {
+		header := tmproto.Header{Height: int64(blockN) + 1}
+		app.setDeliverState(header)
+		app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())
+		app.BeginBlock(app.deliverState.ctx, abci.RequestBeginBlock{Header: header})
+
+		var requests []*sdk.DeliverTxEntry
+		for i := 0; i < txPerHeight; i++ {
+			counter := int64(blockN*txPerHeight + i)
+			tx := newTxCounter(counter, counter)
+
+			txBytes, err := codec.Marshal(tx)
+			require.NoError(t, err)
+			requests = append(requests, &sdk.DeliverTxEntry{
+				Request: abci.RequestDeliverTx{Tx: txBytes},
+			})
+		}
+
+		responses := app.DeliverTxBatch(app.deliverState.ctx, sdk.DeliverTxBatchRequest{TxEntries: requests})
+		require.Len(t, responses.Results, txPerHeight)
+
+		for idx, deliverTxRes := range responses.Results {
+			res := deliverTxRes.Response
+			require.Equal(t, abci.CodeTypeOK, res.Code)
+			requireAttribute(t, res.Events, "tx-id", fmt.Sprintf("%d", idx))
+			requireAttribute(t, res.Events, "tx-val", fmt.Sprintf("%d", blockN+1))
+			requireAttribute(t, res.Events, "shared-val", fmt.Sprintf("%d", blockN*txPerHeight+idx+1))
+		}
+
+		app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{})
+		require.Empty(t, app.deliverState.ctx.MultiStore().GetEvents())
+		app.SetDeliverStateToCommit()
+		app.Commit(context.Background())
+	}
+}
diff --git a/server/mock/store.go b/server/mock/store.go
index a4ebbcb37..bdbc8a4d6 100644
--- a/server/mock/store.go
+++ b/server/mock/store.go
@@ -229,3 +229,11 @@ func (kv kvStore) ReverseSubspaceIterator(prefix []byte) sdk.Iterator {
 func NewCommitMultiStore() sdk.CommitMultiStore {
 	return multiStore{kv: make(map[sdk.StoreKey]kvStore)}
 }
+
+func (ms multiStore) SetKVStores(handler func(key store.StoreKey, s sdk.KVStore) store.CacheWrap) store.MultiStore {
+	panic("not implemented")
+}
+
+func (ms multiStore) StoreKeys() []sdk.StoreKey {
+	panic("not implemented")
+}
diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go
index 43e00c32b..96ce20dfc 100644
--- a/store/cachemulti/store.go
+++ b/store/cachemulti/store.go
@@ -208,3 +208,20 @@ func (cms Store) GetKVStore(key types.StoreKey) types.KVStore {
 func (cms Store) GetWorkingHash() ([]byte, error) {
 	panic("should never attempt to get working hash from cache multi store")
 }
+
+// StoreKeys returns a list of all store keys
+func (cms Store) StoreKeys() []types.StoreKey {
+	keys := make([]types.StoreKey, 0, len(cms.stores))
+	for _, key := range cms.keys {
+		keys = append(keys, key)
+	}
+	return keys
+}
+
+// SetKVStores sets the underlying KVStores via a handler for each key
+func (cms Store) SetKVStores(handler func(sk types.StoreKey, s types.KVStore) types.CacheWrap) types.MultiStore {
+	for k, s := range cms.stores {
+		cms.stores[k] = handler(k, s.(types.KVStore))
+	}
+	return cms
+}
diff --git a/store/multiversion/mvkv.go b/store/multiversion/mvkv.go
index 1b2f947c1..6eeabd517 100644
--- a/store/multiversion/mvkv.go
+++ b/store/multiversion/mvkv.go
@@ -6,6 +6,8 @@ import (
 	"sync"
 	"time"
 
+	abci "github.com/tendermint/tendermint/abci/types"
+
 	"github.com/cosmos/cosmos-sdk/store/types"
 	"github.com/cosmos/cosmos-sdk/telemetry"
 	scheduler "github.com/cosmos/cosmos-sdk/types/occ"
@@ -356,6 +358,21 @@ func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
 	store.dirtySet[keyStr] = struct{}{}
 }
 
+// Write implements types.CacheWrap so this store can exist on the cache multi store
+func (store *VersionIndexedStore) Write() {
+	panic("not implemented")
+}
+
+// GetEvents implements types.CacheWrap so this store can exist on the cache multi store
+func (store *VersionIndexedStore) GetEvents() []abci.Event {
+	panic("not implemented")
+}
+
+// ResetEvents implements types.CacheWrap so this store can exist on the cache multi store
+func (store *VersionIndexedStore) ResetEvents() {
+	panic("not implemented")
+}
+
 func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) {
 	// append to iterateset
 	store.iterateset = append(store.iterateset, iterationTracker)
diff --git a/store/multiversion/store.go b/store/multiversion/store.go
index 0d16f12d6..7c14c6415 100644
--- a/store/multiversion/store.go
+++ b/store/multiversion/store.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/cosmos/cosmos-sdk/store/types"
 	"github.com/cosmos/cosmos-sdk/telemetry"
+	"github.com/cosmos/cosmos-sdk/types/occ"
 	occtypes "github.com/cosmos/cosmos-sdk/types/occ"
 	db "github.com/tendermint/tm-db"
 )
@@ -24,6 +25,7 @@ type MultiVersionStore interface {
 	CollectIteratorItems(index int) *db.MemDB
 	SetReadset(index int, readset ReadSet)
 	GetReadset(index int) ReadSet
+	VersionedIndexedStore(index int, incarnation int, abortChannel chan occ.Abort) *VersionIndexedStore
 	SetIterateset(index int, iterateset Iterateset)
 	GetIterateset(index int) Iterateset
 	ValidateTransactionState(index int) (bool, []int)
@@ -58,6 +60,11 @@ func NewMultiVersionStore(parentStore types.KVStore) *Store {
 	}
 }
 
+// VersionedIndexedStore creates a new versioned index store for a given transaction index and incarnation
+func (s *Store) VersionedIndexedStore(index int, incarnation int, abortChannel chan occ.Abort) *VersionIndexedStore {
+	return NewVersionIndexedStore(s.parentStore, s, index, incarnation, abortChannel)
+}
+
 // GetLatest implements MultiVersionStore.
 func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) {
 	s.mtx.RLock()
diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go
index e14280b46..c7578bdca 100644
--- a/store/rootmulti/store.go
+++ b/store/rootmulti/store.go
@@ -1197,3 +1197,15 @@ func flushPruningHeights(batch dbm.Batch, pruneHeights []int64) {
 
 	batch.Set([]byte(pruneHeightsKey), bz)
 }
+
+func (rs *Store) SetKVStores(handler func(key types.StoreKey, s types.KVStore) types.CacheWrap) types.MultiStore {
+	panic("SetKVStores is not implemented for rootmulti")
+}
+
+func (rs *Store) StoreKeys() []types.StoreKey {
+	// zero-length slice with capacity, so append does not leave leading nil entries
+	res := make([]types.StoreKey, 0, len(rs.keysByName))
+	for _, sk := range rs.keysByName {
+		res = append(res, sk)
+	}
+	return res
+}
diff --git a/store/types/store.go b/store/types/store.go
index b34068e9a..5ecb5e166 100644
--- a/store/types/store.go
+++ b/store/types/store.go
@@ -145,6 +145,12 @@ type MultiStore interface {
 
 	// Resets the tracked event list
 	ResetEvents()
+
+	// SetKVStores replaces the underlying KVStores, wrapping each via the given handler
+	SetKVStores(handler func(key StoreKey, s KVStore) CacheWrap) MultiStore
+
+	// StoreKeys returns a list of store keys
+	StoreKeys() []StoreKey
 }
 
 // From MultiStore.CacheMultiStore()....
diff --git a/tasks/scheduler.go b/tasks/scheduler.go
index c8b063fe2..575fc1547 100644
--- a/tasks/scheduler.go
+++ b/tasks/scheduler.go
@@ -1,9 +1,15 @@
 package tasks
 
 import (
-	sdk "github.com/cosmos/cosmos-sdk/types"
+	"sort"
+
 	"github.com/tendermint/tendermint/abci/types"
 	"golang.org/x/sync/errgroup"
+
+	"github.com/cosmos/cosmos-sdk/store/multiversion"
+	store "github.com/cosmos/cosmos-sdk/store/types"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/cosmos/cosmos-sdk/types/occ"
 )
 
 type status string
@@ -21,14 +27,32 @@ const (
 	// statusValidated means the task has been validated
 	// tasks in this status can be reset if an earlier task fails validation
 	statusValidated status = "validated"
+	// statusWaiting tasks are waiting for another tx to complete
+	statusWaiting status = "waiting"
 )
 
 type deliverTxTask struct {
-	Status      status
-	Index       int
-	Incarnation int
-	Request     types.RequestDeliverTx
-	Response    *types.ResponseDeliverTx
+	Ctx     sdk.Context
+	AbortCh chan occ.Abort
+
+	Status        status
+	Dependencies  []int
+	Abort         *occ.Abort
+	Index         int
+	Incarnation   int
+	Request       types.RequestDeliverTx
+	Response      *types.ResponseDeliverTx
+	VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
+}
+
+func (dt *deliverTxTask) Increment() {
+	dt.Incarnation++
+	dt.Status = statusPending
+	dt.Response = nil
+	dt.Abort = nil
+	dt.AbortCh = nil
+	dt.Dependencies = nil
+	dt.VersionStores = nil
 }
 
 // Scheduler processes tasks concurrently
@@ -37,8 +61,9 @@ type Scheduler interface {
 }
 
 type scheduler struct {
-	deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
-	workers   int
+	deliverTx          func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
+	workers            int
+	multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore
 }
 
 // NewScheduler creates a new scheduler
@@ -49,6 +74,31 @@ func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.Req
 	}
 }
 
+func (s *scheduler) invalidateTask(task *deliverTxTask) {
+	for _, mv := range s.multiVersionStores {
+		mv.InvalidateWriteset(task.Index, task.Incarnation)
+	}
+}
+
+func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
+	var conflicts []int
+	uniq := make(map[int]struct{})
+	valid := true
+	for _, mv := range s.multiVersionStores {
+		ok, mvConflicts := 
mv.ValidateTransactionState(task.Index) + for _, c := range mvConflicts { + if _, ok := uniq[c]; !ok { + conflicts = append(conflicts, c) + uniq[c] = struct{}{} + } + } + // any non-ok value makes valid false + valid = ok && valid + } + sort.Ints(conflicts) + return valid, conflicts +} + func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask { res := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { @@ -69,36 +119,65 @@ func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { return res } +func (s *scheduler) initMultiVersionStore(ctx sdk.Context) { + mvs := make(map[sdk.StoreKey]multiversion.MultiVersionStore) + keys := ctx.MultiStore().StoreKeys() + for _, sk := range keys { + mvs[sk] = multiversion.NewMultiVersionStore(ctx.MultiStore().GetKVStore(sk)) + } + s.multiVersionStores = mvs +} + +func indexesValidated(tasks []*deliverTxTask, idx []int) bool { + for _, i := range idx { + if tasks[i].Status != statusValidated { + return false + } + } + return true +} + +func allValidated(tasks []*deliverTxTask) bool { + for _, t := range tasks { + if t.Status != statusValidated { + return false + } + } + return true +} + func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) { + s.initMultiVersionStore(ctx) tasks := toTasks(reqs) toExecute := tasks - for len(toExecute) > 0 { + for !allValidated(tasks) { + var err error // execute sets statuses of tasks to either executed or aborted - err := s.executeAll(ctx, toExecute) - if err != nil { - return nil, err + if len(toExecute) > 0 { + err = s.executeAll(ctx, toExecute) + if err != nil { + return nil, err + } } // validate returns any that should be re-executed // note this processes ALL tasks, not just those recently executed - toExecute, err = s.validateAll(ctx, tasks) + toExecute, err = s.validateAll(tasks) if err != nil { return nil, err } for _, t := range toExecute { - t.Incarnation++ - t.Status = statusPending - t.Response = nil - //TODO: reset anything that needs resetting + t.Increment() } } + for _, mv := range s.multiVersionStores { + mv.WriteLatestToStore() + } return collectResponses(tasks), nil } -// TODO: validate each tasks -// TODO: return list of tasks that are invalid -func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) { +func (s *scheduler) validateAll(tasks []*deliverTxTask) ([]*deliverTxTask, error) { var res []*deliverTxTask // find first non-validated entry @@ -111,13 +190,33 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del } for i := startIdx; i < len(tasks); i++ { - // any aborted tx is known to be suspect here - if tasks[i].Status == statusAborted { + switch tasks[i].Status { + case statusAborted: + // aborted means it can be re-run immediately res = append(res, tasks[i]) - } else { - //TODO: validate the tasks and add it if invalid - //TODO: create and handle abort for validation - tasks[i].Status = statusValidated + + // validated tasks can become unvalidated if an earlier re-run task now conflicts + case statusExecuted, statusValidated: + if valid, conflicts := s.findConflicts(tasks[i]); !valid { + s.invalidateTask(tasks[i]) + + // if the conflicts are now validated, then rerun this task + if indexesValidated(tasks, conflicts) { + res = append(res, tasks[i]) + } else { + // otherwise, wait for completion + tasks[i].Dependencies = conflicts + tasks[i].Status = statusWaiting + } + } else if len(conflicts) == 0 { + tasks[i].Status = 
statusValidated
+			}
+
+		case statusWaiting:
+			// if conflicts are done, then this task is ready to run again
+			if indexesValidated(tasks, tasks[i].Dependencies) {
+				res = append(res, tasks[i])
+			}
 		}
 	}
 	return res, nil
 }
@@ -125,7 +224,6 @@
 
 // ExecuteAll executes all tasks concurrently
 // Tasks are updated with their status
-// TODO: retries on aborted tasks
 // TODO: error scenarios
 func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
 	ch := make(chan *deliverTxTask, len(tasks))
@@ -147,19 +245,21 @@
 			if !ok {
 				return nil
 			}
-			//TODO: ensure version multi store is on context
-			// buffered so it doesn't block on write
-			// abortCh := make(chan occ.Abort, 1)
-			//TODO: consume from abort in non-blocking way (give it a length)
-			resp := s.deliverTx(ctx, task.Request)
+			resp := s.deliverTx(task.Ctx, task.Request)
+
+			close(task.AbortCh)
 
-			// close(abortCh)
+			// the channel is closed, so this receive only yields a buffered abort (if any)
+			if abt, ok := <-task.AbortCh; ok {
+				task.Status = statusAborted
+				task.Abort = &abt
+				continue
+			}
 
-			//if _, ok := <-abortCh; ok {
-			//	tasks.status = TaskStatusAborted
-			//	continue
-			//}
+			// write from version store to multiversion stores
+			for _, v := range task.VersionStores {
+				v.WriteToMultiVersionStore()
+			}
 
 			task.Status = statusExecuted
 			task.Response = &resp
@@ -170,6 +270,30 @@
 	grp.Go(func() error {
 		defer close(ch)
 		for _, task := range tasks {
+			// initialize the context with this task's tx index
+			ctx = ctx.WithTxIndex(task.Index)
+
+			cms := ctx.MultiStore().CacheMultiStore()
+
+			// buffered so writes to the abort channel never block
+			abortCh := make(chan occ.Abort, len(s.multiVersionStores))
+
+			// init version stores by store key
+			vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore)
+			for storeKey, mvs := range s.multiVersionStores {
+				vs[storeKey] = mvs.VersionedIndexedStore(task.Index, task.Incarnation, abortCh)
+			}
+
+			// save off version stores so we can query their state later
+			task.VersionStores = vs
+			ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap {
+				return vs[k]
+			})
+
+			ctx = ctx.WithMultiStore(ms)
+
+			task.AbortCh = abortCh
+			task.Ctx = ctx
+
 			select {
 			case <-gCtx.Done():
 				return gCtx.Err()
diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go
index ba9d97846..f132356ec 100644
--- a/tasks/scheduler_test.go
+++ b/tasks/scheduler_test.go
@@ -2,57 +2,103 @@ package tasks
 
 import (
 	"context"
-	sdk "github.com/cosmos/cosmos-sdk/types"
-	"github.com/stretchr/testify/assert"
-	"github.com/tendermint/tendermint/abci/types"
+	"errors"
+	"fmt"
 	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tendermint/tendermint/abci/types"
+	dbm "github.com/tendermint/tm-db"
+
+	"github.com/cosmos/cosmos-sdk/store/cachekv"
+	"github.com/cosmos/cosmos-sdk/store/cachemulti"
+	"github.com/cosmos/cosmos-sdk/store/dbadapter"
+	sdk "github.com/cosmos/cosmos-sdk/types"
 )
 
 type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx
 
-func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
-	return f(ctx, req)
-}
+var testStoreKey = sdk.NewKVStoreKey("mock")
+var itemKey = []byte("key")
 
 func requestList(n int) []types.RequestDeliverTx {
 	tasks := make([]types.RequestDeliverTx, n)
 	for i := 0; i < n; i++ {
-		tasks[i] = types.RequestDeliverTx{}
+		tasks[i] = types.RequestDeliverTx{
+			Tx: []byte(fmt.Sprintf("%d", i)),
+ } } return tasks } +func initTestCtx() sdk.Context { + ctx := sdk.Context{}.WithContext(context.Background()) + db := dbm.NewMemDB() + mem := dbadapter.Store{DB: db} + stores := make(map[sdk.StoreKey]sdk.CacheWrapper) + stores[testStoreKey] = cachekv.NewStore(mem, testStoreKey, 1000) + keys := make(map[string]sdk.StoreKey) + keys[testStoreKey.Name()] = testStoreKey + store := cachemulti.NewStore(db, stores, keys, nil, nil, nil) + ctx = ctx.WithMultiStore(&store) + return ctx +} + func TestProcessAll(t *testing.T) { tests := []struct { name string workers int + runs int requests []types.RequestDeliverTx deliverTxFunc mockDeliverTxFunc expectedErr error }{ { - name: "All tasks processed without aborts", - workers: 2, - requests: requestList(5), + name: "Test for conflicts", + workers: 50, + runs: 25, + requests: requestList(50), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { - return types.ResponseDeliverTx{} + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } }, expectedErr: nil, }, - //TODO: Add more test cases } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx) - ctx := sdk.Context{}.WithContext(context.Background()) - - res, err := s.ProcessAll(ctx, tt.requests) - if err != tt.expectedErr { - t.Errorf("Expected error %v, got %v", tt.expectedErr, err) - } else { - // response for each request exists - assert.Len(t, res, len(tt.requests)) + for i := 0; i < tt.runs; i++ { + s := NewScheduler(tt.workers, tt.deliverTxFunc) + ctx := initTestCtx() + + res, err := s.ProcessAll(ctx, tt.requests) + if !errors.Is(err, tt.expectedErr) { + t.Errorf("Expected error %v, got %v", tt.expectedErr, err) + } else { + require.Len(t, res, len(tt.requests)) + for idx, response := range res { + if idx == 0 { + require.Equal(t, "", response.Info) + } else { + // the info is what was read from the kv store by the tx + // each tx writes its own index, so the info should be the index of the previous tx + require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info) + } + } + // confirm last write made it to the parent store + res := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, []byte(fmt.Sprintf("%d", len(tt.requests)-1)), res) + } } }) }
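
Note for reviewers: the heart of this change is the optimistic execute/validate/retry loop in `ProcessAll`. Below is a minimal, self-contained sketch of that control flow only. The `task`, `executeAll`, and `validateAll` here are simplified stand-ins (the "conflict" on every tx after the first is hard-coded for illustration); the real scheduler executes on a worker pool and derives conflicts from each `MultiVersionStore`'s readsets and writesets.

```go
package main

import "fmt"

// task mirrors deliverTxTask's lifecycle: executed optimistically, then
// validated; a failed validation bumps the incarnation and retries.
type task struct {
	index       int
	incarnation int
	validated   bool
}

// executeAll stands in for the scheduler's concurrent worker pool.
func executeAll(tasks []*task) {
	for _, t := range tasks {
		fmt.Printf("executing tx %d (incarnation %d)\n", t.index, t.incarnation)
	}
}

// validateAll marks tasks validated or returns them for re-execution.
// For illustration, every tx after the first "conflicts" exactly once.
func validateAll(tasks []*task) (retry []*task) {
	for _, t := range tasks {
		if t.validated {
			continue
		}
		if t.index > 0 && t.incarnation == 0 {
			t.incarnation++ // analogous to deliverTxTask.Increment()
			retry = append(retry, t)
			continue
		}
		t.validated = true
	}
	return retry
}

func main() {
	tasks := []*task{{index: 0}, {index: 1}, {index: 2}}

	// execute everything, then validate everything; re-execute only what
	// failed validation, until the whole batch is validated
	for toExecute := tasks; len(toExecute) > 0; {
		executeAll(toExecute)
		toExecute = validateAll(tasks)
	}
	fmt.Println("all txs validated; writesets can be flushed to the parent store")
}
```

The loop shape matches `ProcessAll` (`for !allValidated(tasks)` wrapping `executeAll` and `validateAll`), but the real version additionally tracks per-store version stores, abort channels, and the waiting-on-dependency state before flushing via `WriteLatestToStore`.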