diff --git a/include/esockd.hrl b/include/esockd.hrl index 8a4a2f6..7a65363 100644 --- a/include/esockd.hrl +++ b/include/esockd.hrl @@ -71,4 +71,24 @@ -type ssl_option() :: ssl:ssl_option(). -endif. % OTP_RELEASE + +-define(ERROR_MAXLIMIT, maxlimit). + +-define(ARG_ACCEPTED, accepted). +-define(ARG_CLOSED_SYS_LIMIT, closed_sys_limit). +-define(ARG_CLOSED_MAX_LIMIT, closed_max_limit). +-define(ARG_CLOSED_OVERLOADED, closed_overloaded). +-define(ARG_CLOSED_RATE_LIMITED, closed_rate_limited). +-define(ARG_CLOSED_OTHER_REASONS, closed_other_reasons). + +-define(ACCEPT_RESULT_GROUPS, + [ + ?ARG_ACCEPTED, + ?ARG_CLOSED_SYS_LIMIT, + ?ARG_CLOSED_MAX_LIMIT, + ?ARG_CLOSED_OVERLOADED, + ?ARG_CLOSED_RATE_LIMITED, + ?ARG_CLOSED_OTHER_REASONS + ]). + -endif. % ESOCKD_HRL diff --git a/src/esockd.erl b/src/esockd.erl index 4a3e507..bafb581 100644 --- a/src/esockd.erl +++ b/src/esockd.erl @@ -411,7 +411,23 @@ parse_opt([_|Opts], Acc) -> %% @doc System 'ulimit -n' -spec(ulimit() -> pos_integer()). ulimit() -> - proplists:get_value(max_fds, hd(erlang:system_info(check_io))). + find_max_fd(erlang:system_info(check_io)). + +find_max_fd([]) -> + %% Magic! + %% According to Erlang/OTP doc, erlang:system_info(check_io)): + %% Returns a list containing miscellaneous information about the emulators + %% internal I/O checking. Notice that the content of the returned list can + %% vary between platforms and over time. It is only guaranteed that a list + %% is returned. + 1023; +find_max_fd([CheckIoResult | More]) -> + case lists:keyfind(max_fds, 1, CheckIoResult) of + {max_fds, N} when is_integer(N) andalso N > 0 -> + N; + _ -> + find_max_fd(More) + end. -spec(to_string(listen_on()) -> string()). to_string(Port) when is_integer(Port) -> diff --git a/src/esockd_acceptor.erl b/src/esockd_acceptor.erl index 7471953..a1749ae 100644 --- a/src/esockd_acceptor.erl +++ b/src/esockd_acceptor.erl @@ -320,12 +320,13 @@ inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) -> _ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1), ok. -counter(accepted) -> accepted; -counter(emfile) -> closed_sys_limit; -counter(enfile) -> closed_sys_limit; -counter(overloaded) -> closed_overloaded; -counter(rate_limited) -> closed_rate_limited; -counter(_) -> closed_other_reasons. +counter(accepted) -> ?ARG_ACCEPTED; +counter(emfile) -> ?ARG_CLOSED_SYS_LIMIT; +counter(enfile) -> ?ARG_CLOSED_SYS_LIMIT; +counter(?ERROR_MAXLIMIT) -> ?ARG_CLOSED_MAX_LIMIT; +counter(overloaded) -> ?ARG_CLOSED_OVERLOADED; +counter(rate_limited) -> ?ARG_CLOSED_RATE_LIMITED; +counter(_) -> ?ARG_CLOSED_OTHER_REASONS. start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) -> esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns); diff --git a/src/esockd_connection_sup.erl b/src/esockd_connection_sup.erl index 395f3b1..ef4de1b 100644 --- a/src/esockd_connection_sup.erl +++ b/src/esockd_connection_sup.erl @@ -47,6 +47,8 @@ , code_change/3 ]). +-include("esockd.hrl"). + -type(shutdown() :: brutal_kill | infinity | pos_integer()). -type option() :: {shutdown, shutdown()} @@ -61,7 +63,6 @@ mfargs :: esockd:mfargs() }). --define(DEFAULT_MAX_CONNS, 1024). -define(TRANSPORT, esockd_transport). -define(ERROR_MSG(Format, Args), error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])). @@ -149,7 +150,7 @@ call(Sup, Req) -> init(Opts) -> process_flag(trap_exit, true), Shutdown = get_value(shutdown, Opts, brutal_kill), - MaxConns = get_value(max_connections, Opts, ?DEFAULT_MAX_CONNS), + MaxConns = resolve_max_connections(get_value(max_connections, Opts)), RawRules = get_value(access_rules, Opts, [{allow, all}]), AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules], MFA = get_value(connection_mfargs, Opts), @@ -162,7 +163,7 @@ init(Opts) -> handle_call({start_connection, _Sock}, _From, State = #state{curr_connections = Conns, max_connections = MaxConns}) when map_size(Conns) >= MaxConns -> - {reply, {error, maxlimit}, State}; + {reply, {error, ?ERROR_MAXLIMIT}, State}; handle_call({start_connection, Sock}, _From, State = #state{curr_connections = Conns, access_rules = Rules, mfargs = MFA}) -> @@ -207,7 +208,10 @@ handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) -> end catch error:Reason -> - error_logger:error_msg("Bad access rule: ~p, compile errro: ~p", [RawRule, Reason]), + logger:log(error, #{msg => "bad_access_rule", + rule => RawRule, + compile_error => Reason + }), {reply, {error, bad_access_rule}, State} end; @@ -283,7 +287,12 @@ get_state_option(connection_mfargs, #state{mfargs = MFA}) -> MFA. set_state_option({max_connections, MaxConns}, State) -> - State#state{max_connections = MaxConns}; + case resolve_max_connections(MaxConns) of + MaxConns -> + State#state{max_connections = MaxConns}; + _ -> + {error, bad_max_connections} + end; set_state_option({shutdown, Shutdown}, State) -> State#state{shutdown = Shutdown}; set_state_option({access_rules, Rules}, State) -> @@ -453,3 +462,27 @@ log(Level, Error, Reason, Pid, #state{mfargs = MFA}) -> get_module({M, _F, _A}) -> M; get_module({M, _F}) -> M; get_module(M) -> M. + +resolve_max_connections(Desired) -> + MaxFds = esockd:ulimit(), + MaxProcs = erlang:system_info(process_limit), + resolve_max_connections(Desired, MaxFds, MaxProcs). + +resolve_max_connections(undefined, MaxFds, MaxProcs) -> + %% not configured + min(MaxFds, MaxProcs); +resolve_max_connections(Desired, MaxFds, MaxProcs) when is_integer(Desired) -> + Res = lists:min([Desired, MaxFds, MaxProcs]), + case Res < Desired of + true -> + logger:log(error, + #{msg => "max_connections_config_ignored", + max_fds => MaxFds, + max_processes => MaxProcs, + desired => Desired + } + ); + false -> + ok + end, + Res. diff --git a/src/esockd_server.erl b/src/esockd_server.erl index 20572ba..21629e2 100644 --- a/src/esockd_server.erl +++ b/src/esockd_server.erl @@ -46,6 +46,8 @@ , code_change/3 ]). +-include("esockd.hrl"). + -record(state, { listener_props :: #{esockd:listener_ref() => #{_Name => _Value}} }). @@ -97,12 +99,7 @@ del_stats({Protocol, ListenOn}) -> -spec ensure_stats({atom(), esockd:listen_on()}) -> ok. ensure_stats(StatsKey) -> - Stats = [accepted, - closed_sys_limit, - closed_overloaded, - closed_rate_limited, - closed_other_reasons], - ok = ?MODULE:init_stats(StatsKey, Stats), + ok = ?MODULE:init_stats(StatsKey, ?ACCEPT_RESULT_GROUPS), ok. -spec get_listener_prop(esockd:listener_ref(), _Name) -> _Value | undefined. diff --git a/test/esockd_SUITE.erl b/test/esockd_SUITE.erl index d7746c3..5f2b63d 100644 --- a/test/esockd_SUITE.erl +++ b/test/esockd_SUITE.erl @@ -222,9 +222,22 @@ t_get_set_max_connections(_) -> ?assertEqual(16, esockd:get_max_connections({udp_echo, 7000})), ok = esockd:close(udp_echo, 7000). +t_get_set_invalid_max_connections(_) -> + MaxFd = esockd:ulimit(), + MaxProcs = erlang:system_info(process_limit), + Invalid = max(MaxFd, MaxProcs) + 1, + {ok, _LSup} = esockd:open(echo, 7000, [{connection_mfargs, echo_server}]), + Expected = min(MaxFd, MaxProcs), + ?assertEqual(Expected, esockd:get_max_connections({echo, 7000})), + esockd:set_max_connections({echo, 7000}, 2), + ?assertEqual(2, esockd:get_max_connections({echo, 7000})), + esockd:set_max_connections({echo, 7000}, Invalid), + ?assertEqual(2, esockd:get_max_connections({echo, 7000})), + ok = esockd:close(echo, 7000). + t_get_set_max_conn_rate(_) -> LimiterOpt = #{module => esockd_limiter, capacity => 100, interval => 1}, - {ok, _LSup} = esockd:open(echo, 7000, + {ok, _LSup} = esockd:open(echo, 7000, [{limiter, LimiterOpt}, {connection_mfargs, echo_server}]), ?assertEqual({100, 1}, esockd:get_max_conn_rate({echo, 7000})), esockd:set_max_conn_rate({echo, 7000}, LimiterOpt#{capacity := 50, interval := 2}), diff --git a/test/esockd_acceptor_SUITE.erl b/test/esockd_acceptor_SUITE.erl index f76604c..275d4ee 100644 --- a/test/esockd_acceptor_SUITE.erl +++ b/test/esockd_acceptor_SUITE.erl @@ -28,11 +28,13 @@ -define(COUNTER_OVERLOADED, 2). -define(COUNTER_RATE_LIMITED, 3). -define(COUNTER_SYS_LIMIT, 4). --define(COUNTER_OTHER_REASONS, 5). +-define(COUNTER_MAX_LIMIT, 5). +-define(COUNTER_OTHER_REASONS, 6). -define(COUNTER_LAST, 10). counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED; counter_tag_to_index(closed_sys_limit) -> ?COUNTER_SYS_LIMIT; +counter_tag_to_index(closed_max_limit) -> ?COUNTER_MAX_LIMIT; counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED; counter_tag_to_index(closed_rate_limited) -> ?COUNTER_RATE_LIMITED; counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONS. @@ -90,7 +92,7 @@ connect(Port, Timeout, Opts0) -> %% This is the very basic test, if this fails, nothing elese matters. t_normal(Config) -> Port = ?PORT, - Server = start(Port, no_limit()), + Server = start(Port, no_rate_limit()), {ok, ClientSock} = connect(Port), try ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000) @@ -129,7 +131,7 @@ t_rate_limitted(Config) -> %% Failed to spawn new connection process t_error_when_spawn(Config) -> Port = ?PORT, - Server = start(Port, no_limit(), #{start_connection_result => {error, overloaded}}), + Server = start(Port, no_rate_limit(), #{start_connection_result => {error, overloaded}}), {ok, Sock1} = connect(Port), try ok = wait_for_counter(Config, ?COUNTER_OVERLOADED, 1, 2000), @@ -141,7 +143,7 @@ t_error_when_spawn(Config) -> %% Failed to tune the socket opts t_einval(Config) -> Port = ?PORT, - Server = start(Port, no_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}), + Server = start(Port, no_rate_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}), {ok, Sock1} = connect(Port), try ok = wait_for_counter(Config, ?COUNTER_OTHER_REASONS, 1, 2000), @@ -157,7 +159,7 @@ t_sys_limit(Config) -> meck:new(prim_inet, [passthrough, no_history, unstick]), meck:expect(prim_inet, async_accept, fun(_, _) -> {error, emfile} end), Port = ?PORT, - Server = start(Port, no_limit()), + Server = start(Port, no_rate_limit()), try %% acceptor to enter suspending state after started %% because async_accept always returns {error, emfile} @@ -178,9 +180,21 @@ t_sys_limit(Config) -> stop(Server) end. +%% Failed to spawn new connection process +t_max_limit(Config) -> + Port = ?PORT, + Server = start(Port, no_rate_limit(), #{start_connection_result => {error, ?ERROR_MAXLIMIT}}), + {ok, Sock1} = connect(Port), + try + ok = wait_for_counter(Config, ?COUNTER_MAX_LIMIT, 1, 2000), + disconnect(Sock1) + after + stop(Server) + end. + t_close_listener_socket_cause_acceptor_stop(_Config) -> Port = ?PORT, - #{acceptor := Acceptor, lsock := LSock} = start(Port, no_limit()), + #{acceptor := Acceptor, lsock := LSock} = start(Port, no_rate_limit()), Mref = monitor(process, Acceptor), unlink(Acceptor), unlink(LSock), @@ -239,8 +253,8 @@ pause_then_allow(Pause) -> }. %% make a no-limit limiter -no_limit() -> - #{module => ?MODULE, name => no_limit}. +no_rate_limit() -> + #{module => ?MODULE, name => no_rate_limit}. %% limiter callback consume(_Token, #{name := pause_then_allow} = Limiter) -> @@ -250,7 +264,7 @@ consume(_Token, #{name := pause_then_allow} = Limiter) -> #{current := allow} -> {ok, Limiter} end; -consume(_Token, #{name := no_limit} = Limiter) -> +consume(_Token, #{name := no_rate_limit} = Limiter) -> {ok, Limiter}. now_ts() -> erlang:system_time(millisecond).