From d4f950d2bfdd818adb22985d7e6afe9ed1ad535d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sun, 28 Jul 2024 18:05:21 -0700 Subject: [PATCH] Test duplicate rejected updates --- test/integration_test.go | 29 +++++++++++++++++++++++++++++ test/workflow_test.go | 19 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/test/integration_test.go b/test/integration_test.go index f8fce9011..8660e763a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -3471,6 +3471,35 @@ func (ts *IntegrationTestSuite) TestUpdateRejected() { ts.NoError(run.Get(ctx, nil)) } +func (ts *IntegrationTestSuite) TestUpdateRejectedDuplicated() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + options := ts.startWorkflowOptions("test-update-rejected-duplicated") + run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowWithRejectableUpdate) + ts.NoError(err) + // Send an update we expect to be rejected before the first workflow task + handle, err := ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + Args: []interface{}{true}, + }) + ts.NoError(err) + ts.Error(handle.Get(ctx, nil)) + // Same update ID should be allowed to be reused after the first attempt is rejected + handle, err = ts.client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + WaitForStage: client.WorkflowUpdateStageCompleted, + Args: []interface{}{false}, + }) + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + ts.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID()) +} + func (ts *IntegrationTestSuite) TestUpdateSettingHandlerInGoroutine() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() diff --git a/test/workflow_test.go b/test/workflow_test.go index 46d4599ef..cf6ccb316 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2641,6 +2641,24 @@ func (w *Workflows) UpdateRejectedWithOtherGoRoutine(ctx workflow.Context) error return nil } +func (w *Workflows) WorkflowWithRejectableUpdate(ctx workflow.Context) error { + workflow.SetUpdateHandlerWithOptions(ctx, "update", + func(ctx workflow.Context, _ bool) error { + return nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context, reject bool) error { + if reject { + return errors.New("test update rejected") + } + return nil + }, + }) + workflow.Await(ctx, func() bool { + return false + }) + return nil +} + func (w *Workflows) UpdateOrdering(ctx workflow.Context) (int, error) { updatesRan := 0 updateHandle := func(ctx workflow.Context) error { @@ -3135,6 +3153,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.QueryTestWorkflow) worker.RegisterWorkflow(w.UpdateWithMutex) worker.RegisterWorkflow(w.UpdateWithSemaphore) + worker.RegisterWorkflow(w.WorkflowWithRejectableUpdate) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childWithRetryPolicy)