Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument/mod global distrib #4292

Merged
merged 13 commits into from
Jun 12, 2024
125 changes: 112 additions & 13 deletions big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
-define(PROBE_INTERVAL, 1). %% seconds

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2, mod_config/2]).
Expand All @@ -50,7 +51,7 @@ groups() ->
[
test_pm_between_users_at_different_locations,
test_pm_between_users_before_available_presence,
test_component_disconnect ,
test_component_disconnect,
test_component_on_one_host,
test_components_in_different_regions,
test_hidden_component_disco_in_different_region,
Expand All @@ -65,6 +66,7 @@ groups() ->

%% with node 2 disabled
test_muc_conversation_on_one_host,
test_instrumentation_events_on_one_host,
test_global_disco
]},
{hosts_refresher, [],
Expand Down Expand Up @@ -127,17 +129,29 @@ init_per_suite(Config) ->
ok
end,
enable_logging(),
escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
instrument_helper:start(events()),
Config1 = mongoose_helper:backup_and_set_config_option(Config, [instrumentation, probe_interval], ?PROBE_INTERVAL),
escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config1]);
Result ->
ct:pal("Redis check result: ~p", [Result]),
{skip, "GD Redis default pool not available"}
end.

%% Builds the list of `{Event, Labels}` pairs passed to instrument_helper:start/1
%% in init_per_suite/1.
events() ->
    %% mod_global_distrib starts its instrumentation manually, so it doesn't export
    %% instrumentation/1; its specs are fetched from europe_node1 with instrumentation/0.
    MainSpecs = rpc(europe_node1, mod_global_distrib, instrumentation, []),
    %% Keep only the event name and labels; the per-event config is not needed here.
    MainEvents = [{Name, Labels} || {Name, Labels, _} <- MainSpecs],
    %% The remaining global distribution modules are queried through instrument_helper.
    DeclaredEvents = lists:flatmap(fun instrument_helper:declared_events/1,
                                   [mod_global_distrib_bounce,
                                    mod_global_distrib_hosts_refresher,
                                    mod_global_distrib_mapping,
                                    mod_global_distrib_receiver]),
    MainEvents ++ DeclaredEvents.

end_per_suite(Config) ->
disable_logging(),
escalus_fresh:clean(),
rpc(europe_node2, mongoose_cluster, leave, []),
escalus:end_per_suite(Config).
escalus:end_per_suite(Config),
mongoose_helper:restore_config_option(Config, [instrumentation, probe_interval]),
instrument_helper:stop().

init_per_group(start_checks, Config) ->
NodeName = europe_node1,
Expand Down Expand Up @@ -249,9 +263,9 @@ end_per_group_generic(Config) ->
dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).

init_per_testcase(CaseName, Config)
when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
CaseName == test_muc_conversation_history ->
%% There is no helper to load MUC on node2
when CaseName == test_muc_conversation_on_one_host; CaseName == test_instrumentation_events_on_one_host;
CaseName == test_global_disco; CaseName == test_muc_conversation_history ->
%% There is no helper to load MUC, or count instrumentation events on node2
%% For now it's easier to hide node2
%% TODO: Do it right at some point!
hide_node(europe_node2, Config),
Expand Down Expand Up @@ -297,8 +311,8 @@ end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_di
catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config)
when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
CaseName == test_muc_conversation_history ->
when CaseName == test_muc_conversation_on_one_host; CaseName == test_instrumentation_events_on_one_host;
CaseName == test_global_disco; CaseName == test_muc_conversation_history ->
refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
muc_helper:unload_muc(),
generic_end_per_testcase(CaseName, Config);
Expand Down Expand Up @@ -428,7 +442,19 @@ test_two_way_pm(Alice, Eve) ->
escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
FromAlice),
escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
FromEve).
FromEve),

