Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #242 from heroku/ypaq-default-limit-on-channel-logs
Browse files Browse the repository at this point in the history
Limit number of channel logs using `num` query string
  • Loading branch information
cyx authored Apr 9, 2018
2 parents d57372d + 635182e commit 57f2de6
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
27 changes: 23 additions & 4 deletions src/logplex_api_v3_channel_logs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
rest_init/2,
service_available/2,
allowed_methods/2,
malformed_request/2,
is_authorized/2,
resource_exists/2,
content_types_provided/2,
to_logs/2
]).

-record(state, {
channel_id :: binary()
channel_id :: binary(),
num_logs = 100 :: integer() %% number of log lines to fetch from log buffers
}).

%% @private
Expand All @@ -38,6 +40,22 @@ service_available(Req, State) ->
allowed_methods(Req, State) ->
{[<<"GET">>], Req, State}.

%% @private
malformed_request(Req, State) ->
case cowboy_req:qs_val(<<"num">>, Req) of
{undefined, Req1} ->
{false, Req1, State};
{Val, Req1} ->
try
Num = list_to_integer(binary_to_list(Val)),
NewState = State#state{ num_logs = Num },
{false, Req1, NewState}
catch
_:_ ->
{false, Req1, State}
end
end.

%% @private
is_authorized(Req, State) ->
logplex_api_v3:is_authorized(Req, State).
Expand All @@ -57,9 +75,10 @@ content_types_provided(Req, State) ->
{[{{<<"application">>, <<"logplex-1">>, []}, to_logs}], Req, State}.

%% @private
to_logs(Req, #state{ channel_id = ChannelId } = State) ->
%% fetch all messages from log buffer
case logplex_channel:logs(ChannelId, -1) of
to_logs(Req, #state{ channel_id = ChannelId,
num_logs = NumLogs } = State) ->
%% fetch messages from log buffer
case logplex_channel:logs(ChannelId, NumLogs) of
{error, Reason} ->
?ERR("channel_id=~s err='failed to fetch channel logs' reason='~p'",
[ChannelId, Reason]),
Expand Down
79 changes: 78 additions & 1 deletion test/logplex_api_v3_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ groups() ->
, fetch_channel_logs
, channel_logs_format
, channel_logs_bad_syslog_message
, channel_logs_with_num_query_string
, channel_logs_with_malformed_query_string
]},
{sessions,
[sessions_service_unavailable
Expand Down Expand Up @@ -630,6 +632,73 @@ channel_logs_bad_syslog_message(Config0) ->
?assertEqual("64 <1>1 aaaa host cccc dddd - " ++ Uuid ++ "\n", Body).


channel_logs_with_num_query_string(Config0) ->
Config = create_channel_with_tokens(Config0),
Channel = ?config(channel, Config),
[{TokenName, TokenId} | _] = ?config(tokens, Config),
NumLogs = 5,
ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 3)],
LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs],
logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName),
Props = stream_channel_logs(Channel, [{num_logs, NumLogs} | Config]),
Headers = proplists:get_value(headers, Props),
?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)),
?assertEqual("3", proplists:get_value("logplex-msg-count", Headers)),
?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)),
?assertEqual("close", proplists:get_value("connection", Headers)),
Lines = re:split(proplists:get_value(body, Props), "\n", [trim]),
[begin
?assertEqual(match, re:run(Line, Expected, [{capture, none}])),
NBin = list_to_binary(integer_to_list(N)),
?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line)
end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines)],
Config.

channel_logs_with_num_query_limit(Config0) ->
Config = create_channel_with_tokens(Config0),
Channel = ?config(channel, Config),
[{TokenName, TokenId} | _] = ?config(tokens, Config),
NumLogs = 3,
ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 5)],
LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs],
logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName),
Props = stream_channel_logs(Channel, [{num_logs, NumLogs} | Config]),
Headers = proplists:get_value(headers, Props),
?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)),
%% note: spits out one more line to indicate there's more line than requested in the buffer
?assertEqual("4", proplists:get_value("logplex-msg-count", Headers)),
?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)),
?assertEqual("close", proplists:get_value("connection", Headers)),
Lines = re:split(proplists:get_value(body, Props), "\n", [trim]),
[begin
?assertEqual(match, re:run(Line, Expected, [{capture, none}])),
NBin = list_to_binary(integer_to_list(N)),
?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line)
end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines), N > 1],
Config.

channel_logs_with_malformed_query_string(Config0) ->
Config = create_channel_with_tokens(Config0),
Channel = ?config(channel, Config),
[{TokenName, TokenId} | _] = ?config(tokens, Config),
ExpectedLogMsgs = [{N, uuid:to_string(uuid:v4())} || N <- lists:seq(1, 5)],
LogMsgs = [{msg, new_log_msg(N, Msg)} || {N, Msg} <- ExpectedLogMsgs],
logplex_message:process_msgs(LogMsgs, list_to_binary(Channel), TokenId, TokenName),
Props = stream_channel_logs(Channel, [{num_logs, "invalid"} | Config]),
Headers = proplists:get_value(headers, Props),
?assertEqual("application/logplex-1", proplists:get_value("content-type", Headers)),
?assertEqual("5", proplists:get_value("logplex-msg-count", Headers)),
?assertEqual("chunked", proplists:get_value("transfer-encoding", Headers)),
?assertEqual("close", proplists:get_value("connection", Headers)),
Lines = re:split(proplists:get_value(body, Props), "\n", [trim]),
[begin
ct:pal("~p -- ~p~n", [Line, Expected]),
?assertEqual(match, re:run(Line, Expected, [{capture, none}])),
NBin = list_to_binary(integer_to_list(N)),
?assertMatch(<<"64 <", NBin:1/binary, _/binary>>, Line)
end || {{N, Expected}, Line} <- lists:zip(ExpectedLogMsgs, Lines)],
Config.

%% -----------------------------------------------------------------------------
%% sessions
%% -----------------------------------------------------------------------------
Expand Down Expand Up @@ -750,7 +819,15 @@ get_channel_logs(Channel, Config) ->
logplex_api_SUITE:get_(Url, Opts).

stream_channel_logs(Channel, Config) ->
Url = ?config(api_v3_url, Config) ++ "/v3/channels/" ++ Channel ++ "/logs",
NumLogs = ?config(num_logs, Config),
QueryString = case NumLogs of
undefined -> "";
_ when is_integer(NumLogs) ->
"?num=" ++ integer_to_list(NumLogs);
_ when is_list(NumLogs) ->
"?num=" ++ NumLogs
end,
Url = ?config(api_v3_url, Config) ++ "/v3/channels/" ++ Channel ++ "/logs" ++ QueryString,
Headers = [{"Authorization", ?config(auth, Config)}],
Opts = [{headers, Headers},
{timeout, timer:seconds(10)},
Expand Down

0 comments on commit 57f2de6

Please sign in to comment.