Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for combination queries on same snapshot point #439

Open
wants to merge 1 commit into
base: mas-d31-i433re2capture
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
deprecated_function_calls, deprecated_functions]}.

{cover_excl_mods,
[leveled_filterlexer, leveled_filterparser, leveled_evallexer, leveled_evalparser,
[leveled_filterlexer, leveled_filterparser,
leveled_evallexer, leveled_evalparser,
leveled_setoplexer, leveled_setopparser,
testutil,
appdefined_SUITE, basic_SUITE, iterator_SUITE,
perf_SUITE, recovery_SUITE, riak_SUITE, tictac_SUITE]}.
Expand Down
240 changes: 153 additions & 87 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
-export([
book_returnfolder/2,
book_indexfold/5,
book_multiindexfold/5,
book_bucketlist/4,
book_keylist/3,
book_keylist/4,
Expand Down Expand Up @@ -684,7 +685,7 @@ book_returnfolder(Pid, RunnerType) ->
Constraint:: {Bucket, StartKey},
FoldAccT :: {FoldFun, Acc},
Range :: {IndexField, Start, End},
TermHandling :: {ReturnTerms, TermRegex}) ->
TermHandling :: {ReturnTerms, TermExpression}) ->
{async, Runner::fun(() -> term())}
when Bucket::term(),
Key :: term(),
Expand All @@ -697,7 +698,7 @@ book_returnfolder(Pid, RunnerType) ->
Start::IndexVal,
End::IndexVal,
ReturnTerms::boolean()|binary(),
TermRegex :: leveled_codec:regular_expression().
TermExpression :: leveled_codec:term_expression().

book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling)
when is_tuple(Constraint) ->
Expand All @@ -713,6 +714,23 @@ book_indexfold(Pid, Bucket, FoldAccT, Range, TermHandling) ->
leveled_log:log(b0019, [Bucket]),
book_indexfold(Pid, {Bucket, null}, FoldAccT, Range, TermHandling).

-type query()
:: {binary(), binary(), binary(), leveled_codec:term_expression()}.
-type combo_fun()
:: fun((list(sets:set(leveled_codec:key())))
-> sets:set(leveled_codec:key())).

-spec book_multiindexfold(
pid(),
leveled_codec:key(),
fun((leveled_codec:key(), leveled_codec:key(), term()) -> term()),
list(query()),
combo_fun())
-> {async, fun(() -> term())}.
book_multiindexfold(Pid, Bucket, FoldAccT, Queries, ComboFun) ->
RunnerType =
{multi_index_query, Bucket, FoldAccT, Queries, ComboFun},
book_returnfolder(Pid, RunnerType).

