Skip to content

Commit

Permalink
Support for WorkflowIdConflictPolicy (#1563)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos authored Jul 26, 2024
1 parent bcc623d commit 1f0296c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 2 deletions.
9 changes: 7 additions & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,16 @@ type (
// Optional: defaulted to 10 secs.
WorkflowTaskTimeout time.Duration

// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
// for dedupe logic if set to RejectDuplicate.
// WorkflowIDReusePolicy - Specifies server behavior if a *completed* workflow with the same id exists.
// This can be useful for dedupe logic if set to RejectDuplicate
// Optional: defaulted to AllowDuplicate.
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy

// WorkflowIDConflictPolicy - Specifies server behavior if a *running* workflow with the same id exists.
// This cannot be set if WorkflowIDReusePolicy is set to TerminateIfRunning.
// Optional: defaulted to Fail.
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy

// When WorkflowExecutionErrorWhenAlreadyStarted is true, Client.ExecuteWorkflow will return an error if the
// workflow id has already been used and WorkflowIDReusePolicy would disallow a re-run. If it is set to false,
// rather than erroring a WorkflowRun instance representing the current or last run will be returned.
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type (
WorkflowID string
WaitForCancellation bool
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
DataConverter converter.DataConverter
RetryPolicy *commonpb.RetryPolicy
CronSchedule string
Expand Down
3 changes: 3 additions & 0 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/retry"
Expand Down Expand Up @@ -1592,6 +1593,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow(
WorkflowTaskTimeout: durationpb.New(workflowTaskTimeout),
Identity: w.client.identity,
WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
RetryPolicy: convertToPBRetryPolicy(in.Options.RetryPolicy),
CronSchedule: in.Options.CronSchedule,
Memo: memo,
Expand Down Expand Up @@ -1745,6 +1747,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: in.Options.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
Header: header,
}

Expand Down
72 changes: 72 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,43 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseIgnoreDuplicateWhileRunning()
ts.NotEqual(run1.GetRunID(), run3.GetRunID())
}

func (ts *IntegrationTestSuite) TestWorkflowIDConflictPolicy() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts := ts.startWorkflowOptions("test-workflowidconflict-" + uuid.New())
opts.WorkflowExecutionErrorWhenAlreadyStarted = true

var alreadyStartedErr *serviceerror.WorkflowExecutionAlreadyStarted

// Start a workflow
run1, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.NoError(err)

// Confirm another fails by default
_, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &alreadyStartedErr)

// Confirm fails if explicitly given that option
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
_, err = ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &alreadyStartedErr)

// Confirm gives back same WorkflowRun if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
run2, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run2.GetRunID())

// Confirm terminates and starts new if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
run3, err := ts.client.ExecuteWorkflow(ctx, opts, ts.workflows.IDConflictPolicy)
ts.NotEqual(run1.GetRunID(), run3.GetRunID())

statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID())
ts.NoError(err)
ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED)
}

func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_ShortLived() {
ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithRetryPolicy, 0)
}
Expand Down Expand Up @@ -1919,6 +1956,41 @@ func (ts *IntegrationTestSuite) TestStartDelaySignalWithStart() {
ts.Equal(5*time.Second, event.GetWorkflowExecutionStartedEventAttributes().GetFirstWorkflowTaskBackoff().AsDuration())
}

func (ts *IntegrationTestSuite) TestSignalWithStartIdConflictPolicy() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var invalidArgErr *serviceerror.InvalidArgument
opts := ts.startWorkflowOptions("test-signalwithstart-workflowidconflict-" + uuid.New())

// Start a workflow
run1, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.NoError(err)

// Confirm gives back same WorkflowRun by default
run2, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run2.GetRunID())

// Confirm gives back same WorkflowRun if requested explicitly
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
run3, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.Equal(run1.GetRunID(), run3.GetRunID())

// Confirm policy to fail is invalid
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL
_, err = ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.ErrorAs(err, &invalidArgErr)

// Confirm terminates and starts new if requested
opts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
run4, err := ts.client.SignalWithStartWorkflow(ctx, opts.ID, "signal", true, opts, ts.workflows.IDConflictPolicy)
ts.NotEqual(run1.GetRunID(), run4.GetRunID())

statusRun1, err := ts.client.DescribeWorkflowExecution(ctx, run1.GetID(), run1.GetRunID())
ts.NoError(err)
ts.Equal(statusRun1.WorkflowExecutionInfo.Status, enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED)
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
var originalResult []string
err := ts.executeWorkflow("basic-reset-workflow-execution", ts.workflows.Basic, &originalResult)
Expand Down
9 changes: 9 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -644,6 +645,13 @@ func (w *Workflows) IDReusePolicy(
return ans1 + ans2, nil
}

func (w *Workflows) IDConflictPolicy(
ctx workflow.Context,
) error {
workflow.Await(ctx, func() bool { return false })
return nil
}

func (w *Workflows) ChildWorkflowWithRetryPolicy(ctx workflow.Context, expectedMaximumAttempts int, iterations int) error {
return w.childWorkflowWithRetryPolicy(ctx, w.childWithRetryPolicy, expectedMaximumAttempts, iterations)
}
Expand Down Expand Up @@ -3063,6 +3071,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ContinueAsNewWithRetryPolicy)
worker.RegisterWorkflow(w.ContinueAsNewWithChildWF)
worker.RegisterWorkflow(w.IDReusePolicy)
worker.RegisterWorkflow(w.IDConflictPolicy)
worker.RegisterWorkflow(w.InspectActivityInfo)
worker.RegisterWorkflow(w.InspectLocalActivityInfo)
worker.RegisterWorkflow(w.LargeQueryResultWorkflow)
Expand Down

0 comments on commit 1f0296c

Please sign in to comment.