Skip to content

Commit

Permalink
[occ] OCC scheduler and validation fixes (#359)
Browse files Browse the repository at this point in the history
This makes optimizations to the scheduler and validation

---------

Co-authored-by: Steven Landers <steven.landers@gmail.com>
  • Loading branch information
udpatil and stevenlanders committed Jan 31, 2024
1 parent 061ef70 commit f9541fb
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 79 deletions.
13 changes: 7 additions & 6 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,11 @@ func (s *Store) validateIterator(index int, tracker iterationTracker) bool {
}

func (s *Store) checkIteratorAtIndex(index int) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

valid := true
s.mtx.RLock()
iterateset := s.txIterateSets[index]
s.mtx.RUnlock()

for _, iterationTracker := range iterateset {
iteratorValid := s.validateIterator(index, iterationTracker)
valid = valid && iteratorValid
Expand All @@ -333,11 +333,12 @@ func (s *Store) checkIteratorAtIndex(index int) bool {
}

func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
s.mtx.RLock()
defer s.mtx.RUnlock()

conflictSet := make(map[int]struct{})

s.mtx.RLock()
readset := s.txReadSets[index]
s.mtx.RUnlock()

valid := true

// iterate over readset and check if the value is the same as the latest value relative to txIndex in the multiversion store
Expand Down
215 changes: 144 additions & 71 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"fmt"
"sort"
"sync"

"github.com/tendermint/tendermint/abci/types"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -38,7 +39,6 @@ const (

type deliverTxTask struct {
Ctx sdk.Context
Span trace.Span
AbortCh chan occ.Abort

Status status
Expand All @@ -49,10 +49,10 @@ type deliverTxTask struct {
Request types.RequestDeliverTx
Response *types.ResponseDeliverTx
VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
ValidateCh chan struct{}
}

func (dt *deliverTxTask) Increment() {
dt.Incarnation++
func (dt *deliverTxTask) Reset() {
dt.Status = statusPending
dt.Response = nil
dt.Abort = nil
Expand All @@ -61,6 +61,11 @@ func (dt *deliverTxTask) Increment() {
dt.VersionStores = nil
}

// Increment prepares the task for its next incarnation: it installs a fresh
// single-slot ValidateCh and bumps the Incarnation counter.
// NOTE(review): a new channel appears to be needed because prepareAndRunTask
// closes ValidateCh after each validation — confirm against callers.
func (dt *deliverTxTask) Increment() {
	dt.ValidateCh = make(chan struct{}, 1)
	dt.Incarnation += 1
}

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error)
Expand All @@ -71,6 +76,7 @@ type scheduler struct {
workers int
multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore
tracingInfo *tracing.Info
allTasks []*deliverTxTask
}

// NewScheduler creates a new scheduler
Expand Down Expand Up @@ -111,9 +117,10 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
res := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
res = append(res, &deliverTxTask{
Request: r.Request,
Index: idx,
Status: statusPending,
Request: r.Request,
Index: idx,
Status: statusPending,
ValidateCh: make(chan struct{}, 1),
})
}
return res
Expand Down Expand Up @@ -175,6 +182,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
// prefill estimates
s.PrefillEstimates(ctx, reqs)
tasks := toTasks(reqs)
s.allTasks = tasks
toExecute := tasks
for !allValidated(tasks) {
var err error
Expand All @@ -193,69 +201,100 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
if err != nil {
return nil, err
}
for _, t := range toExecute {
t.Increment()
}
}
for _, mv := range s.multiVersionStores {
mv.WriteLatestToStore()
}
return collectResponses(tasks), nil
}

func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
spanCtx, span := s.tracingInfo.StartWithContext("SchedulerValidate", ctx.TraceSpanContext())
ctx = ctx.WithTraceSpanContext(spanCtx)
// shouldRerun reports whether task has to be executed again.
//
// As a side effect it may update the task: an executed/validated task that
// fails conflict detection is invalidated and either returned for re-run or
// parked in statusWaiting with its Dependencies set; a task with no conflicts
// is marked statusValidated. It panics on an unknown status.
func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
	switch task.Status {
	case statusAborted, statusPending:
		// these statuses always trigger a run
		return true

	case statusWaiting:
		// ready to run again only once every dependency has been validated
		return indexesValidated(s.allTasks, task.Dependencies)

	// validated tasks can become unvalidated if an earlier re-run task now conflicts
	case statusExecuted, statusValidated:
		valid, conflicts := s.findConflicts(task)
		if valid {
			// only promote to validated when nothing conflicts; with conflicts
			// present the status is left untouched so it is checked again later
			if len(conflicts) == 0 {
				task.Status = statusValidated
			}
			return false
		}

		s.invalidateTask(task)

		// conflicting tasks are already validated, so re-run right away
		if indexesValidated(s.allTasks, conflicts) {
			return true
		}

		// otherwise park the task until its conflicts complete
		task.Dependencies = conflicts
		task.Status = statusWaiting
		return false

	default:
		panic("unexpected status: " + task.Status)
	}
}

func (s *scheduler) validateTask(ctx sdk.Context, task *deliverTxTask) bool {
_, span := s.traceSpan(ctx, "SchedulerValidate", task)
defer span.End()

var res []*deliverTxTask
if s.shouldRerun(task) {
return false
}
return true
}

// find first non-validated entry
var startIdx int
for idx, t := range tasks {
func (s *scheduler) findFirstNonValidated() (int, bool) {
for i, t := range s.allTasks {
if t.Status != statusValidated {
startIdx = idx
break
return i, true
}
}
return 0, false
}

for i := startIdx; i < len(tasks); i++ {
switch tasks[i].Status {
case statusAborted:
// aborted means it can be re-run immediately
res = append(res, tasks[i])

// validated tasks can become unvalidated if an earlier re-run task now conflicts
case statusExecuted, statusValidated:
if valid, conflicts := s.findConflicts(tasks[i]); !valid {
s.invalidateTask(tasks[i])

// if the conflicts are now validated, then rerun this task
if indexesValidated(tasks, conflicts) {
res = append(res, tasks[i])
} else {
// otherwise, wait for completion
tasks[i].Dependencies = conflicts
tasks[i].Status = statusWaiting
}
} else if len(conflicts) == 0 {
tasks[i].Status = statusValidated
} // TODO: do we need to have handling for conflicts existing here?

case statusWaiting:
// if conflicts are done, then this task is ready to run again
if indexesValidated(tasks, tasks[i].Dependencies) {
res = append(res, tasks[i])
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
ctx, span := s.traceSpan(ctx, "SchedulerValidateAll", nil)
defer span.End()

var mx sync.Mutex
var res []*deliverTxTask

wg := sync.WaitGroup{}
for i := 0; i < len(tasks); i++ {
wg.Add(1)
go func(task *deliverTxTask) {
defer wg.Done()
if !s.validateTask(ctx, task) {
task.Reset()
task.Increment()
mx.Lock()
res = append(res, task)
mx.Unlock()
}
}
}(tasks[i])
}
wg.Wait()

return res, nil
}

// ExecuteAll executes all tasks concurrently
// Tasks are updated with their status
// TODO: error scenarios
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
ctx, span := s.traceSpan(ctx, "SchedulerExecuteAll", nil)
defer span.End()

ch := make(chan *deliverTxTask, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())

Expand All @@ -265,6 +304,15 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
workers = len(tasks)
}

// validationWg waits for all validations to complete
// validations happen in separate goroutines in order to wait on previous index
validationWg := &sync.WaitGroup{}
validationWg.Add(len(tasks))
grp.Go(func() error {
validationWg.Wait()
return nil
})

for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
Expand All @@ -275,24 +323,16 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
if !ok {
return nil
}
s.executeTask(task)
s.prepareAndRunTask(validationWg, ctx, task)
}
}
})
}
grp.Go(func() error {
defer close(ch)
for _, task := range tasks {
s.prepareTask(ctx, task)

select {
case <-gCtx.Done():
return gCtx.Err()
case ch <- task:
}
}
return nil
})

