Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RequestCancelExternalWorkflow Sample #240

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions request-cancel-external-workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Cancellation
<!-- @@@SNIPSTART samples-go-cancellation-readme -->
Make sure the [Temporal Server is running locally](https://docs.temporal.io/docs/server/quick-install).

From the root of the project, start a Worker:

```bash
go run cancellation/worker/main.go
```

Start the Workflow Execution:

```bash
go run cancellation/starter/main.go
```

Cancel the Workflow Execution:

```bash
go run cancellation/cancel/main.go
```
<!-- @@@SNIPEND -->
41 changes: 41 additions & 0 deletions request-cancel-external-workflow/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package requestcancelexternalworkflow

import (
"context"
"time"

"go.temporal.io/sdk/activity"
)

// @@@SNIPSTART samples-go-cancellation-activity-definition
type Activities struct{}

func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("activity started, to cancel the Workflow Execution, use 'go run cancellation/cancel/main.go " +
"-w <WorkflowID>' or use the CLI: 'tctl wf cancel -w <WorkflowID>'")
for {
select {
case <-time.After(1 * time.Second):
logger.Info("heartbeating...")
activity.RecordHeartbeat(ctx, "")
case <-ctx.Done():
logger.Info("context is cancelled")
return "I am canceled by Done", nil
}
}
}

func (a *Activities) CleanupActivity(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("Cleanup Activity started")
return nil
}

func (a *Activities) ActivityToBeSkipped(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("this Activity will be skipped due to cancellation")
return nil
}

// @@@SNIPEND
43 changes: 43 additions & 0 deletions request-cancel-external-workflow/cancel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"flag"
"log"

"go.temporal.io/sdk/client"
)

// @@@SNIPSTART samples-go-cancellation-cancel-workflow-execution-trigger
func main() {
var workflowID string
flag.StringVar(&workflowID, "wid", "workflowID-to-cancel", "workflowID of the Workflow Execution to be canceled.")
flag.Parse()

if workflowID == "" {
flag.PrintDefaults()
return
}

// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

//err = c.CancelWorkflow(context.Background(), workflowID, "")
//if err != nil {
// log.Fatalln("Unable to cancel Workflow Execution", err)
//}
log.Println("SIGNALLING")
err = c.SignalWorkflow(context.Background(), workflowID, "", "cancelme", "cancelled")
if err != nil {
log.Fatalln("Unable to signal Workflow Execution", err)
}
log.Println("Workflow Execution cancelled", "WorkflowID", workflowID)
}

// @@@SNIPEND
39 changes: 39 additions & 0 deletions request-cancel-external-workflow/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"context"
"flag"
requestcancelexternalworkflow "github.com/temporalio/samples-go/request-cancel-external-workflow"
"log"

"go.temporal.io/sdk/client"
)

// @@@SNIPSTART samples-go-cancellation-workflow-execution-starter
func main() {
var workflowID string
flag.StringVar(&workflowID, "w", "workflowID-to-cancel", "w is the workflowID of the workflow to be canceled.")
flag.Parse()

// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: "cancel-activity",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, requestcancelexternalworkflow.CancellingWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
}

// @@@SNIPEND
36 changes: 36 additions & 0 deletions request-cancel-external-workflow/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
requestcancelexternalworkflow "github.com/temporalio/samples-go/request-cancel-external-workflow"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/temporalio/samples-go/cancellation"
)

// @@@SNIPSTART samples-go-cancellation-worker-starter
func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "cancel-activity", worker.Options{})

w.RegisterWorkflow(requestcancelexternalworkflow.CancellingWorkflow)
w.RegisterWorkflow(requestcancelexternalworkflow.ChildWorkflow)
w.RegisterActivity(&cancellation.Activities{})

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}

// @@@SNIPEND
104 changes: 104 additions & 0 deletions request-cancel-external-workflow/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package requestcancelexternalworkflow

import (
"errors"
"go.temporal.io/sdk/workflow"
"time"
)

// @@@SNIPSTART samples-go-cancellation-workflow-definition
// YourWorkflow is a Workflow Definition that shows how it can be canceled.
func CancellingWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("cancel workflow started")
var a *Activities // Used to call Activities by function pointer
defer func() {

if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}

// When the Workflow is canceled, it has to get a new disconnected context to execute any Activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, a.CleanupActivity).Get(ctx, nil)
if err != nil {
logger.Error("CleanupActivity failed", "Error", err)
}
}()

var cancelled string

workflow.Go(ctx, func(ctx workflow.Context) {
sigChan := workflow.GetSignalChannel(ctx, "cancelme")
sel := workflow.NewSelector(ctx)
sel.AddReceive(sigChan, func(c workflow.ReceiveChannel, more bool) {
sigChan.Receive(ctx, &cancelled)
logger.Info("received signal", cancelled)
})

sel.Select(ctx)
})

childCtx, _ := workflow.WithCancel(ctx)
childCtx = workflow.WithChildOptions(childCtx, workflow.ChildWorkflowOptions{
WaitForCancellation: true,
WorkflowID: "mykid",
})
childFuture := workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow)

if err := workflow.Await(ctx, func() bool {
return cancelled == "cancelled"
}); err != nil {
return err
}
logger.Info("cancelling child")
var childResult string

if err := workflow.RequestCancelExternalWorkflow(ctx, "mykid", "").Get(ctx, &childResult); err != nil {
return err
}

//cancelChild()
if err := childFuture.Get(childCtx, &childResult); err != nil {
logger.Error("child raised error", "err", err)
}

logger.Info("child result is", "result", childResult)

logger.Info("Workflow Execution complete.")

return nil
}
func ChildWorkflow(ctx workflow.Context) (myresult string, err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("CHILD: workflow started")
myresult = "result: pristine"
defer func() {
logger.Info("CHILD: defer")
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}
myresult = "result: child cancellation result"
err = nil
logger.Info("CHILD: cancel workflow")
}()
var neverSet bool
if aerr := workflow.Await(ctx, func() bool {
return neverSet
}); aerr != nil {
return "result: await errd", aerr
}
return "result: final value", nil
}
Loading