Skip to content

Commit

Permalink
Implement planner for async deps call graph
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewDryga committed Nov 29, 2017
1 parent 5143fe1 commit b0221f6
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 0 deletions.
14 changes: 14 additions & 0 deletions lib/sage/exceptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,17 @@ defmodule Sage.MalformedCompensationReturnError do
"""
end
end

defmodule Sage.ExecutorPlannerError do
@moduledoc """
Raised at runtime when it's not possible to build a plan for Sage execution.
"""
defexception [:message]

def dependency_on_itself_message(name),
do: "Stage #{name} lists itself as a dependency"
def unreachable_dependency_message(stage_name, dependency_name),
do: "Unreachable dependency #{dependency_name} for stage #{stage_name}"
def can_not_converge_message,
do: "Could not sort dependencies. There are cycles in the dependency graph"
end
98 changes: 98 additions & 0 deletions lib/sage/executor/planner.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Sage.Executor.Planner do
@moduledoc """
This module is the one responsible for planning asynchronous transactions dependencies
and making sure their dependency graph converges.
"""
import Sage.ExecutorPlannerError

def plan_execution(%Sage{} = sage) do
{plan, _stage_names} =
sage.stages
|> Enum.reduce([], fn
stage, [] ->
[stage]

stage, [{:stages_group, prev_stages} | rest_stages] = acc ->
if async_stage?(stage) do
[{:stages_group, [stage] ++ prev_stages}] ++ rest_stages
else
[stage] ++ acc
end

stage, [prev_stage | rest_stages] = acc ->
if async_stage?(stage) && async_stage?(prev_stage) do
[{:stages_group, [prev_stage, stage]}] ++ rest_stages
else
[stage] ++ acc
end
end)
|> Enum.reduce({[], []}, fn
{:stages_group, stages_group}, {stages, reachable_stage_names} ->
sorted = topological_sort(stages_group, reachable_stage_names)
{Enum.reverse(sorted) ++ stages, Enum.map(sorted, &elem(&1, 0)) ++ reachable_stage_names}

{name, operation} = stage, {stages, reachable_stage_names} ->
operation
|> operation_deps()
|> Enum.each(fn dep ->
if dep == name do
raise Sage.ExecutorPlannerError, dependency_on_itself_message(name)
end

if dep not in reachable_stage_names do
raise Sage.ExecutorPlannerError, unreachable_dependency_message(name, dep)
end
end)

{[stage] ++ stages, [elem(stage, 0)] ++ reachable_stage_names}
end)

Enum.reverse(plan)
end

defp async_stage?({_name, operation}) when elem(operation, 0) == :run_async, do: true
defp async_stage?(_), do: false

def topological_sort(stages, reachable_stage_names) do
graph = :digraph.new()
graph_names = Enum.map(stages, &elem(&1, 0))

try do
Enum.each(stages, fn {name, _operation} ->
:digraph.add_vertex(graph, name)
end)

Enum.each(stages, fn {name, operation} ->
deps = operation_deps(operation)

Enum.each(deps, fn dep ->
cond do
dep == name ->
raise Sage.ExecutorPlannerError, dependency_on_itself_message(name)

dep in reachable_stage_names ->
:noop

dep in graph_names ->
:digraph.add_edge(graph, dep, name)

dep not in reachable_stage_names ->
raise Sage.ExecutorPlannerError, unreachable_dependency_message(name, dep)
end
end)
end)

if ordered_stages = :digraph_utils.topsort(graph) do
Enum.map(ordered_stages, fn name ->
List.keyfind(stages, name, 0)
end)
else
raise Sage.ExecutorPlannerError, can_not_converge_message()
end
after
:digraph.delete(graph)
end
end

defp operation_deps({_type, _tx, _cmp, opts}), do: opts |> Keyword.get(:after, []) |> List.wrap()
end
147 changes: 147 additions & 0 deletions test/sage/executor/planner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
defmodule Sage.Executor.PlannerTest do
use Sage.EffectsCase
import Sage.Executor.Planner

describe "plan_execution/1" do
test "ignores synchronous operations" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run(:step2, transaction(:t2), compensation())
|> run(:step3, transaction(:t3), compensation())

assert sage |> plan_execution() |> names() == [:step1, :step2, :step3]
end

test "ignores single asynchronous operations" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation())
|> run(:step3, transaction(:t3), compensation())

assert sage |> plan_execution() |> names() == [:step1, :step2, :step3]
end

test "allows asynchronous operations to depend on reachable synchronous operations" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation(), after: :step1)
|> run(:step3, transaction(:t3), compensation())

assert sage |> plan_execution() |> names() == [:step1, :step2, :step3]
end

test "raises on unreachable synchronous operations" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation(), after: :step3)
|> run(:step3, transaction(:t3), compensation())

assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step3 for stage step2", fn ->
plan_execution(sage)
end
end

test "raises on unreachable asynchronous operations" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation(), after: :step4)
|> run(:step3, transaction(:t3), compensation())
|> run_async(:step4, transaction(:t4), compensation())

assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step4 for stage step2", fn ->
plan_execution(sage)
end

sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation(), after: :step5)
|> run_async(:step3, transaction(:t3), compensation())
|> run(:step4, transaction(:t4), compensation())
|> run_async(:step5, transaction(:t5), compensation())

assert_raise Sage.ExecutorPlannerError, "Unreachable dependency step5 for stage step2", fn ->
plan_execution(sage)
end
end

test "raises on dependency on undefined stages" do
sage = run_async(new(), :step1, transaction(:t1), compensation(), after: :undefined)
assert_raise Sage.ExecutorPlannerError, "Unreachable dependency undefined for stage step1", fn ->
plan_execution(sage)
end
end

test "raises on stage that depends on itself" do
sage = run_async(new(), :step1, transaction(:t1), compensation(), after: :step1)
assert_raise Sage.ExecutorPlannerError, "Stage step1 lists itself as a dependency", fn ->
plan_execution(sage)
end

sage =
new()
|> run_async(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t1), compensation(), after: :step2)

assert_raise Sage.ExecutorPlannerError, "Stage step2 lists itself as a dependency", fn ->
plan_execution(sage)
end
end

test "raises on circular dependencies" do
sage =
new()
|> run_async(:step1, transaction(:t1), compensation(), after: :step2)
|> run_async(:step2, transaction(:t1), compensation(), after: :step1)

message = "Could not sort dependencies. There are cycles in the dependency graph"
assert_raise Sage.ExecutorPlannerError, message, fn ->
plan_execution(sage)
end

sage =
new()
|> run_async(:step1, transaction(:t1), compensation(), after: :step3)
|> run_async(:step2, transaction(:t1), compensation(), after: :step1)
|> run_async(:step3, transaction(:t1), compensation(), after: :step2)

message = "Could not sort dependencies. There are cycles in the dependency graph"
assert_raise Sage.ExecutorPlannerError, message, fn ->
plan_execution(sage)
end
end

test "does not reorder asynchronous stages without dependencies" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation())
|> run_async(:step3, transaction(:t3), compensation())
|> run(:step4, transaction(:t4), compensation())

assert sage |> plan_execution() |> names() == [:step1, :step2, :step3, :step4]
end

test "orders asynchronous stages by their dependencies topology" do
sage =
new()
|> run(:step1, transaction(:t1), compensation())
|> run_async(:step2, transaction(:t2), compensation(), after: :step4)
|> run_async(:step3, transaction(:t3), compensation(), after: :step1)
|> run_async(:step4, transaction(:t4), compensation())
|> run_async(:step5, transaction(:t5), compensation(), after: :step3)
|> run_async(:step6, transaction(:t6), compensation(), after: [:step3, :step1])
|> run_async(:step7, transaction(:t7), compensation())

expected_plan = [:step1, :step3, :step4, :step2, :step5, :step6, :step7]
assert sage |> plan_execution() |> names() == expected_plan
end
end

defp names(stages), do: Enum.map(stages, &elem(&1, 0))
end

0 comments on commit b0221f6

Please sign in to comment.