for _, task := range tasks {
ch <- task
}
close(ch)

if err := grp.Wait(); err != nil {
return err
Expand All @@ -301,16 +341,46 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
return nil
}

// prepareAndRunTask executes task under a "SchedulerExecute" span and then
// kicks off its validation in a separate goroutine.
//
// The validation goroutine first receives from the ValidateCh of the task at
// the previous index (if any), then validates this task, resetting it when
// validation fails. On exit it sends on and closes the task's own ValidateCh
// and calls wg.Done, so wg must have been incremented for this task.
func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task *deliverTxTask) {
	execCtx, execSpan := s.traceSpan(ctx, "SchedulerExecute", task)
	defer execSpan.End()

	task.Ctx = execCtx
	s.executeTask(execCtx, task)

	go func() {
		defer wg.Done()
		defer close(task.ValidateCh)

		// wait on previous task to finish validation
		if prev := task.Index - 1; prev >= 0 {
			<-s.allTasks[prev].ValidateCh
		}

		if valid := s.validateTask(task.Ctx, task); !valid {
			task.Reset()
		}

		// ValidateCh is created with capacity 1, so this send does not block
		task.ValidateCh <- struct{}{}
	}()
}

// traceSpan starts a span called name from ctx's trace span context and
// returns a copy of ctx carrying the new span context together with the span.
// When task is non-nil the span is tagged with the SHA-256 of the tx bytes
// (upper-case hex), the tx index and the incarnation. The caller ends the span.
func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) {
	spanCtx, span := s.tracingInfo.StartWithContext(name, ctx.TraceSpanContext())
	if task != nil {
		txHash := sha256.Sum256(task.Request.Tx)
		span.SetAttributes(
			attribute.String("txHash", fmt.Sprintf("%X", txHash)),
			attribute.Int("txIndex", task.Index),
			attribute.Int("txIncarnation", task.Incarnation),
		)
	}
	return ctx.WithTraceSpanContext(spanCtx), span
}

