diff --git a/src/osiris.erl b/src/osiris.erl
index a4f3913..c787c53 100644
--- a/src/osiris.erl
+++ b/src/osiris.erl
@@ -83,7 +83,8 @@
 -type reader_options() :: #{transport => tcp | ssl,
                             chunk_selector => all | user_data,
                             filter_spec => osiris_bloom:filter_spec(),
-                            read_ahead => boolean()
+                            read_ahead => boolean(),
+                            read_ahead_limit => pos_integer()
                            }.
 
 -export_type([name/0,
diff --git a/src/osiris_log.erl b/src/osiris_log.erl
index ad0fb41..5a215fc 100644
--- a/src/osiris_log.erl
+++ b/src/osiris_log.erl
@@ -107,7 +107,7 @@
          _/binary>>).
 
 -define(SKIP_SEARCH_JUMP, 2048).
--define(READ_AHEAD_LIMIT, 4096).
+-define(DEFAULT_READ_AHEAD_LIMIT, 4096).
 
 %% Specification of the Log format.
 %%
@@ -425,7 +425,8 @@
 
 -record(ra, {on = true :: boolean(),
              size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
-             buf :: undefined | {Pos :: non_neg_integer(), binary()}
+             buf :: undefined | {Pos :: non_neg_integer(), binary()},
+             limit = ?DEFAULT_READ_AHEAD_LIMIT :: pos_integer()
             }).
 
 -record(read, {type :: data | offset,
@@ -1059,7 +1060,6 @@ init_data_reader_at(ChunkId, FilePos, File,
                     readers_counter_fun := CountersFun} = Config) ->
     case file:open(File, [raw, binary, read]) of
         {ok, Fd} ->
-            RaOn = ra_on(Config),
             Cnt = make_counter(Config),
             counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
             CountersFun(1),
@@ -1078,7 +1078,7 @@ init_data_reader_at(ChunkId, FilePos, File,
                                chunk_selector = all,
                                position = FilePos,
                                transport = maps:get(transport, Config, tcp),
-                               read_ahead = #ra{on = RaOn}},
+                               read_ahead = ra(Config)},
                   fd = Fd}};
         Err ->
            Err
@@ -1288,7 +1288,6 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
                 _ ->
                     undefined
             end,
-    RaOn = ra_on(Conf),
     {ok, #?MODULE{cfg = #cfg{directory = Dir,
                              counter = Cnt,
                              counter_id = counter_id(Conf),
@@ -1303,7 +1302,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
                              next_offset = NextChunkId,
                              transport = maps:get(transport, Options, tcp),
                              filter = FilterMatcher,
-                             read_ahead = #ra{on = RaOn}},
+                             read_ahead = ra(Conf)},
                   fd = Fd}}.
 
 %% Searches the index files backwards for the ID of the last user chunk.
@@ -3312,7 +3311,8 @@ iter_guess_size(Credit0, NumEntries, DataSize) ->
     Credit = min(Credit0, NumEntries),
     (DataSize div NumEntries * Credit).
 
-iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
+iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries,
+                #ra{limit = ReadAheadLimit} = Ra0)
   when is_integer(Credit0) andalso
        MinReqSize =< DataSize ->
     %% if the minimum request size can be served from read ahead then we
@@ -3335,7 +3335,7 @@ iter_read_ahead(Fd, Pos, MinReqSize, Credit0, DataSize, NumEntries, Ra0)
             %% needed to serve that, else we read up to the readahead
             %% limit but not beyond the end of the chunk and not less
             %% that the minimum request size
-            MinSize = max(MinReqSize, min(?READ_AHEAD_LIMIT, DataSize)),
+            MinSize = max(MinReqSize, min(ReadAheadLimit, DataSize)),
             Size = max(MinSize,
                        iter_guess_size(Credit0, NumEntries, DataSize)),
             {ok, Data} = file:pread(Fd, Pos, Size),
@@ -3364,15 +3364,15 @@ ra_read(_Pos, _Len, _Ra) ->
     undefined.
 
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = Sz} = Ra)
-  when Sz < ?READ_AHEAD_LIMIT andalso
-       LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Sz, limit = Limit} = Ra)
+  when Sz < Limit andalso
+       LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     %% no filter and last data size was small so enable data read ahead
-    Ra#ra{size = ?READ_AHEAD_LIMIT};
+    Ra#ra{size = Limit};
 ra_update_size(undefined, FilterSize, LastDataSize,
-               #ra{on = true, size = ?READ_AHEAD_LIMIT} = Ra)
-  when LastDataSize =< (?READ_AHEAD_LIMIT - ?HEADER_SIZE_B -
+               #ra{on = true, size = Limit, limit = Limit} = Ra)
+  when LastDataSize =< (Limit - ?HEADER_SIZE_B -
                         FilterSize - ?REC_HDR_SZ_SUBBATCH_B) ->
     Ra;
 ra_update_size(_Filter, FilterSize, _LastDataSize, #ra{size = Sz} = Ra) ->
@@ -3397,10 +3397,13 @@ ra_fill(Fd, Pos, #ra{size = Sz} = Ra) ->
             Err
     end.
 
-ra_on(#{options := #{read_ahead := false}}) ->
-    false;
-ra_on(_) ->
-    true.
+-spec ra(config()) -> #ra{}.
+ra(#{options := #{read_ahead := false}}) ->
+    #ra{on = false};
+ra(#{options := #{read_ahead_limit := Limit}}) when is_integer(Limit) ->
+    #ra{limit = Limit};
+ra(_) ->
+    #ra{on = true}.
 
 generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
     Name = filename:basename(Directory),
@@ -3441,16 +3444,16 @@ ra_update_size_test() ->
     DefSize = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE,
     ?assertMatch(#ra{size = DefSize}, #ra{}),
     Ra0 = #ra{},
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra0)),
-    Ra1 = #ra{size = ?READ_AHEAD_LIMIT},
+    Ra1 = #ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 5000, Ra1)),
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, 100, Ra1)),
     ?assertMatch(#ra{size = DefSize},
@@ -3458,11 +3461,11 @@ ra_update_size_test() ->
 
     %% we need to ensure that if we enable read ahead we can at least fulfil
     %% the prior chunk including header filter and record length header
-    MaxEnablingDataSize = ?READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
+    MaxEnablingDataSize = ?DEFAULT_READ_AHEAD_LIMIT - ?HEADER_SIZE_B - ?DEFAULT_FILTER_SIZE - ?REC_HDR_SZ_SUBBATCH_B,
     ?assertMatch(#ra{size = DefSize},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize + 1 , Ra0)),
-    ?assertMatch(#ra{size = ?READ_AHEAD_LIMIT},
+    ?assertMatch(#ra{size = ?DEFAULT_READ_AHEAD_LIMIT},
                  ra_update_size(undefined, ?DEFAULT_FILTER_SIZE, MaxEnablingDataSize, Ra0)),
     ok.
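
Note for reviewers: the new read_ahead_limit option only takes effect while read-ahead is enabled; an explicit read_ahead => false still disables read-ahead entirely, because the first clause of ra/1 matches on that key alone. Below is a minimal sketch of the resulting option precedence, written as a hypothetical eunit test that is not part of this patch and assumes it runs inside osiris_log, where ra/1 and the #ra{} record are in scope:

ra_option_precedence_test() ->
    %% read_ahead_limit alone: read-ahead stays on, limit is overridden
    ?assertMatch(#ra{on = true, limit = 65536},
                 ra(#{options => #{read_ahead_limit => 65536}})),
    %% an explicit read_ahead => false wins even when a limit is also
    %% supplied, since the first clause of ra/1 matches on that key alone
    ?assertMatch(#ra{on = false},
                 ra(#{options => #{read_ahead => false,
                                   read_ahead_limit => 65536}})),
    %% no reader options at all: read-ahead on with the default limit
    ?assertMatch(#ra{on = true, limit = ?DEFAULT_READ_AHEAD_LIMIT},
                 ra(#{})),
    ok.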