Skip to content

Commit

Permalink
Cluster deregistration process manager & aggregate changes (#1278)
Browse files Browse the repository at this point in the history
* Make Process Manager aware of cluster deregistration

Co-authored-by: Rubén Torrero Marijnissen <rtorreromarijnissen@suse.com>

* Handle cluster host deregistration in the cluster aggregate

Co-authored-by: Rubén Torrero Marijnissen <rtorreromarijnissen@suse.com>

* Route the DeregisterClusterHost to the cluster aggregate


---------

Co-authored-by: Jamie Rodriguez <jamie.rodriguez@suse.com>
  • Loading branch information
2 people authored and CDimonaco committed Apr 17, 2023
1 parent 000961b commit 872e385
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,25 @@ defmodule Trento.DeregistrationProcessManager do
name: "deregistration_process_manager"

deftype do
field :host_id, Ecto.UUID
field :cluster_id, Ecto.UUID
end

alias Trento.DeregistrationProcessManager

alias Trento.Domain.Events.{
ClusterRolledUp,
HostAddedToCluster,
HostDeregistered,
HostDeregistrationRequested,
HostRegistered,
HostRemovedFromCluster,
HostRolledUp
}

alias Trento.Domain.Commands.DeregisterHost
alias Trento.Domain.Commands.{
DeregisterClusterHost,
DeregisterHost
}

@doc """
The process manager is interested in HostRegistered which starts or joins an existing process
Expand All @@ -42,24 +48,55 @@ defmodule Trento.DeregistrationProcessManager do
The process manager starts with a Deregistration request and stops when the host is fully deregistered.
"""
# Start the Process Manager
def interested?(%HostRegistered{host_id: host_id}), do: {:start, host_id}
def interested?(%HostRolledUp{host_id: host_id}), do: {:start, host_id}
def interested?(%HostAddedToCluster{host_id: host_id}), do: {:start, host_id}
def interested?(%ClusterRolledUp{snapshot: %{hosts: hosts}}), do: {:start, hosts}
# Continue the Process Manager
def interested?(%HostDeregistrationRequested{host_id: host_id}), do: {:continue, host_id}
def interested?(%HostRemovedFromCluster{host_id: host_id}), do: {:continue, host_id}
# Stop the Process Manager
def interested?(%HostDeregistered{host_id: host_id}), do: {:stop, host_id}

def interested?(_event), do: false

def handle(%DeregistrationProcessManager{}, %HostDeregistrationRequested{
# Deregister host that doesn't belong to any cluster
def handle(%DeregistrationProcessManager{cluster_id: nil}, %HostDeregistrationRequested{
host_id: host_id,
requested_at: requested_at
}) do
%DeregisterHost{host_id: host_id, deregistered_at: requested_at}
end

def apply(%DeregistrationProcessManager{} = state, %HostRegistered{host_id: host_id}) do
%DeregistrationProcessManager{state | host_id: host_id}
# First step in host deregistration when host belongs to a cluster
def handle(%DeregistrationProcessManager{cluster_id: cluster_id}, %HostDeregistrationRequested{
host_id: host_id,
requested_at: requested_at
}) do
[
%DeregisterClusterHost{
host_id: host_id,
cluster_id: cluster_id,
deregistered_at: requested_at
},
%DeregisterHost{host_id: host_id, deregistered_at: requested_at}
]
end

def apply(%DeregistrationProcessManager{} = state, %ClusterRolledUp{
cluster_id: cluster_id
}) do
%DeregistrationProcessManager{state | cluster_id: cluster_id}
end

def apply(%DeregistrationProcessManager{} = state, %HostAddedToCluster{
cluster_id: cluster_id
}) do
%DeregistrationProcessManager{state | cluster_id: cluster_id}
end

def apply(%DeregistrationProcessManager{} = state, %HostRolledUp{host_id: host_id}) do
%DeregistrationProcessManager{state | host_id: host_id}
def apply(%DeregistrationProcessManager{} = state, %HostRemovedFromCluster{}) do
%DeregistrationProcessManager{state | cluster_id: nil}
end
end
48 changes: 47 additions & 1 deletion lib/trento/domain/cluster/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ defmodule Trento.Domain.Cluster do

alias Trento.Domain.Commands.{
CompleteChecksExecution,
DeregisterClusterHost,
RegisterClusterHost,
RollUpCluster,
SelectChecks
Expand All @@ -79,14 +80,16 @@ defmodule Trento.Domain.Cluster do
ChecksExecutionStarted,
ChecksSelected,
ClusterChecksHealthChanged,
ClusterDeregistered,
ClusterDetailsUpdated,
ClusterDiscoveredHealthChanged,
ClusterHealthChanged,
ClusterRegistered,
ClusterRolledUp,
ClusterRollUpRequested,
HostAddedToCluster,
HostChecksExecutionCompleted
HostChecksExecutionCompleted,
HostRemovedFromCluster
}

@required_fields []
Expand All @@ -113,6 +116,8 @@ defmodule Trento.Domain.Cluster do
field :hosts, {:array, :string}, default: []
field :selected_checks, {:array, :string}, default: []
field :rolling_up, :boolean, default: false
field :deregistered_at, :utc_datetime_usec, default: nil

embeds_one :details, HanaClusterDetails
end

Expand Down Expand Up @@ -262,6 +267,24 @@ defmodule Trento.Domain.Cluster do
}
end

def execute(
%Cluster{cluster_id: cluster_id} = cluster,
%DeregisterClusterHost{
host_id: host_id,
cluster_id: cluster_id
} = command
) do
cluster
|> Multi.new()
|> Multi.execute(fn _ ->
%HostRemovedFromCluster{
cluster_id: cluster_id,
host_id: host_id
}
end)
|> Multi.execute(&maybe_emit_cluster_deregistered_event(&1, command))
end

def apply(
%Cluster{} = cluster,
%ClusterRegistered{
Expand Down Expand Up @@ -371,6 +394,17 @@ defmodule Trento.Domain.Cluster do
snapshot
end

def apply(%Cluster{hosts: hosts} = cluster, %HostRemovedFromCluster{
host_id: host_id
}) do
%Cluster{cluster | hosts: List.delete(hosts, host_id)}
end

# Deregistration
def apply(%Cluster{} = cluster, %ClusterDeregistered{deregistered_at: deregistered_at}) do
%Cluster{cluster | deregistered_at: deregistered_at}
end

def apply(cluster, %legacy_event{}) when legacy_event in @legacy_events, do: cluster

defp maybe_emit_host_added_to_cluster_event(
Expand Down Expand Up @@ -469,6 +503,18 @@ defmodule Trento.Domain.Cluster do
}
end

defp maybe_emit_cluster_deregistered_event(
%Cluster{cluster_id: cluster_id, hosts: []},
%DeregisterClusterHost{
cluster_id: cluster_id,
deregistered_at: deregistered_at
}
) do
%ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at}
end

defp maybe_emit_cluster_deregistered_event(_, _), do: nil

defp maybe_add_checks_health(healths, _, []), do: healths
defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths]

Expand Down
10 changes: 9 additions & 1 deletion lib/trento/domain/cluster/lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ defmodule Trento.Domain.Cluster.Lifespan do

alias Commanded.Aggregates.DefaultLifespan

alias Trento.Domain.Events.ClusterRollUpRequested
alias Trento.Domain.Events.{
ClusterDeregistered,
ClusterRollUpRequested
}

@doc """
The cluster aggregate will be stopped after a ClusterRollUpRequested event is received.
This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream.
The cluster aggregate will be stopped after a ClusterDeregistered event is received.
This event is emitted when all hosts belonging to a cluster have been decommissioned,
meaning the cluster aggregate can be safely stopped.
"""
def after_event(%ClusterRollUpRequested{}), do: :stop
def after_event(%ClusterDeregistered{}), do: :stop
def after_event(event), do: DefaultLifespan.after_event(event)

def after_command(command), do: DefaultLifespan.after_command(command)
Expand Down
2 changes: 2 additions & 0 deletions lib/trento/infrastructure/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Trento.Router do

alias Trento.Domain.Commands.{
CompleteChecksExecution,
DeregisterClusterHost,
DeregisterHost,
RegisterApplicationInstance,
RegisterClusterHost,
Expand Down Expand Up @@ -46,6 +47,7 @@ defmodule Trento.Router do
by: :cluster_id

dispatch [
DeregisterClusterHost,
RollUpCluster,
RegisterClusterHost,
SelectChecks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ defmodule Trento.DeregistrationProcessManagerTest do
use ExUnit.Case

alias Trento.Domain.Events.{
ClusterRolledUp,
HostAddedToCluster,
HostDeregistered,
HostDeregistrationRequested,
HostRegistered,
HostRemovedFromCluster,
HostRolledUp
}

alias Trento.DeregistrationProcessManager
alias Trento.Domain.Commands.DeregisterHost
alias Trento.Domain.Cluster

alias Trento.Domain.Commands.{
DeregisterClusterHost,
DeregisterHost
}

describe "events interested" do
test "should start the process manager when HostRegistered event arrives" do
Expand All @@ -26,6 +34,22 @@ defmodule Trento.DeregistrationProcessManagerTest do
DeregistrationProcessManager.interested?(%HostRolledUp{host_id: host_id})
end

test "should start the process manager when HostAddedToCluster arrives" do
host_id = UUID.uuid4()

assert {:start, ^host_id} =
DeregistrationProcessManager.interested?(%HostAddedToCluster{host_id: host_id})
end

test "should start the process manager when ClusterRolledUp arrives" do
cluster_hosts = [UUID.uuid4(), UUID.uuid4()]

assert {:start, ^cluster_hosts} =
DeregistrationProcessManager.interested?(%ClusterRolledUp{
snapshot: %Cluster{hosts: cluster_hosts}
})
end

test "should continue the process manager when HostDeregistrationRequested arrives" do
host_id = UUID.uuid4()

Expand All @@ -41,37 +65,46 @@ defmodule Trento.DeregistrationProcessManagerTest do
assert {:stop, ^host_id} =
DeregistrationProcessManager.interested?(%HostDeregistered{host_id: host_id})
end

test "should continue the process manager when HostRemovedFromCluster arrives" do
host_id = UUID.uuid4()

assert {:continue, ^host_id} =
DeregistrationProcessManager.interested?(%HostRemovedFromCluster{host_id: host_id})
end
end

describe "host deregistration procedure" do
test "should update the state with the proper host id when HostRegistered event is emitted" do
test "should update the state with the proper cluster id when ClusterRolledUp event is emitted" do
initial_state = %DeregistrationProcessManager{}
host_id = UUID.uuid4()
cluster_id = UUID.uuid4()
cluster_hosts = [UUID.uuid4(), UUID.uuid4()]

events = [%HostRegistered{host_id: host_id}]
events = [%ClusterRolledUp{cluster_id: cluster_id, snapshot: cluster_hosts}]

{commands, state} = reduce_events(events, initial_state)

assert [] == commands
assert %DeregistrationProcessManager{host_id: ^host_id} = state
assert %DeregistrationProcessManager{cluster_id: ^cluster_id} = state
end

test "should update the state with the proper host when HostRolledUp event is emitted" do
test "should update the state with the proper cluster id when HostAddedToCluster event is emitted" do
initial_state = %DeregistrationProcessManager{}
cluster_id = UUID.uuid4()
host_id = UUID.uuid4()

events = [%HostRolledUp{host_id: host_id}]
events = [%HostAddedToCluster{cluster_id: cluster_id, host_id: host_id}]

{commands, state} = reduce_events(events, initial_state)

assert [] == commands
assert %DeregistrationProcessManager{host_id: ^host_id} = state
assert %DeregistrationProcessManager{cluster_id: ^cluster_id} = state
end

test "should dispatch DeregisterHost command when HostDeregistrationRequested is emitted" do
host_id = UUID.uuid4()
requested_at = DateTime.utc_now()
initial_state = %DeregistrationProcessManager{host_id: host_id}
initial_state = %DeregistrationProcessManager{}

events = [%HostDeregistrationRequested{host_id: host_id, requested_at: requested_at}]

Expand All @@ -80,6 +113,44 @@ defmodule Trento.DeregistrationProcessManagerTest do
assert ^initial_state = state
assert %DeregisterHost{host_id: ^host_id, deregistered_at: ^requested_at} = commands
end

test "should dispatch DeregisterClusterHost and then DeregisterHost commands when HostDeregistrationRequested is emitted and the host belongs to a cluster" do
host_id = UUID.uuid4()
cluster_id = UUID.uuid4()
requested_at = DateTime.utc_now()
initial_state = %DeregistrationProcessManager{cluster_id: cluster_id}

events = [%HostDeregistrationRequested{host_id: host_id, requested_at: requested_at}]

{commands, state} = reduce_events(events, initial_state)

assert ^initial_state = state

assert [
%DeregisterClusterHost{
host_id: ^host_id,
cluster_id: ^cluster_id,
deregistered_at: ^requested_at
},
%DeregisterHost{host_id: ^host_id, deregistered_at: ^requested_at}
] = commands
end

test "should update the state and remove the cluster id when HostRemovedFromCluster event is emitted" do
initial_state = %DeregistrationProcessManager{}
cluster_id = UUID.uuid4()
host_id = UUID.uuid4()

events = [
%HostAddedToCluster{cluster_id: cluster_id, host_id: host_id},
%HostRemovedFromCluster{host_id: host_id}
]

{commands, state} = reduce_events(events, initial_state)

assert [] == commands
assert %DeregistrationProcessManager{cluster_id: nil} = state
end
end

defp reduce_events(events, initial_state) do
Expand Down
Loading

0 comments on commit 872e385

Please sign in to comment.