Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker Versioning #1120

Merged
merged 24 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0cb488e
Upgrade API go to latest worker-versioning commit
bergundy Apr 14, 2023
8ea3f41
Update API worker-versioning feature branch
bergundy Apr 25, 2023
4c84bab
Use rebased API
bergundy May 3, 2023
1cec8d0
Upgrade api to get latest feature branch changes
bergundy May 4, 2023
9754c7e
Remove build_id from ResetPointInfo (still TBD if needed)
bergundy May 5, 2023
466b45d
Update api dependency to feature branch HEAD
bergundy May 11, 2023
2dc72e7
Upgrade API again
bergundy May 16, 2023
7207661
Use interface for operation, other updates
Sushisource May 18, 2023
d7eadcb
Fix test gomod
Sushisource May 18, 2023
2b1fa2f
Pass through UseLatestBuildID flag
Sushisource May 18, 2023
5fd2848
Update flag to use new scheme & expose enum as user interface
Sushisource May 24, 2023
fd82ac6
More reachability API changes
bergundy May 19, 2023
c6e41e8
More reachability API changes
bergundy May 19, 2023
6d22155
Propagate UseBuildIDForVersioning to RespondWorkflowTaskCompletedResp…
bergundy May 20, 2023
bcc09ff
Add stamps to places they're needed now & update polling reqs
Sushisource May 24, 2023
51eae5c
Include normal name in WFT sticky polls
Sushisource May 24, 2023
2b77bfa
Include normal name in StickyExecutionAttributes (#1118)
dnr May 25, 2023
30813c7
Add WithVersionIntent option setter & move VersionIntent to temporal pkg
Sushisource May 25, 2023
90edaf4
Rename stuff based on reviews
Sushisource May 25, 2023
04e2915
Dedupe use compat flag logic
Sushisource May 25, 2023
60d4ae6
Add experimental comments
Sushisource May 25, 2023
f3d24e2
Update to main branch
Sushisource May 25, 2023
c6734d1
Last bits of review comments
Sushisource May 25, 2023
3be58e3
Don't send binary checksum when build id capability is present
Sushisource May 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,22 @@ type (
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdCompatibility.
WorkerBuildIDVersionSets = internal.WorkerBuildIDVersionSets

// BuildIDOpAddNewIDInNewDefaultSet is an operation for UpdateWorkerBuildIdCompatibilityOptions
// to add a new BuildID in a new default set.
BuildIDOpAddNewIDInNewDefaultSet = internal.BuildIDOpAddNewIDInNewDefaultSet
Copy link
Member

@cretz cretz May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All Godoc relating to this feature needs to be documented as experimental IMO.

I know it can seem unnecessary to do to each Godoc piece and instead just the entry/affecting point, but people are going to be expecting that unless otherwise marked, you're not gonna change this variable and I don't think we want to commit to API stability on any of this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, thanks


// BuildIDOpAddNewCompatibleVersion is an operation for UpdateWorkerBuildIdCompatibilityOptions
// to add a new BuildID to an existing compatible set.
BuildIDOpAddNewCompatibleVersion = internal.BuildIDOpAddNewCompatibleVersion

// BuildIDOpPromoteSet is an operation for UpdateWorkerBuildIdCompatibilityOptions to promote a
// set to be the default set by targeting an existing BuildID.
BuildIDOpPromoteSet = internal.BuildIDOpPromoteSet

// BuildIDOpPromoteIDWithinSet is an operation for UpdateWorkerBuildIdCompatibilityOptions to
// promote a BuildID within a set to be the default.
BuildIDOpPromoteIDWithinSet = internal.BuildIDOpPromoteIDWithinSet

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down
2 changes: 1 addition & 1 deletion contrib/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module go.temporal.io/sdk/contrib/datadog
go 1.16

require (
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
go.temporal.io/sdk v1.12.0
gopkg.in/DataDog/dd-trace-go.v1 v1.42.0
)
Expand Down
161 changes: 136 additions & 25 deletions contrib/datadog/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module go.temporal.io/sdk/contrib/opentelemetry
go 1.16

require (
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/otel v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.opentelemetry.io/otel/trace v1.2.0
Expand Down
167 changes: 132 additions & 35 deletions contrib/opentelemetry/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/opentracing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/opentracing/opentracing-go v1.2.0
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
go.temporal.io/sdk v1.12.0
)

Expand Down
167 changes: 132 additions & 35 deletions contrib/opentracing/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/tally/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module go.temporal.io/sdk/contrib/tally
go 1.16

require (
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally/v4 v4.1.1
go.temporal.io/sdk v1.12.0
)
Expand Down
167 changes: 132 additions & 35 deletions contrib/tally/go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/pborman/uuid v1.2.1
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.8.2
go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b
github.com/stretchr/testify v1.8.3
go.temporal.io/api v1.19.1-0.20230524162623-0e1dbb54f8e4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to update this after api lands?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

go.uber.org/atomic v1.9.0
golang.org/x/time v0.1.0
google.golang.org/grpc v1.54.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
)
137 changes: 126 additions & 11 deletions go.sum

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ type (
// activities directly from the workflow task back to this worker which is
// faster than non-eager which may be dispatched to a separate worker.
DisableEagerExecution bool

// VersioningIntent specifies whether this activity should run on a worker with a compatible
// build ID or not. See temporal.VersioningIntent.
VersioningIntent VersioningIntent
}

// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
Expand Down Expand Up @@ -208,9 +212,11 @@ func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
// RecordActivityHeartbeat sends heartbeat for the currently executing activity
// If the activity is either canceled (or) workflow/activity doesn't exist then we would cancel
// the context with error context.Canceled.
// TODO: we don't have a way to distinguish between the two cases when context is canceled because
// context doesn't support overriding value of ctx.Error.
// TODO: Implement automatic heartbeating with cancellation through ctx.
//
// TODO: we don't have a way to distinguish between the two cases when context is canceled because
// context doesn't support overriding value of ctx.Error.
// TODO: Implement automatic heartbeating with cancellation through ctx.
//
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
// can check error TimeoutType()/Details().
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
Expand Down
19 changes: 12 additions & 7 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ type (
// Deprecated: WorkflowExecutionTimeout is deprecated and is never set or
// used internally.
WorkflowExecutionTimeout time.Duration

// VersioningIntent specifies whether the continued workflow should run on a worker with a
// compatible build ID or not. See VersioningIntent.
VersioningIntent VersioningIntent
}

// UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
Expand Down Expand Up @@ -416,14 +420,14 @@ func IsCanceledError(err error) bool {
// If the workflow main function returns this error then the current execution is ended and
// the new execution with same workflow ID is started automatically with options
// provided to this function.
// ctx - use context to override any options for the new workflow like run timeout, task timeout, task queue.
// if not mentioned it would use the defaults that the current workflow is using.
// ctx := WithWorkflowRunTimeout(ctx, 30 * time.Minute)
// ctx := WithWorkflowTaskTimeout(ctx, 5 * time.Second)
// ctx := WithWorkflowTaskQueue(ctx, "example-group")
// wfn - workflow function. for new execution it can be different from the currently running.
// args - arguments for the new workflow.
//
// ctx - use context to override any options for the new workflow like run timeout, task timeout, task queue.
// if not mentioned it would use the defaults that the current workflow is using.
// ctx := WithWorkflowRunTimeout(ctx, 30 * time.Minute)
// ctx := WithWorkflowTaskTimeout(ctx, 5 * time.Second)
// ctx := WithWorkflowTaskQueue(ctx, "example-group")
// wfn - workflow function. for new execution it can be different from the currently running.
// args - arguments for the new workflow.
Comment on lines +424 to +430
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I know that your formatter fixed these which is fine, but I think the fix was to just remove the space before each (it pretended two-space means indent like code).

func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) error {
i := getWorkflowOutboundInterceptor(ctx)
// Put header on context before executing
Expand Down Expand Up @@ -460,6 +464,7 @@ func (wc *workflowEnvironmentInterceptor) NewContinueAsNewError(
WorkflowExecutionTimeout: options.WorkflowExecutionTimeout,
WorkflowRunTimeout: options.WorkflowRunTimeout,
WorkflowTaskTimeout: options.WorkflowTaskTimeout,
VersioningIntent: options.VersioningIntent,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we keeping this in the context or adding a new constructor for the CAN error?
I prefer the latter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do too but that ship has sailed, we'll just keep it consistent here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 To keeping consistent, 👎 to new constructor or overloads in general for small options like this. We messed this up by not returning *ContinueAsNewError from this function originally, but anyone can construct that error themselves.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Manual construction works for me.

}
}

Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
OriginalTaskQueueName string
RetryPolicy *commonpb.RetryPolicy
DisableEagerExecution bool
VersioningIntent VersioningIntent
}

