diff --git a/lib/sage/exceptions.ex b/lib/sage/exceptions.ex index 45cb352..3221cd9 100644 --- a/lib/sage/exceptions.ex +++ b/lib/sage/exceptions.ex @@ -148,3 +148,17 @@ defmodule Sage.MalformedCompensationReturnError do """ end end + +defmodule Sage.ExecutorPlannerError do + @moduledoc """ + Raised at runtime when it's not possible to build a plan for Sage execution. + """ + defexception [:message] + + def dependency_on_itself_message(name), + do: "Stage #{name} lists itself as a dependency" + def unreachable_dependency_message(stage_name, dependency_name), + do: "Unreachable dependency #{dependency_name} for stage #{stage_name}" + def can_not_converge_message, + do: "Could not sort dependencies. There are cycles in the dependency graph" +end diff --git a/lib/sage/executor/planner.ex b/lib/sage/executor/planner.ex new file mode 100644 index 0000000..c3d5f68 --- /dev/null +++ b/lib/sage/executor/planner.ex @@ -0,0 +1,98 @@ +defmodule Sage.Executor.Planner do + @moduledoc """ + This module is the one responsible for planning asynchronous transactions dependencies + and making sure their dependency graph converges. + """ + import Sage.ExecutorPlannerError + + def plan_execution(%Sage{} = sage) do + {plan, _stage_names} = + sage.stages + |> Enum.reduce([], fn + stage, [] -> + [stage] + + stage, [{:stages_group, prev_stages} | rest_stages] = acc -> + if async_stage?(stage) do + [{:stages_group, [stage] ++ prev_stages}] ++ rest_stages + else + [stage] ++ acc + end + + stage, [prev_stage | rest_stages] = acc -> + if async_stage?(stage) && async_stage?(prev_stage) do + [{:stages_group, [prev_stage, stage]}] ++ rest_stages + else + [stage] ++ acc + end + end) + |> Enum.reduce({[], []}, fn + {:stages_group, stages_group}, {stages, reachable_stage_names} -> + sorted = topological_sort(stages_group, reachable_stage_names) + {Enum.reverse(sorted) ++ stages, Enum.map(sorted, &elem(&1, 0)) ++ reachable_stage_names} + + {name, operation} = stage, {stages, reachable_stage_names} -> + operation + |> operation_deps() + |> Enum.each(fn dep -> + if dep == name do + raise Sage.ExecutorPlannerError, dependency_on_itself_message(name) + end + + if dep not in reachable_stage_names do + raise Sage.ExecutorPlannerError, unreachable_dependency_message(name, dep) + end + end) + + {[stage] ++ stages, [elem(stage, 0)] ++ reachable_stage_names} + end) + + Enum.reverse(plan) + end + + defp async_stage?({_name, operation}) when elem(operation, 0) == :run_async, do: true + defp async_stage?(_), do: false + + def topological_sort(stages, reachable_stage_names) do + graph = :digraph.new() + graph_names = Enum.map(stages, &elem(&1, 0)) + + try do + Enum.each(stages, fn {name, _operation} -> + :digraph.add_vertex(graph, name) + end) + + Enum.each(stages, fn {name, operation} -> + deps = operation_deps(operation) + + Enum.each(deps, fn dep -> + cond do + dep == name -> + raise Sage.ExecutorPlannerError, dependency_on_itself_message(name) + + dep in reachable_stage_names -> + :noop + + dep in graph_names -> + :digraph.add_edge(graph, dep, name) + + dep not in reachable_stage_names -> + raise Sage.ExecutorPlannerError, unreachable_dependency_message(name, dep) + end + end) + end) + + if ordered_stages = :digraph_utils.topsort(graph) do + Enum.map(ordered_stages, fn name -> + List.keyfind(stages, name, 0) + end) + else + raise Sage.ExecutorPlannerError, can_not_converge_message() + end + after + :digraph.delete(graph) + end + end + + defp operation_deps({_type, _tx, _cmp, opts}), do: opts |> Keyword.get(:after, []) |> List.wrap() +end diff --git a/test/sage/executor/planner_test.exs b/test/sage/executor/planner_test.exs new file mode 100644 index 0000000..2181c9b --- /dev/null +++ b/test/sage/executor/planner_test.exs @@ -0,0 +1,147 @@ +defmodule Sage.Executor.PlannerTest do + use Sage.EffectsCase + import Sage.Executor.Planner + + describe "plan_execution/1" do + test "ignores synchronous operations" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run(:step2, transaction(:t2), compensation()) + |> run(:step3, transaction(:t3), compensation()) + + assert sage |> plan_execution() |> names() == [:step1, :step2, :step3] + end + + test "ignores single asynchronous operations" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation()) + |> run(:step3, transaction(:t3), compensation()) + + assert sage |> plan_execution() |> names() == [:step1, :step2, :step3] + end + + test "allows asynchronous operations to depend on reachable synchronous operations" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation(), after: :step1) + |> run(:step3, transaction(:t3), compensation()) + + assert sage |> plan_execution() |> names() == [:step1, :step2, :step3] + end + + test "raises on unreachable synchronous operations" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation(), after: :step3) + |> run(:step3, transaction(:t3), compensation()) + + assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step3 for stage step2", fn -> + plan_execution(sage) + end + end + + test "raises on unreachable asynchronous operations" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation(), after: :step4) + |> run(:step3, transaction(:t3), compensation()) + |> run_async(:step4, transaction(:t4), compensation()) + + assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step4 for stage step2", fn -> + plan_execution(sage) + end + + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation(), after: :step5) + |> run_async(:step3, transaction(:t3), compensation()) + |> run(:step4, transaction(:t4), compensation()) + |> run_async(:step5, transaction(:t5), compensation()) + + assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step5 for stage step2", fn -> + plan_execution(sage) + end + end + + test "raises on dependency on undefined stages" do + sage = run_async(new(), :step1, transaction(:t1), compensation(), after: :undefined) + assert_raise Sage.ExecutorPlannerError, "Unreachable dependency undefined for stage step1", fn -> + plan_execution(sage) + end + end + + test "raises on stage that depends on itself" do + sage = run_async(new(), :step1, transaction(:t1), compensation(), after: :step1) + assert_raise Sage.ExecutorPlannerError, "Stage step1 lists itself as a dependency", fn -> + plan_execution(sage) + end + + sage = + new() + |> run_async(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t1), compensation(), after: :step2) + + assert_raise Sage.ExecutorPlannerError, "Stage step2 lists itself as a dependency", fn -> + plan_execution(sage) + end + end + + test "raises on circular dependencies" do + sage = + new() + |> run_async(:step1, transaction(:t1), compensation(), after: :step2) + |> run_async(:step2, transaction(:t1), compensation(), after: :step1) + + message = "Could not sort dependencies. There are cycles in the dependency graph" + assert_raise Sage.ExecutorPlannerError, message, fn -> + plan_execution(sage) + end + + sage = + new() + |> run_async(:step1, transaction(:t1), compensation(), after: :step3) + |> run_async(:step2, transaction(:t1), compensation(), after: :step1) + |> run_async(:step3, transaction(:t1), compensation(), after: :step2) + + message = "Could not sort dependencies. There are cycles in the dependency graph" + assert_raise Sage.ExecutorPlannerError, message, fn -> + plan_execution(sage) + end + end + + test "does not reorder asynchronous stages without dependencies" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation()) + |> run_async(:step3, transaction(:t3), compensation()) + |> run(:step4, transaction(:t4), compensation()) + + assert sage |> plan_execution() |> names() == [:step1, :step2, :step3, :step4] + end + + test "orders asynchronous stages by their dependencies topology" do + sage = + new() + |> run(:step1, transaction(:t1), compensation()) + |> run_async(:step2, transaction(:t2), compensation(), after: :step4) + |> run_async(:step3, transaction(:t3), compensation(), after: :step1) + |> run_async(:step4, transaction(:t4), compensation()) + |> run_async(:step5, transaction(:t5), compensation(), after: :step3) + |> run_async(:step6, transaction(:t6), compensation(), after: [:step3, :step1]) + |> run_async(:step7, transaction(:t7), compensation()) + + expected_plan = [:step1, :step3, :step4, :step2, :step5, :step6, :step7] + assert sage |> plan_execution() |> names() == expected_plan + end + end + + defp names(stages), do: Enum.map(stages, &elem(&1, 0)) +end