diff --git a/lib/trento/application/integration/checks/adapter/wanda/messaging/amqp/processor.ex b/lib/trento/application/integration/checks/adapter/wanda/messaging/amqp/processor.ex index f8d65f5f12..ee78d429b8 100644 --- a/lib/trento/application/integration/checks/adapter/wanda/messaging/amqp/processor.ex +++ b/lib/trento/application/integration/checks/adapter/wanda/messaging/amqp/processor.ex @@ -5,10 +5,44 @@ defmodule Trento.Integration.Checks.Wanda.Messaging.AMQP.Processor do @behaviour GenRMQ.Processor + alias Trento.Contracts + + alias Trento.Checks.V1.ExecutionCompleted + alias Trento.Domain.Commands.CompleteChecksExecutionWanda + require Logger + require Trento.Domain.Enums.Health, as: Health + + def process(%GenRMQ.Message{payload: payload} = message) do + Logger.debug("Received message: #{inspect(message)}") + + case Contracts.from_event(payload) do + {:ok, event} -> + handle(event) - def process(%GenRMQ.Message{} = message) do - Logger.info("Received message: #{inspect(message)}") - :ok + {:error, reason} -> + {:error, reason} + end end + + defp handle(%ExecutionCompleted{ + execution_id: execution_id, + group_id: group_id, + result: result + }) do + commanded().dispatch( + CompleteChecksExecutionWanda.new!(%{ + cluster_id: group_id, + health: map_health(result) + }), + correlation_id: execution_id + ) + end + + defp map_health(:CRITICAL), do: Health.critical() + defp map_health(:WARNING), do: Health.warning() + defp map_health(:PASSING), do: Health.passing() + + defp commanded, + do: Application.fetch_env!(:trento, Trento.Commanded)[:adapter] end diff --git a/lib/trento/domain/cluster/cluster.ex b/lib/trento/domain/cluster/cluster.ex index e285c9a413..f9cba6df2b 100644 --- a/lib/trento/domain/cluster/cluster.ex +++ b/lib/trento/domain/cluster/cluster.ex @@ -18,6 +18,7 @@ defmodule Trento.Domain.Cluster do alias Trento.Domain.Commands.{ AbortClusterRollup, CompleteChecksExecution, + CompleteChecksExecutionWanda, RegisterClusterHost, RequestChecksExecution, RollupCluster, @@ -30,6 +31,7 @@ defmodule Trento.Domain.Cluster do ChecksExecutionRequested, ChecksExecutionStarted, ChecksSelected, + ClusterChecksHealthChanged, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -241,6 +243,20 @@ defmodule Trento.Domain.Cluster do |> Multi.execute(&maybe_emit_cluster_health_changed_event/1) end + def execute( + %Cluster{ + cluster_id: cluster_id + } = cluster, + %CompleteChecksExecutionWanda{ + cluster_id: cluster_id + } = command + ) do + cluster + |> Multi.new() + |> Multi.execute(&maybe_emit_cluster_checks_health_changed_event(&1, command)) + |> Multi.execute(&maybe_emit_cluster_health_changed_event/1) + end + def execute( %Cluster{ cluster_id: cluster_id, @@ -317,6 +333,15 @@ defmodule Trento.Domain.Cluster do } end + def apply(%Cluster{} = cluster, %ClusterChecksHealthChanged{ + checks_health: checks_health + }) do + %Cluster{ + cluster + | checks_health: checks_health + } + end + def apply( %Cluster{} = cluster, %ClusterDetailsUpdated{ @@ -589,6 +614,22 @@ defmodule Trento.Domain.Cluster do } end + defp maybe_emit_cluster_checks_health_changed_event( + %Cluster{checks_health: checks_health}, + %CompleteChecksExecutionWanda{health: checks_health} + ), + do: nil + + defp maybe_emit_cluster_checks_health_changed_event( + %Cluster{cluster_id: cluster_id}, + %CompleteChecksExecutionWanda{health: checks_health} + ) do + %ClusterChecksHealthChanged{ + cluster_id: cluster_id, + checks_health: checks_health + } + end + defp maybe_add_checks_health(healths, _, []), do: healths defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths] diff --git a/lib/trento/domain/cluster/commands/complete_checks_execution_wanda.ex b/lib/trento/domain/cluster/commands/complete_checks_execution_wanda.ex new file mode 100644 index 0000000000..110514a181 --- /dev/null +++ b/lib/trento/domain/cluster/commands/complete_checks_execution_wanda.ex @@ -0,0 +1,16 @@ +defmodule Trento.Domain.Commands.CompleteChecksExecutionWanda do + @moduledoc """ + Complete the checks execution with the incoming result + """ + + @required_fields :all + + use Trento.Command + + require Trento.Domain.Enums.Health, as: Health + + defcommand do + field :cluster_id, Ecto.UUID + field :health, Ecto.Enum, values: Health.values() + end +end diff --git a/lib/trento/domain/cluster/events/cluster_checks_health_changed.ex b/lib/trento/domain/cluster/events/cluster_checks_health_changed.ex new file mode 100644 index 0000000000..379f2bf865 --- /dev/null +++ b/lib/trento/domain/cluster/events/cluster_checks_health_changed.ex @@ -0,0 +1,14 @@ +defmodule Trento.Domain.Events.ClusterChecksHealthChanged do + @moduledoc """ + This event is emitted when the checks health of a cluster changes. + """ + + use Trento.Event + + require Trento.Domain.Enums.Health, as: Health + + defevent do + field :cluster_id, Ecto.UUID + field :checks_health, Ecto.Enum, values: Health.values() + end +end diff --git a/lib/trento/infrastructure/router.ex b/lib/trento/infrastructure/router.ex index 62a06f7b56..cdcec4ef54 100644 --- a/lib/trento/infrastructure/router.ex +++ b/lib/trento/infrastructure/router.ex @@ -12,6 +12,7 @@ defmodule Trento.Router do alias Trento.Domain.Commands.{ AbortClusterRollup, CompleteChecksExecution, + CompleteChecksExecutionWanda, RegisterApplicationInstance, RegisterClusterHost, RegisterDatabaseInstance, @@ -39,7 +40,8 @@ defmodule Trento.Router do SelectChecks, RequestChecksExecution, StartChecksExecution, - CompleteChecksExecution + CompleteChecksExecution, + CompleteChecksExecutionWanda ], to: Cluster, lifespan: Cluster.Lifespan diff --git a/test/trento/application/integration/checks/wanda/messaging/amqp/processor_test.exs b/test/trento/application/integration/checks/wanda/messaging/amqp/processor_test.exs new file mode 100644 index 0000000000..9856ded8e7 --- /dev/null +++ b/test/trento/application/integration/checks/wanda/messaging/amqp/processor_test.exs @@ -0,0 +1,64 @@ +defmodule Trento.Integration.Checks.Wanda.Messaging.AMQP.ProcessorTest do + @moduledoc false + use ExUnit.Case, async: true + use Trento.DataCase + + import Mox + + alias Trento.Integration.Checks.Wanda.Messaging.AMQP.Processor + + alias Trento.Checks.V1.ExecutionCompleted + alias Trento.Contracts + alias Trento.Domain.Commands.CompleteChecksExecutionWanda + + require Trento.Domain.Enums.Health, as: Health + + describe "process" do + test "should process ExecutionCompleted and dispatch command" do + execution_id = UUID.uuid4() + group_id = UUID.uuid4() + + execution_completed = + Contracts.to_event(%ExecutionCompleted{ + execution_id: execution_id, + group_id: group_id, + result: :PASSING + }) + + message = %GenRMQ.Message{payload: execution_completed, attributes: %{}, channel: nil} + + expect(Trento.Commanded.Mock, :dispatch, fn command, opts -> + assert %CompleteChecksExecutionWanda{ + cluster_id: ^group_id, + health: Health.passing() + } = command + + assert [correlation_id: ^execution_id] = opts + :ok + end) + + assert :ok = Processor.process(message) + end + + test "should return error if the event handling fails" do + execution_completed = + Contracts.to_event(%ExecutionCompleted{ + execution_id: UUID.uuid4(), + group_id: "invalid-id", + result: :PASSING + }) + + message = %GenRMQ.Message{payload: execution_completed, attributes: %{}, channel: nil} + + assert_raise RuntimeError, + "%{cluster_id: [\"is invalid\"]}", + fn -> Processor.process(message) end + end + + @tag capture_log: true + test "should return error if the event cannot be decoded" do + message = %GenRMQ.Message{payload: "bad-payload", attributes: %{}, channel: nil} + assert {:error, :decoding_error} = Processor.process(message) + end + end +end diff --git a/test/trento/domain/cluster/cluster_test.exs b/test/trento/domain/cluster/cluster_test.exs index 3b8749aa23..f627e61587 100644 --- a/test/trento/domain/cluster/cluster_test.exs +++ b/test/trento/domain/cluster/cluster_test.exs @@ -8,6 +8,7 @@ defmodule Trento.ClusterTest do alias Trento.Domain.Commands.{ AbortClusterRollup, CompleteChecksExecution, + CompleteChecksExecutionWanda, RegisterClusterHost, RequestChecksExecution, RollupCluster, @@ -20,6 +21,7 @@ defmodule Trento.ClusterTest do ChecksExecutionRequested, ChecksExecutionStarted, ChecksSelected, + ClusterChecksHealthChanged, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -36,6 +38,8 @@ defmodule Trento.ClusterTest do HostExecution } + require Trento.Domain.Enums.Health, as: Health + describe "cluster registration" do test "should register a cluster and add the node host to the cluster if the node is a DC" do cluster_id = Faker.UUID.v4() @@ -541,6 +545,115 @@ defmodule Trento.ClusterTest do end end + describe "checks execution using wanda" do + test "should change health state when checks health changes" do + cluster_id = Faker.UUID.v4() + selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, health: Health.passing()), + %ChecksSelected{ + cluster_id: cluster_id, + checks: selected_checks + } + ], + CompleteChecksExecutionWanda.new!(%{ + cluster_id: cluster_id, + health: Health.critical() + }), + [ + %ClusterChecksHealthChanged{ + cluster_id: cluster_id, + checks_health: Health.critical() + }, + %ClusterHealthChanged{ + cluster_id: cluster_id, + health: Health.critical() + } + ], + fn cluster -> + assert %Cluster{ + cluster_id: ^cluster_id, + health: Health.critical(), + checks_health: Health.critical() + } = cluster + end + ) + end + + test "should not change the the cluster aggregated health if discovery health is worse" do + cluster_id = Faker.UUID.v4() + selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, health: Health.critical()), + %ChecksSelected{ + cluster_id: cluster_id, + checks: selected_checks + }, + %ClusterDiscoveredHealthChanged{ + cluster_id: cluster_id, + discovered_health: Health.critical() + } + ], + CompleteChecksExecutionWanda.new!(%{ + cluster_id: cluster_id, + health: Health.warning() + }), + [ + %ClusterChecksHealthChanged{ + cluster_id: cluster_id, + checks_health: Health.warning() + } + ], + fn cluster -> + assert %Cluster{ + cluster_id: ^cluster_id, + health: Health.critical(), + checks_health: Health.warning() + } = cluster + end + ) + end + + test "should not change health if it is already critical" do + cluster_id = Faker.UUID.v4() + selected_checks = Enum.map(0..4, fn _ -> Faker.Cat.name() end) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, health: Health.critical()), + %ChecksSelected{ + cluster_id: cluster_id, + checks: selected_checks + }, + %ClusterChecksHealthChanged{ + cluster_id: cluster_id, + checks_health: Health.critical() + }, + %ClusterDiscoveredHealthChanged{ + cluster_id: cluster_id, + discovered_health: Health.critical() + } + ], + CompleteChecksExecutionWanda.new!(%{ + cluster_id: cluster_id, + health: Health.critical() + }), + [], + fn cluster -> + assert %Cluster{ + cluster_id: ^cluster_id, + health: Health.critical(), + checks_health: Health.critical() + } = cluster + end + ) + end + end + describe "discovered health" do test "should change the discovered health and the cluster aggregated health" do cluster_registered_event = build(:cluster_registered_event, health: :passing) @@ -631,7 +744,7 @@ defmodule Trento.ClusterTest do ) end - test "should not change the the cluster aggregated health if checks health is worst" do + test "should not change the the cluster aggregated health if checks health is worse" do cluster_registered_event = build(:cluster_registered_event, health: :passing, provider: :azure)