Skip to content

Commit

Permalink
Merge pull request #183 from keynslug/feat/EMQX-11527/set-opts
Browse files Browse the repository at this point in the history
feat(listener): support changing options on the fly
  • Loading branch information
keynslug authored Dec 18, 2023
2 parents 5cb22a8 + 4dbb63a commit 36a0b25
Show file tree
Hide file tree
Showing 20 changed files with 773 additions and 363 deletions.
52 changes: 42 additions & 10 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

-export([ get_stats/1
, get_options/1
, set_options/2
, get_acceptors/1
]).

Expand Down Expand Up @@ -79,6 +80,7 @@
, mfargs/0
, option/0
, listen_on/0
, listener_ref/0
]).

-type(proto() :: atom()).
Expand Down Expand Up @@ -106,6 +108,7 @@
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
-type ssl_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
-type dtls_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
-type listener_ref() :: {proto(), listen_on()}.

%%--------------------------------------------------------------------
%% APIs
Expand Down Expand Up @@ -210,7 +213,15 @@ get_stats({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Get options
-spec(get_options({atom(), listen_on()}) -> options()).
get_options({Proto, ListenOn}) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, []).

%% @doc Set applicable options
%% If some options could not be set, either because they are not applicable or
%% because they require a listener restart, function returns an error.
-spec set_options({atom(), listen_on()}, options()) ->
{ok, options()} | {error, _TODO}.
set_options({Proto, ListenOn}, Options) when is_atom(Proto) ->
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Options]).

%% @doc Get acceptors number
-spec(get_acceptors({atom(), listen_on()}) -> pos_integer()).
Expand All @@ -225,7 +236,7 @@ get_max_connections({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Set max connections
-spec(set_max_connections({atom(), listen_on()}, pos_integer()) -> ok).
set_max_connections({Proto, ListenOn}, MaxConns) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).

%% @doc Set max connection rate
-spec(get_max_conn_rate({atom(), listen_on()}) -> conn_limit()).
Expand All @@ -235,7 +246,7 @@ get_max_conn_rate({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Set max connection rate
-spec(set_max_conn_rate({atom(), listen_on()}, conn_limit()) -> ok).
set_max_conn_rate({Proto, ListenOn}, Opt) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [Proto, ListenOn, Opt]).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Opt]).

%% @doc Get current connections
-spec(get_current_connections({atom(), listen_on()}) -> non_neg_integer()).
Expand Down Expand Up @@ -268,13 +279,26 @@ deny({Proto, ListenOn}, CIDR) when is_atom(Proto) ->
%% @doc Merge two options
-spec(merge_opts(proplists:proplist(), proplists:proplist())
-> proplists:proplist()).
merge_opts(Defaults, Options) ->
lists:foldl(
fun({Opt, Val}, Acc) ->
lists:keystore(Opt, 1, Acc, {Opt, Val});
(Opt, Acc) ->
lists:usort([Opt | Acc])
end, Defaults, Options).
merge_opts(Opts1, Opts2) ->
squash_opts(Opts1 ++ Opts2).

squash_opts([{Name, Value} | Rest]) ->
Overrides = proplists:get_all_values(Name, Rest),
Merged = lists:foldl(fun(O, V) -> merge_opt(Name, V, O) end, Value, Overrides),
make_opt(Name, Merged) ++ squash_opts(proplists:delete(Name, Rest));
squash_opts([Name | Rest]) when is_atom(Name) ->
[Name | squash_opts([Opt || Opt <- Rest, Opt =/= Name])];
squash_opts([]) ->
[].

make_opt(_Name, undefined) -> [];
make_opt(Name, Value) -> [{Name, Value}].

