Skip to content

Commit

Permalink
Reimplement how async pools are configured
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Mar 3, 2022
1 parent 238989b commit 1f31dc4
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 110 deletions.
2 changes: 1 addition & 1 deletion doc/modules/mod_inbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Only RDBMS storage is supported, but `rdbms` means flushes to inbox are synchron

#### `modules.mod_inbox.async_writer.pool_size`
* **Syntax:** non-negative integer
* **Default:** `4 * erlang:system_info(schedulers_online)`
* **Default:** `2 * erlang:system_info(schedulers_online)`
* **Example:** `modules.mod_inbox.async_writer.pool_size = 32`

Number of workers in the pool. More than the number of available schedulers is recommended, to minimise lock contention on the message queues, and more than the number of DB workers, to fully utilise the DB capacity. How much more than these two parameters is then a good fine-tuning for specific deployments.
Expand Down
17 changes: 1 addition & 16 deletions src/async_pools/mongoose_async_pools.erl
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
-module(mongoose_async_pools).

-include("mongoose_config_spec.hrl").
-include("mongoose_logger.hrl").

-behaviour(supervisor).
-export([start_link/3, init/1]).
-ignore_xref([start_link/3]).

% API
-export([start_pool/3, stop_pool/2, config_spec/0]).
-export([start_pool/3, stop_pool/2]).
-export([put_task/3, put_task/4]).
-ignore_xref([put_task/3]).
-export([sync/2]).
Expand Down Expand Up @@ -73,20 +72,6 @@ stop_pool(HostType, PoolId) ->
?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
ejabberd_sup:stop_child(sup_name(HostType, PoolId)).

%% @doc Generic TOML config spec for an async-pool section.
%% All four options are optional; `format_items = map' makes the parsed
%% section come out as a map of native option values.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    NonNegInt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"flush_interval">> => NonNegInt,
              <<"batch_size">> => NonNegInt,
              <<"pool_size">> => NonNegInt},
    %% Default pool is oversized on purpose: four workers per online scheduler.
    Defaults = #{<<"enabled">> => true,
                 <<"flush_interval">> => 2000,
                 <<"batch_size">> => 30,
                 <<"pool_size">> => 4 * erlang:system_info(schedulers_online)},
    #section{items = Items, defaults = Defaults, format_items = map}.

-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
pool_name(HostType, PoolId) ->
persistent_term:get({?MODULE, HostType, PoolId}).
Expand Down
14 changes: 8 additions & 6 deletions src/inbox/mod_inbox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ config_spec() ->
Markers = mongoose_chat_markers:chat_marker_names(),
#section{
items = #{<<"backend">> => #option{type = atom, validate = {enum, [rdbms, rdbms_async]}},
<<"async_writer">> => mongoose_async_pools:config_spec(),
<<"async_writer">> => async_config_spec(),
<<"reset_markers">> => #list{items = #option{type = binary,
validate = {enum, Markers}}},
<<"groupchat">> => #list{items = #option{type = atom,
Expand All @@ -129,7 +129,6 @@ config_spec() ->
<<"iqdisc">> => mongoose_config_spec:iqdisc()
},
defaults = #{<<"backend">> => rdbms,
<<"async_writer">> => async_writer_defaults(),
<<"groupchat">> => [muclight],
<<"aff_changes">> => true,
<<"remove_on_kicked">> => true,
Expand All @@ -139,10 +138,13 @@ config_spec() ->
format_items = map
}.

async_writer_defaults() ->
#{<<"flush_interval">> => 500,
<<"batch_size">> => 1000,
<<"pool_size">> => 2}.
%% Config spec for `modules.mod_inbox.async_writer'.
%% Only `pool_size' is configurable; the default is twice the number of
%% online schedulers. `include = always' makes the section present even
%% when the user config omits it.
async_config_spec() ->
    PoolSizeOpt = #option{type = integer, validate = non_negative},
    DefaultPoolSize = 2 * erlang:system_info(schedulers_online),
    #section{items = #{<<"pool_size">> => PoolSizeOpt},
             defaults = #{<<"pool_size">> => DefaultPoolSize},
             format_items = map,
             include = always}.

