Skip to content

Commit

Permalink
Handle cluster host deregistration in the cluster aggregate
Browse files Browse the repository at this point in the history
Co-authored-by: Rubén Torrero Marijnissen <rtorreromarijnissen@suse.com>
  • Loading branch information
jamie-suse and rtorrero committed Mar 29, 2023
1 parent 3b75d39 commit ac61db8
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 2 deletions.
50 changes: 49 additions & 1 deletion lib/trento/domain/cluster/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ defmodule Trento.Domain.Cluster do

alias Trento.Domain.Commands.{
CompleteChecksExecution,
DeregisterClusterHost,
RegisterClusterHost,
RollUpCluster,
SelectChecks
Expand All @@ -79,14 +80,16 @@ defmodule Trento.Domain.Cluster do
ChecksExecutionStarted,
ChecksSelected,
ClusterChecksHealthChanged,
ClusterDeregistered,
ClusterDetailsUpdated,
ClusterDiscoveredHealthChanged,
ClusterHealthChanged,
ClusterRegistered,
ClusterRolledUp,
ClusterRollUpRequested,
HostAddedToCluster,
HostChecksExecutionCompleted
HostChecksExecutionCompleted,
HostRemovedFromCluster
}

@required_fields []
Expand All @@ -113,6 +116,8 @@ defmodule Trento.Domain.Cluster do
field :hosts, {:array, :string}, default: []
field :selected_checks, {:array, :string}, default: []
field :rolling_up, :boolean, default: false
field :deregistered_at, :utc_datetime_usec, default: nil

embeds_one :details, HanaClusterDetails
end

Expand Down Expand Up @@ -262,6 +267,26 @@ defmodule Trento.Domain.Cluster do
}
end

def execute(
%Cluster{cluster_id: cluster_id} = cluster,
%DeregisterClusterHost{
host_id: host_id,
cluster_id: cluster_id
} = command
) do
cluster
|> Multi.new()
|> Multi.execute(fn _ ->
[
%HostRemovedFromCluster{
cluster_id: cluster_id,
host_id: host_id
}
]
end)
|> Multi.execute(&maybe_emit_cluster_deregistered_event(&1, command))
end

def apply(
%Cluster{} = cluster,
%ClusterRegistered{
Expand Down Expand Up @@ -371,6 +396,17 @@ defmodule Trento.Domain.Cluster do
snapshot
end

def apply(%Cluster{hosts: hosts, hosts_number: hosts_number} = cluster, %HostRemovedFromCluster{
host_id: host_id
}) do
%Cluster{cluster | hosts: List.delete(hosts, host_id), hosts_number: hosts_number - 1}
end

# Deregistration
def apply(%Cluster{} = cluster, %ClusterDeregistered{deregistered_at: deregistered_at}) do
%Cluster{cluster | deregistered_at: deregistered_at}
end

def apply(cluster, %legacy_event{}) when legacy_event in @legacy_events, do: cluster

defp maybe_emit_host_added_to_cluster_event(
Expand Down Expand Up @@ -469,6 +505,18 @@ defmodule Trento.Domain.Cluster do
}
end

defp maybe_emit_cluster_deregistered_event(
%Cluster{cluster_id: cluster_id, hosts_number: hosts_number},
%DeregisterClusterHost{
cluster_id: cluster_id,
deregistered_at: deregistered_at
}
) do
if hosts_number == 0 do
%ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at}
end
end

defp maybe_add_checks_health(healths, _, []), do: healths
defp maybe_add_checks_health(healths, checks_health, _), do: [checks_health | healths]

Expand Down
10 changes: 9 additions & 1 deletion lib/trento/domain/cluster/lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ defmodule Trento.Domain.Cluster.Lifespan do

alias Commanded.Aggregates.DefaultLifespan

alias Trento.Domain.Events.ClusterRollUpRequested
alias Trento.Domain.Events.{
ClusterDeregistered,
ClusterRollUpRequested
}

@doc """
The cluster aggregate will be stopped after a ClusterRollUpRequested event is received.
This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream.
The cluster aggregate will be stopped after a ClusterDeregistered event is received.
This event is emitted when all hosts belonging to a cluster have been decommissioned,
meaning the cluster aggregate can be safely stopped.
"""
def after_event(%ClusterRollUpRequested{}), do: :stop
def after_event(%ClusterDeregistered{}), do: :stop
def after_event(event), do: DefaultLifespan.after_event(event)

def after_command(command), do: DefaultLifespan.after_command(command)
Expand Down
80 changes: 80 additions & 0 deletions test/trento/domain/cluster/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ defmodule Trento.ClusterTest do

import Trento.Factory

alias Trento.Domain.Events.HostRemovedFromCluster
alias Trento.Support.StructHelper

alias Trento.Domain.Commands.{
CompleteChecksExecution,
DeregisterClusterHost,
RegisterClusterHost,
RollUpCluster,
SelectChecks
Expand All @@ -18,6 +20,7 @@ defmodule Trento.ClusterTest do
ChecksExecutionStarted,
ChecksSelected,
ClusterChecksHealthChanged,
ClusterDeregistered,
ClusterDetailsUpdated,
ClusterDiscoveredHealthChanged,
ClusterHealthChanged,
Expand Down Expand Up @@ -757,6 +760,83 @@ defmodule Trento.ClusterTest do
end
end

describe "deregistration" do
test "should emit the HostRemovedFromCluster event after a DeregisterClusterHost command and remove the host from the cluster aggregate state" do
cluster_id = Faker.UUID.v4()
dat = DateTime.utc_now()
host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id)

host_2_added_event =
%{host_id: host_2_id} = build(:host_added_to_cluster_event, cluster_id: cluster_id)

assert_events_and_state(
[
build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2),
host_1_added_event,
host_2_added_event
],
[
%DeregisterClusterHost{
host_id: host_1_added_event.host_id,
cluster_id: cluster_id,
deregistered_at: dat
}
],
[
%HostRemovedFromCluster{
host_id: host_1_added_event.host_id,
cluster_id: cluster_id
}
],
fn cluster ->
assert %Cluster{hosts: [^host_2_id], hosts_number: 1} = cluster
end
)
end

test "should emit the ClusterDeregistered event when the last ClusterHost is deregistered and set the deregistration date into the state" do
cluster_id = Faker.UUID.v4()
dat = DateTime.utc_now()
host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id)
host_2_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id)

assert_events_and_state(
[
build(:cluster_registered_event, cluster_id: cluster_id, hosts_number: 2),
host_1_added_event,
host_2_added_event
],
[
%DeregisterClusterHost{
host_id: host_1_added_event.host_id,
cluster_id: cluster_id,
deregistered_at: dat
},
%DeregisterClusterHost{
host_id: host_2_added_event.host_id,
cluster_id: cluster_id,
deregistered_at: dat
}
],
[
%HostRemovedFromCluster{
host_id: host_1_added_event.host_id,
cluster_id: cluster_id
},
%HostRemovedFromCluster{
host_id: host_2_added_event.host_id,
cluster_id: cluster_id
},
%ClusterDeregistered{
cluster_id: cluster_id,
deregistered_at: dat
}
],
fn cluster -> assert dat == cluster.deregistered_at end
)
end
end

describe "legacy events" do
test "should ignore legacy events and not update the aggregate" do
cluster_id = Faker.UUID.v4()
Expand Down

0 comments on commit ac61db8

Please sign in to comment.