Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument router and remove global spirals (S2S/component) #4319

Merged
merged 14 commits into from
Jul 17, 2024
Merged
1 change: 0 additions & 1 deletion big_tests/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
{vars, "mim1"},
{cluster, mim},
{secondary_domain, <<"localhost.bis">>},
{reloaded_domain, <<"sogndal">>},
{secondary_host_type, <<"localhost.bis">>},
{anonymous_host_type, <<"anonymous.localhost">>},
{muc_service, <<"muc.localhost">>},
Expand Down
8 changes: 4 additions & 4 deletions big_tests/tests/bosh_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ negative_instrumentation_events() ->
[{Name, #{}} || Name <- negative_instrumentation_events_names()].

negative_instrumentation_events_names() ->
[c2s_tcp_data_sent,
c2s_tcp_data_received,
c2s_tls_data_sent,
c2s_tls_data_received].
[c2s_tcp_data_out,
c2s_tcp_data_in,
c2s_tls_data_out,
c2s_tls_data_in].
58 changes: 42 additions & 16 deletions big_tests/tests/component_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(xep0114, Config) ->
instrument_helper:start(events()),
Config1 = get_components(Config),
escalus:create_users(Config1, escalus:get_users([alice, bob]));
init_per_group(subdomain, Config) ->
Expand All @@ -102,6 +103,9 @@ init_per_group(distributed, Config) ->
init_per_group(_GroupName, Config) ->
escalus:create_users(Config, escalus:get_users([alice, bob])).

end_per_group(xep0114, Config) ->
escalus:delete_users(Config, escalus:get_users([alice, bob])),
instrument_helper:stop();
end_per_group(subdomain, Config) ->
escalus:delete_users(Config, escalus:get_users([alice, astrid])),
restore_domain(Config);
Expand Down Expand Up @@ -130,14 +134,32 @@ dirty_disconnect(Config) ->
disconnect_component(Component1, Addr).

register_one_component(Config) ->
MongooseMetrics = [{[global, data, xmpp, received, component], changed},
{[global, data, xmpp, sent, component], changed}],
PreStoryData = escalus_mongooseim:pre_story([{mongoose_metrics, MongooseMetrics}]),
TS = instrument_helper:timestamp(),
%% Given one connected component
CompOpts = ?config(component1, Config),
{Component, ComponentAddr, _} = connect_component(CompOpts),
escalus_mongooseim:post_story(PreStoryData),
% start stream reply
instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS}),
% 1. start stream, 2. component handshake
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
instrument_helper:assert(component_tcp_data_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
% 1. start stream reply, 2. handshake reply
instrument_helper:assert(component_tcp_data_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),

TS1 = instrument_helper:timestamp(),
verify_component(Config, Component, ComponentAddr),

% Message from Alice
instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS1}),
% Reply to Alice
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS1}),

disconnect_component(Component, ComponentAddr).

verify_component(Config, Component, ComponentAddr) ->
Expand Down Expand Up @@ -165,22 +187,19 @@ intercomponent_communication(Config) ->
CompOpts2 = ?config(component2, Config),
{Comp1, CompAddr1, _} = connect_component(CompOpts1),
{Comp2, CompAddr2, _} = connect_component(CompOpts2),
MongooseMetrics = [{[global, data, xmpp, received, component], changed},
{[global, data, xmpp, sent, component], changed}],

PreStoryData = escalus_mongooseim:pre_story([{mongoose_metrics, MongooseMetrics}]),
%% note that there is no c2s communication happens and
%% xmpp_stanza_size_sent/xmpp_stanza_size_received metrics are bounced
%% for the components communication

TS = instrument_helper:timestamp(),
%% When the first component sends a message to the second component
Msg0 = escalus_stanza:chat_to(CompAddr2, <<"intercomponent msg">>),
escalus:send(Comp1, escalus_stanza:from(Msg0, CompAddr1)),
%% Then the second component receives it
Reply0 = escalus:wait_for_stanza(Comp2),
escalus:assert(is_chat_message, [<<"intercomponent msg">>], Reply0),

