Skip to content

Commit

Permalink
Merge pull request #477 from rabbitmq/partial-read-api
Browse files Browse the repository at this point in the history
New log effect type: `log_ext` that allows entries residing in segment files to be read by another process.
  • Loading branch information
kjnilsson authored Nov 26, 2024
2 parents 9e4cbb0 + 7841fae commit eaa2094
Show file tree
Hide file tree
Showing 19 changed files with 417 additions and 167 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,17 @@ logger:set_primary_config(level, debug).
Ra attempts to follow [Semantic Versioning](https://semver.org/).

The modules that form part of the public API are:

* `ra`
* `ra_machine` (behaviour callbacks only)
* `ra_aux`
* `ra_system`
* `ra_counters`
* `ra_counters` (counter keys may vary between minors)
* `ra_leaderboard`
* `ra_env`
* `ra_directory`
* `ra_flru`
* `ra_log_read_plan`

## Copyright and License

Expand Down
17 changes: 17 additions & 0 deletions docs/internals/STATE_MACHINE_TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ Only the leader that first applies an entry will attempt the effect.
Followers process the same set of commands but simply throw away any effects
returned by the state machine unless specific effect provide the `local` option.

### Machine Effects table

| Spec | Executed on |
| -----| ----------- |
| `{send_msg, pid(), Msg :: term()}` | leader |
| `{send_msg, pid(), Msg :: term(), [local]}` | on member local to `pid()` else leader |
| `{monitor \| demonitor, process \| node, pid() \| node()}` | leader |
| `{mod_call, mfa()}` | leader |
| `{timer, Name :: term(), Time :: non_neg_integer() \| infinity}` | leader |
| `{append, term()}` | leader |
| `{append, term(), ra_server:command_reply_mode()}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects())}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{release_cursor \| checkpoint, ra_index(), term()}` | all members |
| `{aux, term()}` | every member |


### Send a message

Expand Down
1 change: 0 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,6 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
key_metrics({_, N} = ServerId, Timeout) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout).


%% internal

-spec usr(UserCommand, ReplyMode) -> Command when
Expand Down
7 changes: 6 additions & 1 deletion src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
is_dir/1,
is_file/1,
ensure_dir/1,
consult/1
consult/1,
cons/2
]).

-type file_err() :: file:posix() | badarg | terminated | system_limit.
Expand Down Expand Up @@ -454,6 +455,10 @@ consult(Path) ->
Err
end.

cons(Item, List)
when is_list(List) ->
[Item | List].

tokens(Str) ->
case erl_scan:string(Str) of
{ok, Tokens, _EndLoc} ->
Expand Down
104 changes: 87 additions & 17 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
write_sync/2,
fold/5,
sparse_read/2,
partial_read/3,
execute_read_plan/3,
read_plan_info/1,
last_index_term/1,
set_last_index/2,
handle_event/2,
Expand Down Expand Up @@ -70,9 +73,10 @@
{down, pid(), term()}.

-type event() :: {ra_log_event, event_body()}.
-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()).

-type effect() ::
{delete_snapshot, Dir :: file:filename(), ra_idxterm()} |
{delete_snapshot, Dir :: file:filename_all(), ra_idxterm()} |
{monitor, process, log, pid()} |
ra_snapshot:effect() |
ra_server:effect().
Expand All @@ -84,7 +88,7 @@

-record(cfg, {uid :: ra_uid(),
log_id :: unicode:chardata(),
directory :: file:filename(),
directory :: file:filename_all(),
min_snapshot_interval = ?MIN_SNAPSHOT_INTERVAL :: non_neg_integer(),
min_checkpoint_interval = ?MIN_CHECKPOINT_INTERVAL :: non_neg_integer(),
snapshot_module :: module(),
Expand All @@ -110,7 +114,11 @@
tx = false :: boolean()
}).

-record(read_plan, {dir :: file:filename_all(),
read :: #{ra_index() := log_entry()},
plan :: ra_log_reader:read_plan()}).

-opaque read_plan() :: #read_plan{}.
-opaque state() :: #?MODULE{}.

-type ra_log_init_args() :: #{uid := ra_uid(),
Expand Down Expand Up @@ -145,6 +153,7 @@
atom() => term()}.

-export_type([state/0,
read_plan/0,
ra_log_init_args/0,
ra_meta_key/0,
segment_ref/0,
Expand All @@ -154,13 +163,16 @@
overview/0
]).

-define(SNAPSHOTS_DIR, <<"snapshots">>).
-define(CHECKPOINTS_DIR, <<"checkpoints">>).

pre_init(#{uid := UId,
system_config := #{data_dir := DataDir}} = Conf) ->
Dir = server_data_dir(DataDir, UId),
SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
_ = ra_snapshot:init(UId, SnapModule, SnapshotsDir,
CheckpointsDir, undefined, MaxCheckpoints),
ok.
Expand All @@ -183,8 +195,8 @@ init(#{uid := UId,
CPInterval = maps:get(min_checkpoint_interval, Conf,
?MIN_CHECKPOINT_INTERVAL),
MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS),
SnapshotsDir = filename:join(Dir, "snapshots"),
CheckpointsDir = filename:join(Dir, "checkpoints"),
SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR),
CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR),
Counter = maps:get(counter, Conf, undefined),

