Skip to content

Commit

Permalink
Fix ordering issue on child workflow cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Oct 14, 2021
1 parent 47b28b9 commit e3583c5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 0 deletions.
15 changes: 15 additions & 0 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,12 @@ func (d *childWorkflowCommandStateMachine) cancel() {
switch d.state {
case commandStateStarted:
d.moveState(commandStateCanceledAfterStarted, eventCancel)
// A child workflow may be canceled _after_ something like an activity start
// happens inside a simulated goroutine. However, since the state of the
// entire child workflow is recorded based on when it started not when it
// was canceled, we have to move it to the end once canceled to keep the
// expected commands in order of when they actually occurred.
d.helper.moveCommandToBack(d)
d.helper.incrementNextCommandEventID()
default:
d.commandStateMachineBase.cancel()
Expand Down Expand Up @@ -879,6 +885,15 @@ func (h *commandsHelper) removeCancelOfResolvedCommand(commandID commandID) {
}
}

// moveCommandToBack re-queues the given command at the tail of
// h.orderedCommands and points the h.commands entry for the command's ID at
// the newly appended element. If no entry is registered under that ID, the
// condition is reported through panicIllegalState.
func (h *commandsHelper) moveCommandToBack(command commandStateMachine) {
	id := command.getID()
	existing := h.commands[id]
	if existing == nil {
		panicIllegalState(fmt.Sprintf("moving command not present %v", command))
	}

	// Drop the old position first, then append so the command becomes the
	// most recent entry; the lookup entry must follow the new element.
	h.orderedCommands.Remove(existing)
	appended := h.orderedCommands.PushBack(command)
	h.commands[id] = appended
}

func (h *commandsHelper) scheduleActivityTask(
scheduleID int64,
attributes *commandpb.ScheduleActivityTaskCommandAttributes,
Expand Down
5 changes: 5 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,11 @@ func (ts *IntegrationTestSuite) TestEndToEndLatencyMetrics() {
ts.Equal(prevNonLocalValue, nonLocal.Value())
}

// TestCancelChildAndExecuteActivityRace runs the
// CancelChildAndExecuteActivityRace workflow through executeWorkflow and
// asserts that no error is returned.
func (ts *IntegrationTestSuite) TestCancelChildAndExecuteActivityRace() {
	ts.NoError(ts.executeWorkflow(
		"cancel-child-and-execute-act-race",
		ts.workflows.CancelChildAndExecuteActivityRace,
		nil,
	))
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
ts.NoError(err)
Expand Down
39 changes: 39 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,43 @@ func (w *Workflows) WaitSignalReturnParam(ctx workflow.Context, v interface{}) (
return v, nil
}

// CancelChildAndExecuteActivityRace reproduces an issue in which a child
// workflow cancel was reported out of order relative to when it actually
// happened. The workflow starts a long-running child, then has a simulated
// goroutine signal over a channel and immediately start an activity, while the
// main coroutine cancels the child once the signal arrives. Previously, the
// SDK would place the cancel command ahead of the activity command because the
// child workflow had been started first.
//
// NOTE: the order of the SDK calls below is the point of the repro; do not
// reorder them.
func (w *Workflows) CancelChildAndExecuteActivityRace(ctx workflow.Context) error {
	activityOpts := workflow.ActivityOptions{StartToCloseTimeout: 2 * time.Minute}
	ctx = workflow.WithActivityOptions(ctx, activityOpts)

	// Launch the long-running child under its own cancellable context and
	// wait until its execution is available before going any further.
	childCtx, cancelChild := workflow.WithCancel(ctx)
	childOpts := workflow.ChildWorkflowOptions{WaitForCancellation: true}
	childCtx = workflow.WithChildOptions(childCtx, childOpts)
	childFuture := workflow.ExecuteChildWorkflow(childCtx, w.SleepForDuration, 3*time.Minute)
	startErr := childFuture.GetChildWorkflowExecution().Get(ctx, nil)
	if startErr != nil {
		return startErr
	}

	// The simulated goroutine notifies the main coroutine and then, without
	// yielding in between, kicks off an activity.
	notify := workflow.NewChannel(ctx)
	workflow.Go(ctx, func(ctx workflow.Context) {
		notify.Send(ctx, nil)
		activityErr := workflow.ExecuteActivity(ctx, new(Activities).Sleep, 1*time.Millisecond).Get(ctx, nil)
		if activityErr != nil {
			panic(activityErr)
		}
	})

	// Once notified, cancel the child and wait for it to finish. The result of
	// the wait is deliberately discarded (presumably a cancellation error —
	// confirm against the SDK's behavior).
	notify.Receive(ctx, nil)
	cancelChild()
	childFuture.Get(ctx, nil)
	return nil
}

// SleepForDuration is a workflow that calls workflow.Sleep with the duration d
// and returns whatever error that call yields.
func (w *Workflows) SleepForDuration(ctx workflow.Context, d time.Duration) error {
	sleepErr := workflow.Sleep(ctx, d)
	return sleepErr
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -1327,6 +1364,8 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.CronWorkflow)
worker.RegisterWorkflow(w.CancelTimerConcurrentWithOtherCommandWorkflow)
worker.RegisterWorkflow(w.CancelMultipleCommandsOverMultipleTasks)
worker.RegisterWorkflow(w.CancelChildAndExecuteActivityRace)
worker.RegisterWorkflow(w.SleepForDuration)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
Expand Down

0 comments on commit e3583c5

Please sign in to comment.