Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OCC] Use worker pool to limit number of goroutines #355

Merged
merged 5 commits into from
Nov 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 47 additions & 45 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package tasks

import (
"context"
"sort"
"sync"

"github.com/tendermint/tendermint/abci/types"
"golang.org/x/sync/errgroup"

"github.com/cosmos/cosmos-sdk/store/multiversion"
store "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/occ"
"github.com/cosmos/cosmos-sdk/utils/tracing"
"github.com/tendermint/tendermint/abci/types"
)

type status string
Expand Down Expand Up @@ -73,6 +72,7 @@ type scheduler struct {
multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore
tracingInfo *tracing.Info
allTasks []*deliverTxTask
workCh chan func()
}

// NewScheduler creates a new scheduler
Expand All @@ -92,6 +92,25 @@ func (s *scheduler) invalidateTask(task *deliverTxTask) {
}
}

func (s *scheduler) Start(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case work := <-s.workCh:
work()
}
}
}()
}
}

func (s *scheduler) Do(work func()) {
s.workCh <- work
}

func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
var conflicts []int
uniq := make(map[int]struct{})
Expand Down Expand Up @@ -181,6 +200,17 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
s.PrefillEstimates(ctx, reqs)
tasks := toTasks(reqs)
s.allTasks = tasks
s.workCh = make(chan func(), len(tasks))

workers := s.workers
if s.workers < 1 {
workers = len(tasks)
}

workerCtx, cancel := context.WithCancel(ctx.Context())
defer cancel()
s.Start(workerCtx, workers)

toExecute := tasks
for !allValidated(tasks) {
var err error
Expand Down Expand Up @@ -269,17 +299,18 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del

wg := sync.WaitGroup{}
for i := 0; i < len(tasks); i++ {
t := tasks[i]
wg.Add(1)
go func(task *deliverTxTask) {
s.Do(func() {
defer wg.Done()
if !s.validateTask(ctx, task) {
task.Reset()
task.Increment()
if !s.validateTask(ctx, t) {
t.Reset()
t.Increment()
mx.Lock()
res = append(res, task)
res = append(res, t)
mx.Unlock()
}
}(tasks[i])
})
}
wg.Wait()

Expand All @@ -293,48 +324,19 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
//ctx, span := s.traceSpan(ctx, "SchedulerExecuteAll", nil)
//defer span.End()

ch := make(chan *deliverTxTask, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())

// a workers value < 1 means no limit
workers := s.workers
if s.workers < 1 {
workers = len(tasks)
}

// validationWg waits for all validations to complete
// validations happen in separate goroutines in order to wait on previous index
validationWg := &sync.WaitGroup{}
validationWg.Add(len(tasks))
grp.Go(func() error {
validationWg.Wait()
return nil
})

for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case task, ok := <-ch:
if !ok {
return nil
}
s.prepareAndRunTask(validationWg, ctx, task)
}
}
})
}

for _, task := range tasks {
ch <- task
t := task
s.Do(func() {
s.prepareAndRunTask(validationWg, ctx, t)
})
}
close(ch)

if err := grp.Wait(); err != nil {
return err
}
validationWg.Wait()

return nil
}
Expand All @@ -345,7 +347,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
task.Ctx = ctx

s.executeTask(task.Ctx, task)
go func() {
s.Do(func() {
defer wg.Done()
defer close(task.ValidateCh)
// wait on previous task to finish validation
Expand All @@ -356,7 +358,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
task.Reset()
}
task.ValidateCh <- struct{}{}
}()
})
}

//func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) {
Expand Down
Loading