Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker versioning API Changes #293

Merged
merged 17 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
version: v1
breaking:
ignore:
# TODO: Remove after PR 293
- temporal/api/taskqueue/v1
- temporal/api/workflowservice/v1
use:
- WIRE_JSON
lint:
Expand Down
11 changes: 11 additions & 0 deletions temporal/api/command/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ message ScheduleActivityTaskCommandAttributes {
// Request to start the activity directly bypassing matching service and worker polling
// The slot for executing the activity should be reserved when setting this field to true.
bool request_eager_execution = 12;
// If this is set, the workflow executing this command wishes to start the activity using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 13;
}

message RequestCancelActivityTaskCommandAttributes {
Expand Down Expand Up @@ -191,6 +195,9 @@ message ContinueAsNewWorkflowExecutionCommandAttributes {
temporal.api.common.v1.Header header = 12;
temporal.api.common.v1.Memo memo = 13;
temporal.api.common.v1.SearchAttributes search_attributes = 14;
// If this is set, the workflow executing this command wishes to continue as new using a version
// compatible with the version that this workflow most recently ran on.
bool use_compatible_version = 15;

// `workflow_execution_timeout` is omitted as it shouldn't be overridden from within a workflow.
}
Expand Down Expand Up @@ -218,6 +225,10 @@ message StartChildWorkflowExecutionCommandAttributes {
temporal.api.common.v1.Header header = 14;
temporal.api.common.v1.Memo memo = 15;
temporal.api.common.v1.SearchAttributes search_attributes = 16;
// If this is set, the workflow executing this command wishes to start the child workflow using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 17;
}

message ProtocolMessageCommandAttributes {
Expand Down
15 changes: 13 additions & 2 deletions temporal/api/common/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,28 @@ message MeteringMetadata {

// Identifies the version(s) of a worker that processed a task
message WorkerVersionStamp {
// An opaque whole-worker identifier. Replaces the deprecated `binary_checksum` field when this
// message is included in requests which previously used that.
string build_id = 1;
// Set if the worker used a dynamically loadable bundle to process
// the task. The bundle could be a WASM blob, JS bundle, etc.
string bundle_id = 2;

// If set, the worker is opting in to worker versioning. Otherwise, this stamp is used only as a
// marker for workflow reset points and the BuildIDs search attribute.
bool use_versioning = 3;
}

// Identifies the version(s) that a worker is compatible with when polling or identifying itself,
// and whether or not this worker is opting into the build-id based versioning feature. This is
// used by matching to determine which workers ought to receive what tasks.
message WorkerVersionCapabilities {
// An opaque whole-worker identifier.
string build_id = 1;

// If set, the worker is opting in to worker versioning, and wishes to only receive appropriate
// tasks.
bool use_versioning = 2;

// Later, may include info like "I can process WASM and/or JS bundles"
}
19 changes: 19 additions & 0 deletions temporal/api/enums/v1/task_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,22 @@ enum TaskQueueType {
// Activity type of task queue.
TASK_QUEUE_TYPE_ACTIVITY = 2;
}

// Specifies which category of tasks may reach a worker on a versioned task queue.
// Used both in a reachability query and its response.
enum TaskReachability {
// Default zero value; no reachability category was specified.
TASK_REACHABILITY_UNSPECIFIED = 0;
// There's a possibility for a worker to receive new workflow tasks. Workers should *not* be retired.
TASK_REACHABILITY_NEW_WORKFLOWS = 1;
// There's a possibility for a worker to receive existing workflow and activity tasks from existing workflows. Workers
// should *not* be retired.
// This enum value does not distinguish between open and closed workflows.
TASK_REACHABILITY_EXISTING_WORKFLOWS = 2;
// There's a possibility for a worker to receive existing workflow and activity tasks from open workflows. Workers
// should *not* be retired.
TASK_REACHABILITY_OPEN_WORKFLOWS = 3;
// There's a possibility for a worker to receive existing workflow tasks from closed workflows. Workers may be
// retired depending on application requirements. For example, if there's no need to query closed workflows.
TASK_REACHABILITY_CLOSED_WORKFLOWS = 4;
}

5 changes: 5 additions & 0 deletions temporal/api/errordetails/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,8 @@ message SystemWorkflowFailure {

message WorkflowNotReadyFailure {
}

// Error details carrying the build ID that is currently the default for a compatible set.
// NOTE(review): presumably returned to a worker whose build ID has been superseded by a newer
// compatible build - confirm against the service that emits this failure.
message NewerBuildExistsFailure {
// The current default compatible build ID which will receive tasks
string default_build_id = 1;
}
34 changes: 30 additions & 4 deletions temporal/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ message WorkflowExecutionStartedEventAttributes {
int64 parent_initiated_event_version = 26;
// This field is new in 1.21.
string workflow_id = 28;
// If this workflow intends to use anything other than the current overall default version for
// the queue, then we include it here.
temporal.api.common.v1.WorkerVersionStamp source_version_stamp = 29;
}

message WorkflowExecutionCompletedEventAttributes {
Expand Down Expand Up @@ -154,6 +157,9 @@ message WorkflowExecutionContinuedAsNewEventAttributes {
temporal.api.common.v1.Header header = 12;
temporal.api.common.v1.Memo memo = 13;
temporal.api.common.v1.SearchAttributes search_attributes = 14;
// If this is set, the workflow executing this command wishes to continue as new using a version
// compatible with the version that this workflow most recently ran on.
bool use_compatible_version = 15;

// workflow_execution_timeout is omitted as it shouldn't be overridden from within a workflow.
}
Expand Down Expand Up @@ -195,13 +201,14 @@ message WorkflowTaskCompletedEventAttributes {
string identity = 3;
// Binary ID of the worker who completed this task
string binary_checksum = 4;
// Version info of the worker who processed this workflow task. If present, the `build_id` field
// within is also used as `binary_checksum`, which may be omitted in that case (it may also be
// populated to preserve compatibility).
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
// Data the SDK wishes to record for itself, but server need not interpret, and does not
// directly impact workflow state.
temporal.api.sdk.v1.WorkflowTaskCompletedMetadata sdk_metadata = 6;

// Local usage data sent during workflow task completion and recorded here for posterity
temporal.api.common.v1.MeteringMetadata metering_metadata = 13;
}
Expand Down Expand Up @@ -230,8 +237,13 @@ message WorkflowTaskFailedEventAttributes {
string new_run_id = 7;
// TODO: ?
int64 fork_event_version = 8;
// DEPRECATED since 1.21 - use `worker_version` instead.
// If a worker explicitly failed this task, its binary id
string binary_checksum = 9;
// Version info of the worker who processed this workflow task. If present, the `build_id` field
// within is also used as `binary_checksum`, which may be omitted in that case (it may also be
// populated to preserve compatibility).
temporal.api.common.v1.WorkerVersionStamp worker_version = 10;
}

message ActivityTaskScheduledEventAttributes {
Expand Down Expand Up @@ -272,6 +284,10 @@ message ActivityTaskScheduledEventAttributes {
// configuration. Retries will happen up to `schedule_to_close_timeout`. To disable retries set
// retry_policy.maximum_attempts to 1.
temporal.api.common.v1.RetryPolicy retry_policy = 12;
// If this is set, the workflow executing this command wishes to start the activity using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 13;
}

message ActivityTaskStartedEventAttributes {
Expand All @@ -297,6 +313,8 @@ message ActivityTaskCompletedEventAttributes {
int64 started_event_id = 3;
// id of the worker that completed this task
string identity = 4;
// Version info of the worker who processed this activity task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 5;
}

message ActivityTaskFailedEventAttributes {
Expand All @@ -309,6 +327,8 @@ message ActivityTaskFailedEventAttributes {
// id of the worker that failed this task
string identity = 4;
temporal.api.enums.v1.RetryState retry_state = 5;
// Version info of the worker who processed this activity task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 6;
}

message ActivityTaskTimedOutEventAttributes {
Expand Down Expand Up @@ -341,6 +361,8 @@ message ActivityTaskCanceledEventAttributes {
int64 started_event_id = 4;
// id of the worker who canceled this activity
string identity = 5;
// Version info of the worker who processed this activity task.
temporal.api.common.v1.WorkerVersionStamp worker_version = 6;
}

message TimerStartedEventAttributes {
Expand Down Expand Up @@ -558,6 +580,10 @@ message StartChildWorkflowExecutionInitiatedEventAttributes {
temporal.api.common.v1.Header header = 15;
temporal.api.common.v1.Memo memo = 16;
temporal.api.common.v1.SearchAttributes search_attributes = 17;
// If this is set, the workflow executing this command wishes to start the child workflow using
// a version compatible with the version that this workflow most recently ran on, if such
// behavior is possible.
bool use_compatible_version = 19;
}

message StartChildWorkflowExecutionFailedEventAttributes {
Expand Down
31 changes: 24 additions & 7 deletions temporal/api/taskqueue/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ message TaskQueue {
string name = 1;
// Default: TASK_QUEUE_KIND_NORMAL.
temporal.api.enums.v1.TaskQueueKind kind = 2;
// Iff kind == TASK_QUEUE_KIND_STICKY, then this field contains the name of
// the normal task queue that the sticky worker is running on.
string normal_name = 3;
}

// Only applies to activity task queues
Expand Down Expand Up @@ -87,12 +90,26 @@ message StickyExecutionAttributes {
google.protobuf.Duration schedule_to_start_timeout = 2 [(gogoproto.stdduration) = true];
}

// Used by the worker versioning APIs, represents an unordered set of one or more versions which are
// considered to be compatible with each other. Currently the versions are always worker build IDs.
//
// NOTE(review): an earlier revision of this message declared `string version_set_id = 1` and
// `repeated string build_ids = 2`. Moving `build_ids` to field number 1 is a wire-breaking change
// for any peer still using the earlier layout - confirm that no such peers exist.
message CompatibleVersionSet {
  // All the compatible versions, unordered, except for the last element, which is considered the
  // set "default".
  repeated string build_ids = 1;
}

// Reachability of tasks for a worker on a single task queue.
message TaskQueueReachability {
// The task queue that the `reachability` values below apply to.
string task_queue = 1;
// Task reachability for a worker in a single task queue.
// See the TaskReachability docstring for information about each enum variant.
// If reachability is empty, this worker is considered unreachable in this task queue.
repeated temporal.api.enums.v1.TaskReachability reachability = 2;
}

// Reachability of tasks for a worker by build id, in one or more task queues.
message BuildIdReachability {
// A build id or empty if unversioned.
string build_id = 1;
// Reachability per task queue; one entry for each task queue reported for this build id.
repeated TaskQueueReachability task_queue_reachability = 2;
}
4 changes: 4 additions & 0 deletions temporal/api/workflow/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ message ResetPoints {
}

message ResetPointInfo {
// A worker binary version identifier, will be deprecated and superseded by a newer concept of
// build_id.
string binary_checksum = 1;
// The first run ID in the execution chain that was touched by this worker build.
string run_id = 2;
// Event ID of the first WorkflowTaskCompleted event processed by this worker build.
int64 first_workflow_task_completed_id = 3;
google.protobuf.Timestamp create_time = 4 [(gogoproto.stdtime) = true];
// (-- api-linter: core::0214::resource-expiry=disabled
Expand Down
Loading