// ExecuteLocalActivityOptions options for executing a local activity
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
if len(params.CronSchedule) > 0 {
attributes.CronSchedule = params.CronSchedule
}
attributes.UseCompatibleVersion = determineUseCompatibleFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

command, err := wc.commandsHelper.startChildWorkflowExecution(attributes)
if _, ok := err.(*childWorkflowExistsWithId); ok {
Expand Down Expand Up @@ -640,6 +642,8 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
// false just before request by the eager activity executor if eager activity
// execution is otherwise disallowed
scheduleTaskAttr.RequestEagerExecution = !parameters.DisableEagerExecution
scheduleTaskAttr.UseCompatibleVersion = determineUseCompatibleFlagForCommand(
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)

command := wc.commandsHelper.scheduleActivityTask(scheduleID, scheduleTaskAttr)
command.setData(&scheduledActivity{
Expand Down
60 changes: 31 additions & 29 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type (
logger log.Logger
identity string
workerBuildID string
useBuildIDForVersioning bool
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
Expand Down Expand Up @@ -158,6 +159,7 @@ type (
namespace string
defaultHeartbeatThrottleInterval time.Duration
maxHeartbeatThrottleInterval time.Duration
versionStamp *commonpb.WorkerVersionStamp
}

// history wrapper method to help information about events.
Expand Down Expand Up @@ -422,7 +424,8 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin
ppMgr: ppMgr,
metricsHandler: params.MetricsHandler,
identity: params.Identity,
workerBuildID: params.WorkerBuildID,
workerBuildID: params.getBuildID(),
useBuildIDForVersioning: params.UseBuildIDForVersioning,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
Expand Down Expand Up @@ -905,7 +908,7 @@ ProcessEvents:
break ProcessEvents
}
if binaryChecksum == "" {
w.workflowInfo.BinaryChecksum = w.wth.getBuildID()
w.workflowInfo.BinaryChecksum = w.wth.workerBuildID
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
Expand Down Expand Up @@ -1586,16 +1589,20 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
// Continue as new error.
metricsHandler.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)

useCompat := determineUseCompatibleFlagForCommand(
contErr.VersioningIntent, workflowContext.workflowInfo.TaskQueueName, contErr.TaskQueueName)
closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
Input: contErr.Input,
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
WorkflowRunTimeout: &contErr.WorkflowRunTimeout,
WorkflowTaskTimeout: &contErr.WorkflowTaskTimeout,
Header: contErr.Header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
RetryPolicy: convertToPBRetryPolicy(workflowContext.workflowInfo.RetryPolicy),
WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name},
Input: contErr.Input,
TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
WorkflowRunTimeout: &contErr.WorkflowRunTimeout,
WorkflowTaskTimeout: &contErr.WorkflowTaskTimeout,
Header: contErr.Header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
RetryPolicy: convertToPBRetryPolicy(workflowContext.workflowInfo.RetryPolicy),
UseCompatibleVersion: useCompat,
}}
} else if workflowContext.err != nil {
// Workflow failures
Expand Down Expand Up @@ -1648,27 +1655,25 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
langUsedFlags = append(langUsedFlags, uint32(flag))
}

