Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(UpsertSearchAttributesWorkflow)
worker.register_workflow(WaitForWorkflow)
worker.register_workflow(WaitForExternalSignalWorkflow)
worker.register_workflow(WaitForNamedSignalWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
84 changes: 84 additions & 0 deletions examples/spec/integration/named_signal_handler_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
require 'workflows/wait_for_named_signal_workflow'

describe WaitForNamedSignalWorkflow, :integration do
let(:receiver_workflow_id) { SecureRandom.uuid }

context 'when the signal is named' do
let(:arg1) { "arg1" }
let(:arg2) { 7890.1234 }

context 'and the workflow has a named signal handler matching the signal name' do
let(:signal_name) { "NamedSignal" }

it 'receives the signal in its named handler' do
_, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id})

Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2])

result = Temporal.await_workflow_result(
WaitForNamedSignalWorkflow,
workflow_id: receiver_workflow_id,
run_id: run_id,
)

expect(result[:received]).to include({signal_name => [arg1, arg2]})
expect(result[:counts]).to include({signal_name => 1})
expect(result).to eq(
{
received: {
signal_name => [arg1, arg2],
'catch-all' => [arg1, arg2]
},
counts: {
signal_name => 1,
'catch-all' => 1
}
}
)

end

it 'receives the signal in its catch-all signal handler' do
_, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id})

Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2])

result = Temporal.await_workflow_result(
WaitForNamedSignalWorkflow,
workflow_id: receiver_workflow_id,
run_id: run_id,
)

expect(result[:received]).to include({"catch-all" => [arg1, arg2]})
expect(result[:counts]).to include({"catch-all" => 1})
end
end

context 'and the workflow does NOT have a named signal handler matching the signal name' do
let(:signal_name) { 'doesNOTmatchAsignalHandler' }

it 'receives the signal in its catch-all signal handler' do
_, run_id = run_workflow(WaitForNamedSignalWorkflow, signal_name, options: { workflow_id: receiver_workflow_id})

Temporal.signal_workflow(WaitForNamedSignalWorkflow, signal_name, receiver_workflow_id, run_id, [arg1, arg2])

result = Temporal.await_workflow_result(
WaitForNamedSignalWorkflow,
workflow_id: receiver_workflow_id,
run_id: run_id,
)

expect(result).to eq(
{
received: {
'catch-all' => [arg1, arg2]
},
counts: {
'catch-all' => 1
}
}
)
end
end
end
end
27 changes: 27 additions & 0 deletions examples/workflows/wait_for_named_signal_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Can receive signals to its named signal handler. If a signal doesn't match the
# named handler's signature, then it matches the catch-all signal handler
#
class WaitForNamedSignalWorkflow < Temporal::Workflow
def execute(expected_signal)
signals_received = {}
signal_counts = Hash.new { |h,k| h[k] = 0 }

# catch-all handler
workflow.on_signal do |signal, input|
workflow.logger.info("Received signal name as #{signal}, with input #{input.inspect}")
signals_received['catch-all'] = input
signal_counts['catch-all'] += 1
end

workflow.on_signal('NamedSignal') do |input|
workflow.logger.info("Received signal name -NamedSignal-, with input #{input.inspect}")
signals_received['NamedSignal'] = input
signal_counts['NamedSignal'] += 1
end

timeout_timer = workflow.start_timer(1)
workflow.wait_for(timeout_timer)

{ received: signals_received, counts: signal_counts }
end
end
24 changes: 19 additions & 5 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require 'temporal/workflow/future'
require 'temporal/workflow/replay_aware_logger'
require 'temporal/workflow/state_manager'
require 'temporal/workflow/signal'

# This context class is available in the workflow implementation
# and provides context and methods for interacting with Temporal
Expand Down Expand Up @@ -280,11 +281,24 @@ def now
state_manager.local_time
end

def on_signal(&block)
target = History::EventTarget.workflow

dispatcher.register_handler(target, 'signaled') do |signal, input|
call_in_fiber(block, signal, input)
# Define a signal handler to receive signals onto the workflow. When
# +name+ is defined, this creates a named signal handler which will be
# invoked whenever a signal named +name+ is received. A handler without
# a set name (defaults to nil) will be the default handler and will receive
# all signals that do not match a named signal handler.
#
# @param signal_name [String, Symbol, nil] an optional signal name; converted to a String
def on_signal(signal_name=nil, &block)
if signal_name
target = Signal.new(signal_name)
dispatcher.register_handler(target, 'signaled') do |_, input|
# do not pass signal name when triggering a named handler
call_in_fiber(block, input)
end
else
dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, 'signaled') do |signal, input|
call_in_fiber(block, signal, input)
end
end
end

