Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,15 @@ def wait_for_any(*futures)
return if futures.empty? || futures.any?(&:finished?)

fiber = Fiber.current
blocked = true

futures.each do |future|
handlers = futures.map do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
# Because any of the futures can resume the fiber, ignore any callbacks
# from other futures after unblocking has occurred
if blocked && future.finished?
blocked = false
fiber.resume
end
fiber.resume if future.finished?
end
end

Fiber.yield
handlers.each(&:unregister)

return
end
Expand All @@ -277,18 +272,13 @@ def wait_until(&unblock_condition)
return if unblock_condition.call

fiber = Fiber.current
blocked = true

dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
fiber.resume
end

handler = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
fiber.resume if unblock_condition.call
end

Fiber.yield
handler.unregister

return
end
Expand Down Expand Up @@ -316,6 +306,8 @@ def on_signal(signal_name = nil, &block)
call_in_fiber(block, signal, input)
end
end

return
end

def on_query(query, &block)
Expand Down
41 changes: 35 additions & 6 deletions lib/temporal/workflow/dispatcher.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'temporal/errors'

module Temporal
class Workflow
# This provides a generic event dispatcher mechanism. There are two main entry
Expand All @@ -13,18 +15,44 @@ class Workflow
# the event_name. The order of this dispatch is not guaranteed.
#
class Dispatcher
# Raised if a duplicate ID is encountered during dispatch handling.
# This likely indicates a bug in temporal-ruby or that unsupported multithreaded
# workflow code is being used.
class DuplicateIDError < InternalError; end

# Tracks a registered handle so that it can be unregistered later
# The handlers are passed by reference here to be mutated (removed) by the
# unregister call below.
class RegistrationHandle
def initialize(handlers_for_target, id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be handy to add a comment saying that handlers are passed by reference and are mutated here

@handlers_for_target = handlers_for_target
@id = id
end

# Unregister the handler from the dispatcher
def unregister
handlers_for_target.delete(id)
end

private

attr_reader :handlers_for_target, :id
end

WILDCARD = '*'.freeze
TARGET_WILDCARD = '*'.freeze

EventStruct = Struct.new(:event_name, :handler)

def initialize
@handlers = Hash.new { |hash, key| hash[key] = [] }
@handlers = Hash.new { |hash, key| hash[key] = {} }
@next_id = 0
end

def register_handler(target, event_name, &handler)
handlers[target] << EventStruct.new(event_name, handler)
self
@next_id += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't thread-safe on non-CRuby implementations. Do we need to worry about handler registration being called from multiple threads @antstorm ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would require using multi-threading in workflow code, right? That doesn't seem idiomatic, but I don't know if it's possible. It also seems like this wouldn't work with other parts of the library either since synchronization is not widely used. Would the current version that does array appending even be expected to work correctly in this case? Or is that operation guaranteed to be atomic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. Hoping Anthony responds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the workflow code is definitely expected to be all single-threaded, so I wouldn't worry too much about it. Besides the way we write to handlers is already non thread-safe, so this doesn't add any additional issues

handlers[target][@next_id] = EventStruct.new(event_name, handler)
RegistrationHandle.new(handlers[target], @next_id)
end

def dispatch(target, event_name, args = nil)
Expand All @@ -39,9 +67,10 @@ def dispatch(target, event_name, args = nil)

def handlers_for(target, event_name)
handlers[target]
.concat(handlers[TARGET_WILDCARD])
.select { |event_struct| match?(event_struct, event_name) }
.map(&:handler)
.merge(handlers[TARGET_WILDCARD]) { raise DuplicateIDError.new('Cannot resolve duplicate dispatcher handler IDs') }
.select { |_, event_struct| match?(event_struct, event_name) }
.sort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of surprised that this does the right thing since handlers[TARGET_WILDCARD] merged in earlier isn't a DispatchHandler with an associated sequence ID to sort.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handlers[target] and handlers[TARGET_WILDCARD] both produce a hash of number -> EventStruct. Because the IDs all should be unique, they merge seamlessly unless there is some sort of bug occurring to produce duplicate IDs. DispatchHandler is only returned out of the function to be used to unregister the handler, and not stored inside the dispatcher at all.

The naming here is confusing because there's a handler block argument but also a DispatchHandler object, and they're not actually related. Maybe I should rename DispatchHandler to something like RegistrationHandle? I could also unify them but there's no real functional purpose for that. I'll try to rework this a bit to at least be easier to follow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Yes, that confused me for sure. Renaming would help.

.map { |_, event_struct| event_struct.handler }
end

def match?(event_struct, event_name)
Expand Down
45 changes: 38 additions & 7 deletions spec/unit/lib/temporal/workflow/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
describe '#register_handler' do
let(:block) { -> { 'handler body' } }
let(:event_name) { 'signaled' }
let(:dispatcher) { subject.register_handler(target, event_name, &block) }
let(:dispatcher) do
subject.register_handler(target, event_name, &block)
subject
end
let(:handlers) { dispatcher.send(:handlers) }

context 'with default handler_name' do
Expand All @@ -19,17 +22,17 @@
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Array)
expect(handlers[target]).to be_kind_of(Hash)
expect(handlers[target].count).to eq 1
end

