Skip to content

Commit 02c8205

Browse files
authored
Test and fix - issue with folding beyond JournalSQN (#466)
The test previously failed because, even on a fast machine, the fold continued for 5s beyond the last object found. With the batch size reduced, and the fold stopping once a batch goes beyond the JournalSQN, the test succeeds with << 100ms spent folding after the last object is discovered.
1 parent 313b8ed commit 02c8205

File tree

3 files changed

+125
-42
lines changed

3 files changed

+125
-42
lines changed

include/leveled.hrl

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
-define(MAX_SSTSLOTS, 256).
3636
-define(MAX_MERGEBELOW, 24).
3737
-define(LOADING_PAUSE, 1000).
38-
-define(LOADING_BATCH, 1000).
38+
-define(LOADING_BATCH, 200).
3939
-define(CACHE_SIZE_JITTER, 25).
4040
-define(JOURNAL_SIZE_JITTER, 20).
4141
-define(LONG_RUNNING, 1000000).

src/leveled_inker.erl

+65-41
Original file line numberDiff line numberDiff line change
@@ -558,10 +558,13 @@ handle_call({fold,
558558
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
559559
Folder =
560560
fun() ->
561-
fold_from_sequence(StartSQN,
562-
{FilterFun, InitAccFun, FoldFun},
563-
Acc,
564-
Manifest)
561+
fold_from_sequence(
562+
StartSQN,
563+
State#state.journal_sqn,
564+
{FilterFun, InitAccFun, FoldFun},
565+
Acc,
566+
Manifest
567+
)
565568
end,
566569
case By of
567570
as_ink ->
@@ -1211,8 +1214,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
12111214

12121215

12131216

1214-
-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
1215-
-> any().
1217+
-spec fold_from_sequence(
1218+
non_neg_integer(),
1219+
pos_integer(),
1220+
{fun(), fun(), fun()},
1221+
any(),
1222+
list()) -> any().
12161223
%% @doc
12171224
%%
12181225
%% Scan from the starting sequence number to the end of the Journal. Apply
@@ -1226,62 +1233,79 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->
12261233
%% over in batches using foldfile_between_sequence/7. The batch is a range of
12271234
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
12281235
%% files
1229-
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
1236+
fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
12301237
Acc;
1231-
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
1238+
fold_from_sequence(MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
12321239
when LowSQN >= MinSQN ->
1233-
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
1234-
MinSQN + ?LOADING_BATCH,
1235-
FoldFuns,
1236-
Acc,
1237-
Pid,
1238-
undefined,
1239-
FN),
1240-
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
1241-
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
1240+
{NextMinSQN, Acc0} =
1241+
foldfile_between_sequence(
1242+
MinSQN,
1243+
MinSQN + ?LOADING_BATCH,
1244+
JournalSQN,
1245+
FoldFuns,
1246+
Acc,
1247+
Pid,
1248+
undefined,
1249+
FN
1250+
),
1251+
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
1252+
fold_from_sequence(
1253+
MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
12421254
% If this file has a LowSQN less than the minimum, we can skip it if the
12431255
% next file also has a LowSQN below the minimum
12441256
{NextMinSQN, Acc0} =
12451257
case Rest of
12461258
[] ->
1247-
foldfile_between_sequence(MinSQN,
1248-
MinSQN + ?LOADING_BATCH,
1249-
FoldFuns,
1250-
Acc,
1251-
Pid,
1252-
undefined,
1253-
FN);
1259+
foldfile_between_sequence(
1260+
MinSQN,
1261+
MinSQN + ?LOADING_BATCH,
1262+
JournalSQN,
1263+
FoldFuns,
1264+
Acc,
1265+
Pid,
1266+
undefined,
1267+
FN
1268+
);
12541269
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
1255-
foldfile_between_sequence(MinSQN,
1256-
MinSQN + ?LOADING_BATCH,
1257-
FoldFuns,
1258-
Acc,
1259-
Pid,
1260-
undefined,
1261-
FN);
1270+
foldfile_between_sequence(
1271+
MinSQN,
1272+
MinSQN + ?LOADING_BATCH,
1273+
JournalSQN,
1274+
FoldFuns,
1275+
Acc,
1276+
Pid,
1277+
undefined,
1278+
FN
1279+
);
12621280
_ ->
12631281
{MinSQN, Acc}
12641282
end,
1265-
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
1283+
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).
12661284

1267-
foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
1268-
Acc, CDBpid, StartPos, FN) ->
1285+
foldfile_between_sequence(
1286+
MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
12691287
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
12701288
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
12711289

12721290
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
12731291
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
12741292
{AccMinSQN, FoldFun(BatchAcc, Acc)};
1293+
{_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
1294+
when AccMinSQN >= JournalSQN ->
1295+
{AccMinSQN, FoldFun(BatchAcc, Acc)};
12751296
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
12761297
UpdAcc = FoldFun(BatchAcc, Acc),
12771298
NextSQN = MaxSQN + 1,
1278-
foldfile_between_sequence(NextSQN,
1279-
NextSQN + ?LOADING_BATCH,
1280-
FoldFuns,
1281-
UpdAcc,
1282-
CDBpid,
1283-
LastPosition,
1284-
FN)
1299+
foldfile_between_sequence(
1300+
NextSQN,
1301+
NextSQN + ?LOADING_BATCH,
1302+
JournalSQN,
1303+
FoldFuns,
1304+
UpdAcc,
1305+
CDBpid,
1306+
LastPosition,
1307+
FN
1308+
)
12851309
end.
12861310

12871311

test/end_to_end/riak_SUITE.erl

+59
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
fetchclocks_modifiedbetween/1,
1212
crossbucket_aae/1,
1313
handoff/1,
14+
handoff_close/1,
1415
dollar_bucket_index/1,
1516
dollar_key_index/1,
1617
bigobject_memorycheck/1,
@@ -23,6 +24,7 @@ all() -> [
2324
fetchclocks_modifiedbetween,
2425
crossbucket_aae,
2526
handoff,
27+
handoff_close,
2628
dollar_bucket_index,
2729
dollar_key_index,
2830
bigobject_memorycheck,
@@ -1633,6 +1635,63 @@ dollar_key_index(_Config) ->
16331635
ok = leveled_bookie:book_close(Bookie1),
16341636
testutil:reset_filestructure().
16351637

1638+
handoff_close(_Config) ->
1639+
RootPath = testutil:reset_filestructure(),
1640+
KeyCount = 500000,
1641+
Bucket = {<<"BType">>, <<"BName">>},
1642+
StartOpts1 =
1643+
[
1644+
{root_path, RootPath},
1645+
{max_journalobjectcount, KeyCount + 1},
1646+
{max_pencillercachesize, 12000},
1647+
{sync_strategy, testutil:sync_strategy()}
1648+
],
1649+
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
1650+
ObjList1 =
1651+
testutil:generate_objects(
1652+
KeyCount div 10,
1653+
{fixed_binary, 1}, [],
1654+
leveled_rand:rand_bytes(512),
1655+
fun() -> [] end,
1656+
Bucket
1657+
),
1658+
ObjList2 =
1659+
testutil:generate_objects(
1660+
KeyCount - (KeyCount div 10),
1661+
{fixed_binary, KeyCount div 10 + 1}, [],
1662+
leveled_rand:rand_bytes(512),
1663+
fun() -> [] end,
1664+
Bucket
1665+
),
1666+
testutil:riakload(Bookie1, ObjList1),
1667+
FoldObjectsFun =
1668+
fun(_, _, _, Acc) ->
1669+
[os:timestamp()|Acc]
1670+
end,
1671+
{async, Runner} =
1672+
leveled_bookie:book_objectfold(
1673+
Bookie1,
1674+
?RIAK_TAG,
1675+
{FoldObjectsFun, []},
1676+
true,
1677+
sqn_order
1678+
),
1679+
testutil:riakload(Bookie1, ObjList2),
1680+
TSList = Runner(),
1681+
QueryCompletionTime = os:timestamp(),
1682+
LastTS = hd(TSList),
1683+
io:format(
1684+
"Found ~w objects with Last TS ~w completion time ~w~n",
1685+
[length(TSList), LastTS, QueryCompletionTime]
1686+
),
1687+
true = KeyCount div 10 == length(TSList),
1688+
TimeSinceLastObjectTouchedMS =
1689+
timer:now_diff(QueryCompletionTime, LastTS) div 1000,
1690+
true = TimeSinceLastObjectTouchedMS < 1000,
1691+
leveled_bookie:book_destroy(Bookie1),
1692+
testutil:reset_filestructure().
1693+
1694+
16361695
%% @doc test that the riak specific $bucket indexes can be iterated
16371696
%% using leveled's existing folders
16381697
dollar_bucket_index(_Config) ->

0 commit comments

Comments
 (0)