Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support disabling run following for workflow results #791

Merged
merged 3 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type (
// WorkflowRun represents a started non child workflow.
WorkflowRun = internal.WorkflowRun

// WorkflowRunGetOptions are options for WorkflowRun.GetWithOptions.
WorkflowRunGetOptions = internal.WorkflowRunGetOptions

// QueryWorkflowWithOptionsRequest defines the request to QueryWorkflowWithOptions.
QueryWorkflowWithOptionsRequest = internal.QueryWorkflowWithOptionsRequest

Expand Down
15 changes: 9 additions & 6 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,16 @@ type (
// ContinueAsNewError contains information about how to continue the workflow as new.
ContinueAsNewError struct {
//params *ExecuteWorkflowParams
WorkflowType *WorkflowType
Input *commonpb.Payloads
Header *commonpb.Header
TaskQueueName string
WorkflowType *WorkflowType
Input *commonpb.Payloads
Header *commonpb.Header
TaskQueueName string
WorkflowRunTimeout time.Duration
WorkflowTaskTimeout time.Duration

// Deprecated: WorkflowExecutionTimeout is deprecated and is never set or
// used internally.
WorkflowExecutionTimeout time.Duration
WorkflowRunTimeout time.Duration
WorkflowTaskTimeout time.Duration
}

// UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
Expand Down
85 changes: 65 additions & 20 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,38 @@ type (
// GetID return workflow ID, which will be same as StartWorkflowOptions.ID if provided.
GetID() string

// GetRunID return the first started workflow run ID (please see below) - empty string if no such run
// GetRunID return the first started workflow run ID (please see below) -
// empty string if no such run. Note, this value may change after Get is
// called if there was a later run for this run.
GetRunID() string

// Get will fill the workflow execution result to valuePtr,
// if workflow execution is a success, or return corresponding,
// error. This is a blocking API.
// Get will fill the workflow execution result to valuePtr, if workflow
// execution is a success, or return corresponding error. This is a blocking
// API.
//
// This call will follow execution runs to the latest result for this run
// instead of strictly returning this run's result. This means that if the
// workflow returned ContinueAsNewError, has a more recent cron execution,
// or has a new run ID on failure (i.e. a retry), this will wait and return
// the result for the latest run in the chain. To strictly get the result
// for this run without following to the latest, use GetWithOptions and set
// the DisableFollowingRuns option to true.
Get(ctx context.Context, valuePtr interface{}) error

// NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the
// return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError,
// however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
// Say ExecuteWorkflow started a workflow, in its first run, has run ID "run ID 1", and returned ContinueAsNewError,
// the second run has run ID "run ID 2" and return some result other than ContinueAsNewError:
// GetRunID() will always return "run ID 1" and Get(ctx context.Context, valuePtr interface{}) will return the result of second run.
// NOTE: DO NOT USE client.ExecuteWorkflow API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
// GetWithOptions will fill the workflow execution result to valuePtr, if
// workflow execution is a success, or return corresponding error. This is a
// blocking API.
GetWithOptions(ctx context.Context, valuePtr interface{}, options WorkflowRunGetOptions) error
}

// WorkflowRunGetOptions are options for WorkflowRun.GetWithOptions.
WorkflowRunGetOptions struct {
// DisableFollowingRuns, if true, will not follow execution chains to the
// latest run. By default when this is false, getting the result of a
// workflow may not use the literal run ID but instead follow to later runs
// if the workflow returned a ContinueAsNewError, has a later cron, or is
// retried on failure.
DisableFollowingRuns bool
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I fail if this was set to true but a run ID was not provided? Is there a use case for not providing a run ID but still not following to the next run?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't in TS. I can't think of a reason not to allow it, saves looking up the last run ID for a workflow ID.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this setting name to FollowRuns and still default to true? It's easier to follow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this setting name to FollowRuns and still default to true? It's easier to follow.

I am afraid not in Go. You can't default fields, their default is always the zero value which for booleans is false. This is why for all Go structs, you specifically build the field where false is the reasonable default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I get it. I thought that we could check if the options struct is null and default to true but we want empty struct to also default to true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This options struct is never null inside the outer options (it's not a pointer). And even if it could be nullable, we want to support people creating it and just setting the fields they care about instead of trying to differentiate between unset and explicit false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I agree this is the best approach

}

// workflowRunImpl is an implementation of WorkflowRun
Expand Down Expand Up @@ -878,6 +895,14 @@ func (workflowRun *workflowRunImpl) GetID() string {
}

func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{}) error {
return workflowRun.GetWithOptions(ctx, valuePtr, WorkflowRunGetOptions{})
}

