Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mongoose_rdbms_backend #3390

Merged
merged 1 commit into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
{xref_ignores, [
eldap_filter_yecc, 'XmppAddr', mongoose_xmpp_errors,
%% *_backend
mongoose_rdbms_backend,
mod_bosh_backend,
mod_global_distrib_mapping_backend,
mod_mam_cassandra_arch_params,
Expand Down
7 changes: 1 addition & 6 deletions src/mam/mod_mam_rdbms_prefs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@

-ignore_xref([remove_archive/4, start/2, stop/1, supported_features/0]).

-import(mongoose_rdbms,
[prepare/4,
escape_string/1,
escape_integer/1,
use_escaped_string/1,
use_escaped_integer/1]).
-import(mongoose_rdbms, [prepare/4]).

-include("mongoose.hrl").
-include("jlib.hrl").
Expand Down
43 changes: 5 additions & 38 deletions src/rdbms/mongoose_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,6 @@
sql_query/0,
sql_query_part/0]).

-callback escape_binary(binary()) -> sql_query_part().
-callback escape_string(binary()|list()) -> sql_query_part().

-callback unescape_binary(binary()) -> binary().
-callback connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
-callback disconnect(Connection :: term()) -> any().
-callback query(Connection :: term(), Query :: any(), Timeout :: infinity | non_neg_integer()) ->
query_result().
-callback prepare(Connection :: term(), Name :: atom(),
Table :: binary(), Fields :: [binary()], Statement :: iodata()) ->
{ok, Ref :: term()} | {error, Reason :: any()}.
-callback execute(Connection :: term(), Ref :: term(), Parameters :: [term()],
Timeout :: infinity | non_neg_integer()) -> query_result().

%% If not defined, generic escaping is used
-optional_callbacks([escape_string/1]).

%% External exports
-export([prepare/4,
prepared/1,
Expand Down Expand Up @@ -136,20 +118,6 @@
use_escaped/1, use_escaped_binary/1, use_escaped_boolean/1,
use_escaped_like/1, use_escaped_null/1]).

-define(MONGOOSE_RDBMS_BACKEND, mongoose_rdbms_backend).
-ignore_xref([
{?MONGOOSE_RDBMS_BACKEND, unescape_binary, 1},
{?MONGOOSE_RDBMS_BACKEND, disconnect, 1},
{?MONGOOSE_RDBMS_BACKEND, query, 3},
{?MONGOOSE_RDBMS_BACKEND, execute, 4},
{?MONGOOSE_RDBMS_BACKEND, prepare, 5},
{?MONGOOSE_RDBMS_BACKEND, escape_string, 1},
{?MONGOOSE_RDBMS_BACKEND, backend, 0},
{?MONGOOSE_RDBMS_BACKEND, escape_binary, 1},
{?MONGOOSE_RDBMS_BACKEND, backend_name, 0},
{?MONGOOSE_RDBMS_BACKEND, connect, 2}
]).

%% internal usage
-export([get_db_info/1]).

Expand Down Expand Up @@ -226,7 +194,7 @@ execute_successfully(HostType, Name, Parameters) ->
{updated, _} = Result ->
Result;
Other ->
Log = #{what => sql_execute_failed, host => HostType,statement_name => Name,
Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
statement_query => query_name_to_string(Name),
statement_params => Parameters, reason => Other},
?LOG_ERROR(Log),
Expand Down Expand Up @@ -468,7 +436,7 @@ escape_like_internal(S) when is_list(S) ->
[escape_like_character(C) || C <- S].

escape_string_internal(S) ->
case erlang:function_exported(mongoose_rdbms_backend:backend(), escape_string, 1) of
case mongoose_backend:is_exported(global, ?MODULE, escape_string, 1) of
true ->
mongoose_rdbms_backend:escape_string(S);
false ->
Expand Down Expand Up @@ -763,9 +731,8 @@ abort_on_driver_error({Reply, State}) ->

-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
try mongoose_rdbms_backend:backend_name()
catch error:undef -> undefined end.

try mongoose_backend:get_backend_name(global, ?MODULE)
catch error:badarg -> undefined end.

