Merge pull request #569 from basho/merge/2.0-into-2.1
Merge/2.0 into 2.1

Reviewed-by: jvoegele
borshop committed Oct 7, 2015
2 parents 3486820 + 4d8a71c commit 2fbae04
Showing 16 changed files with 93 additions and 69 deletions.
80 changes: 54 additions & 26 deletions java_src/com/basho/yokozuna/handler/EntropyData.java
@@ -18,7 +18,7 @@

import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import javax.xml.bind.DatatypeConverter;

import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
@@ -117,40 +117,41 @@ public void handleRequestBody(final SolrQueryRequest req, final SolrQueryRespons
tmp = te.next();
}

String text;
String[] vals;
String docPartition;
String vsn;
String riakBType;
String riakBName;
String riakKey;
String hash;
int count = 0;
BytesRef current = null;
final Bits liveDocs = rdr.getLiveDocs();

while(!endOfItr(tmp) && count < n) {
if (isLive(liveDocs, te)) {
current = BytesRef.deepCopyOf(tmp);
text = tmp.utf8ToString();
final String text = tmp.utf8ToString();
if (log.isDebugEnabled()) {
log.debug("text: " + text);
}
vals = text.split(" ");
final String [] vals = text.split(" ");

vsn = vals[0];
docPartition = vals[1];
riakBType = decodeBase64DocPart(vals[2]);
riakBName = decodeBase64DocPart(vals[3]);
riakKey = decodeBase64DocPart(vals[4]);
hash = vals[5];
final String docPartition = vals[1];

/*
If the partition matches the one we are looking for,
parse the version, bkey, and object hash from the
entropy data field (term).
*/
if (partition.equals(docPartition)) {
SolrDocument tmpDoc = new SolrDocument();
final String vsn = vals[0];

final String [] decoded = decodeForVersion(vsn,
vals[2],
vals[3],
vals[4]);

final String hash = vals[5];

final SolrDocument tmpDoc = new SolrDocument();
tmpDoc.addField("vsn", vsn);
tmpDoc.addField("riak_bucket_type", riakBType);
tmpDoc.addField("riak_bucket_name", riakBName);
tmpDoc.addField("riak_key", riakKey);
tmpDoc.addField("riak_bucket_type", decoded[0]);
tmpDoc.addField("riak_bucket_name", decoded[1]);
tmpDoc.addField("riak_key", decoded[2]);
tmpDoc.addField("base64_hash", hash);
docs.add(tmpDoc);
count++;
@@ -163,7 +164,8 @@ public void handleRequestBody(final SolrQueryRequest req, final SolrQueryRespons
rsp.add("more", false);
} else {
rsp.add("more", true);
final String newCont = Base64.encodeBase64URLSafeString(current.bytes);
final String newCont =
org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString(current.bytes);
// The continue context for next req to start where
// this one finished.
rsp.add("continuation", newCont);
Expand All @@ -182,7 +184,7 @@ static boolean isLive(final Bits liveDocs, final TermsEnum te) throws IOExceptio
}

static BytesRef decodeCont(final String cont) {
final byte[] bytes = Base64.decodeBase64(cont);
final byte[] bytes = org.apache.commons.codec.binary.Base64.decodeBase64(cont);
return new BytesRef(bytes);
}

@@ -209,12 +211,38 @@ public String getSource()
return "TODO: implement getSource";
}

/**
@param vsn a String version number identifying the entropy-data format version that wrote the term
@param riakBType riak bucket-type
@param riakBName riak bucket-name
@param riakKey riak key
@return a String array consisting of the Riak bucket type, bucket name, and key
*/
private String [] decodeForVersion(String vsn, String riakBType, String riakBName, String riakKey) {
final String [] bKeyInfo;
switch(Integer.parseInt(vsn)) {
case 1:
bKeyInfo = new String [] {riakBType, riakBName, riakKey};
break;
default:
bKeyInfo = new String []
{
decodeBase64DocPart(riakBType),
decodeBase64DocPart(riakBName),
decodeBase64DocPart(riakKey)
};
break;
}
return bKeyInfo;
}

/**
@param base64EncodedVal base64 encoded string
@return a string of decoded base64 bytes
*/
private String decodeBase64DocPart(String base64EncodedVal) {
byte[] bytes = Base64.decodeBase64(base64EncodedVal);
return new String(bytes);
return new String(DatatypeConverter.parseBase64Binary(
base64EncodedVal));
}
}

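A note for readers tracing the new decode path: the dispatch above has a compact statement. What follows is a minimal Erlang sketch — not part of this diff, module name invented — of the same rule the handler now applies: version-1 entropy terms carry bucket type, bucket name, and key as plain text, while later versions base64-encode each part (the Erlang side now stamps version 3, per the yz_doc.erl change below).

-module(ed_decode_sketch).
-export([decode_for_version/4]).

%% Sketch only (not in this commit). Mirrors EntropyData.decodeForVersion:
%% version "1" terms are plain text; anything else is base64-encoded.
decode_for_version("1", BType, BName, Key) ->
    {BType, BName, Key};
decode_for_version(_Vsn, BType, BName, Key) ->
    {base64:decode(BType), base64:decode(BName), base64:decode(Key)}.
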
3 changes: 1 addition & 2 deletions riak_test/yokozuna_essential.erl
@@ -60,8 +60,7 @@ confirm() ->
setup_indexing(Cluster, PBConns, YZBenchDir),
verify_non_existent_index(Cluster, <<"froot">>),
{0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
%% wait for soft-commit
timer:sleep(1100),
yz_rt:commit(Cluster, ?INDEX),
Ref = async_query(Cluster, YZBenchDir),
%% Verify data exists before running join
timer:sleep(30000),
8 changes: 2 additions & 6 deletions riak_test/yz_errors.erl
@@ -68,12 +68,9 @@ expect_bad_xml(Cluster) ->
Headers = [{"content-type", CT}],
Body = "<\"bad\" \"xml\"></",
{ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body, Opts),
%% Sleep for soft commit
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
%% still store the value in riak
{ok, "200", _, Body} = ibrowse:send_req(URL, [{"accept", CT}], get, []),
%% Sleep for soft commit
timer:sleep(1100),
?assert(search_expect(HP, Index, ?YZ_ERR_FIELD_S, "1", 1)),
ok.

@@ -89,8 +86,7 @@ expect_bad_query(Cluster) ->
Headers = [{"content-type", CT}],
Body = "",
{ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body, Opts),
%% Sleep for soft commit
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
%% still store the value in riak
{ok, "200", _, Body} = ibrowse:send_req(URL, [{"accept", CT}], get, []),
%% send a bad query
6 changes: 3 additions & 3 deletions riak_test/yz_fallback.erl
@@ -23,7 +23,7 @@ confirm() ->
rt:wait_for_cluster_service(Cluster, yokozuna),
create_index(Cluster, ?INDEX),
Cluster2 = take_node_down(Cluster),
write_obj(Cluster2, ?BUCKET, ?KEY),
write_obj(Cluster2, ?INDEX, ?BUCKET, ?KEY),
check_fallbacks(Cluster2, ?INDEX, ?BUCKET, ?KEY),
HP = riak_hp(yz_rt:select_random(Cluster2), Cluster2),
?assert(yz_rt:search_expect(yokozuna, HP, ?INDEX, "*", "*", 1)),
@@ -71,7 +71,7 @@ take_node_down(Cluster) ->
timer:sleep(5000),
Cluster -- [DownNode].

write_obj(Cluster, {BType, BName}, Key) ->
write_obj(Cluster, Index, {BType, BName}, Key) ->
Node = yz_rt:select_random(Cluster),
{Host, Port} = riak_hp(Node, Cluster),
lager:info("write obj to node ~p", [Node]),
Expand All @@ -80,7 +80,7 @@ write_obj(Cluster, {BType, BName}, Key) ->
Headers = [{"content-type", "text/plain"}],
Body = <<"yokozuna">>,
{ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body, []),
timer:sleep(1100).
yz_rt:commit(Cluster, Index).

riak_hp(Node, Cluster) ->
CI = yz_rt:connection_info(Cluster),
3 changes: 1 addition & 2 deletions riak_test/yz_languages.erl
@@ -52,8 +52,7 @@ store_and_search(Cluster, Bucket, Index, Key, Headers, CT, Body, Field, Term) ->
URL = bucket_url(HP, Bucket, Key),
lager:info("Storing to bucket ~s", [URL]),
{ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body),
%% Sleep for soft commit
timer:sleep(1000),
yz_rt:commit(Cluster, Index),
{ok, "200", _, ReturnedBody} = ibrowse:send_req(URL, [{"accept", CT}], get, []),
?assertEqual(Body, list_to_binary(ReturnedBody)),
lager:info("Verify values are indexed"),
11 changes: 4 additions & 7 deletions riak_test/yz_pb.erl
@@ -243,8 +243,7 @@ confirm_multivalued_field(Cluster) ->
{Host, Port} = HP,
%% populate a value
{ok, "204", _, _} = ibrowse:send_req(URL, [?CT_JSON], put, Body),
%% Sleep for soft commit
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
Search = <<"name_ss:turner">>,
{ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)),
F = fun(_) ->
@@ -276,8 +275,7 @@ confirm_multivalued_field_json_array(Cluster) ->
lager:info("Storing to bucket ~s", [URL]),
{Host, Port} = HP,
{ok, "204", _, _} = ibrowse:send_req(URL, [?CT_JSON], put, Body),
%% Sleep for soft commit
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
Search = <<"groups_s:3304cf79">>,
{ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)),
F = fun(_) ->
@@ -309,8 +307,7 @@ confirm_multivalued_field_with_high_n_val(Cluster) ->
lager:info("Storing to bucket ~s", [URL]),
{Host, Port} = HP,
{ok, "204", _, _} = ibrowse:send_req(URL, [?CT_JSON], put, Body),
%% Sleep for soft commit
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
Search = <<"groups_s:3304cf79">>,
{ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)),
F = fun(_) ->
@@ -353,7 +350,7 @@ confirm_stored_fields(Cluster) ->
lager:info("Storing to bucket ~s", [URL]),
{Host, Port} = HP,
{ok, "204", _, _} = ibrowse:send_req(URL, [?CT_JSON], put, Body),
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
Search = <<"float_tf:3.14">>,
{ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)),
F = fun(_) ->
3 changes: 1 addition & 2 deletions riak_test/yz_ring_resizing.erl
@@ -52,8 +52,7 @@ confirm() ->
%% Index and load data
setup_indexing(Cluster, PBConns, YZBenchDir),
{0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
%% wait for soft-commit
timer:sleep(1000),
yz_rt:commit(Cluster, ?INDEX),

%% Start a query and wait for it to start
Ref1 = async_query(Cluster, YZBenchDir),
3 changes: 1 addition & 2 deletions riak_test/yz_rs_migration_test.erl
@@ -224,8 +224,7 @@ create_pb_conn(Node) ->
load_data(Cluster, YZBenchDir, NumKeys) ->
{ExitCode, _} = yz_rt:load_data(Cluster, ?FRUIT_BUCKET, YZBenchDir, NumKeys),
?assertEqual(0,ExitCode),
% Sleep for soft-commit.
timer:sleep(1100).
yz_rt:commit(Cluster, ?FRUIT_BUCKET).

query_data(Cluster, YZBenchDir, NumKeys, Time, DefaultField) ->
lager:info("Run query against cluster ~p", [Cluster]),
10 changes: 10 additions & 0 deletions riak_test/yz_rt.erl
@@ -6,6 +6,7 @@
-define(YZ_RT_ETS_OPTS, [public, named_table, {write_concurrency, true}]).
-define(NO_BODY, <<>>).
-define(IBROWSE_TIMEOUT, 60000).
-define(SOFTCOMMIT, 1000).

-type host() :: string().
-type portnum() :: integer().
@@ -537,3 +538,12 @@ internal_solr_url(Host, Port, Index, Name, Term, Shards) ->
|| {_, ShardPort} <- Shards],
?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=~s:~s&shards=~s",
[Host, Port, Index, Name, Term, string:join(Ss, ",")]).

-spec commit([node()], index_name()) -> ok.
commit(Nodes, Index) ->
%% Wait for yokozuna index to trigger, then force a commit
timer:sleep(?SOFTCOMMIT),
lager:info("Committing search writes to ~s after soft-commit wait of ~p ms",
[Index, ?SOFTCOMMIT]),
rpc:multicall(Nodes, yz_solr, commit, [Index]),
ok.
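
The helper above replaces the timer:sleep(1100) soft-commit waits that were open-coded across these tests. A hypothetical usage sketch (index, bucket, and key names invented; the yz_rt calls are the same ones used elsewhere in this diff):

-module(commit_usage_sketch).
-export([put_then_search/2]).

%% Hypothetical test fragment (not in this commit): index one object,
%% force a Solr commit with the new yz_rt:commit/2 rather than sleeping
%% through the soft-commit window, then assert the write is searchable.
put_then_search(Cluster, HP) ->
    Index = <<"sketch_idx">>,
    Bucket = {<<"data">>, <<"sketch_bucket">>},
    ok = yz_rt:http_put(HP, Bucket, <<"key1">>, <<"yokozuna">>),
    yz_rt:commit(Cluster, Index),
    true = yz_rt:search_expect(HP, Index, "text", "yokozuna", 1),
    ok.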
5 changes: 1 addition & 4 deletions riak_test/yz_search_http.erl
@@ -55,10 +55,7 @@ confirm() ->
Key = <<"Problem">>,
Value = <<"FOR YOU">>,
ok = yz_rt:http_put(HP, ?BUCKET, Key, Value),

%% time for soft auto commit
timer:sleep(1100),

yz_rt:commit(Cluster, ?INDEX),
URL = yz_rt:search_url(HP, ?INDEX),

test_search_get_and_post_query(HP, URL, ?INDEX),
20 changes: 10 additions & 10 deletions riak_test/yz_siblings.erl
@@ -28,16 +28,16 @@ test_siblings(Cluster) ->
HP = hd(yz_rt:host_entries(rt:connection_info(Cluster))),
yz_rt:create_index_http(Cluster, HP, Index),
ok = allow_mult(Cluster, Index),
ok = write_sibs(HP, Bucket, EncKey),
ok = write_sibs(Cluster, HP, Index, Bucket, EncKey),
%% Verify 10 times because of non-determinism in coverage
[ok = verify_sibs(HP, Index) || _ <- lists:seq(1,10)],
ok = reconcile_sibs(HP, Bucket, EncKey),
ok = reconcile_sibs(Cluster, HP, Index, Bucket, EncKey),
[ok = verify_reconcile(HP, Index) || _ <- lists:seq(1,10)],
ok = delete_key(HP, Bucket, EncKey),
ok = delete_key(Cluster, HP, Index, Bucket, EncKey),
[ok = verify_deleted(HP, Index) || _ <- lists:seq(1,10)],
ok.

write_sibs({Host, Port}, Bucket, EncKey) ->
write_sibs(Cluster, {Host, Port}, Index, Bucket, EncKey) ->
lager:info("Write siblings"),
URL = bucket_url({Host, Port}, Bucket, EncKey),
Opts = [],
@@ -48,8 +48,7 @@ write_sibs({Host, Port}, Bucket, EncKey) ->
Body4 = <<"This is value delta">>,
[{ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, B, Opts)
|| B <- [Body1, Body2, Body3, Body4]],
%% Sleep for soft commit
timer:sleep(1000),
yz_rt:commit(Cluster, Index),
ok.

verify_sibs(HP, Index) ->
@@ -59,23 +58,24 @@ verify_sibs(HP, Index) ->
[true = yz_rt:search_expect(HP, Index, "text", S, 1) || S <- Values],
ok.

reconcile_sibs(HP, Bucket, EncKey) ->
reconcile_sibs(Cluster, HP, Index, Bucket, EncKey) ->
lager:info("Reconcile the siblings"),
{VClock, _} = http_get(HP, Bucket, EncKey),
NewValue = <<"This is value alpha, beta, charlie, and delta">>,
ok = http_put(HP, Bucket, EncKey, VClock, NewValue),
timer:sleep(1100),
yz_rt:commit(Cluster, Index),
ok.

delete_key(HP, Bucket, EncKey) ->
delete_key(Cluster, HP, Index, Bucket, EncKey) ->
lager:info("Delete the key"),
URL = bucket_url(HP, Bucket, EncKey),
{ok, "200", RH, _} = ibrowse:send_req(URL, [], get, [], []),
VClock = proplists:get_value("X-Riak-Vclock", RH),
Headers = [{"x-riak-vclock", VClock}],
{ok, "204", _, _} = ibrowse:send_req(URL, Headers, delete, [], []),
%% Wait for Riak delete timeout + Solr soft-commit
timer:sleep(4100),
timer:sleep(3000),
yz_rt:commit(Cluster, Index),
ok.

verify_deleted(HP, Index) ->
2 changes: 1 addition & 1 deletion src/yz_doc.erl
@@ -27,7 +27,7 @@
-define(MD_VTAG, <<"X-Riak-VTag">>).
-define(YZ_ID_SEP, "*").
-define(YZ_ID_VER, "1").
-define(YZ_ED_VER, <<"2">>).
-define(YZ_ED_VER, <<"3">>).

%%%===================================================================
%%% API
2 changes: 1 addition & 1 deletion src/yz_entropy.erl
@@ -34,7 +34,7 @@ iterate_entropy_data(Index, Filter, Fun) ->
case yz_solr:ping(Index) of
true ->
Filter2 = [{continuation, none},
{limit, 100}|Filter],
{limit, app_helper:get_env(?YZ_APP_NAME, entropy_data_limit, 100)}|Filter],
ED = yz_solr:entropy_data(Index, Filter2),
iterate_entropy_data(Index, Filter2, Fun, ED);
_ ->
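With this change the 100-entry page size becomes a default rather than a constant: it is read from the application environment on each iteration. A sketch of overriding it (the value 500 is arbitrary; ?YZ_APP_NAME resolves to the yokozuna application):

%% Sketch only: raise the entropy-data page size for AAE iteration
%% from the default 100 to 500 entries per Solr request.
application:set_env(yokozuna, entropy_data_limit, 500).
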
2 changes: 1 addition & 1 deletion src/yz_solr.erl
@@ -33,7 +33,7 @@
{delete_index, deleteIndex},
{delete_data_dir, deleteDataDir}]).
-define(FIELD_ALIASES, [{continuation, continue},
{limit,n}]).
{limit, n}]).
-define(QUERY(Bin), {struct, [{'query', Bin}]}).
-define(SOLR_TIMEOUT, 60000).

2 changes: 1 addition & 1 deletion tools/build-jar.sh
@@ -81,7 +81,7 @@ if [ ! -e "../priv/java_lib" ]; then
mkdir ../priv/java_lib
fi

YZ_JAR_VSN=2
YZ_JAR_VSN=3
YZ_JAR_NAME=yokozuna-$YZ_JAR_VSN.jar
YZ_JAR_SHA=$YZ_JAR_NAME.sha

2 changes: 1 addition & 1 deletion tools/grab-solr.sh
@@ -109,7 +109,7 @@
fi

JAVA_LIB=../priv/java_lib
YZ_JAR_VSN=2
YZ_JAR_VSN=3
YZ_JAR_NAME=yokozuna-$YZ_JAR_VSN.jar

if [ ! -e $JAVA_LIB/$YZ_JAR_NAME ]
