Skip to content

Commit

Permalink
Permits client requests in any FSM state.
Browse files Browse the repository at this point in the history
This commit adds a request queue to the shotgun FSM and three argument
versions of each FSM state function whose only purpose is to ether
stick valid client request data into the work queue and get back to
waiting, or stop the FSM if the client request data was invalid.
create_work/2 describes valid and invalid requests.

This lets us have multiple callers -directly or indirectly- call
shotgun:request while a gun request is still pending. Previously,
if the call happened when we were in any state other than at_rest,
the FSM would probably crash. This also means that if one's request
times out, one can immediately re-submit that request without
crashing shotgun. However, there *are* a few caveats:

* shotgun only processes one request at a time.
* Your new request is put on the end of the request queue.
* Your *previous* request is not cancelled. If shotgun never got
  around to servicing it, or is not done servicing it, then your
  old request that you no longer care about will be serviced before
  your new request.
* If the process that initiated a timed-out request is still alive,
  it will get a message in its mailbox when the request eventually
  succeeds. It also might get a message when the request eventually
  fails, I'm not sure.

The guts of at_rest/3 have been moved to at_rest/2. at_rest/3 now only
sticks client request data into the work queue and triggers the call
of at_rest/2.

Every transition from another state to at_rest now includes a timeout
value of 0. The purpose of at_rest(timeout, State) is to check for work
and either dispatch it to the FSM, or go back to the idle at_rest state.
  • Loading branch information
kennethlakin committed Oct 14, 2015
1 parent e63b2e7 commit 372539e
Showing 1 changed file with 128 additions and 29 deletions.
157 changes: 128 additions & 29 deletions src/shotgun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,21 @@
]).

-export([
at_rest/3,
at_rest/2,
wait_response/2,
receive_data/2,
receive_chunk/2,
parse_event/1
]).

%Work request handlers
-export([
at_rest/3,
wait_response/3,
receive_data/3,
receive_chunk/3
]).

-type response() ::
#{
status_code => integer(),
Expand Down Expand Up @@ -354,29 +362,59 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) ->
%% gen_fsm states
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%See if we have work. If we do, dispatch.
%If we don't, stay in at_rest.
%% @private
at_rest(timeout, State) ->
case get_work(State) of
no_work ->
{next_state, at_rest, State};
{ok, Work, NewState} ->
ok=gen_fsm:send_event(self(), Work),
{next_state, at_rest, NewState}
end;
at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(State),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.

-spec at_rest(term(), pid(), term()) -> term().
at_rest({get_async, {HandleEvent, AsyncMode}, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(get, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
from => From,
pid => Pid,
stream => StreamRef,
handle_event => HandleEvent,
async => true,
async_mode => AsyncMode
},
{next_state, wait_response, NewState};
at_rest({HttpVerb, Args}, From, #{pid := Pid}) ->
StreamRef = do_http_verb(HttpVerb, Pid, Args),
CleanState = clean_state(),
NewState = CleanState#{
pid => Pid,
stream => StreamRef,
from => From
},
{next_state, wait_response, NewState}.
at_rest(Event, From, State) ->
case create_work(Event, From) of
{ok, Work} ->
NewState=append_work(Work, State),
{next_state, at_rest, NewState, 0};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
-spec wait_response(term(), pid(), term()) -> term().
wait_response(Event, From, State) ->
case create_work(Event, From) of
{ok, Work} ->
NewState=append_work(Work, State),
{next_state, wait_response, NewState};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
-spec wait_response(term(), term()) -> term().
Expand All @@ -395,7 +433,7 @@ wait_response({gun_response, _Pid, _StreamRef, fin, StatusCode, Headers},
true ->
gen_fsm:reply(From, {ok, Response})
end,
{next_state, at_rest, State#{responses => NewResponses}};
{next_state, at_rest, State#{responses => NewResponses}, 0};
wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
#{from := From, stream := StreamRef, async := Async} = State) ->
StateName =
Expand All @@ -415,10 +453,21 @@ wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers},
wait_response({gun_error, _Pid, _StreamRef, Error},
#{from := From} = State) ->
gen_fsm:reply(From, {error, Error}),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
wait_response(Event, State) ->
{stop, {unexpected, Event}, State}.

%% @private
-spec receive_data(term(), pid(), term()) -> term().
receive_data(Event, From, State) ->
case create_work(Event, From) of
{ok, Work} ->
NewState=append_work(Work, State),
{next_state, receive_data, NewState};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
%% @doc Regular response
-spec receive_data(term(), term()) -> term().
Expand All @@ -437,10 +486,21 @@ receive_data({gun_data, _Pid, _StreamRef, fin, Data},
body => NewData
}},
gen_fsm:reply(From, Result),
{next_state, at_rest, State};
{next_state, at_rest, State, 0};
receive_data({gun_error, _Pid, StreamRef, _Reason},
#{stream := StreamRef} = State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%% @private
-spec receive_chunk(term(), pid(), term()) -> term().
receive_chunk(Event, From, State) ->
case create_work(Event, From) of
{ok, Work} ->
NewState=append_work(Work, State),
{next_state, receive_chunk, NewState};
not_work ->
{stop, {unexpected, Event}, State}
end.

%% @private
%% @doc Chunked data response
Expand All @@ -451,19 +511,27 @@ receive_chunk({gun_data, _Pid, StreamRef, IsFin, Data}, State) ->
NewState = manage_chunk(IsFin, StreamRef, Data, State),
case IsFin of
fin ->
{next_state, at_rest, NewState};
{next_state, at_rest, NewState, 0};
nofin ->
{next_state, receive_chunk, NewState}
end;
receive_chunk({gun_error, _Pid, _StreamRef, _Reason}, State) ->
{next_state, at_rest, State}.
{next_state, at_rest, State, 0}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Private
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @private
clean_state() ->
clean_state([]).

%% @private
clean_state(State) when is_map(State) ->
clean_state(get_pending_reqs(State));

%% @private
clean_state(Reqs) when is_list(Reqs) ->
#{
pid => undefined,
stream => undefined,
Expand All @@ -475,7 +543,8 @@ clean_state() ->
headers => undefined,
async => false,
async_mode => binary,
buffer => <<"">>
buffer => <<"">>,
pending_requests => Reqs
}.

%% @private
Expand Down Expand Up @@ -569,6 +638,36 @@ sse_events(IsFin, Data, State = #{buffer := Buffer}) ->
{lists:reverse(Events), State#{buffer := Rest}}
end.

%% @private
create_work({M=get_async, {HandleEvent, AsyncMode}, Args}, From) ->
{ok, {M, {HandleEvent, AsyncMode}, Args, From}};
create_work({M, Args}, From)
when M == get orelse M == post
orelse M == delete orelse M == head
orelse M == options orelse M == patch
orelse M == put ->
{ok, {M, Args, From}};
create_work(_, _) ->
not_work.

%% @private
get_work(#{pending_requests := []}) ->
no_work;
get_work(State) ->
[Work|Rest] = maps:get(pending_requests, State),
NewState=State#{pending_requests => Rest},
{ok, Work, NewState}.

%% @private
append_work(Work, State) ->
PendingReqs = get_pending_reqs(State),
NewPending = lists:append(PendingReqs, [Work]),
maps:put(pending_requests, NewPending, State).

%% @private
get_pending_reqs(State) ->
maps:get(pending_requests, State).

%% @private
check_uri(U) ->
case string:chr(U, $/) of
Expand Down

0 comments on commit 372539e

Please sign in to comment.