Skip to content

Commit

Permalink
refactor socket mod to avoid restart when listener goes down
Browse files Browse the repository at this point in the history
bump acceptor pool for more fixes

upgrading acceptor_pool dep
  • Loading branch information
andymck authored and jeffgrunewald committed May 16, 2022
1 parent 369f556 commit 9ed629c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 37 deletions.
9 changes: 5 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{erl_opts, [debug_info]}.

{deps, [
{chatterbox, ".*", {git, "https://github.com/andymck/chatterbox", {branch, "andymck/fix-trailers-close-race-condition"}}},
{chatterbox, ".*", {git, "https://github.com/novalabsxyz/chatterbox", {branch, "master"}}},
ctx,
acceptor_pool,
gproc]}.
{acceptor_pool, {git, "https://github.com/novalabsxyz/acceptor_pool", {branch, "master"}}},
gproc
]}.

{grpc, [{protos, ["proto"]},
{service_modules, [{'grpc.health.v1.Health', "grpcbox_health"},
Expand Down Expand Up @@ -49,7 +50,7 @@
deprecated_function_calls, deprecated_functions]}.

{project_plugins, [covertool,
{grpcbox_plugin, {git, "https://github.com/andymck/grpcbox_plugin.git",{branch, "master"}}},
{grpcbox_plugin, {git, "https://github.com/novalabsxyz/grpcbox_plugin.git",{branch, "master"}}},
rebar3_lint]}.

{cover_enabled, true}.
Expand Down
11 changes: 6 additions & 5 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
{"1.2.0",
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},0},
[{<<"acceptor_pool">>,
{git,"https://github.com/novalabsxyz/acceptor_pool",
{ref,"56d676e00c11fd071a6bcc4059e3454960900af7"}},
0},
{<<"chatterbox">>,
{git,"https://github.com/andymck/chatterbox",
{ref,"9055e611d509eec2f182bbf52a54ec821b17ac59"}},
{git,"https://github.com/novalabsxyz/chatterbox",
{ref,"cbfe6e46b273f1552b57685c9f6daf710473c609"}},
0},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1}]}.
[
{pkg_hash,[
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]},
{pkg_hash_ext,[
{<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>},
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]}
Expand Down
80 changes: 52 additions & 28 deletions src/grpcbox_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,62 @@
code_change/3,
terminate/2]).

%% public api
-record(state, {
pool,
listen_opts,
pool_opts,
socket,
mref
}).

%% public api
start_link(Pool, ListenOpts, AcceptorOpts) ->
gen_server:start_link(?MODULE, [Pool, ListenOpts, AcceptorOpts], []).

%% gen_server api

init([Pool, ListenOpts, PoolOpts]) ->
{ok, #state{pool = Pool, pool_opts = PoolOpts, listen_opts = ListenOpts}, 0}.

handle_call(Req, _, State) ->
{stop, {bad_call, Req}, State}.

handle_cast(Req, State) ->
{stop, {bad_cast, Req}, State}.
handle_info(timeout, State) ->
case start_listener(State) of
{ok, {Socket, MRef}} ->
{noreply, State#state{socket = Socket, mref = MRef}};
_ ->
erlang:send_after(5000, self(), timeout),
{noreply, State}
end;
handle_info({'DOWN', MRef, port, Socket, _Reason}, #state{mref = MRef, socket = Socket} = State) ->
catch gen_tcp:close(Socket),
erlang:send_after(5000, self(), timeout),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.

code_change(_, State, _) ->
{ok, State}.

terminate(_Reason, {Socket, MRef}) ->
%% Socket may already be down but need to ensure it is closed to avoid
%% eaddrinuse error on restart
%% this takes care of that, unless of course this process is killed...
case demonitor(MRef, [flush, info]) of
true -> gen_tcp:close(Socket);
false -> ok
end.

%% ------------------------------------------------------------------
%% Internal functions
%% ------------------------------------------------------------------
start_listener(#state{
pool = Pool,
listen_opts = ListenOpts,
pool_opts = PoolOpts} = _State) ->
Port = maps:get(port, ListenOpts, 8080),
IPAddress = maps:get(ip, ListenOpts, {0, 0, 0, 0}),
AcceptorPoolSize = maps:get(size, PoolOpts, 10),
Expand All @@ -27,8 +75,7 @@ init([Pool, ListenOpts, PoolOpts]) ->
{reuseaddr, true},
{backlog, 32768},
{keepalive, true}]),
%% Trapping exit so can close socket in terminate/2
_ = process_flag(trap_exit, true),

Opts = [{active, false}, {mode, binary}, {packet, raw}, {ip, IPAddress} | SocketOpts],
case gen_tcp:listen(Port, Opts) of
{ok, Socket} ->
Expand Down Expand Up @@ -78,30 +125,7 @@ init([Pool, ListenOpts, PoolOpts]) ->
socket_not_found ->
noop
end,
{stop, eaddrinuse};
{error, eaddrinuse};
{error, Reason} ->
{stop, Reason}
end.

handle_call(Req, _, State) ->
{stop, {bad_call, Req}, State}.

handle_cast(Req, State) ->
{stop, {bad_cast, Req}, State}.

handle_info({'DOWN', MRef, port, Socket, Reason}, {Socket, MRef} = State) ->
{stop, Reason, State};
handle_info(_, State) ->
{noreply, State}.

code_change(_, State, _) ->
{ok, State}.

terminate(_Reason, {Socket, MRef}) ->
%% Socket may already be down but need to ensure it is closed to avoid
%% eaddrinuse error on restart
%% this takes care of that, unless of course this process is killed...
case demonitor(MRef, [flush, info]) of
true -> gen_tcp:close(Socket);
false -> ok
{error, Reason}
end.

0 comments on commit 9ed629c

Please sign in to comment.