From 9651b5e9fda0068a8a1a77da514d67dce36da434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Torrero=20Marijnissen?= Date: Fri, 31 Mar 2023 09:49:27 +0200 Subject: [PATCH] Cluster deregistration process manager & aggregate changes (#1278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Make Process Manager aware of cluster deregistration Co-authored-by: Rubén Torrero Marijnissen * Handle cluster host deregistration in the cluster aggregate Co-authored-by: Rubén Torrero Marijnissen * Route the DeregisterClusterHost to the cluster aggregate --------- Co-authored-by: Jamie Rodriguez --- .../deregistration_process_manager.ex | 51 +++++++++-- lib/trento/domain/cluster/cluster.ex | 48 +++++++++- lib/trento/domain/cluster/lifespan.ex | 10 ++- lib/trento/infrastructure/router.ex | 2 + .../deregistration_process_manager_test.exs | 89 +++++++++++++++++-- test/trento/domain/cluster/cluster_test.exs | 82 ++++++++++++++++- 6 files changed, 263 insertions(+), 19 deletions(-) diff --git a/lib/trento/application/process_managers/deregistration_process_manager.ex b/lib/trento/application/process_managers/deregistration_process_manager.ex index 96337d27ae..e804cb97d0 100644 --- a/lib/trento/application/process_managers/deregistration_process_manager.ex +++ b/lib/trento/application/process_managers/deregistration_process_manager.ex @@ -18,19 +18,25 @@ defmodule Trento.DeregistrationProcessManager do name: "deregistration_process_manager" deftype do - field :host_id, Ecto.UUID + field :cluster_id, Ecto.UUID end alias Trento.DeregistrationProcessManager alias Trento.Domain.Events.{ + ClusterRolledUp, + HostAddedToCluster, HostDeregistered, HostDeregistrationRequested, HostRegistered, + HostRemovedFromCluster, HostRolledUp } - alias Trento.Domain.Commands.DeregisterHost + alias Trento.Domain.Commands.{ + DeregisterClusterHost, + DeregisterHost + } @doc """ The process manager is interested in HostRegistered which starts or joins an existing process @@ -42,24 +48,55 @@ defmodule Trento.DeregistrationProcessManager do The process manager starts with a Deregistration request and stops when the host is fully deregistered. """ + # Start the Process Manager def interested?(%HostRegistered{host_id: host_id}), do: {:start, host_id} def interested?(%HostRolledUp{host_id: host_id}), do: {:start, host_id} + def interested?(%HostAddedToCluster{host_id: host_id}), do: {:start, host_id} + def interested?(%ClusterRolledUp{snapshot: %{hosts: hosts}}), do: {:start, hosts} + # Continue the Process Manager def interested?(%HostDeregistrationRequested{host_id: host_id}), do: {:continue, host_id} + def interested?(%HostRemovedFromCluster{host_id: host_id}), do: {:continue, host_id} + # Stop the Process Manager def interested?(%HostDeregistered{host_id: host_id}), do: {:stop, host_id} + def interested?(_event), do: false - def handle(%DeregistrationProcessManager{}, %HostDeregistrationRequested{ + # Deregister host that doesn't belong to any cluster + def handle(%DeregistrationProcessManager{cluster_id: nil}, %HostDeregistrationRequested{ host_id: host_id, requested_at: requested_at }) do %DeregisterHost{host_id: host_id, deregistered_at: requested_at} end - def apply(%DeregistrationProcessManager{} = state, %HostRegistered{host_id: host_id}) do - %DeregistrationProcessManager{state | host_id: host_id} + # First step in host deregistration when host belongs to a cluster + def handle(%DeregistrationProcessManager{cluster_id: cluster_id}, %HostDeregistrationRequested{ + host_id: host_id, + requested_at: requested_at + }) do + [ + %DeregisterClusterHost{ + host_id: host_id, + cluster_id: cluster_id, + deregistered_at: requested_at + }, + %DeregisterHost{host_id: host_id, deregistered_at: requested_at} + ] + end + + def apply(%DeregistrationProcessManager{} = state, %ClusterRolledUp{ + cluster_id: cluster_id + }) do + %DeregistrationProcessManager{state | cluster_id: cluster_id} + end + + def apply(%DeregistrationProcessManager{} = state, %HostAddedToCluster{ + cluster_id: cluster_id + }) do + %DeregistrationProcessManager{state | cluster_id: cluster_id} end - def apply(%DeregistrationProcessManager{} = state, %HostRolledUp{host_id: host_id}) do - %DeregistrationProcessManager{state | host_id: host_id} + def apply(%DeregistrationProcessManager{} = state, %HostRemovedFromCluster{}) do + %DeregistrationProcessManager{state | cluster_id: nil} end end diff --git a/lib/trento/domain/cluster/cluster.ex b/lib/trento/domain/cluster/cluster.ex index ddd50e3264..15670be0c2 100644 --- a/lib/trento/domain/cluster/cluster.ex +++ b/lib/trento/domain/cluster/cluster.ex @@ -68,6 +68,7 @@ defmodule Trento.Domain.Cluster do alias Trento.Domain.Commands.{ CompleteChecksExecution, + DeregisterClusterHost, RegisterClusterHost, RollUpCluster, SelectChecks @@ -79,6 +80,7 @@ defmodule Trento.Domain.Cluster do ChecksExecutionStarted, ChecksSelected, ClusterChecksHealthChanged, + ClusterDeregistered, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -86,7 +88,8 @@ defmodule Trento.Domain.Cluster do ClusterRolledUp, ClusterRollUpRequested, HostAddedToCluster, - HostChecksExecutionCompleted + HostChecksExecutionCompleted, + HostRemovedFromCluster } @required_fields [] @@ -113,6 +116,8 @@ defmodule Trento.Domain.Cluster do field :hosts, {:array, :string}, default: [] field :selected_checks, {:array, :string}, default: [] field :rolling_up, :boolean, default: false + field :deregistered_at, :utc_datetime_usec, default: nil + embeds_one :details, HanaClusterDetails end @@ -262,6 +267,24 @@ defmodule Trento.Domain.Cluster do } end + def execute( + %Cluster{cluster_id: cluster_id} = cluster, + %DeregisterClusterHost{ + host_id: host_id, + cluster_id: cluster_id + } = command + ) do + cluster + |> Multi.new() + |> Multi.execute(fn _ -> + %HostRemovedFromCluster{ + cluster_id: cluster_id, + host_id: host_id + } + end) + |> Multi.execute(&maybe_emit_cluster_deregistered_event(&1, command)) + end + def apply( %Cluster{} = cluster, %ClusterRegistered{ @@ -371,6 +394,17 @@ defmodule Trento.Domain.Cluster do snapshot end + def apply(%Cluster{hosts: hosts} = cluster, %HostRemovedFromCluster{ + host_id: host_id + }) do + %Cluster{cluster | hosts: List.delete(hosts, host_id)} + end + + # Deregistration + def apply(%Cluster{} = cluster, %ClusterDeregistered{deregistered_at: deregistered_at}) do + %Cluster{cluster | deregistered_at: deregistered_at} + end + def apply(cluster, %legacy_event{}) when legacy_event in @legacy_events, do: cluster defp maybe_emit_host_added_to_cluster_event( @@ -469,6 +503,18 @@ defmodule Trento.Domain.Cluster do } end + defp maybe_emit_cluster_deregistered_event( + %Cluster{cluster_id: cluster_id, hosts: []}, + %DeregisterClusterHost{ + cluster_id: cluster_id, + deregistered_at: deregistered_at + } + ) do + %ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at} + end + + defp maybe_emit_cluster_deregistered_event(_, _), do: nil + defp maybe_add_checks_health(healths, _, []), do: healths defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths] diff --git a/lib/trento/domain/cluster/lifespan.ex b/lib/trento/domain/cluster/lifespan.ex index 3d5b35bb2b..cb250a0e7c 100644 --- a/lib/trento/domain/cluster/lifespan.ex +++ b/lib/trento/domain/cluster/lifespan.ex @@ -9,13 +9,21 @@ defmodule Trento.Domain.Cluster.Lifespan do alias Commanded.Aggregates.DefaultLifespan - alias Trento.Domain.Events.ClusterRollUpRequested + alias Trento.Domain.Events.{ + ClusterDeregistered, + ClusterRollUpRequested + } @doc """ The cluster aggregate will be stopped after a ClusterRollUpRequested event is received. This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream. + + The cluster aggregate will be stopped after a ClusterDeregistered event is received. + This event is emitted when all hosts belonging to a cluster have been decommissioned, + meaning the cluster aggregate can be safely stopped. """ def after_event(%ClusterRollUpRequested{}), do: :stop + def after_event(%ClusterDeregistered{}), do: :stop def after_event(event), do: DefaultLifespan.after_event(event) def after_command(command), do: DefaultLifespan.after_command(command) diff --git a/lib/trento/infrastructure/router.ex b/lib/trento/infrastructure/router.ex index e9f519c29d..48124954ff 100644 --- a/lib/trento/infrastructure/router.ex +++ b/lib/trento/infrastructure/router.ex @@ -11,6 +11,7 @@ defmodule Trento.Router do alias Trento.Domain.Commands.{ CompleteChecksExecution, + DeregisterClusterHost, DeregisterHost, RegisterApplicationInstance, RegisterClusterHost, @@ -46,6 +47,7 @@ defmodule Trento.Router do by: :cluster_id dispatch [ + DeregisterClusterHost, RollUpCluster, RegisterClusterHost, SelectChecks, diff --git a/test/trento/application/process_managers/deregistration_process_manager_test.exs b/test/trento/application/process_managers/deregistration_process_manager_test.exs index 478b8c0bd6..cd663a9c36 100644 --- a/test/trento/application/process_managers/deregistration_process_manager_test.exs +++ b/test/trento/application/process_managers/deregistration_process_manager_test.exs @@ -2,14 +2,22 @@ defmodule Trento.DeregistrationProcessManagerTest do use ExUnit.Case alias Trento.Domain.Events.{ + ClusterRolledUp, + HostAddedToCluster, HostDeregistered, HostDeregistrationRequested, HostRegistered, + HostRemovedFromCluster, HostRolledUp } alias Trento.DeregistrationProcessManager - alias Trento.Domain.Commands.DeregisterHost + alias Trento.Domain.Cluster + + alias Trento.Domain.Commands.{ + DeregisterClusterHost, + DeregisterHost + } describe "events interested" do test "should start the process manager when HostRegistered event arrives" do @@ -26,6 +34,22 @@ defmodule Trento.DeregistrationProcessManagerTest do DeregistrationProcessManager.interested?(%HostRolledUp{host_id: host_id}) end + test "should start the process manager when HostAddedToCluster arrives" do + host_id = UUID.uuid4() + + assert {:start, ^host_id} = + DeregistrationProcessManager.interested?(%HostAddedToCluster{host_id: host_id}) + end + + test "should start the process manager when ClusterRolledUp arrives" do + cluster_hosts = [UUID.uuid4(), UUID.uuid4()] + + assert {:start, ^cluster_hosts} = + DeregistrationProcessManager.interested?(%ClusterRolledUp{ + snapshot: %Cluster{hosts: cluster_hosts} + }) + end + test "should continue the process manager when HostDeregistrationRequested arrives" do host_id = UUID.uuid4() @@ -41,37 +65,46 @@ defmodule Trento.DeregistrationProcessManagerTest do assert {:stop, ^host_id} = DeregistrationProcessManager.interested?(%HostDeregistered{host_id: host_id}) end + + test "should continue the process manager when HostRemovedFromCluster arrives" do + host_id = UUID.uuid4() + + assert {:continue, ^host_id} = + DeregistrationProcessManager.interested?(%HostRemovedFromCluster{host_id: host_id}) + end end describe "host deregistration procedure" do - test "should update the state with the proper host id when HostRegistered event is emitted" do + test "should update the state with the proper cluster id when ClusterRolledUp event is emitted" do initial_state = %DeregistrationProcessManager{} - host_id = UUID.uuid4() + cluster_id = UUID.uuid4() + cluster_hosts = [UUID.uuid4(), UUID.uuid4()] - events = [%HostRegistered{host_id: host_id}] + events = [%ClusterRolledUp{cluster_id: cluster_id, snapshot: cluster_hosts}] {commands, state} = reduce_events(events, initial_state) assert [] == commands - assert %DeregistrationProcessManager{host_id: ^host_id} = state + assert %DeregistrationProcessManager{cluster_id: ^cluster_id} = state end - test "should update the state with the proper host when HostRolledUp event is emitted" do + test "should update the state with the proper cluster id when HostAddedToCluster event is emitted" do initial_state = %DeregistrationProcessManager{} + cluster_id = UUID.uuid4() host_id = UUID.uuid4() - events = [%HostRolledUp{host_id: host_id}] + events = [%HostAddedToCluster{cluster_id: cluster_id, host_id: host_id}] {commands, state} = reduce_events(events, initial_state) assert [] == commands - assert %DeregistrationProcessManager{host_id: ^host_id} = state + assert %DeregistrationProcessManager{cluster_id: ^cluster_id} = state end test "should dispatch DeregisterHost command when HostDeregistrationRequested is emitted" do host_id = UUID.uuid4() requested_at = DateTime.utc_now() - initial_state = %DeregistrationProcessManager{host_id: host_id} + initial_state = %DeregistrationProcessManager{} events = [%HostDeregistrationRequested{host_id: host_id, requested_at: requested_at}] @@ -80,6 +113,44 @@ defmodule Trento.DeregistrationProcessManagerTest do assert ^initial_state = state assert %DeregisterHost{host_id: ^host_id, deregistered_at: ^requested_at} = commands end + + test "should dispatch DeregisterClusterHost and then DeregisterHost commands when HostDeregistrationRequested is emitted and the host belongs to a cluster" do + host_id = UUID.uuid4() + cluster_id = UUID.uuid4() + requested_at = DateTime.utc_now() + initial_state = %DeregistrationProcessManager{cluster_id: cluster_id} + + events = [%HostDeregistrationRequested{host_id: host_id, requested_at: requested_at}] + + {commands, state} = reduce_events(events, initial_state) + + assert ^initial_state = state + + assert [ + %DeregisterClusterHost{ + host_id: ^host_id, + cluster_id: ^cluster_id, + deregistered_at: ^requested_at + }, + %DeregisterHost{host_id: ^host_id, deregistered_at: ^requested_at} + ] = commands + end + + test "should update the state and remove the cluster id when HostRemovedFromCluster event is emitted" do + initial_state = %DeregistrationProcessManager{} + cluster_id = UUID.uuid4() + host_id = UUID.uuid4() + + events = [ + %HostAddedToCluster{cluster_id: cluster_id, host_id: host_id}, + %HostRemovedFromCluster{host_id: host_id} + ] + + {commands, state} = reduce_events(events, initial_state) + + assert [] == commands + assert %DeregistrationProcessManager{cluster_id: nil} = state + end end defp reduce_events(events, initial_state) do diff --git a/test/trento/domain/cluster/cluster_test.exs b/test/trento/domain/cluster/cluster_test.exs index 06a43dd515..1570210cb1 100644 --- a/test/trento/domain/cluster/cluster_test.exs +++ b/test/trento/domain/cluster/cluster_test.exs @@ -7,6 +7,7 @@ defmodule Trento.ClusterTest do alias Trento.Domain.Commands.{ CompleteChecksExecution, + DeregisterClusterHost, RegisterClusterHost, RollUpCluster, SelectChecks @@ -18,6 +19,7 @@ defmodule Trento.ClusterTest do ChecksExecutionStarted, ChecksSelected, ClusterChecksHealthChanged, + ClusterDeregistered, ClusterDetailsUpdated, ClusterDiscoveredHealthChanged, ClusterHealthChanged, @@ -25,7 +27,8 @@ defmodule Trento.ClusterTest do ClusterRolledUp, ClusterRollUpRequested, HostAddedToCluster, - HostChecksExecutionCompleted + HostChecksExecutionCompleted, + HostRemovedFromCluster } alias Trento.Domain.Cluster @@ -757,6 +760,83 @@ defmodule Trento.ClusterTest do end end + describe "deregistration" do + test "should emit the HostRemovedFromCluster event after a DeregisterClusterHost command and remove the host from the cluster aggregate state" do + cluster_id = Faker.UUID.v4() + dat = DateTime.utc_now() + host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + host_2_added_event = + %{host_id: host_2_id} = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2), + host_1_added_event, + host_2_added_event + ], + [ + %DeregisterClusterHost{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + } + ], + [ + %HostRemovedFromCluster{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id + } + ], + fn cluster -> + assert %Cluster{hosts: [^host_2_id]} = cluster + end + ) + end + + test "should emit the ClusterDeregistered event when the last ClusterHost is deregistered and set the deregistration date into the state" do + cluster_id = Faker.UUID.v4() + dat = DateTime.utc_now() + host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + host_2_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id) + + assert_events_and_state( + [ + build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2), + host_1_added_event, + host_2_added_event + ], + [ + %DeregisterClusterHost{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + }, + %DeregisterClusterHost{ + host_id: host_2_added_event.host_id, + cluster_id: cluster_id, + deregistered_at: dat + } + ], + [ + %HostRemovedFromCluster{ + host_id: host_1_added_event.host_id, + cluster_id: cluster_id + }, + %HostRemovedFromCluster{ + host_id: host_2_added_event.host_id, + cluster_id: cluster_id + }, + %ClusterDeregistered{ + cluster_id: cluster_id, + deregistered_at: dat + } + ], + fn cluster -> assert dat == cluster.deregistered_at end + ) + end + end + describe "legacy events" do test "should ignore legacy events and not update the aggregate" do cluster_id = Faker.UUID.v4()