Skip to content

Commit

Permalink
refactor(trigger_engine): port query to xandra
Browse files Browse the repository at this point in the history
feat(trigger_engine): tag connection_error log

Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
  • Loading branch information
noaccOS committed Sep 5, 2023
1 parent d046b26 commit 71f8dd1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2023 SECO Mind Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2023 SECO Mind Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -200,15 +200,9 @@ defmodule Astarte.TriggerEngine.Config do
defdelegate xandra_nodes, to: DataAccessConfig
defdelegate xandra_nodes!, to: DataAccessConfig

@doc "A list of {host, port} values of accessible Cassandra nodes in a CQEx compliant format"
defdelegate cqex_nodes, to: DataAccessConfig
defdelegate cqex_nodes!, to: DataAccessConfig

def xandra_options! do
DataAccessConfig.xandra_options!()
|> Keyword.put(:name, :xandra)
|> Keyword.drop([:autodiscovery])
end

defdelegate cqex_options!, to: DataAccessConfig
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,6 @@
defmodule Astarte.TriggerEngine.EventsConsumer do
alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent
alias Astarte.Core.Triggers.Trigger
alias Astarte.DataAccess.Database
alias CQEx.Query, as: DatabaseQuery
alias CQEx.Result, as: DatabaseResult
require Logger

defmodule Behaviour do
Expand Down Expand Up @@ -248,27 +245,53 @@ defmodule Astarte.TriggerEngine.EventsConsumer do
end

defp retrieve_trigger_configuration(realm_name, trigger_id) do
query =
DatabaseQuery.new()
|> DatabaseQuery.statement(
"SELECT value FROM kv_store WHERE group='triggers' AND key=:trigger_id;"
)
|> DatabaseQuery.put(:trigger_id, trigger_id)
statement =
"SELECT value FROM #{realm_name}.kv_store WHERE group='triggers' AND key=:trigger_id;"

with {:ok, client} <- Database.connect(realm: realm_name),
{:ok, result} <- DatabaseQuery.call(client, query),
[value: trigger_data] <- DatabaseResult.head(result),
trigger <- Trigger.decode(trigger_data),
{:ok, action} <- Jason.decode(trigger.action) do
params = %{"trigger_id" => trigger_id}

with :ok <- check_realm_name_ok(realm_name),
{:ok, prepared} <- Xandra.Cluster.prepare(:xandra, statement),
{:ok, result} <- Xandra.Cluster.execute(:xandra, prepared, params),
{:ok, action} <- parse_trigger_data(result) do
{:ok, action}
else
{:error, :database_connection_error} ->
Logger.warn("Database connection error.")
{:error, error = %Xandra.Error{}} ->
Logger.warning("Database error: #{Exception.message(error)}", tag: "database_error")

{:error, :database_error}

{:error, error = %Xandra.ConnectionError{}} ->
Logger.warning("Database connection error: #{Exception.message(error)}",
tag: "database_connection_error"
)

{:error, :database_connection_error}

error ->
Logger.warn("Error while processing event: #{inspect(error)}")
error
end
end

defp parse_trigger_data(xandra_page) do
with %{"value" => trigger_data} <-
Enum.at(xandra_page, 0, {:error, :invalid_query_result, xandra_page}),
trigger = Trigger.decode(trigger_data),
{:ok, action} <- Jason.decode(trigger.action) do
{:ok, action}
else
error ->
Logger.warning("Error retrieving trigger action: #{inspect(error)}")
{:error, :trigger_not_found}
end
end

defp check_realm_name_ok(realm_name) do
if Astarte.Core.Realm.valid_name?(realm_name) do
:ok
else
Logger.warning("Invalid realm name: #{realm_name}")
{:error, :invalid_realm_name}
end
end
end

0 comments on commit 71f8dd1

Please sign in to comment.