Skip to content

Commit

Permalink
Start all global_distrib metrics globally
Browse files Browse the repository at this point in the history
Also move instrumentation asserts to more correct places.
  • Loading branch information
gustawlippa committed Jun 7, 2024
1 parent 6f3982a commit 503e95b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
38 changes: 25 additions & 13 deletions big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ init_per_suite(Config) ->
end.

events() ->
Modules = [mod_global_distrib, mod_global_distrib_bounce, mod_global_distrib_hosts_refresher,
mod_global_distrib_mapping, mod_global_distrib_receiver],
lists:append([instrument_helper:declared_events(M) || M <- Modules]).
% because mod_global_distrib starts instrumentation manually, it doesn't export instrumentation/1
Specs = rpc(europe_node1, mod_global_distrib, instrumentation, []),
GDEvents = [{Event, Labels} || {Event, Labels, _Config} <- Specs],
OtherModules = [mod_global_distrib_bounce, mod_global_distrib_hosts_refresher,
mod_global_distrib_mapping, mod_global_distrib_receiver],
GDEvents ++ lists:append([instrument_helper:declared_events(M) || M <- OtherModules]).

end_per_suite(Config) ->
disable_logging(),
Expand Down Expand Up @@ -444,9 +447,14 @@ test_two_way_pm(Alice, Eve) ->
EveJidB = jid:to_binary(EveJid),
instrument_helper:assert(mod_global_distrib_mapping_cache_misses, #{},
fun(#{count := 1, jid := EveJidB}) -> true end),

instrument_helper:assert(mod_global_distrib_mapping_fetches, #{},
fun(#{count := 1, time := T, jid := EveJid}) -> T >= 0 end).
fun(#{count := 1, time := T, jid := EveJid}) -> T >= 0 end),
instrument_helper:assert(mod_global_distrib_outgoing_established, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end),
instrument_helper:assert(mod_global_distrib_outgoing_queue, #{},
fun(#{time := Time, host := <<"reg1">>}) -> Time >= 0 end),
instrument_helper:assert(mod_global_distrib_outgoing_messages, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end).

test_muc_conversation_on_one_host(Config0) ->
AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
Expand Down Expand Up @@ -551,7 +559,7 @@ test_muc_conversation_history(Config0) ->

% events are checked only on mim host, the other event was executed on Eve's reg ("asia_node") host
EveJid = escalus_client:full_jid(Eve),
instrument_helper:assert(mod_global_distrib_delivered_with_ttl, #{host_type => domain_helper:host_type()},
instrument_helper:assert(mod_global_distrib_delivered_with_ttl, #{},
fun(#{ttl := TTL, from := From}) ->
?assert(TTL > 0), jid:to_binary(From) =:= EveJid
end)
Expand Down Expand Up @@ -659,17 +667,21 @@ test_component_disconnect(Config) ->
escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
Error = escalus:wait_for_stanza(User, 5000),
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error),
instrument_helper:assert(mod_global_distrib_outgoing_established, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end),
instrument_helper:assert(mod_global_distrib_outgoing_queue, #{},
fun(#{time := Time, host := <<"reg1">>}) -> Time >= 0 end),
instrument_helper:assert(mod_global_distrib_outgoing_messages, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end),
instrument_helper:assert(mod_global_distrib_outgoing_closed, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end)
end,

[escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
AliceStory = fun(User) ->
Story(User),
% only check Alice, because Eve's event is executed on other node
Jid = escalus_client:full_jid(User),
instrument_helper:assert(mod_global_distrib_stop_ttl_zero, #{},
fun(#{count := 1, from := From}) ->
jid:to_binary(From) =:= Jid end)
end,

escalus:fresh_story(Config, [{alice, 1}], AliceStory),
escalus:fresh_story(Config, [{eve, 1}], Story).

test_location_disconnect(Config) ->
try
Expand Down
21 changes: 12 additions & 9 deletions src/global_distrib/mod_global_distrib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
-include("mongoose.hrl").
-include("mongoose_config_spec.hrl").

-export([deps/2, start/2, stop/1, config_spec/0, instrumentation/1]).
-export([deps/2, start/2, stop/1, config_spec/0, instrumentation/0]).
-export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
-export([maybe_reroute/3]).
-export([process_opts/1, process_endpoint/1]).

-ignore_xref([remove_metadata/2]).
-ignore_xref([remove_metadata/2, instrumentation/0]).

%%--------------------------------------------------------------------
%% gen_mod API
Expand All @@ -57,21 +57,24 @@ bounce_modules(#{enabled := false}) -> [].

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := HostType}) ->
mongoose_instrument:set_up(instrumentation()),
gen_hook:add_handlers(hooks());
start(_HostType, #{}) ->
ok.

-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
case gen_mod:get_module_opt(HostType, ?MODULE, global_host) of
HostType -> gen_hook:delete_handlers(hooks());
HostType ->
mongoose_instrument:tear_down(instrumentation()),
gen_hook:delete_handlers(hooks());
_ -> ok
end.

-spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()].
instrumentation(HostType) ->
[{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, #{host_type => HostType}, #{metrics => #{ttl => histogram}}},
{?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{host_type => HostType}, #{metrics => #{count => spiral}}}].
-spec instrumentation() -> [mongoose_instrument:spec()].
instrumentation() ->
[{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, #{}, #{metrics => #{ttl => histogram}}},
{?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{}, #{metrics => #{count => spiral}}}].

hooks() ->
[{filter_packet, global, fun ?MODULE:maybe_reroute/3, #{}, 99}].
Expand Down Expand Up @@ -252,7 +255,7 @@ maybe_reroute({From, To, _, Packet} = FPacket, _, _) ->
ResultFPacket = case lookup_recipients_host(TargetHostOverride, To, LocalHost, GlobalHost) of
{ok, LocalHost} ->
{ok, TTL} = find_metadata(Acc, ttl),
mongoose_instrument:execute(?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, #{host_type => GlobalHost}, #{ttl => TTL, from => From}),
mongoose_instrument:execute(?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, #{}, #{ttl => TTL, from => From}),

%% Continue routing with initialized metadata
mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
Expand All @@ -271,7 +274,7 @@ maybe_reroute({From, To, _, Packet} = FPacket, _, _) ->
?LOG_INFO(#{what => gd_route_ttl_zero,
text => <<"Skip global distribution">>,
gd_id => ID, acc => Acc, target_host => TargetHost}),
mongoose_instrument:execute(?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{host_type => GlobalHost}, #{count => 1}),
mongoose_instrument:execute(?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{}, #{count => 1}),

Check warning on line 277 in src/global_distrib/mod_global_distrib.erl

View check run for this annotation

Codecov / codecov/patch

src/global_distrib/mod_global_distrib.erl#L277

Added line #L277 was not covered by tests
FPacket;
{ok, TTL} ->
?LOG_DEBUG(#{what => gd_reroute, ttl => TTL,
Expand Down
2 changes: 1 addition & 1 deletion src/global_distrib/mod_global_distrib_bounce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ maybe_store_message({From, To, Acc0, Packet} = FPacket, _, _) ->
?LOG_IF(error, To#jid.luser == <<>>,
#{what => gd_message_to_component_ttl_zero,
gd_id => ID, acc => Acc0}),
mongoose_instrument:execute(?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{host_type => LocalHost}, #{count => 1}),
mongoose_instrument:execute(?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{}, #{count => 1, from => From}),
{ok, FPacket};
OldTTL ->
ResendAfterMs = opt([bounce, resend_after_ms]),
Expand Down

0 comments on commit 503e95b

Please sign in to comment.