diff --git a/examples/bin/worker b/examples/bin/worker index 968c991e..9bf14e38 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -49,6 +49,7 @@ worker.register_workflow(TripBookingWorkflow) worker.register_workflow(UpsertSearchAttributesWorkflow) worker.register_workflow(WaitForWorkflow) worker.register_workflow(WaitForExternalSignalWorkflow) +worker.register_workflow(WaitForNamedSignalWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/named_signal_handler_spec.rb b/examples/spec/integration/named_signal_handler_spec.rb new file mode 100644 index 00000000..aa5fc559 --- /dev/null +++ b/examples/spec/integration/named_signal_handler_spec.rb @@ -0,0 +1,84 @@ +require 'workflows/wait_for_named_signal_workflow' + +describe WaitForNamedSignalWorkflow, :integration do + let(:receiver_workflow_id) { SecureRandom.uuid } + + context 'when the signal is named' do + let(:arg1) { "arg1" } + let(:arg2) { 7890.1234 } + + context 'and the workflow has a named signal handler matching the signal name' do + let(:signal_name) { "NamedSignal" } + + it 'receives the signal in its named handler' do + _, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id}) + + Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2]) + + result = Temporal.await_workflow_result( + WaitForNamedSignalWorkflow, + workflow_id: receiver_workflow_id, + run_id: run_id, + ) + + expect(result[:received]).to include({signal_name => [arg1, arg2]}) + expect(result[:counts]).to include({signal_name => 1}) + expect(result).to eq( + { + received: { + signal_name => [arg1, arg2], + 'catch-all' => [arg1, arg2] + }, + counts: { + signal_name => 1, + 'catch-all' => 1 + } + } + ) + + end + + it 'receives the signal in its catch-all signal handler' do + _, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id}) + + Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2]) + + result = Temporal.await_workflow_result( + WaitForNamedSignalWorkflow, + workflow_id: receiver_workflow_id, + run_id: run_id, + ) + + expect(result[:received]).to include({"catch-all" => [arg1, arg2]}) + expect(result[:counts]).to include({"catch-all" => 1}) + end + end + + context 'and the workflow does NOT have a named signal handler matching the signal name' do + let(:signal_name) { 'doesNOTmatchAsignalHandler' } + + it 'receives the signal in its catch-all signal handler' do + _, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id}) + + Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2]) + + result = Temporal.await_workflow_result( + WaitForNamedSignalWorkflow, + workflow_id: receiver_workflow_id, + run_id: run_id, + ) + + expect(result).to eq( + { + received: { + 'catch-all' => [arg1, arg2] + }, + counts: { + 'catch-all' => 1 + } + } + ) + end + end + end +end diff --git a/examples/workflows/wait_for_named_signal_workflow.rb b/examples/workflows/wait_for_named_signal_workflow.rb new file mode 100644 index 00000000..9f715a2a --- /dev/null +++ b/examples/workflows/wait_for_named_signal_workflow.rb @@ -0,0 +1,27 @@ +# Can receive signals to its named signal handler. If a signal doesn't match the +# named handler's signature, then it matches the catch-all signal handler +# +class WaitForNamedSignalWorkflow < Temporal::Workflow + def execute(expected_signal) + signals_received = {} + signal_counts = Hash.new { |h,k| h[k] = 0 } + + # catch-all handler + workflow.on_signal do |signal, input| + workflow.logger.info("Received signal name as #{signal}, with input #{input.inspect}") + signals_received['catch-all'] = input + signal_counts['catch-all'] += 1 + end + + workflow.on_signal('NamedSignal') do |input| + workflow.logger.info("Received signal name -NamedSignal-, with input #{input.inspect}") + signals_received['NamedSignal'] = input + signal_counts['NamedSignal'] += 1 + end + + timeout_timer = workflow.start_timer(1) + workflow.wait_for(timeout_timer) + + { received: signals_received, counts: signal_counts } + end +end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index dd28fdbf..39dc8236 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -9,6 +9,7 @@ require 'temporal/workflow/future' require 'temporal/workflow/replay_aware_logger' require 'temporal/workflow/state_manager' +require 'temporal/workflow/signal' # This context class is available in the workflow implementation # and provides context and methods for interacting with Temporal @@ -280,11 +281,24 @@ def now state_manager.local_time end - def on_signal(&block) - target = History::EventTarget.workflow - - dispatcher.register_handler(target, 'signaled') do |signal, input| - call_in_fiber(block, signal, input) + # Define a signal handler to receive signals onto the workflow. When + # +name+ is defined, this creates a named signal handler which will be + # invoked whenever a signal named +name+ is received. A handler without + # a set name (defaults to nil) will be the default handler and will receive + # all signals that do not match a named signal handler. + # + # @param signal_name [String, Symbol, nil] an optional signal name; converted to a String + def on_signal(signal_name=nil, &block) + if signal_name + target = Signal.new(signal_name) + dispatcher.register_handler(target, 'signaled') do |_, input| + # do not pass signal name when triggering a named handler + call_in_fiber(block, input) + end + else + dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, 'signaled') do |signal, input| + call_in_fiber(block, signal, input) + end end end diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 2a768e54..58b32f46 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -1,15 +1,30 @@ module Temporal class Workflow + # This provides a generic event dispatcher mechanism. There are two main entry + # points to this class, #register_handler and #dispatch. + # + # A handler may be associated with a specific event name so when that event occurs + # elsewhere in the system we may dispatch the event and execute the handler. + # We *always* execute the handler associated with the event_name. + # + # Optionally, we may register a named handler that is triggered when an event _and + # an optional handler_name key_ are provided. In this situation, we dispatch to both + # the handler associated to event_name+handler_name and to the handler associated with + # the event_name. The order of this dispatch is not guaranteed. + # class Dispatcher WILDCARD = '*'.freeze TARGET_WILDCARD = '*'.freeze + EventStruct = Struct.new(:event_name, :handler) + def initialize @handlers = Hash.new { |hash, key| hash[key] = [] } end def register_handler(target, event_name, &handler) - handlers[target] << [event_name, handler] + handlers[target] << EventStruct.new(event_name, handler) + self end def dispatch(target, event_name, args = nil) @@ -25,8 +40,13 @@ def dispatch(target, event_name, args = nil) def handlers_for(target, event_name) handlers[target] .concat(handlers[TARGET_WILDCARD]) - .select { |(name, _)| name == event_name || name == WILDCARD } - .map(&:last) + .select { |event_struct| match?(event_struct, event_name) } + .map(&:handler) + end + + def match?(event_struct, event_name) + event_struct.event_name == event_name || + event_struct.event_name == WILDCARD end end end diff --git a/lib/temporal/workflow/signal.rb b/lib/temporal/workflow/signal.rb new file mode 100644 index 00000000..e31d77fc --- /dev/null +++ b/lib/temporal/workflow/signal.rb @@ -0,0 +1,5 @@ +module Temporal + class Workflow + Signal = Struct.new(:signal_name) + end +end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 8c991b08..bdebe701 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -5,6 +5,7 @@ require 'temporal/workflow/history/event_target' require 'temporal/concerns/payloads' require 'temporal/workflow/errors' +require 'temporal/workflow/signal' module Temporal class Workflow @@ -224,7 +225,9 @@ def apply_event(event) handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data'])) when 'WORKFLOW_EXECUTION_SIGNALED' - dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) + # relies on Signal#== for matching in Dispatcher + signal_target = Signal.new(event.attributes.signal_name) + dispatch(signal_target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb index 43ccc8fc..6e1ae8d6 100644 --- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb +++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb @@ -6,12 +6,56 @@ let(:other_target) { Temporal::Workflow::History::EventTarget.new(2, Temporal::Workflow::History::EventTarget::TIMER_TYPE) } describe '#register_handler' do - it 'stores a given handler against the target' do - block = -> { 'handler body' } + let(:block) { -> { 'handler body' } } + let(:event_name) { 'signaled' } + let(:dispatcher) { subject.register_handler(target, event_name, &block) } + let(:handlers) { dispatcher.send(:handlers) } - subject.register_handler(target, 'signaled', &block) + context 'with default handler_name' do + let(:handler_name) { nil } - expect(subject.send(:handlers)).to include(target => [['signaled', block]]) + it 'stores the target' do + expect(handlers.key?(target)).to be true + end + + it 'stores the target and handler once' do + expect(handlers[target]).to be_kind_of(Array) + expect(handlers[target].count).to eq 1 + end + + it 'associates the event name with the target' do + event = handlers[target].first + expect(event.event_name).to eq(event_name) + end + + it 'associates the handler with the target' do + event = handlers[target].first + expect(event.handler).to eq(block) + end + end + + context 'with a specific handler_name' do + let(:handler_name) { 'specific name' } + let(:event_name) { "signaled:#{handler_name}" } + + it 'stores the target' do + expect(handlers.key?(target)).to be true + end + + it 'stores the target and handler once' do + expect(handlers[target]).to be_kind_of(Array) + expect(handlers[target].count).to eq 1 + end + + it 'associates the event name and handler name with the target' do + event = handlers[target].first + expect(event.event_name).to eq(event_name) + end + + it 'associates the handler with the target' do + event = handlers[target].first + expect(event.handler).to eq(block) + end end end @@ -89,5 +133,28 @@ expect(target.eql?(described_class::TARGET_WILDCARD)).to be(false) end end + + context 'with a named handler' do + let(:handler_7) { -> { 'seventh block' } } + let(:handler_name) { 'specific name' } + before do + allow(handler_7).to receive(:call) + + subject.register_handler(target, 'completed', &handler_7) + end + + it 'calls the named handler and the default' do + subject.dispatch(target, 'completed', handler_name: handler_name) + + # the parent context "before" block registers the handlers with the target + # so look there for why handlers 1 and 4 are also called + expect(handler_7).to have_received(:call) + expect(handler_1).to have_received(:call) + expect(handler_4).to have_received(:call) + + expect(handler_2).not_to have_received(:call) + expect(handler_3).not_to have_received(:call) + end + end end end