Skip to content

Commit

Permalink
feat(listener): support changing options on the fly
Browse files Browse the repository at this point in the history
Also make sure that any occasoinal restarts will not erase any option
changes.
  • Loading branch information
keynslug committed Dec 12, 2023
1 parent 5cb22a8 commit c60472a
Show file tree
Hide file tree
Showing 13 changed files with 512 additions and 261 deletions.
25 changes: 22 additions & 3 deletions src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

-export([ get_stats/1
, get_options/1
, set_options/2
, get_acceptors/1
]).

Expand Down Expand Up @@ -79,6 +80,7 @@
, mfargs/0
, option/0
, listen_on/0
, listener_ref/0
]).

-type(proto() :: atom()).
Expand Down Expand Up @@ -106,6 +108,7 @@
-type(listen_on() :: inet:port_number() | {host(), inet:port_number()}).
-type ssl_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
-type dtls_options() :: [{handshake_timeout, pos_integer()} | ssl_option()].
-type listener_ref() :: {proto(), listen_on()}.

%%--------------------------------------------------------------------
%% APIs
Expand Down Expand Up @@ -210,7 +213,15 @@ get_stats({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Get options
-spec(get_options({atom(), listen_on()}) -> options()).
get_options({Proto, ListenOn}) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, []).

%% @doc Set applicable options
%% If some options could not be set, either because they are not applicable or
%% because they require a listener restart, function returns an error.
-spec set_options({atom(), listen_on()}, options()) ->
{ok, options()} | {error, _TODO}.
set_options({Proto, ListenOn}, Options) when is_atom(Proto) ->
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Options]).

%% @doc Get acceptors number
-spec(get_acceptors({atom(), listen_on()}) -> pos_integer()).
Expand All @@ -225,7 +236,7 @@ get_max_connections({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Set max connections
-spec(set_max_connections({atom(), listen_on()}, pos_integer()) -> ok).
set_max_connections({Proto, ListenOn}, MaxConns) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [MaxConns]).

%% @doc Set max connection rate
-spec(get_max_conn_rate({atom(), listen_on()}) -> conn_limit()).
Expand All @@ -235,7 +246,7 @@ get_max_conn_rate({Proto, ListenOn}) when is_atom(Proto) ->
%% @doc Set max connection rate
-spec(set_max_conn_rate({atom(), listen_on()}, conn_limit()) -> ok).
set_max_conn_rate({Proto, ListenOn}, Opt) when is_atom(Proto) ->
with_listener({Proto, ListenOn}, ?FUNCTION_NAME, [Proto, ListenOn, Opt]).
with_listener_ref({Proto, ListenOn}, ?FUNCTION_NAME, [Opt]).

%% @doc Get current connections
-spec(get_current_connections({atom(), listen_on()}) -> non_neg_integer()).
Expand Down Expand Up @@ -354,3 +365,11 @@ with_listener({Proto, ListenOn}, Fun, Args) ->
{LSup, Mod} ->
erlang:apply(Mod, Fun, [LSup | Args])
end.

with_listener_ref(ListenerRef = {Proto, ListenOn}, Fun, Args) ->
case esockd_sup:listener_and_module({Proto, ListenOn}) of
undefined ->
error(not_found);
{LSup, Mod} ->
erlang:apply(Mod, Fun, [ListenerRef, LSup | Args])
end.
3 changes: 3 additions & 0 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ set_conn_limiter(Acceptor, Limiter) ->
callback_mode() -> handle_event_function.

init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter, LSock]) ->
_ = erlang:process_flag(trap_exit, true),
_ = rand:seed(exsplus, erlang:timestamp()),
{ok, Sockname} = inet:sockname(LSock),
{ok, SockMod} = inet_db:lookup_socket(LSock),
Expand Down Expand Up @@ -215,6 +216,8 @@ handle_event(Type, Content, StateName, _) ->

