Merge pull request #572 from basho/zl/handle-and-expose-solr-request-timeout-config-and-prevent-crashes

Zl/handle and expose solr request timeout config and prevent crashes

Reviewed-by: jvoegele
borshop committed Oct 14, 2015
2 parents 2fbae04 + 96ca5f2 commit 9c72938
Showing 11 changed files with 307 additions and 64 deletions.
9 changes: 8 additions & 1 deletion include/yokozuna.hrl
@@ -160,6 +160,10 @@
app_helper:get_env(riak_core, platform_data_dir)++"/yz")).
-define(YZ_TEMP_DIR, app_helper:get_env(?YZ_APP_NAME, temp_dir,
app_helper:get_env(riak_core, platform_data_dir)++"/yz_temp")).
%% The request timeout for Solr calls. Defaults to 60 seconds.
-define(YZ_SOLR_REQUEST_TIMEOUT, app_helper:get_env(?YZ_APP_NAME,
solr_request_timeout,
60000)).
-define(YZ_PRIV, code:priv_dir(?YZ_APP_NAME)).
-define(YZ_CORE_CFG_FILE, "solrconfig.xml").
-define(YZ_INDEX_CMD, #yz_index_cmd).
@@ -237,11 +241,12 @@
{partition, lp()} |
{limit, pos_integer()}].
-type ed_continuation() :: none | base64().
-type ed_pairs() :: [{DocID::binary(), Hash::base64()}].

-record(entropy_data, {
more=false :: boolean(),
continuation :: ed_continuation(),
pairs :: [{DocID::binary(), Hash::base64()}]
pairs :: ed_pairs()
}).
-type entropy_data() :: #entropy_data{}.
-type keydiff() :: hashtree:keydiff().
@@ -288,6 +293,8 @@
-define(ERROR(Fmt), lager:error(Fmt)).
-define(ERROR(Fmt, Args), lager:error(Fmt, Args)).
-define(INFO(Fmt, Args), lager:info(Fmt, Args)).
-define(NOTICE(Fmt, Args), lager:notice(Fmt, Args)).
-define(NOTICE(Fmt), lager:notice(Fmt)).
-define(WARN(Fmt, Args), lager:warning(Fmt, Args)).

%%%===================================================================
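For context, a minimal sketch (not part of this commit) of how the new
?YZ_SOLR_REQUEST_TIMEOUT macro would typically be consumed: the value becomes
the final Timeout argument of ibrowse:send_req/6. The helper name solr_get/1
is hypothetical.

    %% hypothetical helper; the macro supplies ibrowse's request timeout
    solr_get(Url) ->
        Opts = [{response_format, binary}],
        case ibrowse:send_req(Url, [], get, [], Opts, ?YZ_SOLR_REQUEST_TIMEOUT) of
            {ok, Status, _Headers, Body} -> {ok, Status, Body};
            {error, Reason} -> {error, Reason}
        end.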
9 changes: 9 additions & 0 deletions priv/yokozuna.schema
@@ -59,3 +59,12 @@
{datatype, directory},
hidden
]}.

%% @doc The timeout for ibrowse (ibrowse:send_req) requests to Solr endpoints.
%% Defaults to 60 seconds. The value is always rounded up to the nearest
%% second, e.g. both 1ms and 999ms round up to 1s.
{mapping, "search.solr.request_timeout", "yokozuna.solr_request_timeout", [
{default, "60s"},
{datatype, {duration, ms}},
hidden
]}.
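For reference, the mapping above would be exercised from riak.conf like so
(value illustrative); cuttlefish converts the duration into the millisecond
value stored under yokozuna.solr_request_timeout:

    search.solr.request_timeout = 90s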
19 changes: 19 additions & 0 deletions riak_test/intercepts/yz_solr_intercepts.erl
@@ -1,6 +1,25 @@
-module(yz_solr_intercepts).
-compile(export_all).

-type index_name() :: binary().

-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).

-spec slow_cores() -> {ok, []}.
slow_cores() ->
timer:sleep(6000),
{ok, []}.

-spec entropy_data_cant_complete(index_name(), list()) -> {error, term()}.
entropy_data_cant_complete(Core, Filter) ->
Params = [{wt, json}|Filter] -- [{continuation, none}],
Params2 = proplists:substitute_aliases([{continuation, continue},
{limit,n}], Params),
Opts = [{response_format, binary}],
URL = ?FMT("~s/~s/entropy_data?~s",
[yz_solr:base_url(), Core, mochiweb_util:urlencode(Params2)]),
%% A timeout of 0 ms guarantees the request fails, so this intercept
%% always returns an error tuple to the caller.
case ibrowse:send_req(URL, [], get, [], Opts, 0) of
Error ->
{error, Error}
end.

131 changes: 131 additions & 0 deletions riak_test/yz_entropy_data.erl
@@ -0,0 +1,131 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%%-------------------------------------------------------------------

%% @doc Test that checks through various entropy_data endpoint calls
%% and entropy_data iteration handling
%% @end

-module(yz_entropy_data).

-compile(export_all).
-include("yokozuna.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(RING_SIZE, 32).
-define(CFG,
[
{riak_core,
[
{ring_creation_size, ?RING_SIZE}
]},
{riak_kv,
[
{anti_entropy_tick, 1000},
%% allow AAE to build trees and exchange rapidly
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8}
]},
{yokozuna,
[
{enabled, true},
{solr_request_timeout, 60000}
]}
]).
-define(NO_HEADERS, []).
-define(NO_BODY, <<>>).
-define(INDEX, <<"test_search_entropy">>).
-define(TYPE, <<"data_entropy">>).
-define(BUCKET, {?TYPE, <<"test_search_entropy">>}).
-define(TOTAL, 1000).

confirm() ->
[Node1|_] = Cluster = rt:build_cluster(4, ?CFG),
rt:wait_for_cluster_service(Cluster, yokozuna),
ok = yz_rt:create_bucket_type(Node1, ?TYPE),
ok = yz_rt:create_index_http(Cluster, ?INDEX),
yz_rt:set_index(Node1, ?BUCKET, ?INDEX),
HP = hd(yz_rt:host_entries(rt:connection_info(Cluster))),

%% write_objs writes a sequence of 1000 objects
yz_rt:write_objs(Cluster, ?BUCKET),

%% time for soft auto commit
timer:sleep(1100),

?assert(yz_rt:search_expect(yokozuna, HP, ?INDEX, "*", "*", ?TOTAL)),

EDParams = [{wt, json}],

PartitionList = rpc:call(Node1, yokozuna, partition_list, [?INDEX]),

EntropyURL = yz_rt:entropy_data_url({rt:select_random(Cluster),
yz_rt:node_solr_port(Node1)},
?INDEX, EDParams),

test_entropy_get_missing_partition_param(EntropyURL),
test_entropy_get(Node1, ?INDEX, PartitionList, EDParams),

test_ed_timeout_error(Cluster, ?INDEX, rt:select_random(PartitionList),
?CFG),

pass.

-spec test_entropy_get_missing_partition_param(string()) -> ok.
test_entropy_get_missing_partition_param(URL) ->
lager:info("Test missing `partition` parameter on the entropy URL"),
{ok, Status, _, _} = yz_rt:http(get, URL, ?NO_HEADERS, ?NO_BODY),
?assertEqual("500", Status).

test_entropy_get(Node, Index, PartitionList, EDParams) ->
lager:info("Test checking through documents on each solr `partition` in the partition list"),
EntropyURLs = [yz_rt:entropy_data_url(
{Node, yz_rt:node_solr_port(Node)},
Index,
[{partition, P}|EDParams])
|| P <- PartitionList],
Results =
[begin
{ok, "200", _, R} = yz_rt:http(get, URL, ?NO_HEADERS, ?NO_BODY),
yz_rt:get_count(R)
end || URL <- EntropyURLs],

[?assert(Count > 0) || Count <- Results],
ok.

-spec test_ed_timeout_error([node()], index_name(), p(), proplist()) -> ok.
test_ed_timeout_error(Cluster, Index, Partition, _Config) ->
lager:info("wait for full exchange around before making entropy call"),
TS1 = erlang:now(),
yz_rt:wait_for_full_exchange_round(Cluster, TS1),

Node = rt:select_random(Cluster),

%% load and install the intercept
rt_intercept:load_code(Node, [filename:join([rt_config:get(yz_dir),
"riak_test", "intercepts", "*.erl"])]),
rt_intercept:add(Node, {yz_solr, [{{entropy_data, 2},
entropy_data_cant_complete}]}),

Filter = [{partition, Partition}],
Fun = fun({_BKey, _Hash}) ->
fake_fun
end,
?assertEqual(error, rpc:call(Node, yz_entropy, iterate_entropy_data,
                             [Index, Filter, Fun])).
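Assuming a standard riak_test rtdev setup, this new test would be run with
something like (invocation illustrative):

    ./riak_test -c rtdev -t yz_entropy_data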
19 changes: 1 addition & 18 deletions riak_test/yz_mapreduce.erl
@@ -40,7 +40,7 @@ confirm() ->
yz_rt:create_index(yz_rt:select_random(Cluster), Index),
yz_rt:set_bucket_type_index(yz_rt:select_random(Cluster), Index),
timer:sleep(500),
write_objs(Cluster, Bucket),
yz_rt:write_objs(Cluster, Bucket),
verify_objs_mr(Cluster, Index),
ok = yz_rt:load_module(Cluster, ?MODULE),
%% NOTE: Deliberate choice not to use `wait_unil'. The data is
@@ -123,23 +123,6 @@ verify_objs_mr(Cluster, Index) ->
end,
yz_rt:wait_until(Cluster, F).

-spec write_objs([node()], index_name()) -> ok.
write_objs(Cluster, Bucket) ->
lager:info("Writing 1000 objects"),
lists:foreach(write_obj(Cluster, Bucket), lists:seq(1,1000)).

-spec write_obj([node()], bucket()) -> fun().
write_obj(Cluster, Bucket) ->
fun(N) ->
PL = [{name_s,<<"yokozuna">>}, {num_i,N}],
Key = list_to_binary(io_lib:format("key_~B", [N])),
Body = mochijson2:encode(PL),
HP = yz_rt:select_random(yz_rt:host_entries(rt:connection_info(Cluster))),
CT = "application/json",
lager:info("Writing object with bkey ~p [~p]", [{Bucket, Key}, HP]),
yz_rt:http_put(HP, Bucket, Key, CT, Body)
end.

-spec http_mr({host(), portnum()}, term()) -> binary().
http_mr({Host,Port}, MR) ->
URL = ?FMT("http://~s:~s/mapred", [Host, integer_to_list(Port)]),
35 changes: 33 additions & 2 deletions riak_test/yz_rt.erl
@@ -486,8 +486,9 @@ wait_for_schema(Cluster, Name, Content) ->
ok.

verify_count(Expected, Resp) ->
lager:info("E: ~p, A: ~p", [Expected, get_count(Resp)]),
Expected == get_count(Resp).
Count = get_count(Resp),
lager:info("E: ~p, A: ~p", [Expected, Count]),
Expected =:= Count.

-spec wait_for_index(list(), index_name()) -> ok.
wait_for_index(Cluster, Index) ->
@@ -547,3 +548,33 @@ commit(Nodes, Index) ->
[Index, ?SOFTCOMMIT]),
rpc:multicall(Nodes, yz_solr, commit, [Index]),
ok.

entropy_data_url({Host, Port}, Index, Params) ->
?FMT("http://~s:~B/internal_solr/~s/entropy_data?~s",
[Host, Port, Index, mochiweb_util:urlencode(Params)]).

-spec merge_config(proplist(), proplist()) -> proplist().
merge_config(Change, Base) ->
lists:ukeymerge(1, lists:keysort(1, Change), lists:keysort(1, Base)).

-spec write_objs([node()], bucket()) -> ok.
write_objs(Cluster, Bucket) ->
write_objs(Cluster, Bucket, 1000).

write_objs(Cluster, Bucket, NumObjects) ->
lager:info("Writing ~B objects", [NumObjects]),
lists:foreach(write_obj(Cluster, Bucket), lists:seq(1, NumObjects)).

-spec write_obj([node()], bucket()) -> fun().
write_obj(Cluster, Bucket) ->
fun(N) ->
PL = [{name_s,<<"yokozuna">>}, {num_i,N}],
Key = list_to_binary(io_lib:format("key_~B", [N])),
Body = mochijson2:encode(PL),
HP = yz_rt:select_random(yz_rt:host_entries(rt:connection_info(
Cluster))),
CT = "application/json",
lager:info("Writing object with bkey ~p [~p]", [{Bucket, Key}, HP]),
yz_rt:http_put(HP, Bucket, Key, CT, Body)
end.
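A quick illustration (not part of the diff) of the new merge_config/2 helper
above: it merges on the top-level application key, so an entry in Change
replaces the entire same-keyed entry from Base. Values here are hypothetical.

    %% the yokozuna entry from Change wins wholesale
    Merged = yz_rt:merge_config([{yokozuna, [{solr_request_timeout, 100}]}],
                                [{yokozuna, [{enabled, true}]},
                                 {riak_core, [{ring_creation_size, 32}]}]),
    %% Merged =:= [{riak_core, [{ring_creation_size, 32}]},
    %%             {yokozuna, [{solr_request_timeout, 100}]}]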
55 changes: 43 additions & 12 deletions src/yz_entropy.erl
@@ -22,32 +22,63 @@
-compile(export_all).
-include("yokozuna.hrl").


%%%===================================================================
%%% Private
%%%===================================================================

%% @doc Iterate over all entropy data in `Index', calling `Fun' on each
%% entry; entries are fetched from Solr in batches (100 by default).
-spec iterate_entropy_data(index_name(), list(), function()) -> ok.
-spec iterate_entropy_data(index_name(), list(), function()) ->
ok|error|not_available.
iterate_entropy_data(Index, Filter, Fun) ->
case yz_solr:ping(Index) of
true ->
Filter2 = [{continuation, none},
{limit, app_helper:get_env(?YZ_APP_NAME, entropy_data_limit, 100)}|Filter],
ED = yz_solr:entropy_data(Index, Filter2),
iterate_entropy_data(Index, Filter2, Fun, ED);
{limit,
app_helper:get_env(?YZ_APP_NAME,
entropy_data_limit, 100)}|Filter],
case get_entropy_data(Index, Filter2) of
{ok, ED} ->
iterate_entropy_data(Index, Filter2, Fun, ED);
{Err, _ED} ->
Err
end;
_ ->
ok
?NOTICE("Can't ping Solr to start iterating over entropy data"),
not_available
end.

%%%===================================================================
%%% Private
%%%===================================================================

-spec iterate_entropy_data(index_name(), list(), function(), ED::entropy_data())
-> ok|error.
iterate_entropy_data(Index, Filter, Fun, #entropy_data{more=true,
continuation=Cont,
pairs=Pairs}) ->
%% apply function to pairs before iterating through the next set
lists:foreach(Fun, Pairs),
Filter2 = lists:keyreplace(continuation, 1, Filter, {continuation, Cont}),
ED = yz_solr:entropy_data(Index, Filter2),
iterate_entropy_data(Index, Filter2, Fun, ED);
case get_entropy_data(Index, Filter2) of
{ok, ED} ->
iterate_entropy_data(Index, Filter2, Fun, ED);
{Err, _ED} ->
Err
end;
iterate_entropy_data(_, _, Fun, #entropy_data{more=false,
pairs=Pairs}) ->
lists:foreach(Fun, Pairs).

-spec get_entropy_data(index_name(), list()) ->
{ok|error, entropy_data()}.
get_entropy_data(Index, Filter) ->
case yz_solr:entropy_data(Index, Filter) of
{error, {error, req_timedout}} ->
?ERROR("failed to iterate over entropy data due to request"
++ " exceeding timeout ~b for filter params ~p",
[?YZ_SOLR_REQUEST_TIMEOUT, Filter]),
{error, #entropy_data{more=false, pairs=[]}};
{error, Err} ->
?ERROR("failed to iterate over entropy data due to request"
++ " error ~p for filter params ~p", [Err, Filter]),
{error, #entropy_data{more=false, pairs=[]}};
ED ->
{ok, ED}
end.
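To show why the widened return type matters, a hypothetical caller sketch
(names illustrative, not from this commit): callers can now distinguish a
failed iteration from Solr being unreachable instead of crashing on an
unexpected value.

    case yz_entropy:iterate_entropy_data(Index, Filter, InsertFun) of
        ok            -> tree_built;
        error         -> iteration_failed;  %% request error or timeout
        not_available -> solr_unreachable   %% ping failed; retry later
    end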
