From 5723fee6f6e9d838e807eb7e4291d5b25c7d3006 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 3 Dec 2021 13:51:18 -0600 Subject: [PATCH 1/2] Support configurable/default heartbeat default/max intervals. Fixes #656 --- internal/activity_test.go | 4 +- internal/internal_task_handlers.go | 140 +++++++++++++----------- internal/internal_task_handlers_test.go | 30 ++++- internal/internal_worker.go | 12 ++ internal/worker.go | 11 ++ 5 files changed, 129 insertions(+), 68 deletions(-) diff --git a/internal/activity_test.go b/internal/activity_test.go index 79cab55ea..eaf38e201 100644 --- a/internal/activity_test.go +++ b/internal/activity_test.go @@ -140,10 +140,10 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() { RecordActivityHeartbeat(ctx, "testDetails") invoker.Close(ctx, false) - // No HB timeout configured. + // High HB timeout configured. service2 := workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl) invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, tally.NoopScope, cancel, - 0, make(chan struct{}), s.namespace) + 20*time.Second, make(chan struct{}), s.namespace) ctx, _ = newActivityContext(ctx, nil, &activityEnvironment{ serviceInvoker: invoker2, logger: getLogger()}) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 245d4b2b2..77e2bdd84 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -57,11 +57,12 @@ import ( ) const ( - defaultHeartBeatInterval = 10 * 60 * time.Second - defaultStickyCacheSize = 10000 noRetryBackoff = time.Duration(-1) + + defaultDefaultHeartbeatThrottleInterval = 30 * time.Second + defaultMaxHeartbeatThrottleInterval = 60 * time.Second ) type ( @@ -139,18 +140,20 @@ type ( // activityTaskHandlerImpl is the implementation of ActivityTaskHandler activityTaskHandlerImpl struct { - taskQueueName string - identity string - service workflowservice.WorkflowServiceClient - metricsScope tally.Scope - logger log.Logger - userContext context.Context - registry *registry - activityProvider activityProvider - dataConverter converter.DataConverter - workerStopCh <-chan struct{} - contextPropagators []ContextPropagator - namespace string + taskQueueName string + identity string + service workflowservice.WorkflowServiceClient + metricsScope tally.Scope + logger log.Logger + userContext context.Context + registry *registry + activityProvider activityProvider + dataConverter converter.DataConverter + workerStopCh <-chan struct{} + contextPropagators []ContextPropagator + namespace string + defaultHeartbeatThrottleInterval time.Duration + maxHeartbeatThrottleInterval time.Duration } // history wrapper method to help information about events. @@ -1567,34 +1570,37 @@ func newActivityTaskHandlerWithCustomProvider( activityProvider activityProvider, ) ActivityTaskHandler { return &activityTaskHandlerImpl{ - taskQueueName: params.TaskQueue, - identity: params.Identity, - service: service, - logger: params.Logger, - metricsScope: params.MetricsScope, - userContext: params.UserContext, - registry: registry, - activityProvider: activityProvider, - dataConverter: params.DataConverter, - workerStopCh: params.WorkerStopChannel, - contextPropagators: params.ContextPropagators, - namespace: params.Namespace, + taskQueueName: params.TaskQueue, + identity: params.Identity, + service: service, + logger: params.Logger, + metricsScope: params.MetricsScope, + userContext: params.UserContext, + registry: registry, + activityProvider: activityProvider, + dataConverter: params.DataConverter, + workerStopCh: params.WorkerStopChannel, + contextPropagators: params.ContextPropagators, + namespace: params.Namespace, + defaultHeartbeatThrottleInterval: params.DefaultHeartbeatThrottleInterval, + maxHeartbeatThrottleInterval: params.MaxHeartbeatThrottleInterval, } } type temporalInvoker struct { sync.Mutex - identity string - service workflowservice.WorkflowServiceClient - metricsScope tally.Scope - taskToken []byte - cancelHandler func() - heartBeatTimeout time.Duration // The heart beat interval configured for this activity. - hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call. - lastDetailsToReport **commonpb.Payloads - closeCh chan struct{} - workerStopChannel <-chan struct{} - namespace string + identity string + service workflowservice.WorkflowServiceClient + metricsScope tally.Scope + taskToken []byte + cancelHandler func() + // Amount of time to wait between each pending heartbeat send + heartbeatThrottleInterval time.Duration + hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call. + lastDetailsToReport **commonpb.Payloads + closeCh chan struct{} + workerStopChannel <-chan struct{} + namespace string } func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Payloads, skipBatching bool) error { @@ -1616,15 +1622,7 @@ func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Paylo i.lastDetailsToReport = nil // Create timer to fire before the threshold to report. - deadlineToTrigger := i.heartBeatTimeout - if deadlineToTrigger <= 0 { - // If we don't have any heartbeat timeout configured. - deadlineToTrigger = defaultHeartBeatInterval - } - - // We set a deadline at 80% of the timeout. - duration := time.Duration(0.8 * float64(deadlineToTrigger)) - i.hbBatchEndTimer = time.NewTimer(duration) + i.hbBatchEndTimer = time.NewTimer(i.heartbeatThrottleInterval) go func() { select { @@ -1661,11 +1659,7 @@ func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Paylo func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *commonpb.Payloads) (bool, error) { isActivityCanceled := false - timeout := i.heartBeatTimeout - if timeout <= 0 { - timeout = defaultHeartBeatInterval - } - ctx, cancel := context.WithTimeout(ctx, timeout) + ctx, cancel := context.WithTimeout(ctx, i.heartbeatThrottleInterval) defer cancel() err := recordActivityHeartbeat(ctx, i.service, i.metricsScope, i.identity, i.taskToken, details) @@ -1726,20 +1720,20 @@ func newServiceInvoker( service workflowservice.WorkflowServiceClient, metricsScope tally.Scope, cancelHandler func(), - heartBeatTimeout time.Duration, + heartbeatThrottleInterval time.Duration, workerStopChannel <-chan struct{}, namespace string, ) ServiceInvoker { return &temporalInvoker{ - taskToken: taskToken, - identity: identity, - service: service, - metricsScope: metricsScope, - cancelHandler: cancelHandler, - heartBeatTimeout: heartBeatTimeout, - closeCh: make(chan struct{}), - workerStopChannel: workerStopChannel, - namespace: namespace, + taskToken: taskToken, + identity: identity, + service: service, + metricsScope: metricsScope, + cancelHandler: cancelHandler, + heartbeatThrottleInterval: heartbeatThrottleInterval, + closeCh: make(chan struct{}), + workerStopChannel: workerStopChannel, + namespace: namespace, } } @@ -1761,8 +1755,9 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice canCtx, cancel := context.WithCancel(rootCtx) defer cancel() + heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(common.DurationValue(t.GetHeartbeatTimeout())) invoker := newServiceInvoker( - t.TaskToken, ath.identity, ath.service, ath.metricsScope, cancel, common.DurationValue(t.GetHeartbeatTimeout()), + t.TaskToken, ath.identity, ath.service, ath.metricsScope, cancel, heartbeatThrottleInterval, ath.workerStopCh, ath.namespace) workflowType := t.WorkflowType.GetName() @@ -1866,6 +1861,27 @@ func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames return } +func (ath *activityTaskHandlerImpl) getHeartbeatThrottleInterval(heartbeatTimeout time.Duration) time.Duration { + // Build the heartbeat throttle interval as 80% of timeout if timeout present + // or use default if not + var heartbeatThrottleInterval time.Duration + if heartbeatTimeout > 0 { + heartbeatThrottleInterval = time.Duration(0.8 * float64(heartbeatTimeout)) + } else if ath.defaultHeartbeatThrottleInterval > 0 { + heartbeatThrottleInterval = ath.defaultHeartbeatThrottleInterval + } else { + heartbeatThrottleInterval = defaultDefaultHeartbeatThrottleInterval + } + // Limit interval to a max + if ath.maxHeartbeatThrottleInterval > 0 && heartbeatThrottleInterval > ath.maxHeartbeatThrottleInterval { + heartbeatThrottleInterval = ath.maxHeartbeatThrottleInterval + } else if ath.maxHeartbeatThrottleInterval == 0 && heartbeatThrottleInterval > defaultMaxHeartbeatThrottleInterval { + heartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval + } + + return heartbeatThrottleInterval +} + func createNewCommand(commandType enumspb.CommandType) *commandpb.Command { return &commandpb.Command{ CommandType: commandType, diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 31cf759b6..c026e5a1f 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -1357,10 +1357,10 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() { Times(2) temporalInvoker := &temporalInvoker{ - identity: "Test_Temporal_Invoker", - service: mockService, - taskToken: nil, - heartBeatTimeout: time.Second, + identity: "Test_Temporal_Invoker", + service: mockService, + taskToken: nil, + heartbeatThrottleInterval: time.Second, } heartbeatErr := temporalInvoker.Heartbeat(context.Background(), nil, false) @@ -1740,3 +1740,25 @@ func Test_IsSearchAttributesMatched(t *testing.T) { }) } } + +func TestHeartbeatThrottleInterval(t *testing.T) { + assertInterval := func(timeoutSec, defaultIntervalSec, maxIntervalSec, expectedSec int) { + a := &activityTaskHandlerImpl{ + defaultHeartbeatThrottleInterval: time.Duration(defaultIntervalSec) * time.Second, + maxHeartbeatThrottleInterval: time.Duration(maxIntervalSec) * time.Second, + } + require.Equal(t, time.Duration(expectedSec)*time.Second, + a.getHeartbeatThrottleInterval(time.Duration(timeoutSec)*time.Second)) + } + + // Use 80% of timeout + assertInterval(5, 2, 10, 4) + // Use default if no timeout + assertInterval(0, 2, 10, 2) + // Use default of 30s if no timeout or default + assertInterval(0, 0, 50, 30) + // Use max if 80% of timeout is too large + assertInterval(14, 2, 10, 10) + // Default max to 60 if not set + assertInterval(5000, 2, 0, 60) +} diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 9cd445cea..0e301980d 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -200,6 +200,10 @@ type ( // DeadlockDetectionTimeout specifies workflow task timeout. DeadlockDetectionTimeout time.Duration + DefaultHeartbeatThrottleInterval time.Duration + + MaxHeartbeatThrottleInterval time.Duration + // Pointer to the shared worker cache cache *WorkerCache } @@ -1277,6 +1281,8 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke WorkerStopTimeout: options.WorkerStopTimeout, ContextPropagators: client.contextPropagators, DeadlockDetectionTimeout: options.DeadlockDetectionTimeout, + DefaultHeartbeatThrottleInterval: options.DefaultHeartbeatThrottleInterval, + MaxHeartbeatThrottleInterval: options.MaxHeartbeatThrottleInterval, cache: cache, } @@ -1461,6 +1467,12 @@ func setWorkerOptionsDefaults(options *WorkerOptions) { } options.DeadlockDetectionTimeout = defaultDeadlockDetectionTimeout } + if options.DefaultHeartbeatThrottleInterval == 0 { + options.DefaultHeartbeatThrottleInterval = defaultDefaultHeartbeatThrottleInterval + } + if options.MaxHeartbeatThrottleInterval == 0 { + options.MaxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval + } } // setClientDefaults should be needed only in unit tests. diff --git a/internal/worker.go b/internal/worker.go index 3253a4663..bc173b702 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -159,6 +159,17 @@ type ( // Optional: If set defines maximum amount of time that workflow task will be allowed to run. Defaults to 1 sec. DeadlockDetectionTimeout time.Duration + // Optional: The maximum amount of time between sending each pending heartbeat to the server. Regardless of + // heartbeat timeout, no pending heartbeat will wait longer than this amount of time to send. + // default: 60 seconds + MaxHeartbeatThrottleInterval time.Duration + + // Optional: The default amount of time between sending each pending heartbeat to the server. This is used if the + // ActivityOptions do not provide a HeartbeatTimeout. Otherwise, the interval becomes a value a bit smaller than the + // given HeartbeatTimeout. + // default: 30 seconds + DefaultHeartbeatThrottleInterval time.Duration + // Interceptors to apply to the worker. Earlier interceptors wrap later // interceptors. // From 6ed9112fc533f716857c218129c96eef34264848 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Thu, 9 Dec 2021 09:50:14 -0600 Subject: [PATCH 2/2] Change look of throttle getter --- internal/internal_task_handlers.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 77e2bdd84..04aef1b31 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1862,8 +1862,8 @@ func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames } func (ath *activityTaskHandlerImpl) getHeartbeatThrottleInterval(heartbeatTimeout time.Duration) time.Duration { - // Build the heartbeat throttle interval as 80% of timeout if timeout present - // or use default if not + // Set interval as 80% of timeout if present, or the configured default if + // present, or the system default otherwise var heartbeatThrottleInterval time.Duration if heartbeatTimeout > 0 { heartbeatThrottleInterval = time.Duration(0.8 * float64(heartbeatTimeout)) @@ -1872,13 +1872,17 @@ func (ath *activityTaskHandlerImpl) getHeartbeatThrottleInterval(heartbeatTimeou } else { heartbeatThrottleInterval = defaultDefaultHeartbeatThrottleInterval } - // Limit interval to a max - if ath.maxHeartbeatThrottleInterval > 0 && heartbeatThrottleInterval > ath.maxHeartbeatThrottleInterval { - heartbeatThrottleInterval = ath.maxHeartbeatThrottleInterval - } else if ath.maxHeartbeatThrottleInterval == 0 && heartbeatThrottleInterval > defaultMaxHeartbeatThrottleInterval { - heartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval + + // Use the configured max if present, or the system default otherwise + maxHeartbeatThrottleInterval := ath.maxHeartbeatThrottleInterval + if maxHeartbeatThrottleInterval == 0 { + maxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval } + // Limit interval to a max + if heartbeatThrottleInterval > maxHeartbeatThrottleInterval { + heartbeatThrottleInterval = maxHeartbeatThrottleInterval + } return heartbeatThrottleInterval }