Skip to content

Commit

Permalink
Put mod_stream_management config opts in a map
Browse files Browse the repository at this point in the history
  • Loading branch information
gustawlippa committed Mar 4, 2022
1 parent 3611948 commit ff5f652
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 107 deletions.
40 changes: 18 additions & 22 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -183,44 +183,40 @@ end_per_testcase(CaseName, Config) ->
required_modules(Scope, Name) ->
SMConfig = case required_sm_opts(Scope, Name) of
stopped -> stopped;
ExtraOpts -> merge_proplists(common_sm_opts(), ExtraOpts)
ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
end,
[{mod_stream_management, SMConfig}, {mod_offline, []}].
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, []}].

required_sm_opts(group, parallel) ->
[{ack_freq, never}];
#{ack_freq => never};
required_sm_opts(group, parallel_manual_ack_freq_1) ->
[{ack_freq, 1},
{resume_timeout, ?SHORT_TIMEOUT}];
#{ack_freq => 1,
resume_timeout => ?SHORT_TIMEOUT};
required_sm_opts(group, manual_ack_freq_2) ->
[{ack_freq, 2}];
#{ack_freq => 2};
required_sm_opts(group, stream_mgmt_disabled) ->
stopped;
required_sm_opts(group, parallel_unacknowledged_message_hook) ->
[{ack_freq, 1}];
#{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
[{ack_freq, 1}, {buffer_max, 1000}];
#{ack_freq => 1, buffer_max => 1000};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
[{ack_freq, 1},
{resume_timeout, ?SHORT_TIMEOUT} | stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)];
#{ack_freq => 1,
resume_timeout => ?SHORT_TIMEOUT,
stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT);
#{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT).
#{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.

common_sm_opts() ->
[{buffer_max, ?SMALL_SM_BUFFER}].

merge_proplists(Defaults, Values) ->
Values ++ delete_keys(proplists:get_keys(Values), Defaults).

delete_keys(Keys, List) ->
lists:foldl(fun proplists:delete/2, List, Keys).
#{buffer_max => ?SMALL_SM_BUFFER}.

stale_h(RepeatAfter, Geriatric) ->
[{stale_h, [{enabled, true},
{stale_h_repeat_after, RepeatAfter},
{stale_h_geriatric, Geriatric}]}].
#{enabled => true,
stale_h_repeat_after => RepeatAfter,
stale_h_geriatric => Geriatric}.

make_smid() ->
base64:encode(crypto:strong_rand_bytes(21)).
Expand Down
2 changes: 2 additions & 0 deletions doc/modules/mod_stream_management.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ while the management of the session tables and configuration is implemented in
* **Default:** "mnesia"
* **Example:** `backend = "mnesia"`

Currently, only "mnesia" is supported.

### `modules.mod_stream_management.buffer`
* **Syntax:** boolean
* **Default:** true
Expand Down
9 changes: 3 additions & 6 deletions include/ejabberd_c2s.hrl
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
-include("mod_privacy.hrl").

-define(STREAM_MGMT_H_MAX, (1 bsl 32 - 1)).
-define(STREAM_MGMT_CACHE_MAX, 100).
-define(STREAM_MGMT_ACK_FREQ, 1). %% It's the *denominator* of the frequency
-define(STREAM_MGMT_RESUME_TIMEOUT, 600). %% seconds
-define(CONSTRAINT_CHECK_TIMEOUT, 5). %% seconds

