Skip to content

Commit 69e8b29

Browse files
authored
Mas d34 leveled.i459 partialmerge (#460)
* Add test to replicate issue 459 Nothing actually crashes due to the issue - but looking at the logs there is the polarised stats associated with the issue. When merging into L3, you would normally expect to merge into 4 files - but actually we see FileCounter occasionally spiking. * Add partial merge support There is a `max_mergebelow` size which can be a positive integer, or infinity. It defaults to 32. If a merge from Level N covers less than `max_mergebelow` files in level N + 1 - the merge will proceesd as before. If it has >= `max_mergebelow`, the merge will be curtailed when `max_mergebelow div 2` files have been created at that level. The remainder for Level N will then be written, as well as for Level N + 1 up to the next whole file that has no yet been touched by the merge. The backlog that prompted the merge will still exist - as the files in Level N have not been changed. However, it is likely the next file picked will not be the same one, and will in probability have a lower number of files to merge (as the average is =< 8). This will stop progress from being halted by long merge jobs, as they will exit out in a safe way after partial completion. In the case where the majority of files covered do not require a merge, then those files will be skipped the next time the remainder file is picked up for merge at Level N
1 parent c642575 commit 69e8b29

8 files changed

+360
-66
lines changed

include/leveled.hrl

+3-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
%%% Non-configurable startup defaults
3434
%%%============================================================================
3535
-define(MAX_SSTSLOTS, 256).
36+
-define(MAX_MERGEBELOW, 24).
3637
-define(LOADING_PAUSE, 1000).
3738
-define(LOADING_BATCH, 1000).
3839
-define(CACHE_SIZE_JITTER, 25).
@@ -109,7 +110,8 @@
109110
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
110111
log_options = leveled_log:get_opts()
111112
:: leveled_log:log_options(),
112-
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
113+
max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
114+
max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
113115
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
114116
:: pos_integer(),
115117
monitor = {no_monitor, 0}

rebar.config

+2-6
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,7 @@
1313
{eunit_opts, [verbose]}.
1414

1515
{project_plugins, [
16-
{eqwalizer_rebar3,
17-
{git_subdir,
18-
"https://github.com/whatsapp/eqwalizer.git",
19-
{branch, "main"},
20-
"eqwalizer_rebar3"}}
16+
{eqwalizer_rebar3, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_rebar3"}}
2117
]}.
2218

2319
{profiles,
@@ -37,7 +33,7 @@
3733
{deps, [
3834
{lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
3935
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
40-
{eqwalizer_support, {git_subdir, "https://github.com/whatsapp/eqwalizer.git", {branch, "main"}, "eqwalizer_support"}}
36+
{eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
4137
]}.
4238

4339
{ct_opts, [{dir, ["test/end_to_end"]}]}.

src/leveled_bookie.erl

+10-1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
{max_journalsize, 1000000000},
115115
{max_journalobjectcount, 200000},
116116
{max_sstslots, 256},
117+
{max_mergebelow, 24},
117118
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
118119
{head_only, false},
119120
{waste_retention_period, undefined},
@@ -201,6 +202,12 @@
201202
% The maximum number of slots in a SST file. All testing is done
202203
% at a size of 256 (except for Quickcheck tests}, altering this
203204
% value is not recommended
205+
{max_mergeblow, pos_integer()|infinity} |
206+
% The maximum number of files for a single file to be merged into
207+
% within the ledger. If less than this, the merge will continue
208+
% without a maximum. If this or more overlapping below, only up
209+
% to max_mergebelow div 2 additions should be created (the merge
210+
% should be partial)
204211
{sync_strategy, sync_mode()} |
205212
% Should be sync if it is necessary to flush to disk after every
206213
% write, or none if not (allow the OS to schecdule). This has a
@@ -293,7 +300,7 @@
293300
% To which level of the ledger should the ledger contents be
294301
% pre-loaded into the pagecache (using fadvise on creation and
295302
% startup)
296-
{compression_method, native|lz4|zstd|none} |
303+
{compression_method, native|lz4|zstd|none} |
297304
% Compression method and point allow Leveled to be switched from
298305
% using bif based compression (zlib) to using nif based compression
299306
% (lz4 or zstd).
@@ -1871,6 +1878,7 @@ set_options(Opts, Monitor) ->
18711878
CompressionLevel = proplists:get_value(compression_level, Opts),
18721879

18731880
MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
1881+
MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),
18741882

18751883
ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
18761884

@@ -1904,6 +1912,7 @@ set_options(Opts, Monitor) ->
19041912
press_level = CompressionLevel,
19051913
log_options = leveled_log:get_opts(),
19061914
max_sstslots = MaxSSTSlots,
1915+
max_mergebelow = MaxMergeBelow,
19071916
monitor = Monitor},
19081917
monitor = Monitor}
19091918
}.

src/leveled_log.erl

+3-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@
165165
pc010 =>
166166
{info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
167167
pc011 =>
168-
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
168+
{info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
169169
pc012 =>
170170
{debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
171171
pc013 =>
@@ -190,6 +190,8 @@
190190
{info, <<"Grooming compaction picked file with tomb_count=~w">>},
191191
pc025 =>
192192
{info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
193+
pc026 =>
194+
{info, <<"Performing potential partial to level=~w merge as FileCounter=~w restricting to MaxAdditions=~w">>},
193195
pm002 =>
194196
{info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
195197
sst03 =>

src/leveled_pclerk.erl

+163-48
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ notify_deletions([Head|Tail], Penciller) ->
269269
%% to be merged into multiple SSTs at a lower level.
270270
%%
271271
%% SrcLevel is the level of the src sst file, the sink should be srcLevel + 1
272-
273272
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
274273
leveled_log:log(pc010, [leveled_pmanifest:entry_filename(Src), NewSQN]),
275274
SrcList = [{next, Src, all}],
@@ -279,72 +278,188 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN, OptsSST) ->
279278
),
280279
SinkLevel = SrcLevel + 1,
281280
SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
282-
Additions =
281+
MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
282+
MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
283+
{L2Additions, L1Additions, L2FileRemainder} =
283284
do_merge(
284285
SrcList, SinkList,
285286
SinkLevel, SinkBasement,
286287
RootPath, NewSQN, MaxSQN,
287288
OptsSST,
288-
[]
289+
[],
290+
MergeLimit
291+
),
292+
RevertPointerFun = fun({next, ME, _SK}) -> ME end,
293+
SinkManifestRemovals =
294+
lists:subtract(
295+
lists:map(RevertPointerFun, SinkList),
296+
lists:map(RevertPointerFun, L2FileRemainder)
289297
),
290-
RevertPointerFun =
291-
fun({next, ME, _SK}) ->
292-
ME
293-
end,
294-
SinkManifestList = lists:map(RevertPointerFun, SinkList),
295298
Man0 =
296299
leveled_pmanifest:replace_manifest_entry(
297300
Manifest,
298301
NewSQN,
299302
SinkLevel,
300-
SinkManifestList,
301-
Additions
303+
SinkManifestRemovals,
304+
L2Additions
302305
),
303-
Man2 =
304-
leveled_pmanifest:remove_manifest_entry(
305-
Man0,
306-
NewSQN,
307-
SrcLevel,
308-
Src
306+
Man1 =
307+
case L1Additions of
308+
[] ->
309+
leveled_pmanifest:remove_manifest_entry(
310+
Man0,
311+
NewSQN,
312+
SrcLevel,
313+
Src
314+
);
315+
PartialFiles ->
316+
leveled_pmanifest:replace_manifest_entry(
317+
Man0,
318+
NewSQN,
319+
SrcLevel,
320+
[Src],
321+
PartialFiles
322+
)
323+
end,
324+
{Man1, [Src|SinkManifestRemovals]}.
325+
326+
-spec merge_limit(
327+
non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
328+
-> pos_integer()|infinity.
329+
merge_limit(SrcLevel, SinkListLength, MMB) when SrcLevel =< 1; SinkListLength < MMB ->
330+
infinity;
331+
merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
332+
AdditionsLimit = max(1, MMB div 2),
333+
leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
334+
AdditionsLimit.
335+
336+
-type merge_maybe_expanded_pointer() ::
337+
leveled_codec:ledger_kv()|
338+
leveled_sst:slot_pointer()|
339+
leveled_sst:sst_pointer().
340+
% Different to leveled_sst:maybe_expanded_pointer/0
341+
% No sst_closed_pointer()
342+
343+
-spec do_merge(
344+
list(merge_maybe_expanded_pointer()),
345+
list(merge_maybe_expanded_pointer()),
346+
leveled_pmanifest:lsm_level(),
347+
boolean(),
348+
string(),
349+
pos_integer(),
350+
pos_integer(),
351+
leveled_sst:sst_options(),
352+
list(leveled_pmanifest:manifest_entry()),
353+
pos_integer()|infinity) ->
354+
{
355+
list(leveled_pmanifest:manifest_entry()),
356+
list(leveled_pmanifest:manifest_entry()),
357+
list(leveled_sst:sst_pointer())
358+
}.
359+
do_merge(
360+
[], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
361+
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
362+
{lists:reverse(Additions), [], []};
363+
do_merge(
364+
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
365+
when length(Additions) >= Max ->
366+
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
367+
FNSrc =
368+
leveled_penciller:sst_filename(
369+
NewSQN, SinkLevel - 1, 1
309370
),
310-
{Man2, [Src|SinkManifestList]}.
311-
312-
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
313-
leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
314-
lists:reverse(Additions);
315-
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
371+
FNSnk =
372+
leveled_penciller:sst_filename(
373+
NewSQN, SinkLevel, length(Additions) + 1
374+
),
375+
{ExpandedKL1, []} = split_unexpanded_files(KL1),
376+
{ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
377+
TS1 = os:timestamp(),
378+
InfOpts = OptsSST#sst_options{max_sstslots = infinity},
379+
% Need to be careful to make sure all the remainder goes in one file,
380+
% could be situations whereby the max_sstslots has been changed between
381+
% restarts - and so there is too much data for one file in the
382+
% remainder ... but don't want to loop round and consider more complex
383+
% scenarios here.
384+
NewMergeKL1 =
385+
leveled_sst:sst_newmerge(
386+
RP, FNSrc,ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
387+
),
388+
TS2 = os:timestamp(),
389+
NewMergeKL2 =
390+
leveled_sst:sst_newmerge(
391+
RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
392+
),
393+
{KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
394+
{KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
395+
{lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
396+
do_merge(
397+
KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
316398
FileName =
317399
leveled_penciller:sst_filename(
318400
NewSQN, SinkLevel, length(Additions)
319401
),
320402
leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
321403
TS1 = os:timestamp(),
322-
case leveled_sst:sst_newmerge(RP, FileName,
323-
KL1, KL2, SinkB, SinkLevel, MaxSQN,
324-
OptsSST) of
325-
empty ->
326-
leveled_log:log(pc013, [FileName]),
327-
do_merge(
328-
[], [],
329-
SinkLevel, SinkB,
330-
RP, NewSQN, MaxSQN,
331-
OptsSST,
332-
Additions
333-
);
334-
{ok, Pid, Reply, Bloom} ->
335-
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
336-
Entry =
337-
leveled_pmanifest:new_entry(
338-
SmallestKey, HighestKey, Pid, FileName, Bloom),
339-
leveled_log:log_timer(pc015, [], TS1),
340-
do_merge(
341-
KL1Rem, KL2Rem,
342-
SinkLevel, SinkB,
343-
RP, NewSQN, MaxSQN,
344-
OptsSST,
345-
[Entry|Additions]
346-
)
347-
end.
404+
NewMerge =
405+
leveled_sst:sst_newmerge(
406+
RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
407+
{UpdAdditions, KL1Rem, KL2Rem} =
408+
add_entry(NewMerge, FileName, TS1, Additions),
409+
do_merge(
410+
KL1Rem,
411+
KL2Rem,
412+
SinkLevel,
413+
SinkB,
414+
RP,
415+
NewSQN,
416+
MaxSQN,
417+
OptsSST,
418+
UpdAdditions,
419+
Max
420+
).
421+
422+
add_entry(empty, FileName, _TS1, Additions) ->
423+
leveled_log:log(pc013, [FileName]),
424+
{[], [], Additions};
425+
add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
426+
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
427+
Entry =
428+
leveled_pmanifest:new_entry(
429+
SmallestKey, HighestKey, Pid, FileName, Bloom),
430+
leveled_log:log_timer(pc015, [], TS1),
431+
{[Entry|Additions], KL1Rem, KL2Rem}.
432+
433+
434+
-spec split_unexpanded_files(
435+
list(merge_maybe_expanded_pointer())) ->
436+
{
437+
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
438+
list(leveled_sst:sst_pointer())
439+
}.
440+
split_unexpanded_files(Pointers) ->
441+
split_unexpanded_files(Pointers, [], []).
442+
443+
-spec split_unexpanded_files(
444+
list(merge_maybe_expanded_pointer()),
445+
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
446+
list(leveled_sst:sst_pointer())) ->
447+
{
448+
list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
449+
list(leveled_sst:sst_pointer())
450+
}.
451+
split_unexpanded_files([], MaybeExpanded, FilePointers) ->
452+
{lists:reverse(MaybeExpanded), lists:reverse(FilePointers)};
453+
split_unexpanded_files([{next, P, SK}|Rest], MaybeExpanded, FilePointers) ->
454+
split_unexpanded_files(Rest, MaybeExpanded, [{next, P, SK}|FilePointers]);
455+
split_unexpanded_files([{LK, LV}|Rest], MaybeExpanded, []) ->
456+
% Should never see this, once a FilePointer has been seen
457+
split_unexpanded_files(Rest, [{LK, LV}|MaybeExpanded], []);
458+
split_unexpanded_files([{pointer, P, SIV, SK, EK}|Rest], MaybeExpanded, []) ->
459+
% Should never see this, once a FilePointer has been seen
460+
split_unexpanded_files(
461+
Rest, [{pointer, P, SIV, SK, EK}|MaybeExpanded], []
462+
).
348463

349464
-spec grooming_scorer(
350465
list(leveled_pmanifest:manifest_entry()))

src/leveled_sst.erl

+18-5
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,18 @@
248248

249249
-type build_timings() :: no_timing|#build_timings{}.
250250

251-
-export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
251+
-export_type(
252+
[
253+
expandable_pointer/0,
254+
maybe_expanded_pointer/0,
255+
sst_closed_pointer/0,
256+
sst_pointer/0,
257+
slot_pointer/0,
258+
press_method/0,
259+
segment_check_fun/0,
260+
sst_options/0
261+
]
262+
).
252263

253264
%%%============================================================================
254265
%%% API
@@ -312,8 +323,8 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
312323
end.
313324

314325
-spec sst_newmerge(string(), string(),
315-
list(leveled_codec:ledger_kv()|sst_pointer()),
316-
list(leveled_codec:ledger_kv()|sst_pointer()),
326+
list(maybe_expanded_pointer()),
327+
list(maybe_expanded_pointer()),
317328
boolean(), leveled_pmanifest:lsm_level(),
318329
integer(), sst_options())
319330
-> empty|{ok, pid(),
@@ -1529,7 +1540,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
15291540
PressMethod.
15301541

15311542
-spec maxslots_level(
1532-
leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
1543+
leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
1544+
maxslots_level(_Level, infinity) ->
1545+
infinity;
15331546
maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
15341547
MaxSlotCount;
15351548
maxslots_level(_Level, MaxSlotCount) ->
@@ -3020,7 +3033,7 @@ merge_lists(
30203033
list(binary_slot()),
30213034
leveled_codec:ledger_key()|null,
30223035
non_neg_integer(),
3023-
non_neg_integer(),
3036+
pos_integer()|infinity,
30243037
press_method(),
30253038
boolean(),
30263039
non_neg_integer(),

0 commit comments

Comments
 (0)