Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Streams with parallel processing, lazy filtering and random sampling #1389

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8e01fb7
Implement stream with a parallel map to a bag
xandkar May 10, 2022
d093d91
Use more-intuitive field names
xandkar May 14, 2022
014024e
Note drawback of current scheduling strategy in pmap_to_bag
xandkar May 24, 2022
c5fff4f
Complete the sequence operations on streams
xandkar Jun 8, 2022
87906a3
Switch snap bin list streaming to the general stream
xandkar Jun 8, 2022
55d3486
Implement random elements selection from a stream
xandkar Jun 13, 2022
bd11ffa
Rename random_elements/2 to sample/2
xandkar Jun 17, 2022
1aa82af
Implement and test RocksDB facade with streams and random sampling
xandkar Jun 18, 2022
8acb284
Extend and tidy-up rocks tests
xandkar Jun 19, 2022
4977566
Remove read_options() from blockchain_rocks API
xandkar Jun 19, 2022
5797695
Implement blockchain_rocks:fold
xandkar Jun 19, 2022
644450c
Rename data_stream:iter to foreach
xandkar Jun 19, 2022
a34fa6c
Normalize t-first position for streams
xandkar Jun 19, 2022
3aa4ac4
Create map and filter aliases without the lazy_ prefix
xandkar Jun 19, 2022
7ff843d
Remove direct implementation of blockchain_rocks:foreach
xandkar Jun 19, 2022
f25c8e6
Fix blockchain_rocks:fold signature
xandkar Jun 19, 2022
6227eab
Add the CF variant of blockchain_rocks:foreach
xandkar Jun 19, 2022
5c5a33c
Improve rocks tests:
xandkar Jun 19, 2022
da86996
Re-expose read_options()
xandkar Jun 20, 2022
44296f3
Assert sample size
xandkar Jun 20, 2022
6a2579e
Fix typo and clarify
xandkar Jun 21, 2022
92ebb1d
Fix extra Opts param in CF version and note to test it
xandkar Jun 21, 2022
c38973f
Support chaining streams
xandkar Jun 26, 2022
3ecf3e2
Refine stream type to non-empty list of streams
xandkar Jun 29, 2022
e996286
Note the reason for representation as a list
xandkar Jun 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ _build
*.iml
rebar3.crashdump
data/
!src/data/
.DS_Store
src/pb/
src/grpc/autogen
Expand Down
116 changes: 116 additions & 0 deletions src/blockchain_rocks.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
-module(blockchain_rocks).

-export([
fold/4,
fold/5,
foreach/3,
foreach/4,
stream/2,
stream/3,
sample/3,
sample/4
]).

%% API ========================================================================

-spec fold(
rocksdb:db_handle(),
rocksdb:read_options(),
Acc,
fun(({K :: binary(), V :: binary()}, Acc) -> Acc)
) ->
Acc.
fold(DB, Opts, Acc, F) ->
data_stream:fold(stream(DB, Opts), Acc, F).

-spec fold(
rocksdb:db_handle(),
rocksdb:cf_handle(),
rocksdb:read_options(),
Acc,
fun(({K :: binary(), V :: binary()}, Acc) -> Acc)
) ->
Acc.
fold(DB, CF, Opts, Acc, F) ->
data_stream:fold(stream(DB, CF, Opts), Acc, F).

-spec foreach(
rocksdb:db_handle(),
rocksdb:read_options(),
fun(({K :: binary(), V :: binary()}) -> ok)
) ->
ok.
foreach(DB, Opts, F) ->
data_stream:foreach(stream(DB, Opts), F).

-spec foreach(
rocksdb:db_handle(),
rocksdb:cf_handle(),
rocksdb:read_options(),
fun(({K :: binary(), V :: binary()}) -> ok)
) ->
ok.
foreach(DB, CF, Opts, F) ->
data_stream:foreach(stream(DB, CF, Opts), F).

-spec stream(rocksdb:db_handle(), rocksdb:read_options()) ->
data_stream:t({K :: binary(), V :: binary()}).
stream(DB, Opts) ->
stream_(fun () -> rocksdb:iterator(DB, Opts) end).

-spec stream(
rocksdb:db_handle(),
rocksdb:cf_handle(),
rocksdb:read_options()
) ->
data_stream:t({K :: binary(), V :: binary()}).
stream(DB, CF, Opts) ->
stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end).

%% @doc Select K random records from database.
-spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) ->
[{K :: binary(), V :: binary()}].
sample(DB, Opts, K) ->
Stream = stream(DB, Opts),
data_stream:sample(Stream, K).

%% @doc Select K random records from CF.
-spec sample(
rocksdb:db_handle(),
rocksdb:cf_handle(),
rocksdb:read_options(),
pos_integer()
) ->
[{K :: binary(), V :: binary()}].
sample(DB, CF, Opts, K) ->
Stream = stream(DB, CF, Opts),
data_stream:sample(Stream, K).

%% Internal ===================================================================

