diff --git a/lib/temporal/workflow/command_state_machine.rb b/lib/temporal/workflow/command_state_machine.rb index 74adcf16..69bb2528 100644 --- a/lib/temporal/workflow/command_state_machine.rb +++ b/lib/temporal/workflow/command_state_machine.rb @@ -48,6 +48,14 @@ def fail def time_out @state = TIMED_OUT_STATE end + + def closed? + @state == COMPLETED_STATE || + @state == CANCELED_STATE || + @state == FAILED_STATE || + @state == TIMED_OUT_STATE || + @state == TERMINATED_STATE + end end end end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 762ae250..5c8eaf9e 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -42,7 +42,7 @@ def run state_manager.apply(window) end - RunResult.new(commands: state_manager.commands, new_sdk_flags_used: state_manager.new_sdk_flags_used) + RunResult.new(commands: state_manager.final_commands, new_sdk_flags_used: state_manager.new_sdk_flags_used) end # Process queries using the pre-registered query handlers diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index e3809662..2cf82159 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -20,7 +20,7 @@ class StateManager class UnsupportedEvent < Temporal::InternalError; end class UnsupportedMarkerType < Temporal::InternalError; end - attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals + attr_reader :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals def initialize(dispatcher, config) @dispatcher = dispatcher @@ -87,6 +87,24 @@ def schedule(command) [event_target_from(command_id, command), cancelation_id] end + def final_commands + # Filter out any activity or timer cancellation commands if the underlying activity or + # timer has completed. This can occur when an activity or timer completes while a + # workflow task is being processed that would otherwise cancel this time or activity. + commands.filter do |command_pair| + case command_pair.last + when Command::CancelTimer + state_machine = command_tracker[command_pair.last.timer_id] + !state_machine.closed? + when Command::RequestActivityCancellation + state_machine = command_tracker[command_pair.last.activity_id] + !state_machine.closed? + else + true + end + end + end + def release?(release_name) track_release(release_name) unless releases.key?(release_name) @@ -149,7 +167,7 @@ def history_size private - attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config + attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config def use_signals_first(raw_events) # The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 50aa74d3..bad27d22 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -469,6 +469,89 @@ def test_order_one_task(*expected_sdk_flags) end end + describe "#final_commands" do + let(:dispatcher) { Temporal::Workflow::Dispatcher.new } + let(:state_manager) do + Temporal::Workflow::StateManager.new(dispatcher, config) + end + + let(:config) { Temporal::Configuration.new } + + it "preserves canceled activity or timer commands when not completed" do + schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new + state_manager.schedule(schedule_activity_command) + + start_timer_command = Temporal::Workflow::Command::StartTimer.new + state_manager.schedule(start_timer_command) + + cancel_activity_command = Temporal::Workflow::Command::RequestActivityCancellation.new( + activity_id: schedule_activity_command.activity_id + ) + state_manager.schedule(cancel_activity_command) + + cancel_timer_command = Temporal::Workflow::Command::CancelTimer.new( + timer_id: start_timer_command.timer_id + ) + state_manager.schedule(cancel_timer_command) + + expect(state_manager.final_commands).to( + eq( + [ + [1, schedule_activity_command], + [2, start_timer_command], + [3, cancel_activity_command], + [4, cancel_timer_command] + ] + ) + ) + end + + it "drop cancel activity command when completed" do + schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new + state_manager.schedule(schedule_activity_command) + + cancel_command = Temporal::Workflow::Command::RequestActivityCancellation.new( + activity_id: schedule_activity_command.activity_id + ) + state_manager.schedule(cancel_command) + + # Fake completing the activity + window = Temporal::Workflow::History::Window.new + # The fake assumes an activity event completed two events ago, so fix the event id to +2 + window.add( + Temporal::Workflow::History::Event.new( + Fabricate(:api_activity_task_completed_event, event_id: schedule_activity_command.activity_id + 2) + ) + ) + state_manager.apply(window) + + expect(state_manager.final_commands).to(eq([[1, schedule_activity_command]])) + end + + it "drop cancel timer command when completed" do + start_timer_command = Temporal::Workflow::Command::StartTimer.new + state_manager.schedule(start_timer_command) + + cancel_command = Temporal::Workflow::Command::CancelTimer.new( + timer_id: start_timer_command.timer_id + ) + state_manager.schedule(cancel_command) + + # Fake completing the timer + window = Temporal::Workflow::History::Window.new + # The fake assumes an activity event completed four events ago, so fix the event id to +4 + window.add( + Temporal::Workflow::History::Event.new( + Fabricate(:api_timer_fired_event, event_id: start_timer_command.timer_id + 4) + ) + ) + state_manager.apply(window) + + expect(state_manager.final_commands).to(eq([[1, start_timer_command]])) + end + end + + describe '#search_attributes' do let(:initial_search_attributes) do {