src/osiris.erl (3 changes: 2 additions & 1 deletion)
@@ -83,7 +83,8 @@
 -type reader_options() :: #{transport => tcp | ssl,
                             chunk_selector => all | user_data,
                             filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean()
+                            read_ahead => boolean(),
+                            read_ahead_limit => pos_integer()
                            }.

 -export_type([name/0,
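For reference, a minimal sketch of a reader opting into a larger read-ahead window via the new option. Only the option keys come from reader_options() above; the enclosing init_reader call shape, the variable names, and the 64 KiB value are illustrative assumptions, not part of this change:

    %% hypothetical caller: option keys from reader_options(), the rest assumed
    Options = #{transport => tcp,
                chunk_selector => user_data,
                read_ahead => true,
                read_ahead_limit => 65536}, %% raise the 4096-byte default
    {ok, Reader} = osiris:init_reader(Pid, OffsetSpec, CounterSpec, Options)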
src/osiris_log.erl (51 changes: 27 additions & 24 deletions)
@@ -107,7 +107,7 @@
               _/binary>>).

 -define(SKIP_SEARCH_JUMP, 2048).
--define(READ_AHEAD_LIMIT, 4096).
+-define(DEFAULT_READ_AHEAD_LIMIT, 4096).

 %% Specification of the Log format.
 %%
@@ -425,7 +425,8 @@
 -record(ra,
         {on = true :: boolean(),
          size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
-         buf :: undefined | {Pos :: non_neg_integer(), binary()}
+         buf :: undefined | {Pos :: non_neg_integer(), binary()},
+         limit = ?DEFAULT_READ_AHEAD_LIMIT :: pos_integer()
         }).
 -record(read,
         {type :: data | offset,
@@ -1059,7 +1060,6 @@ init_data_reader_at(ChunkId, FilePos, File,
                     readers_counter_fun := CountersFun} = Config) ->
     case file:open(File, [raw, binary, read]) of
         {ok, Fd} ->
-            RaOn = ra_on(Config),
             Cnt = make_counter(Config),
             counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
             CountersFun(1),
@@ -1078,7 +1078,7 @@ init_data_reader_at(ChunkId, FilePos, File,
                               chunk_selector = all,
                               position = FilePos,
                               transport = maps:get(transport, Config, tcp),
-                              read_ahead = #ra{on = RaOn}},
+                              read_ahead = ra(Config)},
                   fd = Fd}};
         Err ->
             Err
@@ -1288,7 +1288,6 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
         _ ->
             undefined
     end,
-    RaOn = ra_on(Conf),
     {ok, #?MODULE{cfg = #cfg{directory = Dir,
                              counter = Cnt,
                              counter_id = counter_id(Conf),
@@ -1303,7 +1302,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
                           next_offset = NextChunkId,
                           transport = maps:get(transport, Options, tcp),
                           filter = FilterMatcher,
-                          read_ahead = #ra{on = RaOn}},
+                          read_ahead = ra(Conf)},
                   fd = Fd}}.

 %% Searches the index files backwards for the ID of the last user chunk.
@@ -3312,7 +3311,8 @@ iter_guess_size(Credit0, NumEntries, DataSize) ->
     Credit = min(Credit0, NumEntries),
     (DataSize div NumEntries * Credit).

-iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
+iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
+                #ra{limit = ReadAheadLimit} = Ra0)
   when is_integer(Credit0) andalso
        MinReqSize =< DataSize ->
     %% if the minimum request size can be served from read ahead then we
@@ -3335,7 +3335,7 @@ iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
             %% needed to serve that, else we read up to the read-ahead
             %% limit but not beyond the end of the chunk and not less
             %% than the minimum request size
-            MinSize = max(MinReqSize, min(?READ_AHEAD_LIMIT, DataSize)),
+            MinSize = max(MinReqSize, min(ReadAheadLimit, DataSize)),
             Size = max(MinSize, iter_guess_size(Credit0, NumEntries,
                                                 DataSize)),
             {ok, Data} = file:pread(Fd, Pos, Size),
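To make the sizing concrete, a worked example under assumed inputs (all numbers hypothetical): with ReadAheadLimit = 4096, DataSize = 10000, NumEntries = 100, Credit0 = 10 and MinReqSize = 500, the read resolves as:

    %% hedged arithmetic sketch; the values are assumptions, not from the change
    MinSize = max(500, min(4096, 10000)),      %% = 4096, capped by the limit
    Size = max(MinSize, 10000 div 100 * 10),   %% iter_guess_size -> 1000, so 4096
    %% a single pread of 4096 bytes then serves the credited entries

so the configured limit, rather than the full chunk size, bounds the speculative read.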
@@ -3364,15 +3364,15 @@ ra_read(_Pos, _Len, _Ra) ->
     undefined.

 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = Sz} = Ra)
-  when Sz < ?READ_AHEAD_LIMIT andalso
-       LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Sz, limit = Limit} = Ra)
+  when Sz < Limit andalso
+       LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     %% no filter and last data size was small so enable data read ahead
-    Ra#ra{size = ?READ_AHEAD_LIMIT};
+    Ra#ra{size = Limit};
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = ?READ_AHEAD_LIMIT} = Ra)
-  when LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Limit, limit = Limit} = Ra)
+  when LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     Ra;
 ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
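Restating the enabling condition as a standalone predicate may help review; a hedged sketch in which the helper name would_enable/3 is invented here, while the macros are the module's own:

    %% sketch: read ahead widens to the full limit only when the previous
    %% chunk, plus chunk header, filter and sub-batch record header, fits
    would_enable(LastDataSize, FilterSize, Limit) ->
        LastDataSize =< Limit - ?HEADER_SIZE_B - FilterSize
                              - ?REC_HDR_SZ_SUBBATCH_B.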
@@ -3397,10 +3397,13 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->
             Err
     end.

-ra_on(#{options := #{read_ahead := false}}) ->
-    false;
-ra_on(_) ->
-    true.
+-spec ra(config()) -> #ra{}.
+ra(#{options := #{read_ahead := false}}) ->
+    #ra{on = false};
+ra(#{options := #{read_ahead_limit := Limit}}) when is_integer(Limit) ->
+    #ra{limit = Limit};
+ra(_) ->
+    #ra{on = true}.

 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
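A few illustrative resolutions of ra/1 as defined above; the config maps are assumptions shaped to match the function heads, and the clause order means read_ahead => false wins over any read_ahead_limit:

    %% illustrative only; these matches would hold given the clauses above
    #ra{on = false} = ra(#{options => #{read_ahead => false}}),
    #ra{on = true, limit = 65536} = ra(#{options => #{read_ahead_limit => 65536}}),
    %% no option set: on by default with the 4096-byte ?DEFAULT_READ_AHEAD_LIMIT
    #ra{on = true, limit = 4096} = ra(#{options => #{}}).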
@@ -3441,28 +3444,28 @@ ra_update_size_test() ->
     DefSize = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
     ?assertMatch(#ra{size = DefSize}, #ra{}),
     Ra0 = #ra{},
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),

-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
-    Ra1 = #ra{size = ?READ_AHEAD_LIMIT},
+    Ra1 = #ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 5000, Ra1)),

-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra1)),

     ?assertMatch(#ra{size = DefSize},
                  ra_update_size("a filter", ?DEFAULT_FILTER_SIZE, 100, Ra0)),

     %% we need to ensure that if we enable read ahead we can at least fulfil
     %% the prior chunk including header, filter and record length header
-    MaxEnablingDataSize = ?READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
+    MaxEnablingDataSize = ?DEFAULT_READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize + 1,
                                 Ra0)),
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize,
                                 Ra0)),
     ok.