%% @doc list buckets. Folds over the ledger only. Given a `Tag' folds
%% over the keyspace calling `FoldFun' from `FoldAccT' for each
Expand Down Expand Up @@ -815,7 +833,7 @@ book_keylist(Pid, Tag, Bucket, KeyRange, FoldAccT) ->
StartKey :: Key,
EndKey :: Key,
Key :: term(),
TermRegex :: leveled_codec:regular_expression(),
TermRegex :: leveled_codec:term_expression(),
Runner :: fun(() -> Acc).
book_keylist(Pid, Tag, Bucket, KeyRange, FoldAccT, TermRegex) ->
RunnerType = {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex},
Expand Down Expand Up @@ -1956,22 +1974,44 @@ snaptype_by_presence(false) ->
%% Get an {async, Runner} for a given fold type. Fold types have different
%% tuple inputs
get_runner(State, {index_query, Constraint, FoldAccT, Range, TermHandling}) ->
{IdxFld, StartT, EndT} = Range,
{Bucket, ObjKey0} =
case Constraint of
{B, SK} ->
{B, SK};
B ->
{B, null}
end,
StartKey =
leveled_codec:to_ledgerkey(Bucket, ObjKey0, ?IDX_TAG, IdxFld, StartT),
EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxFld, EndT),
{StartKey, EndKey} = index_range(Constraint, Range),
SnapFun = return_snapfun(State, ledger, {StartKey, EndKey}, false, false),
leveled_runner:index_query(SnapFun,
{StartKey, EndKey, TermHandling},
FoldAccT);
leveled_runner:index_query(
SnapFun, {StartKey, EndKey, TermHandling}, FoldAccT);
get_runner(
State,
{multi_index_query, Bucket, FoldAccT, Queries, ComboFun}) ->
{FoldFun, InitAcc} = FoldAccT,
KeyFolder = fun(_B, K, Acc) -> [K|Acc] end,
QueryRunners =
lists:map(
fun({IdxFld, StartTerm, EndTerm, Expr}) ->
{SK, EK} =
index_range(
{Bucket, null}, {IdxFld, StartTerm, EndTerm}),
SnapFun =
return_snapfun(State, ledger, {SK, EK}, false, true),
{async, Runner} =
leveled_runner:index_query(
SnapFun, {SK, EK, {false, Expr}}, {KeyFolder, []}
),
Runner
end,
Queries
),
OverallRunner =
fun() ->
FinalSet =
ComboFun(
lists:map(fun(R) -> sets:from_list(R()) end, QueryRunners)
),
lists:foldl(
fun(K, Acc) -> FoldFun(Bucket, K, Acc) end,
InitAcc,
sets:to_list(FinalSet)
)
end,
{async, OverallRunner};
get_runner(State, {keylist, Tag, FoldAccT}) ->
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
leveled_runner:bucketkey_query(SnapFun, Tag, null, FoldAccT);
Expand All @@ -1980,91 +2020,102 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) ->
leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT);
get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex}) ->
SnapFun = return_snapfun(State, ledger, no_lookup, true, true),
leveled_runner:bucketkey_query(SnapFun,
Tag, Bucket, KeyRange,
FoldAccT, TermRegex);
leveled_runner:bucketkey_query(
SnapFun, Tag, Bucket, KeyRange, FoldAccT, TermRegex);
%% Set of runners for object or metadata folds
get_runner(State,
{foldheads_allkeys,
Tag, FoldFun,
JournalCheck, SnapPreFold, SegmentList,
LastModRange, MaxObjectCount}) ->
get_runner(
State,
{foldheads_allkeys,
Tag, FoldFun,
JournalCheck, SnapPreFold, SegmentList,
LastModRange, MaxObjectCount}) ->
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_allkeys(SnapFun,
Tag, FoldFun,
JournalCheck, SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order});
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, key_order);
get_runner(State,
{foldobjects_allkeys, Tag, FoldFun, SnapPreFold, sqn_order}) ->
SnapFun = return_snapfun(State, store, undefined, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(SnapFun, Tag, FoldFun, sqn_order);
get_runner(State,
{foldheads_bybucket,
Tag,
BucketList, bucket_list,
FoldFun,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
leveled_runner:foldheads_allkeys(
SnapFun,
Tag,
FoldFun,
JournalCheck,
SegmentList,
LastModRange,
MaxObjectCount);
get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) ->
get_runner(
State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, key_order});
get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold, Order}) ->
case Order of
key_order ->
SnapFun =
return_snapfun(State, store, no_lookup, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(
SnapFun, Tag, FoldFun, key_order);
sqn_order ->
SnapFun =
return_snapfun(State, store, undefined, true, SnapPreFold),
leveled_runner:foldobjects_allkeys(
SnapFun, Tag, FoldFun, sqn_order)
end;
get_runner(
State,
{foldheads_bybucket,
Tag,
BucketList, bucket_list,
FoldFun,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
KeyRangeFun =
fun(Bucket) ->
{StartKey, EndKey, _} = return_ledger_keyrange(Tag, Bucket, all),
{StartKey, EndKey}
end,
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldheads_bybucket,
Tag,
Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
leveled_runner:foldheads_bybucket(
SnapFun,
Tag,
lists:map(KeyRangeFun, BucketList),
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(
State,
{foldheads_bybucket,
Tag,
Bucket, KeyRange,
FoldFun,
JournalCheck, SnapPreFold,
SegmentList, LastModRange, MaxObjectCount}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapType = snaptype_by_presence(JournalCheck),
SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold),
leveled_runner:foldheads_bybucket(SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
FoldFun,
SnapPreFold}) ->
leveled_runner:foldheads_bybucket(
SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun,
JournalCheck,
SegmentList,
LastModRange, MaxObjectCount);
get_runner(
State,
{foldobjects_bybucket,
Tag, Bucket, KeyRange,
FoldFun,
SnapPreFold}) ->
{StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange),
SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold),
leveled_runner:foldobjects_bybucket(SnapFun,
Tag,
[{StartKey, EndKey}],
FoldFun);
get_runner(State,
{foldobjects_byindex,
Tag, Bucket, {Field, FromTerm, ToTerm},
FoldObjectsFun,
SnapPreFold}) ->
leveled_runner:foldobjects_bybucket(
SnapFun, Tag, [{StartKey, EndKey}], FoldFun);
get_runner(
State,
{foldobjects_byindex,
Tag, Bucket, {Field, FromTerm, ToTerm},
FoldObjectsFun,
SnapPreFold}) ->
SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold),
leveled_runner:foldobjects_byindex(SnapFun,
{Tag, Bucket, Field, FromTerm, ToTerm},
FoldObjectsFun);
leveled_runner:foldobjects_byindex(
SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm}, FoldObjectsFun);
get_runner(State, {bucket_list, Tag, FoldAccT}) ->
{FoldBucketsFun, Acc} = FoldAccT,
SnapFun = return_snapfun(State, ledger, no_lookup, false, false),
Expand All @@ -2078,6 +2129,21 @@ get_runner(State, DeprecatedQuery) ->
get_deprecatedrunner(State, DeprecatedQuery).


