diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex index 47f9296ad..6db695a2c 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,13 +40,9 @@ defmodule Astarte.TriggerEngine.Application do Config.validate!() DataAccessConfig.validate!() - xandra_options = - Config.xandra_options!() - |> Keyword.put(:name, :xandra) - children = [ Astarte.TriggerEngineWeb.Telemetry, - {Xandra.Cluster, xandra_options}, + {Xandra.Cluster, Config.xandra_options!()}, DeliverySupervisor ] diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex index 699ea2ae5..ed7e149fb 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -200,10 +200,9 @@ defmodule Astarte.TriggerEngine.Config do defdelegate xandra_nodes, to: DataAccessConfig defdelegate xandra_nodes!, to: DataAccessConfig - @doc "A list of {host, port} values of accessible Cassandra nodes in a CQEx compliant format" - defdelegate cqex_nodes, to: DataAccessConfig - defdelegate cqex_nodes!, to: DataAccessConfig - - defdelegate xandra_options!, to: DataAccessConfig - defdelegate cqex_options!, to: DataAccessConfig + def xandra_options! do + DataAccessConfig.xandra_options!() + |> Keyword.put(:name, :xandra) + |> Keyword.drop([:autodiscovery]) + end end diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex index 2e0034cd1..7fffad036 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,9 +19,6 @@ defmodule Astarte.TriggerEngine.EventsConsumer do alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent alias Astarte.Core.Triggers.Trigger - alias Astarte.DataAccess.Database - alias CQEx.Query, as: DatabaseQuery - alias CQEx.Result, as: DatabaseResult require Logger defmodule Behaviour do @@ -248,27 +245,53 @@ defmodule Astarte.TriggerEngine.EventsConsumer do end defp retrieve_trigger_configuration(realm_name, trigger_id) do - query = - DatabaseQuery.new() - |> DatabaseQuery.statement( - "SELECT value FROM kv_store WHERE group='triggers' AND key=:trigger_id;" - ) - |> DatabaseQuery.put(:trigger_id, trigger_id) + statement = + "SELECT value FROM #{realm_name}.kv_store WHERE group='triggers' AND key=:trigger_id;" - with {:ok, client} <- Database.connect(realm: realm_name), - {:ok, result} <- DatabaseQuery.call(client, query), - [value: trigger_data] <- DatabaseResult.head(result), - trigger <- Trigger.decode(trigger_data), - {:ok, action} <- Jason.decode(trigger.action) do + params = %{"trigger_id" => trigger_id} + + with :ok <- check_realm_name_ok(realm_name), + {:ok, prepared} <- Xandra.Cluster.prepare(:xandra, statement), + {:ok, result} <- Xandra.Cluster.execute(:xandra, prepared, params), + {:ok, action} <- parse_trigger_data(result) do {:ok, action} else - {:error, :database_connection_error} -> - Logger.warn("Database connection error.") + {:error, error = %Xandra.Error{}} -> + Logger.warning("Database error: #{Exception.message(error)}", tag: "database_error") + + {:error, :database_error} + + {:error, error = %Xandra.ConnectionError{}} -> + Logger.warning("Database connection error: #{Exception.message(error)}", + tag: "database_connection_error" + ) + {:error, :database_connection_error} error -> - Logger.warn("Error while processing event: #{inspect(error)}") + error + end + end + + defp parse_trigger_data(xandra_page) do + with %{"value" => trigger_data} <- + Enum.at(xandra_page, 0, {:error, :invalid_query_result, xandra_page}), + trigger = Trigger.decode(trigger_data), + {:ok, action} <- Jason.decode(trigger.action) do + {:ok, action} + else + error -> + Logger.warning("Error while processing event: #{inspect(error)}") {:error, :trigger_not_found} end end + + defp check_realm_name_ok(realm_name) do + if Astarte.Core.Realm.valid_name?(realm_name) do + :ok + else + Logger.warning("Invalid realm name: #{realm_name}") + {:error, :invalid_realm_name} + end + end end diff --git a/apps/astarte_trigger_engine/mix.exs b/apps/astarte_trigger_engine/mix.exs index fad56f8b1..95259aec7 100644 --- a/apps/astarte_trigger_engine/mix.exs +++ b/apps/astarte_trigger_engine/mix.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2021 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -90,7 +90,7 @@ defmodule Astarte.TriggerEngine.Mixfile do {:mox, "~> 0.5", only: :test}, {:pretty_log, "~> 0.1"}, {:telemetry, "~> 0.4"}, - {:xandra, "~> 0.13"}, + {:xandra, "~> 0.17"}, {:skogsra, "~> 2.2"}, {:observer_cli, "~> 1.5"}, {:dialyxir, "~> 1.0", only: [:dev, :ci], runtime: false}, diff --git a/apps/astarte_trigger_engine/mix.lock b/apps/astarte_trigger_engine/mix.lock index b316b8f63..b68f037fe 100644 --- a/apps/astarte_trigger_engine/mix.lock +++ b/apps/astarte_trigger_engine/mix.lock @@ -14,8 +14,8 @@ "cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"}, "cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"}, - "db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"}, - "decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"}, + "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, "dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []}, "ecto": {:hex, :ecto, "3.5.8", "8ebf12be6016cb99313348ba7bb4612f4114b9a506d6da79a2adc7ef449340bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea0be182ea8922eb7742e3ae8e71b67ee00ae177de1bf76210299a5f16ba4c77"}, @@ -37,6 +37,7 @@ "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"}, + "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, "observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"}, @@ -60,5 +61,5 @@ "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "uuid": {:hex, :uuid_erl, "2.0.1", "1fd9079c544d521063897887a1c5b3302dca98f9bb06aadcdc6fb0663f256797", [:rebar3], [{:quickrand, "~> 2.0.1", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "ab57caccd51f170011e5f444ce865f84b41605e483a9efcc468c1afaec87553b"}, - "xandra": {:hex, :xandra, "0.13.1", "f82866e6c47527f74f35dd3007b5311121852dd861a29ed1613e27ccfaba0102", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "a2efdb8921e3b694bf3505e40c5ec9d353d8fa3755cec946be7c18b8236d7230"}, + "xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"}, } diff --git a/apps/astarte_trigger_engine/test/amqp_consumer/amqp_consumer_tracker_test.exs b/apps/astarte_trigger_engine/test/amqp_consumer/amqp_consumer_tracker_test.exs index 4dbc1a11d..828f7cbe4 100644 --- a/apps/astarte_trigger_engine/test/amqp_consumer/amqp_consumer_tracker_test.exs +++ b/apps/astarte_trigger_engine/test/amqp_consumer/amqp_consumer_tracker_test.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2022 SECO Mind Srl +# Copyright 2022-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTrackerTest do @test_realm DatabaseTestHelper.test_realm() setup_all do + DatabaseTestHelper.await_cluster_connected() DatabaseTestHelper.create_test_env() on_exit(&DatabaseTestHelper.drop_test_env/0) diff --git a/apps/astarte_trigger_engine/test/support/database_test_helper.exs b/apps/astarte_trigger_engine/test/support/database_test_helper.exs index 9a3b37b91..d87797779 100644 --- a/apps/astarte_trigger_engine/test/support/database_test_helper.exs +++ b/apps/astarte_trigger_engine/test/support/database_test_helper.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2022 SECO Mind Srl +# Copyright 2022-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ defmodule Astarte.TriggerEngine.DatabaseTestHelper do require Logger + alias Astarte.TriggerEngine.Config alias Astarte.Core.Triggers.Policy alias Astarte.Core.Triggers.PolicyProtobuf.Policy, as: PolicyProto @@ -119,4 +120,19 @@ defmodule Astarte.TriggerEngine.DatabaseTestHelper do end def test_realm, do: @test_realm + + # TODO: include in astarte_data_access + def await_cluster_connected(cluster \\ nil, tries \\ 10) do + cluster = cluster || Config.xandra_options!()[:name] + fun = &Xandra.execute!(&1, "SELECT * FROM system.local") + + with {:error, %Xandra.ConnectionError{}} <- Xandra.Cluster.run(cluster, _options = [], fun) do + if tries > 0 do + Process.sleep(100) + await_cluster_connected(cluster, tries - 1) + else + raise("Connection to the cluster failed") + end + end + end end