Skip to content

Commit

Permalink
Disable eager activities if tq rate limits is set (#1350)
Browse files Browse the repository at this point in the history
Currently server does not properly rate limit eager activities.
Disabling for now until the Server can address this.
  • Loading branch information
Quinn-With-Two-Ns authored Jan 17, 2024
1 parent b50cce9 commit 55c39b5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,10 @@ func setWorkerOptionsDefaults(options *WorkerOptions) {
}
if options.TaskQueueActivitiesPerSecond == 0 {
options.TaskQueueActivitiesPerSecond = defaultTaskQueueActivitiesPerSecond
} else {
// Disable eager activities when the task queue rate limit is set because
// the server does not rate limit eager activities.
options.DisableEagerActivities = true
}
if options.StickyScheduleToStartTimeout.Seconds() == 0 {
options.StickyScheduleToStartTimeout = stickyWorkflowTaskScheduleToStartTimeoutSeconds * time.Second
Expand Down
8 changes: 8 additions & 0 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,14 @@ func (s *WorkersTestSuite) TestWorkerMultipleStop() {
worker.Stop()
}

func (s *WorkersTestSuite) TestWorkerTaskQueueLimitDisableEager() {
client := NewServiceClient(s.service, nil, ClientOptions{Identity: "task-queue-limit-disable-eager"})
worker := NewAggregatedWorker(client, "task-queue-limit-disable-eager", WorkerOptions{
TaskQueueActivitiesPerSecond: 1.0,
})
s.True(worker.activityWorker.executionParameters.eagerActivityExecutor.disabled)
}

func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID string) map[string]*commonpb.Payloads {
lamd := localActivityMarkerData{
ActivityID: activityID,
Expand Down
4 changes: 4 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type (
// once for every 10 seconds. This can be used to protect down stream services from flooding.
// The zero value of this uses the default value.
// default: 100k
//
// Note: Setting this to a non zero value will also disable eager activities.
TaskQueueActivitiesPerSecond float64

// Optional: Sets the maximum number of goroutines that will concurrently poll the
Expand Down Expand Up @@ -200,6 +202,8 @@ type (
// Eager activity execution means the server returns requested eager
// activities directly from the workflow task back to this worker which is
// faster than non-eager which may be dispatched to a separate worker.
//
// Note: Eager activities will automatically be disabled if TaskQueueActivitiesPerSecond is set.
DisableEagerActivities bool

// Optional: Maximum number of eager activities that can be running.
Expand Down

0 comments on commit 55c39b5

Please sign in to comment.