index_range(Constraint, Range) ->
{IdxFld, StartT, EndT} = Range,
{Bucket, ObjKey0} =
case Constraint of
{B, SK} ->
{B, SK};
B ->
{B, null}
end,
StartKey =
leveled_codec:to_ledgerkey(Bucket, ObjKey0, ?IDX_TAG, IdxFld, StartT),
EndKey =
leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxFld, EndT),
{StartKey, EndKey}.

-spec get_deprecatedrunner(book_state(), tuple()) ->
{async, fun(() -> term())}.
%% @doc
Expand Down
18 changes: 9 additions & 9 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@
-type actual_regex() ::
reference()|{re_pattern, term(), term(), term(), term()}.
-type capture_value() :: binary()|integer().
-type capture_filter_fun() ::
-type query_filter_fun() ::
fun((#{binary() => capture_value()}) -> boolean()).
-type capture_eval_fun() ::
-type query_eval_fun() ::
fun((binary(), binary()) -> #{binary() => capture_value()}).
-type capture_reference() ::
{eval, capture_eval_fun(), capture_filter_fun()}.
-type regular_expression() ::
actual_regex()|undefined|capture_reference().
-type query_expression() ::
{query, query_eval_fun(), query_filter_fun()}.
-type term_expression() ::
actual_regex()|undefined|query_expression().

-type value_fetcher() ::
{fun((pid(), leveled_codec:journal_key()) -> any()),
Expand Down Expand Up @@ -161,7 +161,7 @@
maybe_lookup/0,
last_moddate/0,
lastmod_range/0,
regular_expression/0,
term_expression/0,
actual_regex/0,
value_fetcher/0,
proxy_object/0,
Expand Down Expand Up @@ -301,7 +301,7 @@ maybe_accumulate(

-spec accumulate_index(
{boolean()|binary(),
regular_expression()},
term_expression()},
leveled_runner:acc_fun()) -> any().
accumulate_index({false, undefined}, FoldKeysFun) ->
fun({?IDX_TAG, Bucket, _IndexInfo, ObjKey}, _Value, Acc) ->
Expand All @@ -312,7 +312,7 @@ accumulate_index({true, undefined}, FoldKeysFun) ->
FoldKeysFun(Bucket, {IdxValue, ObjKey}, Acc)
end;
accumulate_index(
{AddTerm, {eval, EvalFun, FilterFun}}, FoldKeysFun) ->
{AddTerm, {query, EvalFun, FilterFun}}, FoldKeysFun) ->
fun({?IDX_TAG, Bucket, {_IdxFld, IdxValue}, ObjKey}, _Value, Acc) ->
CptMap = EvalFun(IdxValue, ObjKey),
check_captured_terms(
Expand Down
Loading