Skip to content
Merged

Memos #121

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/spec/integration/await_workflow_result_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@
run_id = Temporal.start_workflow(
LoopWorkflow,
2, # it continues as new if this arg is > 1
{ options: { workflow_id: workflow_id } },
options: {
workflow_id: workflow_id,
},
)

expect do
Expand Down
44 changes: 44 additions & 0 deletions examples/spec/integration/continue_as_new_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require 'workflows/loop_workflow'

describe LoopWorkflow do
it 'workflow continues as new into a new run' do
workflow_id = SecureRandom.uuid
memo = {
'my-memo' => 'foo',
}
run_id = Temporal.start_workflow(
LoopWorkflow,
2, # it continues as new if this arg is > 1
options: {
workflow_id: workflow_id,
memo: memo,
},
)

# First run will throw because it continued as new
next_run_id = nil
expect do
Temporal.await_workflow_result(
LoopWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)
end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error|
next_run_id = error.new_run_id
end

expect(next_run_id).to_not eq(nil)

# Second run will not throw because it returns rather than continues as new.
final_result = Temporal.await_workflow_result(
LoopWorkflow,
workflow_id: workflow_id,
run_id: next_run_id,
)

expect(final_result[:count]).to eq(1)

# memo should be copied to the next run automatically
expect(final_result[:memo]).to eq(memo)
end
end
39 changes: 38 additions & 1 deletion examples/spec/integration/metadata_workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
workflow_id = 'task-queue-' + SecureRandom.uuid
run_id = Temporal.start_workflow(
subject,
{ options: { workflow_id: workflow_id } },
options: { workflow_id: workflow_id }
)

actual_result = Temporal.await_workflow_result(
subject,
workflow_id: workflow_id,
run_id: run_id,
)

expect(actual_result.task_queue).to eq(Temporal.configuration.task_queue)
end

Expand Down Expand Up @@ -51,4 +53,39 @@
)
expect(Time.now - actual_result.run_started_at).to be_between(0, 30)
end

it 'gets memo from workflow execution info' do
workflow_id = 'memo_execution_test_wf-' + SecureRandom.uuid
run_id = Temporal.start_workflow(subject, options: { workflow_id: workflow_id, memo: { 'foo' => 'bar' } })

actual_result = Temporal.await_workflow_result(
subject,
workflow_id: workflow_id,
run_id: run_id,
)
expect(actual_result.memo['foo']).to eq('bar')

expect(Temporal.fetch_workflow_execution_info(
'ruby-samples', workflow_id, nil
).memo).to eq({ 'foo' => 'bar' })
end

it 'gets memo from workflow context with no memo' do
workflow_id = 'memo_context_no_memo_test_wf-' + SecureRandom.uuid

run_id = Temporal.start_workflow(
subject,
options: { workflow_id: workflow_id }
)

actual_result = Temporal.await_workflow_result(
subject,
workflow_id: workflow_id,
run_id: run_id,
)
expect(actual_result.memo).to eq({})
expect(Temporal.fetch_workflow_execution_info(
'ruby-samples', workflow_id, nil
).memo).to eq({})
end
end
5 changes: 4 additions & 1 deletion examples/workflows/loop_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ def execute(count)
return workflow.continue_as_new(count - 1)
end

return count
return {
count: count,
memo: workflow.metadata.memo,
}
end
end
2 changes: 1 addition & 1 deletion examples/workflows/metadata_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ class MetadataWorkflow < Temporal::Workflow
def execute
workflow.metadata
end
end
end
7 changes: 5 additions & 2 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def start_workflow(workflow, *input, options: {}, **args)
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers
headers: execution_options.headers,
memo: execution_options.memo,
)
else
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
Expand All @@ -73,6 +74,7 @@ def start_workflow(workflow, *input, options: {}, **args)
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
memo: execution_options.memo,
signal_name: signal_name,
signal_input: signal_input
)
Expand Down Expand Up @@ -119,7 +121,8 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
cron_schedule: cron_schedule
cron_schedule: cron_schedule,
memo: execution_options.memo
)

response.run_id
Expand Down
8 changes: 8 additions & 0 deletions lib/temporal/concerns/payloads.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
from_payloads(payloads)&.first
end

def from_payload_map(payload_map)
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
end

def to_payloads(data)
payload_converter.to_payloads(data)
end
Expand All @@ -41,6 +45,10 @@ def to_signal_payloads(data)
to_payloads([data])
end

def to_payload_map(data)
data.transform_values(&method(:to_payload))
end

private

def payload_converter
Expand Down
16 changes: 12 additions & 4 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def start_workflow_execution(
task_timeout:,
workflow_id_reuse_policy: nil,
headers: nil,
cron_schedule: nil
cron_schedule: nil,
memo: nil
)
request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
identity: identity,
Expand All @@ -101,7 +102,10 @@ def start_workflow_execution(
header: Temporal::Api::Common::V1::Header.new(
fields: headers
),
cron_schedule: cron_schedule
cron_schedule: cron_schedule,
memo: Temporal::Api::Common::V1::Memo.new(
fields: to_payload_map(memo || {})
)
)

