Skip to content

Commit

Permalink
Merge pull request #767 from basho/bugfix/jdm/vnode-proxy-stuck-in-ov…
Browse files Browse the repository at this point in the history
…erload-v2

Vnode proxy stuck in overload state

Reviewed-by: cmeiklejohn
  • Loading branch information
borshop committed Aug 13, 2015
2 parents dce213c + c322af3 commit 481c5dd
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/riak_core_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,8 @@ handle_sync_event(core_status, _From, StateName, State=#state{index=Index,
end,
{reply, {Mode, Status}, StateName, State, State#state.inactivity_timeout}.

handle_info({'$vnode_proxy_ping', From, Msgs}, StateName, State) ->
riak_core_vnode_proxy:cast(From, {vnode_proxy_pong, self(), Msgs}),
handle_info({'$vnode_proxy_ping', From, Ref, Msgs}, StateName, State) ->
riak_core_vnode_proxy:cast(From, {vnode_proxy_pong, Ref, Msgs}),
{next_state, StateName, State, State#state.inactivity_timeout};

handle_info({'EXIT', Pid, Reason},
Expand Down
105 changes: 64 additions & 41 deletions src/riak_core_vnode_proxy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
%% -------------------------------------------------------------------
-module(riak_core_vnode_proxy).
-export([start_link/2, init/1, reg_name/2, reg_name/3, call/2, call/3, cast/2,
unregister_vnode/3, command_return_vnode/2]).
unregister_vnode/3, command_return_vnode/2, overloaded/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).

-include("riak_core_vnode.hrl").
Expand Down Expand Up @@ -105,6 +105,12 @@ unregister_vnode(Mod, Index, Pid) ->
%% Send {return_vnode, Req} through call/2 to the name that reg_name/3
%% builds for Mod, Index and Node, and return whatever call/2 returns.
command_return_vnode({Mod, Index, Node}, Req) ->
    ProxyName = reg_name(Mod, Index, Node),
    call(ProxyName, {return_vnode, Req}).

%% Return true if the next proxied message will return overload.
%%
%% Accepts either a {Mod, Index, Node} triple, which is first turned into
%% a proxy name with reg_name/3, or any other term, which is handed to
%% call/2 unchanged as the destination. The proxy's handler for the
%% overloaded request replies with whether its mailbox estimate is
%% greater than its overload threshold.
overloaded({Mod, Index, Node}) ->
    ProxyName = reg_name(Mod, Index, Node),
    call(ProxyName, overloaded);
overloaded(Proxy) ->
    call(Proxy, overloaded).

%% Make a synchronous '$vnode_proxy_call' request to Name via gen:call/3.
%% Anything raised by gen:call/3 is trapped by the catch and passed on as
%% an ordinary value, so call_reply/1 decides what the caller finally sees.
call(Name, Msg) ->
    Outcome = (catch gen:call(Name, '$vnode_proxy_call', Msg)),
    call_reply(Outcome).

Expand Down Expand Up @@ -155,7 +161,10 @@ handle_call({return_vnode, Req}, _From, State) ->
{Pid, NewState} = get_vnode_pid(State),
gen_fsm:send_event(Pid, Req),
{reply, {ok, Pid}, NewState};

handle_call(overloaded, _From, State=#state{check_mailbox=Mailbox,
check_threshold=Threshold}) ->
Result = (Mailbox > Threshold),
{reply, Result, State};
handle_call(_Msg, _From, State) ->
{reply, ok, State}.

Expand All @@ -167,17 +176,15 @@ handle_cast({unregister_vnode, Pid}, State) ->
catch demonitor(State#state.vnode_mref, [flush]),
NewState = forget_vnode(State),
{noreply, NewState};
handle_cast({vnode_proxy_pong, Pid, Msgs}, State=#state{vnode_pid=VNodePid,
check_request=RequestState,
handle_cast({vnode_proxy_pong, Ref, Msgs}, State=#state{check_request=RequestState,
check_mailbox=Mailbox}) ->
ValidReply = (Pid =:= VNodePid) and (RequestState =:= sent),
NewState = case ValidReply of
true ->
NewState = case Ref of
RequestState ->
State#state{check_mailbox=Mailbox - Msgs,
check_request=undefined,
check_counter=0};
_ ->
State#state{check_request=undefined}
State
end,
{noreply, NewState};

Expand Down Expand Up @@ -219,6 +226,15 @@ handle_proxy(Msg, State=#state{check_counter=Counter,
State2 = State
end,

Mailbox2 = case Mailbox =< Threshold of
true ->
Pid ! Msg,
Mailbox + 1;
false ->
handle_overload(Msg, State),
Mailbox
end,

Counter2 = Counter + 1,
case Counter2 of
RequestInterval ->
Expand All @@ -229,13 +245,11 @@ handle_proxy(Msg, State=#state{check_counter=Counter,
%% we can adjust our mailbox estimate accordingly.
case RequestState of
undefined ->
Pid ! {'$vnode_proxy_ping', self(), Mailbox + 1},
RequestState2 = sent,
Mailbox2 = Mailbox + 2;
RequestState2 = send_proxy_ping(Pid, Mailbox2);
_ ->
Mailbox2 = Mailbox + 1,
RequestState2 = RequestState
end,
Mailbox3 = Mailbox2,
Counter3 = Counter2;
Interval ->
%% Time to directly check the mailbox size. This operation may
Expand All @@ -245,30 +259,19 @@ handle_proxy(Msg, State=#state{check_counter=Counter,
{_, L} =
erlang:process_info(Pid, message_queue_len),
Counter3 = 0,
Mailbox2 = L,
RequestState2 = case RequestState of
sent ->
%% Ignore pending ping response as it is
%% no longer valid nor useful.
ignore;
_ ->
RequestState
end;
Mailbox3 = L + 1,
%% Send a new proxy ping so that if the new length is above the
%% threshold then the proxy will detect the work is completed,
%% rather than being stuck in overload state until the interval
%% counts are reached.
RequestState2 = send_proxy_ping(Pid, Mailbox3);
_ ->
Mailbox3 = Mailbox2,
Counter3 = Counter2,
Mailbox2 = Mailbox + 1,
RequestState2 = RequestState
end,

case Mailbox2 =< Threshold of
true ->
Pid ! Msg;
false ->
handle_overload(Msg, State)
end,

{noreply, State2#state{check_counter=Counter3,
check_mailbox=Mailbox2,
check_mailbox=Mailbox3,
check_request=RequestState2}}.

handle_overload(Msg, #state{mod=Mod, index=Index}) ->
Expand Down Expand Up @@ -299,6 +302,12 @@ get_vnode_pid(State=#state{mod=Mod, index=Index, vnode_pid=undefined}) ->
get_vnode_pid(State=#state{vnode_pid=Pid}) ->
{Pid, State}.

%% @private
%% @doc Send a $vnode_proxy_ping message to Pid and return the fresh
%% reference that tags it. The message carries the sender's pid, the
%% reference and MailboxSizeAfterPing, so the caller can later match a
%% reply against the returned reference.
send_proxy_ping(Pid, MailboxSizeAfterPing) ->
    PingRef = make_ref(),
    Ping = {'$vnode_proxy_ping', self(), PingRef, MailboxSizeAfterPing},
    Pid ! Ping,
    PingRef.

-ifdef(TEST).

update_msg_counter() ->
Expand All @@ -317,6 +326,10 @@ fake_loop() ->
{get_count, Pid} ->
Pid ! {count, erlang:get(count)},
fake_loop();
%% Original tests do not expect replies
%% {'$vnode_proxy_ping', ReplyTo, Ref, Msgs} ->
%% ReplyTo ! {Ref, Msgs},
%% fake_loop();
_Msg ->
update_msg_counter(),
fake_loop()
Expand Down Expand Up @@ -364,14 +377,23 @@ overload_test_() ->
fun({_VnodePid, ProxyPid}) ->
{"should not discard in normal operation", timeout, 60,
fun() ->
ToSend = ?DEFAULT_OVERLOAD_THRESHOLD-2,
ToSend = ?DEFAULT_OVERLOAD_THRESHOLD * 2,
[ProxyPid ! hello || _ <- lists:seq(1, ToSend)],
%% synchronize on the mailbox

%% synchronize on the proxy and the mailbox
{ok, ok} = gen:call(ProxyPid, '$vnode_proxy_call', sync, infinity),
ProxyPid ! {get_count, self()},
receive
{count, Count} ->
%% ToSend messages + 1 unanswered vnode_proxy_ping
?assertEqual(ToSend+1, Count)
%% Every ReqInterval happy proxy will do a ping/pong
%% 2500 -> 2501
%% 5000 -> 5002
%% 7500 -> 7503
%% 10000 -> 10004 etc
%% ToSend messages + 4 vnode_proxy_pings
PingReqs = 1 + % for first request interval
ToSend div ?DEFAULT_CHECK_INTERVAL,
?assertEqual(ToSend+PingReqs, Count)
end
end
}
Expand All @@ -381,15 +403,15 @@ overload_test_() ->
fun() ->
VnodePid ! block,
[ProxyPid ! hello || _ <- lists:seq(1, 50000)],
%% synchronize on the mailbox
%% synchronize on the mailbox - no-op that hits msg catchall
Reply = gen:call(ProxyPid, '$vnode_proxy_call', sync, infinity),
?assertEqual({ok, ok}, Reply),
VnodePid ! unblock,
VnodePid ! {get_count, self()},
receive
{count, Count} ->
%% Threshold + 1 unanswered vnode_proxy_ping
?assertEqual(?DEFAULT_OVERLOAD_THRESHOLD + 1, Count)
%% Threshold + 10 unanswered vnode_proxy_ping
?assertEqual(?DEFAULT_OVERLOAD_THRESHOLD + 10, Count)
end
end
}
Expand All @@ -399,15 +421,16 @@ overload_test_() ->
fun() ->
VnodePid ! slow,
[ProxyPid ! hello || _ <- lists:seq(1, 50000)],
%% synchronize on the mailbox
%% synchronize on the mailbox - no-op that hits msg catchall
Reply = gen:call(ProxyPid, '$vnode_proxy_call', sync, infinity),
?assertEqual({ok, ok}, Reply),
%% check that the outstanding message count is
%% reasonable
{message_queue_len, L} =
erlang:process_info(VnodePid, message_queue_len),
%% Threshold + 1 unanswered vnode_proxy_ping
?assert(L =< (?DEFAULT_OVERLOAD_THRESHOLD + 1))
%% Threshold + 2 unanswered vnode_proxy_ping (one
%% for first ping, second after process_info check)
?assert(L =< (?DEFAULT_OVERLOAD_THRESHOLD + 2))
end
}
end
Expand Down

0 comments on commit 481c5dd

Please sign in to comment.