diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 61a180705..7803fc923 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -41,6 +41,7 @@ type deliverTxTask struct { Ctx sdk.Context AbortCh chan occ.Abort + mx sync.RWMutex Status status Dependencies []int Abort *occ.Abort @@ -52,8 +53,20 @@ type deliverTxTask struct { ValidateCh chan status } +func (dt *deliverTxTask) IsStatus(s status) bool { + dt.mx.RLock() + defer dt.mx.RUnlock() + return dt.Status == s +} + +func (dt *deliverTxTask) SetStatus(s status) { + dt.mx.Lock() + defer dt.mx.Unlock() + dt.Status = s +} + func (dt *deliverTxTask) Reset() { - dt.Status = statusPending + dt.SetStatus(statusPending) dt.Response = nil dt.Abort = nil dt.AbortCh = nil @@ -182,7 +195,7 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { func indexesValidated(tasks []*deliverTxTask, idx []int) bool { for _, i := range idx { - if tasks[i].Status != statusValidated { + if !tasks[i].IsStatus(statusValidated) { return false } } @@ -191,7 +204,7 @@ func indexesValidated(tasks []*deliverTxTask, idx []int) bool { func allValidated(tasks []*deliverTxTask) bool { for _, t := range tasks { - if t.Status != statusValidated { + if !t.IsStatus(statusValidated) { return false } } @@ -220,7 +233,7 @@ type schedulerMetrics struct { func (s *scheduler) emitMetrics() { telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries") - telemetry.SetGauge(float32(s.metrics.maxIncarnation), "scheduler", "max_incarnation") + telemetry.IncrCounter(float32(s.metrics.maxIncarnation), "scheduler", "incarnations") } func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) { @@ -296,12 +309,12 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { } else { // otherwise, wait for completion task.Dependencies = conflicts - task.Status = statusWaiting + task.SetStatus(statusWaiting) return false } } else if len(conflicts) == 0 { // mark as validated, which will avoid re-validating unless a lower-index re-validates - task.Status = statusValidated + task.SetStatus(statusValidated) return false } // conflicts and valid, so it'll validate next time @@ -346,18 +359,18 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del return nil, nil } - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} for i := startIdx; i < len(tasks); i++ { - t := tasks[i] wg.Add(1) + t := tasks[i] s.DoValidate(func() { defer wg.Done() if !s.validateTask(ctx, t) { + mx.Lock() + defer mx.Unlock() t.Reset() t.Increment() - mx.Lock() res = append(res, t) - mx.Unlock() } }) } @@ -373,53 +386,28 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { // validationWg waits for all validations to complete // validations happen in separate goroutines in order to wait on previous index - validationWg := &sync.WaitGroup{} - validationWg.Add(len(tasks)) + wg := &sync.WaitGroup{} + wg.Add(len(tasks)) for _, task := range tasks { t := task s.DoExecute(func() { - s.prepareAndRunTask(validationWg, ctx, t) + s.prepareAndRunTask(wg, ctx, t) }) } - validationWg.Wait() + wg.Wait() return nil } -func (s *scheduler) waitOnPreviousAndValidate(wg *sync.WaitGroup, task *deliverTxTask) { - defer wg.Done() - defer close(task.ValidateCh) - // wait on previous task to finish validation - // if a previous task fails validation, then subsequent should fail too (cascade) - if task.Index > 0 { - res, ok := <-s.allTasks[task.Index-1].ValidateCh - if ok && res != statusValidated { - task.Reset() - task.ValidateCh <- task.Status - return - } - } - // if not validated, reset the task - if !s.validateTask(task.Ctx, task) { - task.Reset() - } - - // notify next task of this one's status - task.ValidateCh <- task.Status -} - func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task *deliverTxTask) { eCtx, eSpan := s.traceSpan(ctx, "SchedulerExecute", task) defer eSpan.End() - task.Ctx = eCtx + task.Ctx = eCtx s.executeTask(task) - - s.DoValidate(func() { - s.waitOnPreviousAndValidate(wg, task) - }) + wg.Done() } func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) { @@ -509,12 +497,12 @@ func (s *scheduler) executeTask(task *deliverTxTask) { // If abort has occurred, return, else set the response and status if abortOccurred { - task.Status = statusAborted + task.SetStatus(statusAborted) task.Abort = abort return } - task.Status = statusExecuted + task.SetStatus(statusExecuted) task.Response = &resp // write from version store to multiversion stores diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 298143584..00de7ea50 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -99,7 +99,7 @@ func TestProcessAll(t *testing.T) { name: "Test tx writing to a store that another tx is iterating", workers: 50, runs: 1, - requests: requestList(500), + requests: requestList(100), addStores: true, before: func(ctx sdk.Context) { kv := ctx.MultiStore().GetKVStore(testStoreKey) @@ -208,6 +208,33 @@ func TestProcessAll(t *testing.T) { }, expectedErr: nil, }, + { + name: "Test some tx accesses same key", + workers: 50, + runs: 1, + addStores: true, + requests: requestList(2000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { + if ctx.TxIndex()%10 != 0 { + return types.ResponseDeliverTx{ + Info: "none", + } + } + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {}, + expectedErr: nil, + }, { name: "Test no stores on context should not panic", workers: 50,