Skip to content

Commit

Permalink
Cluster deregistered side effects (#1280)
Browse files Browse the repository at this point in the history
* Add deregistered_at field to Cluster read model

* ClusterDeregistered event projection

* Wip on clusters module, get clusters that are not soft deleted

* Clusters entrypoint tests

* Test broadcast of cluster deregistration in cluster projector

* Removed first from request_check_execution query

* Fix query parenthesis in request_clusters_checks_execution

* get_all_clusters, tested for deregistered and soft deleted clusters

* Handle HostRemovedFromCluster in host projection

* Cluster projector reorder of project clauses

* Host projector set the cluster to null when HostRemovedFromCluster
  • Loading branch information
CDimonaco committed Apr 17, 2023
1 parent 872e385 commit ee873a8
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 3 deletions.
26 changes: 26 additions & 0 deletions lib/trento/application/projectors/cluster_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Trento.ClusterProjector do

alias Trento.Domain.Events.{
ChecksSelected,
ClusterDeregistered,
ClusterDetailsUpdated,
ClusterHealthChanged,
ClusterRegistered
Expand Down Expand Up @@ -53,6 +54,21 @@ defmodule Trento.ClusterProjector do
end
)

project(
%ClusterDeregistered{
cluster_id: cluster_id,
deregistered_at: deregistered_at
},
fn multi ->
changeset =
ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{
deregistered_at: deregistered_at
})

Ecto.Multi.update(multi, :cluster, changeset)
end
)

project(
%ClusterDetailsUpdated{
cluster_id: id,
Expand Down Expand Up @@ -150,5 +166,15 @@ defmodule Trento.ClusterProjector do
})
end

@impl true
def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, _) do
%ClusterReadModel{name: name} = Repo.get!(ClusterReadModel, cluster_id)

TrentoWeb.Endpoint.broadcast("monitoring:clusters", "cluster_deregistered", %{
cluster_id: cluster_id,
name: name
})
end

def after_update(_, _, _), do: :ok
end
16 changes: 16 additions & 0 deletions lib/trento/application/projectors/host_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Trento.HostProjector do
HostDeregistered,
HostDetailsUpdated,
HostRegistered,
HostRemovedFromCluster,
ProviderUpdated
}

Expand Down Expand Up @@ -81,6 +82,21 @@ defmodule Trento.HostProjector do
end
)

project(
%HostRemovedFromCluster{
host_id: id,
cluster_id: cluster_id
},
fn multi ->
changeset =
HostReadModel.changeset(%HostReadModel{id: id, cluster_id: cluster_id}, %{
cluster_id: nil
})

Ecto.Multi.update(multi, :host, changeset)
end
)

