Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared DB pools #1217

Merged
merged 4 commits into from
Mar 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions apps/ejabberd/src/ejabberd_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

-record(state, {opts = [] :: list(),
hosts = [] :: [host()],
odbc_pools = [] :: [atom()],
override_local = false :: boolean(),
override_global = false :: boolean(),
override_acls = false :: boolean()}).
Expand Down Expand Up @@ -177,10 +178,7 @@ get_ejabberd_config_path() ->
%% This function will crash if finds some error in the configuration file.
-spec load_file(File :: string()) -> ok.
load_file(File) ->
Terms = get_plain_terms_file(File),
State = lists:foldl(fun search_hosts/2, #state{}, Terms),
TermsMacros = replace_macros(Terms),
Res = lists:foldl(fun process_term/2, State, TermsMacros),
Res = parse_file(File),
set_opts(Res).


Expand Down Expand Up @@ -220,8 +218,10 @@ get_absolute_path(File) ->
end.


-spec search_hosts({host|hosts, [host()] | host()}, state()) -> any().
search_hosts(Term, State) ->
-spec search_hosts_and_pools({host|hosts, [host()] | host()}
| {pool, odbc, atom()}
| {pool, odbc, atom(), list()}, state()) -> any().
search_hosts_and_pools(Term, State) ->
case Term of
{host, Host} ->
case State of
Expand All @@ -241,6 +241,10 @@ search_hosts(Term, State) ->
"too many hosts definitions", []),
exit("too many hosts definitions")
end;
{pool, PoolType, PoolName, _Options} ->
search_hosts_and_pools({pool, PoolType, PoolName}, State);
{pool, odbc, PoolName} ->
add_odbc_pool_to_option(PoolName, State);
_ ->
State
end.
Expand All @@ -252,6 +256,9 @@ add_hosts_to_option(Hosts, State) ->
PrepHosts = normalize_hosts(Hosts),
add_option(hosts, PrepHosts, State#state{hosts = PrepHosts}).

add_odbc_pool_to_option(PoolName, State) ->
Pools = State#state.odbc_pools,
State#state{odbc_pools = [PoolName | Pools]}.

-spec normalize_hosts([host()]) -> [binary() | tuple()].
normalize_hosts(Hosts) ->
Expand Down Expand Up @@ -508,6 +515,13 @@ process_term(Term, State) ->
lists:foldl(fun(T, S) ->
process_host_term(T, list_to_binary(Host), S) end,
State, Terms);
{pool, odbc, _PoolName} ->
State;
{pool, odbc, PoolName, Options} ->
lists:foldl(fun(T, S) ->
process_db_pool_term(T, PoolName, S)
end,
State, Options);
{listen, Listeners} ->
Listeners2 =
lists:map(
Expand Down Expand Up @@ -576,11 +590,20 @@ process_term(Term, State) ->
{all_metrics_are_global, Value} ->
add_option(all_metrics_are_global, Value, State);
{_Opt, _Val} ->
process_term_for_hosts_and_pools(Term, State)
end.

process_term_for_hosts_and_pools(Term = {Key, _Val}, State) ->
BKey = atom_to_binary(Key, utf8),
case get_key_group(BKey, Key) of
odbc ->
lists:foldl(fun(Pool, S) -> process_db_pool_term(Term, Pool, S) end,
State, State#state.odbc_pools);
_ ->
lists:foldl(fun(Host, S) -> process_host_term(Term, Host, S) end,
State, State#state.hosts)
end.


-spec process_host_term(Term :: host_term(),
Host :: acl:host(),
State :: state()) -> state().
Expand All @@ -601,8 +624,8 @@ process_host_term(Term, Host, State) ->
State;
{hosts, _Hosts} ->
State;
{odbc_server, ODBCServer} ->
add_option({odbc_server, Host}, ODBCServer, State);
{odbc_pool, Pool} when is_atom(Pool) ->
add_option({odbc_pool, Host}, Pool, State);
{riak_server, RiakConfig} ->
add_option(riak_server, RiakConfig, State);
{cassandra_servers, CassandraConfig} ->
Expand All @@ -611,6 +634,8 @@ process_host_term(Term, Host, State) ->
add_option({Opt, Host}, Val, State)
end.

process_db_pool_term({Opt, Val}, Pool, State) when is_atom(Pool) ->
add_option({Opt, odbc_pool, Pool}, Val, State).

-spec add_option(Opt :: key(),
Val :: value(),
Expand Down Expand Up @@ -786,9 +811,11 @@ is_file_readable(Path) ->
-spec parse_file(file:name()) -> state().
parse_file(ConfigFile) ->
Terms = get_plain_terms_file(ConfigFile),
State = lists:foldl(fun search_hosts/2, #state{}, Terms),
State = lists:foldl(fun search_hosts_and_pools/2, #state{}, Terms),
TermsWExpandedMacros = replace_macros(Terms),
lists:foldl(fun process_term/2, State, TermsWExpandedMacros).
lists:foldl(fun process_term/2,
add_option(odbc_pools, State#state.odbc_pools, State),
TermsWExpandedMacros).

-spec reload_local() -> {ok, iolist()} | no_return().
reload_local() ->
Expand Down Expand Up @@ -1047,8 +1074,6 @@ handle_local_config_change({Key, _Old, _New} = El) ->

handle_local_hosts_config_add({{auth, Host}, _}) ->
ejabberd_auth:start(Host);
handle_local_hosts_config_add({{odbc, Host}, _}) ->
ejabberd_rdbms:start(Host);
handle_local_hosts_config_add({{ldap, _Host}, _}) ->
%% ignore ldap section
ok;
Expand All @@ -1072,8 +1097,6 @@ handle_local_hosts_config_del({{auth, Host}, Opts}) ->
M:stop(Host)
end, AuthModules)
end;
handle_local_hosts_config_del({{odbc, Host}, _}) ->
ejabberd_rdbms:stop_odbc(Host);
handle_local_hosts_config_del({{ldap, _Host}, _I}) ->
%% ignore ldap section, only appli
ok;
Expand All @@ -1087,15 +1110,6 @@ handle_local_hosts_config_del({{Key, _}, _} =El) ->
?WARNING_MSG("local hosts config delete option: ~p unhandled", [El])
end.

handle_local_hosts_config_change({{odbc, Host}, Old, _}) ->
%% stop rdbms
case lists:keyfind({odbc_server, Host}, 1, Old) of
false ->
ok;
#local_config{} ->
ejabberd_rdbms:stop_odbc(Host)
end,
ejabberd_rdbms:start(Host);
handle_local_hosts_config_change({{auth, Host}, OldVals, _}) ->
case lists:keyfind(auth_method, 1, OldVals) of
false ->
Expand Down Expand Up @@ -1149,9 +1163,10 @@ add_virtual_host(Host) ->
?DEBUG("Register host:~p", [Host]),
ejabberd_local:register_host(Host).

-spec can_be_ignored(Key :: atom()) -> boolean().
can_be_ignored(Key) when is_atom(Key) ->
L = [domain_certfile, s2s, all_metrics_are_global],
-spec can_be_ignored(Key :: atom() | tuple()) -> boolean().
can_be_ignored(Key) when is_atom(Key);
is_tuple(Key) ->
L = [domain_certfile, s2s, all_metrics_are_global, odbc],
lists:member(Key, L).

-spec remove_virtual_host(ejabberd:server()) -> any().
Expand Down Expand Up @@ -1219,11 +1234,16 @@ get_local_config() ->
get_global_config() ->
mnesia:dirty_match_object(config, {config, '_', '_'}).

-spec is_not_host_specific(atom() | {atom(), ejabberd:server()}) -> boolean().
-spec is_not_host_specific(atom()
| {atom(), ejabberd:server()}
| {atom(), atom(), atom()}) -> boolean().
is_not_host_specific(Key) when is_atom(Key) ->
true;
is_not_host_specific({Key, Host}) when is_atom(Key), is_binary(Host) ->
false.
false;
is_not_host_specific({Key, PoolType, PoolName})
when is_atom(Key), is_atom(PoolType), is_atom(PoolName) ->
true.

-spec categorize_options([term()]) -> {GlobalConfig, LocalConfig, HostsConfig} when
GlobalConfig :: list(),
Expand Down
64 changes: 15 additions & 49 deletions apps/ejabberd/src/ejabberd_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
-module(ejabberd_rdbms).
-author('alexey@process-one.net').

-export([start/0, start/1, stop/1, stop_odbc/1]).
-export([start/0, start_pool/1, stop_pool/1, pools/0]).
-include("ejabberd.hrl").

-spec start() -> 'ok' | {'error', 'lager_not_running'}.
Expand All @@ -39,63 +39,29 @@ start() ->
?INFO_MSG("MongooseIM has not been compiled with relational database support. "
"Skipping database startup.", []);
_ ->
%% If compiled with ODBC, start ODBC on the needed host
start_hosts()
{ok, _Pid} = start_pool_sup(),
[start_pool(Pool) || Pool <- pools()],
ok
end.

%% @doc Start relationnal DB module on the nodes where it is needed
-spec start_hosts() -> 'ok'.
start_hosts() ->
lists:foreach(fun start/1, ?MYHOSTS).

-spec stop(Host :: ejabberd:server()) -> 'ok'.
stop(Host) ->
case needs_odbc(Host) of
true -> stop_odbc(Host);
false -> ok
end.

-spec start(Host :: ejabberd:server()) -> 'ok'.
start(Host) ->
case needs_odbc(Host) of
true -> start_odbc(Host);
false -> ok
end.

%% @doc Start the ODBC module on the given host
-spec start_odbc(binary() | string()) -> 'ok'.
start_odbc(Host) ->
Supervisor_name = gen_mod:get_module_proc(Host, mongoose_rdbms_sup),
start_pool_sup() ->
ChildSpec =
{Supervisor_name,
{mongoose_rdbms_sup, start_link, [Host]},
{mongoose_rdbms_sup,
{mongoose_rdbms_sup, start_link, []},
transient,
infinity,
supervisor,
[mongoose_rdbms_sup]},
case supervisor:start_child(ejabberd_sup, ChildSpec) of
{ok, _PID} ->
ok;
_Error ->
?ERROR_MSG("Start of supervisor ~p failed:~n~p~nRetrying...~n",
[Supervisor_name, _Error]),
start_odbc(Host)
end.
supervisor:start_child(ejabberd_sup, ChildSpec).

-spec stop_odbc(binary() | string()) -> 'ok'.
stop_odbc(Host) ->
Proc = gen_mod:get_module_proc(Host, mongoose_rdbms_sup),
supervisor:terminate_child(ejabberd_sup, Proc).
pools() ->
ejabberd_config:get_local_option_or_default(odbc_pools, []).

%% @doc Returns true if we have configured odbc_server for the given host
-spec needs_odbc(_) -> boolean().
needs_odbc(Host) ->
LHost = jid:nameprep(Host),
case ejabberd_config:get_local_option({odbc_server, LHost}) of
undefined ->
false;
_ -> true
end.
start_pool(Pool) ->
mongoose_rdbms_sup:add_pool(Pool).

stop_pool(Pool) ->
mongoose_rdbms_sup:remove_pool(Pool).

compile_odbc_type_helper() ->
Key = {odbc_server_type, ?MYNAME},
Expand Down
6 changes: 4 additions & 2 deletions apps/ejabberd/src/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ add_default_odbc_opts(Opts) ->
[{cache_users, true}, {async_writer, true}]).


-spec parse_backend_opt(Option :: {module(), term()}, Type :: pm | muc,
module(), module(), deps()) -> deps().
-spec parse_backend_opt(Type :: pm | muc, module(), module(),
Option :: {module(), term()}, deps()) -> deps().
parse_backend_opt(Type, ModODBCArch, ModAsyncWriter, Option, Deps) ->
case Option of
{cache_users, true} ->
Expand All @@ -176,6 +176,8 @@ parse_backend_opt(Type, ModODBCArch, ModAsyncWriter, Option, Deps) ->
{async_writer, true} ->
DepsWithNoWriter = add_dep(ModODBCArch, [no_writer], Deps),
add_dep(ModAsyncWriter, [Type], DepsWithNoWriter);
{async_writer_odbc_pool, PoolName} ->
add_dep(ModAsyncWriter, [{odbc_pool, PoolName}], Deps);
_ -> Deps
end.

Expand Down
6 changes: 2 additions & 4 deletions apps/ejabberd/src/mod_mam_muc_odbc_async_pool_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ worker_number(Host, ArcID) ->

-spec start(ejabberd:server(), _) -> 'ok'.
start(Host, Opts) ->
PoolName = gen_mod:get_module_proc(Host, ?MODULE),
{ok, _} = mongoose_rdbms_sup:add_pool(Host, ?MODULE, PoolName, worker_count(Host)),
PoolName = gen_mod:get_opt(odbc_pool, Opts, mongoose_rdbms_sup:pool(Host)),
MaxSize = gen_mod:get_module_opt(Host, ?MODULE, max_packet_size, 30),
mod_mam_muc_odbc_arch:prepare_insert(insert_mam_muc_message, 1),
mod_mam_muc_odbc_arch:prepare_insert(insert_mam_muc_messages, MaxSize),
Expand All @@ -101,8 +100,7 @@ start(Host, Opts) ->
-spec stop(ejabberd:server()) -> any().
stop(Host) ->
stop_muc(Host),
stop_workers(Host),
mongoose_rdbms_sup:remove_pool(Host, ?MODULE).
stop_workers(Host).

%% ----------------------------------------------------------------------
%% Add hooks for mod_mam_muc
Expand Down
8 changes: 2 additions & 6 deletions apps/ejabberd/src/mod_mam_odbc_async_pool_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ worker_number(Host, ArcID) ->

start(Host, Opts) ->
mongoose_metrics:ensure_metric(Host, ?PER_MESSAGE_FLUSH_TIME, histogram),

PoolName = gen_mod:get_module_proc(Host, ?MODULE),
{ok, _} = mongoose_rdbms_sup:add_pool(Host, ?MODULE, PoolName, worker_count(Host)),

PoolName = gen_mod:get_opt(odbc_pool, Opts, mongoose_rdbms_sup:pool(Host)),
MaxSize = gen_mod:get_module_opt(Host, ?MODULE, max_packet_size, 30),
mod_mam_odbc_arch:prepare_insert(insert_mam_message, 1),
mod_mam_odbc_arch:prepare_insert(insert_mam_messages, MaxSize),
Expand Down Expand Up @@ -114,8 +111,7 @@ stop(Host) ->
false ->
ok
end,
stop_workers(Host),
mongoose_rdbms_sup:remove_pool(Host, ?MODULE).
stop_workers(Host).

%% ----------------------------------------------------------------------
%% Add hooks for mod_mam
Expand Down
30 changes: 13 additions & 17 deletions apps/ejabberd/src/mongoose_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
init_predefined_host_metrics/1,
init_subscriptions/0,
create_generic_hook_metric/2,
ensure_db_pool_metric/1,
update/3,
ensure_metric/3,
get_metric_value/1,
Expand All @@ -34,7 +35,7 @@
get_aggregated_values/1,
increment_generic_hook_metric/2,
get_odbc_data_stats/0,
get_odbc_mam_async_stats/0,
get_odbc_data_stats/1,
get_dist_data_stats/0,
get_up_time/0,
remove_host_metrics/1,
Expand Down Expand Up @@ -83,6 +84,12 @@ create_generic_hook_metric(Host, Hook) ->
FilteredHookName = filter_hook(Hook),
do_create_generic_hook_metric(Host, FilteredHookName).

ensure_db_pool_metric(Pool) ->
ensure_metric(global,
[data, odbc, Pool],
{function, mongoose_metrics, get_odbc_data_stats, [[Pool]], proplist,
[workers | ?INET_STATS]}).

-spec update(Host :: ejabberd:lserver() | global, Name :: term() | list(),
Change :: term()) -> any().
update(Host, Name, Change) when is_list(Name) ->
Expand Down Expand Up @@ -124,22 +131,11 @@ increment_generic_hook_metric(Host, Hook) ->
do_increment_generic_hook_metric(Host, FilteredHook).

get_odbc_data_stats() ->
RegularODBCWorkers = [catch mongoose_rdbms_sup:get_pids(Host) || Host <- ?MYHOSTS],
get_odbc_stats(lists:flatten(RegularODBCWorkers)).

get_odbc_mam_async_stats() ->
%% MAM async ODBC workers are organized differently...
GetChildren = fun(Host, Pool) ->
Name = gen_mod:get_module_proc(Host, Pool),
case catch mongoose_rdbms_sup:get_pids(Name) of
[_ | _] = Children -> Children;
_ -> []
end
end,

MamPools = [mod_mam_odbc_async_pool_writer, mod_mam_muc_odbc_async_pool_writer],
MamChildren = lists:flatten([GetChildren(Host, Pool) || Host <- ?MYHOSTS, Pool <- MamPools]),
get_odbc_stats(MamChildren).
get_odbc_data_stats(ejabberd_rdbms:pools()).

get_odbc_data_stats(Pools) ->
ODBCWorkers = [catch mongoose_rdbms_sup:get_pids(Pool) || Pool <- Pools],
get_odbc_stats(lists:flatten(ODBCWorkers)).

get_dist_data_stats() ->
DistStats = [inet_stats(Port) || {_, Port} <- erlang:system_info(dist_ctrl)],
Expand Down
Loading