diff --git a/.gitignore b/.gitignore index 3eb77f09..a0911055 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ _yardoc/ # rspec failure tracking .rspec_status + +.idea/* diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 2970479a..a829b549 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -5,6 +5,7 @@ require 'temporal/workflow' require 'temporal/workflow/history' require 'temporal/workflow/execution_info' +require 'temporal/reset_strategy' module Temporal class Client @@ -134,9 +135,16 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam end end - def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason: 'manual reset') - workflow_task_id ||= get_last_completed_workflow_task_id(namespace, workflow_id, run_id) - raise Error, 'Could not find a completed workflow task event' unless workflow_task_id + def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset') + # Pick default strategy for backwards-compatibility + strategy ||= :last_workflow_task unless workflow_task_id + + if strategy && workflow_task_id + raise ArgumentError, 'Please specify either :strategy or :workflow_task_id' + end + + workflow_task_id ||= find_workflow_task(namespace, workflow_id, run_id, strategy)&.id + raise Error, 'Could not find an event to reset to' unless workflow_task_id response = connection.reset_workflow_execution( namespace: namespace, @@ -195,6 +203,16 @@ def fail_activity(async_token, exception) ) end + def get_workflow_history(namespace:, workflow_id:, run_id:) + history_response = connection.get_workflow_execution_history( + namespace: namespace, + workflow_id: workflow_id, + run_id: run_id + ) + + Workflow::History.new(history_response.history.events) + end + class ResultConverter extend Concerns::Payloads end @@ -208,15 +226,31 @@ def connection @connection ||= Temporal::Connection.generate(config.for_connection) end - def 
get_last_completed_workflow_task_id(namespace, workflow_id, run_id) - history_response = connection.get_workflow_execution_history( + def find_workflow_task(namespace, workflow_id, run_id, strategy) + history = get_workflow_history( namespace: namespace, workflow_id: workflow_id, run_id: run_id ) - history = Workflow::History.new(history_response.history.events) - workflow_task_event = history.get_last_completed_workflow_task - workflow_task_event&.id + + # TODO: Move this into a separate class if it keeps growing + case strategy + when ResetStrategy::LAST_WORKFLOW_TASK + events = %w[WORKFLOW_TASK_COMPLETED WORKFLOW_TASK_TIMED_OUT WORKFLOW_TASK_FAILED].freeze + history.events.select { |event| events.include?(event.type) }.last + when ResetStrategy::FIRST_WORKFLOW_TASK + events = %w[WORKFLOW_TASK_COMPLETED WORKFLOW_TASK_TIMED_OUT WORKFLOW_TASK_FAILED].freeze + history.events.select { |event| events.include?(event.type) }.first + when ResetStrategy::LAST_FAILED_ACTIVITY + events = %w[ACTIVITY_TASK_FAILED ACTIVITY_TASK_TIMED_OUT].freeze + failed_event = history.events.select { |event| events.include?(event.type) }.last + return unless failed_event + + scheduled_event = history.find_event_by_id(failed_event.attributes.scheduled_event_id) + history.find_event_by_id(scheduled_event.attributes.workflow_task_completed_event_id) + else + raise ArgumentError, 'Unsupported reset strategy' + end end end end diff --git a/lib/temporal/reset_strategy.rb b/lib/temporal/reset_strategy.rb new file mode 100644 index 00000000..615f20a9 --- /dev/null +++ b/lib/temporal/reset_strategy.rb @@ -0,0 +1,7 @@ +module Temporal + module ResetStrategy + LAST_WORKFLOW_TASK = :last_workflow_task + FIRST_WORKFLOW_TASK = :first_workflow_task + LAST_FAILED_ACTIVITY = :last_failed_activity + end +end diff --git a/lib/temporal/workflow/history.rb b/lib/temporal/workflow/history.rb index 8a26954a..ad188a77 100644 --- a/lib/temporal/workflow/history.rb +++ b/lib/temporal/workflow/history.rb @@ -11,8 +11,8
@@ def initialize(events) @iterator = @events.each end - def last_completed_workflow_task - events.select { |event| event.type == 'WORKFLOW_TASK_COMPLETED' }.last + def find_event_by_id(id) + events.find { |event| event.id == id } end # It is very important to replay the History window by window in order to diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index ad5bae6d..0d7e9e48 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -34,10 +34,10 @@ end end -Fabricator(:api_decision_task_scheduled_event, from: :api_history_event) do - event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_SCHEDULED } - decision_task_scheduled_event_attributes do |attrs| - Temporal::Api::History::V1::DecisionTaskScheduledEventAttributes.new( +Fabricator(:api_workflow_task_scheduled_event, from: :api_history_event) do + event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_SCHEDULED } + workflow_task_scheduled_event_attributes do |attrs| + Temporal::Api::History::V1::WorkflowTaskScheduledEventAttributes.new( task_queue: Fabricate(:api_task_queue), start_to_close_timeout: 15, attempt: 0 @@ -45,10 +45,10 @@ end end -Fabricator(:api_decision_task_started_event, from: :api_history_event) do - event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_STARTED } - decision_task_started_event_attributes do |attrs| - Temporal::Api::History::V1::DecisionTaskStartedEventAttributes.new( +Fabricator(:api_workflow_task_started_event, from: :api_history_event) do + event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_STARTED } + workflow_task_started_event_attributes do |attrs| + Temporal::Api::History::V1::WorkflowTaskStartedEventAttributes.new( scheduled_event_id: attrs[:event_id] - 1, identity: 'test-worker@test-host', request_id: SecureRandom.uuid @@ -56,10 +56,10 @@ end end 
-Fabricator(:api_decision_task_completed_event, from: :api_history_event) do - event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_COMPLETED } - decision_task_completed_event_attributes do |attrs| - Temporal::Api::History::V1::DecisionTaskCompletedEventAttributes.new( +Fabricator(:api_workflow_task_completed_event, from: :api_history_event) do + event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_COMPLETED } + workflow_task_completed_event_attributes do |attrs| + Temporal::Api::History::V1::WorkflowTaskCompletedEventAttributes.new( scheduled_event_id: attrs[:event_id] - 2, started_event_id: attrs[:event_id] - 1, identity: 'test-worker@test-host' @@ -71,10 +71,10 @@ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_SCHEDULED } activity_task_scheduled_event_attributes do |attrs| Temporal::Api::History::V1::ActivityTaskScheduledEventAttributes.new( - activity_id: attrs[:event_id], - activity_type: Temporal::Api::History::V1::ActivityType.new(name: 'TestActivity'), + activity_id: attrs[:event_id].to_s, + activity_type: Temporal::Api::Common::V1::ActivityType.new(name: 'TestActivity'), workflow_task_completed_event_id: attrs[:event_id] - 1, - domain: 'test-domain', + namespace: 'test-namespace', task_queue: Fabricate(:api_task_queue) ) end @@ -107,8 +107,7 @@ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_FAILED } activity_task_failed_event_attributes do |attrs| Temporal::Api::History::V1::ActivityTaskFailedEventAttributes.new( - reason: 'StandardError', - details: 'Activity failed', + failure: Temporal::Api::Failure::V1::Failure.new(message: "Activity failed"), scheduled_event_id: attrs[:event_id] - 2, started_event_id: attrs[:event_id] - 1, identity: 'test-worker@test-host' diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index e2140ca6..cc31b89b 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ 
b/spec/unit/lib/temporal/client_spec.rb @@ -1,6 +1,8 @@ +require 'securerandom' require 'temporal/client' require 'temporal/configuration' require 'temporal/workflow' +require 'temporal/workflow/history' require 'temporal/connection/grpc' describe Temporal::Client do @@ -8,6 +10,9 @@ let(:config) { Temporal::Configuration.new } let(:connection) { instance_double(Temporal::Connection::GRPC) } + let(:namespace) { 'default-test-namespace' } + let(:workflow_id) { SecureRandom.uuid } + let(:run_id) { SecureRandom.uuid } class TestStartWorkflow < Temporal::Workflow namespace 'default-test-namespace' @@ -20,7 +25,12 @@ class TestStartWorkflow < Temporal::Workflow .with(config.for_connection) .and_return(connection) end - after { subject.remove_instance_variable(:@connection) } + + after do + if subject.instance_variable_get(:@connection) + subject.remove_instance_variable(:@connection) + end + end describe '#start_workflow' do let(:temporal_response) do @@ -368,9 +378,38 @@ class NamespacedWorkflow < Temporal::Workflow let(:temporal_response) do Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionResponse.new(run_id: 'xxx') end + let(:history) do + Temporal::Workflow::History.new([ + Fabricate(:api_workflow_execution_started_event, event_id: 1), + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4), + Fabricate(:api_activity_task_scheduled_event, event_id: 5), + Fabricate(:api_activity_task_started_event, event_id: 6), + Fabricate(:api_activity_task_completed_event, event_id: 7), + Fabricate(:api_workflow_task_scheduled_event, event_id: 8), + Fabricate(:api_workflow_task_started_event, event_id: 9), + Fabricate(:api_workflow_task_completed_event, event_id: 10), + Fabricate(:api_activity_task_scheduled_event, event_id: 11), + Fabricate(:api_activity_task_started_event, event_id: 12), + Fabricate(:api_activity_task_failed_event, event_id: 
13), + Fabricate(:api_workflow_task_scheduled_event, event_id: 14), + Fabricate(:api_workflow_task_started_event, event_id: 15), + Fabricate(:api_workflow_task_completed_event, event_id: 16), + Fabricate(:api_workflow_execution_completed_event, event_id: 17) + ]) + end before { allow(connection).to receive(:reset_workflow_execution).and_return(temporal_response) } + before do + allow(connection).to receive(:reset_workflow_execution).and_return(temporal_response) + allow(subject) + .to receive(:get_workflow_history) + .with(namespace: namespace, workflow_id: workflow_id, run_id: run_id) + .and_return(history) + end + context 'when workflow_task_id is provided' do let(:workflow_task_id) { 42 } @@ -403,6 +442,87 @@ class NamespacedWorkflow < Temporal::Workflow expect(result).to eq('xxx') end end + + context 'when neither strategy nor workflow_task_id is provided' do + it 'uses default strategy' do + subject.reset_workflow(namespace, workflow_id, run_id) + + expect(connection).to have_received(:reset_workflow_execution).with( + namespace: namespace, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + workflow_task_event_id: 16 + ) + end + end + + context 'when both strategy and workflow_task_id are provided' do + it 'raises an ArgumentError' do + expect do + subject.reset_workflow( + namespace, + workflow_id, + run_id, + strategy: :last_workflow_task, + workflow_task_id: 10 + ) + end.to raise_error(ArgumentError, 'Please specify either :strategy or :workflow_task_id') + end + end + + context 'with a specified strategy' do + context ':last_workflow_task' do + it 'resets workflow' do + subject.reset_workflow(namespace, workflow_id, run_id, strategy: :last_workflow_task) + + expect(connection).to have_received(:reset_workflow_execution).with( + namespace: namespace, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + workflow_task_event_id: 16 + ) + end + end + + context ':first_workflow_task' do + it 'resets workflow' do +
subject.reset_workflow(namespace, workflow_id, run_id, strategy: :first_workflow_task) + + expect(connection).to have_received(:reset_workflow_execution).with( + namespace: namespace, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + workflow_task_event_id: 4 + ) + end + end + + + context ':last_failed_activity' do + it 'resets workflow' do + subject.reset_workflow(namespace, workflow_id, run_id, strategy: :last_failed_activity) + + expect(connection).to have_received(:reset_workflow_execution).with( + namespace: namespace, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + workflow_task_event_id: 10 + ) + end + end + + context 'unsupported strategy' do + it 'raises an ArgumentError' do + expect do + subject.reset_workflow(namespace, workflow_id, run_id, strategy: :foobar) + end.to raise_error(ArgumentError, 'Unsupported reset strategy') + end + end + end end describe '#terminate_workflow' do