From ac61db84263fe2a68a0fcd655ae64809a9fca539 Mon Sep 17 00:00:00 2001 From: Jamie Rodriguez Date: Wed, 29 Mar 2023 14:46:34 +0200 Subject: [PATCH] Handle cluster host deregistration in the cluster aggregate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Rubén Torrero Marijnissen --- lib/trento/domain/cluster/cluster.ex | 50 ++++++++++++- lib/trento/domain/cluster/lifespan.ex | 10 ++- test/trento/domain/cluster/cluster_test.exs | 80 +++++++++++++++++++++ 3 files changed, 138 insertions(+), 2 deletions(-) diff --git a/lib/trento/domain/cluster/cluster.ex b/lib/trento/domain/cluster/cluster.ex index ddd50e3264..78da492a38 100644 --- a/lib/trento/domain/cluster/cluster.ex +++ b/lib/trento/domain/cluster/cluster.ex @@ -68,6 +68,7 @@ defmodule Trento.Domain.Cluster do alias Trento.Domain.Commands.{ CompleteChecksExecution, + DeregisterClusterHost, RegisterClusterHost, RollUpCluster, SelectChecks @@ -79,6 +80,7 @@ defmodule Trento.Domain.Cluster do ChecksExecutionStarted, ChecksSelected, ClusterChecksHealthChanged, + ClusterDeregistered, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -86,7 +88,8 @@ defmodule Trento.Domain.Cluster do ClusterRolledUp, ClusterRollUpRequested, HostAddedToCluster, - HostChecksExecutionCompleted + HostChecksExecutionCompleted, + HostRemovedFromCluster } @required_fields [] @@ -113,6 +116,8 @@ defmodule Trento.Domain.Cluster do field :hosts, {:array, :string}, default: [] field :selected_checks, {:array, :string}, default: [] field :rolling_up, :boolean, default: false + field :deregistered_at, :utc_datetime_usec, default: nil + embeds_one :details, HanaClusterDetails end @@ -262,6 +267,26 @@ defmodule Trento.Domain.Cluster do } end + def execute( + %Cluster{cluster_id: cluster_id} = cluster, + %DeregisterClusterHost{ + host_id: host_id, + cluster_id: cluster_id + } = command + ) do + cluster + |> Multi.new() + |> Multi.execute(fn _ -> + [ + %HostRemovedFromCluster{ + cluster_id: cluster_id, + host_id: host_id + } + ] + end) + |> Multi.execute(&maybe_emit_cluster_deregistered_event(&1, command)) + end + def apply( %Cluster{} = cluster, %ClusterRegistered{ @@ -371,6 +396,17 @@ defmodule Trento.Domain.Cluster do snapshot end + def apply(%Cluster{hosts: hosts, hosts_number: hosts_number} = cluster, %HostRemovedFromCluster{ + host_id: host_id + }) do + %Cluster{cluster | hosts: List.delete(hosts, host_id), hosts_number: hosts_number - 1} + end + + # Deregistration + def apply(%Cluster{} = cluster, %ClusterDeregistered{deregistered_at: deregistered_at}) do + %Cluster{cluster | deregistered_at: deregistered_at} + end + def apply(cluster, %legacy_event{}) when legacy_event in @legacy_events, do: cluster defp maybe_emit_host_added_to_cluster_event( @@ -469,6 +505,18 @@ defmodule Trento.Domain.Cluster do } end + defp maybe_emit_cluster_deregistered_event( + %Cluster{cluster_id: cluster_id, hosts_number: hosts_number}, + %DeregisterClusterHost{ + cluster_id: cluster_id, + deregistered_at: deregistered_at + } + ) do + if hosts_number == 0 do + %ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at} + end + end + defp maybe_add_checks_health(healths, _, []), do: healths defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths] diff --git a/lib/trento/domain/cluster/lifespan.ex b/lib/trento/domain/cluster/lifespan.ex index 3d5b35bb2b..cb250a0e7c 100644 --- a/lib/trento/domain/cluster/lifespan.ex +++ b/lib/trento/domain/cluster/lifespan.ex @@ -9,13 +9,21 @@ defmodule Trento.Domain.Cluster.Lifespan do alias Commanded.Aggregates.DefaultLifespan - alias Trento.Domain.Events.ClusterRollUpRequested + alias Trento.Domain.Events.{ + ClusterDeregistered, + ClusterRollUpRequested + } @doc """ The cluster aggregate will be stopped after a ClusterRollUpRequested event is received. This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream. + + The cluster aggregate will be stopped after a ClusterDeregistered event is received. + This event is emitted when all hosts belonging to a cluster have been decommissioned, + meaning the cluster aggregate can be safely stopped. """ def after_event(%ClusterRollUpRequested{}), do: :stop + def after_event(%ClusterDeregistered{}), do: :stop def after_event(event), do: DefaultLifespan.after_event(event) def after_command(command), do: DefaultLifespan.after_command(command) diff --git a/test/trento/domain/cluster/cluster_test.exs b/test/trento/domain/cluster/cluster_test.exs index 06a43dd515..61b43641a0 100644 --- a/test/trento/domain/cluster/cluster_test.exs +++ b/test/trento/domain/cluster/cluster_test.exs @@ -3,10 +3,12 @@ defmodule Trento.ClusterTest do import Trento.Factory + alias Trento.Domain.Events.HostRemovedFromCluster alias Trento.Support.StructHelper alias Trento.Domain.Commands.{ CompleteChecksExecution, + DeregisterClusterHost, RegisterClusterHost, RollUpCluster, SelectChecks @@ -18,6 +20,7 @@ defmodule Trento.ClusterTest do ChecksExecutionStarted, ChecksSelected, ClusterChecksHealthChanged, + ClusterDeregistered, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -757,6 +760,83 @@ defmodule Trento.ClusterTest do end end + describe "deregistration" do + test "should emit the HostRemovedFromCluster event after a DeregisterClusterHost command and remove the host from the cluster aggregate state" do + cluster_id = Faker.UUID.v4() + dat = DateTime.utc_now() + host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + host_2_added_event = + %{host_id: host_2_id} = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2), + host_1_added_event, + host_2_added_event + ], + [ + %DeregisterClusterHost{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + } + ], + [ + %HostRemovedFromCluster{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id + } + ], + fn cluster -> + assert %Cluster{hosts: [^host_2_id], hosts_number: 1} = cluster + end + ) + end + + test "should emit the ClusterDeregistered event when the last ClusterHost is deregistered and set the deregistration date into the state" do + cluster_id = Faker.UUID.v4() + dat = DateTime.utc_now() + host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + host_2_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2), + host_1_added_event, + host_2_added_event + ], + [ + %DeregisterClusterHost{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + }, + %DeregisterClusterHost{ + host_id: host_2_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + } + ], + [ + %HostRemovedFromCluster{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id + }, + %HostRemovedFromCluster{ + host_id: host_2_added_event.host_id, + cluster_id: cluster_id + }, + %ClusterDeregistered{ + cluster_id: cluster_id, + deregistered_at: dat + } + ], + fn cluster -> assert dat == cluster.deregistered_at end + ) + end + end + describe "legacy events" do test "should ignore legacy events and not update the aggregate" do cluster_id = Faker.UUID.v4()