Skip to content

Commit

Permalink
Set limit on DescribeSchedule query/signal loop (#3669)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Nov 29, 2022
1 parent 937c38f commit ee4648e
Showing 1 changed file with 41 additions and 27 deletions.
68 changes: 41 additions & 27 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/worker/batcher"
"go.temporal.io/server/service/worker/scheduler"
)
Expand Down Expand Up @@ -3119,7 +3120,10 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
// then query to get current state from the workflow itself
// TODO: turn the refresh path into a synchronous update so we don't have to retry in a loop
sentRefresh := make(map[commonpb.WorkflowExecution]struct{})
var describeScheduleResponse *workflowservice.DescribeScheduleResponse
// limit how many signals we send, separate from the retry policy (which is used to retry
// the query if the signal was not received or processed yet)
signalsLeft := 1
var queryResponse schedspb.DescribeResponse

op := func(ctx context.Context) error {
req := &historyservice.QueryWorkflowRequest{
Expand All @@ -3135,14 +3139,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return err
}

var response schedspb.DescribeResponse
err = payloads.Decode(res.GetResponse().GetQueryResult(), &response)
queryResponse.Reset()
err = payloads.Decode(res.GetResponse().GetQueryResult(), &queryResponse)
if err != nil {
return err
}

// map action search attributes
if sa := response.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil {
if sa := queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil {
saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
Expand All @@ -3153,14 +3157,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return err
}
if aliasedSas != nil {
response.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas
queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas
}
}

// for all running workflows started by the schedule, we should check that they're
// still running, and if not, poke the schedule to refresh
needRefresh := false
for _, ex := range response.GetInfo().GetRunningWorkflows() {
for _, ex := range queryResponse.GetInfo().GetRunningWorkflows() {
if _, ok := sentRefresh[*ex]; ok {
// we asked the schedule to refresh this one because it wasn't running, but
// it's still reporting it as running
Expand Down Expand Up @@ -3191,25 +3195,10 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
}
}

if !needRefresh {
token := make([]byte, 8)
binary.BigEndian.PutUint64(token, uint64(response.ConflictToken))

searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes()
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)

memo := describeResponse.GetWorkflowExecutionInfo().GetMemo()
memo = wh.cleanScheduleMemo(memo)

describeScheduleResponse = &workflowservice.DescribeScheduleResponse{
Schedule: response.Schedule,
Info: response.Info,
Memo: memo,
SearchAttributes: searchAttributes,
ConflictToken: token,
}
if !needRefresh || signalsLeft == 0 {
return nil
}
signalsLeft--

// poke to refresh
_, err = wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{
Expand All @@ -3229,15 +3218,40 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return errWaitForRefresh
}

// TODO: confirm retry is necessary here.
policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
// wait up to 4 seconds or rpc deadline minus 1 second, but at least 1 second
expiration := 4 * time.Second
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline) - 1*time.Second
expiration = util.Min(expiration, remaining)
}
expiration = util.Max(expiration, 1*time.Second)
policy := backoff.NewExponentialRetryPolicy(200 * time.Millisecond).
WithExpirationInterval(expiration)
isWaitErr := func(e error) bool { return e == errWaitForRefresh }

err = backoff.ThrottleRetryContext(ctx, op, policy, isWaitErr)
if err != nil {
// if we still got errWaitForRefresh that means we used up our retries, just return
// whatever we have
if err != nil && err != errWaitForRefresh {
return nil, err
}

return describeScheduleResponse, nil
token := make([]byte, 8)
binary.BigEndian.PutUint64(token, uint64(queryResponse.ConflictToken))

searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes()
searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes)

memo := describeResponse.GetWorkflowExecutionInfo().GetMemo()
memo = wh.cleanScheduleMemo(memo)

return &workflowservice.DescribeScheduleResponse{
Schedule: queryResponse.Schedule,
Info: queryResponse.Info,
Memo: memo,
SearchAttributes: searchAttributes,
ConflictToken: token,
}, nil
}

// Changes the configuration or state of an existing schedule.
Expand Down

0 comments on commit ee4648e

Please sign in to comment.