Skip to content

Commit

Permalink
Set versioning behavior per workflow type
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal committed Nov 13, 2024
1 parent 36167ae commit c51231a
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 10 deletions.
7 changes: 6 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,12 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
if workflowContext.workflowInfo.currentVersioningBehavior != VersioningBehaviorUnspecified {
builtRequest.VersioningBehavior = versioningBehaviorToProto(workflowContext.workflowInfo.currentVersioningBehavior)
} else {
builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior)
workflowType := workflowContext.workflowInfo.WorkflowType
if behavior, ok := wth.registry.getWorkflowVersioningBehavior(workflowType); ok {
builtRequest.VersioningBehavior = versioningBehaviorToProto(behavior)
} else {
builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior)
}
}
}
return builtRequest
Expand Down
37 changes: 28 additions & 9 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,13 @@ func (aw *activityWorker) Stop() {

type registry struct {
sync.Mutex
nexusServices map[string]*nexus.Service
workflowFuncMap map[string]interface{}
workflowAliasMap map[string]string
activityFuncMap map[string]activity
activityAliasMap map[string]string
interceptors []WorkerInterceptor
nexusServices map[string]*nexus.Service
workflowFuncMap map[string]interface{}
workflowAliasMap map[string]string
workflowVersioningBehaviorMap map[string]VersioningBehavior
activityFuncMap map[string]activity
activityAliasMap map[string]string
interceptors []WorkerInterceptor
}

type registryOptions struct {
Expand All @@ -557,6 +558,9 @@ func (r *registry) RegisterWorkflowWithOptions(
panic("WorkflowDefinitionFactory must be registered with a name")
}
r.workflowFuncMap[options.Name] = factory
if options.VersioningBehavior != VersioningBehaviorUnspecified {
r.workflowVersioningBehaviorMap[options.Name] = options.VersioningBehavior
}
return
}
// Validate that it is a function
Expand All @@ -580,6 +584,9 @@ func (r *registry) RegisterWorkflowWithOptions(
}
}
r.workflowFuncMap[registerName] = wf
if options.VersioningBehavior != VersioningBehaviorUnspecified {
r.workflowVersioningBehaviorMap[registerName] = options.VersioningBehavior
}
if len(alias) > 0 && r.workflowAliasMap != nil {
r.workflowAliasMap[fnName] = alias
}
Expand Down Expand Up @@ -773,6 +780,17 @@ func (r *registry) getWorkflowDefinition(wt WorkflowType) (WorkflowDefinition, e
return newSyncWorkflowDefinition(executor), nil
}

func (r *registry) getWorkflowVersioningBehavior(wt WorkflowType) (VersioningBehavior, bool) {
lookup := wt.Name
if alias, ok := r.getWorkflowAlias(lookup); ok {
lookup = alias
}
r.Lock()
defer r.Unlock()
behavior, ok := r.workflowVersioningBehaviorMap[lookup]
return behavior, ok
}

// Validate function parameters.
func validateFnFormat(fnType reflect.Type, isWorkflow bool) error {
if fnType.Kind() != reflect.Func {
Expand Down Expand Up @@ -825,9 +843,10 @@ func newRegistry() *registry { return newRegistryWithOptions(registryOptions{})

func newRegistryWithOptions(options registryOptions) *registry {
r := &registry{
workflowFuncMap: make(map[string]interface{}),
activityFuncMap: make(map[string]activity),
nexusServices: make(map[string]*nexus.Service),
workflowFuncMap: make(map[string]interface{}),
workflowVersioningBehaviorMap: make(map[string]VersioningBehavior),
activityFuncMap: make(map[string]activity),
nexusServices: make(map[string]*nexus.Service),
}
if !options.disableAliasing {
r.workflowAliasMap = make(map[string]string)
Expand Down
4 changes: 4 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ type (
// inside a workflow as a child workflow.
Name string
DisableAlreadyRegisteredCheck bool
// Optional: Provides a default Versioning Behavior to workflows of this type.
// See workflow.SetVersioningBehavior to override this default.
// NOTE: Experimental
VersioningBehavior VersioningBehavior
}

localActivityContext struct {
Expand Down
94 changes: 94 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6271,6 +6271,100 @@ func (ts *IntegrationTestSuite) TestVersioningBehaviorInRespondWorkflowTaskCompl
}
}

func (ts *IntegrationTestSuite) TestVersioningBehaviorPerWorkflowType() {
versioningBehaviorAll := make([]enumspb.VersioningBehavior, 0)
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

// We are setting the default build ID with versioning-2 rules to test
// with existing servers. TODO(antlai-temporal) use versioning-3 APIs
// after there is a server release that supports versioning-3
res, err := ts.client.GetWorkerVersioningRules(ctx, client.GetWorkerVersioningOptions{
TaskQueue: ts.taskQueueName,
})
ts.NoError(err)

_, err = ts.client.UpdateWorkerVersioningRules(ctx, client.UpdateWorkerVersioningRulesOptions{
TaskQueue: ts.taskQueueName,
ConflictToken: res.ConflictToken,
Operation: &client.VersioningOperationInsertAssignmentRule{
RuleIndex: 0,
Rule: client.VersioningAssignmentRule{
TargetBuildID: "1.0",
},
},
})
ts.NoError(err)

c, err := client.Dial(client.Options{
HostPort: ts.config.ServiceAddr,
Namespace: ts.config.Namespace,
ConnectionOptions: client.ConnectionOptions{
TLS: ts.config.TLS,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(func(
ctx context.Context,
method string,
req interface{},
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
if method == "/temporal.api.workflowservice.v1.WorkflowService/RespondWorkflowTaskCompleted" {
asReq := req.(*workflowservice.RespondWorkflowTaskCompletedRequest)
versioningBehaviorAll = append(versioningBehaviorAll, asReq.VersioningBehavior)
}
return invoker(ctx, method, req, reply, cc, opts...)
}),
},
},
})
ts.NoError(err)
defer c.Close()

ts.worker.Stop()
ts.workerStopped = true
w := worker.New(c, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
BuildID: "1.0",
UseBuildIDForVersioning: true,
DeploymentName: "deploy-test2",
DefaultVersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
},
})

w.RegisterWorkflowWithOptions(ts.workflows.Basic, workflow.RegisterOptions{
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
w.RegisterWorkflowWithOptions(ts.workflows.SetPinnedVersioningBehavior, workflow.RegisterOptions{
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
})
ts.activities.register(w)

ts.Nil(w.Start())
defer w.Stop()
wfOpts := ts.startWorkflowOptions("test-default-versioning-behavior-per-type")
ts.NoError(ts.executeWorkflowWithOption(wfOpts, ts.workflows.Basic, nil))

ts.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, versioningBehaviorAll[0])
for i := 1; i < len(versioningBehaviorAll); i++ {
ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_PINNED)
}

// Show that an explicit setter overrides workflow type default
versioningBehaviorAll = nil

wfOpts = ts.startWorkflowOptions("test-override-versioning-behavior-per-type")
ts.NoError(ts.executeWorkflowWithOption(wfOpts,
ts.workflows.SetPinnedVersioningBehavior, nil))

ts.Equal(enumspb.VERSIONING_BEHAVIOR_PINNED, versioningBehaviorAll[0])
for i := 1; i < len(versioningBehaviorAll); i++ {
ts.Equal(versioningBehaviorAll[i], enumspb.VERSIONING_BEHAVIOR_PINNED)
}
}

func (ts *IntegrationTestSuite) TestGetVersioningBehavior() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
Expand Down

0 comments on commit c51231a

Please sign in to comment.