Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configurable/default heartbeat default/max intervals. #660

Merged
merged 3 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
RecordActivityHeartbeat(ctx, "testDetails")
invoker.Close(ctx, false)

// No HB timeout configured.
// High HB timeout configured.
service2 := workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, tally.NoopScope, cancel,
0, make(chan struct{}), s.namespace)
20*time.Second, make(chan struct{}), s.namespace)
ctx, _ = newActivityContext(ctx, nil, &activityEnvironment{
serviceInvoker: invoker2,
logger: getLogger()})
Expand Down
140 changes: 78 additions & 62 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ import (
)

const (
defaultHeartBeatInterval = 10 * 60 * time.Second

defaultStickyCacheSize = 10000

noRetryBackoff = time.Duration(-1)

defaultDefaultHeartbeatThrottleInterval = 30 * time.Second
defaultMaxHeartbeatThrottleInterval = 60 * time.Second
)

type (
Expand Down Expand Up @@ -139,18 +140,20 @@ type (

// activityTaskHandlerImpl is the implementation of ActivityTaskHandler
activityTaskHandlerImpl struct {
taskQueueName string
identity string
service workflowservice.WorkflowServiceClient
metricsScope tally.Scope
logger log.Logger
userContext context.Context
registry *registry
activityProvider activityProvider
dataConverter converter.DataConverter
workerStopCh <-chan struct{}
contextPropagators []ContextPropagator
namespace string
taskQueueName string
identity string
service workflowservice.WorkflowServiceClient
metricsScope tally.Scope
logger log.Logger
userContext context.Context
registry *registry
activityProvider activityProvider
dataConverter converter.DataConverter
workerStopCh <-chan struct{}
contextPropagators []ContextPropagator
namespace string
defaultHeartbeatThrottleInterval time.Duration
maxHeartbeatThrottleInterval time.Duration
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -1567,34 +1570,37 @@ func newActivityTaskHandlerWithCustomProvider(
activityProvider activityProvider,
) ActivityTaskHandler {
return &activityTaskHandlerImpl{
taskQueueName: params.TaskQueue,
identity: params.Identity,
service: service,
logger: params.Logger,
metricsScope: params.MetricsScope,
userContext: params.UserContext,
registry: registry,
activityProvider: activityProvider,
dataConverter: params.DataConverter,
workerStopCh: params.WorkerStopChannel,
contextPropagators: params.ContextPropagators,
namespace: params.Namespace,
taskQueueName: params.TaskQueue,
identity: params.Identity,
service: service,
logger: params.Logger,
metricsScope: params.MetricsScope,
userContext: params.UserContext,
registry: registry,
activityProvider: activityProvider,
dataConverter: params.DataConverter,
workerStopCh: params.WorkerStopChannel,
contextPropagators: params.ContextPropagators,
namespace: params.Namespace,
defaultHeartbeatThrottleInterval: params.DefaultHeartbeatThrottleInterval,
maxHeartbeatThrottleInterval: params.MaxHeartbeatThrottleInterval,
}
}

type temporalInvoker struct {
sync.Mutex
identity string
service workflowservice.WorkflowServiceClient
metricsScope tally.Scope
taskToken []byte
cancelHandler func()
heartBeatTimeout time.Duration // The heart beat interval configured for this activity.
hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call.
lastDetailsToReport **commonpb.Payloads
closeCh chan struct{}
workerStopChannel <-chan struct{}
namespace string
identity string
service workflowservice.WorkflowServiceClient
metricsScope tally.Scope
taskToken []byte
cancelHandler func()
// Amount of time to wait between each pending heartbeat send
heartbeatThrottleInterval time.Duration
hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call.
lastDetailsToReport **commonpb.Payloads
closeCh chan struct{}
workerStopChannel <-chan struct{}
namespace string
}

func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Payloads, skipBatching bool) error {
Expand All @@ -1616,15 +1622,7 @@ func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Paylo
i.lastDetailsToReport = nil

// Create timer to fire before the threshold to report.
deadlineToTrigger := i.heartBeatTimeout
if deadlineToTrigger <= 0 {
// If we don't have any heartbeat timeout configured.
deadlineToTrigger = defaultHeartBeatInterval
}

// We set a deadline at 80% of the timeout.
duration := time.Duration(0.8 * float64(deadlineToTrigger))
i.hbBatchEndTimer = time.NewTimer(duration)
i.hbBatchEndTimer = time.NewTimer(i.heartbeatThrottleInterval)

go func() {
select {
Expand Down Expand Up @@ -1661,11 +1659,7 @@ func (i *temporalInvoker) Heartbeat(ctx context.Context, details *commonpb.Paylo

func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *commonpb.Payloads) (bool, error) {
isActivityCanceled := false
timeout := i.heartBeatTimeout
if timeout <= 0 {
timeout = defaultHeartBeatInterval
}
ctx, cancel := context.WithTimeout(ctx, timeout)
ctx, cancel := context.WithTimeout(ctx, i.heartbeatThrottleInterval)
defer cancel()

err := recordActivityHeartbeat(ctx, i.service, i.metricsScope, i.identity, i.taskToken, details)
Expand Down Expand Up @@ -1726,20 +1720,20 @@ func newServiceInvoker(
service workflowservice.WorkflowServiceClient,
metricsScope tally.Scope,
cancelHandler func(),
heartBeatTimeout time.Duration,
heartbeatThrottleInterval time.Duration,
workerStopChannel <-chan struct{},
namespace string,
) ServiceInvoker {
return &temporalInvoker{
taskToken: taskToken,
identity: identity,
service: service,
metricsScope: metricsScope,
cancelHandler: cancelHandler,
heartBeatTimeout: heartBeatTimeout,
closeCh: make(chan struct{}),
workerStopChannel: workerStopChannel,
namespace: namespace,
taskToken: taskToken,
identity: identity,
service: service,
metricsScope: metricsScope,
cancelHandler: cancelHandler,
heartbeatThrottleInterval: heartbeatThrottleInterval,
closeCh: make(chan struct{}),
workerStopChannel: workerStopChannel,
namespace: namespace,
}
}

Expand All @@ -1761,8 +1755,9 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
canCtx, cancel := context.WithCancel(rootCtx)
defer cancel()

heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(common.DurationValue(t.GetHeartbeatTimeout()))
invoker := newServiceInvoker(
t.TaskToken, ath.identity, ath.service, ath.metricsScope, cancel, common.DurationValue(t.GetHeartbeatTimeout()),
t.TaskToken, ath.identity, ath.service, ath.metricsScope, cancel, heartbeatThrottleInterval,
ath.workerStopCh, ath.namespace)

workflowType := t.WorkflowType.GetName()
Expand Down Expand Up @@ -1866,6 +1861,27 @@ func (ath *activityTaskHandlerImpl) getRegisteredActivityNames() (activityNames
return
}

func (ath *activityTaskHandlerImpl) getHeartbeatThrottleInterval(heartbeatTimeout time.Duration) time.Duration {
// Build the heartbeat throttle interval as 80% of timeout if timeout present
// or use default if not
var heartbeatThrottleInterval time.Duration
if heartbeatTimeout > 0 {
heartbeatThrottleInterval = time.Duration(0.8 * float64(heartbeatTimeout))
} else if ath.defaultHeartbeatThrottleInterval > 0 {
heartbeatThrottleInterval = ath.defaultHeartbeatThrottleInterval
} else {
heartbeatThrottleInterval = defaultDefaultHeartbeatThrottleInterval
}
// Limit interval to a max
if ath.maxHeartbeatThrottleInterval > 0 && heartbeatThrottleInterval > ath.maxHeartbeatThrottleInterval {
heartbeatThrottleInterval = ath.maxHeartbeatThrottleInterval
} else if ath.maxHeartbeatThrottleInterval == 0 && heartbeatThrottleInterval > defaultMaxHeartbeatThrottleInterval {
heartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this a bit complicated to read.

It would have been easier to only have to consider ActivityOptions.heartbeatTimeout, WorkerOptions.{default,max}HeartbeatThrottleInterval and have defaults for those options provided outside of this method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are provided outside this method too. But there are a couple of ways this is called in test code, so I wanted to do it here just to be safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'd split it into 2 methods, one that sets the default options and one that does the calculation to reduce the complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those methods would be a few lines and each called only once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do something like this?

In func newActivityTaskHandlerWithCustomProvider
do:

defaultHeartbeatThrottleInterval:
  providedOrDefaultInterval(params.DefaultHeartbeatThrottleInterval, defaultDefaultHeartbeatThrottleInterval),
maxHeartbeatThrottleInterval:
  providedOrDefaultInterval(params.MaxHeartbeatThrottleInterval, defaultMaxHeartbeatThrottleInterval),

And make getHeartbeatThrottleInterval something like:

if heartbeatTimeout > 0 {
		heartbeatThrottleInterval = time.Duration(0.8 * float64(heartbeatTimeout))
} else {
		heartbeatThrottleInterval = ath.defaultHeartbeatThrottleInterval
}

if heartbeatThrottleInterval > ath.maxHeartbeatThrottleInterval {
		heartbeatThrottleInterval = ath.maxHeartbeatThrottleInterval
}
return heartbeatThrottleInterval

If we're missing the initialization in tests just make sure ActivityTaskHandler is properly initialized.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed this to separate obtaining the interval (from the timeout, falling back to the configured default, then to the system default) from obtaining the max (from the configured value, falling back to the system default), and to apply the max check after both are resolved. Hopefully this is clearer.


return heartbeatThrottleInterval
}

func createNewCommand(commandType enumspb.CommandType) *commandpb.Command {
return &commandpb.Command{
CommandType: commandType,
Expand Down
30 changes: 26 additions & 4 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,10 +1357,10 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() {
Times(2)

temporalInvoker := &temporalInvoker{
identity: "Test_Temporal_Invoker",
service: mockService,
taskToken: nil,
heartBeatTimeout: time.Second,
identity: "Test_Temporal_Invoker",
service: mockService,
taskToken: nil,
heartbeatThrottleInterval: time.Second,
}

heartbeatErr := temporalInvoker.Heartbeat(context.Background(), nil, false)
Expand Down Expand Up @@ -1740,3 +1740,25 @@ func Test_IsSearchAttributesMatched(t *testing.T) {
})
}
}

// TestHeartbeatThrottleInterval verifies how activityTaskHandlerImpl derives the
// heartbeat throttle interval from the activity heartbeat timeout, the
// configured default interval, and the configured max interval.
func TestHeartbeatThrottleInterval(t *testing.T) {
	// seconds converts a whole number of seconds into a time.Duration.
	seconds := func(n int) time.Duration {
		return time.Duration(n) * time.Second
	}

	// check builds a handler with the given default/max intervals and requires
	// that the interval computed for the given timeout equals the expectation.
	// All arguments are expressed in seconds; zero means "not set".
	check := func(timeoutSec, defaultIntervalSec, maxIntervalSec, expectedSec int) {
		handler := &activityTaskHandlerImpl{
			defaultHeartbeatThrottleInterval: seconds(defaultIntervalSec),
			maxHeartbeatThrottleInterval:     seconds(maxIntervalSec),
		}
		actual := handler.getHeartbeatThrottleInterval(seconds(timeoutSec))
		require.Equal(t, seconds(expectedSec), actual)
	}

	// A heartbeat timeout is present: the interval is 80% of it.
	check(5, 2, 10, 4)
	// No heartbeat timeout: the configured default interval is used.
	check(0, 2, 10, 2)
	// Neither a timeout nor a configured default: the built-in 30s default applies.
	check(0, 0, 50, 30)
	// 80% of the timeout exceeds the configured max: the max wins.
	check(14, 2, 10, 10)
	// No max configured: the built-in 60s max applies.
	check(5000, 2, 0, 60)
}
12 changes: 12 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ type (
// DeadlockDetectionTimeout specifies workflow task timeout.
DeadlockDetectionTimeout time.Duration

DefaultHeartbeatThrottleInterval time.Duration

MaxHeartbeatThrottleInterval time.Duration

// Pointer to the shared worker cache
cache *WorkerCache
}
Expand Down Expand Up @@ -1277,6 +1281,8 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
WorkerStopTimeout: options.WorkerStopTimeout,
ContextPropagators: client.contextPropagators,
DeadlockDetectionTimeout: options.DeadlockDetectionTimeout,
DefaultHeartbeatThrottleInterval: options.DefaultHeartbeatThrottleInterval,
MaxHeartbeatThrottleInterval: options.MaxHeartbeatThrottleInterval,
cache: cache,
}

Expand Down Expand Up @@ -1461,6 +1467,12 @@ func setWorkerOptionsDefaults(options *WorkerOptions) {
}
options.DeadlockDetectionTimeout = defaultDeadlockDetectionTimeout
}
if options.DefaultHeartbeatThrottleInterval == 0 {
options.DefaultHeartbeatThrottleInterval = defaultDefaultHeartbeatThrottleInterval
}
if options.MaxHeartbeatThrottleInterval == 0 {
options.MaxHeartbeatThrottleInterval = defaultMaxHeartbeatThrottleInterval
}
}

// setClientDefaults should be needed only in unit tests.
Expand Down
11 changes: 11 additions & 0 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ type (
// Optional: If set defines maximum amount of time that workflow task will be allowed to run. Defaults to 1 sec.
DeadlockDetectionTimeout time.Duration

// Optional: The maximum amount of time between sending each pending heartbeat to the server. Regardless of
// heartbeat timeout, no pending heartbeat will wait longer than this amount of time to send.
// default: 60 seconds
MaxHeartbeatThrottleInterval time.Duration

// Optional: The default amount of time between sending each pending heartbeat to the server. This is used if the
// ActivityOptions do not provide a HeartbeatTimeout. Otherwise, the interval becomes 80% of the given
// HeartbeatTimeout. In both cases the result is capped at MaxHeartbeatThrottleInterval.
// default: 30 seconds
DefaultHeartbeatThrottleInterval time.Duration
cretz marked this conversation as resolved.
Show resolved Hide resolved

// Interceptors to apply to the worker. Earlier interceptors wrap later
// interceptors.
//
Expand Down