Skip to content

Commit

Permalink
Merge pull request #4337 from esl/instrument/async-pools
Browse files Browse the repository at this point in the history
Instrument `async_pools`
  • Loading branch information
chrzaszcz authored Jul 31, 2024
2 parents fc8fb35 + 72c3de4 commit 2d51539
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 137 deletions.
24 changes: 18 additions & 6 deletions big_tests/tests/inbox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
given_conversations_between/2,
assert_invalid_inbox_form_value_error/3,
assert_invalid_reset_inbox/4,
extract_user_specs/1
extract_user_specs/1,
assert_async_request_event/1
]).

-define(ROOM3, <<"testroom3">>).
Expand Down Expand Up @@ -159,10 +160,13 @@ suite() ->

%% Suite setup: injects this module's helpers into the server node and starts
%% capturing async_pool_request instrumentation events for the inbox pool.
init_per_suite(Config) ->
    mongoose_helper:inject_module(?MODULE),
    Labels = #{host_type => domain_helper:host_type(), pool_id => inbox},
    instrument_helper:start([{async_pool_request, Labels}]),
    escalus:init_per_suite(Config).

end_per_suite(Config) ->
escalus:end_per_suite(Config).
escalus:end_per_suite(Config),
instrument_helper:stop().

init_per_group(GroupName, Config) when GroupName =:= regular; GroupName =:= async_pools ->
HostType = domain_helper:host_type(),
Expand Down Expand Up @@ -1348,6 +1352,7 @@ bin_is_not_included_by_default(Config) ->
end).

rest_api_bin_flush_user(Config) ->
TS = instrument_helper:timestamp(),
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) ->
create_room_and_make_users_leave(Alice, Bob, Kate),
%% It is not in his bin anymore after triggering a bin flush
Expand All @@ -1357,7 +1362,8 @@ rest_api_bin_flush_user(Config) ->
{{<<"200">>, <<"OK">>}, NumOfRows} = rest_helper:delete(admin, Path),
?assertEqual(1, NumOfRows),
check_inbox(Bob, [], #{box => bin})
end).
end),
assert_async_request_event(TS).

rest_api_bin_flush_user_errors(Config) ->
Config1 = escalus_fresh:create_users(Config, [{alice, 1}]),
Expand All @@ -1373,6 +1379,7 @@ rest_api_bin_flush_user_errors(Config) ->
rest_helper:delete(admin, <<"/inbox/", Domain/binary, "/baduser/0/bin">>).

rest_api_bin_flush_all(Config) ->
TS = instrument_helper:timestamp(),
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) ->
create_room_and_make_users_leave(Alice, Bob, Kate),
%% It is not in any bin anymore after triggering a bin flush
Expand All @@ -1382,7 +1389,8 @@ rest_api_bin_flush_all(Config) ->
?assertEqual(2, NumOfRows),
check_inbox(Bob, [], #{box => bin}),
check_inbox(Kate, [], #{box => bin})
end).
end),
assert_async_request_event(TS).

rest_api_bin_flush_all_errors(_Config) ->
HostTypePath = uri_string:normalize(#{path => domain_helper:host_type()}),
Expand All @@ -1392,14 +1400,17 @@ rest_api_bin_flush_all_errors(_Config) ->
rest_helper:delete(admin, <<"/inbox/bad_host_type/0/bin">>).

timeout_cleaner_flush_all(Config) ->
TS = instrument_helper:timestamp(),
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) ->
create_room_and_make_users_leave(Alice, Bob, Kate),
%% It is eventually not in any bin thanks to the periodic cleanouts
check_inbox(Bob, [], #{box => bin}),
check_inbox(Kate, [], #{box => bin})
end).
end),
assert_async_request_event(TS).

xmpp_bin_flush(Config) ->
TS = instrument_helper:timestamp(),
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) ->
create_room_and_make_users_leave(Alice, Bob, Kate),
%% It is eventually not in any bin thanks to the periodic cleanouts
Expand All @@ -1411,7 +1422,8 @@ xmpp_bin_flush(Config) ->
escalus:send(Bob, Iq),
escalus:assert(is_iq_result, [Iq], escalus:wait_for_stanza(Bob)),
check_inbox(Bob, [], #{box => bin})
end).
end),
assert_async_request_event(TS).


%% helpers
Expand Down
10 changes: 9 additions & 1 deletion big_tests/tests/inbox_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
muc_domain/0,
domain/0,
to_bare_lower/1,
extract_user_specs/1
extract_user_specs/1,
assert_async_request_event/1
]).

-import(muc_helper, [foreach_recipient/2]).
Expand Down Expand Up @@ -867,6 +868,13 @@ assert_message_content(Msg, Field, Value) ->
?assertNotEqual(nomatch, binary:match(Msg, Field)),
?assertNotEqual(nomatch, binary:match(Msg, Value)).

%% Asserts that an async_pool_request event with count = 1 was emitted for the
%% inbox pool no earlier than the given timestamp TS.
assert_async_request_event(TS) ->
    Labels = #{host_type => domain_helper:host_type(), pool_id => inbox},
    CheckF = fun(#{count := 1}) -> true end,
    instrument_helper:assert(async_pool_request, Labels, CheckF, #{min_timestamp => TS}).

%% TODO: properly extract the specs from Bob
extract_user_specs(User) ->
{client,_,_,_,_,UserSpecs} = User,
Expand Down
60 changes: 53 additions & 7 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@
assert_archive_message_event/2,
assert_lookup_event/2,
assert_flushed_event_if_async/2,
assert_async_batch_flush_event/3,
assert_async_timed_flush_event/3,
assert_dropped_iq_event/2,
assert_event_with_jid/2,
assert_no_event_with_jid/2
Expand Down Expand Up @@ -430,7 +432,8 @@ muc_light_cases() ->
muc_light_chat_markers_are_archived_if_enabled,
muc_light_chat_markers_are_not_archived_if_disabled,
muc_light_failed_to_decode_message_in_database,
muc_light_sql_query_failed
muc_light_sql_query_failed,
muc_light_async_pools_batch_flush
].

muc_rsm_cases() ->
Expand Down Expand Up @@ -503,13 +506,18 @@ muc_prefs_cases() ->
impl_specific() ->
[check_user_exist,
pm_failed_to_decode_message_in_database,
pm_sql_query_failed].
pm_sql_query_failed,
async_pools_batch_flush].

%% Suite options: requires the 'mim' RPC node in addition to the default
%% Escalus suite configuration.
suite() ->
    Defaults = escalus:suite(),
    require_rpc_nodes([mim]) ++ Defaults.

init_per_suite(Config) ->
instrument_helper:start(instrument_helper:declared_events(instrumented_modules())),
PoolIds = [pm_mam, muc_mam],
AsyncPoolsEvents = [{async_pool_flush, #{host_type => host_type(), pool_id => PoolId}}
|| PoolId <- PoolIds],
instrument_helper:start(
instrument_helper:declared_events(instrumented_modules()) ++ AsyncPoolsEvents),
muc_helper:load_muc(),
mongoose_helper:inject_module(mim(), ?MODULE, reload),
mam_helper:prepare_for_suite(
Expand Down Expand Up @@ -568,7 +576,6 @@ init_per_group(rsm04_comp, Config) ->
[{props, mam04_props()}|Config];
init_per_group(with_rsm04, Config) ->
[{props, mam04_props()}, {with_rsm, true}|Config];

init_per_group(nostore, Config) ->
Config;
init_per_group(archived, Config) ->
Expand Down Expand Up @@ -801,6 +808,10 @@ maybe_skip(C, Config) when C =:= easy_text_search_request;
C =:= muc_text_search_request ->
skip_if(?config(configuration, Config) =:= cassandra,
"full text search is not implemented for cassandra backend");
maybe_skip(C, Config) when C =:= muc_light_async_pools_batch_flush;
C =:= async_pools_batch_flush ->
skip_if(?config(configuration, Config) =/= rdbms_async_pool,
"only for async pool");
maybe_skip(_C, _Config) ->
ok.

Expand Down Expand Up @@ -905,6 +916,11 @@ required_modules(CaseName, Config) when CaseName =:= muc_light_service_discovery
Opts = #{pm := PM} = ?config(mam_meta_opts, Config),
NewOpts = Opts#{pm := PM#{archive_groupchats => true}},
[{mod_mam, NewOpts}];
required_modules(CaseName, Config) when CaseName =:= async_pools_batch_flush;
CaseName =:= muc_light_async_pools_batch_flush ->
Opts = #{async_writer := Async} = ?config(mam_meta_opts, Config),
NewOpts = Opts#{async_writer := Async#{batch_size => 3, flush_interval => 5000}},
[{mod_mam, NewOpts}];
required_modules(muc_light_chat_markers_are_archived_if_enabled, Config) ->
Opts = #{muc := MUC} = ?config(mam_meta_opts, Config),
NewOpts = Opts#{muc := MUC#{archive_chat_markers => true}},
Expand Down Expand Up @@ -1705,6 +1721,7 @@ muc_light_easy(Config) ->
end).

muc_light_shouldnt_modify_pm_archive(Config) ->
TS = instrument_helper:timestamp(),
escalus:story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
Room = muc_helper:fresh_room_name(),
given_muc_light_room(Room, Alice, [{Bob, member}]),
Expand All @@ -1731,7 +1748,9 @@ muc_light_shouldnt_modify_pm_archive(Config) ->
then_archive_response_is(Alice, [{message, Alice, <<"private hi!">>}], Config),
when_archive_query_is_sent(Bob, undefined, Config),
then_archive_response_is(Bob, [{message, Alice, <<"private hi!">>}], Config)
end).
end),
assert_async_timed_flush_event(Config, TS, pm_mam),
assert_async_timed_flush_event(Config, TS, muc_mam).

