Skip to content

Conversation

@jeffschoner-stripe
Copy link
Contributor

This contains critical follow ups to #111.

Deterministically order wildcard dispatch handlers

Dispatch handlers for wait_until are now called in a deterministic order that does not change as a workflow progresses, preventing non-determinism errors from arising where user code is indeed deterministic.

When wait_until is called, a dispatch handler is added that resumes the fiber once the condition has been satisfied. Because this condition could change due to any workflow progress, it must be evaluated on every dispatch. Before this change, all wait_until handlers were always evaluated after any targeted dispatch callbacks (such as from a specific activity or timer). This can cause non-determinism in certain corner cases. These callbacks are now always called in the same order. This is achieved by associating an autoincrementing, unique ID with each handler. When the list of handlers is merged and filtered, it is now sorted by these IDs, guaranteeing order even as the workflow progresses.

Remove dispatch handlers once they're no longer needed

In activities with long histories, the number of dispatch handlers can get large. Particularly for workflows that call wait_until many times (such as in a loop), the number can get very large, and they must be invoked on every dispatch. This can cause performance problems that result in workflow task timeout.

This change removes dispatch handlers once no longer needed. The unique autoincrementing ID mentioned from the above fix is used to remove handlers once they are no longer needed, dramatically improving performance for workflows with long histories. A DispatchHandler type is introduced to encapsulate this removal-by-id behavior.

def register_handler(target, event_name, &handler)
handlers[target] << EventStruct.new(event_name, handler)
self
@next_id += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't thread-safe on non-CRuby implementations. Do we need to worry about handler registration being called from multiple threads @antstorm ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would require using multi-threading in workflow code, right? That doesn't seem idiomatic, but I don't know if it's possible. It also seems like this wouldn't work with other parts of the library either since synchronization is not widely used. Would the current version that does array appending even be expected to work correctly in this case? Or is that operation guaranteed to be atomic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. Hoping Anthony responds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the workflow code is definitely expected to be all single-threaded, so I wouldn't worry too much about it. Besides the way we write to handlers is already non thread-safe, so this doesn't add any additional issues

.concat(handlers[TARGET_WILDCARD])
.select { |event_struct| match?(event_struct, event_name) }
.map(&:handler)
.merge(handlers[TARGET_WILDCARD]) { raise 'Cannot resolve duplicate dispatcher handler IDs' }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend creating a real Error here instead of raising StandardError. This doesn't play well with Sentry and other error management tools when errors are non-unique.

e.g.

DuplicateHandlerIdError = Class.new(StandardError)
...
.merge(handlers[TARGET_WILDCARD]) { raise DuplicateHandlerIdError.new("Cannot resolve duplicate dispatcher handler IDs") }

.map(&:handler)
.merge(handlers[TARGET_WILDCARD]) { raise 'Cannot resolve duplicate dispatcher handler IDs' }
.select { |_, event_struct| match?(event_struct, event_name) }
.sort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of surprised that this does the right thing since handlers[TARGET_WILDCARD] merged in earlier isn't a DispatchHandler with an associated sequence ID to sort.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handlers[target] and handlers[TARGET_WILDCARD] both produce a hash of number -> EventStruct. Because the IDs all should be unique, they merge seamlessly unless there is some sort of bug occurring to produce duplicate IDs. DispatchHandler is only returned out of the function to be used to unregister the handler, and not stored inside the dispatcher at all.

The naming here is confusing because there's a handler block argument but also a DispatchHandler object, and they're not actually related. Maybe I should rename DispatchHandler to something like RegistrationHandle? I could also unify them but there's no real functional purpose for that. I'll try to rework this a bit to at least be easier to follow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Yes, that confused me for sure. Renaming would help.

subject.register_handler(target, 'signaled', &block2)
subject.register_handler(other_target, 'signaled', &block3)

expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike asserting behavior against private structures like this. When a method is declared private, the "private" aspect is about code organization. Private methods should be able to get refactored, renamed, changed, etc without breaking unit tests at all.

If this is the only way to test it then the code structure is a code smell and needs some love. IMHO.

Perhaps write unit tests against the private DispatchHandler class. Then you can assert that the DispatchHandler instances are created with the right args.

As for testing the ordering, that's a stickier problem that I'd have to think about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd originally written tests only against the public interface, but other tests were added in #157 that further inspected the private contents. That was merged between when I first started writing this and began upstreaming this PR. So I decided to stick with that convention, and update the tests. Happy to rework this if everyone agrees we should only test the public interface here. This would make the tests less brittle.

The ordering tests are critical, as that's the crux of one of the bugs where the order was not deterministic. I agree the contents of the dispatcher's internal state don't need to be in a particular order, as it's an implementation detail. All that needs to be guaranteed is dispatch order follows registration order. There is a test for this farther down the file already. Do you think that's sufficient? Or are there more cases I could cover there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, yes, and it's particularly annoying to me that I did it in my own PR. :)

Now I'm inspired to go back and fix that because I can't allow myself to be a hypocrite.

As for ordering, I think the later tests cover it sufficiently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to leave this in order to get the PR merged, but we can refactor this (in a separate PR) and put the expectations inside the handler blocks, so instead of checking the internal state we just make sure that the right ones are firing when an even is dispatched (fully conforming to a public interface of a Dispatcher)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this for now since at least it has good coverage

Copy link
Contributor

@antstorm antstorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few nits, but overall looks almost ready. Great PR and a very useful change, thank you @jeffschoner-stripe 🙌

return if unblock_condition.call

fiber = Fiber.current
blocked = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this flag if we are now unregistering event handlers upon resuming?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right. At first, I wasn't sure if there were callback within callback cases where this wouldn't work. But it should behave deterministically. As soon as a handler block is invoked for the first time, its fiber will be resumed, and unregistration will be completed before any other handler blocks are invoked.

def register_handler(target, event_name, &handler)
handlers[target] << EventStruct.new(event_name, handler)
self
@next_id += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the workflow code is definitely expected to be all single-threaded, so I wouldn't worry too much about it. Besides the way we write to handlers is already non thread-safe, so this doesn't add any additional issues

subject.register_handler(target, 'signaled', &block2)
subject.register_handler(other_target, 'signaled', &block3)

expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to leave this in order to get the PR merged, but we can refactor this (in a separate PR) and put the expectations inside the handler blocks, so instead of checking the internal state we just make sure that the right ones are firing when an even is dispatched (fully conforming to a public interface of a Dispatcher)

expect(subject.send(:handlers)[other_target][3].handler).to be(block3)

handle1.unregister
expect(subject.send(:handlers)[target][1]).to be(nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might wanna check that the others are still there. If you empty the hash completely it won't fail this spec

end
end
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Make sure you editor puts the newlines at the end of saved files

#
class Dispatcher
class DispatchHandler
def initialize(handlers_for_target, id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be handy to add a comment saying that handlers are passed by reference and are mutated here

@jeffschoner-stripe
Copy link
Contributor Author

@antstorm @chuckremes2 Please take another look

Copy link
Contributor

@antstorm antstorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, thanks again! :shipit:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

4 participants