You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am trying to create a workflow that can respond to cancellations gracefully, by waiting for either a timeout or cancellation, whichever is first. Afterwards if there is a cancellation to skip the last activity. I use a disconnected context to allow the timer to continue despite the cancellation, since it has to wait on additional work first.
Describe the bug
The workflow fails with BadRequestCancelActivityAttributes: invalid history builder state for action: add-activitytask-cancel-requested-event
Minimal Reproduction
Use temporalite as a server
Use the cancellation example from samples-go with the following workflow code instead
Start worker
Run starter
Run cancellation
The key thing is using a disconnected context to control the timer.
Workflow.go
package cancellation
import (
"errors"
"fmt"
"time"
"go.temporal.io/sdk/workflow"
)
// @@@SNIPSTART samples-go-cancellation-workflow-definition
// YourWorkflow is a Workflow Definition that shows how it can be canceled.
func YourWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Minute,
HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("cancel workflow started")
defer func() {
if !errors.Is(ctx.Err(), workflow.ErrCanceled) {
return
}
// When the Workflow is canceled, it has to get a new disconnected context to execute any Activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(ctx, nil)
if err != nil {
logger.Error("CleanupActivity failed", "Error", err)
}
}()
//Create selector
s := workflow.NewSelector(ctx)
newCtx, _ := workflow.NewDisconnectedContext(ctx)
newCtx, cancel := workflow.WithCancel(newCtx)
timer1 := workflow.NewTimer(newCtx, 5*time.Minute)
s.AddFuture(timer1, func(f workflow.Future) {
fmt.Println("Timer Cancelled")
})
s.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
cancel()
s.Select(ctx)
})
s.Select(ctx)
var result string
err := workflow.ExecuteActivity(ctx, ActivityToBeCanceled).Get(ctx, &result)
logger.Info(fmt.Sprintf("ActivityToBeCanceled returns %v, %v", result, err))
return nil
}
main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"github.com/temporalio/samples-go/cancellation"
)
// @@@SNIPSTART samples-go-cancellation-worker-starter
func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "cancel-activity", worker.Options{})
w.RegisterWorkflow(cancellation.YourWorkflow)
w.RegisterActivity(cancellation.ActivityToBeSkipped)
w.RegisterActivity(cancellation.ActivityToBeCanceled)
w.RegisterActivity(cancellation.CleanupActivity)
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
// @@@SNIPEND
activity.go
package cancellation
import (
"context"
"time"
"go.temporal.io/sdk/activity"
)
// @@@SNIPSTART samples-go-cancellation-activity-definition
func ActivityToBeCanceled(ctx context.Context) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("activity started, to cancel the Workflow Execution, use 'go run cancellation/cancel/main.go " +
"-w <WorkflowID>' or use the CLI: 'tctl wf cancel -w <WorkflowID>'")
for {
select {
case <-time.After(1 * time.Second):
logger.Info("heartbeating...")
activity.RecordHeartbeat(ctx, "")
case <-ctx.Done():
logger.Info("context is cancelled")
return "I am canceled by Done", nil
}
}
}
func CleanupActivity(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("Cleanup Activity started")
return nil
}
func ActivityToBeSkipped(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("this Activity will be skipped due to cancellation")
return nil
}
// @@@SNIPEND
Environment/Versions
OS and processor: M1 Mac
Temporal Version: Temporalite with temporalite version (devel) (server 1.19.1)
Additional context
The text was updated successfully, but these errors were encountered:
I am not able to reproduce BadRequestCancelActivityAttributes invalid history builder state for action with the latest server, however I do get the same error I see in your screenshot. Looks to be related to the use of a disconnected context and WaitForCancellation: true
I think the exact error depends on how many activities you have after the timer that was cancelled. In this case 1, if you have 2 i think you get the other one. I can't remember exactly but this one was the simplified base case
What are you really trying to do?
I am trying to create a workflow that can respond to cancellations gracefully, by waiting for either a timeout or cancellation, whichever is first. Afterwards if there is a cancellation to skip the last activity. I use a disconnected context to allow the timer to continue despite the cancellation, since it has to wait on additional work first.
Describe the bug
The workflow fails with
BadRequestCancelActivityAttributes: invalid history builder state for action: add-activitytask-cancel-requested-event
Minimal Reproduction
Use temporalite as a server
Use the
cancellation
example from samples-go with the following workflow code insteadThe key thing is using a disconnected context to control the timer.
Workflow.go
main.go
activity.go
Environment/Versions
Additional context
The text was updated successfully, but these errors were encountered: