From cff2834821e41049e0923b135b0686a3429ebcc5 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Fri, 15 Dec 2023 22:49:09 +0100 Subject: [PATCH] Apply reviews and isolate shaping into the mongoose_shaper module --- include/mod_muc_room.hrl | 6 ++--- rebar.config | 2 +- rebar.lock | 7 +++--- src/c2s/mongoose_c2s.erl | 6 ++--- src/ejabberd_s2s_in.erl | 4 ++-- src/mongoose_shaper.erl | 46 +++++++++++++++++++++++++------------- src/mongoose_transport.erl | 18 +++++++-------- src/muc/mod_muc.erl | 4 ++-- src/muc/mod_muc_room.erl | 26 ++++++++++----------- 9 files changed, 68 insertions(+), 51 deletions(-) diff --git a/include/mod_muc_room.hrl b/include/mod_muc_room.hrl index e0b06b4e0e1..28c6aef0b93 100644 --- a/include/mod_muc_room.hrl +++ b/include/mod_muc_room.hrl @@ -62,8 +62,8 @@ -record(activity, {message_time = 0, presence_time = 0, - message_shaper :: opuntia:shaper(), - presence_shaper :: opuntia:shaper(), + message_shaper :: mongoose_shaper:shaper(), + presence_shaper :: mongoose_shaper:shaper(), message, presence }). @@ -85,7 +85,7 @@ subject_timestamp = <<>>, just_created = false :: boolean(), activity = treap:empty() :: treap:treap(), - room_shaper :: opuntia:shaper(), + room_shaper :: mongoose_shaper:shaper(), room_queue = queue:new(), http_auth_pool = none :: none | mongoose_http_client:pool(), http_auth_pids = [] :: [pid()], diff --git a/rebar.config b/rebar.config index 3ed575f3511..9e9ae287cff 100644 --- a/rebar.config +++ b/rebar.config @@ -70,7 +70,7 @@ {flatlog, "0.1.2"}, %%% Stateless libraries - {opuntia, "0.2.1"}, + {opuntia, {git, "https://github.com/NelsonVides/opuntia", {branch, "improvements"}}}, {fast_tls, "1.1.16"}, {fast_scram, "0.5.0"}, {idna, "6.1.1"}, diff --git a/rebar.lock b/rebar.lock index 0be8ef747be..fe369bdbf28 100644 --- a/rebar.lock +++ b/rebar.lock @@ -94,7 +94,10 @@ {ref,"c71265c00bd7b465b7214770895503383dbe299e"}}, 0}, {<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0}, - {<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.2.1">>},0}, + {<<"opuntia">>, + {git,"https://github.com/NelsonVides/opuntia", + {ref,"accb6f8846460235138742bc4943bc1be027616c"}}, + 0}, {<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1}, {<<"pa">>, {git,"https://github.com/erszcz/pa.git", @@ -171,7 +174,6 @@ {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, {<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>}, {<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>}, - {<<"opuntia">>, <<"7F86F2581FE97A43B1C6CAF4B839EC42D35C09DC64B3F1CCC11FDFD37340809E">>}, {<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>}, {<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>}, {<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>}, @@ -235,7 +237,6 @@ {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, {<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>}, {<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>}, - {<<"opuntia">>, <<"CC73528A67918836BD9C7440A6A3CB452F2B6DCE7A5A7696843192B9C4A0ED09">>}, {<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>}, {<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>}, {<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>}, diff --git a/src/c2s/mongoose_c2s.erl b/src/c2s/mongoose_c2s.erl index 589105c5272..870f7d250d2 100644 --- a/src/c2s/mongoose_c2s.erl +++ b/src/c2s/mongoose_c2s.erl @@ -35,7 +35,7 @@ jid :: undefined | jid:jid(), socket :: undefined | mongoose_c2s_socket:socket(), parser :: undefined | exml_stream:parser(), - shaper :: undefined | opuntia:shaper(), + shaper :: undefined | mongoose_shaper:shaper(), listener_opts :: undefined | listener_opts(), state_mod = #{} :: #{module() => term()}, info = #{} :: info() @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect, StateData = #c2s_data{listener_opts = #{shaper := ShaperName, max_stanza_size := MaxStanzaSize} = LOpts}) -> {ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]), - Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(ShaperName)), + Shaper = mongoose_shaper:new(ShaperName), C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts), StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper}, {next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)}; @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) -> -spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res(). handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) -> - {NewShaper, Pause} = opuntia:update(Shaper, Size), + {NewShaper, Pause} = mongoose_shaper:update(Shaper, Size), mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size), NewStateData = StateData#c2s_data{shaper = NewShaper}, MaybePauseTimeout = maybe_pause(NewStateData, Pause), diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index 0973965396b..77d0390388d 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -58,7 +58,7 @@ -record(state, {socket :: mongoose_transport:socket_data(), streamid :: ejabberd_s2s:stream_id(), - shaper :: opuntia:shaper(), + shaper :: mongoose_shaper:shaper(), tls = false :: boolean(), tls_enabled = false :: boolean(), tls_required = false :: boolean(), @@ -84,7 +84,7 @@ tls_enabled => boolean(), tls_options => mongoose_tls:options(), authenticated => boolean(), - shaper => opuntia:shaper(), + shaper => mongoose_shaper:shaper(), domains => [jid:lserver()]}. -type statename() :: 'stream_established' | 'wait_for_feature_request'. diff --git a/src/mongoose_shaper.erl b/src/mongoose_shaper.erl index 6cfdf9ba08a..2eba8bf4550 100644 --- a/src/mongoose_shaper.erl +++ b/src/mongoose_shaper.erl @@ -1,22 +1,28 @@ -module(mongoose_shaper). --export([child_spec/0, get_shaper_rate/1, wait/5, reset_all_shapers/1]). +-export([child_spec/0]). +-export([new/1, update/2, wait/5, reset_all_shapers/1]). -ignore_xref([reset_all_shapers/1]). +-type shaper() :: opuntia:shaper(). +-export_type([shaper/0]). + -spec child_spec() -> supervisor:child_spec(). child_spec() -> - WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {mongoose_shaper, #{}}}}], - {mongoose_shaper, - {wpool, start_pool, [mongoose_shaper, WPoolOpts]}, - permanent, infinity, supervisor, [opuntia_srv]}. + WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {?MODULE, #{}}}}], + #{id => ?MODULE, + start => {wpool, start_pool, [?MODULE, WPoolOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor}. --spec get_shaper_rate(atom()) -> number(). -get_shaper_rate(Name) -> - case mongoose_config:lookup_opt([shaper, Name]) of - {ok, #{max_rate := MaxRatePerSecond}} -> - MaxRatePerSecond / 1000; - {error, not_found} -> 0 - end. +-spec new(atom()) -> opuntia:shaper(). +new(Name) -> + opuntia:new(get_shaper_config(Name)). + +-spec update(opuntia:shaper(), opuntia:tokens()) -> {opuntia:shaper(), opuntia:delay()}. +update(Shaper, Tokens) -> + opuntia:update(Shaper, Tokens). %% @doc Shapes the caller from executing the action. -spec wait(HostType :: mongooseim:host_type_or_global(), @@ -25,8 +31,8 @@ get_shaper_rate(Name) -> FromJID :: jid:jid(), Size :: integer()) -> continue | {error, max_delay_reached}. wait(HostType, Domain, Action, FromJID, Size) -> - Worker = wpool_pool:hash_worker(mongoose_shaper, FromJID), - Config = get_shaper_rate(get_shaper_name(HostType, Domain, Action, FromJID)), + Worker = wpool_pool:hash_worker(?MODULE, FromJID), + Config = get_shaper_config(get_shaper_name(HostType, Domain, Action, FromJID)), Key = new_key(Domain, Action, FromJID), opuntia_srv:wait(Worker, Key, Size, Config). @@ -37,7 +43,7 @@ new_key(Domain, Action, FromJID) -> %% @doc Ask all shaper servers to forget current shapers and read settings again reset_all_shapers(_HostType) -> - [ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(mongoose_shaper) ], + [ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(?MODULE) ], ok. -spec get_shaper_name(HostType :: mongooseim:host_type_or_global(), @@ -50,5 +56,15 @@ get_shaper_name(HostType, Domain, Action, FromJID) -> Value -> Value end. +-spec get_shaper_config(atom()) -> number(). +get_shaper_config(Name) -> + case mongoose_config:lookup_opt([shaper, Name]) of + {ok, #{max_rate := MaxRatePerSecond}} -> + Rate = MaxRatePerSecond div 1000, + {MaxRatePerSecond, Rate, millisecond}; + {error, not_found} -> + 0 + end. + default_shaper() -> none. diff --git a/src/mongoose_transport.erl b/src/mongoose_transport.erl index 901da58b42e..46f6d9e42cf 100644 --- a/src/mongoose_transport.erl +++ b/src/mongoose_transport.erl @@ -49,7 +49,7 @@ -record(state, {socket :: socket(), sockmod = gen_tcp :: socket_module(), - shaper_state :: opuntia:shaper(), + shaper_state :: mongoose_shaper:shaper(), dest_pid :: undefined | pid(), %% gen_fsm_compat pid max_stanza_size :: stanza_size(), parser :: exml_stream:parser(), @@ -164,7 +164,7 @@ send_text(SocketData, Data) -> #socket_data{sockmod = SockMod, socket = Socket, connection_type = ConnectionType} = SocketData, case catch SockMod:send(Socket, Data) of - ok -> + ok -> update_transport_metrics(byte_size(Data), sent, ConnectionType), ok; {error, timeout} -> @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) -> gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []). init([Socket, Shaper, Opts]) -> - ShaperState = opuntia:new(mongoose_shaper:get_shaper_rate(Shaper)), + ShaperState = mongoose_shaper:new(Shaper), #{max_stanza_size := MaxStanzaSize, hibernate_after := HibernateAfter, connection_type := ConnectionType} = Opts, @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) -> {reply, ok, State, hibernate_or_timeout(State)}. handle_cast({change_shaper, Shaper}, State) -> - NewShaperState = opuntia:new(mongoose_shaper:get_shaper_rate(Shaper)), + NewShaperState = mongoose_shaper:new(Shaper), NewState = State#state{shaper_state = NewShaperState}, {noreply, NewState, hibernate_or_timeout(NewState)}; handle_cast(close, State) -> @@ -277,7 +277,7 @@ handle_cast(_Msg, State) -> handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) -> NewState = process_data(Data, State), - {noreply, NewState, hibernate_or_timeout(NewState)}; + {noreply, NewState, hibernate_or_timeout(NewState)}; handle_info({Tag, _TCPSocket, Data}, #state{socket = Socket, sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl -> @@ -394,21 +394,21 @@ process_data(Data, #state{parser = Parser, Size = byte_size(Data), {Events, NewParser} = case exml_stream:parse(Parser, Data) of - {ok, NParser, Elems} -> + {ok, NParser, Elems} -> {[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser}; {error, Reason} -> {[{xmlstreamerror, Reason}], Parser} end, - {NewShaperState, Pause} = opuntia:update(ShaperState, Size), + {NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size), update_transport_metrics(Size, received, State#state.connection_type), [gen_fsm_compat:send_event(DestPid, Event) || Event <- Events], maybe_pause(Pause, State), State#state{parser = NewParser, shaper_state = NewShaperState}. -wrap_xml_elements_and_update_metrics(#xmlel{} = E) -> +wrap_xml_elements_and_update_metrics(#xmlel{} = E) -> mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), {xmlstreamelement, E}; -wrap_xml_elements_and_update_metrics(E) -> +wrap_xml_elements_and_update_metrics(E) -> mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), E. diff --git a/src/muc/mod_muc.erl b/src/muc/mod_muc.erl index 32da32d548e..37c01a83de3 100644 --- a/src/muc/mod_muc.erl +++ b/src/muc/mod_muc.erl @@ -137,7 +137,7 @@ access, history_size :: integer(), default_room_opts :: list(), - room_shaper :: opuntia:shaper(), + room_shaper :: mongoose_shaper:shaper(), http_auth_pool :: mongoose_http_client:pool(), hibernated_room_check_interval :: timeout(), hibernated_room_timeout :: timeout() }). @@ -873,7 +873,7 @@ check_user_can_create_room(HostType, ServerHost, AccessCreate, From, RoomID) -> -spec start_room(HostType :: host_type(), ServerHost :: jid:lserver(), MucHost :: muc_host(), Access :: access(), room(), - HistorySize :: undefined | integer(), RoomShaper :: opuntia:shaper(), + HistorySize :: undefined | integer(), RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), From :: jid:jid(), nick(), DefRoomOpts :: undefined | [any()], Acc :: mongoose_acc:t()) -> {error, {failed_to_restore, Reason :: term()}} | {ok, pid()}. diff --git a/src/muc/mod_muc_room.erl b/src/muc/mod_muc_room.erl index 9a511aadea7..bd74029587b 100644 --- a/src/muc/mod_muc_room.erl +++ b/src/muc/mod_muc_room.erl @@ -156,7 +156,7 @@ -spec start_new(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(), Access :: _, Room :: mod_muc:room(), HistorySize :: integer(), - RoomShaper :: opuntia:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), + RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), Creator :: jid:jid(), Nick :: mod_muc:nick(), DefRoomOpts :: list()) -> {ok, pid()}. start_new(HostType, Host, ServerHost, Access, Room, @@ -171,7 +171,7 @@ start_new(HostType, Host, ServerHost, Access, Room, -spec start_restored(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(), Access :: _, Room :: mod_muc:room(), HistorySize :: integer(), - RoomShaper :: opuntia:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), + RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), Opts :: list()) -> {ok, pid()}. start_restored(HostType, Host, ServerHost, Access, Room, HistorySize, RoomShaper, HttpAuthPool, Opts) @@ -305,7 +305,7 @@ init_new(#{init_type := start_new, host_type := HostType, muc_host := Host, http_auth_pool := HttpAuthPool, creator := Creator, nick := _Nick, def_opts := DefRoomOpts}) when is_list(DefRoomOpts) -> process_flag(trap_exit, true), - Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(RoomShaper)), + Shaper = mongoose_shaper:new(RoomShaper), State = #state{host = Host, host_type = HostType, server_host = ServerHost, access = Access, room = Room, @@ -345,7 +345,7 @@ init_restored(#{init_type := start_restored, room_shaper := RoomShaper, http_auth_pool := HttpAuthPool, opts := Opts}) -> process_flag(trap_exit, true), - Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(RoomShaper)), + Shaper = mongoose_shaper:new(RoomShaper), State = set_opts(Opts, #state{host = Host, host_type = HostType, server_host = ServerHost, access = Access, @@ -1545,8 +1545,8 @@ get_user_activity(JID, StateData) -> case treap:lookup(jid:to_lower(JID), StateData#state.activity) of {ok, _P, A} -> A; error -> - MessageShaper = opuntia:new(mongoose_shaper:get_shaper_rate(get_opt(StateData, user_message_shaper))), - PresenceShaper = opuntia:new(mongoose_shaper:get_shaper_rate(get_opt(StateData, user_presence_shaper))), + MessageShaper = mongoose_shaper:new(get_opt(StateData, user_message_shaper)), + PresenceShaper = mongoose_shaper:new(get_opt(StateData, user_presence_shaper)), #activity{message_shaper = MessageShaper, presence_shaper = PresenceShaper} end. @@ -1581,10 +1581,10 @@ store_user_activity(JID, UserActivity, StateData) -> (UserActivity#activity.presence == undefined) of true -> {_, MessageShaperInterval} = - opuntia:update(UserActivity#activity.message_shaper, + mongoose_shaper:update(UserActivity#activity.message_shaper, 100000), {_, PresenceShaperInterval} = - opuntia:update(UserActivity#activity.presence_shaper, + mongoose_shaper:update(UserActivity#activity.presence_shaper, 100000), Delay = lists:max([MessageShaperInterval, PresenceShaperInterval, @@ -1632,7 +1632,7 @@ prepare_room_queue(StateData) -> Packet = Activity#activity.message, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - opuntia:update(StateData#state.room_shaper, Size), + mongoose_shaper:update(StateData#state.room_shaper, Size), erlang:send_after( RoomShaperInterval, self(), process_room_queue), @@ -1643,7 +1643,7 @@ prepare_room_queue(StateData) -> {_Nick, Packet} = Activity#activity.presence, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - opuntia:update(StateData#state.room_shaper, Size), + mongoose_shaper:update(StateData#state.room_shaper, Size), erlang:send_after( RoomShaperInterval, self(), process_room_queue), @@ -4220,7 +4220,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>, Now = os:system_time(microsecond), MinMessageInterval = trunc(get_opt(StateData, min_message_interval) * 1000000), Size = element_size(Packet), - {MessageShaper, MessageShaperInterval} = opuntia:update(Activity#activity.message_shaper, Size), + {MessageShaper, MessageShaperInterval} = mongoose_shaper:update(Activity#activity.message_shaper, Size), case {Activity#activity.message /= undefined, Now >= Activity#activity.message_time + MinMessageInterval, MessageShaperInterval} of @@ -4230,7 +4230,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>, ejabberd_router:route(StateData#state.jid, From, Err), StateData; {false, true, 0} -> - {RoomShaper, RoomShaperInterval} = opuntia:update(StateData#state.room_shaper, Size), + {RoomShaper, RoomShaperInterval} = mongoose_shaper:update(StateData#state.room_shaper, Size), RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), case {RoomShaperInterval, RoomQueueEmpty} of {0, true} -> @@ -4310,7 +4310,7 @@ route_message(#routed_message{from = From, packet = Packet, lang = Lang}, StateData. -spec schedule_queue_processing_when_empty(RoomQueueEmpty :: boolean(), - RoomShaper :: opuntia:shaper(), + RoomShaper :: mongoose_shaper:shaper(), RoomShaperInterval :: non_neg_integer(), StateData :: state()) -> state(). schedule_queue_processing_when_empty(true, RoomShaper, RoomShaperInterval, StateData) ->