Skip to content

Commit

Permalink
Worker versioning API Changes (#293)
Browse files Browse the repository at this point in the history
API changes for the first real release of the worker versioning feature

Co-authored-by: Roey Berman <roey@temporal.io>
Co-authored-by: David Reiss <david@temporal.io>
Co-authored-by: Roey Berman <roey.berman@gmail.com>
Co-authored-by: Chad Retz <chad.retz@gmail.com>
  • Loading branch information
5 people authored May 25, 2023
1 parent f4f5bd8 commit 81d18cc
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 58 deletions.
4 changes: 4 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
version: v1
breaking:
ignore:
# TODO: Remove after PR 293
- temporal/api/taskqueue/v1
- temporal/api/workflowservice/v1
use:
- WIRE_JSON
lint:
Expand Down
11 changes: 11 additions & 0 deletions temporal/api/command/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ message ScheduleActivityTaskCommandAttributes {
// Request to start the activity directly bypassing matching service and worker polling
// The slot for executing the activity should be reserved when setting this field to true.
bool request_eager_execution = 12;
// If this is set, the workflow executing this command wishes to start the activity using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 13;
}

message RequestCancelActivityTaskCommandAttributes {
Expand Down Expand Up @@ -191,6 +195,9 @@ message ContinueAsNewWorkflowExecutionCommandAttributes {
temporal.api.common.v1.Header header = 12;
temporal.api.common.v1.Memo memo = 13;
temporal.api.common.v1.SearchAttributes search_attributes = 14;
// If this is set, the workflow executing this command wishes to continue as new using a version
// compatible with the version that this workflow most recently ran on.
bool use_compatible_version = 15;

// `workflow_execution_timeout` is omitted as it shouldn't be overridden from within a workflow.
}
Expand Down Expand Up @@ -218,6 +225,10 @@ message StartChildWorkflowExecutionCommandAttributes {
temporal.api.common.v1.Header header = 14;
temporal.api.common.v1.Memo memo = 15;
temporal.api.common.v1.SearchAttributes search_attributes = 16;
// If this is set, the workflow executing this command wishes to start the child workflow using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 17;
}

message ProtocolMessageCommandAttributes {
Expand Down
15 changes: 13 additions & 2 deletions temporal/api/common/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,28 @@ message MeteringMetadata {

// Identifies the version(s) of a worker that processed a task
message WorkerVersionStamp {
// An opaque whole-worker identifier
// An opaque whole-worker identifier. Replaces the deprecated `binary_checksum` field when this
// message is included in requests which previously used that.
string build_id = 1;
// Set if the worker used a dynamically loadable bundle to process
// the task. The bundle could be a WASM blob, JS bundle, etc.
string bundle_id = 2;

// If set, the worker is opting in to worker versioning. Otherwise, this is used only as a
// marker for workflow reset points and the BuildIDs search attribute.
bool use_versioning = 3;
}

// Identifies the version(s) that a worker is compatible with when polling or identifying itself
// Identifies the version(s) that a worker is compatible with when polling or identifying itself,
// and whether or not this worker is opting into the build-id based versioning feature. This is
// used by matching to determine which workers ought to receive what tasks.
message WorkerVersionCapabilities {
// An opaque whole-worker identifier
string build_id = 1;

// If set, the worker is opting in to worker versioning, and wishes to only receive appropriate
// tasks.
bool use_versioning = 2;

// Later, may include info like "I can process WASM and/or JS bundles"
}
19 changes: 19 additions & 0 deletions temporal/api/enums/v1/task_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,22 @@ enum TaskQueueType {
// Activity type of task queue.
TASK_QUEUE_TYPE_ACTIVITY = 2;
}

// Specifies which category of tasks may reach a worker on a versioned task queue.
// Used both in a reachability query and its response.
enum TaskReachability {
TASK_REACHABILITY_UNSPECIFIED = 0;
// There's a possiblity for a worker to receive new workflow tasks. Workers should *not* be retired.
TASK_REACHABILITY_NEW_WORKFLOWS = 1;
// There's a possiblity for a worker to receive existing workflow and activity tasks from existing workflows. Workers
// should *not* be retired.
// This enum value does not distinguish between open and closed workflows.
TASK_REACHABILITY_EXISTING_WORKFLOWS = 2;
// There's a possiblity for a worker to receive existing workflow and activity tasks from open workflows. Workers
// should *not* be retired.
TASK_REACHABILITY_OPEN_WORKFLOWS = 3;
// There's a possiblity for a worker to receive existing workflow tasks from closed workflows. Workers may be
// retired dependending on application requirements. For example, if there's no need to query closed workflows.
TASK_REACHABILITY_CLOSED_WORKFLOWS = 4;
}

5 changes: 5 additions & 0 deletions temporal/api/errordetails/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,8 @@ message SystemWorkflowFailure {

message WorkflowNotReadyFailure {
}

message NewerBuildExistsFailure {
// The current default compatible build ID which will receive tasks
string default_build_id = 1;
}
34 changes: 30 additions & 4 deletions temporal/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ message WorkflowExecutionStartedEventAttributes {
int64 parent_initiated_event_version = 26;
// This field is new in 1.21.
string workflow_id = 28;
// If this workflow intends to use anything other than the current overall default version for
// the queue, then we include it here.
temporal.api.common.v1.WorkerVersionStamp source_version_stamp = 29;
}

message WorkflowExecutionCompletedEventAttributes {
Expand Down Expand Up @@ -154,6 +157,9 @@ message WorkflowExecutionContinuedAsNewEventAttributes {
temporal.api.common.v1.Header header = 12;
temporal.api.common.v1.Memo memo = 13;
temporal.api.common.v1.SearchAttributes search_attributes = 14;
// If this is set, the workflow executing this command wishes to continue as new using a version
// compatible with the version that this workflow most recently ran on.
bool use_compatible_version = 15;

// workflow_execution_timeout is omitted as it shouldn't be overridden from within a workflow.
}
Expand Down Expand Up @@ -195,13 +201,14 @@ message WorkflowTaskCompletedEventAttributes {
string identity = 3;
// Binary ID of the worker who completed this task
string binary_checksum = 4;
// Version info of the worker who processed this workflow task, or missing if worker is not
// using versioning. If present, the `build_id` field within is also used as `binary_checksum`,
// which may be omitted in that case (it may also be populated to preserve compatibility).
// Version info of the worker who processed this workflow task. If present, the `build_id` field
// within is also used as `binary_checksum`, which may be omitted in that case (it may also be
// populated to preserve compatibility).
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
// directly impact workflow state.
temporal.api.sdk.v1.WorkflowTaskCompletedMetadata sdk_metadata = 6;

// Local usage data sent during workflow task completion and recorded here for posterity
temporal.api.common.v1.MeteringMetadata metering_metadata = 13;
}
Expand Down Expand Up @@ -230,8 +237,13 @@ message WorkflowTaskFailedEventAttributes {
string new_run_id = 7;
// TODO: ?
int64 fork_event_version = 8;
// If a worker explicitly failed this task, it's binary id
// DEPRECATED since 1.21 - use `worker_version` instead.
// If a worker explicitly failed this task, its binary id
string binary_checksum = 9;
// Version info of the worker who processed this workflow task. If present, the `build_id` field
// within is also used as `binary_checksum`, which may be omitted in that case (it may also be
// populated to preserve compatibility).
temporal.api.common.v1.WorkerVersionStamp worker_version = 10;
}

message ActivityTaskScheduledEventAttributes {
Expand Down Expand Up @@ -272,6 +284,10 @@ message ActivityTaskScheduledEventAttributes {
// configuration. Retries will happen up to `schedule_to_close_timeout`. To disable retries set
// retry_policy.maximum_attempts to 1.
temporal.api.common.v1.RetryPolicy retry_policy = 12;
// If this is set, the workflow executing this command wishes to start the activity using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 13;
}

message ActivityTaskStartedEventAttributes {
Expand All @@ -297,6 +313,8 @@ message ActivityTaskCompletedEventAttributes {
int64 started_event_id = 3;
// id of the worker that completed this task
string identity = 4;
// Version info of the worker who processed this workflow task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
}

message ActivityTaskFailedEventAttributes {
Expand All @@ -309,6 +327,8 @@ message ActivityTaskFailedEventAttributes {
// id of the worker that failed this task
string identity = 4;
temporal.api.enums.v1.RetryState retry_state = 5;
// Version info of the worker who processed this workflow task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 6;
}

message ActivityTaskTimedOutEventAttributes {
Expand Down Expand Up @@ -341,6 +361,8 @@ message ActivityTaskCanceledEventAttributes {
int64 started_event_id = 4;
// id of the worker who canceled this activity
string identity = 5;
// Version info of the worker who processed this workflow task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 6;
}

message TimerStartedEventAttributes {
Expand Down Expand Up @@ -558,6 +580,10 @@ message StartChildWorkflowExecutionInitiatedEventAttributes {
temporal.api.common.v1.Header header = 15;
temporal.api.common.v1.Memo memo = 16;
temporal.api.common.v1.SearchAttributes search_attributes = 17;
// If this is set, the workflow executing this command wishes to start the child workflow using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 19;
}

message StartChildWorkflowExecutionFailedEventAttributes {
Expand Down
31 changes: 24 additions & 7 deletions temporal/api/taskqueue/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ message TaskQueue {
string name = 1;
// Default: TASK_QUEUE_KIND_NORMAL.
temporal.api.enums.v1.TaskQueueKind kind = 2;
// Iff kind == TASK_QUEUE_KIND_STICKY, then this field contains the name of
// the normal task queue that the sticky worker is running on.
string normal_name = 3;
}

// Only applies to activity task queues
Expand Down Expand Up @@ -87,12 +90,26 @@ message StickyExecutionAttributes {
google.protobuf.Duration schedule_to_start_timeout = 2 [(gogoproto.stdduration) = true];
}

// Used by the worker versioning APIs, represents an ordering of one or more versions which are
// considered to be compatible with each other. Currently the versions are always worker build ids.
// Used by the worker versioning APIs, represents an unordered set of one or more versions which are
// considered to be compatible with each other. Currently the versions are always worker build IDs.
message CompatibleVersionSet {
// A unique identifier for this version set. Users don't need to understand or care about this
// value, but it has value for debugging purposes.
string version_set_id = 1;
// All the compatible versions, ordered from oldest to newest
repeated string build_ids = 2;
// All the compatible versions, unordered, except for the last element, which is considered the set "default".
repeated string build_ids = 1;
}

// Reachability of tasks for a worker on a single task queue.
message TaskQueueReachability {
string task_queue = 1;
// Task reachability for a worker in a single task queue.
// See the TaskReachability docstring for information about each enum variant.
// If reachability is empty, this worker is considered unreachable in this task queue.
repeated temporal.api.enums.v1.TaskReachability reachability = 2;
}

// Reachability of tasks for a worker by build id, in one or more task queues.
message BuildIdReachability {
// A build id or empty if unversioned.
string build_id = 1;
// Reachability per task queue.
repeated TaskQueueReachability task_queue_reachability = 2;
}
4 changes: 4 additions & 0 deletions temporal/api/workflow/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ message ResetPoints {
}

message ResetPointInfo {
// A worker binary version identifier, will be deprecated and superseded by a newer concept of
// build_id.
string binary_checksum = 1;
// The first run ID in the execution chain that was touched by this worker build.
string run_id = 2;
// Event ID of the first WorkflowTaskCompleted event processed by this worker build.
int64 first_workflow_task_completed_id = 3;
google.protobuf.Timestamp create_time = 4 [(gogoproto.stdtime) = true];
// (-- api-linter: core::0214::resource-expiry=disabled
Expand Down
Loading

0 comments on commit 81d18cc

Please sign in to comment.