From 1545d5fe7345a8f51eedc202a654b246dc6ecabf Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Sat, 11 Nov 2023 13:23:36 -0800 Subject: [PATCH] Ignore polling interval for redis backend --- bench/main.go | 11 ++++++++++- internal/worker/worker.go | 24 ++++++++++++++++++------ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/bench/main.go b/bench/main.go index 2a742048..0adcb989 100644 --- a/bench/main.go +++ b/bench/main.go @@ -48,6 +48,12 @@ func main() { wo := worker.DefaultOptions wo.WorkflowExecutorCacheSize = *cacheSize + + if *b == "redis" { + wo.ActivityPollingInterval = 0 + wo.WorkflowPollingInterval = 0 + } + w := worker.New(ba, &wo) w.RegisterWorkflow(Root) @@ -150,7 +156,10 @@ func getBackend(b string, opt ...backend.BackendOption) backend.Backend { ReadTimeout: time.Second * 30, }) - rclient.FlushAll(context.Background()).Result() + _, err := rclient.FlushAll(context.Background()).Result() + if err != nil { + panic(err) + } b, err := redis.NewRedisBackend(rclient, redis.WithBackendOptions(opt...)) if err != nil { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index bc69735e..2ee74a74 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -80,10 +80,20 @@ func (w *Worker[Task, TaskResult]) WaitForCompletion() error { func (w *Worker[Task, TaskResult]) poller(ctx context.Context) { defer w.pollersWg.Done() - ticker := time.NewTicker(w.options.PollingInterval) - defer ticker.Stop() + var ticker *time.Ticker + + if w.options.PollingInterval > 0 { + ticker = time.NewTicker(w.options.PollingInterval) + defer ticker.Stop() + } for { + select { + case <-ctx.Done(): + return + default: + } + task, err := w.poll(ctx, 30*time.Second) if err != nil { w.logger.ErrorContext(ctx, "error polling task", "error", err) @@ -92,10 +102,12 @@ func (w *Worker[Task, TaskResult]) poller(ctx context.Context) { continue // check for new tasks right away } - select { - case <-ticker.C: - case <-ctx.Done(): - return + if w.options.PollingInterval > 0 { + select { + case <-ticker.C: + case <-ctx.Done(): + return + } } } }