Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

**kwargs extravaganza #439

Closed
wants to merge 12 commits into from
28 changes: 19 additions & 9 deletions examples/future_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,33 @@ def initialize(id, name)

class CustomPassedObjectSerializer < ::Dynflow::Serializers::Abstract
def serialize(arg)
# Serialized output can be anything that is representable as JSON: Array, Hash...
{ :id => arg.id, :name => arg.name }
case arg
when CustomPassedObject
# Serialized output can be anything that is representable as JSON: Array, Hash...
{ :id => arg.id, :name => arg.name }
else
arg
end
end

def deserialize(arg)
# Deserialized output must be an Array
CustomPassedObject.new(arg[:id], arg[:name])
case arg
when Dynflow::Utils::IndifferentHash
# Deserialized output must be an Array
CustomPassedObject.new(arg[:id], arg[:name])
else
arg
end
end
end

class DelayedAction < Dynflow::Action
def delay(delay_options, *args)
CustomPassedObjectSerializer.new(args)
def delay(delay_options, *args, **kwargs)
CustomPassedObjectSerializer.new(args, nil, kwargs)
end

def plan(passed_object)
plan_self :object_id => passed_object.id, :object_name => passed_object.name
def plan(passed_object, foo: nil)
plan_self :object_id => passed_object.id, :object_name => passed_object.name, :foo => foo
end

def run
Expand All @@ -49,7 +59,7 @@ def run

past_plan = ExampleHelper.world.delay(DelayedAction, { :start_at => past, :start_before => past }, object)
near_future_plan = ExampleHelper.world.delay(DelayedAction, { :start_at => near_future, :start_before => future }, object)
future_plan = ExampleHelper.world.delay(DelayedAction, { :start_at => future }, object)
future_plan = ExampleHelper.world.delay(DelayedAction, { :start_at => future }, object, foo: 7)

puts <<-MSG.gsub(/^.*\|/, '')
|
Expand Down
14 changes: 9 additions & 5 deletions examples/orchestrate_evented.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

module OrchestrateEvented
class CreateInfrastructure < Dynflow::Action
def plan(get_stuck = false)
def plan(get_stuck: false)
sequence do
concurrence do
plan_action(CreateMachine, 'host1', 'db', get_stuck: get_stuck)
Expand All @@ -48,9 +48,9 @@ def plan(name, profile, config_options = {})
:disk => prepare_disk.output['path'])
plan_action(AddIPtoHosts, :name => name, :ip => create_vm.output[:ip])
plan_action(ConfigureMachine,
:ip => create_vm.output[:ip],
:profile => profile,
:config_options => config_options)
ip: create_vm.output[:ip],
profile: profile,
config_options: config_options)
plan_self(:name => name)
end

Expand Down Expand Up @@ -128,6 +128,10 @@ class ConfigureMachine < Base
param :config_options
end

def plan(ip:, profile:, config_options:)
plan_self(ip: ip, profile: profile, config_options: config_options)
end

def run(event = nil)
if event == Dynflow::Action::Cancellable::Cancel
output[:message] = "I was cancelled but we don't care"
Expand Down Expand Up @@ -155,7 +159,7 @@ def on_finish

if $0 == __FILE__
ExampleHelper.world.trigger(OrchestrateEvented::CreateInfrastructure)
ExampleHelper.world.trigger(OrchestrateEvented::CreateInfrastructure, true)
ExampleHelper.world.trigger(OrchestrateEvented::CreateInfrastructure, get_stuck: true)
puts example_description
ExampleHelper.run_web_console
end
32 changes: 16 additions & 16 deletions lib/dynflow/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ def error
@step.error
end

def execute(*args)
def execute(*args, **kwargs)
phase! Executable
self.send phase.execute_method_name, *args
self.send phase.execute_method_name, *args, **kwargs
end

# @api private
Expand All @@ -324,10 +324,10 @@ def required_step_ids(input = self.input)
recursion.(input)
end

def execute_delay(delay_options, *args)
def execute_delay(delay_options, *args, **kwargs)
with_error_handling(true) do
world.middleware.execute(:delay, self, delay_options, *args) do |*new_args|
@serializer = delay(*new_args).tap do |serializer|
world.middleware.execute(:delay, self, delay_options, *args, **kwargs) do |*new_args, **new_kwargs|
@serializer = delay(*new_args, **new_kwargs).tap do |serializer|
serializer.perform_serialization!
end
end
Expand Down Expand Up @@ -379,15 +379,15 @@ def save_state(conditions = {})
@step.save(conditions)
end

def delay(delay_options, *args)
Serializers::Noop.new(args)
def delay(delay_options, *args, **kwargs)
Serializers::Noop.new(args, nil, kwargs)
end