%% ensure directories are there
Expand Down Expand Up @@ -303,7 +315,6 @@ init(#{uid := UId,
{SnapIdx, SnapTerm},
State#?MODULE.last_written_index_term
]),
?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]),
element(1, delete_segments(SnapIdx, State)).

-spec close(state()) -> ok.
Expand Down Expand Up @@ -465,8 +476,9 @@ fold(From0, To0, Fun, Acc0,
fold(_From, _To, _Fun, Acc, State) ->
{Acc, State}.

%% read a list of indexes,
%% found indexes be returned in the same order as the input list of indexes
%% @doc Reads a list of indexes.
%% Found indexes are returned in the same order as the input list of indexes
%% @end
-spec sparse_read([ra_index()], state()) ->
{[log_entry()], state()}.
sparse_read(Indexes0, #?MODULE{cfg = Cfg,
Expand All @@ -488,8 +500,8 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,

%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, CacheNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead),
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
{Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0),
%% here we recover the original order of indexes
Entries = case Sort of
Expand All @@ -507,6 +519,65 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
end,
{Entries, State#?MODULE{reader = Reader}}.


%% read a list of indexes,
%% found indexes be returned in the same order as the input list of indexes
-spec partial_read([ra_index()], state(),
fun ((ra_index(), ra_term(), ra_server:command()) -> term())
) ->
read_plan().
partial_read(Indexes0, #?MODULE{cfg = Cfg,
reader = Reader0,
last_index = LastIdx,
mem_table = Mt},
TransformFun) ->
ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1),
%% indexes need to be sorted high -> low for correct and efficient reading
Sort = ra_lib:lists_detect_sort(Indexes0),
Indexes1 = case Sort of
unsorted ->
lists:sort(fun erlang:'>'/2, Indexes0);
ascending ->
lists:reverse(Indexes0);
_ ->
% descending or undefined
Indexes0
end,

%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
Read = lists:foldl(fun ({I, T, Cmd}, Acc) ->
maps:put(I, TransformFun(I, T, Cmd), Acc)
end, #{}, Entries0),

Plan = ra_log_reader:read_plan(Reader0, Indexes),
#read_plan{dir = Cfg#cfg.directory,
read = Read,
plan = Plan}.


-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
TransformFun :: transform_fun()) ->
{#{ra_index() => Command :: term()}, ra_flru:state()}.
execute_read_plan(#read_plan{dir = Dir,
read = Read,
plan = Plan}, Flru0, TransformFun) ->
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read).

-spec read_plan_info(read_plan()) -> map().
read_plan_info(#read_plan{read = Read,
plan = Plan}) ->
NumSegments = length(Plan),
NumInSegments = lists:foldl(fun ({_, Idxs}, Acc) ->
Acc + length(Idxs)
end, 0, Plan),
#{num_read => map_size(Read),
num_in_segments => NumInSegments,
num_segments => NumSegments}.


-spec last_index_term(state()) -> ra_idxterm().
last_index_term(#?MODULE{last_index = LastIdx, last_term = LastTerm}) ->
{LastIdx, LastTerm}.
Expand Down Expand Up @@ -980,8 +1051,8 @@ overview(#?MODULE{last_index = LastIndex,

-spec write_config(ra_server:config(), state()) -> ok.
write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ConfigPath = filename:join(Dir, "config"),
TmpConfigPath = filename:join(Dir, "config.tmp"),
ConfigPath = filename:join(Dir, <<"config">>),
TmpConfigPath = filename:join(Dir, <<"config.tmp">>),
% clean config of potentially unserialisable data
Config = maps:without([parent,
counter,
Expand All @@ -994,12 +1065,12 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) ->
ok = prim_file:rename(TmpConfigPath, ConfigPath),
ok.

-spec read_config(state() | file:filename()) ->
-spec read_config(state() | file:filename_all()) ->
{ok, ra_server:config()} | {error, term()}.
read_config(#?MODULE{cfg = #cfg{directory = Dir}}) ->
read_config(Dir);
read_config(Dir) ->
ConfigPath = filename:join(Dir, "config"),
ConfigPath = filename:join(Dir, <<"config">>),
ra_lib:consult(ConfigPath).

-spec delete_everything(state()) -> ok.
Expand Down Expand Up @@ -1309,8 +1380,7 @@ put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

server_data_dir(Dir, UId) ->
Me = ra_lib:to_list(UId),
filename:join(Dir, Me).
filename:join(Dir, UId).

maps_with_values(Keys, Map) ->
lists:foldr(
Expand Down
20 changes: 20 additions & 0 deletions src/ra_log_read_plan.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
-module(ra_log_read_plan).


-export([execute/2,
info/1]).

-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) ->
{#{ra:index() => Command :: ra_server:command()}, ra_flru:state()}.
execute(Plan, Flru) ->
ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3).

-spec info(ra_log:read_plan()) -> map().
info(Plan) ->
ra_log:read_plan_info(Plan).
Loading

0 comments on commit eaa2094

Please sign in to comment.