Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.5
- Implement strategies for resetting workflows

## 0.1.4
- Fix a bug which prevented retry_policy from being passed as explicit options
- Make retry_policy options mergeable with the values in an Activity or a Workflow
Expand Down
35 changes: 30 additions & 5 deletions lib/cadence/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'cadence/workflow'
require 'cadence/workflow/history'
require 'cadence/workflow/execution_info'
require 'cadence/reset_strategy'

module Cadence
class Client
Expand Down Expand Up @@ -73,9 +74,16 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
)
end

def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset')
decision_task_id ||= get_last_completed_decision_task(domain, workflow_id, run_id)
raise Error, 'Could not find a completed decision task event' unless decision_task_id
def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_decision_task unless decision_task_id

if strategy && decision_task_id
raise ArgumentError, 'Please specify either :strategy or :decision_task_id'
end

decision_task_id ||= find_decision_task(domain, workflow_id, run_id, strategy)&.id
raise Error, 'Could not find an event to reset to' unless decision_task_id

response = connection.reset_workflow_execution(
domain: domain,
Expand Down Expand Up @@ -150,14 +158,31 @@ def connection
@connection ||= Cadence::Connection.generate(config.for_connection)
end

def get_last_completed_decision_task(domain, workflow_id, run_id)
def find_decision_task(domain, workflow_id, run_id, strategy)
history = get_workflow_history(
domain: domain,
workflow_id: workflow_id,
run_id: run_id
)

history.last_completed_decision_task&.id
# TODO: Move this into a separate class if it keeps growing
case strategy
when ResetStrategy::LAST_DECISION_TASK
events = %[DecisionTaskCompleted DecisionTaskTimedOut DecisionTaskFailed].freeze
history.events.select { |event| events.include?(event.type) }.last
when ResetStrategy::FIRST_DECISION_TASK
events = %[DecisionTaskCompleted DecisionTaskTimedOut DecisionTaskFailed].freeze
history.events.select { |event| events.include?(event.type) }.first
when ResetStrategy::LAST_FAILED_ACTIVITY
events = %[ActivityTaskFailed ActivityTaskTimedOut].freeze
failed_event = history.events.select { |event| events.include?(event.type) }.last
return unless failed_event

scheduled_event = history.find_event_by_id(failed_event.attributes.scheduledEventId)
history.find_event_by_id(scheduled_event.attributes.decisionTaskCompletedEventId)
else
raise ArgumentError, 'Unsupported reset strategy'
end
end
end
end
7 changes: 7 additions & 0 deletions lib/cadence/reset_strategy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Cadence
module ResetStrategy
LAST_DECISION_TASK = :last_decision_task
FIRST_DECISION_TASK = :first_decision_task
LAST_FAILED_ACTIVITY = :last_failed_activity
end
end
2 changes: 1 addition & 1 deletion lib/cadence/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Cadence
VERSION = '0.1.4'.freeze
VERSION = '0.1.5'.freeze
end
4 changes: 2 additions & 2 deletions lib/cadence/workflow/history.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def initialize(events)
@iterator = @events.each
end

def last_completed_decision_task
events.select { |event| event.type == 'DecisionTaskCompleted' }.last
def find_event_by_id(id)
events.find { |event| event.id == id }
end

# It is very important to replay the History window by window in order to
Expand Down
49 changes: 49 additions & 0 deletions spec/fabricators/thrift/history_event_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,52 @@
)
end
end

Fabricator(:activity_task_scheduled_event_thrift, from: :history_event_thrift) do
eventType { CadenceThrift::EventType::ActivityTaskScheduled }
activityTaskScheduledEventAttributes do |attrs|
CadenceThrift::ActivityTaskScheduledEventAttributes.new(
activityId: attrs[:eventId],
activityType: CadenceThrift::ActivityType.new(name: 'TestActivity'),
decisionTaskCompletedEventId: attrs[:eventId] - 1,
domain: 'test-domain',
taskList: Fabricate(:task_list_thrift)
)
end
end

Fabricator(:activity_task_started_event_thrift, from: :history_event_thrift) do
eventType { CadenceThrift::EventType::ActivityTaskStarted }
activityTaskStartedEventAttributes do |attrs|
CadenceThrift::ActivityTaskStartedEventAttributes.new(
scheduledEventId: attrs[:eventId] - 1,
identity: 'test-worker@test-host',
requestId: SecureRandom.uuid
)
end
end

Fabricator(:activity_task_completed_event_thrift, from: :history_event_thrift) do
eventType { CadenceThrift::EventType::ActivityTaskCompleted }
activityTaskCompletedEventAttributes do |attrs|
CadenceThrift::ActivityTaskCompletedEventAttributes.new(
result: nil,
scheduledEventId: attrs[:eventId] - 2,
startedEventId: attrs[:eventId] - 1,
identity: 'test-worker@test-host'
)
end
end

Fabricator(:activity_task_failed_event_thrift, from: :history_event_thrift) do
eventType { CadenceThrift::EventType::ActivityTaskFailed }
activityTaskFailedEventAttributes do |attrs|
CadenceThrift::ActivityTaskFailedEventAttributes.new(
reason: 'StandardError',
details: 'Activity failed',
scheduledEventId: attrs[:eventId] - 2,
startedEventId: attrs[:eventId] - 1,
identity: 'test-worker@test-host'
)
end
end
121 changes: 119 additions & 2 deletions spec/unit/lib/cadence/client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
require 'securerandom'
require 'cadence/client'
require 'cadence/configuration'
require 'cadence/workflow'
require 'cadence/workflow/history'
require 'cadence/connection/thrift'

describe Cadence::Client do
subject { described_class.new(config) }

