Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/spec/integration/signal_with_start_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
timeouts: { execution: 10 },
}
)

Expand All @@ -29,10 +29,10 @@
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
timeouts: { execution: 10 },
}
)

Expand All @@ -50,22 +50,22 @@
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
10,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
timeouts: { execution: 10 },
}
)

second_run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
timeouts: { execution: 10 },
}
)

Expand Down
10 changes: 7 additions & 3 deletions examples/workflows/signal_with_start_workflow.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
require 'activities/hello_world_activity'

class SignalWithStartWorkflow < Temporal::Workflow

def execute(expected_signal, sleep_for)
received = 'no signal received'
def execute(expected_signal)
initial_value = 'no signal received'
received = initial_value

workflow.on_signal do |signal, input|
if signal == expected_signal
HelloWorldActivity.execute!('expected signal')
received = input
end
end

# Do something to get descheduled so the signal handler has a chance to run
workflow.sleep(sleep_for)
workflow.wait_until { received != initial_value }
received
end
end
11 changes: 9 additions & 2 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ def wait_until(&unblock_condition)

fiber = Fiber.current

handler = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# wait_until condition blocks often read state modified by target-specfic handlers like
# signal handlers or callbacks for timer or activity completion. Running the wait_until
# handlers after the other handlers ensures that state is correctly updated before being
# read.
handler = dispatcher.register_handler(
Dispatcher::WILDCARD, # any target
Dispatcher::WILDCARD, # any event type
Dispatcher::Order::AT_END) do
fiber.resume if unblock_condition.call
end

Expand Down Expand Up @@ -325,7 +332,7 @@ def on_signal(signal_name = nil, &block)
call_in_fiber(block, input)
end
else
dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, 'signaled') do |signal, input|
dispatcher.register_handler(Dispatcher::WILDCARD, 'signaled') do |signal, input|
call_in_fiber(block, signal, input)
end
end
Expand Down
33 changes: 16 additions & 17 deletions lib/temporal/workflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ class Workflow
# elsewhere in the system we may dispatch the event and execute the handler.
# We *always* execute the handler associated with the event_name.
#
# Optionally, we may register a named handler that is triggered when an event _and
# an optional handler_name key_ are provided. In this situation, we dispatch to both
# the handler associated to event_name+handler_name and to the handler associated with
# the event_name. The order of this dispatch is not guaranteed.
#
class Dispatcher
# Raised if a duplicate ID is encountered during dispatch handling.
# This likely indicates a bug in temporal-ruby or that unsupported multithreaded
Expand All @@ -40,19 +35,23 @@ def unregister
end

WILDCARD = '*'.freeze
TARGET_WILDCARD = '*'.freeze

EventStruct = Struct.new(:event_name, :handler)
module Order
AT_BEGINNING = 1
AT_END = 2
end

EventStruct = Struct.new(:event_name, :handler, :order)

def initialize
@handlers = Hash.new { |hash, key| hash[key] = {} }
@event_handlers = Hash.new { |hash, key| hash[key] = {} }
@next_id = 0
end

def register_handler(target, event_name, &handler)
def register_handler(target, event_name, order=Order::AT_BEGINNING, &handler)
@next_id += 1
handlers[target][@next_id] = EventStruct.new(event_name, handler)
RegistrationHandle.new(handlers[target], @next_id)
event_handlers[target][@next_id] = EventStruct.new(event_name, handler, order)
RegistrationHandle.new(event_handlers[target], @next_id)
end

def dispatch(target, event_name, args = nil)
Expand All @@ -63,14 +62,14 @@ def dispatch(target, event_name, args = nil)

private

attr_reader :handlers
attr_reader :event_handlers

def handlers_for(target, event_name)
handlers[target]
.merge(handlers[TARGET_WILDCARD]) { raise DuplicateIDError.new('Cannot resolve duplicate dispatcher handler IDs') }
.select { |_, event_struct| match?(event_struct, event_name) }
.sort
.map { |_, event_struct| event_struct.handler }
event_handlers[target]
.merge(event_handlers[WILDCARD]) { raise DuplicateIDError.new('Cannot resolve duplicate dispatcher handler IDs') }
.select { |_, event| match?(event, event_name) }
.sort_by{ |id, event_struct| [event_struct.order, id]}
.map { |_, event| event.handler }
end

def match?(event_struct, event_name)
Expand Down
109 changes: 43 additions & 66 deletions spec/unit/lib/temporal/workflow/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,25 @@
subject.register_handler(target, event_name, &block)
subject
end
let(:handlers) { dispatcher.send(:handlers) }
let(:handlers) { dispatcher.send(:event_handlers) }