merge_opt(ssl_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(tcp_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(udp_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(dtls_options, Opts1, Opts2) -> merge_opts(Opts1, Opts2);
merge_opt(_, _Opt1, Opt2) -> Opt2.

%% @doc Parse option.
parse_opt(Options) ->
Expand Down Expand Up @@ -354,3 +378,11 @@ with_listener({Proto, ListenOn}, Fun, Args) ->
{LSup, Mod} ->
erlang:apply(Mod, Fun, [LSup | Args])
end.

with_listener_ref(ListenerRef = {Proto, ListenOn}, Fun, Args) ->
case esockd_sup:listener_and_module({Proto, ListenOn}) of
undefined ->
error(not_found);
{LSup, Mod} ->
erlang:apply(Mod, Fun, [ListenerRef, LSup | Args])
end.
20 changes: 11 additions & 9 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
-include("esockd.hrl").

-export([
start_link/7,
set_conn_limiter/2
start_link/7
]).

%% state callbacks
Expand Down Expand Up @@ -98,16 +97,13 @@ start_link(
[]
).

-spec set_conn_limiter(pid(), esockd_generic_limiter:limiter()) -> ok.
set_conn_limiter(Acceptor, Limiter) ->
gen_statem:call(Acceptor, {set_conn_limiter, Limiter}, 5000).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
callback_mode() -> handle_event_function.

init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
_ = erlang:process_flag(trap_exit, true),
_ = rand:seed(exsplus, erlang:timestamp()),
{ok, Sockname} = inet:sockname(LSock),
{ok, SockMod} = inet_db:lookup_socket(LSock),
Expand Down Expand Up @@ -197,8 +193,6 @@ handle_event(state_timeout, {token_request, _} = Content, suspending, State) ->
{next_state, token_request, State, {next_event, internal, Content}};
handle_event(state_timeout, begin_waiting, suspending, State) ->
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event({call, From}, {set_conn_limiter, Limiter}, _, State) ->
{keep_state, State#state{conn_limiter = Limiter}, {reply, From, ok}};
handle_event(
info,
{inet_async, LSock, Ref, {error, Reason}},
Expand All @@ -215,6 +209,8 @@ handle_event(Type, Content, StateName, _) ->

terminate(normal, _StateName, #state{}) ->
ok;
terminate(shutdown, _StateName, #state{}) ->
ok;
terminate(Reason, _StateName, #state{}) ->
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
ok.
Expand All @@ -226,7 +222,13 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%% Internal funcs
%%--------------------------------------------------------------------

close(Sock) -> catch port_close(Sock).
close(Sock) ->
try
true = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
error:_ -> ok
end.

eval_tune_socket_fun({Fun, Args1}, Sock) ->
apply(Fun, [Sock | Args1]).
Expand Down
106 changes: 94 additions & 12 deletions src/esockd_acceptor_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,59 @@

-behaviour(supervisor).

-export([start_link/6]).
-export([ start_supervised/1
]).

-export([ start_acceptor/2
-export([ start_acceptors/2
, start_acceptor/2
, count_acceptors/1
]).

%% Supervisor callbacks
-export([init/1]).

%% callbacks
-export([tune_socket/2]).

-define(ACCEPTOR_POOL, 16).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

%% @doc Start Acceptor Supervisor.
-spec(start_link(atom(), esockd:listen_on(), pid(),
esockd:sock_fun(), [esockd:sock_fun()], esockd_generic_limiter:limiter())
-> {ok, pid()}).
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter) ->
supervisor:start_link(?MODULE, [Proto, ListenOn, ConnSup,
TuneFun, UpgradeFuns, Limiter]).
-spec start_supervised(esockd:listener_ref()) -> {ok, pid()}.
start_supervised(ListenerRef = {Proto, ListenOn}) ->
Type = esockd_server:get_listener_prop(ListenerRef, type),
Opts = esockd_server:get_listener_prop(ListenerRef, options),
TuneFun = tune_socket_fun(Opts),
UpgradeFuns = upgrade_funs(Type, Opts),
LimiterOpts = esockd_listener_sup:conn_limiter_opts(Opts, {listener, Proto, ListenOn}),
Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts),
AcceptorMod = case Type of
dtls -> esockd_dtls_acceptor;
_ -> esockd_acceptor
end,
ConnSup = esockd_server:get_listener_prop(ListenerRef, connection_sup),
AcceptorArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter],
case supervisor:start_link(?MODULE, {AcceptorMod, AcceptorArgs}) of
{ok, Pid} ->
_ = esockd_server:set_listener_prop(ListenerRef, acceptor_sup, Pid),
{ok, Pid};
{error, _} = Error ->
Error
end.

%% @doc Start acceptors.
-spec start_acceptors(esockd:listener_ref(), inet:socket()) -> ok.
start_acceptors(ListenerRef, LSock) ->
Opts = esockd_server:get_listener_prop(ListenerRef, options),
AcceptorNum = proplists:get_value(acceptors, Opts, ?ACCEPTOR_POOL),
AcceptorSup = esockd_server:get_listener_prop(ListenerRef, acceptor_sup),
lists:foreach(
fun(_) -> {ok, _} = start_acceptor(AcceptorSup, LSock) end,
lists:seq(1, AcceptorNum)
).

%% @doc Start a acceptor.
-spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | {error, term()}).
Expand All @@ -53,17 +86,66 @@ count_acceptors(AcceptorSup) ->
%% Supervisor callbacks
%%--------------------------------------------------------------------

init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]) ->
init({AcceptorMod, AcceptorArgs}) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 100000,
period => 1
},
Acceptor = #{id => acceptor,
start => {esockd_acceptor, start_link,
[Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]},
start => {AcceptorMod, start_link, AcceptorArgs},
restart => transient,
shutdown => 1000,
type => worker,
modules => [esockd_acceptor]
modules => [AcceptorMod]
},
{ok, {SupFlags, [Acceptor]}}.

%%--------------------------------------------------------------------
%% Internal functions
%% -------------------------------------------------------------------

tune_socket_fun(Opts) ->
TuneOpts = [ {tune_buffer, proplists:get_bool(tune_buffer, Opts)}
%% optional callback, returns ok | {error, Reason}
, {tune_fun, proplists:get_value(tune_fun, Opts, undefined)}],
{fun ?MODULE:tune_socket/2, [TuneOpts]}.

upgrade_funs(Type, Opts) ->
lists:append([proxy_upgrade_fun(Opts), ssl_upgrade_fun(Type, Opts)]).

proxy_upgrade_fun(Opts) ->
case proplists:get_bool(proxy_protocol, Opts) of
false -> [];
true -> [esockd_transport:proxy_upgrade_fun(Opts)]
end.

ssl_upgrade_fun(Type, Opts) ->
Key = case Type of
dtls -> dtls_options;
_ -> ssl_options
end,
case proplists:get_value(Key, Opts) of
undefined -> [];
SslOpts -> [esockd_transport:ssl_upgrade_fun(SslOpts)]
end.

tune_socket(Sock, []) ->
{ok, Sock};
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
_ = esockd_transport:setopts(Sock, [{buffer, BufSz}]),
tune_socket(Sock, More);
Error -> Error
end;
tune_socket(Sock, [{tune_fun, undefined} | More]) ->
tune_socket(Sock, More);
tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
case apply(M, F, A) of
ok ->
tune_socket(Sock, More);
Error -> Error
end;
tune_socket(Sock, [_|More]) ->
tune_socket(Sock, More).
Loading

0 comments on commit 36a0b25

Please sign in to comment.