terminate(normal, _StateName, #state{}) ->
ok;
terminate(shutdown, _StateName, #state{}) ->
ok;
terminate(Reason, _StateName, #state{}) ->
error_logger:error_msg("~p terminating due to ~p", [?MODULE, Reason]),
ok.
Expand Down
111 changes: 103 additions & 8 deletions src/esockd_acceptor_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@

-behaviour(supervisor).

-export([start_link/6]).
-export([ start_link/6
, start_supervised/1
]).

-export([ start_acceptor/2
-export([ start_acceptors/2
, start_acceptor/2
, count_acceptors/1
]).

%% Supervisor callbacks
-export([init/1]).

%% callbacks
-export([tune_socket/2]).

-define(ACCEPTOR_POOL, 16).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
Expand All @@ -36,8 +44,46 @@
esockd:sock_fun(), [esockd:sock_fun()], esockd_generic_limiter:limiter())
-> {ok, pid()}).
start_link(Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter) ->
supervisor:start_link(?MODULE, [Proto, ListenOn, ConnSup,
TuneFun, UpgradeFuns, Limiter]).
supervisor:start_link(?MODULE, { esockd_acceptor
, [Proto, ListenOn, ConnSup,
TuneFun, UpgradeFuns, Limiter]}).

%% @doc Start Acceptor Supervisor.
-spec start_supervised(esockd:listener_ref()) -> {ok, pid()}.
start_supervised(ListenerRef = {Proto, ListenOn}) ->
Type = esockd_server:get_listener_prop(ListenerRef, type),
Opts = esockd_server:get_listener_prop(ListenerRef, options),
TuneFun = tune_socket_fun(Opts),
UpgradeFuns = upgrade_funs(Type, Opts),
LimiterOpts = esockd_listener_sup:conn_limiter_opts(Opts, {listener, Proto, ListenOn}),
Limiter = esockd_listener_sup:conn_rate_limiter(LimiterOpts),
AcceptorMod = case Type of
dtls -> esockd_dtls_acceptor;
_ -> esockd_acceptor
end,
ConnSup = esockd_server:get_listener_prop(ListenerRef, connection_sup),
AcceptorArgs = [Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter],
case supervisor:start_link(?MODULE, {AcceptorMod, AcceptorArgs}) of
{ok, Pid} ->
_ = esockd_server:set_listener_prop(ListenerRef, acceptor_sup, Pid),
{ok, Pid};
{error, _} = Error ->
Error
end.

%% @doc Start acceptors.
-spec start_acceptors(esockd:listener_ref(), inet:socket()) -> ok.
start_acceptors(ListenerRef, LSock) ->
Opts = esockd_server:get_listener_prop(ListenerRef, options),
AcceptorNum = proplists:get_value(acceptors, Opts, ?ACCEPTOR_POOL),
AcceptorSup = esockd_server:get_listener_prop(ListenerRef, acceptor_sup),
lists:foreach(
fun(_) ->
%% FIXME
{ok, _} = supervisor:start_child(AcceptorSup, [LSock])
end,
lists:seq(1, AcceptorNum)
).

%% @doc Start a acceptor.
-spec(start_acceptor(pid(), inet:socket()) -> {ok, pid()} | {error, term()}).
Expand All @@ -53,17 +99,66 @@ count_acceptors(AcceptorSup) ->
%% Supervisor callbacks
%%--------------------------------------------------------------------

init([Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]) ->
init({AcceptorMod, AcceptorArgs}) ->
SupFlags = #{strategy => simple_one_for_one,
intensity => 100000,
period => 1
},
Acceptor = #{id => acceptor,
start => {esockd_acceptor, start_link,
[Proto, ListenOn, ConnSup, TuneFun, UpgradeFuns, Limiter]},
start => {AcceptorMod, start_link, AcceptorArgs},
restart => transient,
shutdown => 1000,
type => worker,
modules => [esockd_acceptor]
modules => [AcceptorMod]
},
{ok, {SupFlags, [Acceptor]}}.

%%--------------------------------------------------------------------
%% Internal functions
%% -------------------------------------------------------------------

tune_socket_fun(Opts) ->
TuneOpts = [ {tune_buffer, proplists:get_bool(tune_buffer, Opts)}
%% optional callback, returns ok | {error, Reason}
, {tune_fun, proplists:get_value(tune_fun, Opts, undefined)}],
{fun ?MODULE:tune_socket/2, [TuneOpts]}.

upgrade_funs(Type, Opts) ->
lists:append([proxy_upgrade_fun(Opts), ssl_upgrade_fun(Type, Opts)]).

proxy_upgrade_fun(Opts) ->
case proplists:get_bool(proxy_protocol, Opts) of
false -> [];
true -> [esockd_transport:proxy_upgrade_fun(Opts)]
end.

ssl_upgrade_fun(Type, Opts) ->
Key = case Type of
dtls -> dtls_options;
_ -> ssl_options
end,
case proplists:get_value(Key, Opts) of
undefined -> [];
SslOpts -> [esockd_transport:ssl_upgrade_fun(SslOpts)]
end.

tune_socket(Sock, []) ->
{ok, Sock};
tune_socket(Sock, [{tune_buffer, true}|More]) ->
case esockd_transport:getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} ->
BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
_ = esockd_transport:setopts(Sock, [{buffer, BufSz}]),
tune_socket(Sock, More);
Error -> Error
end;
tune_socket(Sock, [{tune_fun, undefined} | More]) ->
tune_socket(Sock, More);
tune_socket(Sock, [{tune_fun, {M, F, A}} | More]) ->
case apply(M, F, A) of
ok ->
tune_socket(Sock, More);
Error -> Error
end;
tune_socket(Sock, [_|More]) ->
tune_socket(Sock, More).
94 changes: 74 additions & 20 deletions src/esockd_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@

-import(proplists, [get_value/3]).

-export([start_link/2, stop/1]).
-export([start_link/2, start_supervised/2, stop/1]).

-export([ start_connection/3
, count_connections/1
]).

-export([ get_max_connections/1
, set_max_connections/2
, get_shutdown_count/1
, get_options/1
, set_options/2
]).

-export([get_shutdown_count/1]).