func (workflowRun *workflowRunImpl) GetWithOptions(
ctx context.Context,
valuePtr interface{},
options WorkflowRunGetOptions,
) error {

iter := workflowRun.iterFn(ctx, workflowRun.currentRunID.Get())
if !iter.HasNext() {
Expand All @@ -891,8 +916,8 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
switch closeEvent.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
attributes := closeEvent.GetWorkflowExecutionCompletedEventAttributes()
if attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId)
if !options.DisableFollowingRuns && attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId, options)
}
if valuePtr == nil || attributes.Result == nil {
return nil
Expand All @@ -904,8 +929,8 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
return workflowRun.dataConverter.FromPayloads(attributes.Result, valuePtr)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
attributes := closeEvent.GetWorkflowExecutionFailedEventAttributes()
if attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId)
if !options.DisableFollowingRuns && attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId, options)
}
err = ConvertFailureToError(attributes.GetFailure(), workflowRun.dataConverter)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
Expand All @@ -916,13 +941,28 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
err = newTerminatedError()
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
attributes := closeEvent.GetWorkflowExecutionTimedOutEventAttributes()
if attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId)
if !options.DisableFollowingRuns && attributes.NewExecutionRunId != "" {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId, options)
}
err = NewTimeoutError("Workflow timeout", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
attributes := closeEvent.GetWorkflowExecutionContinuedAsNewEventAttributes()
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId)
if !options.DisableFollowingRuns {
return workflowRun.follow(ctx, valuePtr, attributes.NewExecutionRunId, options)
}
err := &ContinueAsNewError{
WorkflowType: &WorkflowType{Name: attributes.GetWorkflowType().GetName()},
Input: attributes.Input,
Header: attributes.Header,
TaskQueueName: attributes.GetTaskQueue().GetName(),
}
if attributes.WorkflowRunTimeout != nil {
err.WorkflowRunTimeout = *attributes.WorkflowRunTimeout
}
if attributes.WorkflowTaskTimeout != nil {
err.WorkflowTaskTimeout = *attributes.WorkflowTaskTimeout
}
return err
default:
return fmt.Errorf("unexpected event type %s when handling workflow execution result", closeEvent.GetEventType())
}
Expand All @@ -940,10 +980,15 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{
// doesn't return until the chain finishes. These can be ContinuedAsNew events, Completed events
// (for workflows with a cron schedule), or Failed or TimedOut events (for workflows with a retry
// policy or cron schedule).
func (workflowRun *workflowRunImpl) follow(ctx context.Context, valuePtr interface{}, newRunID string) error {
func (workflowRun *workflowRunImpl) follow(
ctx context.Context,
valuePtr interface{},
newRunID string,
options WorkflowRunGetOptions,
) error {
curRunID := util.PopulatedOnceCell(newRunID)
workflowRun.currentRunID = &curRunID
return workflowRun.Get(ctx, valuePtr)
return workflowRun.GetWithOptions(ctx, valuePtr, options)
}

func getWorkflowMemo(input map[string]interface{}, dc converter.DataConverter) (*commonpb.Memo, error) {
Expand Down
20 changes: 18 additions & 2 deletions mocks/WorkflowRun.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,6 +2106,33 @@ func (ts *IntegrationTestSuite) TestLargeHistoryReplay() {
ts.Contains(err.Error(), "intentional panic")
}

func (ts *IntegrationTestSuite) TestClientGetNotFollowingRuns() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Start workflow that does a continue as new
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-client-get-not-following-runs"),
ts.workflows.ContinueAsNew, 1, ts.taskQueueName)
ts.NoError(err)

// Do the regular get which returns the final value and a different run ID
origRunID := run.GetRunID()
var val int
ts.NoError(run.Get(ctx, &val))
ts.Equal(999, val)
ts.NotEqual(origRunID, run.GetRunID())

// Get the run with the original ID and fetch without following runs
run = ts.client.GetWorkflow(ctx, run.GetID(), origRunID)
err = run.GetWithOptions(ctx, nil, client.WorkflowRunGetOptions{DisableFollowingRuns: true})
ts.Error(err)
contErr := err.(*workflow.ContinueAsNewError)
ts.Equal("ContinueAsNew", contErr.WorkflowType.Name)
ts.Equal("0", string(contErr.Input.Payloads[0].Data))
ts.Equal("\""+ts.taskQueueName+"\"", string(contErr.Input.Payloads[1].Data))
ts.Equal(ts.taskQueueName, contErr.TaskQueueName)
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{
HostPort: ts.config.ServiceAddr,
Expand Down