diff --git a/examples/spec/integration/continue_as_new_spec.rb b/examples/spec/integration/continue_as_new_spec.rb index bbeef5ce..74b0771e 100644 --- a/examples/spec/integration/continue_as_new_spec.rb +++ b/examples/spec/integration/continue_as_new_spec.rb @@ -6,12 +6,16 @@ memo = { 'my-memo' => 'foo', } + headers = { + 'my-header' => 'bar', + } run_id = Temporal.start_workflow( LoopWorkflow, 2, # it continues as new if this arg is > 1 options: { workflow_id: workflow_id, memo: memo, + headers: headers, }, ) @@ -38,7 +42,8 @@ expect(final_result[:count]).to eq(1) - # memo should be copied to the next run automatically + # memo and headers should be copied to the next run automatically expect(final_result[:memo]).to eq(memo) + expect(final_result[:headers]).to eq(headers) end end diff --git a/examples/spec/integration/metadata_workflow_spec.rb b/examples/spec/integration/metadata_workflow_spec.rb index 0e3c4099..ac828e01 100644 --- a/examples/spec/integration/metadata_workflow_spec.rb +++ b/examples/spec/integration/metadata_workflow_spec.rb @@ -26,7 +26,7 @@ MetadataWorkflow, options: { workflow_id: workflow_id, - headers: { 'foo' => Temporal.configuration.converter.to_payload('bar') }, + headers: { 'foo' => 'bar' }, } ) diff --git a/examples/workflows/loop_workflow.rb b/examples/workflows/loop_workflow.rb index e10a9b30..b99408f4 100644 --- a/examples/workflows/loop_workflow.rb +++ b/examples/workflows/loop_workflow.rb @@ -11,6 +11,7 @@ def execute(count) return { count: count, memo: workflow.metadata.memo, + headers: workflow.metadata.headers, } end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 70987e1e..b2ce0350 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -100,7 +100,7 @@ def start_workflow_execution( workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporal::Api::Common::V1::Header.new( - fields: headers + fields: to_payload_map(headers || {}) ), cron_schedule: cron_schedule, memo: Temporal::Api::Common::V1::Memo.new( @@ -312,6 +312,16 @@ def signal_with_start_workflow_execution( signal_input:, memo: nil ) + proto_header_fields = if headers.nil? + to_payload_map({}) + elsif headers.class == Hash + to_payload_map(headers) + else + # Preserve backward compatability for headers specified using proto objects + warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' + headers + end + request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new( identity: identity, namespace: namespace, @@ -328,7 +338,7 @@ def signal_with_start_workflow_execution( workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporal::Api::Common::V1::Header.new( - fields: headers + fields: proto_header_fields, ), cron_schedule: cron_schedule, signal_name: signal_name, diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index db8259c6..357f0008 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -30,7 +30,7 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: object.headers) + Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) end def serialize_memo(memo) diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index ce1dc6ee..5f6b350d 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -33,7 +33,7 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: object.headers) + Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) end def serialize_memo(memo) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index c06e4750..d88d8f30 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -195,9 +195,10 @@ def continue_as_new(*input, **args) options = args.delete(:options) || {} input << args unless args.empty? - # If memo is not overridden, copy from current run + # If memo or headers are not overridden, use those from the current run options_from_metadata = { memo: metadata.memo, + headers: metadata.headers, } options = options_from_metadata.merge(options) diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index 4a7e90eb..18de4355 100644 --- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -9,6 +9,7 @@ task_queue: 'my-task-queue', input: ['one', 'two'], timeouts: Temporal.configuration.timeouts, + headers: {'foo-header': 'bar'}, memo: {'foo-memo': 'baz'}, ) @@ -28,6 +29,7 @@ expect(attribs.input.payloads[0].data).to eq('"one"') expect(attribs.input.payloads[1].data).to eq('"two"') + expect(attribs.header.fields['foo-header'].data).to eq('"bar"') expect(attribs.memo.fields['foo-memo'].data).to eq('"baz"') end end