let(:config) { Cadence::Configuration.new }
let(:connection) { instance_double(Cadence::Connection::Thrift) }
let(:domain) { 'detault-test-domain' }
let(:workflow_id) { SecureRandom.uuid }
let(:run_id) { SecureRandom.uuid }

before do
allow(Cadence::Connection)
.to receive(:generate)
.with(config.for_connection)
.and_return(connection)
end
after { subject.remove_instance_variable(:@connection) }

after do
if subject.instance_variable_get(:@connection)
subject.remove_instance_variable(:@connection)
end
end

describe '#start_workflow' do
let(:cadence_response) do
Expand Down Expand Up @@ -324,8 +334,35 @@ class TestStartWorkflow < Cadence::Workflow

describe '#reset_workflow' do
let(:cadence_response) { CadenceThrift::StartWorkflowExecutionResponse.new(runId: 'xxx') }
let(:history) do
Cadence::Workflow::History.new([
Fabricate(:workflow_execution_started_event_thrift, eventId: 1),
Fabricate(:decision_task_scheduled_event_thrift, eventId: 2),
Fabricate(:decision_task_started_event_thrift, eventId: 3),
Fabricate(:decision_task_completed_event_thrift, eventId: 4),
Fabricate(:activity_task_scheduled_event_thrift, eventId: 5),
Fabricate(:activity_task_started_event_thrift, eventId: 6),
Fabricate(:activity_task_completed_event_thrift, eventId: 7),
Fabricate(:decision_task_scheduled_event_thrift, eventId: 8),
Fabricate(:decision_task_started_event_thrift, eventId: 9),
Fabricate(:decision_task_completed_event_thrift, eventId: 10),
Fabricate(:activity_task_scheduled_event_thrift, eventId: 11),
Fabricate(:activity_task_started_event_thrift, eventId: 12),
Fabricate(:activity_task_failed_event_thrift, eventId: 13),
Fabricate(:decision_task_scheduled_event_thrift, eventId: 14),
Fabricate(:decision_task_started_event_thrift, eventId: 15),
Fabricate(:decision_task_completed_event_thrift, eventId: 16),
Fabricate(:workflow_execution_completed_event_thrift, eventId: 17)
])
end

before { allow(connection).to receive(:reset_workflow_execution).and_return(cadence_response) }
before do
allow(connection).to receive(:reset_workflow_execution).and_return(cadence_response)
allow(subject)
.to receive(:get_workflow_history)
.with(domain: domain, workflow_id: workflow_id, run_id: run_id)
.and_return(history)
end

context 'when decision_task_id is provided' do
let(:decision_task_id) { 42 }
Expand Down Expand Up @@ -359,6 +396,86 @@ class TestStartWorkflow < Cadence::Workflow
expect(result).to eq('xxx')
end
end

context 'when neither strategy nor decision_task_id is provided' do
it 'uses default strategy' do
subject.reset_workflow(domain, workflow_id, run_id)

expect(connection).to have_received(:reset_workflow_execution).with(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
decision_task_event_id: 16
)
end
end

context 'when both strategy and decision_task_id are provided' do
it 'uses default strategy' do
expect do
subject.reset_workflow(
domain,
workflow_id,
run_id,
strategy: :last_decision_task,
decision_task_id: 10
)
end.to raise_error(ArgumentError, 'Please specify either :strategy or :decision_task_id')
end
end

context 'with a specified strategy' do
context ':last_decision_task' do
it 'resets workflow' do
subject.reset_workflow(domain, workflow_id, run_id, strategy: :last_decision_task)

expect(connection).to have_received(:reset_workflow_execution).with(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
decision_task_event_id: 16
)
end
end

context ':first_decision_task' do
it 'resets workflow' do
subject.reset_workflow(domain, workflow_id, run_id, strategy: :first_decision_task)

expect(connection).to have_received(:reset_workflow_execution).with(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
decision_task_event_id: 4
)
end
end

context ':last_failed_activity' do
it 'resets workflow' do
subject.reset_workflow(domain, workflow_id, run_id, strategy: :last_failed_activity)

expect(connection).to have_received(:reset_workflow_execution).with(
domain: domain,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
decision_task_event_id: 10
)
end
end

context 'unsupported strategy' do
it 'resets workflow' do
expect do
subject.reset_workflow(domain, workflow_id, run_id, strategy: :foobar)
end.to raise_error(ArgumentError, 'Unsupported reset strategy')
end
end
end
end

describe 'async activity operations' do
Expand Down
25 changes: 0 additions & 25 deletions spec/unit/lib/cadence/workflow/history_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,6 @@

describe Cadence::Workflow::History do
let(:history) { described_class.new(events) }
describe '.last_completed_decision_task' do
let(:event_mock_1) do
double('EventMock', eventId: 1, timestamp: (Time.now - 1000).to_f, eventType: event_type, eventAttributes: '', public_send: '')
end
let(:event_mock_2) do
double('EventMock', eventId: 2, timestamp: Time.now.to_f, eventType: event_type, eventAttributes: '', public_send: '')
end

context '> 1 completed decision task exists' do
let(:event_type) { CadenceThrift::EventType::DecisionTaskCompleted }
let(:events) { [event_mock_1, event_mock_2] }

it 'returns the last completed decision task' do
expect(history.last_completed_decision_task.id).to be(2)
end
end

context '0 completed decision task exists' do
let(:event_type) { CadenceThrift::EventType::DecisionTaskScheduled }
let(:events) { [event_mock_1] }
it 'returns nil' do
expect(history.last_completed_decision_task).to be(nil)
end
end
end

describe '#next_window' do
end
Expand Down