Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projectors refactor #1624

Merged
merged 11 commits into from
Jul 13, 2023
18 changes: 12 additions & 6 deletions lib/trento/application/projectors/cluster_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ defmodule Trento.ClusterProjector do
deregistered_at: deregistered_at
},
fn multi ->
cluster = Repo.get!(ClusterReadModel, cluster_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a comment, in other project functions, we have this pipelined:

changeset =
    ClusterReadModel
    |> Repo.get!(cluster_id)
    |> ClusterReadModel.changeset(%{
         deregistered_at: deregistered_at
    })

Just in case if you prefer it 😝

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You triggered my ocd 👁️

Everything refactored to this style


changeset =
ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{
ClusterReadModel.changeset(cluster, %{
deregistered_at: deregistered_at
})

Expand Down Expand Up @@ -101,8 +103,10 @@ defmodule Trento.ClusterProjector do
details: details
},
fn multi ->
cluster = Repo.get!(ClusterReadModel, id)

changeset =
ClusterReadModel.changeset(%ClusterReadModel{id: id}, %{
ClusterReadModel.changeset(cluster, %{
name: name,
sid: sid,
additional_sids: additional_sids,
Expand Down Expand Up @@ -135,7 +139,9 @@ defmodule Trento.ClusterProjector do
)

project(%ClusterHealthChanged{cluster_id: cluster_id, health: health}, fn multi ->
changeset = ClusterReadModel.changeset(%ClusterReadModel{id: cluster_id}, %{health: health})
cluster = Repo.get!(ClusterReadModel, cluster_id)

changeset = ClusterReadModel.changeset(cluster, %{health: health})

Ecto.Multi.update(multi, :cluster, changeset)
end)
Expand Down Expand Up @@ -185,9 +191,9 @@ defmodule Trento.ClusterProjector do
end

@impl true
def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, _) do
%ClusterReadModel{name: name} = Repo.get!(ClusterReadModel, cluster_id)

def after_update(%ClusterDeregistered{cluster_id: cluster_id}, _, %{
cluster: %ClusterReadModel{name: name}
}) do
TrentoWeb.Endpoint.broadcast("monitoring:clusters", "cluster_deregistered", %{
id: cluster_id,
name: name
Expand Down
47 changes: 27 additions & 20 deletions lib/trento/application/projectors/database_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ defmodule Trento.DatabaseProjector do
health: health
},
fn multi ->
changeset =
DatabaseReadModel.changeset(%DatabaseReadModel{id: sap_system_id}, %{health: health})
db = Repo.get!(DatabaseReadModel, sap_system_id)

changeset = DatabaseReadModel.changeset(db, %{health: health})

Ecto.Multi.update(multi, :database, changeset)
end
Expand Down Expand Up @@ -103,13 +104,16 @@ defmodule Trento.DatabaseProjector do
health: health
},
fn multi ->
db_instance =
Repo.get_by(DatabaseInstanceReadModel,
sap_system_id: sap_system_id,
instance_number: instance_number,
host_id: host_id
)

changeset =
DatabaseInstanceReadModel.changeset(
%DatabaseInstanceReadModel{
sap_system_id: sap_system_id,
host_id: host_id,
instance_number: instance_number
},
db_instance,
%{health: health}
)

Expand All @@ -126,13 +130,16 @@ defmodule Trento.DatabaseProjector do
system_replication_status: system_replication_status
},
fn multi ->
db_instance =
Repo.get_by(DatabaseInstanceReadModel,
sap_system_id: sap_system_id,
instance_number: instance_number,
host_id: host_id
)

changeset =
DatabaseInstanceReadModel.changeset(
%DatabaseInstanceReadModel{
sap_system_id: sap_system_id,
host_id: host_id,
instance_number: instance_number
},
db_instance,
%{
system_replication: system_replication,
system_replication_status: system_replication_status
Expand All @@ -149,11 +156,11 @@ defmodule Trento.DatabaseProjector do
deregistered_at: deregistered_at
},
fn multi ->
db = Repo.get!(DatabaseReadModel, sap_system_id)

changeset =
DatabaseReadModel.changeset(
%DatabaseReadModel{
id: sap_system_id
},
db,
%{deregistered_at: deregistered_at}
)

Expand Down Expand Up @@ -323,12 +330,12 @@ defmodule Trento.DatabaseProjector do
sap_system_id: sap_system_id
},
_,
_
%{
database: %DatabaseReadModel{
sid: sid
}
}
) do
%DatabaseReadModel{
sid: sid
} = Repo.get(DatabaseReadModel, sap_system_id)

TrentoWeb.Endpoint.broadcast(
@databases_topic,
"database_deregistered",
Expand Down
80 changes: 39 additions & 41 deletions lib/trento/application/projectors/host_projector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ defmodule Trento.HostProjector do
deregistered_at: deregistered_at
},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel.changeset(host, %{
deregistered_at: deregistered_at
})

Expand Down Expand Up @@ -95,7 +97,8 @@ defmodule Trento.HostProjector do

Ecto.Multi.insert(multi, :host, changeset,
on_conflict: {:replace, [:cluster_id]},
conflict_target: [:id]
conflict_target: [:id],
returning: true
)
end
)
Expand Down Expand Up @@ -131,8 +134,10 @@ defmodule Trento.HostProjector do
agent_version: agent_version
},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel.changeset(host, %{
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
Expand Down Expand Up @@ -162,8 +167,10 @@ defmodule Trento.HostProjector do
project(
%HeartbeatSucceded{host_id: id},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel.changeset(host, %{
heartbeat: :passing
})

Expand All @@ -174,8 +181,10 @@ defmodule Trento.HostProjector do
project(
%HeartbeatFailed{host_id: id},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel.changeset(host, %{
heartbeat: :critical
})

Expand All @@ -186,8 +195,10 @@ defmodule Trento.HostProjector do
project(
%ProviderUpdated{host_id: id, provider: provider, provider_data: provider_data},
fn multi ->
host = Repo.get!(HostReadModel, id)

changeset =
HostReadModel.changeset(%HostReadModel{id: id}, %{
HostReadModel.changeset(host, %{
provider: provider,
provider_data: handle_provider_data(provider_data)
})
Expand All @@ -205,13 +216,10 @@ defmodule Trento.HostProjector do
@impl true
@spec after_update(any, any, any) :: :ok | {:error, any}
def after_update(
%HostRegistered{host_id: id},
%HostRegistered{},
_,
_
%{host: %HostReadModel{} = host}
) do
# We need to hit the database to get the cluster_id
host = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_registered",
Expand Down Expand Up @@ -239,10 +247,8 @@ defmodule Trento.HostProjector do
def after_update(
%HostDeregistered{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_deregistered",
Expand All @@ -253,32 +259,30 @@ defmodule Trento.HostProjector do
)
end

def after_update(%HostAddedToCluster{}, _, %{
host: %HostReadModel{hostname: nil}
}),
do: :ok

def after_update(
%HostAddedToCluster{host_id: id, cluster_id: cluster_id},
_,
_
) do
case Repo.get!(HostReadModel, id) do
# In case the host was not registered yet, we don't want to broadcast
%HostReadModel{hostname: nil} ->
:ok

%HostReadModel{} ->
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_details_updated",
%{
id: id,
cluster_id: cluster_id
}
)
end
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"host_details_updated",
%{
id: id,
cluster_id: cluster_id
}
)
end

def after_update(
%HostRemovedFromCluster{host_id: host_id},
_,
%{host: %Trento.HostReadModel{cluster_id: nil}}
%{host: %HostReadModel{cluster_id: nil}}
) do
TrentoWeb.Endpoint.broadcast("monitoring:hosts", "host_details_updated", %{
id: host_id,
Expand All @@ -289,7 +293,7 @@ defmodule Trento.HostProjector do
def after_update(
%HostDetailsUpdated{},
_,
%{host: host}
%{host: %HostReadModel{} = host}
) do
TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
Expand All @@ -301,10 +305,8 @@ defmodule Trento.HostProjector do
def after_update(
%HeartbeatSucceded{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"heartbeat_succeded",
Expand All @@ -320,10 +322,8 @@ defmodule Trento.HostProjector do
def after_update(
%HeartbeatFailed{host_id: id},
_,
_
%{host: %HostReadModel{hostname: hostname}}
) do
%HostReadModel{hostname: hostname} = Repo.get!(HostReadModel, id)

TrentoWeb.Endpoint.broadcast(
"monitoring:hosts",
"heartbeat_failed",
Expand All @@ -349,12 +349,10 @@ defmodule Trento.HostProjector do
end

def after_update(
%HostChecksSelected{host_id: host_id, checks: checks},
%HostChecksSelected{checks: checks},
_,
_
%{host: %HostReadModel{selected_checks: checks} = host}
) do
host = %HostReadModel{id: host_id, selected_checks: checks}

message =
HostView.render(
"host_details_updated.json",
Expand Down
Loading