Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate commands issued after workflow completion #93

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ worker.register_workflow(CancellingTimerWorkflow)
worker.register_workflow(CheckWorkflow)
worker.register_workflow(FailingWorkflow)
worker.register_workflow(HelloWorldWorkflow)
worker.register_workflow(InvalidCancelWorkflow)
worker.register_workflow(LocalHelloWorldWorkflow)
worker.register_workflow(LongWorkflow)
worker.register_workflow(ParentWorkflow)
Expand Down
17 changes: 17 additions & 0 deletions examples/workflows/invalid_cancel_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require 'activities/hello_world_activity'

# If you run this, you'll get a WorkflowAlreadyCompletingError because after the
# cancel, we try to do something else.
class InvalidCancelWorkflow < Cadence::Workflow
timeouts execution: 20

def execute
future = HelloWorldActivity.execute('Alice')
workflow.sleep(1)
workflow.cancel
# Doing anything after cancel (or any workflow completion) is illegal
future.done do
HelloWorldActivity.execute('Bob')
end
end
end
7 changes: 7 additions & 0 deletions lib/cadence/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,11 @@ class TimeoutError < ClientError; end
# A superclass for activity exceptions raised explicitly
# with the itent to propagate to a workflow
class ActivityException < ClientError; end

# Once the workflow succeeds, fails, or continues as new, you can't issue any other commands such as
# scheduling an activity. This error is thrown if you try, before we report completion back to the server.
# This could happen due to activity futures that aren't awaited before the workflow closes,
# calling workflow.continue_as_new, workflow.complete, or workflow.fail in the middle of your workflow code,
# or an internal framework bug.
class WorkflowAlreadyCompletingError < InternalError; end
end
20 changes: 20 additions & 0 deletions lib/cadence/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def schedule(decision)
state_machine = decision_tracker[decision_id]
state_machine.requested if state_machine.state == DecisionStateMachine::NEW_STATE

validate_append_decision(decision)
decisions << [decision_id, decision]

[History::EventTarget.from_decision(decision_id, decision), cancelation_id]
Expand Down Expand Up @@ -93,6 +94,25 @@ def next_event_id
@last_event_id += 1
end

def validate_append_decision(decision)
return if decisions.last.nil?
_, previous_command = decisions.last
case previous_command
when Decision::CompleteWorkflow, Decision::FailWorkflow
context_string = case previous_command
when Decision::CompleteWorkflow
"The workflow completed"
when Decision::FailWorkflow
"The workflow failed"
end
raise Cadence::WorkflowAlreadyCompletingError.new(
"You cannot do anything in a Workflow after it completes. #{context_string}, "\
"but then it sent a new decision: #{decision.class}. This can happen, for example, if you've "\
"not waited for all of your Activity futures before finishing the Workflow."
)
end
end

def apply_event(event)
state_machine = decision_tracker[event.decision_id]
target = History::EventTarget.from_event(event)
Expand Down
24 changes: 24 additions & 0 deletions spec/unit/lib/cadence/workflow/state_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,30 @@

expect(subject.decisions.length).to eq(1)
end

# These are all "terminal" decisions
[
Cadence::Workflow::Decision::FailWorkflow.new(
reason: 'dummy',
),
Cadence::Workflow::Decision::CompleteWorkflow.new(
result: 5,
),
].each do |terminal_command|
it "fails to validate if #{terminal_command.class} is not the last command scheduled" do
state_manager = described_class.new(Cadence::Workflow::Dispatcher.new)

next_command = Cadence::Workflow::Decision::RecordMarker.new(
name: Cadence::Workflow::StateManager::RELEASE_MARKER,
details: 'dummy',
)

state_manager.schedule(terminal_command)
expect do
state_manager.schedule(next_command)
end.to raise_error(Cadence::WorkflowAlreadyCompletingError)
end
end
end

describe '.apply' do
Expand Down
Loading