Skip to content

Commit

Permalink
Clean up calls to gen_mod:get_* in mod_event_pusher
Browse files Browse the repository at this point in the history
Also: add missing iqdisc option
  • Loading branch information
chrzaszcz committed Apr 7, 2022
1 parent 3bdde49 commit 72abadc
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 36 deletions.
7 changes: 7 additions & 0 deletions doc/modules/mod_event_pusher_push.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ with 100 asynchronous workers that will handle all push notification related wor

## Options

### `modules.mod_event_pusher.push.iqdisc.type`
* **Syntax:** string, one of `"one_queue"`, `"no_queue"`, `"queues"`, `"parallel"`
* **Default:** `"one_queue"`

Strategy to handle incoming stanzas. For details, please refer to
[IQ processing policies](../configuration/Modules.md#iq-processing-policies).

### `modules.mod_event_pusher.push.backend`
* **Syntax:** string, one of `"mnesia"`, `"rdbms"`
* **Default:** `"mnesia"`
Expand Down
9 changes: 5 additions & 4 deletions src/event_pusher/mod_event_pusher_push.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ start(HostType, Opts) ->
start_pool(HostType, #{wpool := WpoolOpts}) ->
{ok, _} = mongoose_wpool:start(generic, HostType, pusher_push, maps:to_list(WpoolOpts)).

init_iq_handlers(HostType, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
init_iq_handlers(HostType, #{iqdisc := IQDisc}) ->
gen_iq_handler:add_iq_handler(ejabberd_local, HostType, ?NS_PUSH, ?MODULE,
iq_handler, IQDisc),
gen_iq_handler:add_iq_handler(ejabberd_sm, HostType, ?NS_PUSH, ?MODULE,
Expand All @@ -96,11 +95,13 @@ config_spec() ->
VirtPubSubHost = #option{type = string, validate = subdomain_template,
process = fun mongoose_subdomain_utils:make_subdomain_pattern/1},
#section{
items = #{<<"backend">> => #option{type = atom, validate = {module, ?MODULE}},
items = #{<<"iqdisc">> => mongoose_config_spec:iqdisc(),
<<"backend">> => #option{type = atom, validate = {module, ?MODULE}},
<<"wpool">> => wpool_spec(),
<<"plugin_module">> => #option{type = atom, validate = module},
<<"virtual_pubsub_hosts">> => #list{items = VirtPubSubHost}},
defaults = #{<<"backend">> => mnesia,
defaults = #{<<"iqdisc">> => one_queue,
<<"backend">> => mnesia,
<<"plugin_module">> => mod_event_pusher_push_plugin:default_plugin_module(),
<<"virtual_pubsub_hosts">> => []},
format_items = map
Expand Down
23 changes: 8 additions & 15 deletions src/event_pusher/mod_event_pusher_sns.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
%%%===================================================================

-define(TOPIC_BASE, ["arn", "aws", "sns"]).
-define(PUBLISH_RETRY_COUNT_DEFAULT, 2).

-type user_guid() :: binary().
-type topic_arn() :: string(). %% Full topic ARN in format arn:aws:sns:{REGION}:{ACCOUNT_ID}:{TOPIC}
Expand Down Expand Up @@ -158,7 +157,7 @@ handle_packet(HostType, From, To, Packet) ->
-spec async_publish(mongooseim:host_type(), topic_arn(), Content :: jiffy:json_value(),
attributes()) -> ok.
async_publish(HostType, TopicARN, Content, Attributes) ->
Retry = opt(HostType, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT),
Retry = opt(HostType, publish_retry_count),
mongoose_wpool:cast(generic, HostType, pusher_sns,
{?MODULE, try_publish, [HostType, TopicARN, Content, Attributes, Retry]}).

Expand Down Expand Up @@ -234,42 +233,36 @@ message_type(Packet) ->
end.

%% Getter for module options
-spec opt(mongooseim:host_type(), gen_mod:opt_key()) -> gen_mod:opt_value() | undefined.
-spec opt(mongooseim:host_type(), gen_mod:opt_key()) -> gen_mod:opt_value().
opt(HostType, Option) ->
opt(HostType, Option, undefined).

%% Getter for module options with default value
-spec opt(mongooseim:host_type(), gen_mod:opt_key(), Default :: gen_mod:opt_value()) ->
gen_mod:opt_value().
opt(HostType, Option, Default) ->
gen_mod:get_module_opt(HostType, ?MODULE, Option, Default).
gen_mod:get_module_opt(HostType, ?MODULE, Option).

%% ----------------------------------------------------------------------
%% Callbacks

-spec user_guid(mongooseim:host_type(), UserJID :: jid:jid()) -> user_guid().
user_guid(HostType, UserJID) ->
PluginModule = opt(HostType, plugin_module, mod_event_pusher_sns_defaults),
PluginModule = opt(HostType, plugin_module),
PluginModule:user_guid(UserJID).

-spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
UserJID :: jid:jid(), IsOnline :: boolean()) ->
attributes().
message_attributes(HostType, TopicARN, UserJID, IsOnline) ->
PluginModule = opt(HostType, plugin_module, mod_event_pusher_sns_defaults),
PluginModule = opt(HostType, plugin_module),
PluginModule:message_attributes(TopicARN, UserJID, IsOnline).

-spec message_attributes(mongooseim:host_type(), TopicARN :: topic_arn(),
From :: jid:jid(), To :: jid:jid(), MessageType :: pm | muc,
Packet :: exml:element()) -> attributes().
message_attributes(HostType, TopicARN, From, To, MessageType, Packet) ->
PluginModule = opt(HostType, plugin_module, mod_event_pusher_sns_defaults),
PluginModule = opt(HostType, plugin_module),
PluginModule:message_attributes(TopicARN, From, To, MessageType, Packet).

-spec calc_backoff_time(mongooseim:host_type(), integer()) -> integer().
calc_backoff_time(HostType, Retry) ->
MaxRetry = opt(HostType, publish_retry_count, ?PUBLISH_RETRY_COUNT_DEFAULT),
BaseTime = opt(HostType, publish_retry_time_ms, 50),
MaxRetry = opt(HostType, publish_retry_count),
BaseTime = opt(HostType, publish_retry_time_ms),
BackoffMaxTime = round(math:pow(2, MaxRetry - Retry)) * BaseTime,
Random = rand:uniform(BaseTime),
BackoffMaxTime - Random.
6 changes: 4 additions & 2 deletions test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ custom_mod_event_pusher_http() ->
pool_name => http_pool}]}.

