Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion examples/spec/integration/continue_as_new_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
memo = {
'my-memo' => 'foo',
}
headers = {
'my-header' => 'bar',
}
run_id = Temporal.start_workflow(
LoopWorkflow,
2, # it continues as new if this arg is > 1
options: {
workflow_id: workflow_id,
memo: memo,
headers: headers,
},
)

Expand All @@ -38,7 +42,8 @@

expect(final_result[:count]).to eq(1)

# memo should be copied to the next run automatically
# memo and headers should be copied to the next run automatically
expect(final_result[:memo]).to eq(memo)
expect(final_result[:headers]).to eq(headers)
end
end
2 changes: 1 addition & 1 deletion examples/spec/integration/metadata_workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
MetadataWorkflow,
options: {
workflow_id: workflow_id,
headers: { 'foo' => Temporal.configuration.converter.to_payload('bar') },
headers: { 'foo' => 'bar' },
}
)

Expand Down
1 change: 1 addition & 0 deletions examples/workflows/loop_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def execute(count)
return {
count: count,
memo: workflow.metadata.memo,
headers: workflow.metadata.headers,
}
end
end
14 changes: 12 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def start_workflow_execution(
workflow_task_timeout: task_timeout,
request_id: SecureRandom.uuid,
header: Temporal::Api::Common::V1::Header.new(
fields: headers
fields: to_payload_map(headers || {})
),
cron_schedule: cron_schedule,
memo: Temporal::Api::Common::V1::Memo.new(
Expand Down Expand Up @@ -312,6 +312,16 @@ def signal_with_start_workflow_execution(
signal_input:,
memo: nil
)
proto_header_fields = if headers.nil?
to_payload_map({})
elsif headers.class == Hash
to_payload_map(headers)
else
# Preserve backward compatability for headers specified using proto objects
warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects'
headers
end

request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
identity: identity,
namespace: namespace,
Expand All @@ -328,7 +338,7 @@ def signal_with_start_workflow_execution(
workflow_task_timeout: task_timeout,
request_id: SecureRandom.uuid,
header: Temporal::Api::Common::V1::Header.new(
fields: headers
fields: proto_header_fields,
),
cron_schedule: cron_schedule,
signal_name: signal_name,
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/connection/serializer/continue_as_new.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def to_proto
def serialize_headers(headers)
return unless headers

Temporal::Api::Common::V1::Header.new(fields: object.headers)
Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers))
end

def serialize_memo(memo)
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/connection/serializer/start_child_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def to_proto
def serialize_headers(headers)
return unless headers

Temporal::Api::Common::V1::Header.new(fields: object.headers)
Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers))
end

def serialize_memo(memo)
Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ def continue_as_new(*input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

# If memo is not overridden, copy from current run
# If memo or headers are not overridden, use those from the current run
options_from_metadata = {
memo: metadata.memo,
headers: metadata.headers,
}
options = options_from_metadata.merge(options)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
task_queue: 'my-task-queue',
input: ['one', 'two'],
timeouts: Temporal.configuration.timeouts,
headers: {'foo-header': 'bar'},
memo: {'foo-memo': 'baz'},
)

Expand All @@ -28,6 +29,7 @@
expect(attribs.input.payloads[0].data).to eq('"one"')
expect(attribs.input.payloads[1].data).to eq('"two"')

expect(attribs.header.fields['foo-header'].data).to eq('"bar"')
expect(attribs.memo.fields['foo-memo'].data).to eq('"baz"')
end
end
Expand Down