Skip to content

Commit ab18236

Browse files
committed
WIP: proof-of-concept zstd compression for replication
This requires OTP-28 to build.
1 parent 82ef2fd commit ab18236

File tree

4 files changed

+90
-17
lines changed

4 files changed

+90
-17
lines changed

src/osiris.hrl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
-define(FILE_OPTS_WRITE, [raw, binary, write, read]).
4848
-define(INDEX_RECORD_SIZE_B, 29).
4949

50+
%% signature of chunks compressed
51+
-define(COMPRESSED_MAGIC, 7).
52+
-define(COMPRESSED_VERSION, 0).
5053

5154
%% chunk types
5255
-define(CHNK_USER, 0).

src/osiris_log.erl

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,8 @@
420420
%% that will be included in snapshots written to new segments
421421
readers_counter_fun = fun(_) -> ok end :: function(),
422422
shared :: atomics:atomics_ref(),
423-
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
423+
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
424+
compression :: undefined | {zstd, zstd:context()}
424425
}).
425426
-record(ra,
426427
{size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
@@ -1064,14 +1065,22 @@ init_data_reader_at(ChunkId, FilePos, File,
10641065
Cnt = make_counter(Config),
10651066
counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
10661067
CountersFun(1),
1068+
Compression = case Config of
1069+
#{compression := zstd} ->
1070+
{ok, Ctx} = zstd:context(compress),
1071+
{zstd, Ctx};
1072+
_ ->
1073+
undefined
1074+
end,
10671075
{ok,
10681076
#?MODULE{cfg =
10691077
#cfg{directory = Dir,
10701078
counter = Cnt,
10711079
counter_id = counter_id(Config),
10721080
name = Name,
10731081
readers_counter_fun = CountersFun,
1074-
shared = Shared
1082+
shared = Shared,
1083+
compression = Compression
10751084
},
10761085
mode =
10771086
#read{type = data,
@@ -1716,7 +1725,8 @@ send_file(Sock, State) ->
17161725
{error, term()} |
17171726
{end_of_stream, state()}.
17181727
send_file(Sock,
1719-
#?MODULE{mode = #read{type = RType,
1728+
#?MODULE{cfg = #cfg{compression = Compression},
1729+
mode = #read{type = RType,
17201730
chunk_selector = Selector,
17211731
transport = Transport,
17221732
filter_size = RaFs}} = State0,
@@ -1755,19 +1765,32 @@ send_file(Sock,
17551765
FrameHeader = Callback(Header, ToSend + byte_size(HeaderData)),
17561766
case ra_read(DataPos, ToSend, Ra) of
17571767
Data when byte_size(Data) == ToSend ->
1758-
case send(Transport, Sock, [FrameHeader, HeaderData, Data]) of
1768+
case send(Transport, Compression, Sock, [FrameHeader, HeaderData, Data]) of
17591769
ok ->
17601770
State = State1#?MODULE{mode = Read},
17611771
{ok, State};
17621772
Err ->
17631773
Err
17641774
end;
1775+
_ when Compression =/= undefined ->
1776+
case file:pread(Fd, DataPos, ToSend) of
1777+
{ok, Data} ->
1778+
case send(Transport, Compression, Sock, [FrameHeader, HeaderData, Data]) of
1779+
ok ->
1780+
State = State1#?MODULE{mode = Read},
1781+
{ok, State};
1782+
Err ->
1783+
Err
1784+
end;
1785+
Err ->
1786+
Err
1787+
end;
17651788
_ ->
17661789
%% the read ahead buffer could not satisfy the
17671790
%% data read
1768-
case send(Transport, Sock, [FrameHeader, HeaderData]) of
1791+
case send(Transport, Compression, Sock, [FrameHeader, HeaderData]) of
17691792
ok ->
1770-
case sendfile(Transport, Fd, Sock,
1793+
case sendfile(Transport, Compression, Fd, Sock,
17711794
DataPos, ToSend) of
17721795
ok ->
17731796
State = State1#?MODULE{mode = Read},
@@ -2526,30 +2549,40 @@ max_segment_size_reached(
25262549
CurrentSizeBytes >= MaxSizeBytes orelse
25272550
CurrentSizeChunks >= MaxSizeChunks.
25282551

2529-
sendfile(_Transport, _Fd, _Sock, _Pos, 0) ->
2552+
sendfile(_Transport, _Compression, _Fd, _Sock, _Pos, 0) ->
25302553
ok;
2531-
sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) ->
2554+
sendfile(tcp = Transport, undefined, Fd, Sock, Pos, ToSend) ->
25322555
case file:sendfile(Fd, Sock, Pos, ToSend, []) of
25332556
{ok, 0} ->
25342557
%% TODO add counter for this?
2535-
sendfile(Transport, Fd, Sock, Pos, ToSend);
2558+
sendfile(Transport, undefined, Fd, Sock, Pos, ToSend);
25362559
{ok, BytesSent} ->
2537-
sendfile(Transport, Fd, Sock, Pos + BytesSent, ToSend - BytesSent);
2560+
sendfile(Transport, undefined, Fd, Sock, Pos + BytesSent, ToSend - BytesSent);
25382561
{error, _} = Err ->
25392562
Err
25402563
end;
2541-
sendfile(ssl, Fd, Sock, Pos, ToSend) ->
2564+
sendfile(Transport, Compression, Fd, Sock, Pos, ToSend) ->
25422565
case file:pread(Fd, Pos, ToSend) of
25432566
{ok, Data} ->
2544-
ssl:send(Sock, Data);
2567+
send(Transport, Compression, Sock, Data);
25452568
{error, _} = Err ->
25462569
Err
25472570
end.
25482571

2549-
send(tcp, Sock, Data) ->
2550-
gen_tcp:send(Sock, Data);
2551-
send(ssl, Sock, Data) ->
2552-
ssl:send(Sock, Data).
2572+
send(tcp, Compression, Sock, Data) ->
2573+
gen_tcp:send(Sock, compress_data(Compression, Data));
2574+
send(ssl, Compression, Sock, Data) ->
2575+
ssl:send(Sock, compress_data(Compression, Data)).
2576+
2577+
compress_data({zstd, Ctx}, Data) ->
2578+
CompressedData = zstd:compress(Data, Ctx),
2579+
Header = <<?COMPRESSED_MAGIC:4/unsigned,
2580+
?COMPRESSED_VERSION:4/unsigned,
2581+
3:8/unsigned,
2582+
(iolist_size(CompressedData)):32/unsigned>>,
2583+
[Header | CompressedData];
2584+
compress_data(undefined, Data) ->
2585+
Data.
25532586

25542587
last_timestamp_in_index_file(IdxFile) ->
25552588
case file:open(IdxFile, [raw, binary, read]) of

src/osiris_replica.erl

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,35 @@ parse_chunk(<<?MAGIC:4/unsigned,
679679
TotalSize = ?HEADER_SIZE_B + FSize + Size + TSize,
680680
<<Chunk:TotalSize/binary, _/binary>> = All,
681681
parse_chunk(Rem, undefined, [{{FirstOffset, Timestamp}, Chunk} | Acc]);
682+
parse_chunk(<<?COMPRESSED_MAGIC:4/unsigned,
683+
?COMPRESSED_VERSION:4/unsigned,
684+
CompressionType:8/unsigned,
685+
CompressedLen:32/unsigned,
686+
CompressedData:CompressedLen/binary,
687+
Rem/binary>>,
688+
undefined, Acc) ->
689+
Chunk = case CompressionType of
690+
3 -> %% zstd
691+
zstd_decompress(CompressedData);
692+
1 ->
693+
zlib:gunzip(CompressedData)
694+
end,
695+
<<?MAGIC:4/unsigned,
696+
?VERSION:4/unsigned,
697+
_ChType:8/unsigned,
698+
_NumEntries:16/unsigned,
699+
_NumRecords:32/unsigned,
700+
Timestamp:64/signed,
701+
_Epoch:64/unsigned,
702+
FirstOffset:64/unsigned,
703+
_Rest/binary>> = Chunk,
704+
parse_chunk(Rem, undefined, [{{FirstOffset, Timestamp}, Chunk} | Acc]);
705+
parse_chunk(<<?COMPRESSED_MAGIC:4/unsigned,
706+
?COMPRESSED_VERSION:4/unsigned,
707+
_Partial/binary>> = All,
708+
undefined, Acc) ->
709+
%% This could be more efficient: decompression functions can take iodata.
710+
{All, lists:reverse(Acc)};
682711
parse_chunk(Bin, undefined, Acc)
683712
when byte_size(Bin) =< ?HEADER_SIZE_B ->
684713
{Bin, lists:reverse(Acc)};
@@ -714,6 +743,14 @@ parse_chunk(Bin, {FirstOffsetTs, IOData, RemSize}, Acc) ->
714743
{{FirstOffsetTs, [Bin | IOData], RemSize - byte_size(Bin)},
715744
lists:reverse(Acc)}.
716745

746+
-if(?OTP_RELEASE >= 28).
747+
zstd_decompress(Data) ->
748+
iolist_to_binary(zstd:decompress(Data)).
749+
-else.
750+
zstd_decompress(_Data) ->
751+
erlang:error(zstd_not_supported).
752+
-endif.
753+
717754
notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref,
718755
event_formatter = EvtFmt},
719756
committed_chunk_id = CommittedChId,

src/osiris_replica_reader.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ init(#{hosts := Hosts,
123123
[Host, Port]),
124124
CntId = {?MODULE, ExtRef, Host, Port},
125125
CntSpec = {CntId, ?COUNTER_FIELDS},
126-
Config = #{counter_spec => CntSpec, transport => Transport},
126+
Config = #{counter_spec => CntSpec, transport => Transport, compression => zstd},
127127
%% send token to replica to complete connection setup
128128
ok = send(Transport, Sock, Token),
129129
Ret = osiris_writer:init_data_reader(LeaderPid, TailInfo, Config),

0 commit comments

Comments
 (0)