custom_mod_event_pusher_push() ->
#{backend => mnesia,
#{iqdisc => one_queue,
backend => mnesia,
plugin_module => mod_event_pusher_push_plugin_defaults,
virtual_pubsub_hosts =>
[{fqdn,<<"host1">>},{fqdn,<<"host2">>}],
Expand Down Expand Up @@ -1049,7 +1050,8 @@ default_config([modules, M]) ->
default_config([modules, mod_event_pusher, http]) ->
#{handlers => []};
default_config([modules, mod_event_pusher, push]) ->
#{backend => mnesia,
#{iqdisc => one_queue,
backend => mnesia,
wpool => default_config([modules, mod_event_pusher, push, wpool]),
plugin_module => mod_event_pusher_push_plugin_defaults,
virtual_pubsub_hosts => []};
Expand Down
37 changes: 22 additions & 15 deletions test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,7 @@ mod_event_pusher_push(_Config) ->
#{<<"mod_event_pusher">> => #{<<"push">> => Opts}}}
end,
?cfgh(P, default_config(P), T(#{})),
check_iqdisc(P, T),
test_wpool(P ++ [wpool], fun(Opts) -> T(#{<<"wpool">> => Opts}) end),
?cfgh(P ++ [backend], rdbms, T(#{<<"backend">> => <<"rdbms">>})),
?cfgh(P ++ [plugin_module], mod_event_pusher_push_plugin_enhanced,
Expand Down Expand Up @@ -3140,22 +3141,28 @@ service_mongoose_system_metrics(_Config) ->

%% Helpers for module tests

iqdisc({queues, Workers}) -> #{<<"type">> => <<"queues">>, <<"workers">> => Workers};
iqdisc(Atom) -> #{<<"type">> => atom_to_binary(Atom, utf8)}.

iq_disc_generic(Module, RequiredOpts, Value) ->
Opts = RequiredOpts#{<<"iqdisc">> => Value},
#{<<"modules">> => #{atom_to_binary(Module, utf8) => Opts}}.

check_iqdisc(Module) ->
check_iqdisc(Module, #{}).

check_iqdisc(Module, RequiredOpts) ->
?cfgh([modules, Module, iqdisc], {queues, 10},
iq_disc_generic(Module, RequiredOpts, iqdisc({queues, 10}))),
?cfgh([modules, Module, iqdisc], parallel,
iq_disc_generic(Module, RequiredOpts, iqdisc(parallel))),
?errh(iq_disc_generic(Module, RequiredOpts, iqdisc(bad_haha))).
P = [modules, Module],
T = fun(Opts) -> #{<<"modules">> => #{atom_to_binary(Module) => Opts}} end,
check_iqdisc(P, T).

check_iqdisc(Module, RequiredOpts) when is_map(RequiredOpts) ->
P = [modules, Module],
T = fun(Opts) ->
#{<<"modules">> => #{atom_to_binary(Module) => maps:merge(RequiredOpts, Opts)}}
end,
check_iqdisc(P, T);
check_iqdisc(ParentP, ParentT) when is_function(ParentT, 1) ->
P = ParentP ++ [iqdisc],
T = fun(Opts) -> ParentT(#{<<"iqdisc">> => Opts}) end,
?cfgh(P, {queues, 10}, T(#{<<"type">> => <<"queues">>, <<"workers">> => 10})),
?cfgh(P, parallel, T(#{<<"type">> => <<"parallel">>})),
?cfgh(P, one_queue, T(#{<<"type">> => <<"one_queue">>})),
?cfgh(P, no_queue, T(#{<<"type">> => <<"no_queue">>})),
?errh(T(#{<<"type">> => <<"one_queue_and_a_half">>})),
?errh(T(#{<<"type">> => <<"queues">>, <<"workers">> => 0})),
?errh(T(#{<<"type">> => <<"no_queue">>, <<"workers">> => 10})),
?errh(T(#{<<"workers">> => 10})).

check_module_defaults(Mod) ->
ExpectedCfg = default_mod_config(Mod),
Expand Down

0 comments on commit 72abadc

Please sign in to comment.