it 'associates the event name with the target' do
event = handlers[target].first
event = handlers[target][1]
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target].first
event = handlers[target][1]
expect(event.handler).to eq(block)
end
end
Expand All @@ -43,20 +46,44 @@
end

it 'stores the target and handler once' do
expect(handlers[target]).to be_kind_of(Array)
expect(handlers[target]).to be_kind_of(Hash)
expect(handlers[target].count).to eq 1
end

it 'associates the event name and handler name with the target' do
event = handlers[target].first
event = handlers[target][1]
expect(event.event_name).to eq(event_name)
end

it 'associates the handler with the target' do
event = handlers[target].first
event = handlers[target][1]
expect(event.handler).to eq(block)
end
end

it 'removes a given handler against the target' do
block1 = -> { 'handler body' }
block2 = -> { 'other handler body' }
block3 = -> { 'yet another handler body' }

handle1 = subject.register_handler(target, 'signaled', &block1)
subject.register_handler(target, 'signaled', &block2)
subject.register_handler(other_target, 'signaled', &block3)

expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike asserting behavior against private structures like this. When a method is declared private, the "private" aspect is about code organization. Private methods should be able to get refactored, renamed, changed, etc without breaking unit tests at all.

If this is the only way to test it then the code structure is a code smell and needs some love. IMHO.

Perhaps write unit tests against the private DispatchHandler class. Then you can assert that the DispatchHandler instances are created with the right args.

As for testing the ordering, that's a stickier problem that I'd have to think about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd originally written tests only against the public interface, but other tests were added in #157 that further inspected the private contents. That was merged between when I first started writing this and began upstreaming this PR. So I decided to stick with that convention, and update the tests. Happy to rework this if everyone agrees we should only test the public interface here. This would make the tests less brittle.

The ordering tests are critical, as that's the crux of one of the bugs where the order was not deterministic. I agree the contents of the dispatcher's internal state don't need to be in a particular order, as it's an implementation detail. All that needs to be guaranteed is dispatch order follows registration order. There is a test for this farther down the file already. Do you think that's sufficient? Or are there more cases I could cover there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, yes, and it's particularly annoying to me that I did it in my own PR. :)

Now I'm inspired to go back and fix that because I can't allow myself to be a hypocrite.

As for ordering, I think the later tests cover it sufficiently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to leave this in order to get the PR merged, but we can refactor this (in a separate PR) and put the expectations inside the handler blocks, so instead of checking the internal state we just make sure that the right ones are firing when an even is dispatched (fully conforming to a public interface of a Dispatcher)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this for now since at least it has good coverage

expect(subject.send(:handlers)[target][1].handler).to be(block1)

expect(subject.send(:handlers)[target][2].event_name).to eq('signaled')
expect(subject.send(:handlers)[target][2].handler).to be(block2)

expect(subject.send(:handlers)[other_target][3].event_name).to eq('signaled')
expect(subject.send(:handlers)[other_target][3].handler).to be(block3)

handle1.unregister
expect(subject.send(:handlers)[target][1]).to be(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might wanna check that the others are still there. If you empty the hash completely it won't fail this spec

expect(subject.send(:handlers)[target][2]).to_not be(nil)
expect(subject.send(:handlers)[other_target][3]).to_not be(nil)
end
end

describe '#dispatch' do
Expand Down Expand Up @@ -114,10 +141,13 @@

context 'with TARGET_WILDCARD target handler' do
let(:handler_6) { -> { 'sixth block' } }
let(:handler_7) { -> { 'seventh block' } }
before do
allow(handler_6).to receive(:call)
allow(handler_7).to receive(:call)

subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
subject.register_handler(target, 'completed', &handler_7)
end

it 'calls the handler' do
Expand All @@ -127,6 +157,7 @@
expect(handler_1).to have_received(:call).ordered
expect(handler_4).to have_received(:call).ordered
expect(handler_6).to have_received(:call).ordered
expect(handler_7).to have_received(:call).ordered
end

it 'TARGET_WILDCARD can be compared to an EventTarget object' do
Expand Down