Expand Down
26 changes: 23 additions & 3 deletions lib/temporal/workflow/dispatcher.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
module Temporal
class Workflow
# This provides a generic event dispatcher mechanism. There are two main entry
# points to this class, #register_handler and #dispatch.
#
# A handler may be associated with a specific event name so when that event occurs
# elsewhere in the system we may dispatch the event and execute the handler.
# We *always* execute the handler associated with the event_name.
#
# Optionally, we may register a named handler that is triggered when an event _and
# an optional handler_name key_ are provided. In this situation, we dispatch to both
# the handler associated to event_name+handler_name and to the handler associated with
# the event_name. The order of this dispatch is not guaranteed.
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like a cleaner solution would be to add the handler_name to event_name and dispatch the event twice:

  • "signaled:signalName"
  • "signaled"

I think this would keep the dispatcher simpler for now and does allow us to achieve everything that is needed

class Dispatcher
WILDCARD = '*'.freeze
TARGET_WILDCARD = '*'.freeze

EventStruct = Struct.new(:event_name, :handler)

def initialize
@handlers = Hash.new { |hash, key| hash[key] = [] }
end

def register_handler(target, event_name, &handler)
handlers[target] << [event_name, handler]
handlers[target] << EventStruct.new(event_name, handler)
self
end

def dispatch(target, event_name, args = nil)
Expand All @@ -25,8 +40,13 @@ def dispatch(target, event_name, args = nil)
def handlers_for(target, event_name)
handlers[target]
.concat(handlers[TARGET_WILDCARD])
.select { |(name, _)| name == event_name || name == WILDCARD }
.map(&:last)
.select { |event_struct| match?(event_struct, event_name) }
.map(&:handler)
end

def match?(event_struct, event_name)
event_struct.event_name == event_name ||
event_struct.event_name == WILDCARD
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/workflow/signal.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module Temporal
class Workflow
Signal = Struct.new(:signal_name)
end
end
5 changes: 4 additions & 1 deletion lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'temporal/workflow/history/event_target'
require 'temporal/concerns/payloads'
require 'temporal/workflow/errors'
require 'temporal/workflow/signal'

module Temporal
class Workflow
Expand Down Expand Up @@ -224,7 +225,9 @@ def apply_event(event)
handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data']))

when 'WORKFLOW_EXECUTION_SIGNALED'
dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input))
# relies on Signal#== for matching in Dispatcher
signal_target = Signal.new(event.attributes.signal_name)
dispatch(signal_target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input))

when 'WORKFLOW_EXECUTION_TERMINATED'
# todo
Expand Down
75 changes: 71 additions & 4 deletions spec/unit/lib/temporal/workflow/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,56 @@
let(:other_target) { Temporal::Workflow::History::EventTarget.new(2, Temporal::Workflow::History::EventTarget::TIMER_TYPE) }

describe '#register_handler' do
it 'stores a given handler against the target' do
block = -> { 'handler body' }
let(:block) { -> { 'handler body' } }
let(:event_name) { 'signaled' }
let(:dispatcher) { subject.register_handler(target, event_name, &block) }
let(:handlers) { dispatcher.send(:handlers) }

subject.register_handler(target, 'signaled', &block)
context 'with default handler_name' do
let(:handler_name) { nil }

expect(subject.send(:handlers)).to include(target => [['signaled', block]])
it 'stores the target' do
expect(handlers.key?(target)).to be true
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Array)
expect(handlers[target].count).to eq 1
end

it 'associates the event name with the target' do
event = handlers[target].first
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target].first
expect(event.handler).to eq(block)
end
end

context 'with a specific handler_name' do
let(:handler_name) { 'specific name' }
let(:event_name) { "signaled:#{handler_name}" }

it 'stores the target' do
expect(handlers.key?(target)).to be true
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Array)
expect(handlers[target].count).to eq 1
end

it 'associates the event name and handler name with the target' do
event = handlers[target].first
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target].first
expect(event.handler).to eq(block)
end
end
end

Expand Down Expand Up @@ -89,5 +133,28 @@
expect(target.eql?(described_class::TARGET_WILDCARD)).to be(false)
end
end

context 'with a named handler' do
let(:handler_7) { -> { 'seventh block' } }
let(:handler_name) { 'specific name' }
before do
allow(handler_7).to receive(:call)

subject.register_handler(target, 'completed', &handler_7)
end

it 'calls the named handler and the default' do
subject.dispatch(target, 'completed', handler_name: handler_name)

# the parent context "before" block registers the handlers with the target
# so look there for why handlers 1 and 4 are also called
expect(handler_7).to have_received(:call)
expect(handler_1).to have_received(:call)
expect(handler_4).to have_received(:call)

expect(handler_2).not_to have_received(:call)
expect(handler_3).not_to have_received(:call)
end
end
end
end