muc_light_stored_in_pm_if_allowed_to(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
Expand Down Expand Up @@ -1911,6 +1930,23 @@ muc_light_sql_query_failed(Config) ->
escalus:assert(is_error, [<<"wait">>, <<"internal-server-error">>], Error)
end).

%% With batch_size = 3 configured for this case (see required_modules/2),
%% sending three groupchat messages should produce exactly one batch-triggered
%% flush event in the muc_mam pool.
muc_light_async_pools_batch_flush(Config) ->
    TS = instrument_helper:timestamp(),
    escalus:story(Config, [{alice, 1}], fun(Alice) ->
        Room = muc_helper:fresh_room_name(),
        given_muc_light_room(Room, Alice, []),
        Messages = [{<<"Msg 1">>, <<"Id1">>},
                    {<<"Msg 2">>, <<"Id2">>},
                    {<<"Msg 3">>, <<"Id3">>}],
        lists:foreach(
          fun({Body, Id}) ->
                  M = when_muc_light_message_is_sent(Alice, Room, Body, Id),
                  then_muc_light_message_is_received_by([Alice], M)
          end, Messages)
    end),
    assert_async_batch_flush_event(TS, 1, muc_mam).

pm_failed_to_decode_message_in_database(Config) ->
escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi">>)),
Expand All @@ -1932,6 +1968,16 @@ pm_sql_query_failed(Config) ->
escalus:assert(is_error, [<<"wait">>, <<"internal-server-error">>], Error)
end).

%% With batch_size = 3 configured for this case (see required_modules/2),
%% three chat messages should trigger batch flushes in the pm_mam pool.
%% NOTE(review): expected_count = 2 presumably covers both the sender's and
%% the recipient's archive workers — confirm against pool sharding.
async_pools_batch_flush(Config) ->
    TS = instrument_helper:timestamp(),
    escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
        [escalus:send(Alice, escalus_stanza:chat_to(Bob, Body))
         || Body <- [<<"Msg 1">>, <<"Msg 2">>, <<"Msg 3">>]],
        mam_helper:wait_for_archive_size(Alice, 3)
    end),
    assert_async_batch_flush_event(TS, 2, pm_mam).