-spec stream_(fun(() -> {ok, rocksdb:itr_handle()} | {error, term()})) ->
data_stream:t({K :: binary(), V :: binary()}).
stream_(IterOpen) ->
case IterOpen() of
{error, Reason} ->
error({blockchain_rocks_iter_make, Reason});
{ok, Iter} ->
Move =
fun Move_ (Target) ->
fun () ->
case rocksdb:iterator_move(Iter, Target) of
{ok, K, V} ->
{some, {{K, V}, Move_(next)}};
{error, invalid_iterator} ->
ok = rocksdb:iterator_close(Iter),
none;
Error ->
error({blockchain_rocks_iter_move, Target, Error})
end
end
end,
data_stream:from_fun(Move(first))
end.

%% Test =======================================================================
%% See test/blockchain_rocks_SUITE.erl
50 changes: 25 additions & 25 deletions src/blockchain_term.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
-export_type([
t/0,
result/0,
stream/1, % TODO Find stream def a better home module than this one.
error/0,
frame/0,
unsound/0,
Expand Down Expand Up @@ -105,69 +104,70 @@

-include("blockchain_term.hrl").

%% TODO Maybe use a map?
%% TODO Switch to a map or a record?
-type file_handle() ::
{
file:fd(),
Pos :: non_neg_integer(),
Len :: pos_integer()
}.

-type stream(A) :: fun(() -> none | {some, {A, stream(A)}}).

-spec from_bin(binary()) -> result().
from_bin(<<Bin/binary>>) ->
envelope(Bin).

%% Tries to stream a list of binaries from file.
%% TODO Generalize.
-spec from_file_stream_bin_list(file_handle()) ->
stream({ok, binary()} | {error, term()}).
data_stream:t({ok, binary()} | {error, term()}).
from_file_stream_bin_list({Fd, Pos, Len}) ->
{ok, Pos} = file:position(Fd, {bof, Pos}),
case file:read(Fd, 6) of
{ok, <<?ETF_VERSION, ?ETF_TAG_LIST_EXT, N:32/integer-unsigned-big>>} ->
stream_bin_list_elements(N, {Fd, Pos + 6, Len});
{ok, <<V/binary>>} ->
fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, stream_end()}} end;
{error, _}=Err ->
fun () -> {some, {Err, stream_end()}} end
end.
Next =
case file:read(Fd, 6) of
{ok, <<?ETF_VERSION, ?ETF_TAG_LIST_EXT, N:32/integer-unsigned-big>>} ->
next_bin_list_elements(N, {Fd, Pos + 6, Len});
{ok, <<V/binary>>} ->
fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, next_end()}} end;
{error, _}=Err ->
fun () -> {some, {Err, next_end()}} end
end,
data_stream:from_fun(Next).

-spec stream_bin_list_elements(non_neg_integer(), file_handle()) ->
stream({ok, binary()} | {error, term()}).
stream_bin_list_elements(0, {Fd, Pos, _}) ->
-spec next_bin_list_elements(non_neg_integer(), file_handle()) ->
data_stream:next({ok, binary()} | {error, term()}).
next_bin_list_elements(0, {Fd, Pos, _}) ->
fun () ->
{ok, Pos} = file:position(Fd, {bof, Pos}),
case file:read(Fd, 1) of
{ok, <<?ETF_TAG_NIL_EXT>>} ->
none;
{ok, <<_/binary>>} ->
{some, {{error, bad_bin_list_nil_tag}, stream_end()}};
{some, {{error, bad_bin_list_nil_tag}, next_end()}};
{error, _}=Err ->
{some, {Err, stream_end()}}
{some, {Err, next_end()}}
end
end;
stream_bin_list_elements(N, {Fd, Pos0, L}) ->
next_bin_list_elements(N, {Fd, Pos0, L}) ->
fun () ->
{ok, Pos1} = file:position(Fd, {bof, Pos0}),
case file:read(Fd, 5) of
{ok, <<?ETF_TAG_BINARY_EXT, Len:32/integer-unsigned-big>>} ->
{ok, Pos2} = file:position(Fd, {bof, Pos1 + 5}),
case file:read(Fd, Len) of
{ok, <<Bin/binary>>} ->
{some, {Bin, stream_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}};
{some, {Bin, next_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}};
{error, _}=Err ->
{some, {Err, stream_end()}}
{some, {Err, next_end()}}
end;
{ok, <<_/binary>>} ->
{some, {{error, bad_bin_list_element}, stream_end()}};
{some, {{error, bad_bin_list_element}, next_end()}};
{error, _}=Err ->
{some, {Err, stream_end()}}
{some, {Err, next_end()}}
end
end.

stream_end() ->
-spec next_end() ->
data_stream:next({ok, binary()} | {error, term()}).
next_end() ->
fun () -> none end.

%% TODO -spec from_bin_with_contract(binary(), blockchain_contract:t()) ->
Expand Down
2 changes: 2 additions & 0 deletions src/blockchain_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include("blockchain_vars.hrl").

-export([
cpus/0,
shuffle_from_hash/2,
shuffle/1, shuffle/2,
rand_from_hash/1, rand_state/1,
Expand Down Expand Up @@ -302,6 +303,7 @@ validation_width() ->
N
end.

-spec cpus() -> non_neg_integer().
cpus() ->
Ct = erlang:system_info(schedulers_online),
max(2, ceil(Ct/2) + 1).
Expand Down
Loading