Skip to content

Commit

Permalink
Process execution completed (#922)
Browse files Browse the repository at this point in the history
* Process ExecutionCompleted event

* Implement domain logic to handle executions from wanda

* Add wanda event handling policy

* Use new commanded mocked version of tests

* Use uuid as cluster_id type

* Correct some typos

* Revert back the policy usage in the processor

* Dispatch command in handle itself
  • Loading branch information
arbulu89 authored Nov 4, 2022
1 parent c2f95e2 commit 87628c9
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,44 @@ defmodule Trento.Integration.Checks.Wanda.Messaging.AMQP.Processor do

@behaviour GenRMQ.Processor

alias Trento.Contracts

alias Trento.Checks.V1.ExecutionCompleted
alias Trento.Domain.Commands.CompleteChecksExecutionWanda

require Logger
require Trento.Domain.Enums.Health, as: Health

def process(%GenRMQ.Message{payload: payload} = message) do
Logger.debug("Received message: #{inspect(message)}")

case Contracts.from_event(payload) do
{:ok, event} ->
handle(event)

def process(%GenRMQ.Message{} = message) do
Logger.info("Received message: #{inspect(message)}")
:ok
{:error, reason} ->
{:error, reason}
end
end

defp handle(%ExecutionCompleted{
execution_id: execution_id,
group_id: group_id,
result: result
}) do
commanded().dispatch(
CompleteChecksExecutionWanda.new!(%{
cluster_id: group_id,
health: map_health(result)
}),
correlation_id: execution_id
)
end

defp map_health(:CRITICAL), do: Health.critical()
defp map_health(:WARNING), do: Health.warning()
defp map_health(:PASSING), do: Health.passing()

defp commanded,
do: Application.fetch_env!(:trento, Trento.Commanded)[:adapter]
end
41 changes: 41 additions & 0 deletions lib/trento/domain/cluster/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Trento.Domain.Cluster do
alias Trento.Domain.Commands.{
AbortClusterRollup,
CompleteChecksExecution,
CompleteChecksExecutionWanda,
RegisterClusterHost,
RequestChecksExecution,
RollupCluster,
Expand All @@ -30,6 +31,7 @@ defmodule Trento.Domain.Cluster do
ChecksExecutionRequested,
ChecksExecutionStarted,
ChecksSelected,
ClusterChecksHealthChanged,
ClusterDetailsUpdated,
ClusterDiscoveredHealthChanged,
ClusterHealthChanged,
Expand Down Expand Up @@ -241,6 +243,20 @@ defmodule Trento.Domain.Cluster do
|> Multi.execute(&maybe_emit_cluster_health_changed_event/1)
end

def execute(
%Cluster{
cluster_id: cluster_id
} = cluster,
%CompleteChecksExecutionWanda{
cluster_id: cluster_id
} = command
) do
cluster
|> Multi.new()
|> Multi.execute(&maybe_emit_cluster_checks_health_changed_event(&1, command))
|> Multi.execute(&maybe_emit_cluster_health_changed_event/1)
end

def execute(
%Cluster{
cluster_id: cluster_id,
Expand Down Expand Up @@ -317,6 +333,15 @@ defmodule Trento.Domain.Cluster do
}
end

def apply(%Cluster{} = cluster, %ClusterChecksHealthChanged{
checks_health: checks_health
}) do
%Cluster{
cluster
| checks_health: checks_health
}
end

def apply(
%Cluster{} = cluster,
%ClusterDetailsUpdated{
Expand Down Expand Up @@ -589,6 +614,22 @@ defmodule Trento.Domain.Cluster do
}
end

defp maybe_emit_cluster_checks_health_changed_event(
%Cluster{checks_health: checks_health},
%CompleteChecksExecutionWanda{health: checks_health}
),
do: nil

defp maybe_emit_cluster_checks_health_changed_event(
%Cluster{cluster_id: cluster_id},
%CompleteChecksExecutionWanda{health: checks_health}
) do
%ClusterChecksHealthChanged{
cluster_id: cluster_id,
checks_health: checks_health
}
end

defp maybe_add_checks_health(healths, _, []), do: healths
defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Trento.Domain.Commands.CompleteChecksExecutionWanda do
@moduledoc """
Complete the checks execution with the incoming result
"""

@required_fields :all

use Trento.Command

require Trento.Domain.Enums.Health, as: Health

defcommand do
field :cluster_id, Ecto.UUID
field :health, Ecto.Enum, values: Health.values()
end
end
14 changes: 14 additions & 0 deletions lib/trento/domain/cluster/events/cluster_checks_health_changed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Trento.Domain.Events.ClusterChecksHealthChanged do
@moduledoc """
This event is emitted when the checks health of a cluster changes.
"""

use Trento.Event

require Trento.Domain.Enums.Health, as: Health

defevent do
field :cluster_id, Ecto.UUID
field :checks_health, Ecto.Enum, values: Health.values()
end
end
4 changes: 3 additions & 1 deletion lib/trento/infrastructure/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Trento.Router do
alias Trento.Domain.Commands.{
AbortClusterRollup,
CompleteChecksExecution,
CompleteChecksExecutionWanda,
RegisterApplicationInstance,
RegisterClusterHost,
RegisterDatabaseInstance,
Expand Down Expand Up @@ -39,7 +40,8 @@ defmodule Trento.Router do
SelectChecks,
RequestChecksExecution,
StartChecksExecution,
CompleteChecksExecution
CompleteChecksExecution,
CompleteChecksExecutionWanda
],
to: Cluster,
lifespan: Cluster.Lifespan
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Trento.Integration.Checks.Wanda.Messaging.AMQP.ProcessorTest do
@moduledoc false
use ExUnit.Case, async: true
use Trento.DataCase

import Mox

alias Trento.Integration.Checks.Wanda.Messaging.AMQP.Processor

alias Trento.Checks.V1.ExecutionCompleted
alias Trento.Contracts
alias Trento.Domain.Commands.CompleteChecksExecutionWanda

require Trento.Domain.Enums.Health, as: Health

describe "process" do
test "should process ExecutionCompleted and dispatch command" do
execution_id = UUID.uuid4()
group_id = UUID.uuid4()

execution_completed =
Contracts.to_event(%ExecutionCompleted{
execution_id: execution_id,
group_id: group_id,
result: :PASSING
})

message = %GenRMQ.Message{payload: execution_completed, attributes: %{}, channel: nil}

expect(Trento.Commanded.Mock, :dispatch, fn command, opts ->
assert %CompleteChecksExecutionWanda{
cluster_id: ^group_id,
health: Health.passing()
} = command

assert [correlation_id: ^execution_id] = opts
:ok
end)

assert :ok = Processor.process(message)
end

test "should return error if the event handling fails" do
execution_completed =
Contracts.to_event(%ExecutionCompleted{
execution_id: UUID.uuid4(),
group_id: "invalid-id",
result: :PASSING
})

message = %GenRMQ.Message{payload: execution_completed, attributes: %{}, channel: nil}

assert_raise RuntimeError,
"%{cluster_id: [\"is invalid\"]}",
fn -> Processor.process(message) end
end

@tag capture_log: true
test "should return error if the event cannot be decoded" do
message = %GenRMQ.Message{payload: "bad-payload", attributes: %{}, channel: nil}
assert {:error, :decoding_error} = Processor.process(message)
end
end
end
115 changes: 114 additions & 1 deletion test/trento/domain/cluster/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Trento.ClusterTest do
alias Trento.Domain.Commands.{
AbortClusterRollup,
CompleteChecksExecution,
CompleteChecksExecutionWanda,
RegisterClusterHost,
RequestChecksExecution,
RollupCluster,
Expand All @@ -20,6 +21,7 @@ defmodule Trento.ClusterTest do
ChecksExecutionRequested,
ChecksExecutionStarted,
ChecksSelected,
ClusterChecksHealthChanged,
ClusterDetailsUpdated,
ClusterDiscoveredHealthChanged,
ClusterHealthChanged,
Expand All @@ -36,6 +38,8 @@ defmodule Trento.ClusterTest do
HostExecution
}

require Trento.Domain.Enums.Health, as: Health

describe "cluster registration" do
test "should register a cluster and add the node host to the cluster if the node is a DC" do
cluster_id = Faker.UUID.v4()
Expand Down Expand Up @@ -541,6 +545,115 @@ defmodule Trento.ClusterTest do
end
end

describe "checks execution using wanda" do
test "should change health state when checks health changes" do
cluster_id = Faker.UUID.v4()
selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end)

assert_events_and_state(
[
build(:cluster_registered_event, cluster_id: cluster_id, health: Health.passing()),
%ChecksSelected{
cluster_id: cluster_id,
checks: selected_checks
}
],
CompleteChecksExecutionWanda.new!(%{
cluster_id: cluster_id,
health: Health.critical()
}),
[
%ClusterChecksHealthChanged{
cluster_id: cluster_id,
checks_health: Health.critical()
},
%ClusterHealthChanged{
cluster_id: cluster_id,
health: Health.critical()
}
],
fn cluster ->
assert %Cluster{
cluster_id: ^cluster_id,
health: Health.critical(),
checks_health: Health.critical()
} = cluster
end
)
end

test "should not change the the cluster aggregated health if discovery health is worse" do
cluster_id = Faker.UUID.v4()
selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end)

assert_events_and_state(
[
build(:cluster_registered_event, cluster_id: cluster_id, health: Health.critical()),
%ChecksSelected{
cluster_id: cluster_id,
checks: selected_checks
},
%ClusterDiscoveredHealthChanged{
cluster_id: cluster_id,
discovered_health: Health.critical()
}
],
CompleteChecksExecutionWanda.new!(%{
cluster_id: cluster_id,
health: Health.warning()
}),
[
%ClusterChecksHealthChanged{
cluster_id: cluster_id,
checks_health: Health.warning()
}
],
fn cluster ->
assert %Cluster{
cluster_id: ^cluster_id,
health: Health.critical(),
checks_health: Health.warning()
} = cluster
end
)
end

test "should not change health if it is already critical" do
cluster_id = Faker.UUID.v4()
selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end)

assert_events_and_state(
[
build(:cluster_registered_event, cluster_id: cluster_id, health: Health.critical()),
%ChecksSelected{
cluster_id: cluster_id,
checks: selected_checks
},
%ClusterChecksHealthChanged{
cluster_id: cluster_id,
checks_health: Health.critical()
},
%ClusterDiscoveredHealthChanged{
cluster_id: cluster_id,
discovered_health: Health.critical()
}
],
CompleteChecksExecutionWanda.new!(%{
cluster_id: cluster_id,
health: Health.critical()
}),
[],
fn cluster ->
assert %Cluster{
cluster_id: ^cluster_id,
health: Health.critical(),
checks_health: Health.critical()
} = cluster
end
)
end
end

describe "discovered health" do
test "should change the discovered health and the cluster aggregated health" do
cluster_registered_event = build(:cluster_registered_event, health: :passing)
Expand Down Expand Up @@ -631,7 +744,7 @@ defmodule Trento.ClusterTest do
)
end

test "should not change the the cluster aggregated health if checks health is worst" do
test "should not change the the cluster aggregated health if checks health is worse" do
cluster_registered_event =
build(:cluster_registered_event, health: :passing, provider: :azure)

Expand Down

0 comments on commit 87628c9

Please sign in to comment.