Skip to content

Commit

Permalink
Merge pull request #277 from cschleiden/ignore-poll-interval
Browse files Browse the repository at this point in the history
Ignore polling interval for redis backend
  • Loading branch information
cschleiden authored Nov 11, 2023
2 parents e57502c + 1545d5f commit d8ce2bd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
11 changes: 10 additions & 1 deletion bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func main() {

wo := worker.DefaultOptions
wo.WorkflowExecutorCacheSize = *cacheSize

if *b == "redis" {
wo.ActivityPollingInterval = 0
wo.WorkflowPollingInterval = 0
}

w := worker.New(ba, &wo)

w.RegisterWorkflow(Root)
Expand Down Expand Up @@ -150,7 +156,10 @@ func getBackend(b string, opt ...backend.BackendOption) backend.Backend {
ReadTimeout: time.Second * 30,
})

rclient.FlushAll(context.Background()).Result()
_, err := rclient.FlushAll(context.Background()).Result()
if err != nil {
panic(err)
}

b, err := redis.NewRedisBackend(rclient, redis.WithBackendOptions(opt...))
if err != nil {
Expand Down
24 changes: 18 additions & 6 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,20 @@ func (w *Worker[Task, TaskResult]) WaitForCompletion() error {
func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
defer w.pollersWg.Done()

ticker := time.NewTicker(w.options.PollingInterval)
defer ticker.Stop()
var ticker *time.Ticker

if w.options.PollingInterval > 0 {
ticker = time.NewTicker(w.options.PollingInterval)
defer ticker.Stop()
}

for {
select {
case <-ctx.Done():
return
default:
}

task, err := w.poll(ctx, 30*time.Second)
if err != nil {
w.logger.ErrorContext(ctx, "error polling task", "error", err)
Expand All @@ -92,10 +102,12 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) {
continue // check for new tasks right away
}

select {
case <-ticker.C:
case <-ctx.Done():
return
if w.options.PollingInterval > 0 {
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
}
}
Expand Down

0 comments on commit d8ce2bd

Please sign in to comment.