Skip to content

Commit

Permalink
Rework mam async workers using the new abstraction
Browse files Browse the repository at this point in the history
First of all, remove all the old modules implementing the mam rdbms
async pools.

Then, create a new module called mod_mam_rdbms_arch_async, that abstract
what the previous two where doing, by using the modules created in the
two previous commits. This module starts the pools with all the correct
parameters, and registers the hooks that will then call `worker_pool`
using the `hash_worker` strategy, to ensure MAM entries to a single
archive are all processed in parallel.

Note a few lessons here:
* Selecting the worker isn't sound based on the `rem` operator, because
  the distribution of archives over workers is not at all ensured to be
  uniform. `worker_pool` uses `erlang:phash2/2`, which ensures such
  uniform distribution.
* The callbacks for the batch workers abstract all the metrics.
* mod_mam_meta parsing has breaking changes: this is what was reported
  as broken in the previous implementation. Now there's an entire toml
  section called `async_writer`, that can be enabled, disabled and
  configured both globally for mam, or in a fine-grained manner within
  the `pm` and `muc` sections.
  • Loading branch information
NelsonVides committed Dec 6, 2021
1 parent 839a999 commit 99ddcc3
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 673 deletions.
5 changes: 0 additions & 5 deletions src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ init([]) ->
brutal_kill,
worker,
[mod_muc_iq]},
MAM =
{mod_mam_sup,
{mod_mam_sup, start_link, []},
permanent, infinity, supervisor, [mod_mam_sup]},
ShaperSup =
{ejabberd_shaper_sup,
{ejabberd_shaper_sup, start_link, []},
Expand All @@ -174,7 +170,6 @@ init([]) ->
IQSupervisor,
Listener,
MucIQ,
MAM,
ShaperSup]}}.

start_child(ChildSpec) ->
Expand Down
46 changes: 20 additions & 26 deletions src/mam/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,13 @@ config_items() ->
<<"cache">> => mongoose_user_cache:config_spec(),
<<"rdbms_message_format">> => #option{type = atom,
validate = {enum, [simple, internal]}},
<<"async_writer">> => #option{type = boolean},
<<"flush_interval">> => #option{type = integer,
validate = non_negative},
<<"max_batch_size">> => #option{type = integer,
validate = non_negative},
<<"pool_size">> => #option{type = integer,
validate = non_negative},
<<"async_writer">> => mod_mam_rdbms_arch_async:config_spec(),

%% Low-level options
<<"default_result_limit">> => #option{type = integer,
validate = non_negative},
<<"max_result_limit">> => #option{type = integer,
validate = non_negative},
<<"async_writer_rdbms_pool">> => #option{type = atom,
validate = pool_name},
<<"db_jid_format">> => #option{type = atom,
validate = module},
<<"db_message_format">> => #option{type = atom,
Expand Down Expand Up @@ -175,7 +167,8 @@ valid_core_mod_opts(mod_mam_muc) ->
[host] ++ common_opts().

common_opts() ->
[is_archivable_message,
[async_writer,
is_archivable_message,
send_message,
archive_chat_markers,
extra_fin_element,
Expand Down Expand Up @@ -213,13 +206,13 @@ parse_backend_opts(riak, Type, Opts, Deps0) ->
end;

parse_backend_opts(rdbms, Type, Opts0, Deps0) ->
Opts1 = add_default_rdbms_opts(Opts0),
Opts1 = add_rdbms_async_opts(Opts0),
Opts = add_rdbms_cache_opts(Opts1),

{ModRDBMSArch, ModAsyncWriter} =
case Type of
pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_async_pool_writer};
muc -> {mod_mam_muc_rdbms_arch, mod_mam_muc_rdbms_async_pool_writer}
pm -> {mod_mam_rdbms_arch, mod_mam_rdbms_arch_async};
muc -> {mod_mam_muc_rdbms_arch, mod_mam_rdbms_arch_async}
end,

Deps1 = add_dep(ModRDBMSArch, [Type], Deps0),
Expand Down Expand Up @@ -265,15 +258,18 @@ add_dep(Dep, Args, Deps) ->
maps:put(Dep, NewArgs, Deps).


-spec add_default_rdbms_opts(Opts :: proplists:proplist()) -> proplists:proplist().
add_default_rdbms_opts(Opts) ->
lists:foldl(
fun({Key, _} = DefaultOpt, Acc) ->
case proplists:lookup(Key, Opts) of
none -> [DefaultOpt | Acc];
_ -> Acc
end
end, Opts, [{async_writer, true}]).
-spec add_rdbms_async_opts(proplists:proplist()) -> proplists:proplist().
add_rdbms_async_opts(Opts) ->
case lists:keyfind(async_writer, 1, Opts) of
{async_writer, AsyncOpts} ->
case lists:keyfind(enabled, 1, AsyncOpts) of
{enabled, false} -> lists:keydelete(async_writer, 1, Opts);
_ -> Opts
end;
false ->
[{async_writer, []} | Opts]
end.


add_rdbms_cache_opts(Opts) ->
case {lists:keyfind(cache_users, 1, Opts), lists:keyfind(cache, 1, Opts)} of
Expand Down Expand Up @@ -303,11 +299,9 @@ parse_rdbms_opt(Type, ModRDBMSArch, ModAsyncWriter, Option, Deps) ->
add_dep(mod_mam_mnesia_prefs, [Type], Deps);
{rdbms_message_format, simple} ->
add_dep(ModRDBMSArch, rdbms_simple_opts(), Deps);
{async_writer, true} ->
{async_writer, Opts} ->
DepsWithNoWriter = add_dep(ModRDBMSArch, [no_writer], Deps),
add_dep(ModAsyncWriter, [Type], DepsWithNoWriter);
{async_writer_rdbms_pool, PoolName} ->
add_dep(ModAsyncWriter, [{rdbms_pool, PoolName}], Deps);
add_dep(ModAsyncWriter, [{Type, Opts}], DepsWithNoWriter);
_ -> Deps
end.

Expand Down
276 changes: 0 additions & 276 deletions src/mam/mod_mam_muc_rdbms_async_pool_writer.erl

This file was deleted.

Loading

0 comments on commit 99ddcc3

Please sign in to comment.