Skip to content

Commit

Permalink
Remove unnecessary aliasing/unaliasing of scheduled workflow search a…
Browse files Browse the repository at this point in the history
…ttributes (#3943)
  • Loading branch information
alexshtin authored Feb 11, 2023
1 parent 8501b04 commit 4f5b1b0
Showing 1 changed file with 10 additions and 67 deletions.
77 changes: 10 additions & 67 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2966,6 +2966,9 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
return nil, err
}

// Add namespace division before unaliasing search attributes.
searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision))

request, err = wh.unaliasCreateScheduleRequestSearchAttributes(request, namespaceName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3001,8 +3004,6 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
}
// Add initial memo for list schedules
wh.addInitialScheduleMemo(request, input)
// Add namespace division
searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision))
// Create StartWorkflowExecutionRequest
startReq := &workflowservice.StartWorkflowExecutionRequest{
Namespace: request.Namespace,
Expand Down Expand Up @@ -3070,7 +3071,12 @@ func (wh *WorkflowHandler) validateStartWorkflowArgsForSchedule(
return errIDReusePolicyNotAllowed
}

if err := wh.validateSearchAttributes(startWorkflow.GetSearchAttributes(), namespaceName); err != nil {
// Unalias startWorkflow search attributes only for validation. Keep aliases in request.
unaliasedStartWorkflowSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return err
}
if err := wh.validateSearchAttributes(unaliasedStartWorkflowSas, namespaceName); err != nil {
return err
}

Expand Down Expand Up @@ -3164,22 +3170,6 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl
return err
}

// map action search attributes
if sa := queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil {
saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}
searchattribute.ApplyTypeMap(sa, saTypeMap)
aliasedSas, err := searchattribute.AliasFields(wh.saMapperProvider, sa, request.Namespace)
if err != nil {
return err
}
if aliasedSas != nil {
queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes = aliasedSas
}
}

// for all running workflows started by the schedule, we should check that they're
// still running, and if not, poke the schedule to refresh
needRefresh := false
Expand Down Expand Up @@ -3313,11 +3303,6 @@ func (wh *WorkflowHandler) UpdateSchedule(ctx context.Context, request *workflow
return nil, err
}

request, err = wh.unaliasUpdateScheduleRequestStartWorkflowSearchAttributes(request, namespaceName)
if err != nil {
return nil, err
}

input := &schedspb.FullUpdateRequest{
Schedule: request.Schedule,
}
Expand Down Expand Up @@ -4960,13 +4945,7 @@ func (wh *WorkflowHandler) unaliasCreateScheduleRequestSearchAttributes(request
return nil, err
}

startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow()
unaliasedStartWorkflowSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return nil, err
}

if unaliasedSas == nil && unaliasedStartWorkflowSas == nil {
if unaliasedSas == nil {
return request, nil
}

Expand All @@ -4977,41 +4956,5 @@ func (wh *WorkflowHandler) unaliasCreateScheduleRequestSearchAttributes(request
newRequest.SearchAttributes = unaliasedSas
}

if unaliasedStartWorkflowSas != nil && startWorkflow != nil {
newStartWorkflow := *startWorkflow
newStartWorkflow.SearchAttributes = unaliasedStartWorkflowSas
newSchedule := *request.GetSchedule()
newSchedule.Action = &schedpb.ScheduleAction{
Action: &schedpb.ScheduleAction_StartWorkflow{
StartWorkflow: &newStartWorkflow,
}}
newRequest.Schedule = &newSchedule
}

return &newRequest, nil
}

func (wh *WorkflowHandler) unaliasUpdateScheduleRequestStartWorkflowSearchAttributes(request *workflowservice.UpdateScheduleRequest, namespaceName namespace.Name) (*workflowservice.UpdateScheduleRequest, error) {
startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow()
if startWorkflow == nil {
return request, nil
}

unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, startWorkflow.GetSearchAttributes(), namespaceName.String())
if err != nil {
return nil, err
}
if unaliasedSas == nil {
return request, nil
}
newStartWorkflow := *startWorkflow
newStartWorkflow.SearchAttributes = unaliasedSas
newSchedule := *request.GetSchedule()
newSchedule.Action = &schedpb.ScheduleAction{
Action: &schedpb.ScheduleAction_StartWorkflow{
StartWorkflow: &newStartWorkflow,
}}
newRequest := *request
newRequest.Schedule = &newSchedule
return &newRequest, nil
}

0 comments on commit 4f5b1b0

Please sign in to comment.