builtRequest := &workflowservice.RespondWorkflowTaskCompletedRequest{
return &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Commands: commands,
Messages: messages,
Identity: wth.identity,
ReturnNewWorkflowTask: true,
ForceCreateNewWorkflowTask: forceNewWorkflowTask,
BinaryChecksum: wth.getBuildID(),
BinaryChecksum: wth.workerBuildID,
QueryResults: queryResults,
Namespace: wth.namespace,
MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts},
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: langUsedFlags,
},
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
Sushisource marked this conversation as resolved.
Show resolved Hide resolved
BuildId: wth.workerBuildID,
UseVersioning: wth.useBuildIDForVersioning,
},
}
if wth.workerBuildID != "" {
builtRequest.WorkerVersionStamp = &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
}
}
return builtRequest
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
Expand All @@ -1687,13 +1692,6 @@ func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.Hi
return nil
}

func (wth *workflowTaskHandlerImpl) getBuildID() string {
if wth.workerBuildID != "" {
return wth.workerBuildID
}
return getBinaryChecksum()
}

func newActivityTaskHandler(
service workflowservice.WorkflowServiceClient,
params workerExecutionParameters,
Expand Down Expand Up @@ -1724,6 +1722,10 @@ func newActivityTaskHandlerWithCustomProvider(
namespace: params.Namespace,
defaultHeartbeatThrottleInterval: params.DefaultHeartbeatThrottleInterval,
maxHeartbeatThrottleInterval: params.MaxHeartbeatThrottleInterval,
versionStamp: &commonpb.WorkerVersionStamp{
BuildId: params.getBuildID(),
UseVersioning: params.UseBuildIDForVersioning,
},
}
}

Expand Down Expand Up @@ -1932,7 +1934,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.UnregisteredActivityInvocationCounter).Inc(1)
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil,
NewActivityNotRegisteredError(activityType, ath.getRegisteredActivityNames()),
ath.dataConverter, ath.failureConverter, ath.namespace, false), nil
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp), nil
}

// panic handler
Expand All @@ -1950,7 +1952,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
metricsHandler.Counter(metrics.ActivityTaskErrorCounter).Inc(1)
panicErr := newPanicError(p, st)
result = convertActivityResultToRespondRequest(ath.identity, t.TaskToken, nil, panicErr,
ath.dataConverter, ath.failureConverter, ath.namespace, false)
ath.dataConverter, ath.failureConverter, ath.namespace, false, ath.versionStamp)
}
}()

Expand Down Expand Up @@ -1990,7 +1992,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
)
}
return convertActivityResultToRespondRequest(ath.identity, t.TaskToken, output, err,
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCancel), nil
ath.dataConverter, ath.failureConverter, ath.namespace, isActivityCancel, ath.versionStamp), nil
}

func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
Expand Down
Loading