From fb5e2dc2392211214fcc48e0c2e1bb24916b4326 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 4 Feb 2022 11:30:44 -0600 Subject: [PATCH 01/18] initial work to support SignalExternalWorkflow --- lib/temporal/workflow/command.rb | 5 ++- lib/temporal/workflow/context.rb | 35 +++++++++++++++++++ lib/temporal/workflow/history/event_target.rb | 1 + lib/temporal/workflow/state_manager.rb | 2 ++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/lib/temporal/workflow/command.rb b/lib/temporal/workflow/command.rb index 8297abe9..64e1a776 100644 --- a/lib/temporal/workflow/command.rb +++ b/lib/temporal/workflow/command.rb @@ -11,6 +11,7 @@ module Command CancelTimer = Struct.new(:timer_id, keyword_init: true) CompleteWorkflow = Struct.new(:result, keyword_init: true) FailWorkflow = Struct.new(:exception, keyword_init: true) + SignalExternalWorkflow = Struct.new(:namespace, :execution, :signal_name, :input, :control, :child_workflow_only, keyword_init: true) # only these commands are supported right now SCHEDULE_ACTIVITY_TYPE = :schedule_activity @@ -21,6 +22,7 @@ module Command CANCEL_TIMER_TYPE = :cancel_timer COMPLETE_WORKFLOW_TYPE = :complete_workflow FAIL_WORKFLOW_TYPE = :fail_workflow + SIGNAL_EXTERNAL_WORKFLOW_TYPE = :signal_external_workflow COMMAND_CLASS_MAP = { SCHEDULE_ACTIVITY_TYPE => ScheduleActivity, @@ -30,7 +32,8 @@ module Command START_TIMER_TYPE => StartTimer, CANCEL_TIMER_TYPE => CancelTimer, COMPLETE_WORKFLOW_TYPE => CompleteWorkflow, - FAIL_WORKFLOW_TYPE => FailWorkflow + FAIL_WORKFLOW_TYPE => FailWorkflow, + SIGNAL_EXTERNAL_WORKFLOW_TYPE => SignalExternalWorkflow }.freeze def self.generate(type, **args) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index d88d8f30..b81d4f66 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -304,6 +304,41 @@ def cancel(target, cancelation_id) end end + # Send a signal from one workflow to another workflow. For signaling from the client, + # see Client#signal_workflow. + # + # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class + # is passed, its config (namespace, task_queue, timeouts, etc) will be used + # @param signal_name [String] name of signal + # @param input [String, Array, nil] optional arguments for the signal + # @param args [Hash] keyword arguments to be pass to workflow's #execute method + # @param options [Hash, nil] optional overrides + # @option options [String] :workflow_id + # @option options [String] :run_id of the specific workflow or "" if none is passed + # @option options [String] :name workflow name + # @option options [String] :child_workflow_only indicates whether the signal should only be deliverd to a + # child workflow; defaults to false + # + # @return [Future] future + def signal_external_workflow(workflow_class, signal_name, *input, options: {}) + options ||= {} + + execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) + + command = Command::SignalExternalWorkflow.new( + namespace: execution_options.namespace, + execution: { + workflow_id: options[:workflow_id], + run_id: options[:run_id] || "" + }, + signal_name: signal_name, + input: input, + control: "", # deprecated + child_workflow_only: !!options[:child_workflow_only] + ) + schedule_command(command) + end + private attr_reader :state_manager, :dispatcher, :workflow_class diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index 8f605a01..5cd185f7 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -16,6 +16,7 @@ class UnexpectedEventType < InternalError; end CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE = :cancel_external_workflow_request WORKFLOW_TYPE = :workflow CANCEL_WORKFLOW_REQUEST_TYPE = :cancel_workflow_request + SIGNAL_EXTERNAL_WORKFLOW_TYPE = :external_workflow # NOTE: The order is important, first prefix match wins (will be a longer match) TARGET_TYPES = { diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index ac4c1846..a8d3eae8 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -274,6 +274,8 @@ def event_target_from(command_id, command) History::EventTarget::WORKFLOW_TYPE when Command::StartChildWorkflow History::EventTarget::CHILD_WORKFLOW_TYPE + when Command::SignalExternalWorkflow + History::EventTarget::SIGNAL_EXTERNAL_WORKFLOW_TYPE end History::EventTarget.new(command_id, target_type) From 45fc69384b8f2e1591b77f9fe09c1341efd83e0e Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 4 Feb 2022 12:57:04 -0600 Subject: [PATCH 02/18] define the serializer and hook it up --- lib/temporal/connection/serializer.rb | 4 ++- .../serializer/signal_external_workflow.rb | 33 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 lib/temporal/connection/serializer/signal_external_workflow.rb diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 98ce71b4..b6da64b3 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -8,6 +8,7 @@ require 'temporal/connection/serializer/complete_workflow' require 'temporal/connection/serializer/continue_as_new' require 'temporal/connection/serializer/fail_workflow' +require 'temporal/connection/serializer/signal_external_workflow' module Temporal module Connection @@ -21,7 +22,8 @@ module Serializer Workflow::Command::CancelTimer => Serializer::CancelTimer, Workflow::Command::CompleteWorkflow => Serializer::CompleteWorkflow, Workflow::Command::ContinueAsNew => Serializer::ContinueAsNew, - Workflow::Command::FailWorkflow => Serializer::FailWorkflow + Workflow::Command::FailWorkflow => Serializer::FailWorkflow, + Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow }.freeze def self.serialize(object) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb new file mode 100644 index 00000000..d5313957 --- /dev/null +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -0,0 +1,33 @@ +require 'temporal/connection/serializer/base' +require 'temporal/concerns/payloads' + +module Temporal + module Connection + module Serializer + class SignalExternalWorkflow < Base + include Concerns::Payloads + + def to_proto + Temporal::Api::Command::V1::Command.new( + command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, + schedule_activity_task_command_attributes: + Temporal::Api::Command::V1::SignalExternalWorkflowExecutionCommandAttributes.new( + namespace: object.namespace, + execution: serialize_execution(object.execution), + signal_name: object.signal_name, + input: to_payloads(object.input), + control: object.control, + child_workflow_only: object.child_workflow_only + ) + ) + end + + private + + def serialize_execution(execution) + Temporal::Api::Common::V1::WorkflowExecution.new(workflow_id: execution.workflow_id, run_id: execution.run_id) + end + end + end + end +end From df7f9be2ecb235917a8e2d7b9446a2826201da3b Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 4 Feb 2022 17:00:26 -0600 Subject: [PATCH 03/18] stub in what I think is the correct work for each event type --- lib/temporal/workflow/state_manager.rb | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index a8d3eae8..95c84394 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -241,13 +241,21 @@ def apply_event(event) # todo when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED' - # todo + # Temporal Server will try to Signal the targeted Workflow + # Contains the Signal name, as well as a Signal payload + state_machine.start + dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED' - # todo + # Temporal Server cannot Signal the targeted Workflow + # Usually because the Workflow could not be found + state_machine.fail + dispatch(target, 'failed', 'StandardError', from_payloads(event.attributes.cause)) when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' - # todo + # Temporal Server has successfully Signaled the targeted Workflow + # Fire & Forget, so no result is returned? Don't we have a Future we return elsewhere? + state_machine.complete when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES' # todo From 3b85c5cd2758da946c978169c2e4d9fb84ea9eb9 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Mon, 7 Feb 2022 10:20:50 -0600 Subject: [PATCH 04/18] some fixes per antstorm advice --- .../connection/serializer/signal_external_workflow.rb | 4 ++-- lib/temporal/workflow/context.rb | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index d5313957..bcd4e08c 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -10,7 +10,7 @@ class SignalExternalWorkflow < Base def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, - schedule_activity_task_command_attributes: + signal_external_workflow_execution_command_attributes: Temporal::Api::Command::V1::SignalExternalWorkflowExecutionCommandAttributes.new( namespace: object.namespace, execution: serialize_execution(object.execution), @@ -25,7 +25,7 @@ def to_proto private def serialize_execution(execution) - Temporal::Api::Common::V1::WorkflowExecution.new(workflow_id: execution.workflow_id, run_id: execution.run_id) + Temporal::Api::Common::V1::WorkflowExecution.new(workflow_id: execution[:workflow_id], run_id: execution[:run_id]) end end end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index b81d4f66..ee313e8c 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -311,7 +311,6 @@ def cancel(target, cancelation_id) # is passed, its config (namespace, task_queue, timeouts, etc) will be used # @param signal_name [String] name of signal # @param input [String, Array, nil] optional arguments for the signal - # @param args [Hash] keyword arguments to be pass to workflow's #execute method # @param options [Hash, nil] optional overrides # @option options [String] :workflow_id # @option options [String] :run_id of the specific workflow or "" if none is passed @@ -329,7 +328,7 @@ def signal_external_workflow(workflow_class, signal_name, *input, options: {}) namespace: execution_options.namespace, execution: { workflow_id: options[:workflow_id], - run_id: options[:run_id] || "" + run_id: options[:run_id] }, signal_name: signal_name, input: input, From 3ec41f7c04734f36dea0f57b42b2a59938136bd2 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Mon, 7 Feb 2022 13:41:23 -0600 Subject: [PATCH 05/18] initial attempt at integration test --- .../workflow_signals_externally_spec.rb | 28 +++++++++++++++++++ .../workflows/workflow_signals_externally.rb | 28 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 examples/spec/integration/workflow_signals_externally_spec.rb create mode 100644 examples/workflows/workflow_signals_externally.rb diff --git a/examples/spec/integration/workflow_signals_externally_spec.rb b/examples/spec/integration/workflow_signals_externally_spec.rb new file mode 100644 index 00000000..5fbb2db2 --- /dev/null +++ b/examples/spec/integration/workflow_signals_externally_spec.rb @@ -0,0 +1,28 @@ +require 'workflows/workflow_signals_externally' + +describe WaitForExternalSignalWorkflow do + let(:signal_name) { "signal_name" } + + it 'receives signal from an external workflow' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + WaitForExternalSignalWorkflow, + 10, # number of echo activities to run + 2, # max activity parallelism + signal_name, + options: { workflow_id: workflow_id } + ) + + Temporal.start_workflow(SendSignalToExternalWorkflow, signal_name, workflow_id) + + result = Temporal.await_workflow_result( + WaitForExternalSignalWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result.length).to eq(1) + expect(result.keys).to eq([signal_name]) + expect(result.values).to eq(["arg1", "arg2"]) + end +end diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb new file mode 100644 index 00000000..5327ce85 --- /dev/null +++ b/examples/workflows/workflow_signals_externally.rb @@ -0,0 +1,28 @@ +# One workflow sends a signal to another workflow. +# +class WaitForExternalSignalWorkflow < Temporal::Workflow + def execute(expected_signal) + signals_received = {} + + workflow.on_signal do |signal, input| + signals_received[signal] = input + end + + workflow.wait_for do + workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") + signals_received.key?(expected_signal) + end + + timeout_timer = workflow.start_timer(1) + workflow.wait_for(timeout_timer) + + signals_received + end +end + +class SendSignalToExternalWorkflow < Temporal::Workflow + def execute(signal_name, target_workflow) + logger.info("Send a signal to an external workflow") + workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + end +end From 2b7cf11ba465034f20b7731e922a4f0c8ddca563 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 11:42:06 -0600 Subject: [PATCH 06/18] docs on testing and an improvement to existing test --- examples/README.md | 20 ++++++++++++++++--- examples/bin/worker | 2 ++ .../workflow_signals_externally_spec.rb | 10 +++++----- .../workflows/workflow_signals_externally.rb | 4 +++- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/examples/README.md b/examples/README.md index 72697ed6..bd9e064a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,7 @@ To try these out you need to have a running Temporal service ([setup instruction Install all the gem dependencies by running: ```sh -> bundle install +bundle install ``` Modify the `init.rb` file to point to your Temporal cluster. @@ -15,11 +15,25 @@ Modify the `init.rb` file to point to your Temporal cluster. Start a worker process: ```sh -> bin/worker +bin/worker ``` Use this command to trigger one of the example workflows from the `workflows` directory: ```sh -> bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...] +bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...] ``` +## Testing + +To run tests, make sure the temporal server and the worker process are already running: +```shell +docker-compose up +bin/worker +``` +To execute the tests, run: +```shell +bundle exec rspec +``` +To add a new test that uses a new workflow or new activity, make sure to register those new +workflows and activities by modifying the `bin/worker` file and adding them there. After any +changes to that file, restart the worker process to pick up the new registrations. diff --git a/examples/bin/worker b/examples/bin/worker index 14752ede..174806e8 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -45,6 +45,8 @@ worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) worker.register_workflow(WaitForWorkflow) +worker.register_workflow(WaitForExternalSignalWorkflow) +worker.register_workflow(SendSignalToExternalWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/workflow_signals_externally_spec.rb b/examples/spec/integration/workflow_signals_externally_spec.rb index 5fbb2db2..54d4389b 100644 --- a/examples/spec/integration/workflow_signals_externally_spec.rb +++ b/examples/spec/integration/workflow_signals_externally_spec.rb @@ -7,8 +7,6 @@ workflow_id = SecureRandom.uuid run_id = Temporal.start_workflow( WaitForExternalSignalWorkflow, - 10, # number of echo activities to run - 2, # max activity parallelism signal_name, options: { workflow_id: workflow_id } ) @@ -21,8 +19,10 @@ run_id: run_id, ) - expect(result.length).to eq(1) - expect(result.keys).to eq([signal_name]) - expect(result.values).to eq(["arg1", "arg2"]) + expect(result).to eq( + { + signal_name => ["arg1", "arg2"] + } + ) end end diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb index 5327ce85..cf5dac48 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/workflow_signals_externally.rb @@ -5,6 +5,7 @@ def execute(expected_signal) signals_received = {} workflow.on_signal do |signal, input| + workflow.logger.info("Received signal name #{signal}, with input #{input.inspect}") signals_received[signal] = input end @@ -23,6 +24,7 @@ def execute(expected_signal) class SendSignalToExternalWorkflow < Temporal::Workflow def execute(signal_name, target_workflow) logger.info("Send a signal to an external workflow") - workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + future.get end end From 4e7e8a3430155a46270cfbdbafd6169215e56869 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 11:43:06 -0600 Subject: [PATCH 07/18] encode the signal payload using correct helper --- lib/temporal/connection/serializer/signal_external_workflow.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index bcd4e08c..0a18e919 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -15,7 +15,7 @@ def to_proto namespace: object.namespace, execution: serialize_execution(object.execution), signal_name: object.signal_name, - input: to_payloads(object.input), + input: to_signal_payloads(object.input), control: object.control, child_workflow_only: object.child_workflow_only ) From 9971c20029c328841ad91c9ea806851f0841ae27 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 11:44:39 -0600 Subject: [PATCH 08/18] return a Future and fulfill it correctly upon completion --- lib/temporal/workflow/context.rb | 15 +++++++++++++++ lib/temporal/workflow/state_manager.rb | 7 +++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index ee313e8c..2e4a7b9d 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -336,6 +336,21 @@ def signal_external_workflow(workflow_class, signal_name, *input, options: {}) child_workflow_only: !!options[:child_workflow_only] ) schedule_command(command) + + target, cancelation_id = schedule_command(command) + future = Future.new(target, self, cancelation_id: cancelation_id) + + dispatcher.register_handler(target, 'completed') do |result| + future.set(result) + future.success_callbacks.each { |callback| call_in_fiber(callback, result) } + end + + dispatcher.register_handler(target, 'failed') do |exception| + future.fail(exception) + future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) } + end + + future end private diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 95c84394..26e5e474 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -243,8 +243,10 @@ def apply_event(event) when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED' # Temporal Server will try to Signal the targeted Workflow # Contains the Signal name, as well as a Signal payload + # The workflow that sends the signal creates this event in its log; the + # receiving workflow has no related event (?) state_machine.start - dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) + discard_command(target) when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED' # Temporal Server cannot Signal the targeted Workflow @@ -254,8 +256,9 @@ def apply_event(event) when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' # Temporal Server has successfully Signaled the targeted Workflow - # Fire & Forget, so no result is returned? Don't we have a Future we return elsewhere? + # Return the result to the Future waiting on this state_machine.complete + dispatch(target, 'completed') when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES' # todo From 82240acfabc373a23c8d3a01666c8bd75149e392 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 11:45:56 -0600 Subject: [PATCH 09/18] get the \*event_id from the right field in the command structure --- lib/temporal/workflow/history/event.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/temporal/workflow/history/event.rb b/lib/temporal/workflow/history/event.rb index be389636..34827abc 100644 --- a/lib/temporal/workflow/history/event.rb +++ b/lib/temporal/workflow/history/event.rb @@ -12,7 +12,6 @@ class Event REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED - EXTERNAL_WORKFLOW_EXECUTION_SIGNALED UPSERT_WORKFLOW_SEARCH_ATTRIBUTES ].freeze @@ -48,7 +47,7 @@ def originating_event_id 1 # fixed id for everything related to current workflow when *EVENT_TYPES attributes.scheduled_event_id - when *CHILD_WORKFLOW_EVENTS + when *CHILD_WORKFLOW_EVENTS, 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' attributes.initiated_event_id else id From c274b68c3ccbe2533e2d39274b85d4ccecfdc676 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 12:17:49 -0600 Subject: [PATCH 10/18] modify test to verify the signal is only received once --- .../spec/integration/workflow_signals_externally_spec.rb | 7 ++++++- examples/workflows/workflow_signals_externally.rb | 4 +++- lib/temporal/workflow/state_manager.rb | 3 ++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/examples/spec/integration/workflow_signals_externally_spec.rb b/examples/spec/integration/workflow_signals_externally_spec.rb index 54d4389b..b16d8f32 100644 --- a/examples/spec/integration/workflow_signals_externally_spec.rb +++ b/examples/spec/integration/workflow_signals_externally_spec.rb @@ -21,7 +21,12 @@ expect(result).to eq( { - signal_name => ["arg1", "arg2"] + received: { + signal_name => ["arg1", "arg2"] + }, + counts: { + signal_name => 1 + } } ) end diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb index cf5dac48..e2af20f5 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/workflow_signals_externally.rb @@ -3,10 +3,12 @@ class WaitForExternalSignalWorkflow < Temporal::Workflow def execute(expected_signal) signals_received = {} + signal_counts = Hash.new { |h,k| h[k] = 0 } workflow.on_signal do |signal, input| workflow.logger.info("Received signal name #{signal}, with input #{input.inspect}") signals_received[signal] = input + signal_counts[signal] += 1 end workflow.wait_for do @@ -17,7 +19,7 @@ def execute(expected_signal) timeout_timer = workflow.start_timer(1) workflow.wait_for(timeout_timer) - signals_received + { received: signals_received, counts: signal_counts } end end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 26e5e474..1f0af021 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -244,7 +244,7 @@ def apply_event(event) # Temporal Server will try to Signal the targeted Workflow # Contains the Signal name, as well as a Signal payload # The workflow that sends the signal creates this event in its log; the - # receiving workflow has no related event (?) + # receiving workflow records WORKFLOW_EXECUTION_SIGNALED on reception state_machine.start discard_command(target) @@ -252,6 +252,7 @@ def apply_event(event) # Temporal Server cannot Signal the targeted Workflow # Usually because the Workflow could not be found state_machine.fail + discard_command(target) dispatch(target, 'failed', 'StandardError', from_payloads(event.attributes.cause)) when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' From f65ce32a34b227a3538995213b72e32227029ef8 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 12:38:33 -0600 Subject: [PATCH 11/18] test for failure to deliver a signal to external workflow --- .../workflow_signals_externally_spec.rb | 17 +++++++++++++++++ .../workflows/workflow_signals_externally.rb | 4 ++++ lib/temporal/workflow/history/event.rb | 3 +-- lib/temporal/workflow/state_manager.rb | 4 ++-- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/examples/spec/integration/workflow_signals_externally_spec.rb b/examples/spec/integration/workflow_signals_externally_spec.rb index b16d8f32..9f199030 100644 --- a/examples/spec/integration/workflow_signals_externally_spec.rb +++ b/examples/spec/integration/workflow_signals_externally_spec.rb @@ -30,4 +30,21 @@ } ) end + + it 'correctly handles failure to deliver' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + SendSignalToExternalWorkflow, + signal_name, + workflow_id, + options: { workflow_id: workflow_id }) + + result = Temporal.await_workflow_result( + SendSignalToExternalWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to eq(:failed) + end end diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb index e2af20f5..681aa680 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/workflow_signals_externally.rb @@ -27,6 +27,10 @@ class SendSignalToExternalWorkflow < Temporal::Workflow def execute(signal_name, target_workflow) logger.info("Send a signal to an external workflow") future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + @status = nil + future.done { @status = :success } + future.failed { @status = :failed } future.get + @status end end diff --git a/lib/temporal/workflow/history/event.rb b/lib/temporal/workflow/history/event.rb index 34827abc..9bf90927 100644 --- a/lib/temporal/workflow/history/event.rb +++ b/lib/temporal/workflow/history/event.rb @@ -10,7 +10,6 @@ class Event ACTIVITY_TASK_CANCELED TIMER_FIRED REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED - SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED UPSERT_WORKFLOW_SEARCH_ATTRIBUTES ].freeze @@ -47,7 +46,7 @@ def originating_event_id 1 # fixed id for everything related to current workflow when *EVENT_TYPES attributes.scheduled_event_id - when *CHILD_WORKFLOW_EVENTS, 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' + when *CHILD_WORKFLOW_EVENTS, 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED', 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED' attributes.initiated_event_id else id diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 1f0af021..12e3dea9 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -252,8 +252,8 @@ def apply_event(event) # Temporal Server cannot Signal the targeted Workflow # Usually because the Workflow could not be found state_machine.fail - discard_command(target) - dispatch(target, 'failed', 'StandardError', from_payloads(event.attributes.cause)) + # discard_command(target) + dispatch(target, 'failed', 'StandardError', event.attributes.cause) when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' # Temporal Server has successfully Signaled the targeted Workflow From 6f16c7315880421f45a1cbcef90e38895a71984a Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 12:39:53 -0600 Subject: [PATCH 12/18] do not discard the failure command otherwise non-deterministic --- lib/temporal/workflow/state_manager.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 12e3dea9..030ebaa6 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -252,7 +252,6 @@ def apply_event(event) # Temporal Server cannot Signal the targeted Workflow # Usually because the Workflow could not be found state_machine.fail - # discard_command(target) dispatch(target, 'failed', 'StandardError', event.attributes.cause) when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED' From fe3aa4d0d8f94c23773cc3e2cbb3eb989532f0de Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Wed, 9 Feb 2022 16:32:17 -0600 Subject: [PATCH 13/18] simplify test workflow by eliminating unnecessary timer --- examples/workflows/workflow_signals_externally.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb index 681aa680..647d96ad 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/workflow_signals_externally.rb @@ -16,9 +16,6 @@ def execute(expected_signal) signals_received.key?(expected_signal) end - timeout_timer = workflow.start_timer(1) - workflow.wait_for(timeout_timer) - { received: signals_received, counts: signal_counts } end end From 820dfcc5fb06974fd005c8d0590c07967b295aee Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Thu, 10 Feb 2022 13:57:08 -0600 Subject: [PATCH 14/18] oops, had double call to #schedule_command so signals were sent twice --- lib/temporal/workflow/context.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 2e4a7b9d..37c1e5b1 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -335,7 +335,6 @@ def signal_external_workflow(workflow_class, signal_name, *input, options: {}) control: "", # deprecated child_workflow_only: !!options[:child_workflow_only] ) - schedule_command(command) target, cancelation_id = schedule_command(command) future = Future.new(target, self, cancelation_id: cancelation_id) From fd85b4bd5857348ddf4c8d963263ec7b5035f702 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Thu, 10 Feb 2022 14:29:32 -0600 Subject: [PATCH 15/18] edit description of example --- examples/workflows/workflow_signals_externally.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/workflow_signals_externally.rb index 647d96ad..8a3e4860 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/workflow_signals_externally.rb @@ -1,4 +1,5 @@ -# One workflow sends a signal to another workflow. +# One workflow sends a signal to another workflow. Can be used to implement +# the synchronous-proxy pattern (see Go samples) # class WaitForExternalSignalWorkflow < Temporal::Workflow def execute(expected_signal) From f53b947341becb0852b3028298b543af59438967 Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 11 Feb 2022 10:15:16 -0600 Subject: [PATCH 16/18] split to separate files and improve test coverage --- .../wait_for_external_signal_workflow_spec.rb | 82 +++++++++++++++++++ .../workflow_signals_externally_spec.rb | 50 ----------- .../send_signal_to_external_workflow.rb | 17 ++++ ...b => wait_for_external_signal_workflow.rb} | 12 --- 4 files changed, 99 insertions(+), 62 deletions(-) create mode 100644 examples/spec/integration/wait_for_external_signal_workflow_spec.rb delete mode 100644 examples/spec/integration/workflow_signals_externally_spec.rb create mode 100644 examples/workflows/send_signal_to_external_workflow.rb rename examples/workflows/{workflow_signals_externally.rb => wait_for_external_signal_workflow.rb} (63%) diff --git a/examples/spec/integration/wait_for_external_signal_workflow_spec.rb b/examples/spec/integration/wait_for_external_signal_workflow_spec.rb new file mode 100644 index 00000000..35c1fd36 --- /dev/null +++ b/examples/spec/integration/wait_for_external_signal_workflow_spec.rb @@ -0,0 +1,82 @@ +require 'workflows/wait_for_external_signal_workflow' +require 'workflows/send_signal_to_external_workflow' + +describe WaitForExternalSignalWorkflow do + let(:signal_name) { "signal_name" } + let(:receiver_workflow_id) { SecureRandom.uuid } + let(:sender_workflow_id) { SecureRandom.uuid } + + context 'when the workflows succeed then' do + it 'receives signal from an external workflow only once' do + run_id = Temporal.start_workflow( + WaitForExternalSignalWorkflow, + signal_name, + options: {workflow_id: receiver_workflow_id} + ) + + Temporal.start_workflow( + SendSignalToExternalWorkflow, + signal_name, + receiver_workflow_id + ) + + result = Temporal.await_workflow_result( + WaitForExternalSignalWorkflow, + workflow_id: receiver_workflow_id, + run_id: run_id, + ) + + expect(result).to eq( + { + received: { + signal_name => ["arg1", "arg2"] + }, + counts: { + signal_name => 1 + } + } + ) + end + + it 'returns :success to the sending workflow' do + Temporal.start_workflow( + WaitForExternalSignalWorkflow, + signal_name, + options: {workflow_id: receiver_workflow_id} + ) + + run_id = Temporal.start_workflow( + SendSignalToExternalWorkflow, + signal_name, + receiver_workflow_id, + options: {workflow_id: sender_workflow_id} + ) + + result = Temporal.await_workflow_result( + SendSignalToExternalWorkflow, + workflow_id: sender_workflow_id, + run_id: run_id, + ) + + expect(result).to eq(:success) + end + end + + context 'when the workflows fail' do + it 'correctly handles failure to deliver' do + run_id = Temporal.start_workflow( + SendSignalToExternalWorkflow, + signal_name, + receiver_workflow_id, + options: {workflow_id: sender_workflow_id}) + + result = Temporal.await_workflow_result( + SendSignalToExternalWorkflow, + workflow_id: sender_workflow_id, + run_id: run_id, + ) + + expect(result).to eq(:failed) + end + end +end diff --git a/examples/spec/integration/workflow_signals_externally_spec.rb b/examples/spec/integration/workflow_signals_externally_spec.rb deleted file mode 100644 index 9f199030..00000000 --- a/examples/spec/integration/workflow_signals_externally_spec.rb +++ /dev/null @@ -1,50 +0,0 @@ -require 'workflows/workflow_signals_externally' - -describe WaitForExternalSignalWorkflow do - let(:signal_name) { "signal_name" } - - it 'receives signal from an external workflow' do - workflow_id = SecureRandom.uuid - run_id = Temporal.start_workflow( - WaitForExternalSignalWorkflow, - signal_name, - options: { workflow_id: workflow_id } - ) - - Temporal.start_workflow(SendSignalToExternalWorkflow, signal_name, workflow_id) - - result = Temporal.await_workflow_result( - WaitForExternalSignalWorkflow, - workflow_id: workflow_id, - run_id: run_id, - ) - - expect(result).to eq( - { - received: { - signal_name => ["arg1", "arg2"] - }, - counts: { - signal_name => 1 - } - } - ) - end - - it 'correctly handles failure to deliver' do - workflow_id = SecureRandom.uuid - run_id = Temporal.start_workflow( - SendSignalToExternalWorkflow, - signal_name, - workflow_id, - options: { workflow_id: workflow_id }) - - result = Temporal.await_workflow_result( - SendSignalToExternalWorkflow, - workflow_id: workflow_id, - run_id: run_id, - ) - - expect(result).to eq(:failed) - end -end diff --git a/examples/workflows/send_signal_to_external_workflow.rb b/examples/workflows/send_signal_to_external_workflow.rb new file mode 100644 index 00000000..ab565255 --- /dev/null +++ b/examples/workflows/send_signal_to_external_workflow.rb @@ -0,0 +1,17 @@ +# Sends +signal_name+ to the +target_workflow+ from within a workflow. +# This is different than using the Client#send_signal method which is +# for signaling a workflow *from outside* any workflow. +# +# Returns :success or :failed +# +class SendSignalToExternalWorkflow < Temporal::Workflow + def execute(signal_name, target_workflow) + logger.info("Send a signal to an external workflow") + future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + @status = nil + future.done { @status = :success } + future.failed { @status = :failed } + future.get + @status + end +end diff --git a/examples/workflows/workflow_signals_externally.rb b/examples/workflows/wait_for_external_signal_workflow.rb similarity index 63% rename from examples/workflows/workflow_signals_externally.rb rename to examples/workflows/wait_for_external_signal_workflow.rb index 8a3e4860..03986309 100644 --- a/examples/workflows/workflow_signals_externally.rb +++ b/examples/workflows/wait_for_external_signal_workflow.rb @@ -20,15 +20,3 @@ def execute(expected_signal) { received: signals_received, counts: signal_counts } end end - -class SendSignalToExternalWorkflow < Temporal::Workflow - def execute(signal_name, target_workflow) - logger.info("Send a signal to an external workflow") - future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) - @status = nil - future.done { @status = :success } - future.failed { @status = :failed } - future.get - @status - end -end From e43066e5aca554aadd6a2dfcd5084b458fd8738b Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 11 Feb 2022 10:37:45 -0600 Subject: [PATCH 17/18] change method signature for consistency and a few other cleanups --- .../send_signal_to_external_workflow.rb | 2 +- .../serializer/signal_external_workflow.rb | 2 +- lib/temporal/workflow/command.rb | 2 +- lib/temporal/workflow/context.rb | 34 +++++++++---------- lib/temporal/workflow/history/event_target.rb | 1 - 5 files changed, 19 insertions(+), 22 deletions(-) diff --git a/examples/workflows/send_signal_to_external_workflow.rb b/examples/workflows/send_signal_to_external_workflow.rb index ab565255..c4d560e4 100644 --- a/examples/workflows/send_signal_to_external_workflow.rb +++ b/examples/workflows/send_signal_to_external_workflow.rb @@ -7,7 +7,7 @@ class SendSignalToExternalWorkflow < Temporal::Workflow def execute(signal_name, target_workflow) logger.info("Send a signal to an external workflow") - future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, "arg1", "arg2", options: {workflow_id: target_workflow} ) + future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, target_workflow, nil, ["arg1", "arg2"]) @status = nil future.done { @status = :success } future.failed { @status = :failed } diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index 0a18e919..91907edd 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -16,7 +16,7 @@ def to_proto execution: serialize_execution(object.execution), signal_name: object.signal_name, input: to_signal_payloads(object.input), - control: object.control, + control: "", # deprecated child_workflow_only: object.child_workflow_only ) ) diff --git a/lib/temporal/workflow/command.rb b/lib/temporal/workflow/command.rb index 64e1a776..c02bc8d6 100644 --- a/lib/temporal/workflow/command.rb +++ b/lib/temporal/workflow/command.rb @@ -11,7 +11,7 @@ module Command CancelTimer = Struct.new(:timer_id, keyword_init: true) CompleteWorkflow = Struct.new(:result, keyword_init: true) FailWorkflow = Struct.new(:exception, keyword_init: true) - SignalExternalWorkflow = Struct.new(:namespace, :execution, :signal_name, :input, :control, :child_workflow_only, keyword_init: true) + SignalExternalWorkflow = Struct.new(:namespace, :execution, :signal_name, :input, :child_workflow_only, keyword_init: true) # only these commands are supported right now SCHEDULE_ACTIVITY_TYPE = :schedule_activity diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 37c1e5b1..7a07d4fc 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -304,36 +304,34 @@ def cancel(target, cancelation_id) end end - # Send a signal from one workflow to another workflow. For signaling from the client, - # see Client#signal_workflow. + # Send a signal from inside a workflow to another workflow. Not to be confused with + # Client#signal_workflow which sends a signal from outside a workflow to a workflow. # - # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class - # is passed, its config (namespace, task_queue, timeouts, etc) will be used - # @param signal_name [String] name of signal + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param signal [String] name of the signal to send + # @param workflow_id [String] + # @param run_id [String] # @param input [String, Array, nil] optional arguments for the signal - # @param options [Hash, nil] optional overrides - # @option options [String] :workflow_id - # @option options [String] :run_id of the specific workflow or "" if none is passed - # @option options [String] :name workflow name - # @option options [String] :child_workflow_only indicates whether the signal should only be deliverd to a + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default + # @param child_workflow_only [Boolean] indicates whether the signal should only be delivered to a # child workflow; defaults to false # # @return [Future] future - def signal_external_workflow(workflow_class, signal_name, *input, options: {}) + def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false) options ||= {} - execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) command = Command::SignalExternalWorkflow.new( - namespace: execution_options.namespace, + namespace: namespace || execution_options.namespace, execution: { - workflow_id: options[:workflow_id], - run_id: options[:run_id] + workflow_id: workflow_id, + run_id: run_id }, - signal_name: signal_name, + signal_name: signal, input: input, - control: "", # deprecated - child_workflow_only: !!options[:child_workflow_only] + child_workflow_only: child_workflow_only ) target, cancelation_id = schedule_command(command) diff --git a/lib/temporal/workflow/history/event_target.rb b/lib/temporal/workflow/history/event_target.rb index 5cd185f7..8f605a01 100644 --- a/lib/temporal/workflow/history/event_target.rb +++ b/lib/temporal/workflow/history/event_target.rb @@ -16,7 +16,6 @@ class UnexpectedEventType < InternalError; end CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE = :cancel_external_workflow_request WORKFLOW_TYPE = :workflow CANCEL_WORKFLOW_REQUEST_TYPE = :cancel_workflow_request - SIGNAL_EXTERNAL_WORKFLOW_TYPE = :external_workflow # NOTE: The order is important, first prefix match wins (will be a longer match) TARGET_TYPES = { From b888fe3ee1daf0c1bf1e2fe5fb4ea1cd4c59f44c Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Fri, 11 Feb 2022 11:52:41 -0600 Subject: [PATCH 18/18] oops, fix EventType name to match correct constant --- lib/temporal/workflow/state_manager.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 030ebaa6..22bbd4c0 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -286,7 +286,7 @@ def event_target_from(command_id, command) when Command::StartChildWorkflow History::EventTarget::CHILD_WORKFLOW_TYPE when Command::SignalExternalWorkflow - History::EventTarget::SIGNAL_EXTERNAL_WORKFLOW_TYPE + History::EventTarget::EXTERNAL_WORKFLOW_TYPE end History::EventTarget.new(command_id, target_type)