From 440dc50afbfd2856d5aff05e51f49aae70e5ae76 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 28 Nov 2022 18:40:30 +0800 Subject: [PATCH 1/3] Set limit on DescribeSchedule query/signal loop --- service/frontend/workflow_handler.go | 62 ++++++++++++++++------------ 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index ec0e0782ac2..a3bacdab3c9 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -78,6 +78,7 @@ import ( "go.temporal.io/server/common/rpc/interceptor" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/worker/batcher" "go.temporal.io/server/service/worker/scheduler" ) @@ -3119,7 +3120,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl // then query to get current state from the workflow itself // TODO: turn the refresh path into a synchronous update so we don't have to retry in a loop sentRefresh := make(map[commonpb.WorkflowExecution]struct{}) - var describeScheduleResponse *workflowservice.DescribeScheduleResponse + var queryResponse schedspb.DescribeResponse op := func(ctx context.Context) error { req := &historyservice.QueryWorkflowRequest{ @@ -3135,14 +3136,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return err } - var response schedspb.DescribeResponse - err = payloads.Decode(res.GetResponse().GetQueryResult(), &response) + queryResponse.Reset() + err = payloads.Decode(res.GetResponse().GetQueryResult(), &queryResponse) if err != nil { return err } // map action search attributes - if sa := response.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil { + if sa := queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil { saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) @@ -3153,14 +3154,14 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return err } if aliasedSas != nil { - response.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas + queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas } } // for all running workflows started by the schedule, we should check that they're // still running, and if not, poke the schedule to refresh needRefresh := false - for _, ex := range response.GetInfo().GetRunningWorkflows() { + for _, ex := range queryResponse.GetInfo().GetRunningWorkflows() { if _, ok := sentRefresh[*ex]; ok { // we asked the schedule to refresh this one because it wasn't running, but // it's still reporting it as running @@ -3192,22 +3193,6 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl } if !needRefresh { - token := make([]byte, 8) - binary.BigEndian.PutUint64(token, uint64(response.ConflictToken)) - - searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes() - searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes) - - memo := describeResponse.GetWorkflowExecutionInfo().GetMemo() - memo = wh.cleanScheduleMemo(memo) - - describeScheduleResponse = &workflowservice.DescribeScheduleResponse{ - Schedule: response.Schedule, - Info: response.Info, - Memo: memo, - SearchAttributes: searchAttributes, - ConflictToken: token, - } return nil } @@ -3229,15 +3214,40 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return errWaitForRefresh } - // TODO: confirm retry is necessary here. - policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond) + // wait up to 4 seconds or rpc deadline minus 1 second, but at least 1 second + expiration := 4 * time.Second + if deadline, ok := ctx.Deadline(); ok { + remaining := deadline.Sub(time.Now()) - 1*time.Second + expiration = util.Min(expiration, remaining) + } + expiration = util.Max(expiration, 1*time.Second) + policy := backoff.NewExponentialRetryPolicy(200 * time.Millisecond). + WithExpirationInterval(expiration) isWaitErr := func(e error) bool { return e == errWaitForRefresh } + err = backoff.ThrottleRetryContext(ctx, op, policy, isWaitErr) - if err != nil { + // if we still got errWaitForRefresh that means we used up our retries, just return + // whatever we have + if err != nil && err != errWaitForRefresh { return nil, err } - return describeScheduleResponse, nil + token := make([]byte, 8) + binary.BigEndian.PutUint64(token, uint64(queryResponse.ConflictToken)) + + searchAttributes := describeResponse.GetWorkflowExecutionInfo().GetSearchAttributes() + searchAttributes = wh.cleanScheduleSearchAttributes(searchAttributes) + + memo := describeResponse.GetWorkflowExecutionInfo().GetMemo() + memo = wh.cleanScheduleMemo(memo) + + return &workflowservice.DescribeScheduleResponse{ + Schedule: queryResponse.Schedule, + Info: queryResponse.Info, + Memo: memo, + SearchAttributes: searchAttributes, + ConflictToken: token, + }, nil } // Changes the configuration or state of an existing schedule. From de53118d454e0415418cf0b47b80d18685860b36 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 29 Nov 2022 01:03:53 +0800 Subject: [PATCH 2/3] staticcheck --- service/frontend/workflow_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index a3bacdab3c9..c3b63f7991d 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -3217,7 +3217,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl // wait up to 4 seconds or rpc deadline minus 1 second, but at least 1 second expiration := 4 * time.Second if deadline, ok := ctx.Deadline(); ok { - remaining := deadline.Sub(time.Now()) - 1*time.Second + remaining := time.Until(deadline) - 1*time.Second expiration = util.Min(expiration, remaining) } expiration = util.Max(expiration, 1*time.Second) From 5a47d5ddbd7b63584b40c13d63d8f9d0d17869fb Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 29 Nov 2022 01:28:37 +0800 Subject: [PATCH 3/3] only send one signal --- service/frontend/workflow_handler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index c3b63f7991d..f8d21fa258e 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -3120,6 +3120,9 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl // then query to get current state from the workflow itself // TODO: turn the refresh path into a synchronous update so we don't have to retry in a loop sentRefresh := make(map[commonpb.WorkflowExecution]struct{}) + // limit how many signals we send, separate from the retry policy (which is used to retry + // the query if the signal was not received or processed yet) + signalsLeft := 1 var queryResponse schedspb.DescribeResponse op := func(ctx context.Context) error { @@ -3192,9 +3195,10 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl } } - if !needRefresh { + if !needRefresh || signalsLeft == 0 { return nil } + signalsLeft-- // poke to refresh _, err = wh.historyClient.SignalWorkflowExecution(ctx, &historyservice.SignalWorkflowExecutionRequest{