project(
%HostDetailsUpdated{
host_id: id,
Expand Down
2 changes: 2 additions & 0 deletions lib/trento/application/read_models/cluster_read_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ defmodule Trento.ClusterReadModel do

# Virtually enriched fields
field :cib_last_written, :string, virtual: true

field :deregistered_at, :utc_datetime_usec
end

@spec changeset(t() | Ecto.Changeset.t(), map) :: Ecto.Changeset.t()
Expand Down
14 changes: 11 additions & 3 deletions lib/trento/application/usecases/clusters/clusters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ defmodule Trento.Clusters do
@spec request_checks_execution(String.t()) ::
:ok | {:error, any}
def request_checks_execution(cluster_id) do
case Repo.get(ClusterReadModel, cluster_id) do
query =
from(c in ClusterReadModel,
where: is_nil(c.deregistered_at) and c.id == ^cluster_id
)

case Repo.one(query) do
%ClusterReadModel{} = cluster ->
Logger.debug("Requesting checks execution, cluster: #{cluster_id}")

Expand All @@ -49,7 +54,8 @@ defmodule Trento.Clusters do
def get_all_clusters do
from(c in ClusterReadModel,
order_by: [asc: c.name],
preload: [:tags]
preload: [:tags],
where: is_nil(c.deregistered_at)
)
|> enrich_cluster_model_query()
|> Repo.all()
Expand All @@ -71,7 +77,9 @@ defmodule Trento.Clusters do
query =
from(c in ClusterReadModel,
select: c.id,
where: c.type == ^ClusterType.hana_scale_up() or c.type == ^ClusterType.hana_scale_out()
where:
(c.type == ^ClusterType.hana_scale_up() or
c.type == ^ClusterType.hana_scale_out()) and is_nil(c.deregistered_at)
)

query
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Trento.Repo.Migrations.AddDeregisteredAtToClusterReadModel do
use Ecto.Migration

def change do
alter table(:clusters) do
add :deregistered_at, :utc_datetime_usec
end
end
end
17 changes: 17 additions & 0 deletions test/trento/application/projectors/cluster_projector_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Trento.ClusterProjectorTest do
import Trento.Factory

alias Trento.Domain.Events.{
ClusterDeregistered,
ClusterDetailsUpdated,
ClusterHealthChanged
}
Expand Down Expand Up @@ -187,6 +188,22 @@ defmodule Trento.ClusterProjectorTest do
1000
end

test "should update the deregistered_at field when ClusterDeregistered is received" do
insert(:cluster, id: cluster_id = Faker.UUID.v4(), name: name = "deregistered_cluster")
deregistered_at = DateTime.utc_now()

event = ClusterDeregistered.new!(%{cluster_id: cluster_id, deregistered_at: deregistered_at})

ProjectorTestHelper.project(ClusterProjector, event, "cluster_projector")
cluster_projection = Repo.get!(ClusterReadModel, event.cluster_id)

assert event.deregistered_at == cluster_projection.deregistered_at

assert_broadcast "cluster_deregistered",
%{cluster_id: ^cluster_id, name: ^name},
1000
end

test "should broadcast cluster_health_changed after the ClusterHealthChanged event" do
insert(:cluster, id: cluster_id = Faker.UUID.v4())

Expand Down
22 changes: 22 additions & 0 deletions test/trento/application/projectors/host_projector_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Trento.HostProjectorTest do
HostAddedToCluster,
HostDeregistered,
HostDetailsUpdated,
HostRemovedFromCluster,
ProviderUpdated
}

Expand Down Expand Up @@ -129,6 +130,27 @@ defmodule Trento.HostProjectorTest do
refute_broadcast "host_details_updated", %{id: ^host_id, cluster_id: ^cluster_id}, 1000
end

test "should project a host without the cluster when HostRemovedFromCluster event is received" do
insert(:cluster, id: cluster_id = Faker.UUID.v4())

insert(
:host,
id: host_id = UUID.uuid4(),
hostname: Faker.StarWars.character(),
cluster_id: cluster_id
)

event = %HostRemovedFromCluster{
host_id: host_id,
cluster_id: cluster_id
}

ProjectorTestHelper.project(HostProjector, event, "host_projector")
projection = Repo.get!(HostReadModel, host_id)

assert nil == projection.cluster_id
end

test "should update an existing host when HostDetailsUpdated event is received", %{
host_id: host_id
} do
Expand Down
12 changes: 12 additions & 0 deletions test/trento/application/usecases/clusters_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ defmodule Trento.ClustersTest do
end

describe "get clusters" do
test "should not return soft deleted clusters" do
cib_last_written = Date.to_string(Faker.Date.forward(0))
cluster_id = Faker.UUID.v4()

insert(:cluster, id: cluster_id)
insert(:cluster, deregistered_at: DateTime.utc_now())
insert(:cluster_enrichment_data, cluster_id: cluster_id)

[%ClusterReadModel{id: ^cluster_id, cib_last_written: ^cib_last_written}] =
Clusters.get_all_clusters()
end

test "should return enriched clusters" do
cib_last_written = Date.to_string(Faker.Date.forward(0))
cluster_id = Faker.UUID.v4()
Expand Down

0 comments on commit ee873a8

Please sign in to comment.