Skip to content

Commit

Permalink
Add support for configurable cache multiple (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinsumner committed Apr 28, 2022
1 parent f3ae391 commit 9e8fcbc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 23 deletions.
9 changes: 9 additions & 0 deletions priv/leveled.schema
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
hidden
]}.

%% @doc The maximum size of the bookie's cache before each new PUT results in
%% a slow-offer pause. Prior to Riak 3.0.10 this defaulted to 4
{mapping, "leveled.cache_multiple", "leveled.cache_multiple", [
{default, 2},
{datatype, integer},
hidden
]}.


%% @doc The key size of the Penciller's in-memory cache
{mapping, "leveled.penciller_cache_size", "leveled.penciller_cache_size", [
{default, 20000},
Expand Down
57 changes: 38 additions & 19 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@

-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
Expand Down Expand Up @@ -134,6 +135,7 @@
[{root_path, undefined},
{snapshot_bookie, undefined},
{cache_size, ?CACHE_SIZE},
{cache_multiple, ?MAX_CACHE_MULTTIPLE},
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
Expand Down Expand Up @@ -166,7 +168,8 @@

-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
cache_size :: pos_integer() | undefined,
cache_multiple :: pos_integer() |undefined,
ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
Expand Down Expand Up @@ -236,6 +239,10 @@
% randomised jitter (randomised jitter will still be added to
% configured values)
% The minimum value is 100 - any lower value will be ignored
{cache_multiple, pos_integer()} |
% A multiple of the cache size beyond which the cache should not
% grow even if the penciller is busy. A pasue will be returned for
% every PUT when this multiple of the cache_size is reached
{max_journalsize, pos_integer()} |
% The maximum size of a journal file in bytes. The absolute
% maximum must be 4GB due to 4 byte file pointers being used
Expand Down Expand Up @@ -1208,6 +1215,8 @@ init([Opts]) ->
max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)),
CacheSize =
ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter,
MaxCacheMultiple =
proplists:get_value(cache_multiple, Opts),
PCLMaxSize =
PencillerOpts#penciller_options.max_inmemory_tablesize,
CacheRatio = PCLMaxSize div ConfiguredCacheSize,
Expand Down Expand Up @@ -1242,10 +1251,13 @@ init([Opts]) ->
PencillerOpts0 =
PencillerOpts#penciller_options{sst_options = SSTOpts0},

State0 = #state{cache_size=CacheSize,
is_snapshot=false,
head_only=HeadOnly,
head_lookup = HeadLookup},
State0 =
#state{
cache_size=CacheSize,
cache_multiple = MaxCacheMultiple,
is_snapshot=false,
head_only=HeadOnly,
head_lookup = HeadLookup},

{Inker, Penciller} =
startup(InkerOpts, PencillerOpts0, State0),
Expand Down Expand Up @@ -1297,9 +1309,11 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync},
gen_server:reply(From, ok)
end,
maybe_longrunning(SW0, overall_put),
case maybepush_ledgercache(State#state.cache_size,
Cache0,
State#state.penciller) of
case maybepush_ledgercache(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller) of
{ok, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
put_timings = Timings,
Expand All @@ -1326,9 +1340,11 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
false ->
gen_server:reply(From, ok)
end,
case maybepush_ledgercache(State#state.cache_size,
Cache0,
State#state.penciller) of
case maybepush_ledgercache(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller) of
{ok, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
slow_offer = false}};
Expand Down Expand Up @@ -2356,8 +2372,9 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
end.


-spec maybepush_ledgercache(integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}.
-spec maybepush_ledgercache(
pos_integer(), pos_integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}.
%% @doc
%% Following an update to the ledger cache, check if this now big enough to be
%% pushed down to the Penciller. There is some random jittering here, to
Expand All @@ -2368,10 +2385,10 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
%% in the reply. Try again later when it isn't busy (and also potentially
%% implement a slow_offer state to slow down the pace at which PUTs are being
%% received)
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
if
TimeToPush ->
CacheToLoad = {Tab,
Expand All @@ -2391,14 +2408,16 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
{ok, Cache}
end.

-spec maybe_withjitter(integer(), integer()) -> boolean().
-spec maybe_withjitter(
non_neg_integer(), pos_integer(), pos_integer()) -> boolean().
%% @doc
%% Push down randomly, but the closer to 4 * the maximum size, the more likely
%% a push should be
maybe_withjitter(CacheSize, MaxCacheSize) when CacheSize > MaxCacheSize ->
R = leveled_rand:uniform(4 * MaxCacheSize),
maybe_withjitter(
CacheSize, MaxCacheSize, MaxCacheMult) when CacheSize > MaxCacheSize ->
R = leveled_rand:uniform(MaxCacheMult * MaxCacheSize),
(CacheSize - MaxCacheSize) > R;
maybe_withjitter(_CacheSize, _MaxCacheSize) ->
maybe_withjitter(_CacheSize, _MaxCacheSize, _MaxCacheMult) ->
false.


Expand Down
11 changes: 8 additions & 3 deletions src/leveled_pmanifest.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ copy_manifest(Manifest) ->
% about is switched to undefined
Manifest#manifest{snapshots = undefined, pending_deletes = undefined}.

-spec load_manifest(manifest(), fun(), fun()) ->
{integer(), manifest(), list()}.
-spec load_manifest(
manifest(),
fun((file:name_all(), 1..7) -> {pid(), leveled_ebloom:bloom()}),
fun((pid()) -> pos_integer()))
-> {integer(), manifest(), list()}.
%% @doc
%% Roll over the manifest starting a process to manage each file in the
%% manifest. The PidFun should be able to return the Pid of a file process
Expand Down Expand Up @@ -182,7 +185,9 @@ load_manifest(Manifest, LoadFun, SQNFun) ->
{0, Manifest, []},
lists:reverse(lists:seq(0, Manifest#manifest.basement))).

-spec close_manifest(manifest(), fun()) -> ok.
-spec close_manifest(
manifest(),
fun((any()) -> ok)) -> ok.
%% @doc
%% Close all the files in the manifest (using CloseEntryFun to call close on
%% a file). Firts all the files in the active manifest are called, and then
Expand Down
3 changes: 2 additions & 1 deletion src/leveled_pmem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
end
end.

-spec to_list(integer(), fun()) -> list().
-spec to_list(
integer(), fun((pos_integer()) -> leveled_tree:leveled_tree())) -> list().
%% @doc
%% The cache is a list of leveled_trees of length Slots. This will fetch
%% each tree in turn by slot ID and then produce a merged/sorted output of
Expand Down

0 comments on commit 9e8fcbc

Please sign in to comment.