From 89a1fc95e2dbcedab0ca66b852370fd0babd123d Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Tue, 22 Aug 2023 15:26:02 +0200 Subject: [PATCH] refactor(trigger_engine): port query to xandra feat(trigger_engine): tag connection_error log Signed-off-by: Francesco Noacco --- .../lib/astarte_trigger_engine/application.ex | 2 +- .../lib/astarte_trigger_engine/config.ex | 8 +-- .../astarte_trigger_engine/events_consumer.ex | 59 +++++++++++++------ 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex index 4d0f64ae5..6db695a2c 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2023 SECO Mind Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex index 66ac386b7..ed7e149fb 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2023 SECO Mind Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -200,15 +200,9 @@ defmodule Astarte.TriggerEngine.Config do defdelegate xandra_nodes, to: DataAccessConfig defdelegate xandra_nodes!, to: DataAccessConfig - @doc "A list of {host, port} values of accessible Cassandra nodes in a CQEx compliant format" - defdelegate cqex_nodes, to: DataAccessConfig - defdelegate cqex_nodes!, to: DataAccessConfig - def xandra_options! do DataAccessConfig.xandra_options!() |> Keyword.put(:name, :xandra) |> Keyword.drop([:autodiscovery]) end - - defdelegate cqex_options!, to: DataAccessConfig end diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex index 2e0034cd1..a7b293377 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,9 +19,6 @@ defmodule Astarte.TriggerEngine.EventsConsumer do alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent alias Astarte.Core.Triggers.Trigger - alias Astarte.DataAccess.Database - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Result, as: DatabaseResult require Logger defmodule Behaviour do @@ -248,27 +245,53 @@ defmodule Astarte.TriggerEngine.EventsConsumer do end defp retrieve_trigger_configuration(realm_name, trigger_id) do - query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT value FROM kv_store WHERE group='triggers' AND key=:trigger_id;" - ) - |> DatabaseQuery.put(:trigger_id, trigger_id) + statement = + "SELECT value FROM #{realm_name}.kv_store WHERE group='triggers' AND key=:trigger_id;" - with {:ok, client} <- Database.connect(realm: realm_name), - {:ok, result} <- DatabaseQuery.call(client, query), - [value: trigger_data] <- DatabaseResult.head(result), - trigger <- Trigger.decode(trigger_data), - {:ok, action} <- Jason.decode(trigger.action) do + params = %{"trigger_id" => trigger_id} + + with :ok <- check_realm_name_ok(realm_name), + {:ok, prepared} <- Xandra.Cluster.prepare(:xandra, statement), + {:ok, result} <- Xandra.Cluster.execute(:xandra, prepared, params), + {:ok, action} <- parse_trigger_data(result) do {:ok, action} else - {:error, :database_connection_error} -> - Logger.warn("Database connection error.") + {:error, error = %Xandra.Error{}} -> + Logger.warning("Database error: #{Exception.message(error)}", tag: "database_error") + + {:error, :database_error} + + {:error, error = %Xandra.ConnectionError{}} -> + Logger.warning("Database connection error while #{Exception.message(error)}", + tag: "database_connection_error" + ) + {:error, :database_connection_error} error -> - Logger.warn("Error while processing event: #{inspect(error)}") + error + end + end + + defp parse_trigger_data(xandra_page) do + with %{"value" => trigger_data} <- + Enum.at(xandra_page, 0, {:error, :invalid_query_result, xandra_page}), + trigger = Trigger.decode(trigger_data), + {:ok, action} <- Jason.decode(trigger.action) do + {:ok, action} + else + error -> + Logger.warning("Error while processing event: #{inspect(error)}") {:error, :trigger_not_found} end end + + defp check_realm_name_ok(realm_name) do + if Astarte.Core.Realm.valid_name?(realm_name) do + :ok + else + Logger.warning("Invalid realm name: #{realm_name}") + {:error, :invalid_realm_name} + end + end end