escalus_mongooseim:post_story(PreStoryData),
instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS}),
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS}),

disconnect_component(Comp1, CompAddr1),
disconnect_component(Comp2, CompAddr2).
Expand All @@ -192,10 +211,9 @@ register_two_components(Config) ->
CompOpts2 = ?config(component2, Config),
{Comp1, CompAddr1, _} = connect_component(CompOpts1),
{Comp2, CompAddr2, _} = connect_component(CompOpts2),
MongooseMetrics = [{[global, data, xmpp, received, component], changed},
{[global, data, xmpp, sent, component], changed}],
TS = instrument_helper:timestamp(),

escalus:story([{mongoose_metrics, MongooseMetrics} | Config],
escalus:story(Config,
[{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
%% When the first component sends a message to Alice
Msg1 = escalus_stanza:chat_to(Alice, <<"Comp1-2-Alice msg">>),
Expand Down Expand Up @@ -228,6 +246,13 @@ register_two_components(Config) ->
escalus:assert(is_chat_message, [<<"Alice-2-Comp1 msg">>], Reply4)
end),

% Msg to Alice, msg to Bob
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
% Msg from Bob, msg from Alice
instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),

disconnect_component(Comp1, CompAddr1),
disconnect_component(Comp2, CompAddr2).

Expand Down Expand Up @@ -549,12 +574,13 @@ restore_domain(Config) ->
ejabberd_node_utils:restart_application(mongooseim),
Config.

%% Instrumentation events declared by ejabberd_service for the `component'
%% connection type. The result is handed to instrument_helper:start/1.
events() ->
    ServiceOpts = #{connection_type => component},
    instrument_helper:declared_events(ejabberd_service, [ServiceOpts]).

%%--------------------------------------------------------------------
%% Stanzas
%%--------------------------------------------------------------------


cluster_users() ->
AllUsers = ct:get_config(escalus_users),
[proplists:lookup(alice, AllUsers), proplists:lookup(clusterguy, AllUsers)].
30 changes: 29 additions & 1 deletion big_tests/tests/dynamic_domains_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ all() ->
pm_messages,
disconnected_on_domain_disabling,
auth_domain_removal_is_triggered_on_hook,
no_route,
{group, with_mod_dynamic_domains_test}].

groups() ->
Expand All @@ -31,12 +32,14 @@ groups() ->
%% Suite setup: cluster the nodes, insert the test domains, start collecting
%% the router_no_route_found event for ?HOST_TYPE, then run the escalus setup.
init_per_suite(Config0) ->
    ClusteredConfig = cluster_nodes(?CLUSTER_NODES, Config0),
    insert_domains(?TEST_NODES, ?DOMAINS),
    TestedEvents = [{router_no_route_found, #{host_type => ?HOST_TYPE}}],
    instrument_helper:start(TestedEvents),
    escalus:init_per_suite(ClusteredConfig).

end_per_suite(Config0) ->
Config = escalus:end_per_suite(Config0),
remove_domains(?TEST_NODES, ?DOMAINS),
uncluster_nodes(?CLUSTER_NODES, Config).
uncluster_nodes(?CLUSTER_NODES, Config),
instrument_helper:stop().

init_per_group(with_mod_dynamic_domains_test, Config) ->
MockedModules = [mod_dynamic_domains_test, mongoose_router],
Expand All @@ -57,11 +60,21 @@ end_per_group(with_mod_dynamic_domains_test, Config) ->
end_per_group(_, Config) ->
Config.

%% Per-testcase setup.
%%  - no_route: reconfigures routing_modules (with a backup stored in Config)
%%    so that the last default routing module is not used.
%%  - any other case: resets the meck modules listed under reset_meck in Config.
init_per_testcase(no_route = CaseName, Config) ->
    DefaultModules = rpc(mim(), mongoose_router, default_routing_modules, []),
    % S2S always "routes" a stanza (returns {done, Acc}), even when it cannot
    % connect to the remote server, so router_no_route_found is only executed
    % with S2S disabled. S2S is taken to be the last routing module here.
    ModulesWithoutS2S = lists:droplast(DefaultModules),
    Config1 = mongoose_helper:backup_and_set_config_option(Config, routing_modules,
                                                           ModulesWithoutS2S),
    escalus:init_per_testcase(CaseName, Config1);
init_per_testcase(CN, Config) ->
    MockedModules = proplists:get_value(reset_meck, Config, []),
    lists:foreach(fun(M) -> rpc(mim(), meck, reset, [M]) end, MockedModules),
    escalus:init_per_testcase(CN, Config).

%% Per-testcase teardown. no_route first restores the configuration that was
%% backed up in init_per_testcase/2; every case ends with the escalus teardown.
end_per_testcase(no_route = CaseName, Config) ->
    mongoose_helper:restore_config(Config),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).

Expand All @@ -83,6 +96,21 @@ pm_messages(Config) ->
end,
escalus:story(Config, [{alice3, 1}, {bob3, 1}], StoryFn).

%% With S2S disabled, a stanza addressed to a nonexistent server must not
%% reach Bob and must trigger the router_no_route_found event for that JID.
no_route(Config) ->
    WrongJID = <<"bob@wrong_server.com">>,
    % The event must be reported once and carry the unroutable recipient
    IsExpectedEvent =
        fun(#{count := 1, to := To}) -> jid:to_binary(To) =:= WrongJID end,
    escalus:story(Config, [{alice3, 1}, {bob3, 1}],
        fun(Alice, Bob) ->
            % Alice sends a chat message re-addressed to the nonexistent server
            ChatMsg = escalus_stanza:chat_to(Bob, <<"OH, HAI!">>),
            escalus:send(Alice, escalus_stanza:to(ChatMsg, WrongJID)),

            escalus_assert:has_no_stanzas(Bob),
            instrument_helper:wait_and_assert(router_no_route_found,
                                              #{host_type => ?HOST_TYPE},
                                              IsExpectedEvent)
        end).

disconnected_on_domain_disabling(Config) ->
StoryFn =
fun(Alice, Bob) ->
Expand Down
6 changes: 3 additions & 3 deletions big_tests/tests/graphql_metric_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,15 @@ get_metrics_as_dicts_with_nonexistent_key(Config) ->
Result = get_metrics_as_dicts_with_keys([<<"not_existing">>], Config),
ParsedResult = get_ok_value([data, metric, getMetricsAsDicts], Result),
Map = dict_objects_to_map(ParsedResult),
RecvName = [<<"global">>, <<"xmpp_stanza_size_received">>, <<"byte_size">>],
RecvName = [<<"global">>, <<"c2s_xmpp_element_size_in">>, <<"byte_size">>],
[] = maps:get(RecvName, Map).

get_metrics_as_dicts_empty_args(Config) ->
%% Empty name
Result = get_metrics_as_dicts([], [<<"median">>], Config),
ParsedResult = get_ok_value([data, metric, getMetricsAsDicts], Result),
Map = dict_objects_to_map(ParsedResult),
RecvName = [<<"global">>, <<"xmpp_stanza_size_received">>, <<"byte_size">>],
RecvName = [<<"global">>, <<"c2s_xmpp_element_size_in">>, <<"byte_size">>],
[#{<<"key">> := <<"median">>, <<"value">> := Median}] = maps:get(RecvName, Map),
?assert(is_integer(Median)),
%% Empty keys
Expand Down Expand Up @@ -359,7 +359,7 @@ check_node_result_is_valid(ResList, MetricsAreGlobal) ->
[#{<<"key">> := <<"value">>,<<"value">> := V} | _] =
maps:get([<<"global">>,<<"sm_unique_sessions">>,<<"count">>], Map),
?assert(is_integer(V)),
HistObjects = maps:get([<<"global">>, <<"xmpp_stanza_size_received">>, <<"byte_size">>], Map),
HistObjects = maps:get([<<"global">>, <<"c2s_xmpp_element_size_in">>, <<"byte_size">>], Map),
check_histogram(kv_objects_to_map(HistObjects)).

check_histogram(Map) ->
Expand Down
12 changes: 6 additions & 6 deletions big_tests/tests/mim_c2s_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ instrumentation_events() ->
++ [{c2s_message_processing_time, #{host_type => domain_helper:host_type()}}].

tcp_instrumentation_events() ->
[{c2s_tcp_data_sent, #{}},
{c2s_tcp_data_received, #{}}].
[{c2s_tcp_data_out, #{}},
{c2s_tcp_data_in, #{}}].

tls_instrumentation_events() ->
[{c2s_tls_data_sent, #{}},
{c2s_tls_data_received, #{}}].
[{c2s_tls_data_out, #{}},
{c2s_tls_data_in, #{}}].

common_instrumentation_events() ->
HostType = domain_helper:host_type(),
[{c2s_message_processing_time, #{host_type => HostType}},
{xmpp_stanza_size_received, #{}},
{xmpp_stanza_size_sent, #{}}].
{c2s_xmpp_element_size_in, #{}},
{c2s_xmpp_element_size_out, #{}}].
82 changes: 73 additions & 9 deletions big_tests/tests/s2s_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ essentials() ->

all_tests() ->
[connections_info, dns_discovery, dns_discovery_ip_fail, nonexistent_user,
unknown_domain, malformed_jid, dialback_with_wrong_key | essentials()].
unknown_domain, malformed_jid, dialback_with_wrong_key].

negative() ->
[timeout_waiting_for_message].
Expand Down Expand Up @@ -97,6 +97,7 @@ users() ->
%%%===================================================================

%% Suite setup: start collecting the tested S2S events, inject this module
%% into the server node, initialise escalus and S2S, then create the users.
init_per_suite(Config) ->
    instrument_helper:start(tested_events()),
    mongoose_helper:inject_module(?MODULE, reload),
    EscalusConfig = escalus:init_per_suite(Config),
    S2SConfig = s2s_helper:init_s2s(EscalusConfig),
    UserSpecs = escalus:get_users(users()),
    escalus:create_users(S2SConfig, UserSpecs).
Expand All @@ -105,13 +106,15 @@ end_per_suite(Config) ->
escalus_fresh:clean(),
s2s_helper:end_s2s(Config),
escalus:delete_users(Config, escalus:get_users(users())),
escalus:end_per_suite(Config).
escalus:end_per_suite(Config),
instrument_helper:stop().

init_per_group(dialback, Config) ->
%% Tell mnesia that mim and mim2 nodes are clustered
distributed_helper:add_node_to_cluster(distributed_helper:mim2(), Config);
init_per_group(GroupName, Config) ->
s2s_helper:configure_s2s(GroupName, Config).
Config1 = s2s_helper:configure_s2s(GroupName, Config),
[{requires_tls, group_with_tls(GroupName)}, {group, GroupName} | Config1].

end_per_group(_GroupName, _Config) ->
ok.
Expand Down Expand Up @@ -158,11 +161,8 @@ end_per_testcase(CaseName, Config) ->
%%%===================================================================

simple_message(Config) ->
%% check that metrics are bounced
MongooseMetrics = [{[global, data, xmpp, received, s2s], changed},
{[global, data, xmpp, sent, s2s], changed}],
escalus:fresh_story([{mongoose_metrics, MongooseMetrics} | Config],
[{alice2, 1}, {alice, 1}], fun(Alice2, Alice1) ->
escalus:fresh_story(Config, [{alice2, 1}, {alice, 1}], fun(Alice2, Alice1) ->
TS = instrument_helper:timestamp(),

%% User on the main server sends a message to a user on a federated server
escalus:send(Alice1, escalus_stanza:chat_to(Alice2, <<"Hi, foreign Alice!">>)),
Expand All @@ -176,8 +176,10 @@ simple_message(Config) ->

%% User on the main server receives the message
Stanza2 = escalus:wait_for_stanza(Alice1, 10000),
escalus:assert(is_chat_message, [<<"Nice to meet you!">>], Stanza2)
escalus:assert(is_chat_message, [<<"Nice to meet you!">>], Stanza2),

% Instrumentation events are executed
assert_events(TS, Config)
end).

timeout_waiting_for_message(Config) ->
Expand Down Expand Up @@ -523,3 +525,65 @@ configure_secret_and_restart_s2s(NodeKey) ->

shared_secret(mim) -> <<"f623e54a0741269be7dd">>; %% Some random key
shared_secret(mim2) -> <<"9e438f25e81cf347100b">>.

%% Asserts that the S2S instrumentation events were executed since timestamp TS,
%% each with a positive byte_size. The XMPP element events are additionally
%% checked against element_count/2; the TCP data events have no expected count.
assert_events(TS, Config) ->
    TLS = proplists:get_value(requires_tls, Config),
    HasBytes = fun(#{byte_size := S}) -> S > 0 end,
    Since = #{min_timestamp => TS},
    instrument_helper:assert(s2s_xmpp_element_size_in, #{}, HasBytes,
                             Since#{expected_count => element_count(in, TLS)}),
    instrument_helper:assert(s2s_xmpp_element_size_out, #{}, HasBytes,
                             Since#{expected_count => element_count(out, TLS)}),
    instrument_helper:assert(s2s_tcp_data_in, #{}, HasBytes, Since),
    instrument_helper:assert(s2s_tcp_data_out, #{}, HasBytes, Since).

%% Value used as `expected_count' for the S2S XMPP element events in
%% assert_events/2, by direction (in | out) and whether the group uses TLS.
%%
%% S2S connections are unidirectional, so mim1 acts as the initiating, the
%% receiving and the authoritative server across the two dialback procedures
%% that take place. Step numbers below refer to the dialback diagram:
%% https://xmpp.org/extensions/xep-0220.html#intro-howitworks
element_count(in, false) ->
    % Incoming elements. Some steps are asynchronous, so the order may vary.
    %   1. Stream start response from fed1 (mim1 as initiating server)
    %   2. Stream start from fed1 (mim1 as receiving server)
    %   3. Dialback key (step 1, as receiving server)
    %   4. Dialback verification request (step 2, as authoritative server)
    %   5. Dialback result (step 4, as initiating server)
    % A new s2s process is then started to verify fed1 as an authoritative
    % server. It opens a new connection to fed1 and therefore sends a new
    % stream header (see also the comment near line 360 of ejabberd_s2s).
    %   6. Stream start response from fed1
    %   7. Dialback verification response (step 3, as receiving server)
    %   8. Message from federated Alice
    % In other words: all 6 arrows of the dialback diagram, plus one stream
    % header response, plus one for the actual message.
    8;
element_count(out, false) ->
    % Outgoing elements:
    %   1. Dialback key (step 1, as initiating server)
    %   2. Dialback verification response (step 3, as authoritative server)
    %   3. Dialback request (step 2, as receiving server)
    %   4. Message from Alice
    %   5. Dialback result (step 4, as receiving server)
    % This mirrors the incoming count, except that stream headers are sent as
    % plain text rather than as XML elements, so three events are absent:
    %   - open stream to fed1,
    %   - stream response for the fed1->mim stream,
    %   - open stream to fed1 as authoritative server.
    5;
element_count(_Dir, true) ->
    % TLS tests do not check a specific number of events: the numbers are flaky
    positive.

%% Returns true for the test groups treated as TLS groups, false for any other.
group_with_tls(GroupName) ->
    TLSGroups = [both_tls_optional,
                 both_tls_required,
                 node1_tls_optional_node2_tls_required,
                 node1_tls_required_node2_tls_optional,
                 node1_tls_required_trusted_node2_tls_optional,
                 node1_tls_optional_node2_tls_required_trusted_with_cachain],
    lists:member(GroupName, TLSGroups).

%% Events (name plus empty label map) collected by instrument_helper in this suite.
tested_events() ->
    [{s2s_xmpp_element_size_in, #{}},
     {s2s_xmpp_element_size_out, #{}},
     {s2s_tcp_data_in, #{}},
     {s2s_tcp_data_out, #{}}].
Loading