Integrate bucket types
Integrate the bucket types functionality.

basho/riak#362

Overall
-------

Bucket types are the future of namespacing and property creation in
Riak. They allow efficient storage of "bucket properties" outside of
the Ring and 2-level namespacing of `Type` and `Name`.

Essentially, a bucket identifier can now be either a lone `binary()`
(legacy) or a 2-tuple of `{Type :: binary(), Name ::
binary()}`. Internally, when the legacy form is encountered it is
considered to live under the `default` bucket type. For example, the
bucket `<<"my_bucket">>` becomes `{<<"default">>, <<"my_bucket">>}`.
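
As a rough illustration, here is a minimal sketch of that
normalization (the helper name `ensure_type/1` is hypothetical, not
part of this commit):

    %% Normalize a bucket to the 2-tuple form. A lone binary is
    %% assumed to live under the default type.
    -spec ensure_type(binary() | {binary(), binary()}) ->
              {binary(), binary()}.
    ensure_type({Type, Name}) when is_binary(Type), is_binary(Name) ->
        {Type, Name};
    ensure_type(Name) when is_binary(Name) ->
        {<<"default">>, Name}.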

Up until this point Yokozuna has used the bucket property `yz_index`
to determine where to index data. This commit changes that
behavior. Legacy users will have existing data in buckets; in 2.0,
those buckets will be considered to live under the default type as
described above. For legacy buckets (the default type) Yokozuna will
NOT respect the `yz_index` property. Rather it will act like Riak
Search and use an index with the same name as the bucket, AS LONG AS
the `search` property is set to true. Once users upgrade to 2.0 they
should start opting into non-default bucket types, since they are
more efficient and newer features require a non-default type. For
these buckets Yokozuna will still use the `yz_index` property. The
property will typically be set at the type level but can also be
overridden per name under a type. Yokozuna doesn't care: if that
`{Type, Name}` has a `yz_index` property then it will be indexed.

In summary:

* Legacy buckets (the default type) will act like Riak Search. The
  index used must have the same name as the bucket and the `search`
  property must be true. This eases migration for users coming from
  Riak Search.

* All new users MUST use new-style buckets made of Type + Name. In
  most cases the `yz_index` property will be set on the type and thus
  inherited by all names under it (many buckets to one index). The
  index DOES NOT have to have the same name. A sketch of this
  resolution rule follows the list.
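
A minimal sketch of the resolution rule described above (the function
name and property-lookup details are assumptions; only the `search`
and `yz_index` properties come from this commit):

    %% Decide which index, if any, covers a given bucket.
    index_for({<<"default">>, Name}, Props) ->
        %% Legacy bucket: act like Riak Search -- the index shares
        %% the bucket's name and is used only when `search' is true.
        case proplists:get_value(search, Props, false) of
            true  -> {ok, Name};
            false -> none
        end;
    index_for({_Type, _Name}, Props) ->
        %% Typed bucket: honor `yz_index' wherever it was set.
        case proplists:get_value(yz_index, Props) of
            undefined -> none;
            Index     -> {ok, Index}
        end.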

Handoff
-------

Another important change revolves around handoff. Since Yokozuna
leeches off the KV vnode it doesn't have control over handoff like it
would if it were a true vnode. When a node joins, KV can start
shipping data before the bucket type data has arrived. In that case
there will be no `yz_index` property and index entries will be
missing. AAE would eventually catch this, but it is poor form for a
node join to cause a degradation in harvest, especially in a
quiescent cluster.

To fix this Yokozuna needs more control over the lifecycle of the KV
vnode. Yokozuna needs to hook into the `handoff_starting` stage and
verify that the bucket type data has been shipped before data handoff
begins. This is accomplished by adding the `yz_kv:should_handoff`
hook, which is hard-coded in the KV vnode for now.
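
A sketch of what such a hook might check (the metadata lookup and the
argument shape are assumptions; only the `yz_kv:should_handoff` name
comes from this commit):

    %% Allow KV handoff only once the relevant bucket type metadata
    %% has arrived on this node; otherwise ask KV to hold off.
    should_handoff({Type, _Name}) when is_binary(Type) ->
        case riak_core_bucket_type:get(Type) of
            undefined -> false;  %% type not yet gossiped; defer handoff
            _Props    -> true
        end;
    should_handoff(_LegacyBucket) ->
        true.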

This is important for removing the hack around index creation as
well. Currently Yokozuna has a pretty horrible busy-wait hack in its
index hook to make sure indexes are created on joining nodes before
doing the first write of a handoff. This busy-wait blocks the KV vnode
and is dangerous for vnode latency. In a future commit this busy-wait
will be replaced with a check in this new handoff hook.

Removal of Automatic AAE Tree Clearing
--------------------------------------

Remove all functionality around automatic clearing of trees when
adding or removing the `yz_index` property on a bucket with data.
This was referred to as `sync_data` in the `yz_events` module. It was
also called "flags", harking back to when Yokozuna had a one-to-one
bucket-to-index mapping.

The original intention was that adding an index to a bucket with data
should clear the AAE trees so that exchanges would start repairing
missing indexes. If the index property was set to the tombstone value
(removal) then a) data for that bucket should be purged from the
index and b) AAE trees should be cleared. After much thought I think
this implicit behavior hurts more than helps.

Actions like clearing all AAE trees can be very expensive. It will
not be obvious to all users that adding or changing `yz_index` could
trigger expensive operations. For example, the AAE trees for a
database with billions or trillions of objects will be expensive to
rebuild. Rather than relying on AAE, a more direct operation could be
offered that allows the user to re-index a bucket or a subset of
data. When removing an index it makes more sense to let the user
delete the index entirely rather than do an implicit delete-by-query,
which does a bunch of extra work for an index that is going to be
deleted anyway.
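
For comparison, explicit removal would go through the index admin
resource that appears later in this diff (whether DELETE is wired up
at this point is an assumption):

    %% Hypothetical explicit index removal over HTTP.
    ibrowse:send_req("http://localhost:8098/yz/index/my_index",
                     [], delete, [], []).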

Misc Changes
------------

* Update all tests to work with bucket types.

* Update Basho Bench driver to work with bucket types.

* Make map-reduce extraction more efficient. This is the ugly hack
  found in `yokozuna:positions`.
rzezeski committed Nov 12, 2013
1 parent b58bea4 commit 786c3f2
Showing 27 changed files with 443 additions and 469 deletions.
9 changes: 8 additions & 1 deletion include/yokozuna.hrl
@@ -55,7 +55,7 @@
 %% Short representation of a preflist, partition + n_val
 -type short_preflist() :: {p(), n()}.
 %% Riak bucket
--type bucket() :: binary().
+-type bucket() :: Name :: binary() | {Type :: binary(), Name :: binary()}.
 %% Riak key
 -type key() :: binary().
 %% Bucket/Key pair
@@ -341,6 +341,13 @@
 -define(YZ_RK_FIELD_XML, ?YZ_FIELD_XML(?YZ_RK_FIELD_S)).
 -define(YZ_RK_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_rk\" and @type=\"_yz_str\" and @indexed=\"true\" and @stored=\"true\"]").
+%% Riak bucket type
+-define(YZ_RT_FIELD, '_yz_rt').
+-define(YZ_RT_FIELD_S, "_yz_rt").
+-define(YZ_RT_FIELD_B, <<"_yz_rt">>).
+-define(YZ_RT_FIELD_XML, ?YZ_FIELD_XML(?YZ_RT_FIELD_S)).
+-define(YZ_RT_FIELD_XPATH, "/schema/fields/field[@name=\"_yz_rt\" and @type=\"_yz_str\" and @indexed=\"true\" and @stored=\"true\"]").
 %% Riak bucket
 -define(YZ_RB_FIELD, '_yz_rb').
 -define(YZ_RB_FIELD_S, "_yz_rb").
19 changes: 13 additions & 6 deletions java_src/com/basho/yokozuna/handler/EntropyData.java
@@ -120,7 +120,9 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
         String text = null;
         String[] vals = null;
         String docPartition = null;
-        String riakBucket = null;
+        String vsn = null;
+        String riakBType = null;
+        String riakBName = null;
         String riakKey = null;
         String hash = null;
         int count = 0;
@@ -135,13 +137,18 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp)
                 log.debug("text: " + text);
                 vals = text.split(" ");

-                docPartition = vals[0];
-                riakBucket = vals[1];
-                riakKey = vals[2];
-                hash = vals[3];
+                vsn = vals[0];
+                docPartition = vals[1];
+                riakBType = vals[2];
+                riakBName = vals[3];
+                riakKey = vals[4];
+                hash = vals[5];

                 if (partition.equals(docPartition)) {
                     SolrDocument tmpDoc = new SolrDocument();
-                    tmpDoc.addField("riak_bucket", riakBucket);
+                    tmpDoc.addField("vsn", vsn);
+                    tmpDoc.addField("riak_bucket_type", riakBType);
+                    tmpDoc.addField("riak_bucket_name", riakBName);
                     tmpDoc.addField("riak_key", riakKey);
                     tmpDoc.addField("base64_hash", hash);
                     docs.add(tmpDoc);
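
In other words, the entropy-data term now carries six space-separated
values instead of four; roughly (the version literal and example
values below are illustrative, not from this diff):

    <vsn> <partition> <bucket_type> <bucket_name> <key> <base64_hash>
    1 42 mytype mybucket mykey aGFzaA==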
12 changes: 7 additions & 5 deletions misc/bench/schemas/fruit_schema.xml
@@ -10,20 +10,22 @@ rue"/>
 <field name="_version_" type="long" indexed="true" stored="true"/>

 <!-- Entropy Data: Data related to anti-entropy -->
-<field name="_yz_ed" type="_yz_str" indexed="true" stored="true"/>
+<field name="_yz_ed" type="_yz_str" indexed="true" stored="false"/>

 <!-- Partition Number: Used as a filter query param -->
-<field name="_yz_pn" type="_yz_str" indexed="true" stored="true"/>
+<field name="_yz_pn" type="_yz_str" indexed="true" stored="false"/>

 <!-- First Partition Number: The first partition in this doc's
      preflist, used for further filtering on overlapping partitions. -->
-<field name="_yz_fpn" type="_yz_str" indexed="true" stored="true"/>
+<field name="_yz_fpn" type="_yz_str" indexed="true" stored="false"/>

 <!-- If there is a sibling, use vtag to differentiate them -->
-<field name="_yz_vtag" type="_yz_str" indexed="true" stored="true"/>
+<field name="_yz_vtag" type="_yz_str" indexed="true" stored="false"/>

 <!-- Node: The name of the node that this doc was created on. -->
-<field name="_yz_node" type="_yz_str" indexed="true" stored="true"/>
+<field name="_yz_node" type="_yz_str" indexed="true" stored="false"/>

+<field name="_yz_rt" type="_yz_str" indexed="true" stored="true"/>
+
 <!-- Riak Bucket: The bucket of the Riak object this doc corresponds to. -->
 <field name="_yz_rb" type="_yz_str" indexed="true" stored="true"/>
31 changes: 26 additions & 5 deletions misc/bench/src/yz_driver.erl
@@ -28,6 +28,26 @@
 %% API
 %% ====================================================================

+-type bucket() :: binary() | {binary(), binary()}.
+
+-spec bucket_path(bucket()) -> binary().
+bucket_path({Type, Name}) ->
+    <<"/types/",Type/binary,"/buckets/",Name/binary,"/keys">>;
+bucket_path(Name) ->
+    <<"/buckets/",Name/binary,"/keys">>.
+
+-spec search_path(bucket()) -> binary().
+search_path({BucketType, _}) ->
+    <<"/solr/",BucketType/binary,"/select">>;
+search_path(BucketName) ->
+    <<"/solr/",BucketName/binary,"/select">>.
+
+-spec bucket_type(bucket()) -> binary().
+bucket_type({Type, _}) ->
+    Type;
+bucket_type(Name) ->
+    Name.
+
 start_apps(Apps) ->
     [application:start(App) || App <- Apps].
@@ -43,19 +63,20 @@ new(_Id) ->
         false ->
             ok
     end,
-    Index = basho_bench_config:get(index, "test"),
+    Bucket = basho_bench_config:get(bucket, {<<"test">>, <<"test">>}),
+    Index = basho_bench_config:get(index, bucket_type(Bucket)),
     HTTP = basho_bench_config:get(http_conns, [{"127.0.0.1", 8098}]),
     PB = basho_bench_config:get(pb_conns, [{"127.0.0.1", 8087}]),
-    IPath = basho_bench_config:get(index_path, "/riak/test"),
-    SPath = basho_bench_config:get(search_path, "/search/test"),
-    IURLs = array:from_list(lists:map(make_url(IPath), HTTP)),
+    BPath = basho_bench_config:get(bucket_path, bucket_path(Bucket)),
+    SPath = basho_bench_config:get(search_path, search_path(Index)),
+    IURLs = array:from_list(lists:map(make_url(BPath), HTTP)),
     SURLs = array:from_list(lists:map(make_url(SPath), HTTP)),
     Conns = array:from_list(lists:map(make_conn(Secure, User, Password, Cert), PB)),
     N = length(HTTP),
     M = length(PB),

     {ok, #state{pb_conns={Conns, {0,M}},
-                index=list_to_binary(Index),
+                index=Index,
                 iurls={IURLs, {0,N}},
                 surls={SURLs, {0,N}}}}.
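
For reference, a Basho Bench config exercising the new driver options
might contain entries like these (values illustrative):

    {bucket, {<<"mytype">>, <<"mybucket">>}}.
    {index, <<"mytype">>}.
    {http_conns, [{"127.0.0.1", 8098}]}.
    {pb_conns, [{"127.0.0.1", 8087}]}.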

3 changes: 3 additions & 0 deletions priv/default_schema.xml
@@ -130,6 +130,9 @@
 <!-- Riak Key: The key of the Riak object this doc corresponds to. -->
 <field name="_yz_rk" type="_yz_str" indexed="true" stored="true"/>

+<!-- Riak Bucket Type: The bucket type of the Riak object this doc corresponds to. -->
+<field name="_yz_rt" type="_yz_str" indexed="true" stored="true"/>
+
 <!-- Riak Bucket: The bucket of the Riak object this doc corresponds to. -->
 <field name="_yz_rb" type="_yz_str" indexed="true" stored="true"/>
 </fields>
5 changes: 3 additions & 2 deletions riak_test/aae_test.erl
@@ -2,6 +2,7 @@
 -compile(export_all).
 -include_lib("eunit/include/eunit.hrl").
 -define(NUM_KEYS, 10000).
+-define(BUCKET, {<<"fruit_aae">>, <<"fruit">>}).
 -define(INDEX, <<"fruit_aae">>).
 -define(REPAIR_MFA, {yz_exchange_fsm, repair, 2}).
 -define(CFG,
@@ -30,7 +31,7 @@ confirm() ->
     PBConns = yz_rt:open_pb_conns(Cluster),
     PBConn = yz_rt:select_random(PBConns),
     setup_index(Cluster, PBConn, YZBenchDir),
-    yz_rt:load_data(Cluster, ?INDEX, YZBenchDir, ?NUM_KEYS),
+    {0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
     lager:info("Verify data was indexed"),
     verify_num_match(Cluster, ?NUM_KEYS),
     %% Wait for a full round of exchange and then get total repair
@@ -107,7 +108,7 @@ setup_index(Cluster, PBConn, YZBenchDir) ->
     RawSchema = read_schema(YZBenchDir),
     yz_rt:store_schema(PBConn, ?INDEX, RawSchema),
     ok = yz_rt:create_index(Node, ?INDEX, ?INDEX),
-    ok = yz_rt:set_index(Node, ?INDEX),
+    ok = yz_rt:set_bucket_type_index(Node, ?INDEX),
     yz_rt:wait_for_index(Cluster, ?INDEX).

 %% @doc Verify that no repair has happened since `TS'.
38 changes: 20 additions & 18 deletions riak_test/yokozuna_essential.erl
@@ -4,7 +4,6 @@
          host_entries/1,
          run_bb/2,
          search_expect/5, search_expect/6,
-         set_index/2,
          verify_count/2,
          wait_for_joins/1, write_terms/2]).
 -include_lib("eunit/include/eunit.hrl").
@@ -23,6 +22,7 @@
 %% cluster and you will see zero repairs occur.

 -define(FRUIT_SCHEMA_NAME, <<"fruit">>).
+-define(BUCKET, {<<"fruit">>,<<"b1">>}).
 -define(INDEX, <<"fruit">>).
 -define(NUM_KEYS, 10000).
 -define(SUCCESS, 0).
@@ -47,7 +47,7 @@ confirm() ->
     wait_for_joins(Cluster),
     rt:wait_for_cluster_service(Cluster, yokozuna),
     setup_indexing(Cluster, PBConns, YZBenchDir),
-    {0, _} = yz_rt:load_data(Cluster, ?INDEX, YZBenchDir, ?NUM_KEYS),
+    {0, _} = yz_rt:load_data(Cluster, ?BUCKET, YZBenchDir, ?NUM_KEYS),
     %% wait for soft-commit
     timer:sleep(1000),
     Ref = async_query(Cluster, YZBenchDir),
@@ -63,19 +63,21 @@ confirm() ->
     yz_rt:close_pb_conns(PBConns),
     pass.

+%% @doc Verify that the delete call made by `yz_kv:cleanup' can deal
+%% with a key which contains characters like '/', ':' and '-'.
 test_escaped_key(Cluster) ->
     lager:info("Test key escape"),
     {H, P} = hd(host_entries(rt:connection_info(Cluster))),
     Value = <<"Never gonna give you up">>,
-    Bucket = "escaped",
+    Bucket = {<<"escaped">>,<<"b1">>},
     Key = edoc_lib:escape_uri("rick/astley-rolled:derp"),
     ok = yz_rt:http_put({H, P}, Bucket, Key, Value),
     ok = http_get({H, P}, Bucket, Key),
     ok.

-http_get({Host, Port}, Bucket, Key) ->
-    URL = lists:flatten(io_lib:format("http://~s:~s/buckets/~s/keys/~s",
-                                      [Host, integer_to_list(Port), Bucket, Key])),
+http_get({Host, Port}, {BType, BName}, Key) ->
+    URL = lists:flatten(io_lib:format("http://~s:~s/types/~s/buckets/~s/keys/~s",
+                                      [Host, integer_to_list(Port), BType, BName, Key])),
     Headers = [{"accept", "text/plain"}],
     {ok, "200", _, _} = ibrowse:send_req(URL, Headers, get, [], []),
     ok.
@@ -92,7 +94,7 @@ test_tagging(Cluster) ->

 write_with_tag({Host, Port}) ->
     lager:info("Tag the object tagging/test"),
-    URL = lists:flatten(io_lib:format("http://~s:~s/buckets/tagging/keys/test",
+    URL = lists:flatten(io_lib:format("http://~s:~s/types/tagging/buckets/b1/keys/test",
                                       [Host, integer_to_list(Port)])),
     Opts = [],
     Body = <<"testing tagging">>,
@@ -116,10 +118,9 @@ async_query(Cluster, YZBenchDir) ->
            {operations, [{Apple,1}]},
            {http_conns, Hosts},
            {pb_conns, []},
-           {search_path, "/search/" ++ binary_to_list(?INDEX)},
+           {bucket, ?BUCKET},
            {shutdown_on_error, true}],

-    File = "bb-query-fruit-" ++ binary_to_list(?INDEX),
+    File = "bb-query-fruit",
     write_terms(File, Cfg),
     run_bb(async, File).

@@ -130,7 +131,7 @@ delete_key(Cluster, Key) ->
     Node = yz_rt:select_random(Cluster),
     lager:info("Deleting key ~s", [Key]),
     {ok, C} = riak:client_connect(Node),
-    C:delete(?INDEX, list_to_binary(Key)).
+    C:delete(?BUCKET, list_to_binary(Key)).

 delete_some_data(Cluster, ReapSleep) ->
     Keys = yz_rt:random_keys(?NUM_KEYS),
@@ -168,13 +169,15 @@ setup_indexing(Cluster, PBConns, YZBenchDir) ->
     yz_rt:store_schema(PBConn, ?FRUIT_SCHEMA_NAME, RawSchema),
     yz_rt:wait_for_schema(Cluster, ?FRUIT_SCHEMA_NAME, RawSchema),
     ok = create_index(Node, ?INDEX, ?FRUIT_SCHEMA_NAME),
-    ok = set_index(Node, ?INDEX),
+    ok = yz_rt:set_bucket_type_index(Node, ?INDEX),
     ok = create_index(Node, <<"tagging">>),
-    ok = set_index(Node, <<"tagging">>),
+    ok = yz_rt:set_bucket_type_index(Node, <<"tagging">>),
     ok = create_index(Node, <<"siblings">>),
-    ok = set_index(Node, <<"siblings">>),
+    ok = yz_rt:set_bucket_type_index(Node, <<"siblings">>),
+    ok = create_index(Node, <<"escaped">>),
+    ok = yz_rt:set_bucket_type_index(Node, <<"escaped">>),
     [yz_rt:wait_for_index(Cluster, I)
-     || I <- [<<"fruit">>, <<"tagging">>, <<"siblings">>]].
+     || I <- [<<"fruit">>, <<"tagging">>, <<"siblings">>, <<"escaped">>]].

 verify_deletes(Cluster, KeysDeleted, YZBenchDir) ->
     NumDeleted = length(KeysDeleted),
@@ -191,10 +194,9 @@ verify_deletes(Cluster, KeysDeleted, YZBenchDir) ->
            {operations, [{Apple,1}]},
            {http_conns, Hosts},
            {pb_conns, []},
-           {search_path, "/search/" ++ binary_to_list(?INDEX)},
+           {bucket, ?BUCKET},
            {shutdown_on_error, true}],

-    File = "bb-verify-deletes-" ++ binary_to_list(?INDEX),
+    File = "bb-verify-deletes",
     write_terms(File, Cfg),
     check_status(run_bb(sync, File)).

37 changes: 21 additions & 16 deletions riak_test/yz_errors.erl
@@ -39,10 +39,11 @@ test_errors(Cluster) ->

 expect_bad_json(Cluster) ->
     Index = <<"bad_json">>,
+    Bucket = {<<"bad_json">>,<<"bucket">>},
     HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))),
-    ok = create_index(Cluster, HP, Index),
-    lager:info("Write bad json"),
-    URL = bucket_url(HP, Index, "test"),
+    ok = create_index(Cluster, Index),
+    lager:info("Write bad json [~p]", [HP]),
+    URL = bucket_url(HP, Bucket, "test"),
     Opts = [],
     CT = "application/json",
     Headers = [{"content-type", CT}],
@@ -59,10 +60,11 @@ expect_bad_json(Cluster) ->

 expect_bad_xml(Cluster) ->
     Index = <<"bad_xml">>,
+    Bucket = {Index,<<"bucket">>},
     HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))),
-    ok = create_index(Cluster, HP, Index),
-    lager:info("Write bad xml"),
-    URL = bucket_url(HP, Index, "test"),
+    ok = create_index(Cluster, Index),
+    lager:info("Write bad xml [~p]", [HP]),
+    URL = bucket_url(HP, Bucket, "test"),
     Opts = [],
     CT = "application/xml",
     Headers = [{"content-type", CT}],
@@ -79,10 +81,11 @@ expect_bad_xml(Cluster) ->

 expect_bad_query(Cluster) ->
     Index = <<"bad_query">>,
+    Bucket = {Index, <<"bucket">>},
     HP = yz_rt:select_random(host_entries(rt:connection_info(Cluster))),
-    ok = create_index(Cluster, HP, Index),
-    lager:info("Write bad query"),
-    URL = bucket_url(HP, Index, "test"),
+    ok = create_index(Cluster, Index),
+    lager:info("Write bad query [~p]", [HP]),
+    URL = bucket_url(HP, Bucket, "test"),
     Opts = [],
     CT = "text/plain",
     Headers = [{"content-type", CT}],
@@ -100,22 +103,24 @@ expect_bad_query(Cluster) ->
 index_url({Host,Port}, Index) ->
     ?FMT("http://~s:~B/yz/index/~s", [Host, Port, Index]).

-bucket_url({Host,Port}, Bucket, Key) ->
-    ?FMT("http://~s:~B/buckets/~s/keys/~s", [Host, Port, Bucket, Key]).
+bucket_url({Host,Port}, {BType, BName}, Key) ->
+    ?FMT("http://~s:~B/types/~s/buckets/~s/keys/~s", [Host, Port, BType, BName, Key]).

-search_url({Host,Port}, Bucket) ->
-    ?FMT("http://~s:~B/search/~s", [Host, Port, Bucket]).
+search_url({Host,Port}, Index) ->
+    ?FMT("http://~s:~B/solr/~s/select", [Host, Port, Index]).

 http(Method, URL, Headers, Body) ->
     Opts = [],
     ibrowse:send_req(URL, Headers, Method, Body, Opts).

-create_index(Cluster, HP, Index) ->
+create_index(Cluster, Index) ->
     Node = yz_rt:select_random(Cluster),
-    lager:info("create_index ~s [~p]", [Index, HP]),
+    HP = hd(host_entries(rt:connection_info([Node]))),
+    lager:info("create_index ~s [~p]", [Index, Node]),
     URL = index_url(HP, Index),
     Headers = [{"content-type", "application/json"}],
     {ok, Status, _, _} = http(put, URL, Headers, ?NO_BODY),
-    yz_rt:set_index(Node, Index),
+    yz_rt:set_bucket_type_index(Node, Index),
+    yz_rt:wait_for_bucket_type(Cluster, Index),
     yz_rt:wait_for_index(Cluster, Index),
     ?assertEqual("204", Status).