Skip to content

Commit f81ed84

Browse files
authored
Fix/timeout leak (#497)
Fix socket leaks: make sure that pending connections are correctly removed from the queue, which fixes a race condition with the manager. Fixes #462.
1 parent 40ce7aa commit f81ed84

File tree

2 files changed

+120
-33
lines changed

2 files changed

+120
-33
lines changed

examples/test_deadlock.erl

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env escript
2+
%% -*- erlang -*-
3+
%%! -pa ../_build/default/lib/hackney/ebin -pa ../_build/default/lib/*/ebin -pa ../_build/default/lib/certifi/ebin -pa ../_build/default/lib/idna/ebin -pa ../_build/default/lib/metrics/ebin -pa ../_build/default/lib/mimerl/ebin -pa ../_build/default/lib/ssl_verify_fun/ebin -pa ../_build/default/lib/unicode_util_compat/ebin
4+
5+
-module(test_async).
6+
7+
-export([main/1]).
8+
9+
%% Stress helper: starts N processes that each link a worker performing
%% hackney:get/4 followed by hackney:body/1 on Url (connect and recv timeouts
%% of 1000 ms), sleep 500 ms, then exit with reason `timeout'. Because the
%% worker is linked, that non-normal exit also terminates the in-flight request.
%% Every spawned process is monitored and the function returns `ok' once a
%% 'DOWN' message has been received for each of them.
deadlock(Url, N) ->
10+
Fun = fun() ->
11+
{ok, _, _Headers, Ref} = hackney:get(Url, [], <<>>, [{connect_timeout, 1000}, {recv_timeout, 1000}]),
12+
{ok, _Body} = hackney:body(Ref)
13+
end,
14+
Pids = [spawn(
15+
fun() ->
16+
spawn_link(Fun),
17+
timer:sleep(500),
18+
exit(timeout)
19+
end) || _I <- lists:seq(1, N)],
20+
MRefs = [erlang:monitor(process, Pid) || Pid <- Pids],
21+
wait_pids(MRefs),
22+
ok.
23+
24+
%% Blocks until a 'DOWN' message has been received for every monitor reference
%% in the list; returns `ok' when the list is empty.
%% NOTE(review): the receive has no `after' clause, so this waits forever if a
%% 'DOWN' message never arrives.
wait_pids([]) -> ok;
25+
wait_pids(MRefs) ->
26+
receive
27+
{'DOWN', MRef, process, _, _} ->
28+
wait_pids(MRefs -- [MRef])
29+
end.
30+
31+
32+
%% escript entry point: starts hackney and its dependencies, runs deadlock/2
%% with 550 processes against a delayed endpoint, then prints the contents of
%% the hackney_pool ETS table and the stats of the `default' pool.
main(_) ->
33+
{ok, _}= application:ensure_all_started(hackney),
34+
io:format("start test~n", []),
35+
deadlock("https://httparrot.herokuapp.com/delay/5", 550),
36+
io:format("pools are ~p~n", [ets:tab2list(hackney_pool)]),
37+
io:format("state of the pool:~n ~p~n", [hackney_pool:get_stats(default)]),
38+
ok.

src/hackney_pool.erl

+82-33
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
checkout/4,
1717
checkin/2]).
1818

19-
-export([start_pool/2,
19+
-export([
20+
get_stats/1,
21+
start_pool/2,
2022
stop_pool/1,
2123
find_pool/1,
22-
notify/2]).
24+
notify/2
25+
]).
2326

2427

2528
-export([count/1, count/2,
@@ -45,7 +48,8 @@
4548
max_connections,
4649
timeout,
4750
clients = dict:new(),
48-
queues = dict:new(), % Dest => queue of Froms
51+
queues = dict:new(), % Dest => queue of Froms,
52+
pending = dict:new(),
4953
connections = dict:new(),
5054
sockets = dict:new(),
5155
nb_waiters=0}).
@@ -99,6 +103,9 @@ checkin({_Name, Ref, Dest, Owner, Transport}, Socket) ->
99103
ok
100104
end.
101105

106+
%% @doc Return the statistics of pool `Pool'. Issues a synchronous `stats'
%% call to the process returned by find_pool/1; the reply is the property
%% list built by handle_stats/1.
get_stats(Pool) ->
107+
gen_server:call(find_pool(Pool), stats).
108+
102109

103110
%% @doc start a pool
104111
start_pool(Name, Options) ->
@@ -235,6 +242,8 @@ init([Name, Options]) ->
235242
{ok, #state{name=Name, metrics=Engine, max_connections=MaxConn,
236243
timeout=Timeout}}.
237244

245+
handle_call(stats, _From, State) ->
246+
{reply, handle_stats(State), State};
238247
handle_call(count, _From, #state{sockets=Sockets}=State) ->
239248
{reply, dict:size(Sockets), State};
240249
handle_call(timeout, _From, #state{timeout=Timeout}=State) ->
@@ -247,7 +256,7 @@ handle_call({checkout, Dest, Pid, RequestRef}, From, State) ->
247256
max_connections=MaxConn,
248257
clients=Clients,
249258
queues = Queues,
250-
nb_waiters = NbWaiters} = State,
259+
pending = Pending} = State,
251260

252261
{Reply, State2} = find_connection(Dest, Pid, State),
253262
case Reply of
@@ -259,12 +268,11 @@ handle_call({checkout, Dest, Pid, RequestRef}, From, State) ->
259268
case dict:size(Clients) >= MaxConn of
260269
true ->
261270
Queues2 = add_to_queue(Dest, From, RequestRef, Queues),
262-
NbWaiters2 = NbWaiters + 1,
263-
_ = metrics:update_histogram(Engine,
264-
[hackney_pool, PoolName, queue_count],
265-
NbWaiters2),
266-
{noreply, State2#state{queues = Queues2,
267-
nb_waiters=NbWaiters2}};
271+
Pending2 =add_pending(RequestRef, From, Dest, Pending),
272+
_ = metrics:update_histogram(
273+
Engine, [hackney_pool, PoolName, queue_count], dict:size(Pending2)
274+
),
275+
{noreply, State2#state{queues = Queues2, pending = Pending2}};
268276
false ->
269277
State3 = monitor_client(Dest, RequestRef, State2),
270278
ok = update_usage(State3),
@@ -307,11 +315,12 @@ handle_cast({set_timeout, NewTimeout}, State) ->
307315
{noreply, State#state{timeout=NewTimeout}};
308316

309317
handle_cast({checkout_cancel, Dest, Ref}, State) ->
310-
{Queues, Removed} = del_from_queue(Dest, Ref, State#state.queues),
318+
#state{queues=Queues, pending=Pending} = State,
319+
{Queues2, Removed} = del_from_queue(Dest, Ref, Queues),
311320
case Removed of
312321
true ->
313-
NbWaiters = State#state.nb_waiters - 1,
314-
{noreply, State#state{queues=Queues, nb_waiters=NbWaiters}};
322+
Pending2 = del_pending(Ref, Pending),
323+
% store the cleaned-up dict in `pending'; `nb_waiters' is an integer counter (default 0)
{noreply, State#state{queues=Queues2, pending=Pending2}};
315324
false ->
316325
% we leak the socket here but 'DOWN' will mop up for us when it times out
317326
{noreply, dequeue(Dest, Ref, State)}
@@ -338,7 +347,8 @@ handle_info({'DOWN', Ref, request, _Pid, _Reason}, State) ->
338347
{ok, Dest} ->
339348
{noreply, dequeue(Dest, Ref, State)};
340349
error ->
341-
{noreply, State}
350+
NewState = remove_pending(Ref, State),
351+
{noreply, NewState}
342352
end;
343353
handle_info(_, State) ->
344354
{noreply, State}.
@@ -360,17 +370,18 @@ terminate(_Reason, #state{name=PoolName, metrics=Engine, sockets=Sockets}) ->
360370
%% internals
361371

362372
dequeue(Dest, Ref, State) ->
363-
Clients2 = dict:erase(Ref, State#state.clients),
364-
case queue_out(Dest, State#state.queues) of
373+
#state{clients=Clients, queues=Queues, pending=Pending} = State,
374+
Clients2 = dict:erase(Ref, Clients),
375+
case queue_out(Dest, Queues) of
365376
empty ->
366377
State#state{clients = Clients2};
367378
{ok, {From, Ref2}, Queues2} ->
368-
NbWaiters = State#state.nb_waiters - 1,
369-
_ = metrics:update_histogram(State#state.metrics,
370-
[hackney_pool, State#state.name, queue_count], NbWaiters),
379+
% pending is keyed by the waiter's request ref: drop Ref2 (the waiter just
% taken off the queue), not Ref (the client being released)
Pending2 = del_pending(Ref2, Pending),
380+
_ = metrics:update_histogram(
381+
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
382+
),
371383
gen_server:reply(From, {error, no_socket, self()}),
372-
State2 = State#state{queues = Queues2, clients = Clients2,
373-
nb_waiters=NbWaiters},
384+
State2 = State#state{queues = Queues2, clients = Clients2, pending=Pending2},
374385
monitor_client(Dest, Ref2, State2)
375386
end.
376387

@@ -387,8 +398,7 @@ find_connection({_Host, _Port, Transport}=Dest, Pid,
387398
cancel_timer(S, Timer),
388399
NewConns = update_connections(Rest, Dest, Conns),
389400
NewSockets = dict:erase(S, Sockets),
390-
NewState = State#state{connections=NewConns,
391-
sockets=NewSockets},
401+
NewState = State#state{connections=NewConns, sockets=NewSockets},
392402
{{ok, S, self()}, NewState};
393403
{error, badarg} ->
394404
%% something happened here normally the PID died,
@@ -509,26 +519,25 @@ queue_out({_Host, _Port, _Transport} = Dest, Queues) ->
509519
%% @private
510520
%%------------------------------------------------------------------------------
511521
deliver_socket(Socket, {_, _, Transport} = Dest, State) ->
512-
case queue_out(Dest, State#state.queues) of
522+
#state{queues = Queues, pending=Pending} = State,
523+
case queue_out(Dest, Queues) of
513524
empty ->
514525
store_socket(Dest, Socket, State);
515526
{ok, {{PidWaiter, _} = FromWaiter, Ref}, Queues2} ->
516-
NbWaiters = State#state.nb_waiters - 1,
517-
_ = metrics:update_histogram(State#state.metrics,
518-
[hackney_pool, State#state.name, queue_count],
519-
NbWaiters),
527+
Pending2 = del_pending(Ref, Pending),
528+
_ = metrics:update_histogram(
529+
State#state.metrics, [hackney_pool, State#state.name, queue_count], dict:size(Pending2)
530+
),
520531
case Transport:controlling_process(Socket, PidWaiter) of
521532
ok ->
522533
gen_server:reply(FromWaiter, {ok, Socket, self()}),
523-
monitor_client(Dest, Ref,
524-
State#state{queues = Queues2,
525-
nb_waiters=NbWaiters});
534+
monitor_client(Dest, Ref, State#state{queues = Queues2, pending=Pending2});
526535
_Error ->
527536
% Something wrong, close the socket
528-
catch Transport:close(Socket),
537+
_ = (catch Transport:close(Socket)),
529538
%% and let the waiter connect to a new one
530539
gen_server:reply(FromWaiter, {error, no_socket, self()}),
531-
State#state{queues = Queues2, nb_waiters = NbWaiters}
540+
State#state{queues = Queues2, pending = Pending2}
532541
end
533542
end.
534543

@@ -544,6 +553,37 @@ sync_socket(Transport, Socket) ->
544553
true
545554
end.
546555

556+
557+
%% Record a queued checkout in the pending dict: maps the request reference
%% Ref to {From, Dest} and returns the updated dict.
add_pending(Ref, From, Dest, Pending) ->
558+
dict:store(Ref, {From, Dest}, Pending).
559+
560+
%% Remove the entry stored under request reference Ref from the pending dict
%% and return the updated dict (unchanged when Ref is not present).
del_pending(Ref, Pending) ->
561+
dict:erase(Ref, Pending).
562+
563+
%% Forget a request that is still waiting for a connection.
%% Looks Ref up in the pending dict; when found, erases it and filters the
%% matching {From, Ref} entry out of the queue stored for its Dest (if any).
%% Returns the updated state, or the state unchanged when Ref is not pending.
%% No reply is sent to From and no metric is updated here.
remove_pending(Ref, #state{queues=Queues0, pending=Pending0} = State) ->
564+
case dict:find(Ref, Pending0) of
565+
{ok, {From, Dest}} ->
566+
Pending1 = dict:erase(Ref, Pending0),
567+
Queues1 = case dict:find(Dest, Queues0) of
568+
{ok, Q0} ->
569+
Q1 = queue:filter(
570+
fun
571+
(PendingReq) when PendingReq =:= {From, Ref} -> false;
572+
(_) -> true
573+
end,
574+
Q0
575+
),
576+
dict:store(Dest, Q1, Queues0);
577+
error ->
578+
Queues0
579+
end,
580+
State#state{queues=Queues1, pending=Pending1};
581+
error ->
582+
State
583+
end.
584+
585+
586+
547587
%------------------------------------------------------------------------------
548588
%% @private
549589
%%------------------------------------------------------------------------------
@@ -581,3 +621,12 @@ update_usage(
581621
_ = metrics:update_histogram(Engine, [hackney_pool, PoolName, free_count],
582622
dict:size(Sockets) - 1),
583623
ok.
624+
625+
626+
%% Build the property list returned for a `stats' call: the pool name, the
%% configured max_connections, and the sizes of the clients (in_use_count),
%% sockets (free_count) and pending (queue_count) dicts.
handle_stats(State) ->
627+
#state{name=PoolName, max_connections=Max, sockets=Sockets, clients=Clients, pending=Pending} = State,
628+
[{name, PoolName},
629+
{max, Max},
630+
{in_use_count, dict:size(Clients)},
631+
{free_count, dict:size(Sockets)},
632+
{queue_count, dict:size(Pending)}].

0 commit comments

Comments
 (0)