Skip to content

Commit

Permalink
Merge 3bfab10 into d7997ac
Browse files Browse the repository at this point in the history
  • Loading branch information
noaccOS committed Sep 5, 2023
2 parents d7997ac + 3bfab10 commit 2d52a57
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,13 +40,9 @@ defmodule Astarte.TriggerEngine.Application do
Config.validate!()
DataAccessConfig.validate!()

xandra_options =
Config.xandra_options!()
|> Keyword.put(:name, :xandra)

children = [
Astarte.TriggerEngineWeb.Telemetry,
{Xandra.Cluster, xandra_options},
{Xandra.Cluster, Config.xandra_options!()},
DeliverySupervisor
]

Expand Down
13 changes: 6 additions & 7 deletions apps/astarte_trigger_engine/lib/astarte_trigger_engine/config.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -200,10 +200,9 @@ defmodule Astarte.TriggerEngine.Config do
defdelegate xandra_nodes, to: DataAccessConfig
defdelegate xandra_nodes!, to: DataAccessConfig

@doc "A list of {host, port} values of accessible Cassandra nodes in a CQEx compliant format"
defdelegate cqex_nodes, to: DataAccessConfig
defdelegate cqex_nodes!, to: DataAccessConfig

defdelegate xandra_options!, to: DataAccessConfig
defdelegate cqex_options!, to: DataAccessConfig
def xandra_options! do
DataAccessConfig.xandra_options!()
|> Keyword.put(:name, :xandra)
|> Keyword.drop([:autodiscovery])
end
end
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,6 @@
defmodule Astarte.TriggerEngine.EventsConsumer do
alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent
alias Astarte.Core.Triggers.Trigger
alias Astarte.DataAccess.Database
alias CQEx.Query, as: DatabaseQuery
alias CQEx.Result, as: DatabaseResult
require Logger

defmodule Behaviour do
Expand Down Expand Up @@ -248,27 +245,53 @@ defmodule Astarte.TriggerEngine.EventsConsumer do
end

defp retrieve_trigger_configuration(realm_name, trigger_id) do
query =
DatabaseQuery.new()
|> DatabaseQuery.statement(
"SELECT value FROM kv_store WHERE group='triggers' AND key=:trigger_id;"
)
|> DatabaseQuery.put(:trigger_id, trigger_id)
statement =
"SELECT value FROM #{realm_name}.kv_store WHERE group='triggers' AND key=:trigger_id;"

with {:ok, client} <- Database.connect(realm: realm_name),
{:ok, result} <- DatabaseQuery.call(client, query),
[value: trigger_data] <- DatabaseResult.head(result),
trigger <- Trigger.decode(trigger_data),
{:ok, action} <- Jason.decode(trigger.action) do
params = %{"trigger_id" => trigger_id}

with :ok <- check_realm_name_ok(realm_name),
{:ok, prepared} <- Xandra.Cluster.prepare(:xandra, statement),
{:ok, result} <- Xandra.Cluster.execute(:xandra, prepared, params),
{:ok, action} <- parse_trigger_data(result) do
{:ok, action}
else
{:error, :database_connection_error} ->
Logger.warn("Database connection error.")
{:error, error = %Xandra.Error{}} ->
Logger.warning("Database error: #{Exception.message(error)}", tag: "database_error")

{:error, :database_error}

{:error, error = %Xandra.ConnectionError{}} ->
Logger.warning("Database connection error: #{Exception.message(error)}",
tag: "database_connection_error"
)

{:error, :database_connection_error}

error ->
Logger.warn("Error while processing event: #{inspect(error)}")
error
end
end

defp parse_trigger_data(xandra_page) do
with %{"value" => trigger_data} <-
Enum.at(xandra_page, 0, {:error, :invalid_query_result, xandra_page}),
trigger = Trigger.decode(trigger_data),
{:ok, action} <- Jason.decode(trigger.action) do
{:ok, action}
else
error ->
Logger.warning("Error while processing event: #{inspect(error)}")
{:error, :trigger_not_found}
end
end

