Skip to content

Commit

Permalink
feat(pairing): expose custom query API using Xandra
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
  • Loading branch information
noaccOS committed Aug 9, 2023
1 parent d7997ac commit c42c32a
Showing 1 changed file with 135 additions and 1 deletion.
136 changes: 135 additions & 1 deletion apps/astarte_pairing/lib/astarte_pairing/queries.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# This file is part of Astarte.
#
# Copyright 2017-2018 Ispirata Srl
# Copyright 2017-2023 SECO Mind Srl
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,36 @@ defmodule Astarte.Pairing.Queries do

require Logger

alias Astarte.Core.Realm

@typedoc """
A string representing the current context, to include in error messages.
Not included if nil.
Default: nil
"""
@type custom_context() :: String.t() | nil

@typedoc """
The type to cast the result in.
Unless otherwise stated, all results are returned in an {:ok, value} tuple.
- `:page`: default xandra return type
- `:list`: a list
- `{:first, default}`: only return the first element, `default` if empty
- `:first`: shorthand for `{:first, nil}`
- `{:first!, error}`: like `{:first, default}`, but returns `{:error, error}` if empty
- `:first!`: shorthand for `{:first!, :not_found}`
"""
@type custom_result() :: :page | :list | :first | :first! | {:first, any()} | {:first!, any()}

@type custom_opt() :: {:context, custom_context()} | {:result, custom_result()}
@type xandra_opt() :: {atom(), any()}

@type query_opt() :: custom_opt() | xandra_opt()

@protocol_revision 1
@default_query_opts [uuid_format: :binary, timestamp_format: :integer]

@default_custom_query_opts [result: :page, context: nil]

def get_agent_public_key_pems(client) do
get_jwt_public_key_pem = """
Expand Down Expand Up @@ -338,4 +367,109 @@ defmodule Astarte.Pairing.Queries do
{:error, :database_connection_error}
end
end

@spec custom_query(Xandra.statement(), String.t() | nil, Xandra.values(), [query_opt()]) ::
{:ok, term()} | {:error, term()}
def custom_query(statement, realm \\ nil, params \\ %{}, opts \\ []) do
{custom_opts, query_opts} = parse_opts(opts)

Xandra.Cluster.run(:xandra, fn conn ->
execute_query(conn, statement, realm, params, query_opts, custom_opts)
end)
end

defp execute_query(conn, statement, realm, params, query_opts, custom_opts) do
with {:ok, prepared} <- prepare_query(conn, statement, realm) do
case Xandra.execute(conn, prepared, params, query_opts) do
{:ok, page} ->
cast_query_result(page, custom_opts)

{:error, error} ->
%{message: message, tag: tag} = database_error_message(error, custom_opts[:context])

_ = Logger.warn(message, tag: tag)

{:error, :database_error}
end
end
end

defp use_realm(_conn, nil = _realm), do: :ok

defp use_realm(conn, realm) when is_binary(realm) do
with true <- Realm.valid_name?(realm),
{:ok, %Xandra.SetKeyspace{}} <- Xandra.execute(conn, "USE #{realm}") do
:ok
else
_ -> {:error, :realm_not_found}
end
end

defp prepare_query(conn, statement, realm) do
with :ok <- use_realm(conn, realm) do
case Xandra.prepare(conn, statement) do
{:ok, page} ->
{:ok, page}

{:error, reason} ->
_ = Logger.warn("Cannot prepare query: #{inspect(reason)}.", tag: "db_error")
{:error, :database_error}
end
end
end

defp parse_opts(opts) do
{custom_opts, query_opts} = Keyword.split(opts, Keyword.keys(@default_custom_query_opts))
query_opts = Keyword.merge(@default_query_opts, query_opts)
custom_opts = Keyword.validate!(custom_opts, @default_custom_query_opts)

{custom_opts, query_opts}
end

defp cast_query_result(page, opts) do
result_with_defaults = case opts[:result] do
:first -> {:first, nil}
:first! -> {:first!, :not_found}
x -> x
end
case result_with_defaults do
:page ->
{:ok, page}

:list ->
{:ok, Enum.to_list(page)}

{:first, default} ->
{:ok, Enum.at(page, 0, default)}

{:first!, error} ->
Enum.at(page, 0)
|> case do
nil -> {:error, error}
first -> {:ok, first}
end
end
end

defp database_error_message(%Xandra.Error{message: message, reason: reason}, nil = _context) do
%{message: "Database error #{reason}: #{message}", tag: "db_error"}
end

defp database_error_message(%Xandra.Error{message: message, reason: reason}, context) do
%{message: "Database error #{reason} during #{context}: #{message}", tag: "db_error"}
end

defp database_error_message(
%Xandra.ConnectionError{action: action, reason: reason},
nil = _context
) do
%{message: "Database connection error: #{reason} (#{action})", tag: "db_connection_error"}
end

defp database_error_message(%Xandra.ConnectionError{action: action, reason: reason}, context) do
%{
message: "Database connection error during #{context}: #{reason} (#{action})",
tag: "db_connection_error"
}
end
end

0 comments on commit c42c32a

Please sign in to comment.