diff --git a/CHANGELOG.md b/CHANGELOG.md index b6daa596..372d0113 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.5 +- Implement strategies for resetting workflows + ## 0.1.4 - Fix a bug which prevented retry_policy from being passed as explicit options - Make retry_policy options mergeable with the values in an Activity or a Workflow diff --git a/lib/cadence/client.rb b/lib/cadence/client.rb index f84a1638..5e3d9e4f 100644 --- a/lib/cadence/client.rb +++ b/lib/cadence/client.rb @@ -5,6 +5,7 @@ require 'cadence/workflow' require 'cadence/workflow/history' require 'cadence/workflow/execution_info' +require 'cadence/reset_strategy' module Cadence class Client @@ -73,9 +74,16 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) ) end - def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset') - decision_task_id ||= get_last_completed_decision_task(domain, workflow_id, run_id) - raise Error, 'Could not find a completed decision task event' unless decision_task_id + def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset') + # Pick default strategy for backwards-compatibility + strategy ||= :last_decision_task unless decision_task_id + + if strategy && decision_task_id + raise ArgumentError, 'Please specify either :strategy or :decision_task_id' + end + + decision_task_id ||= find_decision_task(domain, workflow_id, run_id, strategy)&.id + raise Error, 'Could not find an event to reset to' unless decision_task_id response = connection.reset_workflow_execution( domain: domain, @@ -150,14 +158,31 @@ def connection @connection ||= Cadence::Connection.generate(config.for_connection) end - def get_last_completed_decision_task(domain, workflow_id, run_id) + def find_decision_task(domain, workflow_id, run_id, strategy) history = get_workflow_history( domain: domain, workflow_id: workflow_id, run_id: run_id ) - history.last_completed_decision_task&.id + # TODO: Move this into a separate class if it keeps growing + case strategy + when ResetStrategy::LAST_DECISION_TASK + events = %[DecisionTaskCompleted DecisionTaskTimedOut DecisionTaskFailed].freeze + history.events.select { |event| events.include?(event.type) }.last + when ResetStrategy::FIRST_DECISION_TASK + events = %[DecisionTaskCompleted DecisionTaskTimedOut DecisionTaskFailed].freeze + history.events.select { |event| events.include?(event.type) }.first + when ResetStrategy::LAST_FAILED_ACTIVITY + events = %[ActivityTaskFailed ActivityTaskTimedOut].freeze + failed_event = history.events.select { |event| events.include?(event.type) }.last + return unless failed_event + + scheduled_event = history.find_event_by_id(failed_event.attributes.scheduledEventId) + history.find_event_by_id(scheduled_event.attributes.decisionTaskCompletedEventId) + else + raise ArgumentError, 'Unsupported reset strategy' + end end end end diff --git a/lib/cadence/reset_strategy.rb b/lib/cadence/reset_strategy.rb new file mode 100644 index 00000000..d3be2c63 --- /dev/null +++ b/lib/cadence/reset_strategy.rb @@ -0,0 +1,7 @@ +module Cadence + module ResetStrategy + LAST_DECISION_TASK = :last_decision_task + FIRST_DECISION_TASK = :first_decision_task + LAST_FAILED_ACTIVITY = :last_failed_activity + end +end diff --git a/lib/cadence/version.rb b/lib/cadence/version.rb index 35c794b1..ae6a3f02 100644 --- a/lib/cadence/version.rb +++ b/lib/cadence/version.rb @@ -1,3 +1,3 @@ module Cadence - VERSION = '0.1.4'.freeze + VERSION = '0.1.5'.freeze end diff --git a/lib/cadence/workflow/history.rb b/lib/cadence/workflow/history.rb index e44ff99c..2b3c1333 100644 --- a/lib/cadence/workflow/history.rb +++ b/lib/cadence/workflow/history.rb @@ -11,8 +11,8 @@ def initialize(events) @iterator = @events.each end - def last_completed_decision_task - events.select { |event| event.type == 'DecisionTaskCompleted' }.last + def find_event_by_id(id) + events.find { |event| event.id == id } end # It is very important to replay the History window by window in order to diff --git a/spec/fabricators/thrift/history_event_fabricator.rb b/spec/fabricators/thrift/history_event_fabricator.rb index e406e729..9579d509 100644 --- a/spec/fabricators/thrift/history_event_fabricator.rb +++ b/spec/fabricators/thrift/history_event_fabricator.rb @@ -68,3 +68,52 @@ ) end end + +Fabricator(:activity_task_scheduled_event_thrift, from: :history_event_thrift) do + eventType { CadenceThrift::EventType::ActivityTaskScheduled } + activityTaskScheduledEventAttributes do |attrs| + CadenceThrift::ActivityTaskScheduledEventAttributes.new( + activityId: attrs[:eventId], + activityType: CadenceThrift::ActivityType.new(name: 'TestActivity'), + decisionTaskCompletedEventId: attrs[:eventId] - 1, + domain: 'test-domain', + taskList: Fabricate(:task_list_thrift) + ) + end +end + +Fabricator(:activity_task_started_event_thrift, from: :history_event_thrift) do + eventType { CadenceThrift::EventType::ActivityTaskStarted } + activityTaskStartedEventAttributes do |attrs| + CadenceThrift::ActivityTaskStartedEventAttributes.new( + scheduledEventId: attrs[:eventId] - 1, + identity: 'test-worker@test-host', + requestId: SecureRandom.uuid + ) + end +end + +Fabricator(:activity_task_completed_event_thrift, from: :history_event_thrift) do + eventType { CadenceThrift::EventType::ActivityTaskCompleted } + activityTaskCompletedEventAttributes do |attrs| + CadenceThrift::ActivityTaskCompletedEventAttributes.new( + result: nil, + scheduledEventId: attrs[:eventId] - 2, + startedEventId: attrs[:eventId] - 1, + identity: 'test-worker@test-host' + ) + end +end + +Fabricator(:activity_task_failed_event_thrift, from: :history_event_thrift) do + eventType { CadenceThrift::EventType::ActivityTaskFailed } + activityTaskFailedEventAttributes do |attrs| + CadenceThrift::ActivityTaskFailedEventAttributes.new( + reason: 'StandardError', + details: 'Activity failed', + scheduledEventId: attrs[:eventId] - 2, + startedEventId: attrs[:eventId] - 1, + identity: 'test-worker@test-host' + ) + end +end diff --git a/spec/unit/lib/cadence/client_spec.rb b/spec/unit/lib/cadence/client_spec.rb index 73cd790c..7a292f07 100644 --- a/spec/unit/lib/cadence/client_spec.rb +++ b/spec/unit/lib/cadence/client_spec.rb @@ -1,6 +1,8 @@ +require 'securerandom' require 'cadence/client' require 'cadence/configuration' require 'cadence/workflow' +require 'cadence/workflow/history' require 'cadence/connection/thrift' describe Cadence::Client do @@ -8,6 +10,9 @@ let(:config) { Cadence::Configuration.new } let(:connection) { instance_double(Cadence::Connection::Thrift) } + let(:domain) { 'detault-test-domain' } + let(:workflow_id) { SecureRandom.uuid } + let(:run_id) { SecureRandom.uuid } before do allow(Cadence::Connection) @@ -15,7 +20,12 @@ .with(config.for_connection) .and_return(connection) end - after { subject.remove_instance_variable(:@connection) } + + after do + if subject.instance_variable_get(:@connection) + subject.remove_instance_variable(:@connection) + end + end describe '#start_workflow' do let(:cadence_response) do @@ -324,8 +334,35 @@ class TestStartWorkflow < Cadence::Workflow describe '#reset_workflow' do let(:cadence_response) { CadenceThrift::StartWorkflowExecutionResponse.new(runId: 'xxx') } + let(:history) do + Cadence::Workflow::History.new([ + Fabricate(:workflow_execution_started_event_thrift, eventId: 1), + Fabricate(:decision_task_scheduled_event_thrift, eventId: 2), + Fabricate(:decision_task_started_event_thrift, eventId: 3), + Fabricate(:decision_task_completed_event_thrift, eventId: 4), + Fabricate(:activity_task_scheduled_event_thrift, eventId: 5), + Fabricate(:activity_task_started_event_thrift, eventId: 6), + Fabricate(:activity_task_completed_event_thrift, eventId: 7), + Fabricate(:decision_task_scheduled_event_thrift, eventId: 8), + Fabricate(:decision_task_started_event_thrift, eventId: 9), + Fabricate(:decision_task_completed_event_thrift, eventId: 10), + Fabricate(:activity_task_scheduled_event_thrift, eventId: 11), + Fabricate(:activity_task_started_event_thrift, eventId: 12), + Fabricate(:activity_task_failed_event_thrift, eventId: 13), + Fabricate(:decision_task_scheduled_event_thrift, eventId: 14), + Fabricate(:decision_task_started_event_thrift, eventId: 15), + Fabricate(:decision_task_completed_event_thrift, eventId: 16), + Fabricate(:workflow_execution_completed_event_thrift, eventId: 17) + ]) + end - before { allow(connection).to receive(:reset_workflow_execution).and_return(cadence_response) } + before do + allow(connection).to receive(:reset_workflow_execution).and_return(cadence_response) + allow(subject) + .to receive(:get_workflow_history) + .with(domain: domain, workflow_id: workflow_id, run_id: run_id) + .and_return(history) + end context 'when decision_task_id is provided' do let(:decision_task_id) { 42 } @@ -359,6 +396,86 @@ class TestStartWorkflow < Cadence::Workflow expect(result).to eq('xxx') end end + + context 'when neither strategy nor decision_task_id is provided' do + it 'uses default strategy' do + subject.reset_workflow(domain, workflow_id, run_id) + + expect(connection).to have_received(:reset_workflow_execution).with( + domain: domain, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + decision_task_event_id: 16 + ) + end + end + + context 'when both strategy and decision_task_id are provided' do + it 'uses default strategy' do + expect do + subject.reset_workflow( + domain, + workflow_id, + run_id, + strategy: :last_decision_task, + decision_task_id: 10 + ) + end.to raise_error(ArgumentError, 'Please specify either :strategy or :decision_task_id') + end + end + + context 'with a specified strategy' do + context ':last_decision_task' do + it 'resets workflow' do + subject.reset_workflow(domain, workflow_id, run_id, strategy: :last_decision_task) + + expect(connection).to have_received(:reset_workflow_execution).with( + domain: domain, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + decision_task_event_id: 16 + ) + end + end + + context ':first_decision_task' do + it 'resets workflow' do + subject.reset_workflow(domain, workflow_id, run_id, strategy: :first_decision_task) + + expect(connection).to have_received(:reset_workflow_execution).with( + domain: domain, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + decision_task_event_id: 4 + ) + end + end + + context ':last_failed_activity' do + it 'resets workflow' do + subject.reset_workflow(domain, workflow_id, run_id, strategy: :last_failed_activity) + + expect(connection).to have_received(:reset_workflow_execution).with( + domain: domain, + workflow_id: workflow_id, + run_id: run_id, + reason: 'manual reset', + decision_task_event_id: 10 + ) + end + end + + context 'unsupported strategy' do + it 'resets workflow' do + expect do + subject.reset_workflow(domain, workflow_id, run_id, strategy: :foobar) + end.to raise_error(ArgumentError, 'Unsupported reset strategy') + end + end + end end describe 'async activity operations' do diff --git a/spec/unit/lib/cadence/workflow/history_spec.rb b/spec/unit/lib/cadence/workflow/history_spec.rb index 5d154f8b..089131c2 100644 --- a/spec/unit/lib/cadence/workflow/history_spec.rb +++ b/spec/unit/lib/cadence/workflow/history_spec.rb @@ -2,31 +2,6 @@ describe Cadence::Workflow::History do let(:history) { described_class.new(events) } - describe '.last_completed_decision_task' do - let(:event_mock_1) do - double('EventMock', eventId: 1, timestamp: (Time.now - 1000).to_f, eventType: event_type, eventAttributes: '', public_send: '') - end - let(:event_mock_2) do - double('EventMock', eventId: 2, timestamp: Time.now.to_f, eventType: event_type, eventAttributes: '', public_send: '') - end - - context '> 1 completed decision task exists' do - let(:event_type) { CadenceThrift::EventType::DecisionTaskCompleted } - let(:events) { [event_mock_1, event_mock_2] } - - it 'returns the last completed decision task' do - expect(history.last_completed_decision_task.id).to be(2) - end - end - - context '0 completed decision task exists' do - let(:event_type) { CadenceThrift::EventType::DecisionTaskScheduled } - let(:events) { [event_mock_1] } - it 'returns nil' do - expect(history.last_completed_decision_task).to be(nil) - end - end - end describe '#next_window' do end