Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/zl/remove unnecessary delete to solr idx when crdt or sc #452

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 144 additions & 5 deletions src/yz_kv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
-include_lib("riak_core/include/riak_core_bucket_type.hrl").
-include("yokozuna.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-type write_reason() :: delete | handoff | put | anti_entropy.


Expand Down Expand Up @@ -82,7 +86,7 @@ get(C, Bucket, Key) ->
Other
end.

%% @doc calculates the hash of a riak object, returns binary
%% @doc calculates the hash of a riak object, returns binary
-spec hash_object(riak_object:riak_object()) -> binary().
hash_object(Obj) ->
Vclock = riak_object:vclock(Obj),
Expand Down Expand Up @@ -252,8 +256,7 @@ index(_, delete, _, P, BKey, ShortPL, Index) ->
ok = yz_solr:delete(Index, [{bkey, BKey}]),
ok = update_hashtree(delete, P, ShortPL, BKey),
ok;

index(Obj, _Reason, Ring, P, BKey, ShortPL, Index) ->
index(Obj, Reason, Ring, P, BKey, ShortPL, Index) ->
case riak_object:get_values(Obj) of
[notfound] ->
ok = index(Obj, delete, Ring, P, BKey, ShortPL, Index);
Expand All @@ -263,8 +266,8 @@ index(Obj, _Reason, Ring, P, BKey, ShortPL, Index) ->
LP = yz_cover:logical_partition(LI, P),
Hash = hash_object(Obj),
Docs = yz_doc:make_docs(Obj, Hash, ?INT_TO_BIN(LFPN), ?INT_TO_BIN(LP)),
DelOp = cleanup(length(Docs), {Obj, BKey, LP}),
ok = yz_solr:index(Index, Docs, DelOp),
ok = yz_solr:index(Index, Docs, delete_operation(Obj, Reason, Docs,
BKey, LP)),
ok = update_hashtree({insert, Hash}, P, ShortPL, BKey)
end.

Expand Down Expand Up @@ -460,3 +463,139 @@ is_owner_or_future_owner(P, Node, Ring) ->
-spec is_service_up(atom(), node()) -> boolean().
is_service_up(Service, Node) ->
lists:member(Service, riak_core_node_watcher:services(Node)).

%% @private
%%
%% @doc Check if object has 2.0 CRDT datatype entry or property for
%% strong consistency.
-spec is_datatype_or_consistent(obj()) -> boolean()|{error, _}.
is_datatype_or_consistent(Obj) ->
Bucket = riak_object:bucket(Obj),
case riak_core_bucket:get_bucket(Bucket) of
BProps when is_list(BProps) ->
case is_datatype(BProps) of
false ->
%% Check if this is a consistent object
lists:member({consistent, true}, BProps);
_ ->
true
end;
{error, _}=Err ->
Err
end.

%% @private
%%
%% @doc Check if BucketProps has 2.0 CRDT datatype
-spec is_datatype(riak_kv_bucket:props()) -> boolean().
is_datatype(BProps) ->
Type = proplists:get_value(datatype, BProps),
Mod = riak_kv_crdt:to_mod(Type),
riak_kv_crdt:supported(Mod).

%% @private
%%
%% @doc Set yz_solr:index delete operation(s) on write_reason.
-spec delete_operation(obj(), put|handoff|anti_entropy, [doc()], bkey(), lp()) ->
[]|[{id, _}]|[{siblings, _}].
delete_operation(Obj, put, Docs, BKey, LP) ->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why handoff or anti_entropy should result in cleanups? Shouldn't the same reasons apply in those cases as for put?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a conversation I had w/ @drewkerrigan, it was to keep the delete operation for handoff/anti_entropy as cleanup b/c there was an edge case where data-typed keys could actually have a siblings and "one of the test cases the client had was like deleting without using a context and then updating the datatype object."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. I think we should fix the underlying problem in that case, but that should be ok for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, as there's another issue altogether.

case is_datatype_or_consistent(Obj) of
true -> [];
false -> cleanup(length(Docs), {Obj, BKey, LP})
end;
delete_operation(Obj, _Reason, Docs, BKey, LP) ->
cleanup(length(Docs), {Obj, BKey, LP}).


%%%===================================================================
%%% Tests
%%%===================================================================

-ifdef(TEST).

is_datatype_or_consistent_test_() ->
{setup,
fun() ->
meck:new(riak_core_capability, []),
meck:expect(riak_core_capability, get,
fun({riak_core, bucket_types}) -> true;
(X) -> meck:passthrough([X]) end),
meck:expect(riak_core_capability, get,
fun({riak_kv, crdt}, []) ->
[pncounter,riak_dt_pncounter,riak_dt_orswot,
riak_dt_map];
(X, Y) -> meck:passthrough([X, Y]) end),
application:load(riak_core),
application:set_env(riak_core, default_bucket_props, []),
riak_core_ring_events:start_link(),
riak_core_ring_manager:start_link(test),
riak_core_claimant:start_link(),
riak_core_metadata_manager:start_link([]),
riak_core_ring_manager:setup_ets(test),
riak_core_metadata_hashtree:start_link(),
ok
end,
fun(_) ->
process_flag(trap_exit, true),
riak_core_ring_manager:cleanup_ets(test),
catch application:stop(riak_core),
catch(riak_core_metadata_hashtree:stop()),
catch(riak_core_claimant:stop()),
catch(riak_core_ring_manager:stop()),
catch(exit(whereis(riak_core_ring_events), shutdown)),
application:unset_env(riak_core, default_bucket_props),
meck:unload(riak_core_capability)
end,
[
?_test(begin
Bucket1 = <<"bucket">>,
BucketType = <<"type">>,
Bucket2 = {BucketType, <<"bucket2">>},
riak_core_bucket:set_bucket(Bucket1, [{consistent, true}]),
riak_core_bucket_type:create(BucketType, [{consistent, true}]),
riak_core_bucket_type:activate(BucketType),
TypeProps = riak_core_bucket_type:get(BucketType),
?assert(proplists:get_value(consistent, TypeProps)),
?assertEqual([{consistent,true}, {name, Bucket1}],
riak_core_bucket:get_bucket(Bucket1)),
BTProps = riak_core_bucket:get_bucket(Bucket2),
?assert(proplists:get_value(consistent, BTProps)),
?assertEqual(Bucket2, proplists:get_value(name, BTProps)),
[?assert(is_datatype_or_consistent(riak_object:new(B, K, V)))
|| {B, K, V} <- [{Bucket1, <<"k1">>, hi},
{Bucket2, <<"k2">>, hey}]]
end),
?_test(begin
BucketType1 = <<"counters">>,
BucketType2 = <<"maps">>,
Bucket1 = {BucketType1, <<"crdt">>},
Bucket2 = {BucketType2, <<"crdtz">>},
riak_core_bucket_type:create(BucketType1, [{datatype, counter}]),
riak_core_bucket_type:activate(BucketType1),
riak_core_bucket_type:create(BucketType2, [{datatype, map}]),
riak_core_bucket_type:activate(BucketType2),
BTProps1 = riak_core_bucket:get_bucket(Bucket1),
BTProps2 = riak_core_bucket:get_bucket(Bucket2),
?assertEqual(counter, proplists:get_value(datatype, BTProps1)),
?assertEqual(map, proplists:get_value(datatype, BTProps2)),
[?assert(is_datatype_or_consistent(riak_object:new(B, K, V)))
|| {B, K, V} <- [{Bucket1, <<"k1">>, hi},
{Bucket2, <<"k2">>, hey}]]
end),
?_test(begin
Bucket1 = <<"buckety">>,
BucketType = <<"typey">>,
Bucket2 = {BucketType, <<"bucketjumpy">>},
riak_core_bucket:set_bucket(Bucket1, []),
riak_core_bucket_type:create(BucketType, []),
riak_core_bucket_type:activate(BucketType),
?assertEqual([{name, Bucket1}],
riak_core_bucket:get_bucket(Bucket1)),
BTProps = riak_core_bucket:get_bucket(Bucket2),
?assertEqual(Bucket2, proplists:get_value(name, BTProps)),
[?assertNot(is_datatype_or_consistent(riak_object:new(B, K, V)))
|| {B, K, V} <- [{Bucket1, <<"k1">>, hi},
{Bucket2, <<"k2">>, hey}]]
end)]}.

-endif.