# @override to implement the action's *Plan phase* behaviour.
# By default it plans itself and expects input-hash.
# Use #plan_self and #plan_action methods to plan actions.
# It can use DB in this phase.
def plan(*args)
def plan(*args, **kwargs)
if from_subscription?
# if the action is triggered by subscription, by default use the
# input of parent action.
Expand All @@ -396,7 +396,7 @@ def plan(*args)
else
# in this case, the action was triggered by plan_action. Use
# the argument specified there.
plan_self(*args)
plan_self(*args, **kwargs)
end
self
end
Expand Down Expand Up @@ -459,9 +459,9 @@ def plan_self(input = {})
return self # to stay consistent with plan_action
end

def plan_action(action_class, *args)
def plan_action(action_class, *args, **kwargs)
phase! Plan
@execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args)
@execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args, **kwargs)
end

# DSL for run phase
Expand Down Expand Up @@ -521,7 +521,7 @@ def set_error(error)
@step.error = ExecutionPlan::Steps::Error.new(error)
end

def execute_plan(*args)
def execute_plan(*args, **kwargs)
phase! Plan
self.state = :running
save_state
Expand All @@ -530,8 +530,8 @@ def execute_plan(*args)
# before getting out of the planning phase
with_error_handling(!root_action?) do
concurrence do
world.middleware.execute(:plan, self, *args) do |*new_args|
plan(*new_args)
world.middleware.execute(:plan, self, *args, **kwargs) do |*new_args, **new_kwargs|
plan(*new_args, **new_kwargs)
end
end

Expand All @@ -543,7 +543,7 @@ def execute_plan(*args)
@execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do
subscribed_actions.each do |action_class|
new_plan_step = @execution_plan.add_plan_step(action_class, self)
new_plan_step.execute(@execution_plan, self, true, *args)
new_plan_step.execute(@execution_plan, self, true, *args, **kwargs)
end
end
end
Expand Down Expand Up @@ -586,7 +586,7 @@ def execute_run(event)
# we run the Skip event only when the run accepts events
if event != Skip || run_accepts_events?
result = catch(SUSPEND) do
world.middleware.execute(:run, self, *[event].compact) do |*args|
world.middleware.execute(:run, self, *[event].compact, **{}) do |*args|
run(*args)
end
end
Expand Down
12 changes: 6 additions & 6 deletions lib/dynflow/action/progress.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ module Dynflow
# the progress is 1 for success/skipped actions and 0 for errorneous ones.
module Action::Progress
class Calculate < Middleware
def run(*args)
with_progress_calculation(*args) do
def run(*args, **kwargs)
with_progress_calculation(*args, **kwargs) do
[action.run_progress, action.run_progress_weight]
end
end

def finalize(*args)
with_progress_calculation(*args) do
def finalize(*args, **kwargs)
with_progress_calculation(*args, **kwargs) do
[action.finalize_progress, action.finalize_progress_weight]
end
end

protected

def with_progress_calculation(*args)
pass(*args)
def with_progress_calculation(*args, **kwargs)
pass(*args, **kwargs)
ensure
begin
action.calculated_progress = yield
Expand Down
4 changes: 2 additions & 2 deletions lib/dynflow/action/v2/with_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def check_for_errors!
end

# Helper for creating sub plans
def trigger(action_class, *args)
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
def trigger(action_class, *args, **kwargs)
world.trigger { world.plan_with_options(action_class: action_class, args: args, kwargs: kwargs, caller_action: self) }
end

# Concurrency limitting
Expand Down
10 changes: 5 additions & 5 deletions lib/dynflow/action/with_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ def abort!
end

# Helper for creating sub plans
def trigger(action_class, *args)
def trigger(action_class, *args, **kwargs)
if uses_concurrency_control
trigger_with_concurrency_control(action_class, *args)
trigger_with_concurrency_control(action_class, *args, **kwargs)
else
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
world.trigger { world.plan_with_options(action_class: action_class, args: args, kwargs: kwargs, caller_action: self) }
end
end

def trigger_with_concurrency_control(action_class, *args)
record = world.plan_with_options(action_class: action_class, args: args, caller_action: self)
def trigger_with_concurrency_control(action_class, *args, **kwargs)
record = world.plan_with_options(action_class: action_class, args: args, kwargs: kwargs, caller_action: self)
records = [[record.id], []]
records.reverse! unless record.state == :planned
@world.throttle_limiter.handle_plans!(execution_plan_id, *records).first
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/actors/execution_plan_cleaner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def start
def clean!
plans = @world.persistence.find_old_execution_plans(Time.now.utc - @max_age)
report(plans)
@world.persistence.delete_execution_plans(uuid: plans.map(&:id))
@world.persistence.delete_execution_plans(uuid: plans.map(&:id), **{})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have **{} here? I mean, delete_execution_plans looks to have def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil). It won't hurt, but it just seems unnecessary. On the other hand, I've noticed this: https://github.com/Dynflow/dynflow/blob/master/lib/dynflow/debug/telemetry/persistence.rb#L34-L38. I guess it makes more sense to add **kwargs support there just in case.