%% Allow, Deny
-export([ access_rules/1
, allow/2
Expand All @@ -49,6 +49,10 @@

-type(shutdown() :: brutal_kill | infinity | pos_integer()).

-type option() :: {shutdown, shutdown()}
| {max_connections, pos_integer()}
| {access_rules, list()}.

-record(state, {
curr_connections :: map(),
max_connections :: pos_integer(),
Expand All @@ -68,9 +72,29 @@
start_link(Opts, MFA) ->
gen_server:start_link(?MODULE, [Opts, MFA], []).

-spec start_supervised(esockd:listener_ref(), esockd:mfargs())
-> {ok, pid()} | ignore | {error, term()}.
start_supervised(ListenerRef, MFA) ->
Opts = esockd_server:get_listener_prop(ListenerRef, options),
case start_link(Opts, MFA) of
{ok, Pid} ->
_ = esockd_server:set_listener_prop(ListenerRef, connection_sup, Pid),
{ok, Pid};
{error, _} = Error ->
Error
end.

-spec(stop(pid()) -> ok).
stop(Pid) -> gen_server:stop(Pid).

-spec get_options(pid()) -> [option()].
get_options(Pid) ->
call(Pid, get_options).

-spec set_options(pid(), [option()]) -> ok | {error, _Reason}.
set_options(Pid, Opts) ->
call(Pid, {set_options, Opts}).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -102,20 +126,16 @@ start_connection_proc({M, F, Args}, Sock) when is_atom(M), is_atom(F), is_list(A
count_connections(Sup) ->
call(Sup, count_connections).

-spec(get_max_connections(pid()) -> integer()).
-spec get_max_connections(pid()) -> pos_integer().
get_max_connections(Sup) when is_pid(Sup) ->
call(Sup, get_max_connections).

-spec(set_max_connections(pid(), integer()) -> ok).
set_max_connections(Sup, MaxConns) when is_pid(Sup) ->
call(Sup, {set_max_connections, MaxConns}).
proplists:get_value(max_connections, get_options(Sup)).

-spec(get_shutdown_count(pid()) -> [{atom(), integer()}]).
get_shutdown_count(Sup) ->
call(Sup, get_shutdown_count).

access_rules(Sup) ->
call(Sup, access_rules).
proplists:get_value(access_rules, get_options(Sup)).

allow(Sup, CIDR) ->
call(Sup, {add_rule, {allow, CIDR}}).
Expand Down Expand Up @@ -175,19 +195,10 @@ handle_call({start_connection, Sock}, _From,
handle_call(count_connections, _From, State = #state{curr_connections = Conns}) ->
{reply, maps:size(Conns), State};

handle_call(get_max_connections, _From, State = #state{max_connections = MaxConns}) ->
{reply, MaxConns, State};

handle_call({set_max_connections, MaxConns}, _From, State) ->
{reply, ok, State#state{max_connections = MaxConns}};

handle_call(get_shutdown_count, _From, State) ->
Counts = [{Reason, Count} || {{shutdown_count, Reason}, Count} <- get()],
{reply, Counts, State};

handle_call(access_rules, _From, State = #state{access_rules = Rules}) ->
{reply, [raw(Rule) || Rule <- Rules], State};

handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
try esockd_access:compile(RawRule) of
Rule ->
Expand All @@ -203,6 +214,22 @@ handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
{reply, {error, bad_access_rule}, State}
end;

handle_call(get_options, _From, State) ->
Options = [
{shutdown, get_state_option(shutdown, State)},
{max_connections, get_state_option(max_connections, State)},
{access_rules, get_state_option(access_rules, State)}
],
{reply, Options, State};

handle_call({set_options, Options}, _From, State) ->
case set_state_options(Options, State) of
NState = #state{} ->
{reply, ok, NState};
{error, Reason} ->
{reply, {error, Reason}, State}
end;

%% mimic the supervisor's which_children reply
handle_call(which_children, _From, State = #state{curr_connections = Conns, mfargs = MFA}) ->
Mod = get_module(MFA),
Expand Down Expand Up @@ -248,6 +275,33 @@ allowed(Addr, Rules) ->
{matched, deny} -> false
end.

get_state_option(max_connections, #state{max_connections = MaxConnections}) ->
MaxConnections;
get_state_option(shutdown, #state{shutdown = Shutdown}) ->
Shutdown;
get_state_option(access_rules, #state{access_rules = Rules}) ->
[raw(Rule) || Rule <- Rules].

set_state_option({max_connections, MaxConns}, State) ->
State#state{max_connections = MaxConns};
set_state_option({shutdown, Shutdown}, State) ->
State#state{shutdown = Shutdown};
set_state_option({access_rules, Rules}, State) ->
try
CompiledRules = [esockd_access:compile(Rule) || Rule <- Rules],
State#state{access_rules = CompiledRules}
catch
error:_Reason -> {error, bad_access_rules}
end;
set_state_option(_, State) ->
State.

set_state_options(Options, State) ->
lists:foldl(fun
(Option, St = #state{}) -> set_state_option(Option, St);
(_, Error) -> Error
end, State, Options).

raw({allow, CIDR = {_Start, _End, _Len}}) ->
{allow, esockd_cidr:to_string(CIDR)};
raw({deny, CIDR = {_Start, _End, _Len}}) ->
Expand Down
Loading

0 comments on commit c60472a

Please sign in to comment.