Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deregistration tombstoning rollup #1425

Merged
merged 13 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ defmodule Trento.StreamRollUpEventHandler do
RollUpSapSystem
}

alias Trento.Domain.Events.{
ClusterTombstoned,
HostTombstoned,
SapSystemTombstoned
}

require Logger

@max_stream_version Application.compile_env!(:trento, [__MODULE__, :max_stream_version])
Expand Down Expand Up @@ -116,6 +122,32 @@ defmodule Trento.StreamRollUpEventHandler do
end
end

def handle(%HostTombstoned{host_id: host_id}, _) do
Logger.info("Rolling up host: #{host_id} because HostTombstoned was received")

commanded().dispatch(%RollUpHost{host_id: host_id},
consistency: :strong
)
end

def handle(%ClusterTombstoned{cluster_id: cluster_id}, _) do
Logger.info("Rolling up cluster: #{cluster_id} because ClusterTombstoned was received")

commanded().dispatch(%RollUpCluster{cluster_id: cluster_id},
consistency: :strong
)
end

def handle(%SapSystemTombstoned{sap_system_id: sap_system_id}, _) do
Logger.info(
"Rolling up sap system: #{sap_system_id} because SapSystemTombstoned was received"
)

commanded().dispatch(%RollUpSapSystem{sap_system_id: sap_system_id},
consistency: :strong
)
end

defp commanded,
do: Application.fetch_env!(:trento, Trento.Commanded)[:adapter]
end
8 changes: 7 additions & 1 deletion lib/trento/domain/cluster/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ defmodule Trento.Domain.Cluster do
ClusterRegistered,
ClusterRolledUp,
ClusterRollUpRequested,
ClusterTombstoned,
HostAddedToCluster,
HostChecksExecutionCompleted,
HostRemovedFromCluster
Expand Down Expand Up @@ -424,6 +425,8 @@ defmodule Trento.Domain.Cluster do

def apply(cluster, %legacy_event{}) when legacy_event in @legacy_events, do: cluster

def apply(%Cluster{} = cluster, %ClusterTombstoned{}), do: cluster

defp maybe_emit_host_added_to_cluster_event(
%Cluster{cluster_id: cluster_id, hosts: hosts},
host_id
Expand Down Expand Up @@ -531,7 +534,10 @@ defmodule Trento.Domain.Cluster do
deregistered_at: deregistered_at
}
) do
%ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at}
[
%ClusterDeregistered{cluster_id: cluster_id, deregistered_at: deregistered_at},
%ClusterTombstoned{cluster_id: cluster_id}
]
end

defp maybe_emit_cluster_deregistered_event(_, _), do: nil
Expand Down
11 changes: 11 additions & 0 deletions lib/trento/domain/cluster/events/cluster_tombstoned.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Trento.Domain.Events.ClusterTombstoned do
@moduledoc """
This event is emitted after a successful cluster deregistration, to tombstone and stop the cluster aggregate
"""

use Trento.Event

defevent do
field :cluster_id, Ecto.UUID
end
end
10 changes: 1 addition & 9 deletions lib/trento/domain/cluster/lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,13 @@ defmodule Trento.Domain.Cluster.Lifespan do

alias Commanded.Aggregates.DefaultLifespan

alias Trento.Domain.Events.{
ClusterDeregistered,
ClusterRollUpRequested
}
alias Trento.Domain.Events.ClusterRollUpRequested

@doc """
The cluster aggregate will be stopped after a ClusterRollUpRequested event is received.
This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream.

The cluster aggregate will be stopped after a ClusterDeregistered event is received.
This event is emitted when all hosts belonging to a cluster have been decommissioned,
meaning the cluster aggregate can be safely stopped.
"""
def after_event(%ClusterRollUpRequested{}), do: :stop
def after_event(%ClusterDeregistered{}), do: :stop
def after_event(event), do: DefaultLifespan.after_event(event)

def after_command(command), do: DefaultLifespan.after_command(command)
Expand Down
11 changes: 11 additions & 0 deletions lib/trento/domain/host/events/host_tombstoned.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Trento.Domain.Events.HostTombstoned do
@moduledoc """
This event is emitted after a successful host deregistration, to tombstone and stop the host aggregate
"""

use Trento.Event