-type jid_set() :: gb_sets:set(jid:simple_jid()).
Expand Down Expand Up @@ -68,9 +65,9 @@
stream_mgmt_out_acked = 0,
stream_mgmt_buffer = [] :: [mongoose_acc:t()],
stream_mgmt_buffer_size = 0,
stream_mgmt_buffer_max = ?STREAM_MGMT_CACHE_MAX,
stream_mgmt_ack_freq = ?STREAM_MGMT_ACK_FREQ,
stream_mgmt_resume_timeout = ?STREAM_MGMT_RESUME_TIMEOUT,
stream_mgmt_buffer_max,
stream_mgmt_ack_freq,
stream_mgmt_resume_timeout,
stream_mgmt_resume_tref,
stream_mgmt_resumed_from,
stream_mgmt_constraint_check_tref,
Expand Down
18 changes: 3 additions & 15 deletions src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2781,9 +2781,9 @@ maybe_enable_stream_mgmt(NextState, El, StateData = #state{host_type = HostType}
enable_stream_resumption(StateData)
end,
send_element_from_server_jid(NewSD, EnabledEl),
BufferMax = get_buffer_max(HostType),
AckFreq = get_ack_freq(HostType),
ResumeTimeout = get_resume_timeout(HostType),
BufferMax = mod_stream_management:get_buffer_max(HostType),
AckFreq = mod_stream_management:get_ack_freq(HostType),
ResumeTimeout = mod_stream_management:get_resume_timeout(HostType),
fsm_next_state(NextState,
NewSD#state{stream_mgmt = true,
stream_mgmt_buffer_max = BufferMax,
Expand Down Expand Up @@ -2968,18 +2968,6 @@ drop_last(N, List) ->
end, {N, []}, List),
{N - ToDrop, List2}.

-spec get_buffer_max(mongooseim:host_type()) -> pos_integer() | infinity.
get_buffer_max(HostType) ->
mod_stream_management:get_buffer_max(HostType, ?STREAM_MGMT_CACHE_MAX).

-spec get_ack_freq(mongooseim:host_type()) -> pos_integer().
get_ack_freq(HostType) ->
mod_stream_management:get_ack_freq(HostType, ?STREAM_MGMT_ACK_FREQ).

-spec get_resume_timeout(mongooseim:host_type()) -> pos_integer().
get_resume_timeout(HostType) ->
mod_stream_management:get_resume_timeout(HostType, ?STREAM_MGMT_RESUME_TIMEOUT).

maybe_send_ack_request(Acc, #state{stream_mgmt = StreamMgmt})
when StreamMgmt =:= false; StreamMgmt =:= disabled ->
Acc;
Expand Down
55 changes: 33 additions & 22 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
%% API for `ejabberd_c2s'
-export([make_smid/0,
get_session_from_smid/2,
get_buffer_max/2,
get_ack_freq/2,
get_resume_timeout/2,
get_buffer_max/1,
get_ack_freq/1,
get_resume_timeout/1,
register_smid/3]).

%% API for inspection and tests
Expand Down Expand Up @@ -76,23 +76,30 @@ config_spec() ->
validate = positive},
<<"stale_h">> => stale_h_config_spec()
},
process = fun ?MODULE:process_buffer_and_ack/1
process = fun ?MODULE:process_buffer_and_ack/1,
format_items = map,
defaults = #{<<"backend">> => mnesia,
<<"buffer">> => true,
<<"buffer_max">> => 100,
<<"ack">> => true,
<<"ack_freq">> => 1,
<<"resume_timeout">> => 600 % seconds
}
}.

supported_features() -> [dynamic_domains].

process_buffer_and_ack(KVs) ->
{[Buffer, Ack], Opts} = proplists:split(KVs, [buffer, ack]),
process_buffer_and_ack(Opts = #{buffer := Buffer, ack := Ack}) ->
OptsWithBuffer = check_buffer(Buffer, Opts),
check_ack(Ack, OptsWithBuffer).

check_buffer([{buffer, false}], Opts) ->
lists:ukeysort(1, [{buffer_max, no_buffer}] ++ Opts);
check_buffer(false, Opts) ->
Opts#{buffer_max => no_buffer};
check_buffer(_, Opts) ->
Opts.

check_ack([{ack, false}], Opts) ->
lists:ukeysort(1, [{ack_freq, never}] ++ Opts);
check_ack(false, Opts) ->
Opts#{ack_freq => never};
check_ack(_, Opts) ->
Opts.

Expand All @@ -104,7 +111,12 @@ stale_h_config_spec() ->
wrap = {kv, stale_h_repeat_after}},
<<"geriatric">> => #option{type = integer,
validate = positive,
wrap = {kv, stale_h_geriatric}}
wrap = {kv, stale_h_geriatric}}},
format_items = map,
include = always,
defaults = #{<<"enabled">> => false,
<<"repeat_after">> => 1800, % seconds
<<"geriatric">> => 3600 % seconds
}
}.

Expand Down Expand Up @@ -176,17 +188,17 @@ get_stale_h(HostType, SMID) ->
true -> read_stale_h(HostType, SMID)
end.

-spec get_buffer_max(mongooseim:host_type(), buffer_max()) -> buffer_max().
get_buffer_max(HostType, Default) ->
gen_mod:get_module_opt(HostType, ?MODULE, buffer_max, Default).
-spec get_buffer_max(mongooseim:host_type()) -> buffer_max().
get_buffer_max(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, buffer_max).

-spec get_ack_freq(mongooseim:host_type(), ack_freq()) -> ack_freq().
get_ack_freq(HostType, Default) ->
gen_mod:get_module_opt(HostType, ?MODULE, ack_freq, Default).
-spec get_ack_freq(mongooseim:host_type()) -> ack_freq().
get_ack_freq(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, ack_freq).

-spec get_resume_timeout(mongooseim:host_type(), pos_integer()) -> pos_integer().
get_resume_timeout(HostType, Default) ->
gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout, Default).
-spec get_resume_timeout(mongooseim:host_type()) -> pos_integer().
get_resume_timeout(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout).


register_stale_smid_h(HostType, SMID, H) ->
Expand All @@ -202,8 +214,7 @@ remove_stale_smid_h(HostType, SMID) ->
end.

is_stale_h_enabled(HostType) ->
MaybeModOpts = gen_mod:get_module_opt(HostType, ?MODULE, stale_h, []),
proplists:get_value(enabled, MaybeModOpts, false).
gen_mod:get_module_opt(HostType, ?MODULE, [stale_h, enabled]).

%% Backend operations

Expand Down
32 changes: 12 additions & 20 deletions src/stream_management/mod_stream_management_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,22 @@
{smid :: mod_stream_management:smid(),
sid :: ejabberd_sm:sid() }).

init(_HostType, Opts) ->
init(_HostType, #{stale_h := StaleOpts}) ->
mnesia:create_table(sm_session, [{ram_copies, [node()]},
{attributes, record_info(fields, sm_session)}]),
mnesia:add_table_index(sm_session, sid),
mnesia:add_table_copy(sm_session, node(), ram_copies),
maybe_init_stale_h(Opts),
maybe_init_stale_h(StaleOpts),
ok.

maybe_init_stale_h(Opts) ->
StaleOpts = gen_mod:get_opt(stale_h, Opts, [{enabled, false}]),
case proplists:get_value(enabled, StaleOpts, false) of
false ->
ok;
true ->
?LOG_INFO(#{what => stream_mgmt_stale_h_start}),
mnesia:create_table(stream_mgmt_stale_h,
[{ram_copies, [node()]},
{attributes, record_info(fields, stream_mgmt_stale_h)}]),
mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies),
start_cleaner(StaleOpts)
end.
maybe_init_stale_h(StaleOpts = #{enabled := true}) ->
?LOG_INFO(#{what => stream_mgmt_stale_h_start}),
mnesia:create_table(stream_mgmt_stale_h,
[{ram_copies, [node()]},
{attributes, record_info(fields, stream_mgmt_stale_h)}]),
mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies),
start_cleaner(StaleOpts);
maybe_init_stale_h(_) -> ok.

-spec register_smid(HostType, SMID, SID) ->
ok | {error, term()} when
Expand Down Expand Up @@ -140,10 +135,7 @@ start_cleaner(Opts) ->
start_link(Opts) ->
gen_server:start_link({local, stream_management_stale_h}, ?MODULE, [Opts], []).

init([Opts]) ->
%% In seconds
RepeatAfter = proplists:get_value(stale_h_repeat_after, Opts, 1800),
GeriatricAge = proplists:get_value(stale_h_geriatric, Opts, 3600),
init([#{stale_h_repeat_after := RepeatAfter, stale_h_geriatric := GeriatricAge}]) ->
State = #smgc_state{gc_repeat_after = RepeatAfter,
gc_geriatric = GeriatricAge},
schedule_check(State),
Expand All @@ -166,7 +158,7 @@ handle_info(Info, State) ->
{noreply, State}.

schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) ->
erlang:send_after(RepeatAfter * 1000, self(), check).
erlang:send_after(timer:seconds(RepeatAfter), self(), check).

clear_table(GeriatricAge) ->
TimeToDie = erlang:monotonic_time(second) - GeriatricAge,
Expand Down
33 changes: 24 additions & 9 deletions test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -648,13 +648,13 @@ all_modules() ->
mod_mam_muc_rdbms_arch =>
mod_config(mod_mam_muc_rdbms_arch, #{db_message_format => mam_message_xml}),
mod_stream_management =>
[{ack_freq, 1},
{buffer_max, 30},
{resume_timeout, 600},
{stale_h,
[{enabled, true},
{stale_h_geriatric, 3600},
{stale_h_repeat_after, 1800}]}]}.
mod_config(mod_stream_management, #{ack_freq => 2,
buffer_max => 30,
resume_timeout => 600,
stale_h => #{enabled => true,
stale_h_geriatric => 3600,
stale_h_repeat_after => 1800}})
}.

pgsql_modules() ->
#{mod_adhoc => default_mod_config(mod_adhoc),
Expand All @@ -671,7 +671,8 @@ pgsql_modules() ->
{ip_access, [{allow, "127.0.0.0/8"}, {deny, "0.0.0.0/0"}]},
{welcome_message, {"Hello", "I am MongooseIM"}}],
mod_roster => [{backend, rdbms}],
mod_sic => default_mod_config(mod_sic), mod_stream_management => [],
mod_sic => default_mod_config(mod_sic),
mod_stream_management => default_mod_config(mod_stream_management),
mod_vcard => mod_config(mod_vcard, #{backend => rdbms, host => {prefix, <<"vjud.">>}})}.

auth_with_methods(Methods) ->
Expand Down Expand Up @@ -850,6 +851,16 @@ default_mod_config(mod_private) ->
#{iqdisc => one_queue, backend => rdbms};
default_mod_config(mod_sic) ->
#{iqdisc => one_queue};
default_mod_config(mod_stream_management) ->
#{backend => mnesia,
buffer => true,
buffer_max => 100,
ack => true,
ack_freq => 1,
resume_timeout => 600,
stale_h => #{enabled => false,
stale_h_repeat_after => 1800,
stale_h_geriatric => 3600}};
default_mod_config(mod_time) ->
#{iqdisc => one_queue};
default_mod_config(mod_vcard) ->
Expand Down Expand Up @@ -931,7 +942,11 @@ default_config([modules, mod_mam_meta, async_writer]) ->
#{batch_size => 30, enabled => true, flush_interval => 2000,
pool_size => 4 * erlang:system_info(schedulers_online)};
default_config([modules, mod_mam_meta, riak]) ->
#{bucket_type => <<"mam_yz">>, search_index => <<"mam">>}.
#{bucket_type => <<"mam_yz">>, search_index => <<"mam">>};
default_config([modules, mod_stream_management, stale_h]) ->
#{enabled => false,
stale_h_repeat_after => 1800,
stale_h_geriatric => 3600}.

common_mam_config() ->
#{no_stanzaid_element => false,
Expand Down
24 changes: 12 additions & 12 deletions test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2900,15 +2900,14 @@ mod_sic(_Config) ->
check_iqdisc_map(mod_sic).

mod_stream_management(_Config) ->
check_module_defaults(mod_stream_management),
T = fun(Opts) -> #{<<"modules">> => #{<<"mod_stream_management">> => Opts}} end,
M = fun(Cfg) -> modopts(mod_stream_management, Cfg) end,
?cfgh(M([{buffer_max, no_buffer}]), T(#{<<"buffer">> => false})),
?cfgh(M([{buffer_max, 10}]), T(#{<<"buffer_max">> => 10})),
?cfgh(M([{ack_freq, never}]), T(#{<<"ack">> => false})),
?cfgh(M([{ack_freq, 1}]), T(#{<<"ack_freq">> => 1})),
?cfgh(M([{resume_timeout, 600}]), T(#{<<"resume_timeout">> => 600})),
?cfgh(M([{backend, mnesia}]),
T(#{<<"backend">> => <<"mnesia">>})),
P = [modules, mod_stream_management],
?cfgh(P ++ [buffer_max], no_buffer, T(#{<<"buffer">> => false})),
?cfgh(P ++ [buffer_max], 10, T(#{<<"buffer_max">> => 10})),
?cfgh(P ++ [ack_freq], never, T(#{<<"ack">> => false})),
?cfgh(P ++ [ack_freq], 1, T(#{<<"ack_freq">> => 1})),
?cfgh(P ++ [resume_timeout], 999, T(#{<<"resume_timeout">> => 999})),

?errh(T(#{<<"buffer">> => 0})),
?errh(T(#{<<"buffer_max">> => -1})),
Expand All @@ -2918,12 +2917,13 @@ mod_stream_management(_Config) ->
?errh(T(#{<<"backend">> => <<"iloveyou">>})).

mod_stream_management_stale_h(_Config) ->
P = [modules, mod_stream_management, stale_h],
T = fun(Opts) -> #{<<"modules">> =>
#{<<"mod_stream_management">> => #{<<"stale_h">> => Opts}}} end,
M = fun(Cfg) -> modopts(mod_stream_management, [{stale_h, Cfg}]) end,
?cfgh(M([{enabled, true}]), T(#{<<"enabled">> => true})),
?cfgh(M([{stale_h_repeat_after, 1800}]), T(#{<<"repeat_after">> => 1800})),
?cfgh(M([{stale_h_geriatric, 3600}]), T(#{<<"geriatric">> => 3600})),
?cfgh(P ++ [enabled], true, T(#{<<"enabled">> => true})),
?cfgh(P ++ [stale_h_repeat_after], 999, T(#{<<"repeat_after">> => 999})),
?cfgh(P ++ [stale_h_geriatric], 999, T(#{<<"geriatric">> => 999})),
?cfgh(P, config_parser_helper:default_config(P), T(#{})),

?errh(T(#{<<"enabled">> => <<"true">>})),
?errh(T(#{<<"repeat_after">> => -1})),
Expand Down
2 changes: 1 addition & 1 deletion test/config_parser_SUITE_data/modules.toml
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@

[modules.mod_stream_management]
buffer_max = 30
ack_freq = 1
ack_freq = 2
resume_timeout = 600
stale_h.enabled = true
stale_h.repeat_after = 1800
Expand Down

0 comments on commit ff5f652

Please sign in to comment.