%%%%%%%%%%%%%%%%%%%
%% Process IQ
Expand Down
24 changes: 20 additions & 4 deletions src/mam/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ muc_config_spec() ->

root_config_items() ->
Cache = mongoose_user_cache:config_spec(),
AsyncWriter = mod_mam_rdbms_arch_async:config_spec(),
AsyncWriter = async_config_spec(),
#{<<"cache">> => Cache#section{include = always},
<<"async_writer">> => AsyncWriter#section{include = always}}.

Expand Down Expand Up @@ -120,16 +120,29 @@ common_config_items() ->
}.

pm_config_items() ->
#{<<"async_writer">> => mod_mam_rdbms_arch_async:config_spec(),
#{<<"async_writer">> => async_config_spec(),
<<"archive_groupchats">> => #option{type = boolean},
<<"same_mam_id_for_peers">> => #option{type = boolean}}.

muc_config_items() ->
#{<<"async_writer">> => mod_mam_rdbms_arch_async:config_spec(),
#{<<"async_writer">> => async_config_spec(),
<<"host">> => #option{type = string,
validate = subdomain_template,
process = fun mongoose_subdomain_utils:make_subdomain_pattern/1}}.

%% Config spec for the MAM `async_writer' subsection.
%% Defaults: enabled, 2 s flush interval, 30-message batches, and four
%% workers per online scheduler.
async_config_spec() ->
    NonNegInt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"flush_interval">> => NonNegInt,
              <<"batch_size">> => NonNegInt,
              <<"pool_size">> => NonNegInt},
    Defaults = #{<<"enabled">> => true,
                 <<"flush_interval">> => 2000,
                 <<"batch_size">> => 30,
                 <<"pool_size">> => 4 * erlang:system_info(schedulers_online)},
    #section{items = Items, defaults = Defaults, format_items = map}.

riak_config_spec() ->
#section{
items = #{<<"search_index">> => #option{type = binary,
Expand Down Expand Up @@ -234,7 +247,7 @@ add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Dep
add_dep(mod_mam_cache_user, #{Type => true, cache => CacheOpts}, Deps1);
add_rdbms_deps(async_writer, Type, #{async_writer := AsyncOpts = #{enabled := true}}, Deps) ->
Deps1 = add_dep(rdbms_arch_module(Type), #{no_writer => true}, Deps),
add_dep(mod_mam_rdbms_arch_async, #{Type => AsyncOpts}, Deps1);
add_dep(rdbms_async_arch_module(Type), AsyncOpts, Deps1);
add_rdbms_deps(_, _Type, _Opts, Deps) ->
Deps.

Expand All @@ -260,6 +273,9 @@ rdbms_arch_defaults() ->
%% Synchronous RDBMS archive backend module for each MAM flavour.
%% Any other type crashes with function_clause — intentional, MAM has
%% exactly these two flavours here.
rdbms_arch_module(pm) -> mod_mam_rdbms_arch;
rdbms_arch_module(muc) -> mod_mam_muc_rdbms_arch.

%% Asynchronous (pool-based) RDBMS writer module for each MAM flavour;
%% parallels rdbms_arch_module/1 above.
rdbms_async_arch_module(pm) -> mod_mam_rdbms_arch_async;
rdbms_async_arch_module(muc) -> mod_mam_muc_rdbms_arch_async.

%% ElasticSearch archive backend module for each MAM flavour.
elasticsearch_arch_module(pm) -> mod_mam_elasticsearch_arch;
elasticsearch_arch_module(muc) -> mod_mam_muc_elasticsearch_arch.

Expand Down
31 changes: 27 additions & 4 deletions src/mam/mod_mam_muc_rdbms_arch_async.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

-include("mongoose_logger.hrl").

-define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
-define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
-define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
-define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).

-behaviour(gen_mod).
-export([start/2, stop/1, supported_features/0]).
-export([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).
-ignore_xref([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).

Expand All @@ -20,11 +22,32 @@ mam_muc_archive_sync(Result, HostType) ->
mongoose_async_pools:sync(HostType, muc_mam),
Result.

%%% gen_mod callbacks
%% @doc gen_mod callback: starts the MUC async writer.
%% Pool options and batch metadata are built by the PM sibling module,
%% which owns the helpers shared by both writer flavours.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
{PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra),
mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram),
mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram),
%% Hooks at priority 50; stop/1 removes the same pair.
ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).

%% @doc gen_mod callback: unregisters the hooks added in start/2 and
%% shuts the muc_mam async pool down.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
mongoose_async_pools:stop_pool(HostType, muc_mam).

%% @doc gen_mod callback: this module can serve dynamically added domains.
-spec supported_features() -> [atom()].
supported_features() ->
[dynamic_domains].

%%% flush callbacks
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
{FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
mongoose_metrics:update(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
mongoose_metrics:update(HostType, ?MUC_FLUSH_TIME, FlushTime),
mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime),
Result.

%% mam workers callbacks
Expand Down
95 changes: 20 additions & 75 deletions src/mam/mod_mam_rdbms_arch_async.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,48 @@

-include("mongoose_logger.hrl").

-define(PM_PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
-define(PM_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).

-define(MUC_MODULE, mod_mam_muc_rdbms_arch_async).
-define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
-define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
-define(PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
-define(FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).

-behaviour(gen_mod).
-export([start/2, stop/1, config_spec/0, supported_features/0]).

-export([archive_pm_message/3, archive_muc_message/3]).
-export([mam_archive_sync/2, mam_muc_archive_sync/2]).
-export([flush/2]).
-export([start/2, stop/1, supported_features/0]).
-export([archive_pm_message/3, mam_archive_sync/2, flush/2]).
-ignore_xref([archive_pm_message/3, mam_archive_sync/2]).

-type writer_type() :: pm | muc.

-ignore_xref([archive_pm_message/3, archive_muc_message/3]).
-ignore_xref([mam_archive_sync/2, mam_muc_archive_sync/2]).
-export([make_pool_opts/2, prepare_insert_queries/2]).

%% @doc Hook handler for mam_archive_message: enqueues the message
%% parameters into the pm_mam async pool, keyed by the archive id
%% (presumably so tasks for one archive are serialized on one worker —
%% confirm in mongoose_async_pools:put_task/4).
-spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok.
archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) ->
mongoose_async_pools:put_task(HostType, pm_mam, ArcID, Params).

%% @doc Hook handler for mam_muc_archive_message: first extends the
%% params with the sender id (a DB lookup lives behind that call), then
%% enqueues them on the muc_mam pool keyed by room id.
-spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok.
archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) ->
Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
mongoose_async_pools:put_task(HostType, muc_mam, RoomID, Params).

%% @doc Hook handler: forces a synchronous flush of the pm_mam pool so
%% pending writes hit the DB, then passes the hook accumulator through
%% unchanged.
-spec mam_archive_sync(term(), mongooseim:host_type()) -> term().
mam_archive_sync(Result, HostType) ->
mongoose_async_pools:sync(HostType, pm_mam),
Result.

%% @doc Hook handler: forces a synchronous flush of the muc_mam pool,
%% passing the hook accumulator through unchanged.
-spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
mam_muc_archive_sync(Result, HostType) ->
mongoose_async_pools:sync(HostType, muc_mam),
Result.

%%% gen_mod callbacks
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
[ start_pool(HostType, Mod) || Mod <- maps:to_list(Opts) ].
{PoolOpts, Extra} = make_pool_opts(pm, Opts),
prepare_insert_queries(pm, Extra),
mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram),
mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram),
ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts).

-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
Opts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
[ stop_pool(HostType, Mod) || Mod <- maps:to_list(Opts) ].

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
mongoose_async_pools:config_spec().
ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
mongoose_async_pools:stop_pool(HostType, pm_mam).

%% @doc gen_mod callback: this module can serve dynamically added domains.
-spec supported_features() -> [atom()].
supported_features() ->
[dynamic_domains].

%%% internal callbacks
%% Starts one writer pool per {Type, Opts} entry of the module options:
%% derives pool options, prepares the prepared-insert queries, creates
%% the metrics, registers the hooks, then spawns the pool itself.
-spec start_pool(mongooseim:host_type(), {writer_type(), gen_mod:module_opts()}) ->
supervisor:startchild_ret().
start_pool(HostType, {Type, Opts}) ->
{PoolOpts, Extra} = make_pool_opts(Type, Opts),
prepare_insert_queries(Type, Extra),
ensure_metrics(Type, HostType),
register_hooks(Type, HostType),
start_pool(Type, HostType, PoolOpts).

-spec make_pool_opts(writer_type(), gen_mod:module_opts()) ->
-spec make_pool_opts(mod_mam_meta:mam_type(), gen_mod:module_opts()) ->
{mongoose_async_pools:pool_opts(), mongoose_async_pools:pool_extra()}.
make_pool_opts(Type, Opts) ->
Extra = add_batch_name(Type, Opts),
Expand All @@ -87,7 +62,7 @@ add_batch_name(muc, #{batch_size := MaxSize} = Opts) ->
Opts#{batch_name => multi_name(insert_mam_muc_messages, MaxSize)}.

flush_callback(pm) -> fun ?MODULE:flush/2;
flush_callback(muc) -> fun ?MUC_MODULE:flush/2.
flush_callback(muc) -> fun mod_mam_muc_rdbms_arch_async:flush/2.

prepare_insert_queries(pm, #{batch_size := MaxSize, batch_name := BatchName}) ->
mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
Expand All @@ -99,41 +74,11 @@ prepare_insert_queries(muc, #{batch_size := MaxSize, batch_name := BatchName}) -
%% Derives a batch-query name by suffixing the base atom with a count,
%% e.g. multi_name(insert_mam_messages, 30) -> insert_mam_messages30.
%% list_to_atom/1 is safe here: inputs come from module configuration,
%% never from untrusted user data.
multi_name(BaseName, Count) ->
    list_to_atom(lists:concat([BaseName, Count])).

%% Creates the flush-time histograms for the given writer type.
%% NOTE(review): ensure_metric presumably creates the metric only when
%% it does not exist yet, so repeated starts are harmless — confirm in
%% mongoose_metrics.
ensure_metrics(pm, HostType) ->
mongoose_metrics:ensure_metric(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, histogram),
mongoose_metrics:ensure_metric(HostType, ?PM_FLUSH_TIME, histogram);
ensure_metrics(muc, HostType) ->
mongoose_metrics:ensure_metric(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, histogram),
mongoose_metrics:ensure_metric(HostType, ?MUC_FLUSH_TIME, histogram).

%% Registers the sync and archive-message hooks for the given writer
%% type, both at priority 50. MUC handlers are exported by ?MUC_MODULE,
%% not this module.
register_hooks(pm, HostType) ->
ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50);
register_hooks(muc, HostType) ->
ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MUC_MODULE, mam_muc_archive_sync, 50),
ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MUC_MODULE, archive_muc_message, 50).

%% Maps the writer type to its pool id (pm_mam / muc_mam) and starts
%% the corresponding async pool.
-spec start_pool(writer_type(), mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term().
start_pool(pm, HostType, Opts) ->
mongoose_async_pools:start_pool(HostType, pm_mam, Opts);
start_pool(muc, HostType, Opts) ->
mongoose_async_pools:start_pool(HostType, muc_mam, Opts).

%% Tears down one writer type: unregisters the pair of hooks added in
%% register_hooks/2, then stops the corresponding async pool. The opts
%% part of the tuple is ignored — only the type tag matters here.
-spec stop_pool(mongooseim:host_type(), {writer_type(), term()}) -> ok.
stop_pool(HostType, {pm, _}) ->
ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
mongoose_async_pools:stop_pool(HostType, pm_mam);
stop_pool(HostType, {muc, _}) ->
ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MUC_MODULE, mam_muc_archive_sync, 50),
ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MUC_MODULE, archive_muc_message, 50),
mongoose_async_pools:stop_pool(HostType, muc_mam).

%%% flush callbacks
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
{FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]),
mongoose_metrics:update(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
mongoose_metrics:update(HostType, ?PM_FLUSH_TIME, FlushTime),
mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime),
Result.

%% mam workers callbacks
Expand Down
4 changes: 3 additions & 1 deletion test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ all_modules() ->
{virtual_pubsub_hosts, [{fqdn, <<"host1">>}, {fqdn, <<"host2">>}]},
{wpool, [{workers, 200}]}],
mod_adhoc => #{iqdisc => one_queue, report_commands_node => true},
mod_mam_rdbms_arch_async => #{pm => default_config([modules, mod_mam_meta, async_writer])},
mod_mam_rdbms_arch_async => default_config([modules, mod_mam_meta, async_writer]),
mod_keystore =>
[{keys,
[{access_secret, ram},
Expand Down Expand Up @@ -582,6 +582,7 @@ all_modules() ->
{pool_name, http_pool}]]}],
mod_inbox =>
#{backend => rdbms,
async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)},
iqdisc => no_queue,
aff_changes => true,
groupchat => [muclight],
Expand Down Expand Up @@ -839,6 +840,7 @@ default_mod_config(mod_last) ->
#{iqdisc => one_queue, backend => mnesia};
default_mod_config(mod_inbox) ->
#{backend => rdbms,
async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)},
groupchat => [muclight],
aff_changes => true,
remove_on_kicked => true,
Expand Down
6 changes: 3 additions & 3 deletions test/mod_mam_meta_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ produces_valid_configurations(_Config) ->
check_equal_opts(mod_mam_cache_user, #{pm => true, muc => true, cache => []}, Deps),
check_equal_opts(mod_mam_mnesia_prefs, #{muc => true}, Deps),
check_equal_opts(mod_mam_rdbms_prefs, #{pm => true}, Deps),
check_equal_opts(mod_mam_rdbms_arch_async, #{muc => AsyncOpts}, Deps).
check_equal_opts(mod_mam_muc_rdbms_arch_async, AsyncOpts, Deps).

handles_riak_config(_Config) ->
PM = config([modules, mod_mam_meta, pm], #{user_prefs_store => mnesia}),
Expand Down Expand Up @@ -116,7 +116,7 @@ example_muc_only_no_pref_good_performance(_Config) ->
[{mod_mam_rdbms_user, #{muc => true, pm => true}},
{mod_mam_cache_user, #{muc => true, cache => []}},
{mod_mam_muc_rdbms_arch, mod_config(mod_mam_muc_rdbms_arch, #{no_writer => true})},
{mod_mam_rdbms_arch_async, #{muc => AsyncOpts}},
{mod_mam_muc_rdbms_arch_async, AsyncOpts},
{mod_mam_muc, mod_config(mod_mam_muc, mod_config(mod_mam_muc, MUCOpts))}
], Deps).

Expand All @@ -130,7 +130,7 @@ example_pm_only_good_performance(_Config) ->
{mod_mam_cache_user, #{pm => true, cache => []}},
{mod_mam_mnesia_prefs, #{pm => true}},
{mod_mam_rdbms_arch, mod_config(mod_mam_rdbms_arch, #{no_writer => true})},
{mod_mam_rdbms_arch_async, #{pm => AsyncOpts}},
{mod_mam_rdbms_arch_async, AsyncOpts},
{mod_mam, default_mod_config(mod_mam)}
], Deps).

Expand Down

0 comments on commit 1f31dc4

Please sign in to comment.