Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add execution plan chaining #446

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/execution_plan_chaining.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative 'example_helper'

class DelayedAction < Dynflow::Action
def plan
plan_self
end

def run
sleep 5
end
end

if $PROGRAM_NAME == __FILE__
ExampleHelper.world.action_logger.level = 1
ExampleHelper.world.logger.level = 0

plan1 = ExampleHelper.world.trigger(DelayedAction)
plan2 = ExampleHelper.world.chain(plan1.execution_plan_id, DelayedAction)
plan3 = ExampleHelper.world.chain(plan2.execution_plan_id, DelayedAction)
plan4 = ExampleHelper.world.chain(plan2.execution_plan_id, DelayedAction)

puts <<-MSG.gsub(/^.*\|/, '')
|
| Execution Plan Chaining example
| ========================
|
| This example shows the execution plan chaining functionality of Dynflow, which allows execution plans to wait until another execution plan finishes.
|
| Execution plans:
| #{plan1.id} runs immediately and should run successfully.
| #{plan2.id} is delayed and should run once #{plan1.id} finishes.
| #{plan3.id} and #{plan4.id} are delayed and should run once #{plan2.id} finishes.
|
| Visit #{ExampleHelper::DYNFLOW_URL} to see their status.
|
MSG

ExampleHelper.run_web_console
end
2 changes: 1 addition & 1 deletion lib/dynflow/debug/telemetry/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Persistence
:load_execution_plan,
:save_execution_plan,
:find_old_execution_plans,
:find_past_delayed_plans,
:find_ready_delayed_plans,
:delete_delayed_plans,
:save_delayed_plan,
:set_delayed_plan_frozen,
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/delayed_executors/abstract_core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def time

def delayed_execution_plans(time)
with_error_handling([]) do
world.persistence.find_past_delayed_plans(time)
world.persistence.find_ready_delayed_plans(time)
end
end

Expand Down
8 changes: 6 additions & 2 deletions lib/dynflow/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def find_old_execution_plans(age)
end
end

def find_past_delayed_plans(time)
adapter.find_past_delayed_plans(time).map do |plan|
def find_ready_delayed_plans(time)
adapter.find_ready_delayed_plans(time).map do |plan|
DelayedPlan.new_from_hash(@world, plan)
end
end
Expand Down Expand Up @@ -159,5 +159,9 @@ def prune_envelopes(receiver_ids)
def prune_undeliverable_envelopes
adapter.prune_undeliverable_envelopes
end

def chain_execution_plan(first, second)
adapter.chain_execution_plan(first, second)
end
end
end
2 changes: 1 addition & 1 deletion lib/dynflow/persistence_adapters/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def save_execution_plan(execution_plan_id, value)
raise NotImplementedError
end

def find_past_delayed_plans(options = {})
def find_ready_delayed_plans(options = {})
raise NotImplementedError
end

Expand Down
18 changes: 14 additions & 4 deletions lib/dynflow/persistence_adapters/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class action_class execution_plan_uuid queue),
envelope: %w(receiver_id),
coordinator_record: %w(id owner_id class),
delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen),
output_chunk: %w(execution_plan_uuid action_id kind timestamp) }
output_chunk: %w(execution_plan_uuid action_id kind timestamp),
execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) }

SERIALIZABLE_COLUMNS = { action: %w(input output),
delayed: %w(serialized_args),
Expand Down Expand Up @@ -139,12 +140,16 @@ def find_old_execution_plans(age)
.all.map { |plan| execution_plan_column_map(load_data plan, table_name) }
end

def find_past_delayed_plans(time)
def find_ready_delayed_plans(time)
table_name = :delayed
table(table_name)
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
.left_join(TABLES[:execution_plan_dependency], execution_plan_uuid: :execution_plan_uuid)
.left_join(TABLES[:execution_plan], uuid: :blocked_by_uuid)
.where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time))
.where(::Sequel[{ state: nil }] | ::Sequel[{ state: 'stopped' }])
.where(:frozen => false)
.order_by(:start_at)
.select_all(TABLES[table_name])
.all
.map { |plan| load_data(plan, table_name) }
end
Expand All @@ -159,6 +164,10 @@ def save_delayed_plan(execution_plan_id, value)
save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false
end

def chain_execution_plan(first, second)
save :execution_plan_dependency, { execution_plan_uuid: second }, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false
end

def load_step(execution_plan_id, step_id)
load :step, execution_plan_uuid: execution_plan_id, id: step_id
end
Expand Down Expand Up @@ -297,7 +306,8 @@ def abort_if_pending_migrations!
envelope: :dynflow_envelopes,
coordinator_record: :dynflow_coordinator_records,
delayed: :dynflow_delayed_plans,
output_chunk: :dynflow_output_chunks }
output_chunk: :dynflow_output_chunks,
execution_plan_dependency: :dynflow_execution_plan_dependencies }

