From 79f67f59adfd59b20ac4d96b8ba25842e93a09e5 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sun, 19 Nov 2023 14:29:22 -0800 Subject: [PATCH 1/3] Remove cancelation commands when underlying futures are closed --- .../integration/activity_cancellation_spec.rb | 15 ++++ .../workflow/command_state_machine.rb | 8 ++ lib/temporal/workflow/executor.rb | 2 +- lib/temporal/workflow/state_manager.rb | 22 ++++- .../temporal/workflow/state_manager_spec.rb | 83 +++++++++++++++++++ 5 files changed, 127 insertions(+), 3 deletions(-) diff --git a/examples/spec/integration/activity_cancellation_spec.rb b/examples/spec/integration/activity_cancellation_spec.rb index ca39d639..2cb0ea1f 100644 --- a/examples/spec/integration/activity_cancellation_spec.rb +++ b/examples/spec/integration/activity_cancellation_spec.rb @@ -33,4 +33,19 @@ expect(result).to be_a(Temporal::ActivityCanceled) expect(result.message).to eq('ACTIVITY_ID_NOT_STARTED') end + + it 'cancels a running activity around the time it completes' do + workflow_id, run_id = run_workflow(LongWorkflow, 1, 0.5) + + sleep 0.4 + Temporal.signal_workflow(LongWorkflow, :CANCEL, workflow_id, run_id) + + result = Temporal.await_workflow_result( + LongWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result).to be_nil + end end diff --git a/lib/temporal/workflow/command_state_machine.rb b/lib/temporal/workflow/command_state_machine.rb index 74adcf16..69bb2528 100644 --- a/lib/temporal/workflow/command_state_machine.rb +++ b/lib/temporal/workflow/command_state_machine.rb @@ -48,6 +48,14 @@ def fail def time_out @state = TIMED_OUT_STATE end + + def closed? 
+ @state == COMPLETED_STATE || + @state == CANCELED_STATE || + @state == FAILED_STATE || + @state == TIMED_OUT_STATE || + @state == TERMINATED_STATE + end end end end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 762ae250..5c8eaf9e 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -42,7 +42,7 @@ def run state_manager.apply(window) end - RunResult.new(commands: state_manager.commands, new_sdk_flags_used: state_manager.new_sdk_flags_used) + RunResult.new(commands: state_manager.final_commands, new_sdk_flags_used: state_manager.new_sdk_flags_used) end # Process queries using the pre-registered query handlers diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index e3809662..2cf82159 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -20,7 +20,7 @@ class StateManager class UnsupportedEvent < Temporal::InternalError; end class UnsupportedMarkerType < Temporal::InternalError; end - attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals + attr_reader :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals def initialize(dispatcher, config) @dispatcher = dispatcher @@ -87,6 +87,24 @@ def schedule(command) [event_target_from(command_id, command), cancelation_id] end + def final_commands + # Filter out any activity or timer cancellation commands if the underlying activity or + # timer has completed. This can occur when an activity or timer completes while a + # workflow task is being processed that would otherwise cancel this timer or activity. + commands.filter do |command_pair| + case command_pair.last + when Command::CancelTimer + state_machine = command_tracker[command_pair.last.timer_id] + !state_machine.closed? 
+ when Command::RequestActivityCancellation + state_machine = command_tracker[command_pair.last.activity_id] + !state_machine.closed? + else + true + end + end + end + def release?(release_name) track_release(release_name) unless releases.key?(release_name) @@ -149,7 +167,7 @@ def history_size private - attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config + attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config def use_signals_first(raw_events) # The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 50aa74d3..cc3029ea 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -469,6 +469,89 @@ def test_order_one_task(*expected_sdk_flags) end end + describe "#final_commands" do + let(:dispatcher) { Temporal::Workflow::Dispatcher.new } + let(:state_manager) do + Temporal::Workflow::StateManager.new(dispatcher, config) + end + + let(:config) { Temporal::Configuration.new } + + it "preserves canceled activity or timer commands when not completed" do + schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new + state_manager.schedule(schedule_activity_command) + + start_timer_command = Temporal::Workflow::Command::StartTimer.new + state_manager.schedule(start_timer_command) + + cancel_activity_command = Temporal::Workflow::Command::RequestActivityCancellation.new( + activity_id: schedule_activity_command.activity_id + ) + state_manager.schedule(cancel_activity_command) + + cancel_timer_command = Temporal::Workflow::Command::RequestActivityCancellation.new( + activity_id: start_timer_command.timer_id + ) + state_manager.schedule(cancel_timer_command) + + expect(state_manager.final_commands).to( + eq( + [ + [1, schedule_activity_command], + [2, 
+ start_timer_command], + [3, cancel_activity_command], + [4, cancel_timer_command] + ] + ) + ) + end + + it "drop cancel activity command when completed" do + schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new + state_manager.schedule(schedule_activity_command) + + cancel_command = Temporal::Workflow::Command::RequestActivityCancellation.new( + activity_id: schedule_activity_command.activity_id + ) + state_manager.schedule(cancel_command) + + # Fake completing the activity + window = Temporal::Workflow::History::Window.new + # The fake assumes an activity event completed two events ago, so fix the event id to +2 + window.add( + Temporal::Workflow::History::Event.new( + Fabricate(:api_activity_task_completed_event, event_id: schedule_activity_command.activity_id + 2) + ) + ) + state_manager.apply(window) + + expect(state_manager.final_commands).to(eq([[1, schedule_activity_command]])) + end + + it "drop cancel timer command when completed" do + start_timer_command = Temporal::Workflow::Command::StartTimer.new + state_manager.schedule(start_timer_command) + + cancel_command = Temporal::Workflow::Command::CancelTimer.new( + timer_id: start_timer_command.timer_id + ) + state_manager.schedule(cancel_command) + + # Fake completing the timer + window = Temporal::Workflow::History::Window.new + # The fake assumes a timer event completed four events ago, so fix the event id to +4 + window.add( + Temporal::Workflow::History::Event.new( + Fabricate(:api_timer_fired_event, event_id: start_timer_command.timer_id + 4) + ) + ) + state_manager.apply(window) + + expect(state_manager.final_commands).to(eq([[1, start_timer_command]])) + end + end + + describe '#search_attributes' do let(:initial_search_attributes) do { From 48fefb9742b0b38c6a8edf52da5fc45bce92b0da Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Thu, 7 Dec 2023 09:24:54 -0800 Subject: [PATCH 2/3] Fix spec for timer command preservation --- 
spec/unit/lib/temporal/workflow/state_manager_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index cc3029ea..bad27d22 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -489,8 +489,8 @@ def test_order_one_task(*expected_sdk_flags) ) state_manager.schedule(cancel_activity_command) - cancel_timer_command = Temporal::Workflow::Command::RequestActivityCancellation.new( - activity_id: start_timer_command.timer_id + cancel_timer_command = Temporal::Workflow::Command::CancelTimer.new( + timer_id: start_timer_command.timer_id ) state_manager.schedule(cancel_timer_command) From 19a0b64132d7dbdcfc7a5b93cee33b6872b5d73f Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Thu, 7 Dec 2023 09:30:38 -0800 Subject: [PATCH 3/3] Remove potentially flaky example spec --- .../integration/activity_cancellation_spec.rb | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/examples/spec/integration/activity_cancellation_spec.rb b/examples/spec/integration/activity_cancellation_spec.rb index 2cb0ea1f..ca39d639 100644 --- a/examples/spec/integration/activity_cancellation_spec.rb +++ b/examples/spec/integration/activity_cancellation_spec.rb @@ -33,19 +33,4 @@ expect(result).to be_a(Temporal::ActivityCanceled) expect(result.message).to eq('ACTIVITY_ID_NOT_STARTED') end - - it 'cancels a running activity around the time it completes' do - workflow_id, run_id = run_workflow(LongWorkflow, 1, 0.5) - - sleep 0.4 - Temporal.signal_workflow(LongWorkflow, :CANCEL, workflow_id, run_id) - - result = Temporal.await_workflow_result( - LongWorkflow, - workflow_id: workflow_id, - run_id: run_id, - ) - - expect(result).to be_nil - end end