end

def report(plans)
Expand Down
6 changes: 4 additions & 2 deletions lib/dynflow/delayed_plan.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def plan
execution_plan.root_plan_step.load_action
execution_plan.generate_action_id
execution_plan.generate_step_id
execution_plan.plan(*@args_serializer.perform_deserialization!)
@args_serializer.perform_deserialization!
execution_plan.plan(*@args_serializer.args!, **@args_serializer.kwargs!)
end

def timeout
Expand Down Expand Up @@ -56,6 +57,7 @@ def to_hash
:start_at => @start_at,
:start_before => @start_before,
:serialized_args => @args_serializer.serialized_args,
:serialized_kwargs => @args_serializer.serialized_kwargs,
:args_serializer => @args_serializer.class.name,
:frozen => @frozen
end
Expand All @@ -70,7 +72,7 @@ def args

# @api private
def self.new_from_hash(world, hash, *args)
serializer = Utils.constantize(hash[:args_serializer]).new(nil, hash[:serialized_args])
serializer = Utils.constantize(hash[:args_serializer]).new(nil, hash[:serialized_args], nil, hash[:serialized_kwargs])
self.new(world,
hash[:execution_plan_uuid],
string_to_time(hash[:start_at]),
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/director.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def initialize(request_id, execution_plan_id, step, event, queue, sender_orchest
end

def execute
@step.execute(@event)
@step.execute(@event, **{})
end

def to_hash
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/director/sequential_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def finalize
unless execution_plan.error?
step_id = execution_plan.finalize_flow.all_step_ids.first
action_class = execution_plan.steps[step_id].action_class
world.middleware.execute(:finalize_phase, action_class, execution_plan) do
world.middleware.execute(:finalize_phase, action_class, execution_plan, **{}) do
dispatch(execution_plan.finalize_flow)
end
end
Expand Down
17 changes: 10 additions & 7 deletions lib/dynflow/execution_plan.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def run_hooks(state)
action_ids = records.compact.map { |record| record[:id] }
return if action_ids.empty?
persistence.load_actions(self, action_ids).each do |action|
world.middleware.execute(:hook, action, self) do
world.middleware.execute(:hook, action, self, **{}) do
action.class.execution_plan_hooks.run(self, action, state)
end
end
Expand Down Expand Up @@ -251,11 +251,11 @@ def generate_step_id
@last_step_id += 1
end

def delay(caller_action, action_class, delay_options, *args)
def delay(caller_action, action_class, delay_options, *args, **kwargs)
save
@root_plan_step = add_scheduling_step(action_class, caller_action)
run_hooks(:pending)
serializer = root_plan_step.delay(delay_options, args)
serializer = root_plan_step.delay(delay_options, args, kwargs)
delayed_plan = DelayedPlan.new(@world,
id,
delay_options[:start_at],
Expand All @@ -282,11 +282,14 @@ def prepare(action_class, options = {})
step
end

def plan(*args)
def plan(*args, **kwargs)
update_state(:planning)
world.middleware.execute(:plan_phase, root_plan_step.action_class, self) do

# TODO: Remove the trailing **{} when we drop support for ruby < 3
# https://bugs.ruby-lang.org/issues/14909
world.middleware.execute(:plan_phase, root_plan_step.action_class, self, **{}) do
with_planning_scope do
root_action = root_plan_step.execute(self, nil, false, *args)
root_action = root_plan_step.execute(self, nil, false, *args, **kwargs)
@label = root_action.label

if @dependency_graph.unresolved?
Expand Down Expand Up @@ -568,7 +571,7 @@ def unlock_all_singleton_locks!

def toggle_telemetry_state(original, new)
return if original == new
@label = root_plan_step.action_class if @label.nil?
@label = root_plan_step.action_class.name if @label.nil?
Dynflow::Telemetry.with_instance do |t|
t.set_gauge(:dynflow_active_execution_plans, '-1',
telemetry_common_options.merge(:state => original)) unless original.nil?
Expand Down
2 changes: 1 addition & 1 deletion lib/dynflow/execution_plan/steps/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def self.states
@states ||= [:scheduling, :pending, :running, :success, :suspended, :skipping, :skipped, :error, :cancelled]
end

def execute(*args)
def execute(*args, **kwargs)
raise NotImplementedError
end

Expand Down
4 changes: 2 additions & 2 deletions lib/dynflow/execution_plan/steps/abstract_flow_step.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ def update_from_action(action)
@queue ||= :default
end

def execute(*args)
def execute(*args, **kwargs)
return self if [:skipped, :success].include? self.state
open_action do |action|
with_meta_calculation(action) do
action.execute(*args)
action.execute(*args, **kwargs)
@delayed_events = action.delayed_events
end
end
Expand Down
Loading
Loading