def table(which)
db[TABLES.fetch(which)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

Sequel.migration do
up do
type = database_type
create_table(:dynflow_execution_plan_dependencies) do
column_properties = if type.to_s.include?('postgres')
{ type: :uuid }
else
{ type: String, size: 36, fixed: true, null: false }
end
foreign_key :execution_plan_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
foreign_key :blocked_by_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties
index :blocked_by_uuid
end
end

down do
drop_table(:dynflow_execution_plan_dependencies)
end
end
12 changes: 12 additions & 0 deletions lib/dynflow/world.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'dynflow/world/invalidation'

module Dynflow
# rubocop:disable Metrics/ClassLength
class World
include Algebrick::TypeCheck
include Algebrick::Matching
Expand Down Expand Up @@ -201,6 +202,16 @@ def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_act
Scheduled[execution_plan.id]
end

def chain(plan_uuids, action_class, *args)
plan_uuids = [plan_uuids] unless plan_uuids.is_a? Array
result = delay_with_options(action_class: action_class, args: args, delay_options: { frozen: true })
plan_uuids.each do |plan_uuid|
persistence.chain_execution_plan(plan_uuid, result.execution_plan_id)
end
persistence.set_delayed_plan_frozen(result.execution_plan_id, false)
result
end

def plan_elsewhere(action_class, *args)
execution_plan = ExecutionPlan.new(self, nil)
execution_plan.delay(nil, action_class, {}, *args)
Expand Down Expand Up @@ -390,4 +401,5 @@ def spawn_and_wait(klass, name, *args)
return actor
end
end
# rubocop:enable Metrics/ClassLength
end
6 changes: 3 additions & 3 deletions test/future_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ module FutureExecutionTest
it 'finds delayed plans' do
@start_at = Time.now.utc - 100
delayed_plan
past_delayed_plans = world.persistence.find_past_delayed_plans(@start_at + 10)
past_delayed_plans = world.persistence.find_ready_delayed_plans(@start_at + 10)
_(past_delayed_plans.length).must_equal 1
_(past_delayed_plans.first.execution_plan_uuid).must_equal execution_plan.id
end
Expand Down Expand Up @@ -112,8 +112,8 @@ module FutureExecutionTest

it 'checks for delayed plans in regular intervals' do
start_time = klok.current_time
persistence.expect(:find_past_delayed_plans, [], [start_time])
persistence.expect(:find_past_delayed_plans, [], [start_time + options[:poll_interval]])
persistence.expect(:find_ready_delayed_plans, [], [start_time])
persistence.expect(:find_ready_delayed_plans, [], [start_time + options[:poll_interval]])
dummy_world.stub :persistence, persistence do
_(klok.pending_pings.length).must_equal 0
delayed_executor.start.wait
Expand Down
63 changes: 60 additions & 3 deletions test/persistence_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def self.it_acts_as_persistence_adapter
end
end

describe '#find_past_delayed_plans' do
describe '#find_ready_delayed_plans' do
it 'finds plans with start_before in past' do
start_time = Time.now.utc
prepare_and_save_plans
Expand All @@ -352,7 +352,7 @@ def self.it_acts_as_persistence_adapter
adapter.save_delayed_plan('plan3', :execution_plan_uuid => 'plan3', :frozen => false, :start_at => format_time(start_time + 60))
adapter.save_delayed_plan('plan4', :execution_plan_uuid => 'plan4', :frozen => false, :start_at => format_time(start_time - 60),
:start_before => format_time(start_time - 60))
plans = adapter.find_past_delayed_plans(start_time)
plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 3
_(plans.map { |plan| plan[:execution_plan_uuid] }).must_equal %w(plan2 plan4 plan1)
end
Expand All @@ -366,10 +366,67 @@ def self.it_acts_as_persistence_adapter
adapter.save_delayed_plan('plan2', :execution_plan_uuid => 'plan2', :frozen => true, :start_at => format_time(start_time + 60),
:start_before => format_time(start_time - 60))

plans = adapter.find_past_delayed_plans(start_time)
plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 1
_(plans.first[:execution_plan_uuid]).must_equal 'plan1'
end

it 'finds plans with null start_at' do
start_time = Time.now.utc
prepare_and_save_plans

adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false)

plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 1
_(plans.first[:execution_plan_uuid]).must_equal 'plan1'
end

it 'does not find blocked plans' do
start_time = Time.now.utc
prepare_and_save_plans

adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false)
adapter.chain_execution_plan('plan2', 'plan1')
adapter.chain_execution_plan('plan3', 'plan1')

plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 0
end

it 'finds plans which are no longer blocked' do
start_time = Time.now.utc
prepare_and_save_plans

adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false)
adapter.chain_execution_plan('plan2', 'plan1')

plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 1
_(plans.first[:execution_plan_uuid]).must_equal 'plan1'
end

it 'does not find plans which are no longer blocked but are frozen' do
start_time = Time.now.utc
prepare_and_save_plans

adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => true)
adapter.chain_execution_plan('plan2', 'plan1')

plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 0
end

it 'does not find plans which are no longer blocked but their start_at is in the future' do
start_time = Time.now.utc
prepare_and_save_plans

adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false, :start_at => start_time + 60)
adapter.chain_execution_plan('plan2', 'plan1') # plan2 is already stopped

plans = adapter.find_ready_delayed_plans(start_time)
_(plans.length).must_equal 0
end
end

describe '#delete_output_chunks' do
Expand Down
20 changes: 20 additions & 0 deletions test/world_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ module WorldTest
_(terminated_event.resolved?).must_equal true
end
end

describe '#chain' do
it 'chains two execution plans' do
plan1 = world.plan(Support::DummyExample::Dummy)
plan2 = world.chain(plan1.id, Support::DummyExample::Dummy)

ready = world.persistence.find_ready_delayed_plans(Time.now)
_(ready.count).must_equal 0

done = Concurrent::Promises.resolvable_future
world.execute(plan1.id, done)
done.wait

plan1 = world.persistence.load_execution_plan(plan1.id)
_(plan1.state).must_equal :stopped
ready = world.persistence.find_ready_delayed_plans(Time.now)
_(ready.count).must_equal 1
_(ready.first.execution_plan_uuid).must_equal plan2.execution_plan_id
end
end
end
end
end
Loading