Skip to content

Commit

Permalink
Instrument mod_global_distrib_bounce
Browse files Browse the repository at this point in the history
Fixes a race condition in the test. Maybe it could be fixed by retrying bounce
with a small timeout after sending all the messages, but I didn't try this and
went with the known solution from the other test.
  • Loading branch information
gustawlippa committed May 31, 2024
1 parent d5a832a commit 92d0038
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
26 changes: 23 additions & 3 deletions big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,23 @@ init_per_suite(Config) ->
ok
end,
enable_logging(),
instrument_helper:start(instrument_helper:declared_events(mod_global_distrib)),
escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
instrument_helper:start(events([mod_global_distrib, mod_global_distrib_bounce])),
Config1 = mongoose_helper:backup_and_set_config_option(Config, [instrumentation, probe_interval], 1),
escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config1]);
Result ->
ct:pal("Redis check result: ~p", [Result]),
{skip, "GD Redis default pool not available"}
end.

%% Collects the instrumentation events declared by each of the given modules
%% (as reported by instrument_helper:declared_events/1) into one flat list,
%% preserving the order of Modules.
events(Modules) ->
    lists:flatmap(fun instrument_helper:declared_events/1, Modules).

%% Suite teardown. Runs the cleanup steps in a fixed order: logging is switched
%% off, fresh escalus users are cleaned, mongoose_cluster:leave/0 is invoked for
%% europe_node2 through the rpc helper, escalus is shut down, the
%% [instrumentation, probe_interval] option is restored and finally the
%% instrument helper is stopped. Returns the result of instrument_helper:stop/0.
end_per_suite(Config) ->
disable_logging(),
escalus_fresh:clean(),
rpc(europe_node2, mongoose_cluster, leave, []),
escalus:end_per_suite(Config),
%% Counterpart of the backup_and_set_config_option call in init_per_suite,
%% which overrides the probe interval for the duration of the suite.
mongoose_helper:restore_config_option(Config, [instrumentation, probe_interval]),
instrument_helper:stop().

init_per_group(start_checks, Config) ->
Expand Down Expand Up @@ -858,18 +863,28 @@ test_messages_bounced_in_order(Config) ->
%% Make sure all messages land in bounce storage
delete_mapping(europe_node1, Eve),

Seq = lists:seq(1, 100),
wait_for_bounce_size(0),

Seq = lists:seq(1, 99),
lists:foreach(
fun(I) ->
Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
escalus_client:send(Alice, Stanza)
end,
Seq),

wait_for_bounce_size(99),

%% Restore the mapping so that bounce eventually succeeds
?assertEqual(undefined, get_mapping(europe_node1, Eve)),
set_mapping(europe_node1, Eve, <<"reg1">>),

%% The test used to work only if the mapping was restored while Alice was still
%% sending the 100 stanzas. This may actually be a race condition, so the test
%% should work like the test_in_order_messages_on_multiple_connections_with_bounce
%% testcase: make sure that the last message is sent when the mapping is known.
escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),

lists:foreach(
fun(I) ->
Stanza = escalus_client:wait_for_stanza(Eve, 5000),
Expand All @@ -878,6 +893,11 @@ test_messages_bounced_in_order(Config) ->
Seq)
end).

%% Waits for new mod_global_distrib_bounce_queue_size measurements and asserts
%% that the first one returned reports a `size` equal to ExpectedSize.
%% Only the head of the returned list is checked; the remaining measurements
%% are ignored.
wait_for_bounce_size(ExpectedSize) ->
    Event = mod_global_distrib_bounce_queue_size,
    Labels = #{},
    [First | _] = instrument_helper:wait_for_new(Event, Labels),
    HasExpectedSize = fun(#{size := Size}) -> Size =:= ExpectedSize end,
    instrument_helper:assert(Event, Labels, [First], HasExpectedSize).

test_update_senders_host(Config) ->
escalus:fresh_story(
Config, [{alice, 1}, {eve, 1}],
Expand Down
2 changes: 1 addition & 1 deletion include/global_distrib_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
-define(GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(Server), [mod_global_distrib, outgoing, established, Server]).
-define(GLOBAL_DISTRIB_OUTGOING_CLOSED(Server), [mod_global_distrib, outgoing, closed, Server]).
-define(GLOBAL_DISTRIB_OUTGOING_ERRORED(Server), [mod_global_distrib, outgoing, errored, Server]).
-define(GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, [mod_global_distrib, bounce_queue_size]).
-define(GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, mod_global_distrib_bounce_queue_size).
20 changes: 15 additions & 5 deletions src/global_distrib/mod_global_distrib_bounce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-behaviour(gen_mod).
-behaviour(gen_server).
-behaviour(mongoose_module_metrics).
-behaviour(mongoose_instrument_probe).

-include("mongoose.hrl").
-include("jlib.hrl").
Expand All @@ -28,10 +29,12 @@
-define(MESSAGE_STORE, mod_global_distrib_bounce_message_store).
-define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target).

-export([start_link/0, start/2, stop/1, hooks/1, deps/2]).
-export([start_link/0, start/2, stop/1, hooks/1, deps/2, instrumentation/1]).
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
-export([maybe_store_message/3, reroute_messages/3]).
-export([bounce_queue_size/0]).
%% mongoose_instrument_probe callback
-export([probe/2]).

-ignore_xref([bounce_queue_size/0, start_link/0]).

Expand All @@ -40,12 +43,9 @@
%%--------------------------------------------------------------------

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(_HostType, _Opts) ->
start(HostType, _Opts) ->
mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
EvalDef = {[{l, [{t, [value, {v, 'Value'}]}]}], [value]},
QueueSizeDef = {function, ?MODULE, bounce_queue_size, [], eval, EvalDef},
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, QueueSizeDef),
ChildSpec = {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]},
ejabberd_sup:start_child(ChildSpec).

Expand All @@ -63,6 +63,16 @@ hooks(HostType) ->
[{mod_global_distrib_unknown_recipient, HostType, fun ?MODULE:maybe_store_message/3, #{}, 80},
{mod_global_distrib_known_recipient, HostType, fun ?MODULE:reroute_messages/3, #{}, 80}].

%% Declares the instrumentation events of this module: a single event,
%% ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, with no labels. It is configured with
%% this module as its probe module and exposes one gauge metric, `size`.
%% The host type is not used.
-spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()].
instrumentation(_HostType) ->
    ProbeConfig = #{module => ?MODULE},
    EventConfig = #{probe => ProbeConfig, metrics => #{size => gauge}},
    [{?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, #{}, EventConfig}].

%% mongoose_instrument_probe callback for the bounce queue size event.
%% Returns a measurements map whose `size` key holds the current result of
%% bounce_queue_size/0. Only the ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE event is
%% handled; any other event name fails with function_clause.
-spec probe(mongoose_instrument:event_name(), mongoose_instrument:labels()) ->
    mongoose_instrument:measurements().
probe(?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, #{}) ->
    QueueSize = bounce_queue_size(),
    #{size => QueueSize}.

%% Starts this module's gen_server, linked to the caller. The process is not
%% registered under a name, and init/1 receives an empty argument list.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    InitArgs = [],
    ServerOpts = [],
    gen_server:start_link(?MODULE, InitArgs, ServerOpts).
Expand Down

0 comments on commit 92d0038

Please sign in to comment.