EveJidB = jid:to_binary(EveJid),
instrument_helper:assert(mod_global_distrib_mapping_cache_misses, #{},
fun(#{count := 1, jid := EveJidB}) -> true end),
instrument_helper:assert(mod_global_distrib_mapping_fetches, #{},
fun(#{count := 1, time := T, jid := EveJid}) -> T >= 0 end),
instrument_helper:assert(mod_global_distrib_outgoing_established, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end),
instrument_helper:assert(mod_global_distrib_outgoing_queue, #{},
fun(#{time := Time, host := <<"reg1">>}) -> Time >= 0 end),
instrument_helper:assert(mod_global_distrib_outgoing_messages, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end).

test_muc_conversation_on_one_host(Config0) ->
AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
Expand Down Expand Up @@ -465,6 +491,35 @@ test_muc_conversation_on_one_host(Config0) ->
end),
muc_helper:destroy_room(Config).

%% Runs a two-way private-message exchange between fresh Alice and Eve clients and then
%% asserts that the mod_global_distrib_incoming_* instrumentation events were recorded.
%% After both clients are stopped it waits for, and asserts on, the
%% mod_global_distrib_incoming_closed event.
test_instrumentation_events_on_one_host(Config) ->
% testing is done with mim1 and reg1, and without mim2, so that we don't miss any events that could have been
% emitted there
Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
{ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
{ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),

% test_two_way_pm/2 also performs its own instrumentation assertions for the outgoing side
test_two_way_pm(Alice, Eve),

% NOTE(review): variables in a fun head are fresh bindings in Erlang, so `Host` inside the
% funs below shadows this outer `Host` instead of being matched against it. As written, the
% patterns only require the `host`/`peer` key to be present with any value. Confirm whether
% comparing against escalus_client:server(Alice) was intended, and what value the events carry.
Host = escalus_client:server(Alice),
instrument_helper:assert(mod_global_distrib_incoming_established, #{},
fun(#{count := 1, peer := Host}) -> true end),
instrument_helper:assert(mod_global_distrib_incoming_first_packet, #{},
fun(#{count := 1, host := Host}) -> true end),
% time-based events only check that the measured time is non-negative
instrument_helper:assert(mod_global_distrib_incoming_transfer, #{},
fun(#{time := T, host := Host}) -> T >= 0 end),
instrument_helper:assert(mod_global_distrib_incoming_messages, #{},
fun(#{count := 1, host := Host}) -> true end),
instrument_helper:assert(mod_global_distrib_incoming_queue, #{},
fun(#{time := T, host := Host}) -> T >= 0 end),

escalus_client:stop(Config1, Alice),
escalus_client:stop(Config1, Eve),

% the closed event is looked up repeatedly until the lookup result is non-empty,
% and only then asserted on (presumably because it is emitted asynchronously - confirm)
{ok, _} = mongoose_helper:wait_until(fun() -> instrument_helper:lookup(mod_global_distrib_incoming_closed, #{}) end,
fun(L) -> L =/= [] end,
#{name => mod_global_distrib_incoming_closed}),
instrument_helper:assert(mod_global_distrib_incoming_closed, #{}, fun(#{count := 1, host := Host}) -> true end).

test_muc_conversation_history(Config0) ->
AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
Expand Down Expand Up @@ -500,7 +555,14 @@ test_muc_conversation_history(Config0) ->
%% the service MAY then send discussion history, the room subject,
%% live messages, presence updates, and other in-room traffic.
receive_n_muc_messages(Eve, 3),
wait_for_subject(Eve)
wait_for_subject(Eve),

% events are checked only on mim host, the other event was executed on Eve's reg ("asia_node") host
EveJid = escalus_client:full_jid(Eve),
instrument_helper:assert(mod_global_distrib_delivered_with_ttl, #{},
fun(#{value := TTL, from := From}) ->
?assert(TTL > 0), jid:to_binary(From) =:= EveJid
end)
end),
muc_helper:destroy_room(Config).

Expand Down Expand Up @@ -604,10 +666,22 @@ test_component_disconnect(Config) ->
Story = fun(User) ->
escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
Error = escalus:wait_for_stanza(User, 5000),
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error),
instrument_helper:assert(mod_global_distrib_outgoing_closed, #{},
fun(#{count := 1, host := <<"reg1">>}) -> true end)
end,

[escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
AliceStory = fun(User) ->
Story(User),
% only check Alice, because Eve's event is executed on other node
Jid = escalus_client:full_jid(User),
instrument_helper:assert(mod_global_distrib_stop_ttl_zero, #{},
fun(#{count := 1, from := From}) ->
jid:to_binary(From) =:= Jid end)
end,

escalus:fresh_story(Config, [{alice, 1}], AliceStory),
escalus:fresh_story(Config, [{eve, 1}], Story).

test_location_disconnect(Config) ->
try
Expand Down Expand Up @@ -849,18 +923,28 @@ test_messages_bounced_in_order(Config) ->
%% Make sure all messages land in bounce storage
delete_mapping(europe_node1, Eve),

Seq = lists:seq(1, 100),
wait_for_bounce_size(0),

Seq = lists:seq(1, 99),
lists:foreach(
fun(I) ->
Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
escalus_client:send(Alice, Stanza)
end,
Seq),

wait_for_bounce_size(99),

%% Restore the mapping so that bounce eventually succeeds
?assertEqual(undefined, get_mapping(europe_node1, Eve)),
set_mapping(europe_node1, Eve, <<"reg1">>),

%% Test used to work if the mapping is restored while Alice was still sending the 100 stanzas.
%% This may actually be a race condition, and it should work like in the
%% test_in_order_messages_on_multiple_connections_with_bounce testcase:
%% Make sure that the last message is sent when the mapping is known
escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),

lists:foreach(
fun(I) ->
Stanza = escalus_client:wait_for_stanza(Eve, 5000),
Expand Down Expand Up @@ -1285,6 +1369,21 @@ bare_client(Client) ->
%% Returns the `service_port` value configured for the `mim` host in the Common Test config.
service_port() ->
    ConfigKey = {hosts, mim, service_port},
    ct:get_config(ConfigKey).

%% Waits until a mod_global_distrib_bounce_queue measurement reports `ExpectedSize`,
%% using the default of 5 retries.
wait_for_bounce_size(ExpectedSize) ->
    wait_for_bounce_size(ExpectedSize, 5).

%% With no retries left, assert directly on the bounce queue events.
%% Otherwise fetch new measurements; if one of them has the expected size, assert on
%% exactly those measurements, else sleep for one probe interval and try again.
wait_for_bounce_size(ExpectedSize, 0) ->
    instrument_helper:assert(mod_global_distrib_bounce_queue, #{},
                             bounce_queue_size_is(ExpectedSize));
wait_for_bounce_size(ExpectedSize, Retries) ->
    NewMeasurements = instrument_helper:wait_for_new(mod_global_distrib_bounce_queue, #{}),
    HasExpectedSize = bounce_queue_size_is(ExpectedSize),
    case lists:any(HasExpectedSize, NewMeasurements) of
        false ->
            %% ?PROBE_INTERVAL is expressed in seconds
            timer:sleep(timer:seconds(?PROBE_INTERVAL)),
            wait_for_bounce_size(ExpectedSize, Retries - 1);
        true ->
            instrument_helper:assert(mod_global_distrib_bounce_queue, #{},
                                     NewMeasurements, HasExpectedSize)
    end.

%% Returns a predicate that is true for a measurement whose `size` equals `ExpectedSize`.
bounce_queue_size_is(ExpectedSize) ->
    fun(#{size := Size}) -> Size =:= ExpectedSize end.

%% -----------------------------------------------------------------------
%% Waiting helpers
Expand Down
36 changes: 18 additions & 18 deletions doc/modules/mod_global_distrib.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,28 @@ In the example above, the message from **U2** would be temporarily stored at **D

Global distribution modules expose several per-datacenter metrics that can be used to monitor the health of the system. All metrics begin with the **global.mod_global_distrib** prefix:

* `outgoing.messages.<host>`: number of cross-datacenter messages sent by this cluster to a given host.
* `incoming.messages.<host>`: number of cross-datacenter messages received by this cluster from a given host.
* `incoming.transfer_time.<host>` *[us]*: time elapsed between sending and receiving the message over the network from a given host.
* `mod_global_distrib_outgoing_messages_count`: number of cross-datacenter messages sent by this cluster.
* `mod_global_distrib_incoming_messages_count`: number of cross-datacenter messages received by this cluster.
* `mod_global_distrib_incoming_transfer_time` *[us]*: time elapsed between sending and receiving the message over the network.
The duration is calculated using wall clock times on sender and receiver node.
* `outgoing.queue_time.<host>` *[us]*: time elapsed while message waits in a queue of a sender's connection to a given host.
* `mod_global_distrib_outgoing_queue_time` *[us]*: time elapsed while message waits in a queue of a sender's connection.
High value of this metric may be remedied by increasing the number of connections to other hosts.
* `incoming.queue_time` *[us]*: time elapsed while message waits in routing worker's queue.
* `mod_global_distrib_incoming_queue_time` *[us]*: time elapsed while message waits in routing worker's queue.
This value is not reported per-host as routing workers are bound to the sender's JID.
* `incoming.established`: incremented when a new connection is established from another cluster.
* `mod_global_distrib_incoming_established_count`: incremented when a new connection is established from another cluster.
At this point the origin domain of the cluster is not known, so this metric is common for all of them.
* `incoming.first_packet.<host>`: incremented when a receiver process gets the first packet from a remote cluster and learns its local domain.
* `incoming.closed.<host>`: incremented when an incoming connection gets closed.
* `incoming.errored.<host>`: incremented when an incoming connection gets closed with an error.
* `outgoing.established.<host>`: incremented when an outgoing connection is established.
* `outgoing.closed.<host>`: incremented when an outgoing connection gets closed.
* `outgoing.errored.<host>`: incremented when an outgoing connection gets closed with an error.
* `mapping_fetch_time` *[us]*: time spent on fetching an entry from the session table, cached or otherwise.
* `mapping_fetches`: number of fetches of session table entries, cached or otherwise.
* `mapping_cache_misses`: number of fetches of session table entries that hit the database.
* `delivered_with_ttl`: A histogram of packets' TTL values recorded when the global routing layer decides to route them locally (but not due to TTL = 0).
* `stop_ttl_zero`: A number of packets that weren't processed by global routing due to TTL=0.
* `bounce_queue_size`: a number of messages enqueued for rerouting (the value of this metric is individual per MongooseIM node!).
* `mod_global_distrib_incoming_first_packet_count`: incremented when a receiver process gets the first packet from a remote cluster and learns its local domain.
* `mod_global_distrib_incoming_closed_count`: incremented when an incoming connection gets closed.
* `mod_global_distrib_incoming_errored_count`: incremented when an incoming connection gets closed with an error.
* `mod_global_distrib_outgoing_established_count`: incremented when an outgoing connection is established.
* `mod_global_distrib_outgoing_closed_count`: incremented when an outgoing connection gets closed.
* `mod_global_distrib_outgoing_errored_count`: incremented when an outgoing connection gets closed with an error.
* `mod_global_distrib_mapping_fetches_time` *[us]*: time spent on fetching an entry from the session table, cached or otherwise.
* `mod_global_distrib_mapping_fetches_count`: number of fetches of session table entries, cached or otherwise.
* `mod_global_distrib_mapping_cache_misses_count`: number of fetches of session table entries that hit the database.
* `mod_global_distrib_delivered_with_ttl_value`: A histogram of packets' TTL values recorded when the global routing layer decides to route them locally (but not due to TTL = 0).
* `mod_global_distrib_stop_ttl_zero_count`: A number of packets that weren't processed by global routing due to TTL=0.
* `mod_global_distrib_bounce_queue_size`: a number of messages enqueued for rerouting (the value of this metric is individual per MongooseIM node!).

## Notes

Expand Down
35 changes: 17 additions & 18 deletions include/global_distrib_metrics.hrl
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
-define(GLOBAL_DISTRIB_MESSAGES_SENT(Server), [mod_global_distrib, outgoing, messages, Server]).
-define(GLOBAL_DISTRIB_MESSAGES_RECEIVED(Server), [mod_global_distrib, incoming, messages, Server]).
-define(GLOBAL_DISTRIB_TRANSFER_TIME(Server), [mod_global_distrib, incoming, transfer_time, Server]).
-define(GLOBAL_DISTRIB_SEND_QUEUE_TIME(Server), [mod_global_distrib, outgoing, queue_time, Server]).
-define(GLOBAL_DISTRIB_RECV_QUEUE_TIME, [mod_global_distrib, incoming, queue_time]).
-define(GLOBAL_DISTRIB_MAPPING_FETCH_TIME, [mod_global_distrib, mapping_fetch_time]).
-define(GLOBAL_DISTRIB_MAPPING_FETCHES, [mod_global_distrib, mapping_fetches]).
-define(GLOBAL_DISTRIB_MAPPING_CACHE_MISSES, [mod_global_distrib, mapping_cache_misses]).
-define(GLOBAL_DISTRIB_DELIVERED_WITH_TTL, [mod_global_distrib, delivered_with_ttl]).
-define(GLOBAL_DISTRIB_STOP_TTL_ZERO, [mod_global_distrib, stop_ttl_zero]).
-define(GLOBAL_DISTRIB_INCOMING_ESTABLISHED, [mod_global_distrib, incoming, established]).
-define(GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Server), [mod_global_distrib, incoming, first_packet, Server]).
-define(GLOBAL_DISTRIB_INCOMING_CLOSED(Server), [mod_global_distrib, incoming, closed, Server]).
-define(GLOBAL_DISTRIB_INCOMING_ERRORED(Server), [mod_global_distrib, incoming, errored, Server]).
-define(GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(Server), [mod_global_distrib, outgoing, established, Server]).
-define(GLOBAL_DISTRIB_OUTGOING_CLOSED(Server), [mod_global_distrib, outgoing, closed, Server]).
-define(GLOBAL_DISTRIB_OUTGOING_ERRORED(Server), [mod_global_distrib, outgoing, errored, Server]).
-define(GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, [mod_global_distrib, bounce_queue_size]).
-define(GLOBAL_DISTRIB_MESSAGES_SENT, mod_global_distrib_outgoing_messages).
-define(GLOBAL_DISTRIB_MESSAGES_RECEIVED, mod_global_distrib_incoming_messages).
-define(GLOBAL_DISTRIB_TRANSFER, mod_global_distrib_incoming_transfer).
-define(GLOBAL_DISTRIB_SEND_QUEUE, mod_global_distrib_outgoing_queue).
-define(GLOBAL_DISTRIB_RECV_QUEUE, mod_global_distrib_incoming_queue).
-define(GLOBAL_DISTRIB_MAPPING_FETCHES, mod_global_distrib_mapping_fetches).
-define(GLOBAL_DISTRIB_MAPPING_CACHE_MISSES, mod_global_distrib_mapping_cache_misses).
-define(GLOBAL_DISTRIB_DELIVERED_WITH_TTL, mod_global_distrib_delivered_with_ttl).
-define(GLOBAL_DISTRIB_STOP_TTL_ZERO, mod_global_distrib_stop_ttl_zero).
-define(GLOBAL_DISTRIB_INCOMING_ESTABLISHED, mod_global_distrib_incoming_established).
-define(GLOBAL_DISTRIB_INCOMING_FIRST_PACKET, mod_global_distrib_incoming_first_packet).
-define(GLOBAL_DISTRIB_INCOMING_CLOSED, mod_global_distrib_incoming_closed).
-define(GLOBAL_DISTRIB_INCOMING_ERRORED, mod_global_distrib_incoming_errored).
-define(GLOBAL_DISTRIB_OUTGOING_ESTABLISHED, mod_global_distrib_outgoing_established).
-define(GLOBAL_DISTRIB_OUTGOING_CLOSED, mod_global_distrib_outgoing_closed).
-define(GLOBAL_DISTRIB_OUTGOING_ERRORED, mod_global_distrib_outgoing_errored).
-define(GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, mod_global_distrib_bounce_queue).
Loading