Skip to content

Commit

Permalink
Don't send binary checksum when build id capability is present
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed May 25, 2023
1 parent c6734d1 commit 3be58e3
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
6 changes: 5 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
langUsedFlags = append(langUsedFlags, uint32(flag))
}

return &workflowservice.RespondWorkflowTaskCompletedRequest{
builtRequest := &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Commands: commands,
Messages: messages,
Expand All @@ -1674,6 +1674,10 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
UseVersioning: wth.useBuildIDForVersioning,
},
}
if wth.capabilities != nil && wth.capabilities.BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
}
return builtRequest
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
Expand Down
38 changes: 29 additions & 9 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ type (
stopC <-chan struct{}
// The worker's build ID, either as defined by the user or automatically set
workerBuildID string
// Whether or not the worker has opted in to the build-id based versioning feature
// Whether the worker has opted in to the build-id based versioning feature
useBuildIDVersioning bool
// Server's capabilities
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
}

// numPollerMetric tracks the number of active pollers and publishes a metric on it.
Expand Down Expand Up @@ -253,6 +255,13 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (interface{}, er
}
}

func (bp *basePoller) getCapabilities() *workflowservice.GetSystemInfoResponse_Capabilities {
if bp.capabilities == nil {
return &workflowservice.GetSystemInfoResponse_Capabilities{}
}
return bp.capabilities
}

// newWorkflowTaskPoller creates a new workflow task poller which must have a one to one relationship to workflow worker
func newWorkflowTaskPoller(
taskHandler WorkflowTaskHandler,
Expand All @@ -265,6 +274,7 @@ func newWorkflowTaskPoller(
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
capabilities: params.capabilities,
},
service: service,
namespace: params.Namespace,
Expand Down Expand Up @@ -460,7 +470,7 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

return &workflowservice.RespondWorkflowTaskFailedRequest{
builtRequest := &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: wtp.failureConverter.ErrorToFailure(err),
Expand All @@ -472,6 +482,12 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err
UseVersioning: wtp.useBuildIDVersioning,
},
}

if wtp.getCapabilities().BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
}

return builtRequest
}

func newLocalActivityPoller(
Expand Down Expand Up @@ -698,10 +714,13 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
TaskQueue: taskQueue,
Identity: wtp.identity,
BinaryChecksum: wtp.workerBuildID,
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
},
}
builtRequest.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
UseVersioning: wtp.useBuildIDVersioning,
if wtp.getCapabilities().BuildIdBasedVersioning {
builtRequest.BinaryChecksum = ""
}
return builtRequest
}
Expand Down Expand Up @@ -868,6 +887,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
capabilities: params.capabilities,
},
taskHandler: taskHandler,
service: service,
Expand Down Expand Up @@ -898,10 +918,10 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
TaskQueue: &taskqueuepb.TaskQueue{Name: atp.taskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: atp.identity,
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}
request.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
UseVersioning: atp.useBuildIDVersioning,
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
UseVersioning: atp.useBuildIDVersioning,
},
}

response, err := atp.pollActivityTaskQueue(ctx, request)
Expand Down

0 comments on commit 3be58e3

Please sign in to comment.