From 55c39b5b4751999670f593f706cb0915c37ef87a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 17 Jan 2024 12:55:31 -0800 Subject: [PATCH] Disable eager activities if tq rate limits is set (#1350) Currently server does not properly rate limit eager activities. Disabling for now until the Server can address this. --- internal/internal_worker.go | 4 ++++ internal/internal_workers_test.go | 8 ++++++++ internal/worker.go | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 675d9d710..2596569c4 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1774,6 +1774,10 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { } if options.TaskQueueActivitiesPerSecond == 0 { options.TaskQueueActivitiesPerSecond = defaultTaskQueueActivitiesPerSecond + } else { + // Disable eager activities when the task queue rate limit is set because + // the server does not rate limit eager activities. + options.DisableEagerActivities = true } if options.StickyScheduleToStartTimeout.Seconds() == 0 { options.StickyScheduleToStartTimeout = stickyWorkflowTaskScheduleToStartTimeoutSeconds * time.Second diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index dcf796115..576e5c207 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -490,6 +490,14 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() { worker.Stop() } +func (s *WorkersTestSuite) TestWorkerTaskQueueLimitDisableEager() { + client := NewServiceClient(s.service, nil, ClientOptions{Identity: "task-queue-limit-disable-eager"}) + worker := NewAggregatedWorker(client, "task-queue-limit-disable-eager", WorkerOptions{ + TaskQueueActivitiesPerSecond: 1.0, + }) + s.True(worker.activityWorker.executionParameters.eagerActivityExecutor.disabled) +} + func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID string) map[string]*commonpb.Payloads { lamd := localActivityMarkerData{ ActivityID: activityID, diff --git a/internal/worker.go b/internal/worker.go index 125942c91..df0b23fd2 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -70,6 +70,8 @@ type ( // once for every 10 seconds. This can be used to protect down stream services from flooding. // The zero value of this uses the default value. // default: 100k + // + // Note: Setting this to a non zero value will also disable eager activities. TaskQueueActivitiesPerSecond float64 // Optional: Sets the maximum number of goroutines that will concurrently poll the @@ -200,6 +202,8 @@ type ( // Eager activity execution means the server returns requested eager // activities directly from the workflow task back to this worker which is // faster than non-eager which may be dispatched to a separate worker. + // + // Note: Eager activities will automatically be disabled if TaskQueueActivitiesPerSecond is set. DisableEagerActivities bool // Optional: Maximum number of eager activities that can be running.