Skip to content

Commit

Permalink
Async Update Polling (#1088)
Browse files Browse the repository at this point in the history
* Remove orphaned go:generate directives

The codegen referenced here has been removed from this repository.

* Add support for async workflow updates.

This consists mainly of exposing the WaitPolicy to users of the workflow
client and implementing a lazy version of WorkflowUpdateHandle.

* Retry poll request until parent ctx expires

Loop inside the PollWorkflowClient stub function calling up to the
server with a context built from the long poll timeout.

* Retry poll for update on nil response outcome

Will now retry in 3 cases:
1. local context deadline exceeded
2. grpc error with DEADLINE_EXCEEDED
3. No error but grpc response does not contain an outcome
  • Loading branch information
Matt McShane authored Apr 19, 2023
1 parent 528337f commit 29aecb3
Show file tree
Hide file tree
Showing 8 changed files with 485 additions and 40 deletions.
12 changes: 11 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../LICENSE -package client -source client.go -destination client_mock.go
//go:generate go run ../internal/cmd/generateproxy/main.go

// Package client is used by external programs to communicate with Temporal service.
// NOTE: DO NOT USE THIS API INSIDE OF ANY WORKFLOW CODE!!!
Expand Down Expand Up @@ -173,6 +172,11 @@ type (
// NOTE: Experimental
WorkflowUpdateHandle = internal.WorkflowUpdateHandle

// GetWorkflowUpdateHandleOptions encapsulates the parameters needed to unambiguously
// refer to a Workflow Update
// NOTE: Experimental
GetWorkflowUpdateHandleOptions = internal.GetWorkflowUpdateHandleOptions

// UpdateWorkerBuildIdCompatibilityOptions is the input to Client.UpdateWorkerBuildIdCompatibility.
// NOTE: Experimental
UpdateWorkerBuildIdCompatibilityOptions = internal.UpdateWorkerBuildIdCompatibilityOptions
Expand Down Expand Up @@ -504,6 +508,12 @@ type (
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
// if not specified the most recent runID will be used.
// NOTE: Experimental
GetWorkflowUpdateHandle(ref GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down
2 changes: 0 additions & 2 deletions converter/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate go run ../internal/cmd/generateinterceptor/main.go

package converter

import (
Expand Down
6 changes: 6 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,12 @@ type (
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
// if not specified the most recent runID will be used.
// NOTE: Experimental
GetWorkflowUpdateHandle(GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle

// WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases
// that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the
// service are not configured with internal semantics such as automatic retries.
Expand Down
15 changes: 13 additions & 2 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

commonpb "go.temporal.io/api/common/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -308,6 +309,12 @@ type ClientOutboundInterceptor interface {
// NOTE: Experimental
UpdateWorkflow(context.Context, *ClientUpdateWorkflowInput) (WorkflowUpdateHandle, error)

// PollWorkflowUpdate requests the outcome of a specific update from the
// server.
//
// NOTE: Experimental
PollWorkflowUpdate(context.Context, *ClientPollWorkflowUpdateInput) (converter.EncodedValue, error)

mustEmbedClientOutboundInterceptorBase()
}

Expand All @@ -322,9 +329,13 @@ type ClientUpdateWorkflowInput struct {
Args []interface{}
RunID string
FirstExecutionRunID string
WaitPolicy *updatepb.WaitPolicy
}

// this isn't upstream in API yet
// WaitFor enumspb.WorkflowExecutionUpdateWaitEvent
// ClientPollWorkflowUpdateInput is the input to
// ClientOutboundInterceptor.PollWorkflowUpdate.
type ClientPollWorkflowUpdateInput struct {
UpdateRef *updatepb.UpdateRef
}

// ScheduleClientCreateInput is the input to
Expand Down
7 changes: 7 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,13 @@ func (c *ClientOutboundInterceptorBase) UpdateWorkflow(
return c.Next.UpdateWorkflow(ctx, in)
}

func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate(
ctx context.Context,
in *ClientPollWorkflowUpdateInput,
) (converter.EncodedValue, error) {
return c.Next.PollWorkflowUpdate(ctx, in)
}

// ExecuteWorkflow implements ClientOutboundInterceptor.ExecuteWorkflow.
func (c *ClientOutboundInterceptorBase) ExecuteWorkflow(
ctx context.Context,
Expand Down
180 changes: 149 additions & 31 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/internal/common/retry"
"go.temporal.io/sdk/internal/common/serializer"
"go.temporal.io/sdk/internal/common/util"
"go.temporal.io/sdk/log"
uberatomic "go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

// Assert that structs do indeed implement the interfaces
Expand All @@ -63,6 +66,8 @@ const (
defaultGetHistoryTimeout = 65 * time.Second

getSystemInfoTimeout = 5 * time.Second

pollUpdateTimeout = 60 * time.Second
)

var (
Expand Down Expand Up @@ -763,15 +768,16 @@ type UpdateWorkflowWithOptionsRequest struct {
// then the server will reject the update request with an error.
FirstExecutionRunID string

// this isn't upstream in API yet
// WaitFor enumspb.WorkflowExecutionUpdateWaitEvent
// How this RPC should block on the server before returning.
WaitPolicy *updatepb.WaitPolicy
}

// WorkflowUpdateHandle is a handle to a workflow execution update process. The
// update may or may not have completed so an instance of this type functions
// simlar to a Future with respect to the outcome of the update. If the update
// is rejected or returns an error, the Get function on this type will return
// that error through the output valuePtr.
// NOTE: Experimental
type WorkflowUpdateHandle interface {
// WorkflowID observes the update's workflow ID.
WorkflowID() string
Expand All @@ -786,13 +792,38 @@ type WorkflowUpdateHandle interface {
Get(ctx context.Context, valuePtr interface{}) error
}

// updateHandle is a dumb implementation of WorkflowExecutionUpdateHandle that
// only supports the case where the output value is aready known at construction
// time.
type updateHandle struct {
// GetWorkflowUpdateHandleOptions encapsulates the parameters needed to unambiguously
// refer to a Workflow Update.
// NOTE: Experimental
type GetWorkflowUpdateHandleOptions struct {
// WorkflowID of the target update
WorkflowID string

// RunID of the target workflow. If blank, use the most recent run
RunID string

// UpdateID of the target update
UpdateID string
}

type baseUpdateHandle struct {
ref *updatepb.UpdateRef
}

// completedUpdateHandle is an UpdateHandle impelementation for use when the outcome
// of the update is already known and the Get call can return immediately.
type completedUpdateHandle struct {
baseUpdateHandle
value converter.EncodedValue
err error
ref *updatepb.UpdateRef
}

// lazyUpdateHandle represents and update that is not known to have completed
// yet (i.e. the associated updatepb.Outcome is not known) and thus calling Get
// will poll the server for the outcome.
type lazyUpdateHandle struct {
baseUpdateHandle
client *WorkflowClient
}

// QueryWorkflowWithOptionsRequest is the request to QueryWorkflowWithOptions
Expand Down Expand Up @@ -1000,6 +1031,37 @@ func (wc *WorkflowClient) UpdateWorkflowWithOptions(
Args: req.Args,
RunID: req.RunID,
FirstExecutionRunID: req.FirstExecutionRunID,
WaitPolicy: req.WaitPolicy,
})
}

func (wc *WorkflowClient) GetWorkflowUpdateHandle(ref GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle {
return &lazyUpdateHandle{
client: wc,
baseUpdateHandle: baseUpdateHandle{
ref: &updatepb.UpdateRef{
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: ref.WorkflowID,
RunId: ref.RunID,
},
UpdateId: ref.UpdateID,
},
},
}
}

// PollWorkflowUpdate sends a request for the outcome of the specified update
// through the interceptor chain.
func (wc *WorkflowClient) PollWorkflowUpdate(
ctx context.Context,
ref *updatepb.UpdateRef,
) (converter.EncodedValue, error) {
if err := wc.ensureInitialized(); err != nil {
return nil, err
}
ctx = contextWithNewHeader(ctx)
return wc.interceptor.PollWorkflowUpdate(ctx, &ClientPollWorkflowUpdateInput{
UpdateRef: ref,
})
}

Expand Down Expand Up @@ -1687,12 +1749,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
RunId: in.RunID,
}
resp, err := w.client.workflowService.UpdateWorkflowExecution(grpcCtx, &workflowservice.UpdateWorkflowExecutionRequest{
WaitPolicy: &updatepb.WaitPolicy{
// currently fixed as this is the only supported execution method - will
// change to BlockingPolicy (or similar) when upstream API changes land
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
},

WaitPolicy: in.WaitPolicy,
Namespace: w.client.namespace,
WorkflowExecution: wfexec,
FirstExecutionRunId: in.FirstExecutionRunID,
Expand All @@ -1708,45 +1765,106 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
},
},
})
handle := &updateHandle{ref: resp.GetUpdateRef()}
if err != nil {
handle.err = err
return handle, nil
return nil, err
}
switch v := resp.GetOutcome().GetValue().(type) {
case nil:
panic("unspported update outcome: Incomplete")
return &lazyUpdateHandle{
client: w.client,
baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()},
}, nil
case *updatepb.Outcome_Failure:
handle.err = w.client.failureConverter.FailureToError(v.Failure)
return &completedUpdateHandle{
err: w.client.failureConverter.FailureToError(v.Failure),
baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()},
}, nil
case *updatepb.Outcome_Success:
handle.value = newEncodedValue(v.Success, w.client.dataConverter)
return &completedUpdateHandle{
value: newEncodedValue(v.Success, w.client.dataConverter),
baseUpdateHandle: baseUpdateHandle{ref: resp.GetUpdateRef()},
}, nil
}
return nil, fmt.Errorf("unsupported outcome type %T", resp.GetOutcome().GetValue())
}

func (w *workflowClientInterceptor) PollWorkflowUpdate(
parentCtx context.Context,
in *ClientPollWorkflowUpdateInput,
) (converter.EncodedValue, error) {
// header, _ = headerPropagated(ctx, w.client.contextPropagators)
//todo header not in PollWorkflowUpdate

pollReq := workflowservice.PollWorkflowExecutionUpdateRequest{
Namespace: w.client.namespace,
UpdateRef: in.UpdateRef,
Identity: w.client.identity,
WaitPolicy: &updatepb.WaitPolicy{
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
},
}
for parentCtx.Err() == nil {
ctx, cancel := newGRPCContext(
parentCtx,
grpcLongPoll(true),
grpcTimeout(pollUpdateTimeout),
)
ctx = context.WithValue(
ctx,
retry.ConfigKey,
createDynamicServiceRetryPolicy(ctx).GrpcRetryConfig(),
)
resp, err := w.client.workflowService.PollWorkflowExecutionUpdate(ctx, &pollReq)
cancel()
if err == context.DeadlineExceeded ||
status.Code(err) == codes.DeadlineExceeded ||
(err == nil && resp.GetOutcome() == nil) {
continue
}
if err != nil {
return nil, err
}
switch v := resp.GetOutcome().GetValue().(type) {
case *updatepb.Outcome_Failure:
return nil, w.client.failureConverter.FailureToError(v.Failure)
case *updatepb.Outcome_Success:
return newEncodedValue(v.Success, w.client.dataConverter), nil
default:
return nil, fmt.Errorf("unsupported outcome type %T", v)
}
}
return handle, nil
return nil, parentCtx.Err()
}

// Required to implement ClientOutboundInterceptor
func (*workflowClientInterceptor) mustEmbedClientOutboundInterceptorBase() {}

func (uh *updateHandle) WorkflowID() string {
func (uh *baseUpdateHandle) WorkflowID() string {
return uh.ref.GetWorkflowExecution().GetWorkflowId()
}

func (uh *updateHandle) RunID() string {
func (uh *baseUpdateHandle) RunID() string {
return uh.ref.GetWorkflowExecution().GetRunId()
}

func (uh *updateHandle) UpdateID() string {
func (uh *baseUpdateHandle) UpdateID() string {
return uh.ref.GetUpdateId()
}

func (uh *updateHandle) Get(ctx context.Context, valuePtr interface{}) error {
// implementation note: this is a broken implementation that assumes the
// update has already completed by the time Get is called. This is true for
// the current implementation of synchronous updates but will not be true in
// the future with async update.
func (ch *completedUpdateHandle) Get(ctx context.Context, valuePtr interface{}) error {
if ch.err != nil || valuePtr == nil {
return ch.err
}
if err := ch.value.Get(valuePtr); err != nil {
return err
}
return nil
}

if uh.err != nil || valuePtr == nil {
return uh.err
func (luh *lazyUpdateHandle) Get(ctx context.Context, valuePtr interface{}) error {
enc, err := luh.client.PollWorkflowUpdate(ctx, luh.ref)
if err != nil {
return err
}
return uh.value.Get(valuePtr)
return enc.Get(valuePtr)
}
Loading

0 comments on commit 29aecb3

Please sign in to comment.