Skip to content

Commit

Permalink
Dedupe use compat flag logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed May 25, 2023
1 parent 8e24773 commit b1c523a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 30 deletions.
2 changes: 1 addition & 1 deletion internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type (
DisableEagerExecution bool

// VersioningIntent specifies whether this activity should run on a worker with a compatible
// build ID or not. See VersioningIntent.
// build ID or not. See temporal.VersioningIntent.
VersioningIntent VersioningIntent
}

Expand Down
24 changes: 4 additions & 20 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,16 +542,8 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
if len(params.CronSchedule) > 0 {
attributes.CronSchedule = params.CronSchedule
}
useCompat := true
if params.VersioningIntent == VersioningIntentUseDefault {
useCompat = false
} else if params.VersioningIntent == VersioningIntentUnspecified {
// If the target task queue doesn't match ours, use the default version
if params.TaskQueueName != wc.workflowInfo.TaskQueueName {
useCompat = false
}
}
attributes.UseCompatibleVersion = useCompat
attributes.UseCompatibleVersion = determineUseCompatibleFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

command, err := wc.commandsHelper.startChildWorkflowExecution(attributes)
if _, ok := err.(*childWorkflowExistsWithId); ok {
Expand Down Expand Up @@ -650,16 +642,8 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
// false just before request by the eager activity executor if eager activity
// execution is otherwise disallowed
scheduleTaskAttr.RequestEagerExecution = !parameters.DisableEagerExecution
useCompat := true
if parameters.VersioningIntent == VersioningIntentUseDefault {
useCompat = false
} else if parameters.VersioningIntent == VersioningIntentUnspecified {
// If the target task queue doesn't match ours, use the default version
if parameters.TaskQueueName != wc.workflowInfo.TaskQueueName {
useCompat = false
}
}
scheduleTaskAttr.UseCompatibleVersion = useCompat
scheduleTaskAttr.UseCompatibleVersion = determineUseCompatibleFlagForCommand(
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr)
command.setData(&scheduledActivity{
Expand Down
11 changes: 2 additions & 9 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1590,15 +1590,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
metricsHandler.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)

useCompat := true
if contErr.VersioningIntent == VersioningIntentUseDefault {
useCompat = false
} else if contErr.VersioningIntent == VersioningIntentUnspecified {
// If the target task queue doesn't match ours, use the default version
if contErr.TaskQueueName != workflowContext.workflowInfo.TaskQueueName {
useCompat = false
}
}
useCompat := determineUseCompatibleFlagForCommand(
contErr.VersioningIntent, workflowContext.workflowInfo.TaskQueueName, contErr.TaskQueueName)
closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
Input: contErr.Input,
Expand Down
15 changes: 15 additions & 0 deletions internal/worker_version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,18 @@ func (v *BuildIDOpAddNewIDInNewDefaultSet) targetedBuildId() string { return v.B
func (v *BuildIDOpAddNewCompatibleVersion) targetedBuildId() string { return v.BuildID }
func (v *BuildIDOpPromoteSet) targetedBuildId() string { return v.BuildID }
func (v *BuildIDOpPromoteIDWithinSet) targetedBuildId() string { return v.BuildID }

// Helper to determine if how the `UseCompatibleVersion` flag for a command should be set based on
// the user's intent and whether the target task queue matches this worker's task queue.
func determineUseCompatibleFlagForCommand(intent VersioningIntent, workerTq, TargetTq string) bool {
useCompat := true
if intent == VersioningIntentUseDefault {
useCompat = false
} else if intent == VersioningIntentUnspecified {
// If the target task queue doesn't match ours, use the default version
if workerTq != TargetTq {
useCompat = false
}
}
return useCompat
}

0 comments on commit b1c523a

Please sign in to comment.