From e61f7c41c3f5e038bc7cd2d75a31e136e620fcb7 Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Fri, 4 Aug 2023 16:10:30 +0200 Subject: [PATCH 1/2] feat(pairing): expose custom query API using Xandra Signed-off-by: Francesco Noacco --- .../lib/astarte_pairing/queries.ex | 130 +++++++++++++++++- 1 file changed, 129 insertions(+), 1 deletion(-) diff --git a/apps/astarte_pairing/lib/astarte_pairing/queries.ex b/apps/astarte_pairing/lib/astarte_pairing/queries.ex index 1d9cef2c1..b5d150bed 100644 --- a/apps/astarte_pairing/lib/astarte_pairing/queries.ex +++ b/apps/astarte_pairing/lib/astarte_pairing/queries.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,7 +26,36 @@ defmodule Astarte.Pairing.Queries do require Logger + alias Astarte.Core.Realm + + @typedoc """ + A string representing the current context, to include in error messages. + Not included if nil. + Default: nil + """ + @type custom_context() :: String.t() | nil + + @typedoc """ + The format of the result. + Unless otherwise stated, all results are returned in an {:ok, value} tuple. + - `:page`: default xandra return type + - `:list`: a list + - `{:first, default}`: only return the first element, `default` if empty + - `:first`: shorthand for `{:first, nil}` + - `{:first!, error}`: like `{:first, default}`, but returns `{:error, error}` if empty + - `:first!`: shorthand for `{:first!, :not_found}` + """ + @type custom_result() :: :page | :list | :first | :first! | {:first, any()} | {:first!, any()} + + @type custom_opt() :: {:context, custom_context()} | {:result, custom_result()} + @type xandra_opt() :: {atom(), any()} + + @type query_opt() :: custom_opt() | xandra_opt() + @protocol_revision 1 + @default_query_opts [uuid_format: :binary, timestamp_format: :integer] + + @default_custom_query_opts [result: :page, context: nil] def get_agent_public_key_pems(client) do get_jwt_public_key_pem = """ @@ -338,4 +367,103 @@ defmodule Astarte.Pairing.Queries do {:error, :database_connection_error} end end + + @spec custom_query(Xandra.statement(), String.t() | nil, Xandra.values(), [query_opt()]) :: + {:ok, term()} | {:error, term()} + def custom_query(statement, realm \\ nil, params \\ %{}, opts \\ []) do + {custom_opts, query_opts} = parse_opts(opts) + + Xandra.Cluster.run(:xandra, fn conn -> + execute_query(conn, statement, realm, params, query_opts, custom_opts) + end) + end + + defp execute_query(conn, statement, realm, params, query_opts, custom_opts) do + with {:ok, prepared} <- prepare_query(conn, statement, realm) do + case Xandra.execute(conn, prepared, params, query_opts) do + {:ok, page} -> + cast_query_result(page, custom_opts) + + {:error, error} -> + %{message: message, tag: tag} = database_error_message(error, custom_opts[:context]) + + _ = Logger.warn(message, tag: tag) + + {:error, :database_error} + end + end + end + + defp use_realm(_conn, nil = _realm), do: :ok + + defp use_realm(conn, realm) when is_binary(realm) do + with true <- Realm.valid_name?(realm), + {:ok, %Xandra.SetKeyspace{}} <- Xandra.execute(conn, "USE #{realm}") do + :ok + else + _ -> {:error, :realm_not_found} + end + end + + defp prepare_query(conn, statement, realm) do + with :ok <- use_realm(conn, realm) do + case Xandra.prepare(conn, statement) do + {:ok, page} -> + {:ok, page} + + {:error, reason} -> + _ = Logger.warn("Cannot prepare query: #{inspect(reason)}.", tag: "db_error") + {:error, :database_error} + end + end + end + + defp parse_opts(opts) do + {custom_opts, query_opts} = Keyword.split(opts, Keyword.keys(@default_custom_query_opts)) + query_opts = Keyword.merge(@default_query_opts, query_opts) + custom_opts = Keyword.validate!(custom_opts, @default_custom_query_opts) + + {custom_opts, query_opts} + end + + defp cast_query_result(page, opts) do + result_with_defaults = + case opts[:result] do + :first -> {:first, nil} + :first! -> {:first!, :not_found} + x -> x + end + + case result_with_defaults do + :page -> + {:ok, page} + + :list -> + {:ok, Enum.to_list(page)} + + {:first, default} -> + {:ok, Enum.at(page, 0, default)} + + {:first!, error} -> + Enum.at(page, 0) + |> case do + nil -> {:error, error} + first -> {:ok, first} + end + end + end + + defp database_error_message(error, context) do + {error_type, error_tag} = + case error do + %Xandra.Error{} -> {"Database error", "db_error"} + %Xandra.ConnectionError{} -> {"Database connection error", "db_connection_error"} + end + + context = if context == nil, do: "", else: " during #{context}" + + message = error_type <> context <> ": " <> Exception.message(error) + + %{message: message, tag: error_tag} + end end From 06b2a2a7a5fd6bc096555decc07ebc8a8cac5a79 Mon Sep 17 00:00:00 2001 From: Francesco Noacco Date: Fri, 4 Aug 2023 16:52:06 +0200 Subject: [PATCH 2/2] refactor(pairing): switch to xandra for queries use xandra in place of cqex for all queries to the cluster. feat: expose cluster name as an env var internally to the config fix: the tests now wait for the cluster to be connected fix: remove :autodiscovery key from xandra_options as it is deprecated chore: update xandra to v0.17.0 Signed-off-by: Francesco Noacco --- apps/astarte_pairing/config/config.exs | 4 +- apps/astarte_pairing/lib/astarte_pairing.ex | 8 +- .../lib/astarte_pairing/config.ex | 26 +- .../lib/astarte_pairing/engine.ex | 107 ++------ .../lib/astarte_pairing/queries.ex | 247 ++++++++---------- .../lib/astarte_pairing/rpc/handler.ex | 45 +++- apps/astarte_pairing/mix.exs | 4 +- apps/astarte_pairing/mix.lock | 9 +- .../test/astarte_pairing/engine_test.exs | 38 ++- .../test/astarte_pairing/rpc/handler_test.exs | 1 + .../test/support/database_test_helper.ex | 211 ++++++--------- 11 files changed, 287 insertions(+), 413 deletions(-) diff --git a/apps/astarte_pairing/config/config.exs b/apps/astarte_pairing/config/config.exs index c1e5d36d9..2f9ed07c4 100644 --- a/apps/astarte_pairing/config/config.exs +++ b/apps/astarte_pairing/config/config.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,4 +26,6 @@ config :lager, error_logger_redirect: false, handlers: [level: :critical] +config :astarte_pairing, :cluster_name, :xandra + import_config "#{config_env()}.exs" diff --git a/apps/astarte_pairing/lib/astarte_pairing.ex b/apps/astarte_pairing/lib/astarte_pairing.ex index 99cc205c0..1ba9c77a6 100644 --- a/apps/astarte_pairing/lib/astarte_pairing.ex +++ b/apps/astarte_pairing/lib/astarte_pairing.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -43,13 +43,9 @@ defmodule Astarte.Pairing do Config.validate!() Config.init!() - xandra_options = - Config.xandra_options!() - |> Keyword.put(:name, :xandra) - children = [ Astarte.PairingWeb.Telemetry, - {Xandra.Cluster, xandra_options}, + {Xandra.Cluster, Config.xandra_options!()}, {Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: Handler]}, {Astarte.Pairing.CredentialsSecret.Cache, []} ] diff --git a/apps/astarte_pairing/lib/astarte_pairing/config.ex b/apps/astarte_pairing/lib/astarte_pairing/config.ex index 02d65442d..f5b87a02c 100644 --- a/apps/astarte_pairing/lib/astarte_pairing/config.ex +++ b/apps/astarte_pairing/lib/astarte_pairing/config.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -61,21 +61,13 @@ defmodule Astarte.Pairing.Config do end end - @doc """ - Returns the cassandra node configuration - """ - @spec cassandra_node!() :: {String.t(), integer()} - def cassandra_node!, do: Enum.random(cqex_nodes!()) - - @doc """ - Returns Cassandra nodes formatted in the Xandra format. - """ - defdelegate xandra_nodes, to: DataAccessConfig - defdelegate xandra_nodes!, to: DataAccessConfig + def xandra_options! do + cluster = Application.get_env(:astarte_pairing, :cluster_name) - defdelegate cqex_nodes, to: DataAccessConfig - defdelegate cqex_nodes!, to: DataAccessConfig - - defdelegate xandra_options!, to: DataAccessConfig - defdelegate cqex_options!, to: DataAccessConfig + # Dropping :autodiscovery since the option has been deprecated in xandra v0.15.0 + # and is now always enabled. + DataAccessConfig.xandra_options!() + |> Keyword.drop([:autodiscovery]) + |> Keyword.put(:name, cluster) + end end diff --git a/apps/astarte_pairing/lib/astarte_pairing/engine.ex b/apps/astarte_pairing/lib/astarte_pairing/engine.ex index 34faa985c..dba463320 100644 --- a/apps/astarte_pairing/lib/astarte_pairing/engine.ex +++ b/apps/astarte_pairing/lib/astarte_pairing/engine.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ defmodule Astarte.Pairing.Engine do alias Astarte.Pairing.Config alias Astarte.Pairing.CredentialsSecret alias Astarte.Pairing.Queries - alias CQEx.Client require Logger @@ -56,23 +55,10 @@ defmodule Astarte.Pairing.Engine do end def get_agent_public_key_pems(realm) do - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - - with {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), - {:ok, jwt_pems} <- Queries.get_agent_public_key_pems(client) do - {:ok, jwt_pems} - else - {:error, :shutdown} -> - {:error, :realm_not_found} - - {:error, reason} -> - {:error, reason} + case Queries.get_agent_public_key_pems(realm) do + {:ok, jwt_pems} -> {:ok, jwt_pems} + {:error, :public_key_not_found} -> {:error, :realm_not_found} + {:error, reason} -> {:error, reason} end end @@ -90,34 +76,25 @@ defmodule Astarte.Pairing.Engine do :telemetry.execute([:astarte, :pairing, :get_credentials], %{}, %{realm: realm}) - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true), {:ok, ip_tuple} <- parse_ip(device_ip), - {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), - {:ok, device_row} <- Queries.select_device_for_credentials_request(client, device_id), + {:ok, device_row} <- Queries.select_device_for_credentials_request(realm, device_id), {:authorized?, true} <- {:authorized?, - CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])}, + CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])}, {:credentials_inhibited?, false} <- - {:credentials_inhibited?, device_row[:inhibit_credentials_request]}, - _ <- CFSSLCredentials.revoke(device_row[:cert_serial], device_row[:cert_aki]), + {:credentials_inhibited?, device_row["inhibit_credentials_request"]}, + _ <- CFSSLCredentials.revoke(device_row["cert_serial"], device_row["cert_aki"]), encoded_device_id <- Device.encode_device_id(device_id), {:ok, %{cert: cert, aki: _aki, serial: _serial} = cert_data} <- CFSSLCredentials.get_certificate(csr, realm, encoded_device_id), :ok <- Queries.update_device_after_credentials_request( - client, + realm, device_id, cert_data, ip_tuple, - device_row[:first_credentials_request] + device_row["first_credentials_request"] ) do {:ok, %{client_crt: cert}} else @@ -153,20 +130,11 @@ defmodule Astarte.Pairing.Engine do def get_info(realm, hardware_id, credentials_secret) do Logger.debug("get_info request for device #{inspect(hardware_id)} in realm #{inspect(realm)}") - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true), - {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), - {:ok, device_row} <- Queries.select_device_for_info(client, device_id), + {:ok, device_row} <- Queries.select_device_for_info(realm, device_id), {:authorized?, true} <- {:authorized?, - CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])} do + CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])} do device_status = device_status_string(device_row) protocols = get_protocol_info() @@ -193,19 +161,10 @@ defmodule Astarte.Pairing.Engine do :telemetry.execute([:astarte, :pairing, :register_new_device], %{}, %{realm: realm}) - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true), - {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), credentials_secret <- CredentialsSecret.generate(), secret_hash <- CredentialsSecret.hash(credentials_secret), - :ok <- Queries.register_device(client, device_id, hardware_id, secret_hash, opts) do + :ok <- Queries.register_device(realm, device_id, hardware_id, secret_hash, opts) do {:ok, credentials_secret} else {:error, :shutdown} -> @@ -222,17 +181,8 @@ defmodule Astarte.Pairing.Engine do "in realm #{inspect(realm)}" ) - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - with {:ok, device_id} <- Device.decode_device_id(encoded_device_id), - {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), - :ok <- Queries.unregister_device(client, device_id) do + :ok <- Queries.unregister_device(realm, device_id) do :ok else {:error, :shutdown} -> @@ -248,19 +198,10 @@ defmodule Astarte.Pairing.Engine do "verify_credentials request for device #{inspect(hardware_id)} in realm #{inspect(realm)}" ) - cqex_options = - Config.cqex_options!() - |> Keyword.put(:keyspace, realm) - with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true), - {:ok, client} <- - Client.new( - Config.cassandra_node!(), - cqex_options - ), - {:ok, device_row} <- Queries.select_device_for_verify_credentials(client, device_id), + {:ok, device_row} <- Queries.select_device_for_verify_credentials(realm, device_id), {:authorized?, true} <- - {:authorized?, CredentialsSecret.verify(secret, device_row[:credentials_secret])} do + {:authorized?, CredentialsSecret.verify(secret, device_row["credentials_secret"])} do CertVerifier.verify(client_crt, Config.ca_cert!()) else {:authorized?, false} -> @@ -286,16 +227,12 @@ defmodule Astarte.Pairing.Engine do end defp device_status_string(device_row) do - # The device is pending until the first credendtial request - cond do - Keyword.get(device_row, :inhibit_credentials_request) -> - "inhibited" - - Keyword.get(device_row, :first_credentials_request) -> - "confirmed" + # The device is pending until the first credential request - true -> - "pending" + case device_row do + %{"inhibit_credentials_request" => true} -> "inhibited" + %{"first_credentials_request" => nil} -> "pending" + %{} -> "confirmed" end end diff --git a/apps/astarte_pairing/lib/astarte_pairing/queries.ex b/apps/astarte_pairing/lib/astarte_pairing/queries.ex index b5d150bed..9b6136195 100644 --- a/apps/astarte_pairing/lib/astarte_pairing/queries.ex +++ b/apps/astarte_pairing/lib/astarte_pairing/queries.ex @@ -21,9 +21,6 @@ defmodule Astarte.Pairing.Queries do This module is responsible for the interaction with the database. """ - alias CQEx.Query - alias CQEx.Result - require Logger alias Astarte.Core.Realm @@ -54,168 +51,160 @@ defmodule Astarte.Pairing.Queries do @protocol_revision 1 @default_query_opts [uuid_format: :binary, timestamp_format: :integer] - @default_custom_query_opts [result: :page, context: nil] - def get_agent_public_key_pems(client) do - get_jwt_public_key_pem = """ + @cluster Application.compile_env!(:astarte_pairing, :cluster_name) + + def get_agent_public_key_pems(realm_name) do + statement = """ SELECT blobAsVarchar(value) FROM kv_store WHERE group='auth' AND key='jwt_public_key_pem'; """ - # TODO: add additional keys - query = - Query.new() - |> Query.statement(get_jwt_public_key_pem) - - with {:ok, res} <- Query.call(client, query), - ["system.blobasvarchar(value)": pem] <- Result.head(res) do + with {:ok, result} <- + custom_query(statement, realm_name, %{}, + context: "agent public key fetch for realm #{realm_name}", + result: {:first!, :public_key_not_found} + ) do + %{"system.blobasvarchar(value)" => pem} = result {:ok, [pem]} - else - :empty_dataset -> - {:error, :public_key_not_found} - - error -> - Logger.warn("DB error: #{inspect(error)}") - {:error, :database_error} end end - def register_device(client, device_id, extended_id, credentials_secret, opts \\ []) do + def register_device(realm_name, device_id, extended_id, credentials_secret, opts \\ []) do statement = """ SELECT first_credentials_request, first_registration FROM devices WHERE device_id=:device_id """ - device_exists_query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) - |> Query.consistency(:quorum) + params = %{"device_id" => device_id} - with {:ok, res} <- Query.call(client, device_exists_query) do - case Result.head(res) do - :empty_dataset -> + with {:ok, result} <- + custom_query(statement, realm_name, params, consistency: :quorum, result: :first) do + case result do + nil -> registration_timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) Logger.info("register request for new device: #{inspect(extended_id)}") - do_register_device(client, device_id, credentials_secret, registration_timestamp, opts) - [first_credentials_request: nil, first_registration: registration_timestamp] -> + do_register_device( + realm_name, + device_id, + credentials_secret, + registration_timestamp, + opts + ) + + %{"first_credentials_request" => nil, "first_registration" => registration_timestamp} -> Logger.info("register request for existing unconfirmed device: #{inspect(extended_id)}") - do_register_device(client, device_id, credentials_secret, registration_timestamp, opts) - [first_credentials_request: _timestamp, first_registration: _registration_timestamp] -> + do_register_device( + realm_name, + device_id, + credentials_secret, + registration_timestamp, + opts + ) + + %{ + "first_credentials_request" => _timestamp, + "first_registration" => _registration_timestamp + } -> Logger.warn("register request for existing confirmed device: #{inspect(extended_id)}") {:error, :already_registered} end - else - error -> - Logger.warn("DB error: #{inspect(error)}") - {:error, :database_error} end end - def unregister_device(client, device_id) do - with :ok <- check_already_registered_device(client, device_id), - :ok <- do_unregister_device(client, device_id) do + def unregister_device(realm_name, device_id) do + with :ok <- check_already_registered_device(realm_name, device_id), + :ok <- do_unregister_device(realm_name, device_id) do :ok - else - %{acc: _acc, msg: msg} -> - Logger.warn("DB error: #{inspect(msg)}") - {:error, :database_error} - - {:error, reason} -> - Logger.warn("Unregister error: #{inspect(reason)}") - {:error, reason} end end - defp check_already_registered_device(client, device_id) do + defp check_already_registered_device(realm_name, device_id) do statement = """ SELECT device_id FROM devices WHERE device_id=:device_id """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) - |> Query.consistency(:quorum) - - with {:ok, res} <- Query.call(client, query) do - case Result.head(res) do - [device_id: _device_id] -> - :ok + params = %{"device_id" => device_id} - :empty_dataset -> - {:error, :device_not_registered} - end + with {:ok, _result} <- + custom_query(statement, realm_name, params, + consistency: :quorum, + result: {:first!, :device_not_registered} + ) do + :ok end end - defp do_unregister_device(client, device_id) do + defp do_unregister_device(realm_name, device_id) do statement = """ INSERT INTO devices (device_id, first_credentials_request, credentials_secret) VALUES (:device_id, :first_credentials_request, :credentials_secret) """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) - |> Query.put(:first_credentials_request, nil) - |> Query.put(:credentials_secret, nil) - |> Query.consistency(:quorum) + params = %{ + "device_id" => device_id, + "first_credentials_request" => nil, + "credentials_secret" => nil + } - with {:ok, _res} <- Query.call(client, query) do - :ok + case custom_query(statement, realm_name, params, consistency: :quorum) do + {:ok, _result} -> + :ok + + {:error, reason} -> + _ = Logger.warn("Unregister error: #{reason}") + {:error, reason} end end - def select_device_for_credentials_request(client, device_id) do + def select_device_for_credentials_request(realm_name, device_id) do statement = """ SELECT first_credentials_request, cert_aki, cert_serial, inhibit_credentials_request, credentials_secret FROM devices WHERE device_id=:device_id """ - do_select_device(client, device_id, statement) + do_select_device(realm_name, device_id, statement) end - def select_device_for_info(client, device_id) do + def select_device_for_info(realm_name, device_id) do statement = """ SELECT credentials_secret, inhibit_credentials_request, first_credentials_request FROM devices WHERE device_id=:device_id """ - do_select_device(client, device_id, statement) + do_select_device(realm_name, device_id, statement) end - def select_device_for_verify_credentials(client, device_id) do + def select_device_for_verify_credentials(realm_name, device_id) do statement = """ SELECT credentials_secret FROM devices WHERE device_id=:device_id """ - do_select_device(client, device_id, statement) + do_select_device(realm_name, device_id, statement) end - def update_device_after_credentials_request(client, device_id, cert_data, device_ip, nil) do + def update_device_after_credentials_request(realm_name, device_id, cert_data, device_ip, nil) do first_credentials_request_timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) update_device_after_credentials_request( - client, + realm_name, device_id, cert_data, device_ip, @@ -224,7 +213,7 @@ defmodule Astarte.Pairing.Queries do end def update_device_after_credentials_request( - client, + realm_name, device_id, %{serial: serial, aki: aki} = _cert_data, device_ip, @@ -237,48 +226,30 @@ defmodule Astarte.Pairing.Queries do WHERE device_id=:device_id """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) - |> Query.put(:cert_aki, aki) - |> Query.put(:cert_serial, serial) - |> Query.put(:last_credentials_request_ip, device_ip) - |> Query.put(:first_credentials_request, first_credentials_request_timestamp) - |> Query.put(:protocol_revision, @protocol_revision) - |> Query.consistency(:quorum) - - case Query.call(client, query) do - {:ok, _res} -> - :ok + params = %{ + "device_id" => device_id, + "cert_aki" => aki, + "cert_serial" => serial, + "last_credentials_request_ip" => device_ip, + "first_credentials_request" => first_credentials_request_timestamp, + "protocol_revision" => @protocol_revision + } - error -> - Logger.warn("DB error: #{inspect(error)}") - {:error, :database_error} + with {:ok, _} <- custom_query(statement, realm_name, params, consistency: :quorum) do + :ok end end - defp do_select_device(client, device_id, select_statement) do - device_query = - Query.new() - |> Query.statement(select_statement) - |> Query.put(:device_id, device_id) - |> Query.consistency(:quorum) - - with {:ok, res} <- Query.call(client, device_query), - device_row when is_list(device_row) <- Result.head(res) do - {:ok, device_row} - else - :empty_dataset -> - {:error, :device_not_found} + defp do_select_device(realm_name, device_id, select_statement) do + params = %{"device_id" => device_id} - error -> - Logger.warn("DB error: #{inspect(error)}") - {:error, :database_error} - end + custom_query(select_statement, realm_name, params, + consistency: :quorum, + result: {:first!, :device_not_found} + ) end - defp do_register_device(client, device_id, credentials_secret, registration_timestamp, opts) do + defp do_register_device(realm_name, device_id, credentials_secret, registration_timestamp, opts) do statement = """ INSERT INTO devices (device_id, first_registration, credentials_secret, inhibit_credentials_request, @@ -295,39 +266,33 @@ defmodule Astarte.Pairing.Queries do |> Keyword.get(:initial_introspection, []) |> build_initial_introspection_maps() - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) - |> Query.put(:first_registration, registration_timestamp) - |> Query.put(:credentials_secret, credentials_secret) - |> Query.put(:inhibit_credentials_request, false) - |> Query.put(:protocol_revision, 0) - |> Query.put(:total_received_bytes, 0) - |> Query.put(:total_received_msgs, 0) - |> Query.put(:introspection, introspection) - |> Query.put(:introspection_minor, introspection_minor) - |> Query.consistency(:quorum) - - case Query.call(client, query) do - {:ok, _res} -> - :ok - - error -> - Logger.warn("DB error: #{inspect(error)}") - {:error, :database_error} + params = %{ + "device_id" => device_id, + "first_registration" => registration_timestamp, + "credentials_secret" => credentials_secret, + "inhibit_credentials_request" => false, + "protocol_revision" => 0, + "total_received_bytes" => 0, + "total_received_msgs" => 0, + "introspection" => introspection, + "introspection_minor" => introspection_minor + } + + with {:ok, _result} <- custom_query(statement, realm_name, params, consistency: :quorum) do + :ok end end defp build_initial_introspection_maps(initial_introspection) do - Enum.reduce(initial_introspection, {[], []}, fn introspection_entry, {majors, minors} -> + Enum.reduce(initial_introspection, {%{}, %{}}, fn introspection_entry, {majors, minors} -> %{ interface_name: interface_name, major_version: major_version, minor_version: minor_version } = introspection_entry - {[{interface_name, major_version} | majors], [{interface_name, minor_version} | minors]} + {Map.put(majors, interface_name, major_version), + Map.put(minors, interface_name, minor_version)} end) end @@ -338,7 +303,7 @@ defmodule Astarte.Pairing.Queries do """ with {:ok, %Xandra.Page{} = page} <- - Xandra.Cluster.execute(:xandra, query, %{}, consistency: consistency), + Xandra.Cluster.execute(@cluster, query, %{}, consistency: consistency), {:ok, _} <- Enum.fetch(page, 0) do :ok else @@ -373,7 +338,7 @@ defmodule Astarte.Pairing.Queries do def custom_query(statement, realm \\ nil, params \\ %{}, opts \\ []) do {custom_opts, query_opts} = parse_opts(opts) - Xandra.Cluster.run(:xandra, fn conn -> + Xandra.Cluster.run(@cluster, fn conn -> execute_query(conn, statement, realm, params, query_opts, custom_opts) end) end diff --git a/apps/astarte_pairing/lib/astarte_pairing/rpc/handler.ex b/apps/astarte_pairing/lib/astarte_pairing/rpc/handler.ex index e8709dbed..e16ac1d65 100644 --- a/apps/astarte_pairing/lib/astarte_pairing/rpc/handler.ex +++ b/apps/astarte_pairing/lib/astarte_pairing/rpc/handler.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -272,13 +272,52 @@ defmodule Astarte.Pairing.RPC.Handler do end defp generic_error( - error_name, + error, user_readable_message \\ nil, user_readable_error_name \\ nil, error_data \\ nil + ) + + defp generic_error( + %Xandra.Error{message: message, reason: reason} = _error, + user_readable_message, + user_readable_error_name, + error_data + ) do + %GenericErrorReply{ + error_name: inspect(reason), + user_readable_message: user_readable_message || message, + user_readable_error_name: user_readable_error_name, + error_data: error_data + } + |> encode_reply(:generic_error_reply) + |> ok_wrap() + end + + defp generic_error( + %Xandra.ConnectionError{action: _action, reason: reason} = _error, + user_readable_message, + user_readable_error_name, + error_data + ) do + %GenericErrorReply{ + error_name: inspect(reason), + user_readable_message: user_readable_message, + user_readable_error_name: user_readable_error_name, + error_data: error_data + } + |> encode_reply(:generic_error_reply) + |> ok_wrap() + end + + defp generic_error( + error, + user_readable_message, + user_readable_error_name, + error_data ) do %GenericErrorReply{ - error_name: to_string(error_name), + error_name: to_string(error), user_readable_message: user_readable_message, user_readable_error_name: user_readable_error_name, error_data: error_data diff --git a/apps/astarte_pairing/mix.exs b/apps/astarte_pairing/mix.exs index 9489c22c3..e732a7dff 100644 --- a/apps/astarte_pairing/mix.exs +++ b/apps/astarte_pairing/mix.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2021 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -83,7 +83,7 @@ defmodule Astarte.Pairing.Mixfile do {:telemetry_metrics_prometheus_core, "~> 0.4"}, {:telemetry_metrics, "~> 0.4"}, {:telemetry_poller, "~> 0.4"}, - {:xandra, "~> 0.13"}, + {:xandra, "~> 0.17"}, {:pretty_log, "~> 0.1"}, {:skogsra, "~> 2.2"}, {:telemetry, "~> 0.4"}, diff --git a/apps/astarte_pairing/mix.lock b/apps/astarte_pairing/mix.lock index efbcddeee..8e3551aa2 100644 --- a/apps/astarte_pairing/mix.lock +++ b/apps/astarte_pairing/mix.lock @@ -17,8 +17,8 @@ "cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"}, "cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"}, - "db_connection": {:hex, :db_connection, "2.3.1", "4c9f3ed1ef37471cbdd2762d6655be11e38193904d9c5c1c9389f1b891a3088e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "abaab61780dde30301d840417890bd9f74131041afd02174cf4e10635b3a63f5"}, - "decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"}, + "db_connection": {:hex, :db_connection, "2.5.0", "bb6d4f30d35ded97b29fe80d8bd6f928a1912ca1ff110831edcd238a1973652c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c92d5ba26cd69ead1ff7582dbb860adeedfff39774105a4f1c92cbb654b55aa2"}, + "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, "dialyzex": {:git, "https://github.com/Comcast/dialyzex.git", "cdc7cf71fe6df0ce4cf59e3f497579697a05c989", []}, "ecto": {:hex, :ecto, "3.5.8", "8ebf12be6016cb99313348ba7bb4612f4114b9a506d6da79a2adc7ef449340bc", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ea0be182ea8922eb7742e3ae8e71b67ee00ae177de1bf76210299a5f16ba4c77"}, @@ -39,6 +39,7 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, "observer_cli": {:hex, :observer_cli, "1.6.1", "d176f967c978ab8b8a29c35c12524f78b7bb36fd4e9b8276dd75c9cb56e07e42", [:mix, :rebar3], [{:recon, "~>2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "3418e319764b9dff1f469e43cbdffd7fd54ea47cbf765027c557abd146a19fb3"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"}, @@ -55,11 +56,11 @@ "skogsra": {:hex, :skogsra, "2.3.3", "90ea76d98ad749241b31e724ca17ed8aca0202001972aeca3cb834f44027f3ea", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:yamerl, "~> 0.8", [hex: :yamerl, repo: "hexpm", optional: true]}], "hexpm", "e36880922431d41ac56d6cb4529b0526039a108fb44f8ecc90b517d494b86c28"}, "snappyer": {:hex, :snappyer, "1.2.6", "34181e3233f68a92044e176fe96e54fee7957acc2be554f0460d799c495166c2", [:rebar3], [], "hexpm", "d538d1e8892af09dc8b2771b2652c9d70f009cd1556246b3e22706df643f47b4"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.0", "da9d49ee7e6bb1c259d36ce6539cd45ae14d81247a2b0c90edf55e2b50507f7b", [:mix], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5cfe67ad464b243835512aa44321cee91faed6ea868d7fb761d7016e02915c3d"}, "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "0.4.2", "9c5b2cce20222f3c16256dcd6cfaf59d05a84dc664060ab938c6ac40e91d05e0", [:mix], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.5", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "9cd707328a1971c886993f3e4b22c68835153efa674bf31a957b30439de9b258"}, "telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "uuid": {:hex, :uuid_erl, "2.0.1", "1fd9079c544d521063897887a1c5b3302dca98f9bb06aadcdc6fb0663f256797", [:rebar3], [{:quickrand, "~> 2.0.1", [hex: :quickrand, repo: "hexpm", optional: false]}], "hexpm", "ab57caccd51f170011e5f444ce865f84b41605e483a9efcc468c1afaec87553b"}, - "xandra": {:hex, :xandra, "0.13.1", "f82866e6c47527f74f35dd3007b5311121852dd861a29ed1613e27ccfaba0102", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "a2efdb8921e3b694bf3505e40c5ec9d353d8fa3755cec946be7c18b8236d7230"}, + "xandra": {:hex, :xandra, "0.17.0", "c1291a6ade16d19ddf4ebb5e3e947b5e3177e3a0791913a2c4a947b34aa5d400", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.7 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "65937898bbfe5eba692a5ce2937cba792bef01deae866cecdd0f0f59b327c88a"}, } diff --git a/apps/astarte_pairing/test/astarte_pairing/engine_test.exs b/apps/astarte_pairing/test/astarte_pairing/engine_test.exs index 1bb974983..034e0591a 100644 --- a/apps/astarte_pairing/test/astarte_pairing/engine_test.exs +++ b/apps/astarte_pairing/test/astarte_pairing/engine_test.exs @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ defmodule Astarte.Pairing.EngineTest do use ExUnit.Case alias Astarte.Core.Device - alias Astarte.Pairing.Config alias Astarte.Pairing.CredentialsSecret alias Astarte.Pairing.DatabaseTestHelper alias Astarte.Pairing.Engine @@ -55,6 +54,7 @@ defmodule Astarte.Pairing.EngineTest do @valid_ip "2.3.4.5" setup_all do + DatabaseTestHelper.await_cluster_connected() DatabaseTestHelper.create_db() on_exit(fn -> @@ -153,11 +153,11 @@ defmodule Astarte.Pairing.EngineTest do introspection = DatabaseTestHelper.get_introspection(hw_id) introspection_minor = DatabaseTestHelper.get_introspection_minor(hw_id) - assert Enum.member?(introspection, {"org.astarteplatform.Values", 0}) - assert Enum.member?(introspection_minor, {"org.astarteplatform.Values", 3}) + assert introspection["org.astarteplatform.Values"] == 0 + assert introspection_minor["org.astarteplatform.Values"] == 3 - assert Enum.member?(introspection, {"org.astarteplatform.OtherValues", 1}) - assert Enum.member?(introspection_minor, {"org.astarteplatform.OtherValues", 2}) + assert introspection["org.astarteplatform.OtherValues"] == 1 + assert introspection_minor["org.astarteplatform.OtherValues"] == 2 end end @@ -359,14 +359,10 @@ defmodule Astarte.Pairing.EngineTest do {:ok, device_id} = Device.decode_device_id(hw_id, allow_extended_id: true) - db_client = - Config.cassandra_node!() - |> CQEx.Client.new!(keyspace: @test_realm) + {:ok, device} = Queries.select_device_for_credentials_request(@test_realm, device_id) - {:ok, device} = Queries.select_device_for_credentials_request(db_client, device_id) - - assert device[:cert_aki] == second_aki - assert device[:cert_serial] == second_serial + assert device["cert_aki"] == second_aki + assert device["cert_serial"] == second_serial end test "retains first_credentials_request timestamp" do @@ -375,14 +371,10 @@ defmodule Astarte.Pairing.EngineTest do {:ok, device_id} = Device.decode_device_id(hw_id, allow_extended_id: true) - db_client = - Config.cassandra_node!() - |> CQEx.Client.new!(keyspace: @test_realm) - {:ok, no_credentials_requested_device} = - Queries.select_device_for_credentials_request(db_client, device_id) + Queries.select_device_for_credentials_request(@test_realm, device_id) - assert no_credentials_requested_device[:first_credentials_request] == nil + assert no_credentials_requested_device["first_credentials_request"] == nil assert {:ok, %{client_crt: _first_certificate}} = Engine.get_credentials( @@ -395,10 +387,10 @@ defmodule Astarte.Pairing.EngineTest do ) {:ok, credentials_requested_device} = - Queries.select_device_for_credentials_request(db_client, device_id) + Queries.select_device_for_credentials_request(@test_realm, device_id) first_credentials_request_timestamp = - credentials_requested_device[:first_credentials_request] + credentials_requested_device["first_credentials_request"] assert first_credentials_request_timestamp != nil @@ -413,10 +405,10 @@ defmodule Astarte.Pairing.EngineTest do ) {:ok, credentials_requested_again_device} = - Queries.select_device_for_credentials_request(db_client, device_id) + Queries.select_device_for_credentials_request(@test_realm, device_id) assert first_credentials_request_timestamp == - credentials_requested_again_device[:first_credentials_request] + credentials_requested_again_device["first_credentials_request"] end end diff --git a/apps/astarte_pairing/test/astarte_pairing/rpc/handler_test.exs b/apps/astarte_pairing/test/astarte_pairing/rpc/handler_test.exs index 134641f2e..3572492c0 100644 --- a/apps/astarte_pairing/test/astarte_pairing/rpc/handler_test.exs +++ b/apps/astarte_pairing/test/astarte_pairing/rpc/handler_test.exs @@ -106,6 +106,7 @@ defmodule Astarte.Pairing.RPC.HandlerTest do """ setup_all do + DatabaseTestHelper.await_cluster_connected() DatabaseTestHelper.create_db() on_exit(fn -> diff --git a/apps/astarte_pairing/test/support/database_test_helper.ex b/apps/astarte_pairing/test/support/database_test_helper.ex index 0b6bedb7c..5c6f45866 100644 --- a/apps/astarte_pairing/test/support/database_test_helper.ex +++ b/apps/astarte_pairing/test/support/database_test_helper.ex @@ -1,7 +1,7 @@ # # This file is part of Astarte. # -# Copyright 2017-2018 Ispirata Srl +# Copyright 2017-2023 SECO Mind Srl # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,23 +18,22 @@ defmodule Astarte.Pairing.DatabaseTestHelper do alias Astarte.Core.Device - alias Astarte.Pairing.Config + alias Astarte.Pairing.Queries alias Astarte.Pairing.TestHelper alias Astarte.Pairing.CredentialsSecret alias Astarte.Pairing.CredentialsSecret.Cache - alias CQEx.Query - alias CQEx.Client - alias CQEx.Result + + @test_realm "autotestrealm" @create_autotestrealm """ - CREATE KEYSPACE autotestrealm + CREATE KEYSPACE #{@test_realm} WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true; """ @create_devices_table """ - CREATE TABLE autotestrealm.devices ( + CREATE TABLE #{@test_realm}.devices ( device_id uuid, introspection map, introspection_minor map, @@ -60,7 +59,7 @@ defmodule Astarte.Pairing.DatabaseTestHelper do """ @create_kv_store_table """ - CREATE TABLE autotestrealm.kv_store ( + CREATE TABLE #{@test_realm}.kv_store ( group varchar, key varchar, value blob, @@ -77,16 +76,14 @@ defmodule Astarte.Pairing.DatabaseTestHelper do """ @insert_jwt_public_key_pem """ - INSERT INTO autotestrealm.kv_store (group, key, value) + INSERT INTO #{@test_realm}.kv_store (group, key, value) VALUES ('auth', 'jwt_public_key_pem', varcharAsBlob('#{@jwt_public_key_pem}')) """ @drop_autotestrealm """ - DROP KEYSPACE autotestrealm; + DROP KEYSPACE #{@test_realm}; """ - @test_realm "autotestrealm" - @unregistered_128_bit_hw_id TestHelper.random_128_bit_hw_id() @unregistered_256_bit_hw_id TestHelper.random_256_bit_hw_id() @@ -103,7 +100,7 @@ defmodule Astarte.Pairing.DatabaseTestHelper do @registered_and_inhibited_credentials_secret CredentialsSecret.generate() @insert_device """ - INSERT INTO devices + INSERT INTO #{@test_realm}.devices (device_id, credentials_secret, inhibit_credentials_request, first_registration, protocol_revision, total_received_bytes, total_received_msgs, first_credentials_request) VALUES (:device_id, :credentials_secret, :inhibit_credentials_request, :first_registration, @@ -138,18 +135,27 @@ defmodule Astarte.Pairing.DatabaseTestHelper do def registered_and_inhibited_credentials_secret(), do: @registered_and_inhibited_credentials_secret + # https://github.com/lexhide/xandra/blob/47cabaa3a5ae49127f1a9da91acd003f5ada7c1d/test/support/test_helper.ex#L7C15-L7C15 + def await_cluster_connected(cluster \\ nil, tries \\ 10) do + cluster = cluster || Application.get_env(:astarte_pairing, :cluster_name) + fun = &Xandra.execute!(&1, "SELECT * FROM system.local") + + with {:error, %Xandra.ConnectionError{}} <- Xandra.Cluster.run(cluster, _options = [], fun) do + if tries > 0 do + Process.sleep(100) + await_cluster_connected(cluster, tries - 1) + else + raise("Connection to the cluster failed") + end + end + end + def create_db do - client = - Config.cassandra_node!() - |> Client.new!() - - with {:ok, _} <- Query.call(client, @create_autotestrealm), - {:ok, _} <- Query.call(client, @create_devices_table), - {:ok, _} <- Query.call(client, @create_kv_store_table), - {:ok, _} <- Query.call(client, @insert_jwt_public_key_pem) do + with {:ok, _} <- Queries.custom_query(@create_autotestrealm), + {:ok, _} <- Queries.custom_query(@create_devices_table), + {:ok, _} <- Queries.custom_query(@create_kv_store_table), + {:ok, _} <- Queries.custom_query(@insert_jwt_public_key_pem) do :ok - else - %{msg: msg} -> {:error, msg} end end @@ -157,176 +163,123 @@ defmodule Astarte.Pairing.DatabaseTestHelper do end def seed_devices do - client = - Config.cassandra_node!() - |> Client.new!(keyspace: @test_realm) - {:ok, registered_not_confirmed_device_id} = Device.decode_device_id(@registered_not_confirmed_hw_id, allow_extended_id: true) secret_hash = CredentialsSecret.hash(@registered_not_confirmed_credentials_secret) - registered_not_confirmed_query = - Query.new() - |> Query.statement(@insert_device) - |> Query.put(:device_id, registered_not_confirmed_device_id) - |> Query.put(:credentials_secret, secret_hash) - |> Query.put(:inhibit_credentials_request, false) - |> Query.put( - :first_registration, - TestHelper.now_millis() - ) - |> Query.put(:first_credentials_request, nil) + registered_not_confirmed_params = %{ + "device_id" => registered_not_confirmed_device_id, + "credentials_secret" => secret_hash, + "inhibit_credentials_request" => false, + "first_registration" => TestHelper.now_millis(), + "first_credentials_request" => nil + } {:ok, registered_and_confirmed_256_device_id} = Device.decode_device_id(@registered_and_confirmed_256_hw_id, allow_extended_id: true) secret_hash = CredentialsSecret.hash(@registered_and_confirmed_256_credentials_secret) - registered_and_confirmed_256_query = - Query.new() - |> Query.statement(@insert_device) - |> Query.put(:device_id, registered_and_confirmed_256_device_id) - |> Query.put(:credentials_secret, secret_hash) - |> Query.put(:inhibit_credentials_request, false) - |> Query.put( - :first_registration, - TestHelper.now_millis() - ) - |> Query.put( - :first_credentials_request, - TestHelper.now_millis() - ) + registered_and_confirmed_256_params = %{ + "device_id" => registered_and_confirmed_256_device_id, + "credentials_secret" => secret_hash, + "inhibit_credentials_request" => false, + "first_registration" => TestHelper.now_millis(), + "first_credentials_request" => TestHelper.now_millis() + } {:ok, registered_and_confirmed_128_device_id} = Device.decode_device_id(@registered_and_confirmed_128_hw_id, allow_extended_id: true) secret_hash = CredentialsSecret.hash(@registered_and_confirmed_128_credentials_secret) - registered_and_confirmed_128_query = - Query.new() - |> Query.statement(@insert_device) - |> Query.put(:device_id, registered_and_confirmed_128_device_id) - |> Query.put(:credentials_secret, secret_hash) - |> Query.put(:inhibit_credentials_request, false) - |> Query.put( - :first_registration, - TestHelper.now_millis() - ) - |> Query.put( - :first_credentials_request, - TestHelper.now_millis() - ) + registered_and_confirmed_128_params = %{ + "device_id" => registered_and_confirmed_128_device_id, + "credentials_secret" => secret_hash, + "inhibit_credentials_request" => false, + "first_registration" => TestHelper.now_millis(), + "first_credentials_request" => TestHelper.now_millis() + } {:ok, registered_and_inhibited_device_id} = Device.decode_device_id(@registered_and_inhibited_hw_id, allow_extended_id: true) secret_hash = CredentialsSecret.hash(@registered_and_inhibited_credentials_secret) - registered_and_inhibited_query = - Query.new() - |> Query.statement(@insert_device) - |> Query.put(:device_id, registered_and_inhibited_device_id) - |> Query.put(:credentials_secret, secret_hash) - |> Query.put(:inhibit_credentials_request, true) - |> Query.put( - :first_registration, - TestHelper.now_millis() - ) - |> Query.put( - :first_credentials_request, - TestHelper.now_millis() - ) - - with {:ok, _} <- Query.call(client, registered_not_confirmed_query), - {:ok, _} <- Query.call(client, registered_and_confirmed_256_query), - {:ok, _} <- Query.call(client, registered_and_confirmed_128_query), - {:ok, _} <- Query.call(client, registered_and_inhibited_query) do + registered_and_inhibited_params = %{ + "device_id" => registered_and_inhibited_device_id, + "credentials_secret" => secret_hash, + "inhibit_credentials_request" => true, + "first_registration" => TestHelper.now_millis(), + "first_credentials_request" => TestHelper.now_millis() + } + + with {:ok, _} <- + Queries.custom_query(@insert_device, @test_realm, registered_not_confirmed_params), + {:ok, _} <- + Queries.custom_query(@insert_device, @test_realm, registered_and_confirmed_256_params), + {:ok, _} <- + Queries.custom_query(@insert_device, @test_realm, registered_and_confirmed_128_params), + {:ok, _} <- + Queries.custom_query(@insert_device, @test_realm, registered_and_inhibited_params) do :ok end end def get_first_registration(hardware_id) do - client = - Config.cassandra_node!() - |> Client.new!(keyspace: @test_realm) - {:ok, device_id} = Device.decode_device_id(hardware_id, allow_extended_id: true) statement = """ SELECT first_registration - FROM devices + FROM #{@test_realm}.devices WHERE device_id=:device_id """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) + params = %{"device_id" => device_id} - with {:ok, result} <- Query.call(client, query), - [first_registration: first_registration] <- Result.head(result) do + with {:ok, result} <- Queries.custom_query(statement, @test_realm, params, result: :first), + %{"first_registration" => first_registration} <- result do first_registration - else - :empty_dataset -> - nil end end def get_introspection(hardware_id) do - client = - Config.cassandra_node!() - |> Client.new!(keyspace: @test_realm) - {:ok, device_id} = Device.decode_device_id(hardware_id, allow_extended_id: true) statement = """ SELECT introspection - FROM devices + FROM #{@test_realm}.devices WHERE device_id=:device_id """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) + params = %{"device_id" => device_id} - with {:ok, result} <- Query.call(client, query), - [introspection: introspection] <- Result.head(result) do + with {:ok, result} <- Queries.custom_query(statement, @test_realm, params, result: :first!) do + %{"introspection" => introspection} = result introspection end end def get_introspection_minor(hardware_id) do - client = - Config.cassandra_node!() - |> Client.new!(keyspace: @test_realm) - {:ok, device_id} = Device.decode_device_id(hardware_id, allow_extended_id: true) statement = """ SELECT introspection_minor - FROM devices + FROM #{@test_realm}.devices WHERE device_id=:device_id """ - query = - Query.new() - |> Query.statement(statement) - |> Query.put(:device_id, device_id) + params = %{"device_id" => device_id} - with {:ok, result} <- Query.call(client, query), - [introspection_minor: introspection_minor] <- Result.head(result) do + with {:ok, result} <- Queries.custom_query(statement, @test_realm, params, result: :first!) do + %{"introspection_minor" => introspection_minor} = result introspection_minor end end def clean_devices do - client = - Config.cassandra_node!() - |> Client.new!(keyspace: @test_realm) - - Query.call!(client, "TRUNCATE devices") + Queries.custom_query("TRUNCATE #{@test_realm}.devices") # Also clean the cache Cache.flush() @@ -334,11 +287,7 @@ defmodule Astarte.Pairing.DatabaseTestHelper do end def drop_db do - client = - Config.cassandra_node!() - |> Client.new!() - - Query.call(client, @drop_autotestrealm) + Queries.custom_query(@drop_autotestrealm) # Also clean the cache Cache.flush() end