diff --git a/include/yokozuna.hrl b/include/yokozuna.hrl index c6ed269f..e876cfae 100644 --- a/include/yokozuna.hrl +++ b/include/yokozuna.hrl @@ -55,7 +55,7 @@ %% Short representation of a preflist, partition + n_val -type short_preflist() :: {p(), n()}. %% Riak bucket --type bucket() :: binary(). +-type bucket() :: Name :: binary() | {Type :: binary(), Name :: binary()}. %% Riak key -type key() :: binary(). %% Bucket/Key pair @@ -341,6 +341,13 @@ -define(YZ_RK_FIELD_XML, ?YZ_FIELD_XML(?YZ_RK_FIELD_S)). -define(YZ_RK_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_rk\" and @type=\"_yz_str\" and @indexed=\"true\" and @stored=\"true\"]"). +%% Riak bucket type +-define(YZ_RT_FIELD, '_yz_rt'). +-define(YZ_RT_FIELD_S, "_yz_rt"). +-define(YZ_RT_FIELD_B, <<"_yz_rt">>). +-define(YZ_RT_FIELD_XML, ?YZ_FIELD_XML(?YZ_RT_FIELD_S)). +-define(YZ_RT_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_rt\" and @type=\"_yz_str\" and @indexed=\"true\" and @stored=\"true\"]"). + %% Riak bucket -define(YZ_RB_FIELD, '_yz_rb'). -define(YZ_RB_FIELD_S, "_yz_rb"). diff --git a/java_src/com/basho/yokozuna/handler/EntropyData.java b/java_src/com/basho/yokozuna/handler/EntropyData.java index 30853c1d..157611b5 100644 --- a/java_src/com/basho/yokozuna/handler/EntropyData.java +++ b/java_src/com/basho/yokozuna/handler/EntropyData.java @@ -120,7 +120,9 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) String text = null; String[] vals = null; String docPartition = null; - String riakBucket = null; + String vsn = null; + String riakBType = null; + String riakBName = null; String riakKey = null; String hash = null; int count = 0; @@ -135,13 +137,18 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) log.debug("text: " + text); vals = text.split(" "); - docPartition = vals[0]; - riakBucket = vals[1]; - riakKey = vals[2]; - hash = vals[3]; + vsn = vals[0]; + docPartition = vals[1]; + riakBType = vals[2]; + riakBName = vals[3]; + riakKey = vals[4]; + hash = vals[5]; + if (partition.equals(docPartition)) { SolrDocument tmpDoc = new SolrDocument(); - tmpDoc.addField("riak_bucket", riakBucket); + tmpDoc.addField("vsn", vsn); + tmpDoc.addField("riak_bucket_type", riakBType); + tmpDoc.addField("riak_bucket_name", riakBName); tmpDoc.addField("riak_key", riakKey); tmpDoc.addField("base64_hash", hash); docs.add(tmpDoc); diff --git a/misc/bench/schemas/fruit_schema.xml b/misc/bench/schemas/fruit_schema.xml index 95160a7c..aaf64cb5 100644 --- a/misc/bench/schemas/fruit_schema.xml +++ b/misc/bench/schemas/fruit_schema.xml @@ -10,20 +10,22 @@ rue"/> - + - + - + - + - + + + diff --git a/misc/bench/src/yz_driver.erl b/misc/bench/src/yz_driver.erl index 354b89be..5bf13310 100644 --- a/misc/bench/src/yz_driver.erl +++ b/misc/bench/src/yz_driver.erl @@ -28,6 +28,26 @@ %% API %% ==================================================================== +-type bucket() :: binary() | {binary(), binary()}. + +-spec bucket_path(bucket()) -> binary(). +bucket_path({Type, Name}) -> + <<"/types/",Type/binary,"/buckets/",Name/binary,"/keys">>; +bucket_path(Name) -> + <<"/buckets/",Name/binary,"/keys">>. + +-spec search_path(bucket()) -> binary(). +search_path({BucketType, _}) -> + <<"/solr/",BucketType/binary,"/select">>; +search_path(BucketName) -> + <<"/solr/",BucketName/binary,"/select">>. + +-spec bucket_type(bucket()) -> binary(). +bucket_type({Type, _}) -> + Type; +bucket_type(Name) -> + Name. + start_apps(Apps) -> [application:start(App) || App <- Apps].
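The six space-delimited fields parsed by EntropyData.java above define the new, versioned entropy datum: "<vsn> <partition> <bucket_type> <bucket_name> <key> <base64_hash>". A minimal Erlang sketch of that layout, with hypothetical helper names (the real producer is yz_doc:gen_ed/3 later in this diff; the real consumer is the handler above):

    -module(yz_ed_sketch).
    -export([encode_ed/5, decode_ed/1]).

    %% Sketch only: all six fields are space-delimited binaries.
    encode_ed(Vsn, Partition, {BType, BName}, Key, Hash64) ->
        <<Vsn/binary," ",Partition/binary," ",BType/binary," ",
          BName/binary," ",Key/binary," ",Hash64/binary>>.

    decode_ed(ED) ->
        [Vsn, Partition, BType, BName, Key, Hash64] =
            binary:split(ED, <<" ">>, [global]),
        {Vsn, Partition, {BType, BName}, Key, base64:decode(Hash64)}.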
@@ -43,19 +63,20 @@ new(_Id) -> false -> ok end, - Index = basho_bench_config:get(index, "test"), + Bucket = basho_bench_config:get(bucket, {<<"test">>, <<"test">>}), + Index = basho_bench_config:get(index, bucket_type(Bucket)), HTTP = basho_bench_config:get(http_conns, [{"127.0.0.1", 8098}]), PB = basho_bench_config:get(pb_conns, [{"127.0.0.1", 8087}]), - IPath = basho_bench_config:get(index_path, "/riak/test"), - SPath = basho_bench_config:get(search_path, "/search/test"), - IURLs = array:from_list(lists:map(make_url(IPath), HTTP)), + BPath = basho_bench_config:get(bucket_path, bucket_path(Bucket)), + SPath = basho_bench_config:get(search_path, search_path(Index)), + IURLs = array:from_list(lists:map(make_url(BPath), HTTP)), SURLs = array:from_list(lists:map(make_url(SPath), HTTP)), Conns = array:from_list(lists:map(make_conn(Secure, User, Password, Cert), PB)), N = length(HTTP), M = length(PB), {ok, #state{pb_conns={Conns, {0,M}}, - index=list_to_binary(Index), + index=Index, iurls={IURLs, {0,N}}, surls={SURLs, {0,N}}}}. diff --git a/priv/default_schema.xml b/priv/default_schema.xml index 8e871e46..1e4d8ef2 100644 --- a/priv/default_schema.xml +++ b/priv/default_schema.xml @@ -130,6 +130,9 @@ + + + diff --git a/riak_test/aae_test.erl b/riak_test/aae_test.erl index 0d44fea6..59e5df72 100644 --- a/riak_test/aae_test.erl +++ b/riak_test/aae_test.erl @@ -2,6 +2,7 @@ -compile(export_all). -include_lib("eunit/include/eunit.hrl"). -define(NUM_KEYS, 10000). +-define(BUCKET, {<<"fruit_aae">>, <<"fruit">>}). -define(INDEX, <<"fruit_aae">>). -define(REPAIR_MFA, {yz_exchange_fsm, repair, 2}). -define(CFG, @@ -30,7 +31,7 @@ confirm() -> PBConns = yz_rt:open_pb_conns(Cluster), PBConn = yz_rt:select_random(PBConns), setup_index(Cluster, PBConn, YZBenchDir), - yz_rt:load_data(Cluster, ?INDEX, YZBenchDir, ?NUM_KEYS), + {0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS), lager:info("Verify data was indexed"), verify_num_match(Cluster, ?NUM_KEYS), %% Wait for a full round of exchange and then get total repair @@ -107,7 +108,7 @@ setup_index(Cluster, PBConn, YZBenchDir) -> RawSchema = read_schema(YZBenchDir), yz_rt:store_schema(PBConn, ?INDEX, RawSchema), ok = yz_rt:create_index(Node, ?INDEX, ?INDEX), - ok = yz_rt:set_index(Node, ?INDEX), + ok = yz_rt:set_bucket_type_index(Node, ?INDEX), yz_rt:wait_for_index(Cluster, ?INDEX). %% @doc Verify that no repair has happened since `TS'. diff --git a/riak_test/yokozuna_essential.erl b/riak_test/yokozuna_essential.erl index 3740a4cc..20504790 100644 --- a/riak_test/yokozuna_essential.erl +++ b/riak_test/yokozuna_essential.erl @@ -4,7 +4,6 @@ host_entries/1, run_bb/2, search_expect/5, search_expect/6, - set_index/2, verify_count/2, wait_for_joins/1, write_terms/2]). -include_lib("eunit/include/eunit.hrl"). @@ -23,6 +22,7 @@ %% cluster and you will see zero repairs occur. -define(FRUIT_SCHEMA_NAME, <<"fruit">>). +-define(BUCKET, {<<"fruit">>,<<"b1">>}). -define(INDEX, <<"fruit">>). -define(NUM_KEYS, 10000). -define(SUCCESS, 0). @@ -47,7 +47,7 @@ confirm() -> wait_for_joins(Cluster), rt:wait_for_cluster_service(Cluster, yokozuna), setup_indexing(Cluster, PBConns, YZBenchDir), - {0, _} = yz_rt:load_data(Cluster, ?INDEX, YZBenchDir, ?NUM_KEYS), + {0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS), %% wait for soft-commit timer:sleep(1000), Ref = async_query(Cluster, YZBenchDir), @@ -63,19 +63,21 @@ confirm() -> yz_rt:close_pb_conns(PBConns), pass. 
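With the driver changes above, a basho_bench workload selects its target through the new `bucket' option and lets the driver derive both the key path (/types/<type>/buckets/<name>/keys) and the Solr path (/solr/<type>/select). A hypothetical config fragment (endpoints illustrative):

    {driver, yz_driver}.
    {bucket, {<<"fruit">>, <<"b1">>}}.
    {http_conns, [{"127.0.0.1", 8098}]}.
    {pb_conns, []}.
    {operations, [{load_fruit, 1}]}.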
+%% @doc Verify that the delete call made by `yz_kv:cleanup' can deal +%% with a key which contains characters like '/', ':' and '-'. test_escaped_key(Cluster) -> lager:info("Test key escape"), {H, P} = hd(host_entries(rt:connection_info(Cluster))), Value = <<"Never gonna give you up">>, - Bucket = "escaped", + Bucket = {<<"escaped">>,<<"b1">>}, Key = edoc_lib:escape_uri("rick/astley-rolled:derp"), ok = yz_rt:http_put({H, P}, Bucket, Key, Value), ok = http_get({H, P}, Bucket, Key), ok. -http_get({Host, Port}, Bucket, Key) -> - URL = lists:flatten(io_lib:format("http://~s:~s/buckets/~s/keys/~s", - [Host, integer_to_list(Port), Bucket, Key])), +http_get({Host, Port}, {BType, BName}, Key) -> + URL = lists:flatten(io_lib:format("http://~s:~s/types/~s/buckets/~s/keys/~s", + [Host, integer_to_list(Port), BType, BName, Key])), Headers = [{"accept", "text/plain"}], {ok, "200", _, _} = ibrowse:send_req(URL, Headers, get, [], []), ok. @@ -92,7 +94,7 @@ test_tagging(Cluster) -> write_with_tag({Host, Port}) -> lager:info("Tag the object tagging/test"), - URL = lists:flatten(io_lib:format("http://~s:~s/buckets/tagging/keys/test", + URL = lists:flatten(io_lib:format("http://~s:~s/types/tagging/buckets/b1/keys/test", [Host, integer_to_list(Port)])), Opts = [], Body = <<"testing tagging">>, @@ -116,10 +118,9 @@ async_query(Cluster, YZBenchDir) -> {operations, [{Apple,1}]}, {http_conns, Hosts}, {pb_conns, []}, - {search_path, "/search/" ++ binary_to_list(?INDEX)}, + {bucket, ?BUCKET}, {shutdown_on_error, true}], - - File = "bb-query-fruit-" ++ binary_to_list(?INDEX), + File = "bb-query-fruit", write_terms(File, Cfg), run_bb(async, File). @@ -130,7 +131,7 @@ delete_key(Cluster, Key) -> Node = yz_rt:select_random(Cluster), lager:info("Deleting key ~s", [Key]), {ok, C} = riak:client_connect(Node), - C:delete(?INDEX, list_to_binary(Key)). + C:delete(?BUCKET, list_to_binary(Key)). delete_some_data(Cluster, ReapSleep) -> Keys = yz_rt:random_keys(?NUM_KEYS), @@ -168,13 +169,15 @@ setup_indexing(Cluster, PBConns, YZBenchDir) -> yz_rt:store_schema(PBConn, ?FRUIT_SCHEMA_NAME, RawSchema), yz_rt:wait_for_schema(Cluster, ?FRUIT_SCHEMA_NAME, RawSchema), ok = create_index(Node, ?INDEX, ?FRUIT_SCHEMA_NAME), - ok = set_index(Node, ?INDEX), + ok = yz_rt:set_bucket_type_index(Node, ?INDEX), ok = create_index(Node, <<"tagging">>), - ok = set_index(Node, <<"tagging">>), + ok = yz_rt:set_bucket_type_index(Node, <<"tagging">>), ok = create_index(Node, <<"siblings">>), - ok = set_index(Node, <<"siblings">>), + ok = yz_rt:set_bucket_type_index(Node, <<"siblings">>), + ok = create_index(Node, <<"escaped">>), + ok = yz_rt:set_bucket_type_index(Node, <<"escaped">>), [yz_rt:wait_for_index(Cluster, I) - || I <- [<<"fruit">>, <<"tagging">>, <<"siblings">>]]. + || I <- [<<"fruit">>, <<"tagging">>, <<"siblings">>, <<"escaped">>]]. verify_deletes(Cluster, KeysDeleted, YZBenchDir) -> NumDeleted = length(KeysDeleted), @@ -191,10 +194,9 @@ verify_deletes(Cluster, KeysDeleted, YZBenchDir) -> {operations, [{Apple,1}]}, {http_conns, Hosts}, {pb_conns, []}, - {search_path, "/search/" ++ binary_to_list(?INDEX)}, + {bucket, ?BUCKET}, {shutdown_on_error, true}], - - File = "bb-verify-deletes-" ++ binary_to_list(?INDEX), + File = "bb-verify-deletes", write_terms(File, Cfg), check_status(run_bb(sync, File)).
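Every HTTP helper in these tests now speaks the typed-bucket URL scheme. An illustrative shell-style round-trip, with host, port, and names assumed rather than taken from this patch:

    URL = "http://127.0.0.1:8098/types/escaped/buckets/b1/keys/test",
    {ok, "204", _, _} = ibrowse:send_req(URL, [{"content-type", "text/plain"}],
                                         put, <<"Never gonna give you up">>, []),
    {ok, "200", _, _} = ibrowse:send_req(URL, [{"accept", "text/plain"}], get, [], []).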
diff --git a/riak_test/yz_errors.erl b/riak_test/yz_errors.erl index 9caec91a..b5423764 100644 --- a/riak_test/yz_errors.erl +++ b/riak_test/yz_errors.erl @@ -39,10 +39,11 @@ test_errors(Cluster) -> expect_bad_json(Cluster) -> Index = <<"bad_json">>, + Bucket = {<<"bad_json">>,<<"bucket">>}, HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))), - ok = create_index(Cluster, HP, Index), - lager:info("Write bad json"), - URL = bucket_url(HP, Index, "test"), + ok = create_index(Cluster, Index), + lager:info("Write bad json [~p]", [HP]), + URL = bucket_url(HP, Bucket, "test"), Opts = [], CT = "application/json", Headers = [{"content-type", CT}], @@ -59,10 +60,11 @@ expect_bad_json(Cluster) -> expect_bad_xml(Cluster) -> Index = <<"bad_xml">>, + Bucket = {Index,<<"bucket">>}, HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))), - ok = create_index(Cluster, HP, Index), - lager:info("Write bad xml"), - URL = bucket_url(HP, Index, "test"), + ok = create_index(Cluster, Index), + lager:info("Write bad xml [~p]", [HP]), + URL = bucket_url(HP, Bucket, "test"), Opts = [], CT = "application/xml", Headers = [{"content-type", CT}], @@ -79,10 +81,11 @@ expect_bad_xml(Cluster) -> expect_bad_query(Cluster) -> Index = <<"bad_query">>, + Bucket = {Index, <<"bucket">>}, HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))), - ok = create_index(Cluster, HP, Index), - lager:info("Write bad query"), - URL = bucket_url(HP, Index, "test"), + ok = create_index(Cluster, Index), + lager:info("Write bad query [~p]", [HP]), + URL = bucket_url(HP, Bucket, "test"), Opts = [], CT = "text/plain", Headers = [{"content-type", CT}], @@ -100,22 +103,24 @@ expect_bad_query(Cluster) -> index_url({Host,Port}, Index) -> ?FMT("http://~s:~B/yz/index/~s", [Host, Port, Index]). -bucket_url({Host,Port}, Bucket, Key) -> - ?FMT("http://~s:~B/buckets/~s/keys/~s", [Host, Port, Bucket, Key]). +bucket_url({Host,Port}, {BType, BName}, Key) -> + ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", [Host, Port, BType, BName, Key]). -search_url({Host,Port}, Bucket) -> - ?FMT("http://~s:~B/search/~s", [Host, Port, Bucket]). +search_url({Host,Port}, Index) -> + ?FMT("http://~s:~B/solr/~s/select", [Host, Port, Index]). http(Method, URL, Headers, Body) -> Opts = [], ibrowse:send_req(URL, Headers, Method, Body, Opts). -create_index(Cluster, HP, Index) -> +create_index(Cluster, Index) -> Node = yz_rt:select_random(Cluster), - lager:info("create_index ~s [~p]", [Index, HP]), + HP = hd(host_entries(rt:connection_info([Node]))), + lager:info("create_index ~s [~p]", [Index, Node]), URL = index_url(HP, Index), Headers = [{"content-type", "application/json"}], {ok, Status, _, _} = http(put, URL, Headers, ?NO_BODY), - yz_rt:set_index(Node, Index), + yz_rt:set_bucket_type_index(Node, Index), + yz_rt:wait_for_bucket_type(Cluster, Index), yz_rt:wait_for_index(Cluster, Index), ?assertEqual("204", Status). diff --git a/riak_test/yz_fallback.erl b/riak_test/yz_fallback.erl index 85d80d65..741f095f 100644 --- a/riak_test/yz_fallback.erl +++ b/riak_test/yz_fallback.erl @@ -4,6 +4,8 @@ -include_lib("eunit/include/eunit.hrl"). -define(NUM_KEYS, 1000). -define(INDEX, <<"fallback">>). +-define(BUCKET, {?INDEX, <<"bucket">>}). +-define(KEY, <<"key">>). -define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))). 
-define(CFG, [{riak_core, @@ -21,27 +23,27 @@ confirm() -> rt:wait_for_cluster_service(Cluster, yokozuna), create_index(Cluster, ?INDEX), Cluster2 = take_node_down(Cluster), - write_obj(Cluster2), - check_fallbacks(Cluster2), + write_obj(Cluster2, ?BUCKET, ?KEY), + check_fallbacks(Cluster2, ?INDEX, ?BUCKET, ?KEY), HP = riak_hp(yz_rt:select_random(Cluster2), Cluster2), ?assert(yz_rt:search_expect(yokozuna, HP, ?INDEX, "*", "*", 1)), pass. -check_fallbacks(Cluster) -> +check_fallbacks(Cluster, Index, Bucket, Key) -> Node = yz_rt:select_random(Cluster), - KVPreflist = kv_preflist(Node, ?INDEX, ?INDEX), + KVPreflist = kv_preflist(Node, Bucket, Key), FallbackPreflist = filter_fallbacks(KVPreflist), LogicalFallbackPL = make_logical(Node, FallbackPreflist), [begin {H, P} = solr_hp(FNode, Cluster), - ?assert(yz_rt:search_expect(solr, {H, P}, ?INDEX, "_yz_pn", integer_to_list(LPN), 0)) + ?assert(yz_rt:search_expect(solr, {H, P}, Index, "_yz_pn", integer_to_list(LPN), 0)) end || {LPN, FNode} <- LogicalFallbackPL]. create_index(Cluster, Index) -> Node = yz_rt:select_random(Cluster), yz_rt:create_index(Node, Index), - ok = yz_rt:set_index(Node, Index), + ok = yz_rt:set_bucket_type_index(Node, Index), timer:sleep(5000). make_logical(Node, Preflist) -> @@ -68,12 +70,12 @@ take_node_down(Cluster) -> timer:sleep(5000), Cluster -- [DownNode]. -write_obj(Cluster) -> +write_obj(Cluster, {BType, BName}, Key) -> Node = yz_rt:select_random(Cluster), {Host, Port} = riak_hp(Node, Cluster), lager:info("write obj to node ~p", [Node]), - URL = ?FMT("http://~s:~s/buckets/~s/keys/~s", - [Host, integer_to_list(Port), ?INDEX, ?INDEX]), + URL = ?FMT("http://~s:~s/types/~s/buckets/~s/keys/~s", + [Host, integer_to_list(Port), BType, BName, Key]), Headers = [{"content-type", "text/plain"}], Body = <<"yokozuna">>, {ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body, []), diff --git a/riak_test/yz_flag_transitions.erl b/riak_test/yz_flag_transitions.erl deleted file mode 100644 index 5cc27503..00000000 --- a/riak_test/yz_flag_transitions.erl +++ /dev/null @@ -1,112 +0,0 @@ -%% @doc Test the addition or removal of the index entry from a bucket -%% that has data. If a bucket already has data then associating -%% it with an index should cause re-indexing under the newly -%% associated index. Conversely, if an index is dissociated from -%% the bucket then that bucket's data should be deleted from the -%% index. -%% -%% NOTE: This is called "flag transition" because originally Yokozuna -%% had an implicit one-to-one mapping between bucket and index -%% name. That is, the names were the same and a given index was -%% responsible for only one bucket. --module(yz_flag_transitions). --compile(export_all). --include_lib("eunit/include/eunit.hrl"). --define(NUM_KEYS, 1000). --define(CFG, - [{riak_core, - [ - {ring_creation_size, 32} - ]}, - {riak_kv, - [ - %% build often - {anti_entropy_build_limit, {100, 1000}}, - {anti_entropy_concurrency, 12} - ]}, - {yokozuna, - [ - {enabled, true}, - {entropy_tick, 1000} - ]}, - {lager, - [{handlers, - [{lager_file_backend, - [{"./log/error.log",error,10485760,"$D0",5}, - {"./log/console.log",info,104857600,"$D0",10}]}]}]} - ]). - -confirm() -> - YZBenchDir = rt_config:get(yz_dir) ++ "/misc/bench", - random:seed(now()), - Cluster = rt:build_cluster(4, ?CFG), - rt:wait_for_cluster_service(Cluster, yokozuna), - verify_index_add(Cluster, YZBenchDir), - verify_index_remove(Cluster), - verify_many_to_one_index_remove(Cluster), - pass. 
- -%% @doc When an index is associated with a bucket the Yokozuna AAE -%% trees should be cleared. Thus on next exchange missing indexes -%% will be discovered and repaired. -verify_index_add(Cluster, YZBenchDir) -> - lager:info("Verify adding index"), - yz_rt:load_data(Cluster, <<"fruit">>, YZBenchDir, ?NUM_KEYS), - %% Let 1s soft-commit catch up - timer:sleep(1000), - Hosts = yz_rt:host_entries(rt:connection_info(Cluster)), - HP = yz_rt:select_random(Hosts), - lager:info("Verify fruit index doesn't exist"), - {ok, "404", _, _} = yz_rt:search(yokozuna, HP, <<"fruit">>, "*", "*"), - lager:info("Create fruit index + set flag"), - yz_rt:create_index(yz_rt:select_random(Cluster), <<"fruit">>), - yz_rt:set_index(yz_rt:select_random(Cluster), <<"fruit">>), - yz_rt:wait_for_index(Cluster, <<"fruit">>), - - %% TODO: use YZ/KV AAE stats to determine when AAE has covered ring once. - F = fun(Node) -> - lager:info("Verify that AAE re-indexes objects under fruit index [~p]", [Node]), - HP2 = hd(yz_rt:host_entries(rt:connection_info([Node]))), - yz_rt:search_expect(HP2, <<"fruit">>, "*", "*", ?NUM_KEYS) - end, - yz_rt:wait_until(Cluster, F). - -%% @doc When an index is dissociated the indexes for that bucket's -%% index should be deleted. -verify_index_remove(Cluster) -> - lager:info("Verify removing index"), - Node = yz_rt:select_random(Cluster), - yz_rt:remove_index(Node, <<"fruit">>), - F = fun(Node2) -> - lager:info("Verify fruit indexes are deleted [~p]", [Node2]), - HP = hd(yz_rt:host_entries(rt:connection_info([Node2]))), - yz_rt:search_expect(HP, <<"fruit">>, "*", "*", 0) - end, - yz_rt:wait_until(Cluster, F). - -%% @doc Verify that removing the index entry for a bucket deletes only -%% that bucket's data in the associated index. -verify_many_to_one_index_remove(Cluster) -> - Index = <<"many">>, - lager:info("Verify removing index on a many-to-one index"), - Node = yz_rt:select_random(Cluster), - HP = hd(yz_rt:host_entries(rt:connection_info([Node]))), - yz_rt:create_index(Node, Index), - yz_rt:set_index(Node, <<"b1">>, Index), - yz_rt:set_index(Node, <<"b2">>, Index), - yz_rt:wait_for_index(Cluster, Index), - yz_rt:http_put(HP, <<"b1">>, <<"key">>, <<"somedata">>), - yz_rt:http_put(HP, <<"b2">>, <<"key">>, <<"somedata">>), - %% Wait for soft-commit - timer:sleep(1100), - ?assert(yz_rt:search_expect(HP, Index, "_yz_rb", "b1", 1)), - ?assert(yz_rt:search_expect(HP, Index, "_yz_rb", "b2", 1)), - yz_rt:remove_index(Node, <<"b1">>), - F = fun(Node2) -> - lager:info("Verify only 'b1' data is removed from ~s index [~p]", [Index, Node2]), - HP2 = hd(yz_rt:host_entries(rt:connection_info([Node2]))), - R1 = yz_rt:search_expect(HP2, Index, "_yz_rb", "b1", 0), - R2 = yz_rt:search_expect(HP2, Index, "_yz_rb", "b2", 1), - R1 and R2 - end, - yz_rt:wait_until(Cluster, F). diff --git a/riak_test/yz_languages.erl b/riak_test/yz_languages.erl index 1d79755d..35ec20f3 100644 --- a/riak_test/yz_languages.erl +++ b/riak_test/yz_languages.erl @@ -35,17 +35,12 @@ select_random(List) -> host_entries(ClusterConnInfo) -> [proplists:get_value(http, I) || {_,I} <- ClusterConnInfo]. -schema_url({Host,Port}, Name) -> - ?FMT("http://~s:~B/yz/schema/~s", [Host, Port, Name]). - index_url({Host,Port}, Index) -> ?FMT("http://~s:~B/yz/index/~s", [Host, Port, Index]). -search_url({Host,Port}, Bucket, Term) -> - ?FMT("http://~s:~B/search/~s?wt=json&omitHeader=true&q=~s", [Host, Port, Bucket, Term]). - -bucket_url({Host,Port}, Bucket, Key) -> - ?FMT("http://~s:~B/buckets/~s/keys/~s", [Host, Port, Bucket, Key]). 
+bucket_url({Host,Port}, {BType, BName}, Key) -> + ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", + [Host, Port, BType, BName, Key]). http(Method, URL, Headers, Body) -> Opts = [], @@ -56,34 +51,17 @@ create_index(Cluster, HP, Index) -> URL = index_url(HP, Index), Headers = [{"content-type", "application/json"}], {ok, Status, _, _} = http(put, URL, Headers, ?NO_BODY), - ok = yz_rt:set_index(hd(Cluster), Index), + ok = yz_rt:set_bucket_type_index(hd(Cluster), Index), yz_rt:wait_for_index(Cluster, Index), ?assertEqual("204", Status). -search(HP, Index, Term) -> - URL = search_url(HP, Index, Term), - lager:info("Run search ~s", [URL]), - Opts = [{response_format, binary}], - case ibrowse:send_req(URL, [], get, [], Opts) of - {ok, "200", _, Resp} -> - lager:info("Search resp ~p", [Resp]), - Resp; - Other -> - {bad_response, Other} - end. - -verify_count(Expected, Resp) -> - Struct = mochijson2:decode(Resp), - NumFound = kvc:path([<<"response">>, <<"numFound">>], Struct), - Expected == NumFound. - -store_and_search(Cluster, Bucket, CT, Body, Search) -> +store_and_search(Cluster, Bucket, Index, CT, Body, Field, Term) -> Headers = [{"Content-Type", CT}], - store_and_search(Cluster, Bucket, Headers, CT, Body, Search). + store_and_search(Cluster, Bucket, Index, Headers, CT, Body, Field, Term). -store_and_search(Cluster, Bucket, Headers, CT, Body, Search) -> +store_and_search(Cluster, Bucket, Index, Headers, CT, Body, Field, Term) -> HP = select_random(host_entries(rt:connection_info(Cluster))), - create_index(Cluster, HP, Bucket), + create_index(Cluster, HP, Index), URL = bucket_url(HP, Bucket, "test"), lager:info("Storing to bucket ~s", [URL]), {ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Body), @@ -91,27 +69,29 @@ store_and_search(Cluster, Bucket, Headers, CT, Body, Search) -> timer:sleep(1000), {ok, "200", _, Body} = ibrowse:send_req(URL, [{"accept", CT}], get, []), lager:info("Verify values are indexed"), - R1 = search(HP, Bucket, Search), - verify_count(1, R1), + ?assert(yz_rt:search_expect(HP, Index, Field, Term, 1)), ok. confirm_body_search_encoding(Cluster) -> - Bucket = <<"test_iso_8859_8">>, - lager:info("confirm_iso_8859_8 ~s", [Bucket]), + Index = <<"test_iso_8859_8">>, + Bucket = {Index, <<"b">>}, + lager:info("confirm_iso_8859_8 ~s", [Index]), Body = "א בְּרֵאשִׁית, בָּרָא אֱלֹהִים, אֵת הַשָּׁמַיִם, וְאֵת הָאָרֶץ", - store_and_search(Cluster, Bucket, "text/plain", Body, "text:בָּרָא"). + store_and_search(Cluster, Bucket, Index, "text/plain", Body, "text", "בָּרָא"). confirm_language_field_type(Cluster) -> - Bucket = <<"test_shift_jis">>, - lager:info("confirm_shift_jis ~s", [Bucket]), + Index = <<"test_shift_jis">>, + Bucket = {Index, <<"b">>}, + lager:info("confirm_shift_jis ~s", [Index]), Body = "{\"text_ja\" : \"私はハイビスカスを食べるのが 大好き\"}", - store_and_search(Cluster, Bucket, "application/json", Body, "text_ja:大好き"). + store_and_search(Cluster, Bucket, Index, "application/json", Body, "text_ja", "大好き"). confirm_tag_encoding(Cluster) -> - Bucket = <<"test_iso_8859_6">>, - lager:info("confirm_iso_8859_6 ~s", [Bucket]), + Index = <<"test_iso_8859_6">>, + Bucket = {Index, <<"b">>}, + lager:info("confirm_iso_8859_6 ~s", [Index]), Body = "أردت أن أقرأ كتابا عن تاريخ المرأة في فرنسا", Headers = [{"Content-Type", "text/plain"}, {"x-riak-meta-yz-tags", "x-riak-meta-arabic_s"}, {"x-riak-meta-arabic_s", "أقرأ"}], - store_and_search(Cluster, Bucket, Headers, "text/plain", Body, "arabic_s:أقرأ"). 
+ store_and_search(Cluster, Bucket, Index, Headers, "text/plain", Body, "arabic_s", "أقرأ"). diff --git a/riak_test/yz_mapreduce.erl b/riak_test/yz_mapreduce.erl index 7cd4e556..987d169a 100644 --- a/riak_test/yz_mapreduce.erl +++ b/riak_test/yz_mapreduce.erl @@ -34,13 +34,15 @@ -spec confirm() -> pass. confirm() -> Index = <<"mr_index">>, + Bucket = {Index, <<"b1">>}, random:seed(now()), Cluster = rt:build_cluster(4, ?CFG), rt:wait_for_cluster_service(Cluster, yokozuna), yz_rt:create_index(yz_rt:select_random(Cluster), Index), - yz_rt:set_index(yz_rt:select_random(Cluster), Index), + yz_rt:set_bucket_type_index(yz_rt:select_random(Cluster), Index), + timer:sleep(500), yz_rt:wait_for_index(Cluster, Index), - write_100_objs(Cluster, Index), + write_100_objs(Cluster, Bucket), verify_100_objs_mr(Cluster, Index), pass. @@ -58,31 +60,32 @@ verify_100_objs_mr(Cluster, Index) -> {'query', [MakeTick, ReduceSum]}], F = fun(Node) -> HP = hd(yz_rt:host_entries(rt:connection_info([Node]))), - R = http_mr(HP, MR), - 100 == hd(mochijson2:decode(R)) + A = hd(mochijson2:decode(http_mr(HP, MR))), + lager:info("Running map-reduce job on ~p", [Node]), + lager:info("E: 100, A: ~p", [A]), + 100 == A end, yz_rt:wait_until(Cluster, F). --spec write_100_objs([node()], index_name()) -> ok. +-spec write_100_objs([node()], bucket()) -> ok. -write_100_objs(Cluster, Index) -> +write_100_objs(Cluster, Bucket) -> lager:info("Writing 100 objects"), - lists:foreach(write_obj(Cluster, Index), lists:seq(1,100)). + lists:foreach(write_obj(Cluster, Bucket), lists:seq(1,100)). --spec write_obj([node()], index_name()) -> fun(). -write_obj(Cluster, Index) -> +-spec write_obj([node()], bucket()) -> fun(). +write_obj(Cluster, Bucket) -> fun(N) -> PL = [{name_s,<<"yokozuna">>}, {num_i,N}], Key = list_to_binary(io_lib:format("key_~B", [N])), Body = mochijson2:encode(PL), HP = yz_rt:select_random(yz_rt:host_entries(rt:connection_info(Cluster))), CT = "application/json", - lager:info("Writing object with bkey ~p [~p]", [{Index,Key}, HP]), - yz_rt:http_put(HP, Index, Key, CT, Body) + lager:info("Writing object with bkey ~p [~p]", [{Bucket, Key}, HP]), + yz_rt:http_put(HP, Bucket, Key, CT, Body) end. -spec http_mr({host(), portnum()}, term()) -> binary(). http_mr({Host,Port}, MR) -> - lager:info("Running map-reduce job [~p]", [{Host,Port}]), URL = ?FMT("http://~s:~s/mapred", [Host, integer_to_list(Port)]), Opts = [], Headers = [{"content-type", "application/json"}], diff --git a/riak_test/yz_pb.erl b/riak_test/yz_pb.erl index 47ae4f34..921a21bc 100644 --- a/riak_test/yz_pb.erl +++ b/riak_test/yz_pb.erl @@ -16,11 +16,12 @@ - - - - - + + + + + + @@ -56,22 +57,24 @@ schema_url({Host,Port}, Name) -> index_url({Host,Port}, Index) -> ?FMT("http://~s:~B/yz/index/~s", [Host, Port, Index]). -bucket_url({Host,Port}, Bucket, Key) -> - ?FMT("http://~s:~B/buckets/~s/keys/~s", [Host, Port, Bucket, Key]). +bucket_url({Host,Port}, {BType, BName}, Key) -> + ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", + [Host, Port, BType, BName, Key]). http(Method, URL, Headers, Body) -> Opts = [], ibrowse:send_req(URL, Headers, Method, Body, Opts).
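The protocol buffers tests below settle into one pattern: the index and bucket type share a name, the type carries the association, and searches go by index name with results grouped under the index rather than the bucket. A hedged shell-style sketch (endpoint and names illustrative; assumes a riak client with typed-bucket support):

    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087),
    ok = riakc_pb_socket:create_search_index(Pid, <<"basic">>),
    %% yz_rt:set_bucket_type_index(Node, <<"basic">>) then binds the type.
    Obj = riakc_obj:new({<<"basic">>, <<"b1">>}, <<"k1">>, <<"herp derp">>, "text/plain"),
    {ok, _} = riakc_pb_socket:put(Pid, Obj, [return_head]),
    {ok, {search_results, [{<<"basic">>, _Fields}], _Score, 1}} =
        riakc_pb_socket:search(Pid, <<"basic">>, <<"text:herp">>, []).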
-create_index(Cluster, Index, Bucket) -> +create_index(Cluster, BucketType, Index) -> Node = select_random(Cluster), [{Host, Port}] = host_entries(rt:connection_info([Node])), - lager:info("create_index ~s for bucket ~s [~p]", [Index, Bucket, {Host, Port}]), + lager:info("create_index ~s for bucket type ~s [~p]", [Index, BucketType, {Host, Port}]), {ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)), %% set index in props with the same name as the bucket ?assertEqual(ok, riakc_pb_socket:create_search_index(Pid, Index)), % Add the index to the bucket props - yz_rt:set_index(Node, Index, Bucket), + yz_rt:set_bucket_type_index(Node, BucketType, Index), + yz_rt:wait_for_bucket_type(Cluster, BucketType), yz_rt:wait_for_index(Cluster, Index), %% Check that the index exists {ok, IndexData} = riakc_pb_socket:get_search_index(Pid, Index), @@ -83,6 +86,7 @@ store_and_search(Cluster, Bucket, Key, Body, Search, Params) -> store_and_search(Cluster, Bucket, Key, Body, "text/plain", Search, Params). store_and_search(Cluster, Bucket, Key, Body, CT, Search, Params) -> + {BType, _} = Bucket, {Host, Port} = select_random(host_entries(rt:connection_info(Cluster))), URL = bucket_url({Host, Port}, Bucket, Key), lager:info("Storing to bucket ~s", [URL]), @@ -92,10 +96,10 @@ store_and_search(Cluster, Bucket, Key, Body, CT, Search, Params) -> F = fun(_) -> lager:info("Search for ~s [~p:~p]", [Search, Host, Port]), {ok,{search_results,R,Score,Found}} = - riakc_pb_socket:search(Pid, Bucket, Search, Params), + riakc_pb_socket:search(Pid, BType, Search, Params), case Found of 1 -> - [{Bucket,Results}] = R, + [{BType,Results}] = R, KeyCheck = (Key == binary_to_list(proplists:get_value(<<"_yz_rk">>, Results))), ScoreCheck = (Score =/= 0.0), KeyCheck and ScoreCheck; @@ -127,38 +131,40 @@ confirm_admin_index(Cluster) -> [{Host, Port}] = host_entries(rt:connection_info([Node])), {ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)), F = fun(_) -> - %% Remove index from bucket props and delete it - yz_rt:set_index(Node, Index, <<>>), - DelResp = riakc_pb_socket:delete_search_index(Pid, Index), - case DelResp of - ok -> true; - {error,<<"notfound">>} -> true - end + %% Remove index from bucket props and delete it + yz_rt:remove_index(Node, Index), + DelResp = riakc_pb_socket:delete_search_index(Pid, Index), + case DelResp of + ok -> true; + {error,<<"notfound">>} -> true + end end, yz_rt:wait_until(Cluster, F), riakc_pb_socket:stop(Pid), ok. confirm_basic_search(Cluster) -> - Bucket = <<"basic">>, - create_index(Cluster, Bucket, Bucket), + Index = <<"basic">>, + Bucket = {Index, <<"b1">>}, + create_index(Cluster, Index, Index), - lager:info("confirm_basic_search ~s", [Bucket]), + lager:info("confirm_basic_search ~p", [Bucket]), Body = "herp derp", Params = [{sort, <<"score desc">>}, {fl, ["*","score"]}], store_and_search(Cluster, Bucket, "test", Body, <<"text:herp">>, Params). confirm_encoded_search(Cluster) -> - Bucket = <<"encoded">>, - create_index(Cluster, Bucket, Bucket), + Index = <<"encoded">>, + Bucket = {Index, <<"b1">>}, + create_index(Cluster, Index, Index), - lager:info("confirm_encoded_search ~s", [Bucket]), + lager:info("confirm_encoded_search ~p", [Bucket]), Body = "א בְּרֵאשִׁית, בָּרָא אֱלֹהִים, אֵת הַשָּׁמַיִם, וְאֵת הָאָרֶץ", Params = [{sort, <<"score desc">>}, {fl, ["_yz_rk"]}], store_and_search(Cluster, Bucket, "וְאֵת", Body, <<"text:בָּרָא">>, Params).
confirm_multivalued_field(Cluster) -> - Bucket = <<"basic">>, + Index = <<"basic">>, + Bucket = {Index, <<"b1">>}, lager:info("confirm multiValued=true fields decode properly"), - create_index(Cluster, Bucket, Bucket), Body = <<"{\"name_ss\":\"turner\", \"name_ss\":\"hooch\"}">>, Params = [], HP = select_random(host_entries(rt:connection_info(Cluster))), @@ -171,16 +177,17 @@ timer:sleep(1100), Search = <<"name_ss:turner">>, {ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)), - {ok,{search_results,[{Bucket,Fields}],_Score,_Found}} = - riakc_pb_socket:search(Pid, Bucket, Search, Params), + {ok,{search_results,[{Index,Fields}],_Score,_Found}} = + riakc_pb_socket:search(Pid, Index, Search, Params), ?assert(lists:member({<<"name_ss">>,<<"turner">>}, Fields)), ?assert(lists:member({<<"name_ss">>,<<"hooch">>}, Fields)), riakc_pb_socket:stop(Pid). confirm_stored_fields(Cluster) -> - Bucket = <<"stored_fields">>, + Index = <<"stored_fields">>, + Bucket = {Index, <<"b1">>}, lager:info("Confirm stored fields"), - create_index(Cluster, Bucket, Bucket), + create_index(Cluster, Index, Index), Body = <<"{\"bool_b\":true, \"float_tf\":3.14}">>, Params = [], HP = select_random(host_entries(rt:connection_info(Cluster))), @@ -191,10 +198,11 @@ timer:sleep(1100), Search = <<"float_tf:3.14">>, {ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)), - {ok,{search_results,[{Bucket,Fields}],_Score,_Found}} = - riakc_pb_socket:search(Pid, Bucket, Search, Params), + {ok,{search_results,[{Index,Fields}],_Score,_Found}} = + riakc_pb_socket:search(Pid, Index, Search, Params), ?assertEqual(<<"true">>, proplists:get_value(<<"bool_b">>, Fields)), ?assertEqual(3.14, ?BIN_TO_FLOAT(proplists:get_value(<<"float_tf">>, Fields))), - ?assertEqual(Bucket, proplists:get_value(<<"_yz_rb">>, Fields)), + ?assertEqual(Index, proplists:get_value(<<"_yz_rt">>, Fields)), + ?assertEqual(<<"b1">>, proplists:get_value(<<"_yz_rb">>, Fields)), riakc_pb_socket:stop(Pid). diff --git a/riak_test/yz_rt.erl b/riak_test/yz_rt.erl index fc4068e1..0411f87c 100644 --- a/riak_test/yz_rt.erl +++ b/riak_test/yz_rt.erl @@ -92,14 +92,14 @@ get_yz_conn_info(Node) -> host_entries(ClusterConnInfo) -> [riak_http(I) || {_,I} <- ClusterConnInfo]. --spec http_put({string(), portnum()}, binary(), binary(), binary()) -> ok. +-spec http_put({string(), portnum()}, bucket(), binary(), binary()) -> ok. http_put(HP, Bucket, Key, Value) -> http_put(HP, Bucket, Key, "text/plain", Value). --spec http_put({string(), portnum()}, binary(), binary(), string(), binary()) -> ok. +-spec http_put({string(), portnum()}, bucket(), binary(), string(), binary()) -> ok.
+http_put({Host, Port}, {BType, BName}, Key, CT, Value) -> + URL = ?FMT("http://~s:~s/types/~s/buckets/~s/keys/~s", + [Host, integer_to_list(Port), BType, BName, Key]), Opts = [], Headers = [{"content-type", CT}], {ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Value, Opts), @@ -127,13 +127,13 @@ load_data(Cluster, Bucket, YZBenchDir, NumKeys) -> {concurrent, 3}, {code_paths, [YZBenchDir]}, {driver, yz_driver}, - {index_path, "/riak/" ++ binary_to_list(Bucket)}, + {bucket, Bucket}, {http_conns, Hosts}, {pb_conns, []}, {key_generator, KeyGen}, {operations, [{load_fruit, 1}]}, {shutdown_on_error, true}], - File = "bb-load-" ++ binary_to_list(Bucket), + File = "load-data", write_terms(File, Cfg), run_bb(sync, File). @@ -207,17 +207,18 @@ select_random(List) -> Idx = random:uniform(Length), lists:nth(Idx, List). -remove_index(Node, Bucket) -> - lager:info("Remove index from bucket ~s [~p]", [Bucket, Node]), - ok = rpc:call(Node, yz_kv, remove_index, [Bucket]). +-spec remove_index(node(), bucket()) -> ok. +remove_index(Node, BucketType) -> + lager:info("Remove index from bucket type ~s [~p]", [BucketType, Node]), + ok = rpc:call(Node, riak_core_bucket_type, update, [BucketType, [{?YZ_INDEX, ?YZ_INDEX_TOMBSTONE}]]). -set_index(Node, Bucket) -> - set_index(Node, Bucket, Bucket). +set_bucket_type_index(Node, BucketType) -> + set_bucket_type_index(Node, BucketType, BucketType). -%% TODO: this should use PB interface like clients -set_index(Node, Bucket, Index) -> - lager:info("Set bucket ~s index to ~s [~p]", [Bucket, Index, Node]), - ok = rpc:call(Node, yz_kv, set_index, [Bucket, Index]). +set_bucket_type_index(Node, BucketType, Index) -> + lager:info("Set bucket type ~s index to ~s [~p]", [BucketType, Index, Node]), + rt:create_and_activate_bucket_type(Node, BucketType, [{?YZ_INDEX, Index}]), + rt:wait_until_bucket_type_status(BucketType, active, Node). solr_http(ConnInfo) -> proplists:get_value(solr_http, ConnInfo). @@ -230,6 +231,19 @@ store_schema(PBConn, Name, Raw) -> ?assertEqual(ok, riakc_pb_socket:create_search_schema(PBConn, Name, Raw)), ok. +wait_for_bucket_type(Cluster, BucketType) -> + F = fun(Node) -> + {Host, Port} = riak_pb(hd(rt:connection_info([Node]))), + {ok, PBConn} = riakc_pb_socket:start_link(Host, Port), + R = riakc_pb_socket:get_bucket_type(PBConn, BucketType), + case R of + {ok,_} -> true; + _ -> false + end + end, + wait_until(Cluster, F), + ok. + %% @see wait_for_schema/3 wait_for_schema(Cluster, Name) -> wait_for_schema(Cluster, Name, ignore). diff --git a/riak_test/yz_schema_admin.erl b/riak_test/yz_schema_admin.erl index 7a992b79..bf6e4c93 100644 --- a/riak_test/yz_schema_admin.erl +++ b/riak_test/yz_schema_admin.erl @@ -13,11 +13,12 @@ - - - - - + + + + + + @@ -158,6 +159,7 @@ + diff --git a/riak_test/yz_security.erl b/riak_test/yz_security.erl index dd205a4a..fcfefff2 100644 --- a/riak_test/yz_security.erl +++ b/riak_test/yz_security.erl @@ -39,12 +39,13 @@ - - - - - + + + + + + _yz_id @@ -54,13 +55,9 @@ ">>). -define(USER, "user"). -define(PASSWORD, "password"). --define(INDEX, "myindex"). --define(INDEX_B, <<"myindex">>). --define(INDEX2, "myindex2"). --define(INDEX2_B, <<"myindex2">>). --define(SCHEMA, "myschema"). --define(SCHEMA_B, <<"myschema">>). --define(BUCKET, "mybucket"). +-define(INDEX, <<"myindex">>). +-define(INDEX2, <<"myindex2">>). +-define(SCHEMA, <<"myschema">>). -define(HUSER, "h_user"). -define(HINDEX, "h_myindex"). -define(ADD_USER(N,D), rpc:call(N, riak_core_console, add_user, D)). 
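For the security tests that follow, the ?GRANT macro (like ?ADD_USER above) wraps an RPC into riak_core_console. Spelled out, the search grant used below looks roughly like this; the expansion is assumed from the ?ADD_USER definition, not shown in this hunk:

    ok = rpc:call(Node, riak_core_console, grant,
                  [["yokozuna.search","ON","index","myindex","TO","user"]]).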
@@ -130,7 +127,7 @@ confirm_index_pb(Node) -> Pid0 = get_secure_pid(Host, Port), lager:info("verifying user cannot create index without grants"), ?assertMatch({error, <<"Permission", _/binary>>}, - riakc_pb_socket:create_search_index(Pid0, ?INDEX_B)), + riakc_pb_socket:create_search_index(Pid0, ?INDEX)), lager:info("verifying user cannot list indexes without grants"), ?assertMatch({error, <<"Permission", _/binary>>}, @@ -143,19 +140,17 @@ confirm_index_pb(Node) -> Pid1 = get_secure_pid(Host, Port), lager:info("verifying user can create an index"), - ?assertEqual(ok, - riakc_pb_socket:create_search_index(Pid1, ?INDEX_B)), - yz_rt:set_index(Node, ?INDEX, ?BUCKET), + ?assertEqual(ok, riakc_pb_socket:create_search_index(Pid1, ?INDEX)), yz_rt:wait_for_index([Node], ?INDEX), %% create another index, never give permission to use it ?assertEqual(ok, - riakc_pb_socket:create_search_index(Pid1, ?INDEX2_B)), + riakc_pb_socket:create_search_index(Pid1, ?INDEX2)), yz_rt:wait_for_index([Node], ?INDEX2), ?assertEqual(ok, riakc_pb_socket:create_search_index(Pid1, <<"_gonna_be_dead_">>)), - yz_rt:wait_for_index([Node], "_gonna_be_dead_"), + yz_rt:wait_for_index([Node], <<"_gonna_be_dead_">>), lager:info("verifying user can delete an index"), ?assertEqual(ok, @@ -163,7 +158,7 @@ confirm_index_pb(Node) -> lager:info("verifying user can get an index"), ?assertMatch({ok,[_|_]}, - riakc_pb_socket:get_search_index(Pid1, ?INDEX_B)), + riakc_pb_socket:get_search_index(Pid1, ?INDEX)), lager:info("verifying user can get all indexes"), ?assertMatch({ok,[_|_]}, @@ -177,11 +172,11 @@ confirm_schema_permission_pb(Node) -> Pid0 = get_secure_pid(Host, Port), lager:info("verifying user cannot create schema without grants"), ?assertMatch({error, <<"Permission", _/binary>>}, - riakc_pb_socket:create_search_schema(Pid0, ?SCHEMA_B, ?SCHEMA_CONTENT)), + riakc_pb_socket:create_search_schema(Pid0, ?SCHEMA, ?SCHEMA_CONTENT)), lager:info("verifying user cannot get schemas without grants"), ?assertMatch({error, <<"Permission", _/binary>>}, - riakc_pb_socket:get_search_schema(Pid0, ?SCHEMA_B)), + riakc_pb_socket:get_search_schema(Pid0, ?SCHEMA)), riakc_pb_socket:stop(Pid0), lager:info("Grant schema permission to user"), @@ -190,7 +185,7 @@ confirm_schema_permission_pb(Node) -> Pid1 = get_secure_pid(Host, Port), lager:info("verifying user can create schema"), ?assertMatch(ok, - riakc_pb_socket:create_search_schema(Pid1, ?SCHEMA_B, ?SCHEMA_CONTENT)), + riakc_pb_socket:create_search_schema(Pid1, ?SCHEMA, ?SCHEMA_CONTENT)), riakc_pb_socket:stop(Pid1), ok. 
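All of these permission checks hinge on the same error shape: an {error, Bin} tuple whose binary starts with <<"Permission">>. A minimal illustration of the pattern the assertions match:

    case riakc_pb_socket:search(Pid, <<"myindex">>, <<"*:*">>) of
        {error, <<"Permission", _/binary>>} -> denied;
        {ok, _Results} -> allowed
    end.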
@@ -200,20 +195,21 @@ confirm_search_pb(Node) -> Pid0 = get_secure_pid(Host, Port), lager:info("verifying user cannot search an index without grants"), ?assertMatch({error, <<"Permission", _/binary>>}, - riakc_pb_socket:search(Pid0, ?INDEX_B, <<"*:*">>)), + riakc_pb_socket:search(Pid0, ?INDEX, <<"*:*">>)), riakc_pb_socket:stop(Pid0), - lager:info("Grant search permission to user on "++?INDEX), - ok = ?GRANT(Node, [["yokozuna.search","ON","index",?INDEX,"TO",?USER]]), + lager:info("Grant search permission to user on ~s", [?INDEX]), + IndexS = binary_to_list(?INDEX), + ok = ?GRANT(Node, [["yokozuna.search","ON","index",IndexS,"TO",?USER]]), Pid1 = get_secure_pid(Host, Port), - lager:info("verifying user can search granted on "++?INDEX), + lager:info("verifying user can search granted on ~s", [?INDEX]), ?assertMatch({ok, _Result}, - riakc_pb_socket:search(Pid1, ?INDEX_B, <<"*:*">>)), + riakc_pb_socket:search(Pid1, ?INDEX, <<"*:*">>)), lager:info("verifying user cannot search a different index"), ?assertMatch({error, <<"Permission", _/binary>>}, - riakc_pb_socket:search(Pid1, ?INDEX2_B, <<"*:*">>)), + riakc_pb_socket:search(Pid1, ?INDEX2, <<"*:*">>)), riakc_pb_socket:stop(Pid1), ok. diff --git a/riak_test/yz_siblings.erl b/riak_test/yz_siblings.erl index bb72dd12..5dcd0caa 100644 --- a/riak_test/yz_siblings.erl +++ b/riak_test/yz_siblings.erl @@ -3,7 +3,6 @@ -compile(export_all). -import(yz_rt, [host_entries/1, run_bb/2, search_expect/5, - set_index/2, select_random/1, verify_count/2, write_terms/2]). -include_lib("eunit/include/eunit.hrl"). @@ -21,19 +20,21 @@ confirm() -> pass. test_siblings(Cluster) -> + Index = <<"siblings">>, + Bucket = {Index, <<"b1">>}, HP = hd(host_entries(rt:connection_info(Cluster))), - create_index(Cluster, HP, <<"siblings">>), - ok = allow_mult(Cluster, <<"siblings">>), - ok = write_sibs(HP), + create_index(Cluster, HP, Index), + ok = allow_mult(Cluster, Index), + ok = write_sibs(HP, Bucket), %% Verify 10 times because of non-determinism in coverage - [ok = verify_sibs(HP) || _ <- lists:seq(1,10)], - ok = reconcile_sibs(HP), - [ok = verify_reconcile(HP) || _ <- lists:seq(1,10)], + [ok = verify_sibs(HP, Index) || _ <- lists:seq(1,10)], + ok = reconcile_sibs(HP, Bucket), + [ok = verify_reconcile(HP, Index) || _ <- lists:seq(1,10)], ok. -write_sibs({Host, Port}) -> +write_sibs({Host, Port}, Bucket) -> lager:info("Write siblings"), - URL = bucket_url({Host, Port}, "siblings", "test"), + URL = bucket_url({Host, Port}, Bucket, "test"), Opts = [], Headers = [{"content-type", "text/plain"}], Body1 = <<"This is value alpha">>, @@ -46,47 +47,46 @@ write_sibs({Host, Port}) -> timer:sleep(1000), ok. -verify_sibs(HP) -> +verify_sibs(HP, Index) -> lager:info("Verify siblings are indexed"), - true = yz_rt:search_expect(HP, <<"siblings">>, "_yz_rk", "test", 4), + true = yz_rt:search_expect(HP, Index, "_yz_rk", "test", 4), Values = ["alpha", "beta", "charlie", "delta"], - [true = yz_rt:search_expect(HP, <<"siblings">>, "text", S, 1) || S <- Values], + [true = yz_rt:search_expect(HP, Index, "text", S, 1) || S <- Values], ok. -reconcile_sibs(HP) -> +reconcile_sibs(HP, Bucket) -> lager:info("Reconcile the siblings"), - {VClock, _} = http_get(HP, "siblings", "test"), + {VClock, _} = http_get(HP, Bucket, "test"), NewValue = <<"This is value alpha, beta, charlie, and delta">>, - ok = http_put(HP, "siblings", "test", VClock, NewValue), + ok = http_put(HP, Bucket, "test", VClock, NewValue), timer:sleep(1100), ok. 
-verify_reconcile(HP) -> +verify_reconcile(HP, Index) -> lager:info("Verify sibling indexes were deleted after reconcile"), - true = yz_rt:search_expect(HP, <<"siblings">>, "_yz_rk", "test", 1), + true = yz_rt:search_expect(HP, Index, "_yz_rk", "test", 1), ok. -http_put({Host, Port}, Bucket, Key, VClock, Value) -> - URL = lists:flatten(io_lib:format("http://~s:~s/buckets/~s/keys/~s", - [Host, integer_to_list(Port), Bucket, Key])), +http_put({Host, Port}, {BType, BName}, Key, VClock, Value) -> + URL = lists:flatten(io_lib:format("http://~s:~s/types/~s/buckets/~s/keys/~s", + [Host, integer_to_list(Port), BType, BName, Key])), Opts = [], Headers = [{"content-type", "text/plain"}, {"x-riak-vclock", VClock}], {ok, "204", _, _} = ibrowse:send_req(URL, Headers, put, Value, Opts), ok. -http_get({Host, Port}, Bucket, Key) -> - URL = lists:flatten(io_lib:format("http://~s:~s/buckets/~s/keys/~s", - [Host, integer_to_list(Port), Bucket, Key])), +http_get({Host, Port}, {BType, BName}, Key) -> + URL = lists:flatten(io_lib:format("http://~s:~s/types/~s/buckets/~s/keys/~s", + [Host, integer_to_list(Port), BType, BName, Key])), Opts = [], Headers = [{"accept", "multipart/mixed"}], {ok, "300", RHeaders, Body} = ibrowse:send_req(URL, Headers, get, [], Opts), VC = proplists:get_value("X-Riak-Vclock", RHeaders), {VC, Body}. -allow_mult(Cluster, Bucket) -> - Args = [Bucket, [{allow_mult, true}]], - ok = rpc:call(hd(Cluster), riak_core_bucket, set_bucket, Args), +allow_mult(Cluster, BType) -> + ok = rpc:call(hd(Cluster), riak_core_bucket_type, update, [BType, [{allow_mult, true}]]), %% TODO: wait for allow_mult to gossip instead of sleep timer:sleep(5000), %% [begin @@ -98,8 +98,8 @@ allow_mult(Cluster, Bucket) -> index_url({Host,Port}, Index) -> ?FMT("http://~s:~B/yz/index/~s", [Host, Port, Index]). -bucket_url({Host,Port}, Bucket, Key) -> - ?FMT("http://~s:~B/buckets/~s/keys/~s", [Host, Port, Bucket, Key]). +bucket_url({Host,Port}, {BType, BName}, Key) -> + ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", [Host, Port, BType, BName, Key]). http(Method, URL, Headers, Body) -> Opts = [], @@ -110,5 +110,5 @@ create_index(Cluster, HP, Index) -> URL = index_url(HP, Index), Headers = [{"content-type", "application/json"}], {ok, "204", _, _} = http(put, URL, Headers, ?NO_BODY), - ok = set_index(hd(Cluster), Index), + ok = yz_rt:set_bucket_type_index(hd(Cluster), Index), yz_rt:wait_for_index(Cluster, Index). diff --git a/riak_test/yz_stat_test.erl b/riak_test/yz_stat_test.erl index b836a930..5e2f683f 100644 --- a/riak_test/yz_stat_test.erl +++ b/riak_test/yz_stat_test.erl @@ -28,39 +28,36 @@ prepare_cluster(NumNodes) -> rt:wait_for_cluster_service(Cluster, yokozuna), Cluster. -create_indexed_bucket(Pid, Cluster, IndexBucket) -> - create_indexed_bucket(Pid, Cluster, IndexBucket, IndexBucket). - -create_indexed_bucket(Pid, Cluster, Index, Bucket) -> +create_indexed_bucket(Pid, Cluster, Index) -> ?assertEqual(ok, riakc_pb_socket:create_search_index(Pid, Index)), - ?assertEqual(ok, riakc_pb_socket:set_bucket(Pid, Bucket, [{yz_index, Index}])), - yz_rt:wait_for_index(Cluster, binary_to_list(Index)), + yz_rt:set_bucket_type_index(hd(Cluster), Index), + yz_rt:wait_for_index(Cluster, Index), ok. 
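Usage sketch for create_indexed_bucket/3 above, mirroring what confirm_stats does below: the generated name serves as both index and bucket type, and writes go through a typed bucket so they are indexed (names illustrative):

    Index = gen_random_name(16),
    ok = create_indexed_bucket(Pid, Cluster, Index),
    Values = populate_data_and_wait(Pid, Cluster, {Index, <<"b1">>}, Index, 10).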
%% populate random plain text values populate_data(_, _, 0, Acc) -> Acc; -populate_data(Pid, IB, Count, Acc)-> +populate_data(Pid, Bucket, Count, Acc)-> KV = gen_random_name(16), - PO = riakc_obj:new(IB, KV, KV, "text/plain"), + PO = riakc_obj:new(Bucket, KV, KV, "text/plain"), {ok, _Obj} = riakc_pb_socket:put(Pid, PO, [return_head]), - populate_data(Pid, IB, Count - 1, [KV|Acc]). + populate_data(Pid, Bucket, Count - 1, [KV|Acc]). -populate_data_and_wait(Pid, Cluster, IndexBucket, Count) -> - Values = populate_data(Pid, IndexBucket, Count, []), +populate_data_and_wait(Pid, Cluster, Bucket, Index, Count) -> + Values = populate_data(Pid, Bucket, Count, []), Search = <<"text:*">>, F = fun(_) -> {ok,{search_results,_R,Score,Found}} = - riakc_pb_socket:search(Pid, IndexBucket, Search, []), + riakc_pb_socket:search(Pid, Index, Search, []), (Count == Found) and (Score =/= 0.0) end, yz_rt:wait_until(Cluster, F), Values. %% search for a list of values assuming plain text -search_values(_Pid, _IndexBucket, []) -> ok; -search_values(Pid, IndexBucket, [Value|Rest]) -> - riakc_pb_socket:search(Pid, IndexBucket, <<"text:", Value/binary>>, []), - search_values(Pid, IndexBucket, Rest). +search_values(_Pid, _Index, []) -> ok; +search_values(Pid, Index, [Value|Rest]) -> + riakc_pb_socket:search(Pid, Index, <<"text:", Value/binary>>, []), + search_values(Pid, Index, Rest). gen_random_name(Length) -> Chars = "abcdefghijklmnopqrstuvwxyz1234567890", @@ -71,21 +68,22 @@ write_bad_json(_, _, 0) -> ok; -write_bad_json(Pid, IB, Num) -> +write_bad_json(Pid, Bucket, Num) -> Key = list_to_binary("bad_json_" ++ integer_to_list(Num)), Value = <<"{\"bad\": \"unclosed\"">>, - PO = riakc_obj:new(IB, Key, Value, "application/json"), + PO = riakc_obj:new(Bucket, Key, Value, "application/json"), {ok, _Obj} = riakc_pb_socket:put(Pid, PO, [return_head]). confirm_stats(Cluster) -> {Host, Port} = select_random(host_entries(rt:connection_info(Cluster))), - IndexBucket = gen_random_name(16), + Index = gen_random_name(16), + Bucket = {Index, <<"b1">>}, {ok, Pid} = riakc_pb_socket:start_link(Host, (Port-1)), - create_indexed_bucket(Pid, Cluster, IndexBucket), - Values = populate_data_and_wait(Pid, Cluster, IndexBucket, 10), - search_values(Pid, IndexBucket, Values), - write_bad_json(Pid, IndexBucket, 10), + create_indexed_bucket(Pid, Cluster, Index), + Values = populate_data_and_wait(Pid, Cluster, Bucket, Index, 10), + search_values(Pid, Index, Values), + write_bad_json(Pid, Bucket, 10), riakc_pb_socket:stop(Pid), yz_rt:wait_until(Cluster, fun check_stat_values/1). diff --git a/src/yokozuna.erl b/src/yokozuna.erl index a6007018..a421b671 100644 --- a/src/yokozuna.erl +++ b/src/yokozuna.erl @@ -132,12 +132,18 @@ search_fold(Index, Query, Filter, F, Acc) -> {fq, Filter}, {start, Start}, {rows, 10}, - {fl, <>}, + {fl, <>}, {omitHeader, <<"true">>}, {wt, <<"json">>}], {_, Body} = yz_solr:dist_search(Index, Params), - E = extract_results(Body), - search_fold(E, Start, Params, Index, Query, Filter, F, Acc). + case extract_docs(Body) of + [] -> + Acc; + Docs -> + Positions = positions(hd(Docs)), + E = extract_results(Docs, Positions), + search_fold(E, Start, Params, Positions, Index, Query, Filter, F, Acc) + end. search(Index, Query) -> yz_solr:dist_search(Index, [{q, Query}]). @@ -164,39 +170,59 @@ switch_to_yokozuna() -> %%% Private %%%=================================================================== +%% @private +%% +%% @doc Decode the `Body' and extract the doc list.
+-spec extract_docs(binary()) -> Docs :: term(). +extract_docs(Body) -> + Obj = mochijson2:decode(Body), + kvc:path([<<"response">>, <<"docs">>], Obj). + %% @private %% %% @doc Extract the bucket/key results from the `Body' of a search %% result. --spec extract_results(binary()) -> [{binary(),binary(),[term()]}]. -extract_results(Body) -> - Obj = mochijson2:decode(Body), - Docs = kvc:path([<<"response">>, <<"docs">>], Obj), - %% N.B. This ugly hack is required because as far as I can tell - %% there is not defined order of the fields inside the results - %% returned by Solr. +-type positions() :: {integer(), integer(), integer()}. +-spec extract_results(term(), positions()) -> [{bucket(),binary(),[term()]}]. +extract_results(Docs, {TP, BP, KP}) -> [begin - case {X,Y} of - {{?YZ_RB_FIELD_B,B}, {?YZ_RK_FIELD_B,K}} -> - {B,K,[]}; - {{_,K},{_,B}} -> - {B,K,[]} - end - end|| {struct,[X,Y]} <- Docs]. + {_, BType} = lists:nth(TP, Fields), + {_, BName} = lists:nth(BP, Fields), + {_, Key} = lists:nth(KP, Fields), + {{BType, BName}, Key, []} + end|| {struct, Fields} <- Docs]. + +%% @private +%% +%% @doc Determine the positions of the bucket-type, bucket-name and +%% key in the doc list. +%% +%% N.B. This ugly hack is required because as far as I can tell +%% there is no defined order of the fields inside the results +%% returned by Solr. +-spec positions({struct, [tuple()]}) -> + {integer(), integer(), integer()}. +positions({struct,[X, Y, Z]}) -> + L1 = [{element(1, X), 1}, {element(1, Y), 2}, {element(1, Z), 3}], + {value, {_,BTypePos}, L2} = lists:keytake(?YZ_RT_FIELD_B, 1, L1), + {value, {_,BNamePos}, L3} = lists:keytake(?YZ_RB_FIELD_B, 1, L2), + {value, {_,KeyPos}, _} = lists:keytake(?YZ_RK_FIELD_B, 1, L3), + {BTypePos, BNamePos, KeyPos}. %% @private %% %% @doc This is the internal part of `search_fold' where the actual %% iteration happens. --spec search_fold(list(), non_neg_integer(), list(), index_name(), +-spec search_fold(list(), non_neg_integer(), list(), positions(), index_name(), binary(), binary(), fold_fun(), Acc::term()) -> Acc::term(). -search_fold([], _, _, _, _, _, _, Acc) -> Acc; -search_fold(Results, Start, Params, Index, Query, Filter, F, Acc) -> +search_fold([], _, _, _, _, _, _, _, Acc) -> Acc; +search_fold(Results, Start, Params, Positions, Index, Query, Filter, F, Acc) -> F(Results, Acc), Start2 = Start + 10, Params2 = lists:keystore(start, 1, Params, {start, Start2}), {_, Body} = yz_solr:dist_search(Index, Params2), - E = extract_results(Body), - search_fold(E, Start2, Params, Index, Query, Filter, F, Acc). + Docs = extract_docs(Body), + E = extract_results(Docs, Positions), + search_fold(E, Start2, Params, Positions, Index, Query, Filter, F, Acc). diff --git a/src/yz_doc.erl b/src/yz_doc.erl index c228a6c1..1b83cb5b 100644 --- a/src/yz_doc.erl +++ b/src/yz_doc.erl @@ -71,7 +71,8 @@ make_fields({DocId, {Bucket, Key}, FPN, Partition, none, EntropyData}) -> {?YZ_NODE_FIELD, ?ATOM_TO_BIN(node())}, {?YZ_PN_FIELD, Partition}, {?YZ_RK_FIELD, Key}, - {?YZ_RB_FIELD, Bucket}]; + {?YZ_RT_FIELD, yz_kv:bucket_type(Bucket)}, + {?YZ_RB_FIELD, yz_kv:bucket_name(Bucket)}]; make_fields({DocId, BKey, FPN, Partition, Vtag, EntropyData}) -> make_fields({DocId, BKey, FPN, Partition, none, EntropyData}) ++ @@ -197,10 +198,14 @@ doc_ts(MD) -> %% iterate. Otherwise the doc would have to be fetched for each %% entry. gen_ed(O, Hash, Partition) -> + %% Store `Vsn' to allow future changes to this format.
+ Vsn = <<"1">>, RiakBucket = yz_kv:get_obj_bucket(O), + RiakBType = yz_kv:bucket_type(RiakBucket), + RiakBName = yz_kv:bucket_name(RiakBucket), RiakKey = yz_kv:get_obj_key(O), Hash64 = base64:encode(Hash), - <>. + <>. %% Meta keys and values can be strings or binaries format_meta(key, Value) when is_binary(Value) -> diff --git a/src/yz_events.erl b/src/yz_events.erl index 4f2089f4..293557c3 100644 --- a/src/yz_events.erl +++ b/src/yz_events.erl @@ -65,14 +65,7 @@ handle_cast({ring_event, Ring}, S) -> Current = names(yz_index:get_indexes_from_ring(Ring)), {Removed, Added, Same} = yz_misc:delta(Previous, Current), - PreviousFlags = flagged_buckets(PrevRing), - CurrentFlags = flagged_buckets(Ring), - {FlagsRemoved, FlagsAdded, _} = yz_misc:delta(PreviousFlags, CurrentFlags), - ok = sync_indexes(Ring, Removed, Added, Same), - %% Pass `PrevRing' because need access to index name associated - %% with bucket. - ok = sync_data(PrevRing, FlagsRemoved, FlagsAdded), {noreply, S2}. @@ -80,10 +73,7 @@ handle_info(tick, S) -> %% TODO: tick and ring_event should be merged, actions taken in %% ring_event could fail and should be retried during tick, may %% need to rely on something other than ring to determine when to - %% retry certain actions, e.g. if the `sync_data' call fails then - %% AAE trees will be incorrect until the next ring_event or until - %% tree rebuild - ok = remove_non_owned_data(), + %% retry certain actions. Make all actions in here idempotent. %% Index creation may have failed during ring event. PrevRing = ?PREV_RING(S), @@ -136,13 +126,6 @@ destroy_events_table() -> true = ets:delete(?YZ_EVENTS_TAB), ok. --spec flagged_buckets(ring()) -> ordset(bucket()). -flagged_buckets(Ring) -> - Buckets = riak_core_bucket:get_buckets(Ring), - ordsets:from_list( - [proplists:get_value(name, BProps) - || BProps <- Buckets, yz_kv:should_index(yz_kv:get_index(BProps))]). - get_tick_interval() -> app_helper:get_env(?YZ_APP_NAME, tick_interval, ?YZ_DEFAULT_TICK_INTERVAL). @@ -194,29 +177,6 @@ set_tick() -> erlang:send_after(Interval, ?MODULE, tick), ok. --spec sync_data(ring(), list(), list()) -> ok. -sync_data(PrevRing, Removed, Added) -> - %% TODO: check for case where index isn't added or removed, but changed - [sync_added(Bucket) || Bucket <- Added], - [sync_removed(PrevRing, Bucket) || Bucket <- Removed], - ok. - --spec sync_added(bucket()) -> ok. -sync_added(Bucket) -> - lager:info("indexing enabled for bucket ~s -- clearing AAE trees", [Bucket]), - %% TODO: add hashtree.erl function to clear hashes for Bucket - yz_entropy_mgr:clear_trees(), - ok. - --spec sync_removed(ring(), bucket()) -> ok. -sync_removed(PrevRing, Bucket) -> - lager:info("indexing disabled for bucket ~s", [Bucket]), - Index = yz_kv:get_index(riak_core_bucket:get_bucket(Bucket, PrevRing)), - ok = yz_solr:delete(Index, [{'query', <>}]), - yz_solr:commit(Index), - yz_entropy_mgr:clear_trees(), - ok. - sync_indexes(Ring, Removed, Added, Same) -> ok = remove_indexes(Removed), ok = add_indexes(Ring, Added ++ Same). diff --git a/src/yz_index.erl b/src/yz_index.erl index 89138a9c..85d2e722 100644 --- a/src/yz_index.erl +++ b/src/yz_index.erl @@ -34,9 +34,14 @@ %% @doc Get the list of buckets associated with `Index'. -spec associated_buckets(index_name(), ring()) -> [bucket()]. associated_buckets(Index, Ring) -> - AllBucketProps = riak_core_bucket:get_buckets(Ring), - Assoc = lists:filter(fun(BProps) -> yz_kv:get_index(BProps) == Index end, AllBucketProps), - lists:map(fun riak_core_bucket:name/1, Assoc). 
+ AllProps = riak_core_bucket:get_buckets(Ring), + Assoc = [riak_core_bucket:name(BProps) + || BProps <- AllProps, + proplists:get_value(?YZ_INDEX, BProps, ?YZ_INDEX_TOMBSTONE) == Index], + case is_default_type_indexed(Index, Ring) of + true -> [Index|Assoc]; + false -> Assoc + end. -spec create(index_name()) -> ok. create(Name) -> @@ -231,6 +236,19 @@ index_dir(Name) -> YZDir = app_helper:get_env(?YZ_APP_NAME, yz_dir, ?YZ_DEFAULT_DIR), filename:absname(filename:join([YZDir, Name])). +%% @private +%% +%% @doc Determine if the bucket named `Index' under the default +%% bucket-type has `search' property set to `true'. If so, this is a +%% legacy Riak Search bucket/index which is associated with a Yokozuna +%% index of the same name. +-spec is_default_type_indexed(index_name(), ring()) -> boolean(). +is_default_type_indexed(Index, Ring) -> + Props = riak_core_bucket:get_bucket(Index, Ring), + %% Check against `true' atom in case the value is <<"true">> or + %% "true" which, hopefully, it should not be. + true == proplists:get_value(search, Props, false). + make_info(IndexName, SchemaName) -> #index_info{name=IndexName, schema_name=SchemaName}. diff --git a/src/yz_kv.erl b/src/yz_kv.erl index e1f0b333..ee4c801c 100644 --- a/src/yz_kv.erl +++ b/src/yz_kv.erl @@ -34,6 +34,28 @@ -type write_reason() :: delete | handoff | put | anti_entropy. +%%%=================================================================== +%%% TODO: move to riak_core +%%%=================================================================== + +bucket_name({_,Name}) -> + Name; +bucket_name(Name) -> + Name. + +bucket_type({Type,_}) -> + Type; +bucket_type(_) -> + <<"default">>. + +is_default_type({<<"default">>,_}) -> + true; +is_default_type({_,_}) -> + false; +is_default_type(_) -> + true. + + %%%=================================================================== %%% API %%%=================================================================== @@ -109,18 +131,22 @@ is_tombstone(MD) -> get_md_entry(MD, Key) -> yz_misc:dict_get(Key, MD, none). -%% @doc Extract the index name from `BProps' or return the tombstone -%% name if there is no associated index. --spec get_index(term()) -> index_name(). -get_index(BProps) -> - proplists:get_value(?YZ_INDEX, BProps, ?YZ_INDEX_TOMBSTONE). - -%% @doc Extract the index name. -%% @see get_index/1 +%% @doc Extract the index name from the `Bucket'. Return the tombstone +%% value if there is none. -spec get_index(bkey(), ring()) -> index_name(). -get_index({Bucket,_} = _BKey, Ring) -> +get_index({Bucket, _}, Ring) -> BProps = riak_core_bucket:get_bucket(Bucket, Ring), - get_index(BProps). + case is_default_type(Bucket) of + false -> + proplists:get_value(?YZ_INDEX, BProps, ?YZ_INDEX_TOMBSTONE); + true -> + case proplists:get_value(search, BProps, false) of + false -> + ?YZ_INDEX_TOMBSTONE; + true -> + bucket_name(Bucket) + end + end. %% @doc Determine the "short" preference list given the `BKey' and %% `Ring'. A short preflist is one that defines the preflist by @@ -132,6 +158,34 @@ get_short_preflist({Bucket, _} = BKey, Ring) -> PrimaryPL = yz_misc:primary_preflist(BKey, Ring, NVal), {first_partition(PrimaryPL), NVal}. +%% @doc Called by KV vnode to determine if handoff should start or +%% not. Yokozuna needs to make sure that the bucket types have been +%% transferred first. Otherwise the bucket-to-index associations may +%% be missing, causing missing index entries. +%% +%% TODO: Currently this call will block vnode and also vnode mgr.
If +%% I want to get really fancy the first time this function is called I +%% could return false but then send off async job to wait for bucket +%% types to transfer. Once types have transfered some global flag +%% which is cheap to check would be set and this call would simply +%% check that. +-spec should_handoff({p(), node()}) -> boolean(). +should_handoff({_Reason, {_Partition, TargetNode}}) -> + BucketTypesPrefix = {core, bucket_types}, + Server = {riak_core_metadata_hashtree, TargetNode}, + RemoteHash = gen_server:call(Server, {prefix_hash, BucketTypesPrefix}, 1000), + %% TODO Even though next call is local should also add 1s timeout + %% since this call blocks vnode. Or see above. + LocalHash = riak_core_metadata_hashtree:prefix_hash(BucketTypesPrefix), + case LocalHash == RemoteHash of + true -> + true; + false -> + ?INFO("waiting for bucket types prefix to agree between ~p and ~p", + [node(), TargetNode]), + false + end. + index(Obj, Reason, P) -> case yokozuna:is_enabled(index) andalso ?YZ_ENABLED of true -> @@ -310,18 +364,6 @@ put(Client, Bucket, Key, Value, ContentType) -> N = proplists:get_value(n_val, BucketProps), Client:put(O, [{pw,N},{w,N},{dw,N}]). -%% @doc Remove the `Index' property from `Bucket'. Data stored under -%% `Bucket' will no longer be indexed. --spec remove_index(bucket()) -> ok. -remove_index(Bucket) -> - set_index(Bucket, ?YZ_INDEX_TOMBSTONE). - -%% @doc Set the `Index' for which data stored in `Bucket' should be -%% indexed under. --spec set_index(bucket(), index_name()) -> ok. -set_index(Bucket, Index) -> - ok = riak_core_bucket:set_bucket(Bucket, [{?YZ_INDEX, Index}]). - %%%=================================================================== %%% Private %%%=================================================================== diff --git a/src/yz_schema.erl b/src/yz_schema.erl index b8551e90..cf6bb518 100644 --- a/src/yz_schema.erl +++ b/src/yz_schema.erl @@ -125,6 +125,7 @@ verify_fields({ok, Schema}) -> ?YZ_NODE_FIELD_XPATH, ?YZ_PN_FIELD_XPATH, ?YZ_RK_FIELD_XPATH, + ?YZ_RT_FIELD_XPATH, ?YZ_RB_FIELD_XPATH], Checks = [verify_field(F, Schema) || F <- Fields], IsError = fun(X) -> X /= ok end, diff --git a/src/yz_solr.erl b/src/yz_solr.erl index e3dd8102..4297fbdf 100644 --- a/src/yz_solr.erl +++ b/src/yz_solr.erl @@ -332,8 +332,8 @@ get_pairs(R) -> Docs = kvc:path([<<"response">>, <<"docs">>], R), [to_pair(DocStruct) || DocStruct <- Docs]. -to_pair({struct, [{_,Bucket},{_,Key},{_,Base64Hash}]}) -> - {{Bucket,Key}, base64:decode(Base64Hash)}. +to_pair({struct, [{_,_Vsn},{_,BType},{_,BName},{_,Key},{_,Base64Hash}]}) -> + {{{BType, BName},Key}, base64:decode(Base64Hash)}. get_doc_pairs(Resp) -> Docs = kvc:path([<<"docs">>], Resp), diff --git a/test/yz_kv_tests.erl b/test/yz_kv_tests.erl deleted file mode 100644 index 2067d4b2..00000000 --- a/test/yz_kv_tests.erl +++ /dev/null @@ -1,18 +0,0 @@ --module(yz_kv_tests). --compile(export_all). - --include("yokozuna.hrl"). --include_lib("eunit/include/eunit.hrl"). - -set_index_flag_test()-> - meck:new(riak_core_bucket), - meck:expect(riak_core_bucket, set_bucket, - fun(Bucket, Props) when Bucket =:= <<"a">> -> - case Props of - [{?YZ_INDEX, "index"}] -> ok; - _ -> error - end - end), - ?assertEqual(yz_kv:set_index(<<"a">>, "index"), ok), - ?assert(meck:validate(riak_core_bucket)), - meck:unload(riak_core_bucket). 
diff --git a/tools/build-jar.sh b/tools/build-jar.sh index 53d07f2c..b7b2e38d 100755 --- a/tools/build-jar.sh +++ b/tools/build-jar.sh @@ -2,6 +2,7 @@ # # Temporary script to build JAR file containing custom Solr request # handlers. +set -e if [ -z "$SKIP_JAVA" ]; then if [ ! -x "`which javac`" ] || [ ! -x "`which jar`" ]; then
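Finally, the schema edits scattered through this diff (default_schema.xml, the bench and test schemas) all add the same element. Per the ?YZ_RT_FIELD_XPATH macro added to yokozuna.hrl, yz_schema:verify_fields/1 now requires every schema to declare the bucket-type field exactly as follows (shown as an Erlang string for illustration):

    RTField = "<field name=\"_yz_rt\" type=\"_yz_str\""
              " indexed=\"true\" stored=\"true\"/>".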