defp check_realm_name_ok(realm_name) do
if Astarte.Core.Realm.valid_name?(realm_name) do
:ok
else
Logger.warning("Invalid realm name: #{realm_name}")
{:error, :invalid_realm_name}
end
end
end
4 changes: 2 additions & 2 deletions apps/astarte_trigger_engine/mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2021 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,7 +90,7 @@ defmodule Astarte.TriggerEngine.Mixfile do
{:mox, "~> 0.5", only: :test},
{:pretty_log, "~> 0.1"},
{:telemetry, "~> 0.4"},
{:xandra, "~> 0.13"},
{:xandra, "~> 0.17"},
{:skogsra, "~> 2.2"},
{:observer_cli, "~> 1.5"},
{:dialyxir, "~> 1.0", only: [:dev, :ci], runtime: false},
Expand Down
7 changes: 4 additions & 3 deletions apps/astarte_trigger_engine/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"},
"cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"},
"db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"},
"decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"},
"db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
"dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []},
"ecto": {:hex, :ecto, "3.5.8", "8ebf12be6016cb99313348ba7bb4612f4114b9a506d6da79a2adc7ef449340bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea0be182ea8922eb7742e3ae8e71b67ee00ae177de1bf76210299a5f16ba4c77"},
Expand All @@ -37,6 +37,7 @@
"mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mox": {:hex, :mox, "0.5.2", "55a0a5ba9ccc671518d068c8dddd20eeb436909ea79d1799e2209df7eaa98b6c", [:mix], [], "hexpm", "df4310628cd628ee181df93f50ddfd07be3e5ecc30232d3b6aadf30bdfe6092b"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"},
Expand All @@ -60,5 +61,5 @@
"telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"uuid": {:hex, :uuid_erl, "2.0.1", "1fd9079c544d521063897887a1c5b3302dca98f9bb06aadcdc6fb0663f256797", [:rebar3], [{:quickrand, "~> 2.0.1", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "ab57caccd51f170011e5f444ce865f84b41605e483a9efcc468c1afaec87553b"},
"xandra": {:hex, :xandra, "0.13.1", "f82866e6c47527f74f35dd3007b5311121852dd861a29ed1613e27ccfaba0102", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "a2efdb8921e3b694bf3505e40c5ec9d353d8fa3755cec946be7c18b8236d7230"},
"xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"},
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2022 SECO Mind Srl
# Copyright 2022-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -28,6 +28,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTrackerTest do
@test_realm DatabaseTestHelper.test_realm()

setup_all do
DatabaseTestHelper.await_cluster_connected()
DatabaseTestHelper.create_test_env()

on_exit(&DatabaseTestHelper.drop_test_env/0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2022 SECO Mind Srl
# Copyright 2022-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
defmodule Astarte.TriggerEngine.DatabaseTestHelper do
require Logger

alias Astarte.TriggerEngine.Config
alias Astarte.Core.Triggers.Policy
alias Astarte.Core.Triggers.PolicyProtobuf.Policy, as: PolicyProto

Expand Down Expand Up @@ -119,4 +120,19 @@ defmodule Astarte.TriggerEngine.DatabaseTestHelper do
end

def test_realm, do: @test_realm

# TODO: include in astarte_data_access
def await_cluster_connected(cluster \\ nil, tries \\ 10) do
cluster = cluster || Config.xandra_options!()[:name]
fun = &Xandra.execute!(&1, "SELECT * FROM system.local")

with {:error, %Xandra.ConnectionError{}} <- Xandra.Cluster.run(cluster, _options = [], fun) do
if tries > 0 do
Process.sleep(100)
await_cluster_connected(cluster, tries - 1)
else
raise("Connection to the cluster failed")
end
end
end
end

0 comments on commit 2d52a57

Please sign in to comment.