retrieve_form_fields(ConfigIn) ->
escalus_fresh:story(ConfigIn, [{alice, 1}], fun(Alice) ->
P = ?config(props, ConfigIn),
Expand Down Expand Up @@ -3449,7 +3495,7 @@ muc_prefs_set_request(ConfigIn) ->
%% </never>
%% </prefs>
%% </iq>

Room = ?config(room, Config),
RoomAddr = room_address(Room),
escalus:send(Alice, stanza_to_room(stanza_prefs_set_request(<<"roster">>,
Expand Down Expand Up @@ -3666,7 +3712,7 @@ muc_run_prefs_cases(DefaultPolicy, ConfigIn) ->
maybe_wait_for_archive(Config),

%% Get ALL messages using several queries if required

Room = ?config(room, Config),
IQ = stanza_archive_request([{mam_ns, <<"urn:xmpp:mam:1">>}], <<>>),
escalus:send(Alice, stanza_to_room(IQ, Room)),
Expand Down
19 changes: 19 additions & 0 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,25 @@ assert_flushed_event_if_async(EventName, Config) ->
ok
end.

%% Asserts that exactly ExpectedCount batch-triggered flush events (batch = 1)
%% were emitted for the given pool no earlier than timestamp TS.
assert_async_batch_flush_event(TS, ExpectedCount, PoolId) ->
    Labels = #{host_type => host_type(), pool_id => PoolId},
    CheckF = fun(#{batch := 1}) -> true end,
    Opts = #{min_timestamp => TS, expected_count => ExpectedCount},
    instrument_helper:assert(async_pool_flush, Labels, CheckF, Opts).

%% Asserts a timer-triggered flush event (timed = 1) for the given pool since
%% timestamp TS — but only when the suite runs against the async-pool backend;
%% for any other configuration this is a no-op.
assert_async_timed_flush_event(Config, TS, PoolId) ->
    case ?config(configuration, Config) of
        rdbms_async_pool ->
            Labels = #{host_type => host_type(), pool_id => PoolId},
            CheckF = fun(#{timed := 1}) -> true end,
            instrument_helper:assert(async_pool_flush, Labels, CheckF,
                                     #{min_timestamp => TS});
        _Other ->
            ok
    end.

assert_dropped_iq_event(Config, BinJid) ->
EventName = case ?config(room, Config) of
undefined -> mod_mam_pm_dropped_iq;
Expand Down
3 changes: 2 additions & 1 deletion src/async_pools/mongoose_aggregator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ make_async_request(Request, #state{host_type = HostType, pool_id = PoolId,
drop ->
State;
ReqId ->
mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, async_request], 1),
mongoose_instrument:execute(async_pool_request, #{host_type => HostType, pool_id => PoolId},
#{count => 1}),
State#state{async_request = {ReqId, Request}}
end.

Expand Down
17 changes: 11 additions & 6 deletions src/async_pools/mongoose_async_pools.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ start_pool(HostType, PoolId, PoolOpts) ->
start => {?MODULE, start_link, [HostType, PoolId, PoolOpts]},
restart => transient,
type => supervisor},
mongoose_instrument:set_up(instrumentation(HostType, PoolId)),
ejabberd_sup:start_child(ChildSpec).

-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
stop_pool(HostType, PoolId) ->
?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
mongoose_instrument:tear_down(instrumentation(HostType, PoolId)),
ejabberd_sup:stop_child(sup_name(HostType, PoolId)).

-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
Expand Down Expand Up @@ -137,20 +139,17 @@ gen_pool_name(HostType, PoolId) ->

-spec process_pool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> [wpool:option()].
process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) ->
WorkerModule = select_worker_module(HostType, PoolId, Opts),
WorkerModule = select_worker_module(Opts),
WorkerOpts = make_worker_opts(HostType, PoolId, Opts),
Worker = {WorkerModule, WorkerOpts},
[{worker, Worker},
{workers, NumWorkers},
{worker_opt, [{spawn_opt, [{message_queue_data, off_heap}]}]},
{worker_shutdown, 10000}].

%% Maps the configured pool type to its worker callback module.
%% Crashes with function_clause for any unknown pool type (intentional).
select_worker_module(#{pool_type := aggregate}) ->
    mongoose_aggregator_worker;
select_worker_module(#{pool_type := batch}) ->
    mongoose_batch_worker.

-spec make_worker_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> map().
Expand All @@ -172,3 +171,9 @@ maybe_init_handler(HostType, PoolId, Opts = #{init_callback := InitFun}, Extra)
Extra#{init_data => InitFun(HostType, PoolId, Opts)};
maybe_init_handler(_, _, _, Extra) ->
Extra.

%% Instrumentation events registered for every async pool: flush events
%% (split into timer-triggered and batch-size-triggered spiral metrics)
%% and aggregator request events.
instrumentation(HostType, PoolId) ->
    Labels = #{pool_id => PoolId, host_type => HostType},
    FlushMetrics = #{metrics => #{timed => spiral, batch => spiral}},
    RequestMetrics = #{metrics => #{count => spiral}},
    [{async_pool_flush, Labels, FlushMetrics},
     {async_pool_request, Labels, RequestMetrics}].
9 changes: 6 additions & 3 deletions src/async_pools/mongoose_batch_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ init(#{host_type := HostType,
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(sync, _From, State = #state{host_type = HostType, pool_id = PoolId,
flush_queue = [_|_]}) ->
mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId},
#{timed => 1}),
{reply, ok, run_flush(State)};
handle_call(sync, _From, State = #state{flush_queue = []}) ->
{reply, skipped, State};
Expand All @@ -84,7 +85,8 @@ handle_cast(Msg, State) ->
handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef,
host_type = HostType,
pool_id = PoolId}) ->
mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId},
#{timed => 1}),
{noreply, run_flush(State)};
handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
{noreply, State};
Expand Down Expand Up @@ -153,7 +155,8 @@ maybe_run_flush(#state{host_type = HostType,
case Length >= MaxSize of
false -> State;
true ->
mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId},
#{batch => 1}),
run_flush(State)
end.

Expand Down
3 changes: 2 additions & 1 deletion src/instrument/mongoose_instrument.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
-type labels() :: #{host_type => mongooseim:host_type(),
function => atom(),
cache_name => atom(),
pool_id => atom(),
pool_tag => mongoose_wpool:tag()}. % to be extended
-type label_key() :: host_type | function | cache_name | pool_tag. % to be extended
-type label_key() :: host_type | function | cache_name | pool_id | pool_tag. % to be extended
-type label_value() :: mongooseim:host_type() | atom() | mongoose_wpool:tag(). % to be extended
-type metrics() :: #{metric_name() => metric_type()}.
-type metric_name() :: atom().
Expand Down
Loading

0 comments on commit 2d51539

Please sign in to comment.