Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cancel-in-progress sample #225

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cancel-in-progress/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
## Cancel-in-progress

This example demonstrates how to implement a workflow that ensures that only one run of the child workflow is executed at a time. Subsequent runs will cancel the running child workflow and re-run it with the latest sent options through `SignalWithStartWorkflow`.
Those semantics are useful, especially when implementing a CI pipeline. New commits during the workflow execution should short-circuit the child workflow and only build the most recent commit.


Make sure the [Temporal Server is running locally](https://docs.temporal.io/docs/server/quick-install).

From the root of the project, start a Worker:

```bash
go run cancel-in-progress/worker/main.go
```

Start the Workflow Execution:

```bash
go run cancel-in-progress/starter/main.go
```
19 changes: 19 additions & 0 deletions cancel-in-progress/child_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package cancel_in_progress

import (
"go.temporal.io/sdk/workflow"
"time"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget whether it's go fmt or goimports but may need to run them both so that stdlib imports are separate from others

)

// SampleChildWorkflow is a Workflow Definition
func SampleChildWorkflow(ctx workflow.Context, name string) (string, error) {
logger := workflow.GetLogger(ctx)

// Simulate some long running processing.
_ = workflow.Sleep(ctx, time.Second*3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you swallow this error this cannot get cancelled. It is important for our samples to show best practices of always responding to errors (if we haven't in other samples we should fix that)


greeting := "Hello " + name + "!"
logger.Info("Child workflow execution: " + greeting)

return greeting, nil
}
109 changes: 109 additions & 0 deletions cancel-in-progress/parent_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package cancel_in_progress

import (
"errors"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

const ParentWorkflowSignalName = "parent-workflow-signal"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the signal name should reference what it does rather than who it's to, e.g. const RestartChildSignalName = "restart-child-signal"


func SampleParentWorkflow(ctx workflow.Context) (result string, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write tests for this?

logger := workflow.GetLogger(ctx)

var message string

reBuildSignalChan := workflow.GetSignalChannel(ctx, ParentWorkflowSignalName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean here by "build"?


// This will not block because the workflow is started with a signal transactional.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent with whether you capitalize/punctuate your comments

reBuildSignalChan.Receive(ctx, &message)

cwo := workflow.ChildWorkflowOptions{
WaitForCancellation: true,
}
ctx = workflow.WithChildOptions(ctx, cwo)

var isProcessingDone = false

for !isProcessingDone {
ctx, cancelHandler := workflow.WithCancel(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity reasons you might want rename ctx instead of shadowing the parent ctx. It can be unclear whether this is wrapping one from the previous loop iteration or not.


// it is important to re-execute the child workflow after every signal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you don't execute after every signal. If you get two signals you only execute after the latest. (seems to be by intention, just need to change this comment)

// because we might have cancelled the previous execution
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, SampleChildWorkflow, message)

selector := workflow.NewSelector(ctx)

selector.AddFuture(childWorkflowFuture, func(f workflow.Future) {
err = f.Get(ctx, &result)

// we don't want to end the parent workflow when child workflow is cancelled
if errors.Is(ctx.Err(), workflow.ErrCanceled) {
logger.Info("Child execution cancelled.")
return
}

if err != nil {
logger.Error("Child execution failure.", "Error", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do this same log statement twice

}

// Child workflow completed. Let's end the parent workflow.
isProcessingDone = true
})

selector.AddReceive(reBuildSignalChan, func(c workflow.ReceiveChannel, more bool) {
logger.Info("Received signal.", "Message", message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log shows the message from the last time the loop ran. The message is not received until GetLatestMessageFromChannel is executed.


// cancel the child workflow as fast as possible after we received a new signal
// if a child workflow execution is in progress we cancel everything and start a new child workflow (for loop)
cancelHandler()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are not waiting for the child workflow to complete its cancel. How do you know it was received and properly handled? Or if it's by intention that you can have many of these child workflows running at once with this algorithm, there should be a comment mentioning that.


// drain the channel to get the latest signal
// Users might send multiple signals in a short period of time
// and we are only interested in the latest signal
message = GetLatestMessageFromChannel(logger, reBuildSignalChan)
})

// wait for the build workflow to finish or the signal to cancel the running execution through a signal
selector.Select(ctx)

// in case of an error we want to cancel the child workflow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, due to parent close policy, returning from the parent is actually terminating the child, not cancelling it

if err != nil {
logger.Error("Child execution failure.", "Error", err)
return "", err
}
}

logger.Info("Parent execution completed.", "Result", result)

return result, nil
}

func GetLatestMessageFromChannel(logger log.Logger, ch workflow.ReceiveChannel) string {
var message string
var messages []string

for {
var m string
if ch.ReceiveAsync(&m) {
messages = append(messages, m)
logger.Info("Additional message received.", "message", message)
} else {
break
}
}

for i, m := range messages {
// continue with the latest signal
if i == len(messages)-1 {
// Update the workflow options to use the latest message for the next child workflow execution
message = m
logger.Info("Continue with latest message.", "message", message)
} else {
logger.Info("Cancel old workflow execution.", "message", message)
// You might want to do some cleanup here
}
}

return message
}
Comment on lines +82 to +109
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func GetLatestMessageFromChannel(logger log.Logger, ch workflow.ReceiveChannel) string {
var message string
var messages []string
for {
var m string
if ch.ReceiveAsync(&m) {
messages = append(messages, m)
logger.Info("Additional message received.", "message", message)
} else {
break
}
}
for i, m := range messages {
// continue with the latest signal
if i == len(messages)-1 {
// Update the workflow options to use the latest message for the next child workflow execution
message = m
logger.Info("Continue with latest message.", "message", message)
} else {
logger.Info("Cancel old workflow execution.", "message", message)
// You might want to do some cleanup here
}
}
return message
}
func GetLatestMessageFromChannel(logger log.Logger, ch workflow.ReceiveChannel) (message string) {
for ch.ReceiveAsync(&message) {
}
return
}

This is a bit simpler way to write this, can throw logs in if important

78 changes: 78 additions & 0 deletions cancel-in-progress/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"context"
"go.temporal.io/sdk/client"
"log"
"math/rand"
"strconv"
"time"

cancel_in_progress "github.com/temporalio/samples-go/cancel-in-progress"
)

func main() {
// The client is a heavyweight object that should be created only once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default can be left unset

})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// This Workflow ID must be fixed because we don't want to start a new workflow every time.
projectID := "my-project"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a "project"?


workflowID := "parent-workflow_" + projectID
workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: "child-workflow",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid clash w/ other samples, can you name this the name of the sample?

}

// Start the workflow execution or send a signal to an existing workflow execution.
workflowRun, err := c.SignalWithStartWorkflow(
context.Background(),
workflowID,
cancel_in_progress.ParentWorkflowSignalName,
"World",
workflowOptions,
cancel_in_progress.SampleParentWorkflow,
)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow",
"WorkflowID", workflowRun.GetID(), "RunID", workflowRun.GetRunID())

// Send three signals to the workflow. At the end, we will expect that only the result of the last signal is returned.
for i := 1; i <= 3; i++ {
workflowRun, err = c.SignalWithStartWorkflow(
context.Background(),
workflowID,
cancel_in_progress.ParentWorkflowSignalName,
"World"+strconv.Itoa(i),
workflowOptions,
cancel_in_progress.SampleParentWorkflow,
)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)

log.Println("Started workflow",
"WorkflowID", workflowRun.GetID(), "RunID", workflowRun.GetRunID())
}

// Synchronously wait for the Workflow Execution to complete.
// Behind the scenes the SDK performs a long poll operation.
// If you need to wait for the Workflow Execution to complete from another process use
// Client.GetWorkflow API to get an instance of the WorkflowRun.
var result string
err = workflowRun.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Failure getting workflow result", err)
}
log.Printf("Workflow result: %v", result)
}
31 changes: 31 additions & 0 deletions cancel-in-progress/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

cancel_in_progress "github.com/temporalio/samples-go/cancel-in-progress"
)

func main() {
// The client is a heavyweight object that should be created only once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "child-workflow", worker.Options{})

w.RegisterWorkflow(cancel_in_progress.SampleParentWorkflow)
w.RegisterWorkflow(cancel_in_progress.SampleChildWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}