-spec connect(Settings :: term(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
Expand All @@ -777,7 +744,7 @@ connect(Settings, Retry, RetryAfter, MaxRetryDelay) ->
Error;
Error ->
SleepFor = rand:uniform(RetryAfter),
Backend = mongoose_rdbms_backend:backend_name(),
Backend = mongoose_backend:get_backend_name(global, ?MODULE),
?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
error => Error, sleep_for => SleepFor}),
timer:sleep(timer:seconds(SleepFor)),
Expand Down
82 changes: 82 additions & 0 deletions src/rdbms/mongoose_rdbms_backend.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
%%%-------------------------------------------------------------------
%%% @copyright 2021, Erlang Solutions Ltd.
%%% @doc Proxy module for rdbms backends.
%%%
%%% @end
%%%-------------------------------------------------------------------
-module(mongoose_rdbms_backend).
-export([escape_binary/1,
escape_string/1,
unescape_binary/1,
connect/2,
disconnect/1,
query/3,
prepare/5,
execute/4]).

-define(MAIN_MODULE, mongoose_rdbms).


-callback escape_binary(binary()) -> mongoose_rdbms:sql_query_part().
-callback escape_string(binary()|list()) -> mongoose_rdbms:sql_query_part().

-callback unescape_binary(binary()) -> binary().
-callback connect(Args :: any(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
-callback disconnect(Connection :: term()) -> any().
-callback query(Connection :: term(), Query :: any(), Timeout :: infinity | non_neg_integer()) ->
mongoose_rdbms:query_result().
-callback prepare(Connection :: term(), Name :: atom(),
Table :: binary(), Fields :: [binary()], Statement :: iodata()) ->
{ok, Ref :: term()} | {error, Reason :: any()}.
-callback execute(Connection :: term(), Ref :: term(), Parameters :: [term()],
Timeout :: infinity | non_neg_integer()) -> mongoose_rdbms:query_result().

%% If not defined, generic escaping is used
-optional_callbacks([escape_string/1]).


-spec escape_binary(binary()) -> mongoose_rdbms:sql_query_part().
escape_binary(Binary) ->
Args = [Binary],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec escape_string(binary() | list()) -> mongoose_rdbms:sql_query_part().
escape_string(String) ->
Args = [String],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec unescape_binary(binary()) -> binary().
unescape_binary(Binary) ->
Args = [Binary],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec connect(Settings :: any(), QueryTimeout :: non_neg_integer()) ->
{ok, Connection :: term()} | {error, Reason :: any()}.
connect(Settings, QueryTimeout) ->
Args = [Settings, QueryTimeout],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec disconnect(Connection :: term()) -> any().
disconnect(Connection) ->
Args = [Connection],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec query(Connection :: term(), Query :: any(), Timeout :: infinity | non_neg_integer()) ->
mongoose_rdbms:query_result().
query(Connection, Query, Timeout) ->
Args = [Connection, Query, Timeout],
mongoose_backend:call_tracked(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec prepare(Connection :: term(), Name :: atom(),
Table :: binary(), Fields :: [binary()], Statement :: iodata()) ->
{ok, Ref :: term()} | {error, Reason :: any()}.
prepare(Connection, Name, Table, Fields, Statement) ->
Args = [Connection, Name, Table, Fields, Statement],
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).

-spec execute(Connection :: term(), Ref :: term(), Parameters :: [term()],
Timeout :: infinity | non_neg_integer()) -> mongoose_rdbms:query_result().
execute(Connection, Ref, Parameters, Timeout) ->
Args = [Connection, Ref, Parameters, Timeout],
mongoose_backend:call_tracked(global, ?MAIN_MODULE, ?FUNCTION_NAME, Args).
2 changes: 1 addition & 1 deletion src/rdbms/mongoose_rdbms_mysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(mongoose_rdbms_mysql).
-author('konrad.zemek@erlang-solutions.com').
-behaviour(mongoose_rdbms).
-behaviour(mongoose_rdbms_backend).

-include("mongoose.hrl").

Expand Down
2 changes: 1 addition & 1 deletion src/rdbms/mongoose_rdbms_odbc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(mongoose_rdbms_odbc).
-author('konrad.zemek@erlang-solutions.com').
-behaviour(mongoose_rdbms).
-behaviour(mongoose_rdbms_backend).
-include("mongoose_logger.hrl").

-export([escape_binary/1, escape_string/1,
Expand Down
2 changes: 1 addition & 1 deletion src/rdbms/mongoose_rdbms_pgsql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

-module(mongoose_rdbms_pgsql).
-author('konrad.zemek@erlang-solutions.com').
-behaviour(mongoose_rdbms).
-behaviour(mongoose_rdbms_backend).

-include_lib("epgsql/include/epgsql.hrl").

Expand Down
12 changes: 3 additions & 9 deletions src/wpool/mongoose_wpool_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,9 @@ stop(_, _) ->
%% Helper functions
do_start(HostType, Tag, WpoolOpts0, RdbmsOpts) when is_list(WpoolOpts0) and is_list(RdbmsOpts) ->
BackendName = backend_name(RdbmsOpts),
try mongoose_rdbms_backend:backend_name() of
BackendName -> ok;
OtherBackend ->
throw(#{reason => "Cannot start an RDBMS connection pool: only one RDBMS backend can be used",
opts => RdbmsOpts, new_backend => BackendName, existing_backend => OtherBackend})
catch
error:undef ->
backend_module:create(mongoose_rdbms, BackendName, [query, execute])
end,
BackendOpts = RdbmsOpts ++ [{backend, BackendName}],
mongoose_backend:init(global, mongoose_rdbms, [query, execute], BackendOpts),

mongoose_metrics:ensure_db_pool_metric({rdbms, HostType, Tag}),
WpoolOpts = make_wpool_opts(WpoolOpts0, RdbmsOpts),
ProcName = mongoose_wpool:make_pool_name(rdbms, HostType, Tag),
Expand Down
16 changes: 9 additions & 7 deletions test/mongoose_rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,30 @@ tests() ->
init_per_group(odbc, Config) ->
case code:ensure_loaded(eodbc) of
{module, eodbc} ->
mongoose_backend:init(global, mongoose_rdbms, [], [{backend, odbc}]),
[{db_type, odbc} | Config];
_ ->
{skip, no_odbc_application}
end;
init_per_group(Group, Config) ->
mongoose_backend:init(global, mongoose_rdbms, [], [{backend, Group}]),
[{db_type, Group} | Config].

end_per_group(_, Config) ->
% clean up after mongoose_backend:init
persistent_term:erase({backend_module, global, mongoose_rdbms}),
Config.

init_per_testcase(does_backoff_increase_to_a_point, Config) ->
DbType = ?config(db_type, Config),
backend_module:create(mongoose_rdbms, DbType, []),
meck_config(DbType),
meck_config(),
meck_db(DbType),
meck_connection_error(DbType),
meck_rand(),
[{db_opts, [{server, server(DbType)}, {keepalive_interval, 2}, {start_interval, 10}]} | Config];
init_per_testcase(_, Config) ->
DbType = ?config(db_type, Config),
backend_module:create(mongoose_rdbms, DbType, []),
meck_config(DbType),
meck_config(),
meck_db(DbType),
[{db_opts, [{server, server(DbType)}, {keepalive_interval, ?KEEPALIVE_INTERVAL},
{start_interval, ?MAX_INTERVAL}]} | Config].
Expand Down Expand Up @@ -95,11 +97,11 @@ keepalive_exit(Config) ->
ct:fail(no_down_message)
end.

%% 5 retries. Max retry 10. Iniitial retry 2.
%% 5 retries. Max retry 10. Initial retry 2.
%% We should get a sequence: 2 -> 4 -> 10 -> 10 -> 10.
does_backoff_increase_to_a_point(Config) ->
{error, _} = gen_server:start(mongoose_rdbms, ?config(db_opts, Config), []),
% We expect to have 2 at the begininng, then values up to 10 and 10 three times in total
% We expect to have 2 at the beginning, then values up to 10 and 10 three times in total
receive_backoffs(2, 10, 3).

receive_backoffs(InitialValue, MaxValue, MaxCount) ->
Expand Down Expand Up @@ -128,7 +130,7 @@ meck_rand() ->
meck_unload_rand() ->
meck:unload(rand).

meck_config(Server) ->
meck_config() ->
meck:new(ejabberd_config, [no_link]),
meck:expect(ejabberd_config, get_local_option,
fun(max_fsm_queue) -> 1024;
Expand Down