defevent do
field :host_id, Ecto.UUID
end
end
15 changes: 11 additions & 4 deletions lib/trento/domain/host/host.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule Trento.Domain.Host do
HostRegistered,
HostRolledUp,
HostRollUpRequested,
HostTombstoned,
ProviderUpdated,
SlesSubscriptionsUpdated
}
Expand Down Expand Up @@ -286,10 +287,15 @@ defmodule Trento.Domain.Host do
%Host{host_id: host_id},
%DeregisterHost{deregistered_at: deregistered_at}
) do
%HostDeregistered{
host_id: host_id,
deregistered_at: deregistered_at
}
[
%HostDeregistered{
host_id: host_id,
deregistered_at: deregistered_at
},
%HostTombstoned{
host_id: host_id
}
]
end

def apply(
Expand Down Expand Up @@ -408,4 +414,5 @@ defmodule Trento.Domain.Host do
end

def apply(%Host{} = host, %HostDeregistrationRequested{}), do: host
def apply(%Host{} = host, %HostTombstoned{}), do: host
end
9 changes: 1 addition & 8 deletions lib/trento/domain/host/lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,13 @@ defmodule Trento.Domain.Host.Lifespan do

alias Commanded.Aggregates.DefaultLifespan

alias Trento.Domain.Events.{
HostDeregistered,
HostRollUpRequested
}
alias Trento.Domain.Events.HostRollUpRequested

@doc """
The host aggregate will be stopped after a HostRollUpRequested event is received.
This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream.

The host aggregate will be stopped after a HostDeregistered event is received.
The host is decommissioned and can be safely stopped.
"""
def after_event(%HostRollUpRequested{}), do: :stop
def after_event(%HostDeregistered{}), do: :stop
def after_event(event), do: DefaultLifespan.after_event(event)

def after_command(command), do: DefaultLifespan.after_command(command)
Expand Down
11 changes: 11 additions & 0 deletions lib/trento/domain/sap_system/events/sap_system_tombstoned.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Trento.Domain.Events.SapSystemTombstoned do
@moduledoc """
This event is emitted when a SAP system is deregistered (decommissioned)
"""

use Trento.Event

defevent do
field :sap_system_id, Ecto.UUID
end
end
1 change: 1 addition & 0 deletions lib/trento/domain/sap_system/lifespan.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Trento.Domain.SapSystem.Lifespan do
This is needed to reset the aggregate version, so the aggregate can start appending events to the new stream.
"""
def after_event(%SapSystemRollUpRequested{}), do: :stop

def after_event(event), do: DefaultLifespan.after_event(event)

def after_command(command), do: DefaultLifespan.after_command(command)
Expand Down
21 changes: 20 additions & 1 deletion lib/trento/domain/sap_system/sap_system.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ defmodule Trento.Domain.SapSystem do
SapSystemHealthChanged,
SapSystemRegistered,
SapSystemRolledUp,
SapSystemRollUpRequested
SapSystemRollUpRequested,
SapSystemTombstoned
}

alias Trento.Domain.HealthService
Expand Down Expand Up @@ -251,6 +252,7 @@ defmodule Trento.Domain.SapSystem do
|> Multi.execute(fn sap_system ->
maybe_emit_sap_system_deregistered_event(sap_system, deregistered_at)
end)
|> Multi.execute(&maybe_emit_sap_system_tombstoned_event/1)
end

# Deregister an application instance and emit a ApplicationInstanceDeregistered
Expand Down Expand Up @@ -282,6 +284,7 @@ defmodule Trento.Domain.SapSystem do
deregistered_at
)
end)
|> Multi.execute(&maybe_emit_sap_system_tombstoned_event/1)
end

def apply(
Expand Down Expand Up @@ -567,6 +570,8 @@ defmodule Trento.Domain.SapSystem do
}
end

def apply(%SapSystem{} = sap_system, %SapSystemTombstoned{}), do: sap_system

defp maybe_emit_database_instance_registered_event(
nil,
%RegisterDatabaseInstance{
Expand Down Expand Up @@ -840,6 +845,20 @@ defmodule Trento.Domain.SapSystem do

defp maybe_emit_sap_system_deregistered_event(_, _), do: nil

defp maybe_emit_sap_system_tombstoned_event(%SapSystem{
sap_system_id: sap_system_id,
application: %Application{
instances: []
},
database: %Database{
instances: []
}
}) do
%SapSystemTombstoned{sap_system_id: sap_system_id}
end

defp maybe_emit_sap_system_tombstoned_event(_), do: nil

defp maybe_emit_database_deregistered_event(
%SapSystem{
sap_system_id: sap_system_id,
Expand Down
21 changes: 21 additions & 0 deletions test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ defmodule Trento.Factory do
alias Trento.Domain.Events.{
ApplicationInstanceRegistered,
ClusterRegistered,
ClusterTombstoned,
DatabaseInstanceRegistered,
DatabaseRegistered,
HostAddedToCluster,
HostDetailsUpdated,
HostRegistered,
HostTombstoned,
SapSystemRegistered,
SapSystemTombstoned,
SlesSubscriptionsUpdated
}

Expand Down Expand Up @@ -455,4 +458,22 @@ defmodule Trento.Factory do
"NodesRunningOn" => 1
}
end

def host_tombstoned_event_factory do
HostTombstoned.new!(%{
host_id: Faker.UUID.v4()
})
end

def cluster_tombstoned_event_factory do
ClusterTombstoned.new!(%{
cluster_id: Faker.UUID.v4()
})
end

def sap_system_tombstoned_event_factory do
SapSystemTombstoned.new!(%{
sap_system_id: Faker.UUID.v4()
})
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,38 @@ defmodule Trento.StreamRollUpEventHandlerTest do
assert :ok =
StreamRollUpEventHandler.handle(event, %{stream_version: @max_stream_version + 1})
end

test "should dispatch the host roll-up command when HostTombstoned is received" do
host_id = UUID.uuid4()
event = build(:host_tombstoned_event, host_id: host_id)

expect(Trento.Commanded.Mock, :dispatch, fn %RollUpHost{host_id: ^host_id}, _ ->
:ok
end)

assert :ok = StreamRollUpEventHandler.handle(event, %{stream_version: 1})
end

test "should dispatch the cluster roll-up command when ClusterTombstoned is received" do
cluster_id = UUID.uuid4()
event = build(:cluster_tombstoned_event, cluster_id: cluster_id)

expect(Trento.Commanded.Mock, :dispatch, fn %RollUpCluster{cluster_id: ^cluster_id}, _ ->
:ok
end)

assert :ok = StreamRollUpEventHandler.handle(event, %{stream_version: 1})
end

test "should dispatch the SAP system roll-up command when SapSystemTombstoned is received" do
sap_system_id = UUID.uuid4()
event = build(:sap_system_tombstoned_event, sap_system_id: sap_system_id)

expect(Trento.Commanded.Mock, :dispatch, fn %RollUpSapSystem{sap_system_id: ^sap_system_id},
_ ->
:ok
end)

assert :ok = StreamRollUpEventHandler.handle(event, %{stream_version: 1})
end
end
6 changes: 5 additions & 1 deletion test/trento/domain/cluster/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Trento.ClusterTest do
ClusterRegistered,
ClusterRolledUp,
ClusterRollUpRequested,
ClusterTombstoned,
HostAddedToCluster,
HostChecksExecutionCompleted,
HostRemovedFromCluster
Expand Down Expand Up @@ -809,7 +810,7 @@ defmodule Trento.ClusterTest do
)
end

test "should emit the ClusterDeregistered event when the last ClusterHost is deregistered and set the deregistration date into the state" do
test "should emit the ClusterDeregistered and ClusterTombstoned events when the last ClusterHost is deregistered and set the deregistration date into the state" do
cluster_id = Faker.UUID.v4()
dat = DateTime.utc_now()
host_1_added_event = build(:host_added_to_cluster_event, cluster_id: cluster_id)
Expand Down Expand Up @@ -845,6 +846,9 @@ defmodule Trento.ClusterTest do
%ClusterDeregistered{
cluster_id: cluster_id,
deregistered_at: dat
},
%ClusterTombstoned{
cluster_id: cluster_id
}
],
fn cluster -> assert dat == cluster.deregistered_at end
Expand Down
16 changes: 11 additions & 5 deletions test/trento/domain/host/host_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Trento.HostTest do
HostRegistered,
HostRolledUp,
HostRollUpRequested,
HostTombstoned,
ProviderUpdated,
SlesSubscriptionsUpdated
}
Expand Down Expand Up @@ -650,7 +651,7 @@ defmodule Trento.HostTest do
end

describe "deregistration" do
test "should emit the HostDeregistered event" do
test "should emit the HostDeregistered and HostTombstoned events" do
host_id = Faker.UUID.v4()
dat = DateTime.utc_now()

Expand All @@ -664,10 +665,15 @@ defmodule Trento.HostTest do
deregistered_at: dat
}
],
%HostDeregistered{
host_id: host_id,
deregistered_at: dat
}
[
%HostDeregistered{
host_id: host_id,
deregistered_at: dat
},
%HostTombstoned{
host_id: host_id
}
]
)
end

Expand Down
Loading