Skip to content

Commit

Permalink
refactor(pairing): switch to xandra for queries
Browse files Browse the repository at this point in the history
use xandra in place of cqex for all queries to the cluster.

feat: expose cluster name as an env var internally to the config
fix: the tests now wait for the cluster to be connected
fix: remove :autodiscovery key from xandra_options as it is deprecated
chore: update xandra to v0.17.0

Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
  • Loading branch information
noaccOS committed Aug 9, 2023
1 parent 53e7489 commit 7399bb5
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 401 deletions.
4 changes: 3 additions & 1 deletion apps/astarte_pairing/config/config.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -26,4 +26,6 @@ config :lager,
error_logger_redirect: false,
handlers: [level: :critical]

config :astarte_pairing, :cluster_name, :xandra

import_config "#{config_env()}.exs"
8 changes: 2 additions & 6 deletions apps/astarte_pairing/lib/astarte_pairing.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,13 +43,9 @@ defmodule Astarte.Pairing do
Config.validate!()
Config.init!()

xandra_options =
Config.xandra_options!()
|> Keyword.put(:name, :xandra)

children = [
Astarte.PairingWeb.Telemetry,
{Xandra.Cluster, xandra_options},
{Xandra.Cluster, Config.xandra_options!()},
{Astarte.RPC.AMQP.Server, [amqp_queue: Protocol.amqp_queue(), handler: Handler]},
{Astarte.Pairing.CredentialsSecret.Cache, []}
]
Expand Down
16 changes: 10 additions & 6 deletions apps/astarte_pairing/lib/astarte_pairing/config.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,17 +65,21 @@ defmodule Astarte.Pairing.Config do
Returns the cassandra node configuration
"""
@spec cassandra_node!() :: {String.t(), integer()}
def cassandra_node!, do: Enum.random(cqex_nodes!())
def cassandra_node!, do: Enum.random(xandra_nodes!())

@doc """
Returns Cassandra nodes formatted in the Xandra format.
"""
defdelegate xandra_nodes, to: DataAccessConfig
defdelegate xandra_nodes!, to: DataAccessConfig

defdelegate cqex_nodes, to: DataAccessConfig
defdelegate cqex_nodes!, to: DataAccessConfig
def xandra_options! do
cluster = Application.get_env(:astarte_pairing, :cluster_name)

defdelegate xandra_options!, to: DataAccessConfig
defdelegate cqex_options!, to: DataAccessConfig
# Dropping :autodiscovery since the option has been deprecated in xandra v0.15.0
# and is now always enabled.
DataAccessConfig.xandra_options!()
|> Keyword.drop([:autodiscovery])
|> Keyword.put(:name, cluster)
end
end
107 changes: 22 additions & 85 deletions apps/astarte_pairing/lib/astarte_pairing/engine.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -27,7 +27,6 @@ defmodule Astarte.Pairing.Engine do
alias Astarte.Pairing.Config
alias Astarte.Pairing.CredentialsSecret
alias Astarte.Pairing.Queries
alias CQEx.Client

require Logger

Expand Down Expand Up @@ -56,23 +55,10 @@ defmodule Astarte.Pairing.Engine do
end

def get_agent_public_key_pems(realm) do
cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, jwt_pems} <- Queries.get_agent_public_key_pems(client) do
{:ok, jwt_pems}
else
{:error, :shutdown} ->
{:error, :realm_not_found}

{:error, reason} ->
{:error, reason}
case Queries.get_agent_public_key_pems(realm) do
{:ok, jwt_pems} -> {:ok, jwt_pems}
{:error, :public_key_not_found} -> {:error, :realm_not_found}
{:error, reason} -> {:error, reason}
end
end

Expand All @@ -90,34 +76,25 @@ defmodule Astarte.Pairing.Engine do

:telemetry.execute([:astarte, :pairing, :get_credentials], %{}, %{realm: realm})

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, ip_tuple} <- parse_ip(device_ip),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_credentials_request(client, device_id),
{:ok, device_row} <- Queries.select_device_for_credentials_request(realm, device_id),
{:authorized?, true} <-
{:authorized?,
CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])},
CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])},
{:credentials_inhibited?, false} <-
{:credentials_inhibited?, device_row[:inhibit_credentials_request]},
_ <- CFSSLCredentials.revoke(device_row[:cert_serial], device_row[:cert_aki]),
{:credentials_inhibited?, device_row["inhibit_credentials_request"]},
_ <- CFSSLCredentials.revoke(device_row["cert_serial"], device_row["cert_aki"]),
encoded_device_id <- Device.encode_device_id(device_id),
{:ok, %{cert: cert, aki: _aki, serial: _serial} = cert_data} <-
CFSSLCredentials.get_certificate(csr, realm, encoded_device_id),
:ok <-
Queries.update_device_after_credentials_request(
client,
realm,
device_id,
cert_data,
ip_tuple,
device_row[:first_credentials_request]
device_row["first_credentials_request"]
) do
{:ok, %{client_crt: cert}}
else
Expand Down Expand Up @@ -153,20 +130,11 @@ defmodule Astarte.Pairing.Engine do
def get_info(realm, hardware_id, credentials_secret) do
Logger.debug("get_info request for device #{inspect(hardware_id)} in realm #{inspect(realm)}")

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_info(client, device_id),
{:ok, device_row} <- Queries.select_device_for_info(realm, device_id),
{:authorized?, true} <-
{:authorized?,
CredentialsSecret.verify(credentials_secret, device_row[:credentials_secret])} do
CredentialsSecret.verify(credentials_secret, device_row["credentials_secret"])} do
device_status = device_status_string(device_row)
protocols = get_protocol_info()

Expand All @@ -193,19 +161,10 @@ defmodule Astarte.Pairing.Engine do

:telemetry.execute([:astarte, :pairing, :register_new_device], %{}, %{realm: realm})

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
credentials_secret <- CredentialsSecret.generate(),
secret_hash <- CredentialsSecret.hash(credentials_secret),
:ok <- Queries.register_device(client, device_id, hardware_id, secret_hash, opts) do
:ok <- Queries.register_device(realm, device_id, hardware_id, secret_hash, opts) do
{:ok, credentials_secret}
else
{:error, :shutdown} ->
Expand All @@ -222,17 +181,8 @@ defmodule Astarte.Pairing.Engine do
"in realm #{inspect(realm)}"
)

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
:ok <- Queries.unregister_device(client, device_id) do
:ok <- Queries.unregister_device(realm, device_id) do
:ok
else
{:error, :shutdown} ->
Expand All @@ -248,19 +198,10 @@ defmodule Astarte.Pairing.Engine do
"verify_credentials request for device #{inspect(hardware_id)} in realm #{inspect(realm)}"
)

cqex_options =
Config.cqex_options!()
|> Keyword.put(:keyspace, realm)

with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
{:ok, client} <-
Client.new(
Config.cassandra_node!(),
cqex_options
),
{:ok, device_row} <- Queries.select_device_for_verify_credentials(client, device_id),
{:ok, device_row} <- Queries.select_device_for_verify_credentials(realm, device_id),
{:authorized?, true} <-
{:authorized?, CredentialsSecret.verify(secret, device_row[:credentials_secret])} do
{:authorized?, CredentialsSecret.verify(secret, device_row["credentials_secret"])} do
CertVerifier.verify(client_crt, Config.ca_cert!())
else
{:authorized?, false} ->
Expand All @@ -286,16 +227,12 @@ defmodule Astarte.Pairing.Engine do
end

defp device_status_string(device_row) do
# The device is pending until the first credendtial request
cond do
Keyword.get(device_row, :inhibit_credentials_request) ->
"inhibited"

Keyword.get(device_row, :first_credentials_request) ->
"confirmed"
# The device is pending until the first credential request

true ->
"pending"
case device_row do
%{"inhibit_credentials_request" => true} -> "inhibited"
%{"first_credentials_request" => nil} -> "pending"
%{} -> "confirmed"
end
end

Expand Down
Loading

0 comments on commit 7399bb5

Please sign in to comment.