Add chunk iterator API to osiris_log #151

Merged 2 commits on Oct 25, 2023
src/osiris.erl: 5 changes (4 additions, 1 deletion)
@@ -74,6 +74,8 @@
batch() |
{filter_value(), iodata() | batch()}.

%% returned when reading
-type entry() :: binary() | batch().
-type reader_options() :: #{transport => tcp | ssl,
chunk_selector => all | user_data,
filter_spec => osiris_bloom:filter_spec()
@@ -89,7 +91,8 @@
retention_spec/0,
timestamp/0,
writer_id/0,
data/0]).
data/0,
entry/0]).

-spec start_cluster(config()) ->
{ok, config()} |
src/osiris_log.erl: 266 changes (194 additions, 72 deletions)
@@ -29,6 +29,10 @@
init_data_reader/2,
init_offset_reader/2,
read_header/1,
chunk_iterator/1,
chunk_iterator/2,
iterator_next/1,
batch_records/2,
read_chunk/1,
read_chunk_parsed/1,
read_chunk_parsed/2,
@@ -375,7 +379,8 @@
index_files => [file:filename_all()],
filter_size => osiris_bloom:filter_size()
}.
-type record() :: {offset(), osiris:data()}.
-type record() :: {offset(), osiris:entry()}.
-type offset_entry() :: {offset(), osiris:entry()}.
-type offset_spec() :: osiris:offset_spec().
-type retention_spec() :: osiris:retention_spec().
-type header_map() ::
@@ -452,6 +457,7 @@
-opaque state() :: #?MODULE{}.

-export_type([state/0,
chunk_iterator/0,
range/0,
config/0,
counter_spec/0,
@@ -1364,6 +1370,136 @@ read_header(#?MODULE{cfg = #cfg{}} = State0) ->
Err
end.

-record(iterator, {fd :: file:io_device(),
next_offset :: offset(),
%% entries left
num_left :: non_neg_integer(),
%% any trailing data from last read
%% we try to capture at least the size of the next record
data :: undefined | binary(),
next_record_pos :: non_neg_integer()}).
-opaque chunk_iterator() :: #iterator{}.
-define(REC_MATCH_SIMPLE(Len, Rem),
<<0:1, Len:31/unsigned, Rem/binary>>).
-define(REC_MATCH_SUBBATCH(CompType, NumRecs, UncompressedLen, Len, Rem),
<<1:1, CompType:3/unsigned, _:4/unsigned,
NumRecs:16/unsigned,
UncompressedLen:32/unsigned,
Len:32/unsigned, Rem/binary>>).

-define(REC_HDR_SZ_SIMPLE_B, 4).
-define(REC_HDR_SZ_SUBBATCH_B, 11).
-define(ITER_READ_AHEAD_B, 64).


-spec chunk_iterator(state()) ->
{ok, header_map(), chunk_iterator(), state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
chunk_iterator(State) ->
chunk_iterator(State, 1).

-spec chunk_iterator(state(), pos_integer() | all) ->
{ok, header_map(), chunk_iterator(), state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
chunk_iterator(#?MODULE{cfg = #cfg{},
mode = #read{type = RType,
chunk_selector = Selector}
} = State0, CreditHint)
when (is_integer(CreditHint) andalso CreditHint > 0) orelse
is_atom(CreditHint) ->
%% reads the next chunk header and returns an iterator over its entries
case catch read_header0(State0) of
{ok,
#{type := ChType,
chunk_id := ChId,
crc := Crc,
num_entries := NumEntries,
num_records := NumRecords,
data_size := DataSize,
filter_size := FilterSize,
position := Pos,
next_position := NextPos} = Header,
#?MODULE{fd = Fd, mode = #read{next_offset = ChId} = Read} = State1} ->
State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
position = NextPos}},
case needs_handling(RType, Selector, ChType) of
true ->
DataPos = Pos + ?HEADER_SIZE_B + FilterSize,
Data = iter_read_ahead(Fd, DataPos, ChId, Crc, CreditHint,
DataSize, NumEntries),
Iterator = #iterator{fd = Fd,
data = Data,
next_offset = ChId,
num_left = NumEntries,
next_record_pos = DataPos},
{ok, Header, Iterator, State};
false ->
%% skip
chunk_iterator(State, CreditHint)
end;
Other ->
Other
end.

-spec iterator_next(chunk_iterator()) ->
end_of_chunk | {offset_entry(), chunk_iterator()}.
iterator_next(#iterator{num_left = 0}) ->
end_of_chunk;
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
num_left = Num,
data = ?REC_MATCH_SIMPLE(Len, Rem0),
next_record_pos = Pos} = I0) ->
{Record, Rem} =
case Rem0 of
<<Record0:Len/binary, Rem1/binary>> ->
{Record0, Rem1};
_ ->
%% not enough in Rem0 to read the entire record
%% so we need to read it from disk
{ok, <<Record0:Len/binary, Rem1/binary>>} =
file:pread(Fd, Pos + ?REC_HDR_SZ_SIMPLE_B,
Len + ?ITER_READ_AHEAD_B),
{Record0, Rem1}
end,

I = I0#iterator{next_offset = NextOffs + 1,
num_left = Num - 1,
data = Rem,
next_record_pos = Pos + ?REC_HDR_SZ_SIMPLE_B + Len},
{{NextOffs, Record}, I};
iterator_next(#iterator{fd = Fd,
next_offset = NextOffs,
num_left = Num,
data = ?REC_MATCH_SUBBATCH(CompType, NumRecs,
UncompressedLen,
Len, Rem0),
next_record_pos = Pos} = I0) ->
{Data, Rem} =
case Rem0 of
<<Record0:Len/binary, Rem1/binary>> ->
{Record0, Rem1};
_ ->
%% not enough in Rem0 to read the entire record
%% so we need to read it from disk
{ok, <<Record0:Len/binary, Rem1/binary>>} =
file:pread(Fd, Pos + ?REC_HDR_SZ_SUBBATCH_B,
Len + ?ITER_READ_AHEAD_B),
{Record0, Rem1}
end,
Record = {batch, NumRecs, CompType, UncompressedLen, Data},
I = I0#iterator{next_offset = NextOffs + NumRecs,
num_left = Num - 1,
data = Rem,
next_record_pos = Pos + ?REC_HDR_SZ_SUBBATCH_B + Len},
{{NextOffs, Record}, I};
iterator_next(#iterator{fd = Fd,
next_record_pos = Pos} = I) ->
{ok, Data} = file:pread(Fd, Pos, ?ITER_READ_AHEAD_B),
iterator_next(I#iterator{data = Data}).
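
%% For context, a minimal usage sketch (not part of this diff) of how a reader
%% could drive the new API. Only chunk_iterator/1 and iterator_next/1 come from
%% this change; read_one_chunk/1, drain/2 and the Log/Iter variable names are
%% hypothetical.
read_one_chunk(Log0) ->
    case osiris_log:chunk_iterator(Log0) of
        {end_of_stream, Log} ->
            {end_of_stream, Log};
        {ok, _Header, Iter, Log} ->
            {drain(osiris_log:iterator_next(Iter), []), Log}
    end.

drain(end_of_chunk, Acc) ->
    lists:reverse(Acc);
drain({{Offset, Entry}, Iter}, Acc) ->
    %% Entry is a binary for a simple record, or a
    %% {batch, NumRecs, CompType, UncompressedLen, Data} tuple for a sub-batch
    drain(osiris_log:iterator_next(Iter), [{Offset, Entry} | Acc]).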

-spec read_chunk(state()) ->
{ok, binary(), state()} |
{end_of_stream, state()} |
@@ -1410,42 +1546,30 @@ read_chunk_parsed(State) ->
{ok, header_map(), [record()], state()} |
{end_of_stream, state()} |
{error, {invalid_chunk_header, term()}}.
read_chunk_parsed(#?MODULE{mode = #read{type = RType,
chunk_selector = Selector}} = State0,
HeaderOrNot) ->
%% reads the next chunk of entries, parsed
%% NB: this may return records before the requested index,
%% that is fine - the reading process can do the appropriate filtering
case catch read_header0(State0) of
{ok,
#{type := ChType,
chunk_id := ChId,
crc := Crc,
num_records := NumRecords,
data_size := DataSize,
position := Pos,
filter_size := FilterSize,
next_position := NextPos} = Header,
#?MODULE{fd = Fd, mode = #read{next_offset = _ChId} = Read} = State1} ->
{ok, Data} = file:pread(Fd, Pos + ?HEADER_SIZE_B + FilterSize, DataSize),
validate_crc(ChId, Crc, Data),
State = State1#?MODULE{mode = Read#read{next_offset = ChId + NumRecords,
position = NextPos}},

case needs_handling(RType, Selector, ChType) of
true when HeaderOrNot == no_header ->
%% parse data into records
{parse_records(ChId, Data, []), State};
true ->
{ok, Header, parse_records(ChId, Data, []), State};
false ->
%% skip
read_chunk_parsed(State, HeaderOrNot)
end;
Ret ->
Ret
read_chunk_parsed(#?MODULE{mode = #read{}} = State0,
HeaderOrNot) ->
%% the Header parameter isn't used anywhere in RabbitMQ so is ignored
case chunk_iterator(State0, all) of
{end_of_stream, _} = Eos ->
Eos;
{ok, _H, I0, State1} when HeaderOrNot == no_header ->
Records = iter_all_records(iterator_next(I0), []),
{Records, State1};
{ok, Header, I0, State1} when HeaderOrNot == with_header ->
Records = iter_all_records(iterator_next(I0), []),
{ok, Header, Records, State1}
end.

iter_all_records(end_of_chunk, Acc) ->
lists:reverse(Acc);
iter_all_records({{ChId, {batch, _Num, 0, _Size, Data}}, I}, Acc0) ->
%% TODO validate that sub batch is correct
Acc = parse_subbatch(ChId, Data, Acc0),
iter_all_records(iterator_next(I), Acc);
iter_all_records({X, I}, Acc0) ->
Acc = [X | Acc0],
iter_all_records(iterator_next(I), Acc).

is_valid_chunk_on_disk(SegFile, Pos) ->
%% read a chunk from a specified location in the segment
%% then checks the CRC
@@ -1625,42 +1749,24 @@ next_chunk_pos(Fd, Pos) ->
_Reserved:24>>} = file:pread(Fd, Pos, ?HEADER_SIZE_B),
Pos + ?HEADER_SIZE_B + FSize + Size + TSize.

parse_records(_Offs, <<>>, Acc) ->
%% TODO: this could probably be changed to body recursive
lists:reverse(Acc);
parse_records(Offs,
<<0:1, %% simple
Len:31/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
parse_records(Offs + 1, Rem, [{Offs, Data} | Acc]);
parse_records(Offs,
<<1:1, %% sub-batch
0:3/unsigned, %% compression type
_:4/unsigned, %% reserved
NumRecs:16/unsigned,
_UncompressedLen:32/unsigned,
Len:32/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
Recs = parse_records(Offs, Data, []),
parse_records(Offs + NumRecs, Rem, lists:reverse(Recs) ++ Acc);
parse_records(Offs,
<<1:1, %% sub-batch
CompType:3/unsigned, %% compression type
_:4/unsigned, %% reserved
NumRecs:16/unsigned,
UncompressedLen:32/unsigned,
Len:32/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
%% return the first offset of the sub batch and the batch, unparsed
%% as we don't want to decompress on the server
parse_records(Offs + NumRecs, Rem,
[{Offs, {batch, NumRecs, CompType, UncompressedLen, Data}} | Acc]).

%% utility function to parse an uncompressed subbatch into records
batch_records(ChId, {batch, _NumRecords, 0, _UncompLen, Data}) ->
Records = lists:reverse(parse_subbatch(ChId, Data, [])),
{ok, Records};
batch_records(_ChId, {batch, _NumRecords, CompType, _UncompLen, _Data}) ->
{error, {compression_type_not_supported, CompType}}.

parse_subbatch(_Offs, <<>>, Acc) ->
Acc;
parse_subbatch(Offs,
<<0:1, %% simple
Len:31/unsigned,
Data:Len/binary,
Rem/binary>>,
Acc) ->
parse_subbatch(Offs + 1, Rem, [{Offs, Data} | Acc]).
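
%% A hedged sketch (not part of this diff) of how batch_records/2 pairs with
%% the iterator output: uncompressed sub-batches expand into individual
%% records, while compressed ones are handed back for the client to
%% uncompress. batch_records/2 is from this PR; expand_entry/1 is a
%% hypothetical helper name.
expand_entry({ChId, {batch, _NumRecs, _CompType, _UncompLen, _Bin} = Batch}) ->
    case osiris_log:batch_records(ChId, Batch) of
        {ok, Records} ->
            %% uncompressed sub-batch: Records is [{offset(), binary()}]
            Records;
        {error, {compression_type_not_supported, _}} = Err ->
            %% compressed sub-batch: left to the caller to handle
            Err
    end;
expand_entry({Offset, Bin}) when is_binary(Bin) ->
    [{Offset, Bin}].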


sorted_index_files(#{index_files := IdxFiles}) ->
%% cached
@@ -3000,7 +3106,23 @@ dump_crc_check(Fd) ->
dump_crc_check(Fd)
end.


iter_read_ahead(_Fd, _Pos, _ChunkId, _Crc, 1, _DataSize, _NumEntries) ->
%% no point reading ahead if there is only one entry to be read at this
%% time
undefined;
iter_read_ahead(Fd, Pos, ChunkId, Crc, Credit, DataSize, NumEntries)
when Credit == all orelse NumEntries == 1 ->
{ok, Data} = file:pread(Fd, Pos, DataSize),
validate_crc(ChunkId, Crc, Data),
Data;
iter_read_ahead(Fd, Pos, _ChunkId, _Crc, Credit0, DataSize, NumEntries) ->
%% read ahead, assumes roughly equal entry sizes which may not be the case
%% TODO round up to nearest block?
%% We can only practically validate CRC if we read the whole data
Credit = min(Credit0, NumEntries),
Size = DataSize div NumEntries * Credit,
{ok, Data} = file:pread(Fd, Pos, Size + ?ITER_READ_AHEAD_B),
Data.
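
To illustrate the sizing rule in the last clause with made-up numbers: a chunk of 10000 data bytes holding 100 entries, read with a credit of 10, results in an initial pread of 10000 div 100 * 10 + 64 = 1064 bytes, i.e. roughly the next ten entries plus the ?ITER_READ_AHEAD_B margin so the following record header is usually already in memory; entries of uneven size simply fall back to per-record preads in iterator_next/1.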

-ifdef(TEST).