if workflow_id_reuse_policy
Expand Down Expand Up @@ -305,7 +309,8 @@ def signal_with_start_workflow_execution(
headers: nil,
cron_schedule: nil,
signal_name:,
signal_input:
signal_input:,
memo: nil
)
request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
identity: identity,
Expand All @@ -327,7 +332,10 @@ def signal_with_start_workflow_execution(
),
cron_schedule: cron_schedule,
signal_name: signal_name,
signal_input: to_signal_payloads(signal_input)
signal_input: to_signal_payloads(signal_input),
memo: Temporal::Api::Common::V1::Memo.new(
fields: to_payload_map(memo || {})
),
)

if workflow_id_reuse_policy
Expand Down
9 changes: 8 additions & 1 deletion lib/temporal/connection/serializer/continue_as_new.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def to_proto
workflow_run_timeout: object.timeouts[:execution],
workflow_task_timeout: object.timeouts[:task],
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
header: serialize_headers(object.headers)
header: serialize_headers(object.headers),
memo: serialize_memo(object.memo)
)
)
end
Expand All @@ -31,6 +32,12 @@ def serialize_headers(headers)

Temporal::Api::Common::V1::Header.new(fields: object.headers)
end

def serialize_memo(memo)
return unless memo

Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo))
end
end
end
end
Expand Down
9 changes: 8 additions & 1 deletion lib/temporal/connection/serializer/start_child_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def to_proto
workflow_run_timeout: object.timeouts[:run],
workflow_task_timeout: object.timeouts[:task],
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
header: serialize_headers(object.headers)
header: serialize_headers(object.headers),
memo: serialize_memo(object.memo),
)
)
end
Expand All @@ -34,6 +35,12 @@ def serialize_headers(headers)

Temporal::Api::Common::V1::Header.new(fields: object.headers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is probably out of scope for this PR, but this also needs to have to_payload_map call

end

def serialize_memo(memo)
return unless memo

Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo))
end
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/execution_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module Temporal
class ExecutionOptions
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo

def initialize(object, options, defaults = nil)
# Options are treated as overrides and take precedence
Expand All @@ -13,6 +13,7 @@ def initialize(object, options, defaults = nil)
@retry_policy = options[:retry_policy] || {}
@timeouts = options[:timeouts] || {}
@headers = options[:headers] || {}
@memo = options[:memo] || {}

# For Temporal::Workflow and Temporal::Activity use defined values as the next option
if has_executable_concern?(object)
Expand Down
15 changes: 3 additions & 12 deletions lib/temporal/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def generate_activity_metadata(task, namespace)
workflow_run_id: task.workflow_execution.run_id,
workflow_id: task.workflow_execution.workflow_id,
workflow_name: task.workflow_type.name,
headers: headers(task.header&.fields),
headers: from_payload_map(task.header&.fields || {}),
heartbeat_details: from_details_payloads(task.heartbeat_details)
)
end
Expand Down Expand Up @@ -48,20 +48,11 @@ def generate_workflow_metadata(event, task_metadata)
attempt: event.attributes.attempt,
namespace: task_metadata.namespace,
task_queue: event.attributes.task_queue.name,
headers: headers(event.attributes.header&.fields),
headers: from_payload_map(event.attributes.header&.fields || {}),
run_started_at: event.timestamp,
memo: from_payload_map(event.attributes.memo&.fields || {}),
)
end

private

def headers(fields)
result = {}
fields.each do |field, payload|
result[field] = from_payload(payload)
end
result
end
end
end
end
6 changes: 4 additions & 2 deletions lib/temporal/metadata/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Temporal
module Metadata
class Workflow < Base
attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at
attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at, :memo

def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:)
def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:, memo:)
@namespace = namespace
@id = id
@name = name
Expand All @@ -14,6 +14,7 @@ def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:,
@task_queue = task_queue
@headers = headers
@run_started_at = run_started_at
@memo = memo

freeze
end
Expand All @@ -31,6 +32,7 @@ def to_h
'attempt' => attempt,
'task_queue' => task_queue,
'run_started_at' => run_started_at.to_f,
'memo' => memo,
}
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/temporal/testing/temporal_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def start_locally(workflow, schedule, *input, **args)
reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed
workflow_id = options[:workflow_id] || SecureRandom.uuid
run_id = SecureRandom.uuid
memo = options[:memo] || {}

if !allowed?(workflow_id, reuse_policy)
raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(
Expand All @@ -101,6 +102,7 @@ def start_locally(workflow, schedule, *input, **args)
attempt: 1,
task_queue: execution_options.task_queue,
run_started_at: Time.now,
memo: memo,
headers: execution_options.headers
)
context = Temporal::Testing::LocalWorkflowContext.new(
Expand Down
1 change: 1 addition & 0 deletions lib/temporal/testing/workflow_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def execute_locally(*input)
task_queue: 'unit-test-task-queue',
headers: {},
run_started_at: Time.now,
memo: {},
)
context = Temporal::Testing::LocalWorkflowContext.new(
execution, workflow_id, run_id, disabled_releases, metadata
Expand Down
4 changes: 2 additions & 2 deletions lib/temporal/workflow/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ class Workflow
module Command
# TODO: Move these classes into their own directories under workflow/command/*
ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, keyword_init: true)
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, keyword_init: true)
ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, :memo, keyword_init: true)
RequestActivityCancellation = Struct.new(:activity_id, keyword_init: true)
RecordMarker = Struct.new(:name, :details, keyword_init: true)
StartTimer = Struct.new(:timeout, :timer_id, keyword_init: true)
Expand Down
Loading