Set limit on DescribeSchedule query/signal loop #3669

Merged 3 commits on Nov 29, 2022

service/frontend/workflow_handler.go: 68 changes (41 additions, 27 deletions)
@@ -78,6 +78,7 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/worker/batcher"
"go.temporal.io/server/service/worker/scheduler"
)
@@ -3119,7 +3120,10 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
// then query to get current state from the workflow itself
// TODO: turn the refresh path into a synchronous update so we don't have to retry in a loop
sentRefresh := make(map[commonpb.WorkflowExecution]struct{})
var describeScheduleResponse *workflowservice.DescribeScheduleResponse
// limit how many signals we send, separate from the retry policy (which is used to retry
// the query if the signal was not received or processed yet)
signalsLeft := 1
var queryResponse schedspb.DescribeResponse

op := func(ctx context.Context) error {
req := &historyservice.QueryWorkflowRequest{
@@ -3135,14 +3139,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return err
}

var response schedspb.DescribeResponse
err = payloads.Decode(res.GetResponse().GetQueryResult(), &response)
queryResponse.Reset()
err = payloads.Decode(res.GetResponse().GetQueryResult(), &queryResponse)
if err != nil {
return err
}

// map action search attributes
if sa := response.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil {
if sa := queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil {
saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
@@ -3153,14 +3157,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return err
}
if aliasedSas != nil {
response.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas
queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas
}
}

// for all running workflows started by the schedule, we should check that they're
// still running, and if not, poke the schedule to refresh
needRefresh := false
for _, ex := range response.GetInfo().GetRunningWorkflows() {
for _, ex := range queryResponse.GetInfo().GetRunningWorkflows() {
if _, ok := sentRefresh[*ex]; ok {
// we asked the schedule to refresh this one because it wasn't running, but
// it's still reporting it as running
@@ -3191,25 +3195,10 @@ }
}
}

if !needRefresh {
token := make([]byte, 8)
binary.BigEndian.PutUint64(token, uint64(response.ConflictToken))

searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes()
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)

memo := describeResponse.GetWorkflowExecutionInfo().GetMemo()
memo = wh.cleanScheduleMemo(memo)

describeScheduleResponse = &workflowservice.DescribeScheduleResponse{
Schedule: response.Schedule,
Info: response.Info,
Memo: memo,
SearchAttributes: searchAttributes,
ConflictToken: token,
}
if !needRefresh || signalsLeft == 0 {
return nil
}
signalsLeft--

// poke to refresh
_, err = wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
@@ -3229,15 +3218,40 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return errWaitForRefresh
}

// TODO: confirm retry is necessary here.
policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
// wait up to 4 seconds or rpc deadline minus 1 second, but at least 1 second
expiration := 4 * time.Second
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline) - 1*time.Second
expiration = util.Min(expiration, remaining)
}
expiration = util.Max(expiration, 1*time.Second)
policy := backoff.NewExponentialRetryPolicy(200 * time.Millisecond).
WithExpirationInterval(expiration)
isWaitErr := func(e error) bool { return e == errWaitForRefresh }

err = backoff.ThrottleRetryContext(ctx, op, policy, isWaitErr)
if err != nil {
// if we still got errWaitForRefresh that means we used up our retries, just return
// whatever we have
Member, commenting on lines +3233 to +3234:

I think errWaitForRefresh should be retried only once. Or the response should have some last-update timestamp, and if its age is below some threshold we should not keep retrying.

Member Author:

It's a little confusing because it's retrying for two different reasons: (1) the workflow hasn't received or acted on the signal yet; (2) it has acted, but the query result lists new running workflows that we think are not running.

In case 2, yes, we could just not send another signal and return what we have. But in case 1 we should keep retrying some more.

We don't need a timestamp; we can detect the cases based on a change in the returned set of workflows, i.e. if we see a new one that shouldn't be there.
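
For illustration only (this is not code from the PR), a minimal Go sketch of that set-difference check: compare the running workflows returned by two consecutive queries, and treat the appearance of a previously unseen execution as evidence that the schedule workflow already processed the refresh signal (case 2), so further retries can stop. The package, helper name, and example values are hypothetical.

```go
package main

import (
	"fmt"

	commonpb "go.temporal.io/api/common/v1"
)

// hasNewRunningWorkflow reports whether curr lists an execution that prev did
// not. In the scheme sketched above, that would mean the schedule workflow has
// acted on the refresh signal, so the caller can stop retrying and return the
// result it already has.
func hasNewRunningWorkflow(prev, curr []*commonpb.WorkflowExecution) bool {
	seen := make(map[string]struct{}, len(prev))
	for _, ex := range prev {
		seen[ex.GetWorkflowId()+"/"+ex.GetRunId()] = struct{}{}
	}
	for _, ex := range curr {
		if _, ok := seen[ex.GetWorkflowId()+"/"+ex.GetRunId()]; !ok {
			return true
		}
	}
	return false
}

func main() {
	prev := []*commonpb.WorkflowExecution{{WorkflowId: "wf-1", RunId: "run-a"}}
	curr := []*commonpb.WorkflowExecution{
		{WorkflowId: "wf-1", RunId: "run-a"},
		{WorkflowId: "wf-2", RunId: "run-b"},
	}
	fmt.Println(hasNewRunningWorkflow(prev, curr)) // true: a new execution appeared, stop retrying
}
```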

Member Author:

We can also be a little sneaky and, instead of retrying the query blindly, do a long-poll PollMutableState to wait for more events to appear. That could be more efficient.
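
A rough sketch of that alternative (again, not part of this PR), assuming the history service's PollMutableState long-poll RPC; the wrapper below and its request fields are an assumption based on the GetMutableState request shape and may differ from the actual API.

```go
package sketch

import (
	"context"

	commonpb "go.temporal.io/api/common/v1"
	historyservice "go.temporal.io/server/api/historyservice/v1"
)

// waitForNewEvents long-polls mutable state so the caller re-runs the describe
// query only after the schedule workflow has recorded events beyond
// lastSeenNextEventID (or the context / long-poll timeout expires).
func waitForNewEvents(
	ctx context.Context,
	client historyservice.HistoryServiceClient,
	namespaceID string,
	execution *commonpb.WorkflowExecution,
	lastSeenNextEventID int64,
) error {
	// Assumed request shape; field names mirror GetMutableStateRequest.
	_, err := client.PollMutableState(ctx, &historyservice.PollMutableStateRequest{
		NamespaceId:         namespaceID,
		Execution:           execution,
		ExpectedNextEventId: lastSeenNextEventID,
	})
	return err
}
```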

if err != nil && err != errWaitForRefresh {
return nil, err
}

return describeScheduleResponse, nil
token := make([]byte, 8)
binary.BigEndian.PutUint64(token, uint64(queryResponse.ConflictToken))

searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes()
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)

memo := describeResponse.GetWorkflowExecutionInfo().GetMemo()
memo = wh.cleanScheduleMemo(memo)

return &workflowservice.DescribeScheduleResponse{
Schedule: queryResponse.Schedule,
Info: queryResponse.Info,
Memo: memo,
SearchAttributes: searchAttributes,
ConflictToken: token,
}, nil
}

// Changes the configuration or state of an existing schedule.