// prepareTask initializes the context and version stores for a task
func (s *scheduler) prepareTask(ctx sdk.Context, task *deliverTxTask) {
// initialize the context
ctx = ctx.WithTxIndex(task.Index)

_, span := s.traceSpan(ctx, "SchedulerPrepare", task)
defer span.End()

// initialize the context
abortCh := make(chan occ.Abort, len(s.multiVersionStores))
spanCtx, span := s.tracingInfo.StartWithContext("SchedulerExecute", ctx.TraceSpanContext())
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx))))
span.SetAttributes(attribute.Int("txIndex", task.Index))
span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation))
ctx = ctx.WithTraceSpanContext(spanCtx)

// if there are no stores, don't try to wrap, because there's nothing to wrap
if len(s.multiVersionStores) > 0 {
Expand All @@ -334,14 +404,17 @@ func (s *scheduler) prepareTask(ctx sdk.Context, task *deliverTxTask) {

task.AbortCh = abortCh
task.Ctx = ctx
task.Span = span
}

// executeTask executes a single task
func (s *scheduler) executeTask(task *deliverTxTask) {
if task.Span != nil {
defer task.Span.End()
}
func (s *scheduler) executeTask(ctx sdk.Context, task *deliverTxTask) {

s.prepareTask(ctx, task)

dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerDeliverTx", task)
defer dSpan.End()
task.Ctx = dCtx

resp := s.deliverTx(task.Ctx, task.Request)

close(task.AbortCh)
Expand Down
4 changes: 2 additions & 2 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestProcessAll(t *testing.T) {
{
name: "Test every tx accesses same key",
workers: 50,
runs: 25,
runs: 50,
addStores: true,
requests: requestList(50),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestProcessAll(t *testing.T) {
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte("49"), latest)
require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
},
expectedErr: nil,
},
Expand Down

0 comments on commit f9541fb

Please sign in to comment.