context 'with default handler_name' do
let(:handler_name) { nil }

it 'stores the target' do
expect(handlers.key?(target)).to be true
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Hash)
expect(handlers[target].count).to eq 1
end

it 'associates the event name with the target' do
event = handlers[target][1]
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target][1]
expect(event.handler).to eq(block)
end
it 'stores the target' do
expect(handlers.key?(target)).to be true
end

context 'with a specific handler_name' do
let(:handler_name) { 'specific name' }
let(:event_name) { "signaled:#{handler_name}" }

it 'stores the target' do
expect(handlers.key?(target)).to be true
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Hash)
expect(handlers[target].count).to eq 1
end
it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Hash)
expect(handlers[target].count).to eq 1
end

it 'associates the event name and handler name with the target' do
event = handlers[target][1]
expect(event.event_name).to eq(event_name)
end
it 'associates the event name with the target' do
event = handlers[target][1]
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target][1]
expect(event.handler).to eq(block)
end
it 'associates the handler with the target' do
event = handlers[target][1]
expect(event.handler).to eq(block)
end

it 'removes a given handler against the target' do
Expand All @@ -70,19 +42,19 @@
subject.register_handler(target, 'signaled', &block2)
subject.register_handler(other_target, 'signaled', &block3)

expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
expect(subject.send(:handlers)[target][1].handler).to be(block1)
expect(handlers[target][1].event_name).to eq('signaled')
expect(handlers[target][1].handler).to be(block1)

expect(subject.send(:handlers)[target][2].event_name).to eq('signaled')
expect(subject.send(:handlers)[target][2].handler).to be(block2)
expect(handlers[target][2].event_name).to eq('signaled')
expect(handlers[target][2].handler).to be(block2)

expect(subject.send(:handlers)[other_target][3].event_name).to eq('signaled')
expect(subject.send(:handlers)[other_target][3].handler).to be(block3)
expect(handlers[other_target][3].event_name).to eq('signaled')
expect(handlers[other_target][3].handler).to be(block3)

handle1.unregister
expect(subject.send(:handlers)[target][1]).to be(nil)
expect(subject.send(:handlers)[target][2]).to_not be(nil)
expect(subject.send(:handlers)[other_target][3]).to_not be(nil)
expect(handlers[target][1]).to be(nil)
expect(handlers[target][2]).to_not be(nil)
expect(handlers[other_target][3]).to_not be(nil)
end
end

Expand Down Expand Up @@ -139,14 +111,14 @@
end
end

context 'with TARGET_WILDCARD target handler' do
context 'with WILDCARD target handler' do
let(:handler_6) { -> { 'sixth block' } }
let(:handler_7) { -> { 'seventh block' } }
before do
allow(handler_6).to receive(:call)
allow(handler_7).to receive(:call)

subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, &handler_6)
subject.register_handler(target, 'completed', &handler_7)
end

Expand All @@ -160,31 +132,36 @@
expect(handler_7).to have_received(:call).ordered
end

it 'TARGET_WILDCARD can be compared to an EventTarget object' do
expect(target.eql?(described_class::TARGET_WILDCARD)).to be(false)
it 'WILDCARD can be compared to an EventTarget object' do
expect(target.eql?(described_class::WILDCARD)).to be(false)
end
end

context 'with a named handler' do
context 'with AT_END order' do
let(:handler_5) { -> { 'fifth block' } }
let(:handler_6) { -> { 'sixth block' } }
let(:handler_7) { -> { 'seventh block' } }
let(:handler_name) { 'specific name' }
before do
allow(handler_5).to receive(:call)
allow(handler_6).to receive(:call)
allow(handler_7).to receive(:call)

subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, described_class::Order::AT_END, &handler_5)
subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, described_class::Order::AT_END, &handler_6)
subject.register_handler(target, 'completed', &handler_7)
end

it 'calls the named handler and the default' do
subject.dispatch(target, 'completed', handler_name: handler_name)
it 'calls the handler' do
subject.dispatch(target, 'completed')

# the parent context "before" block registers the handlers with the target
# so look there for why handlers 1 and 4 are also called
expect(handler_7).to have_received(:call)
expect(handler_1).to have_received(:call)
expect(handler_4).to have_received(:call)
# Target handlers still invoked
expect(handler_1).to have_received(:call).ordered
expect(handler_4).to have_received(:call).ordered
expect(handler_7).to have_received(:call).ordered

expect(handler_2).not_to have_received(:call)
expect(handler_3).not_to have_received(:call)
# AT_END handlers are invoked at the end, in order
expect(handler_5).to have_received(:call).ordered
expect(handler_6).to have_received(:call).ordered
end
end
end
Expand Down