Commits on Jan 13, 2025

  1. Introduce initial_machine_version server config.

    This new key can be used to specify the initial machine version
    a new Ra server should be initialised against.
    This allows machines to skip old versions when creating a new
    Ra cluster.  This is particularly useful when machine_version_strategy=all
    as the initial machine upgrade upgrade after cluster creation is delayed until
    all members reply to the info requests.
    If the machine_version_strategy=all, starting a server with
    an initial machine version that is higher than the locally available
    machine version will result in an error: {error, invalid_initial_machine_version}.
    When machine_version_strategy=quorum the initial machine version
    will be clamped to the locally available machine version.
    kjnilsson committed Jan 13, 2025
    Copy the full SHA
    8077b75 View commit details
  2. Merge pull request #498 from rabbitmq/initial-machine-version

    Introduce initial_machine_version server config.
    kjnilsson authored Jan 13, 2025
    Copy the full SHA
    59e33b5 View commit details
  3. v2.16.0-pre.11

    kjnilsson committed Jan 13, 2025
    Copy the full SHA
    d333e83 View commit details
Showing with 219 additions and 53 deletions.
  1. +1 −1 MODULE.bazel
  2. +1 −1 src/
  3. +9 −11 src/ra_machine.erl
  4. +13 −10 src/ra_server.erl
  5. +27 −4 src/ra_server_sup_sup.erl
  6. +2 −1 test/ra_dbg_SUITE.erl
  7. +9 −20 test/ra_fifo.erl
  8. +157 −5 test/ra_machine_version_SUITE.erl
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "rabbitmq_ra",
version = "2.16.0-pre.10",
version = "2.16.0-pre.11",

2 changes: 1 addition & 1 deletion src/
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[{description,"Raft library"},
20 changes: 9 additions & 11 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@


@@ -90,7 +90,9 @@
-type user_command() :: term().
%% the command type for a given machine implementation

-type machine_init_args() :: #{name := atom(), atom() => term()}.
-type machine_init_args() :: #{name := atom(),
machine_version => version(),
atom() => term()}.
%% the configuration passed to the init callback

-type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}.
@@ -294,15 +296,11 @@
%% @doc initialise a new machine
%% This is only called on startup only if there isn't yet a snapshot to recover
%% from. Once a snapshot has been taken this is never called again.
-spec init(machine(), atom()) -> state().
init({machine, _, Args} = Machine, Name) ->
%% init always dispatches to the first version
%% as this means every state machine in a mixed version cluster will
%% have a common starting point.
%% TODO: it should be possible to pass a lowest supported state machine
%% version flag in the init args so that old machine version can be purged
Mod = which_module(Machine, 0),
Mod:init(Args#{name => Name}).
-spec init(machine(), atom(), version()) -> state().
init({machine, _, Args} = Machine, Name, Version) ->
Mod = which_module(Machine, Version),
Mod:init(Args#{name => Name,
machine_version => Version}).

-spec apply(module(), command_meta_data(), command(), State) ->
{State, reply(), effects()} | {State, reply()}.
23 changes: 13 additions & 10 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
@@ -215,6 +215,7 @@
log_init_args := ra_log:ra_log_init_args(),
initial_members := [ra_server_id()],
machine := machine_conf(),
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
% TODO: review - only really used for
@@ -352,24 +353,26 @@ init(#{id := Id,
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
maps:get(initial_machine_version, Config, 0)),

{_FirstIndex, Cluster0, MacVer, MacState,
{Cluster0, EffectiveMacVer, MacState,
{SnapshotIdx, _} = SnapshotIndexTerm} =
case ra_log:recover_snapshot(Log0) of
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
{0, make_cluster(Id, InitialNodes),
0, InitialMachineState, {0, 0}};
InitialMachineState = ra_machine:init(Machine, Name,
{make_cluster(Id, InitialNodes),
InitialMachineVersion, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes,
machine_version := MacVersion}, MacSt} ->
Clu = make_cluster(Id, ClusterNodes),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
{Clu, MacVersion, MacSt, {Idx, Term}}
MacMod = ra_machine:which_module(Machine, MacVer),
MacMod = ra_machine:which_module(Machine, EffectiveMacVer),

CommitIndex = max(LastApplied, SnapshotIdx),
Cfg = #cfg{id = Id,
@@ -378,8 +381,8 @@ init(#{id := Id,
metrics_key = MetricKey,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
effective_machine_version = EffectiveMacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
@@ -389,7 +392,7 @@ init(#{id := Id,
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer),

NonVoter = get_membership(Cluster0, Id, UId,
maps:get(membership, Config, voter)),
31 changes: 27 additions & 4 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
@@ -39,7 +39,9 @@

-spec start_server(System :: atom(), ra_server:ra_server_config()) ->
supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}.
supervisor:startchild_ret() |
{error, not_new | system_not_started | invalid_initial_machine_version} |
{badrpc, term()}.
start_server(System, #{id := NodeId,
uid := UId} = Config)
when is_atom(System) ->
@@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) ->
%% check that the server isn't already registered
case ra_directory:name_of(System, UId) of
undefined ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
case validate_config(Config) of
ok ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
Err ->
Err ->
@@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) ->

validate_config(#{system_config := SysConf} = Config) ->
Strat = maps:get(machine_upgrade_strategy, SysConf, all),
case Config of
#{initial_machine_version := InitMacVer,
machine := {module, Mod, Args}} when Strat == all ->
MacVer = ra_machine:version({machine, Mod, Args}),
if MacVer < InitMacVer ->
{error, invalid_initial_machine_version};
true ->
_ ->

restart_server_rpc(System, {RaName, _Node}, AddConfig)
when is_atom(System) ->
case ra_system:fetch(System) of
3 changes: 2 additions & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
@@ -77,7 +77,8 @@ execute_state_machine() ->
%% creating a new WAL file with ra_fifo
[Srv] = Nodes = [{ra_dbg, node()}],
ClusterId = ra_dbg,
Config = #{name => ClusterId},
Config = #{name => ClusterId,
machine_version => 0},
Machine = {module, ra_fifo, Config},
{ok, _, _} = ra:start_cluster(default, ClusterId, Machine, Nodes),
29 changes: 9 additions & 20 deletions test/ra_fifo.erl
Original file line number Diff line number Diff line change
@@ -170,6 +170,7 @@
-opaque state() :: #state{}.

-type config() :: #{name := atom(),
machine_version := ra_machine:version(),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
cancel_customer_handler => applied_mfa(),
@@ -902,7 +903,8 @@ size_test(NumMsg, NumCust) ->
EnqGen = fun(N) -> {N, {enqueue, N}} end,
CustGen = fun(N) -> {N, {checkout, {auto, 100},
spawn(fun() -> ok end)}} end,
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test})),
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test,
machine_version => 0})),
S = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
S2 = S#state{ra_indexes = ra_fifo_index:map(fun(_, _) -> undefined end,
@@ -918,29 +920,13 @@ perf_test(NumMsg, NumCust) ->
{N, {settle, N - NumMsg - NumCust - 1, Pid}}
S0 = run_log(1, NumMsg, EnqGen,
init(#{name => size_test})),
init(#{name => size_test,
machine_version => 0})),
S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
_ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),

% profile(File) ->
% GzFile = atom_to_list(File) ++ ".gz",
% lg:trace([ra_fifo, maps, queue, ra_fifo_index], lg_file_tracer,
% GzFile, #{running => false, mode => profile}),
% NumMsg = 10000,
% NumCust = 500,
% EnqGen = fun(N) -> {N, {enqueue, self(), N, N}} end,
% Pid = spawn(fun() -> ok end),
% CustGen = fun(N) -> {N, {checkout, {auto, NumMsg},
% {term_to_binary(N), Pid}}} end,
% SetlGen = fun(N) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end,
% S0 = run_log(1, NumMsg, EnqGen, element(1, init(#{name => size_test}))),
% S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
% _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
% lg:stop().

run_log(Num, Num, _Gen, State) ->
run_log(Num, Max, Gen, State0) ->
@@ -995,6 +981,7 @@ dehydrate_state(#state{messages = Messages0,

test_init(Name) ->
init(#{name => Name,
machine_version => 0,
shadow_copy_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}}).

@@ -1243,6 +1230,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
Cid = {<<"completed_customer_yields_demonitor_effect_test">>, self()},
State00 = init(#{name => test,
machine_version => 0,
dead_letter_handler =>
{somemod, somefun, [somearg]}}),
{State0, _, [_, _]} = enq(1, 1, first, State00),
@@ -1430,6 +1418,7 @@ duplicate_delivery_test() ->
state_enter_test() ->

S0 = init(#{name => the_name,
machine_version => 0,
become_leader_handler => {m, f, [a]}}),
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
@@ -1505,7 +1494,7 @@ run_log(InitState, Entries) ->
aux_test() ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
MacState = init(#{name => aux_test}),
MacState = init(#{name => aux_test, machine_version => 0}),
Log = undefined,
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
Log, MacState),
162 changes: 157 additions & 5 deletions test/ra_machine_version_SUITE.erl
Original file line number Diff line number Diff line change
@@ -34,7 +34,9 @@ all_tests() ->
% snapshot_persists_machine_version

@@ -65,8 +67,8 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config) ->
ok = logger:set_primary_config(level, all),
case TestCase of
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
case lists:member(TestCase, machine_upgrade_quorum_tests()) of
true ->
ok = application:set_env(ra, machine_upgrade_strategy, quorum),
_ = ra_system:stop_default(),
{ok, _} = ra_system:start_default();
@@ -94,8 +96,8 @@ init_per_testcase(TestCase, Config) ->
end_per_testcase(TestCase, Config) ->
catch ra:delete_cluster(?config(cluster, Config)),
case TestCase of
server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher ->
case lists:member(TestCase, machine_upgrade_quorum_tests()) of
true ->
ok = application:unset_env(ra, machine_upgrade_strategy),
_ = ra_system:stop_default(),
{ok, _} = ra_system:start_default();
@@ -104,6 +106,10 @@ end_per_testcase(TestCase, Config) ->

machine_upgrade_quorum_tests() ->

%%% Test cases
@@ -427,6 +433,152 @@ server_applies_with_new_module(Config) ->
snapshot_persists_machine_version(_Config) ->
error({todo, ?FUNCTION_NAME}).

initial_machine_version(Config) ->
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(3, MacVer),
meck:expect(Mod, version, fun () -> 5 end),
meck:expect(Mod, which_module, fun (_) -> Mod end),
meck:expect(Mod, apply, fun (_, dummy, S) ->
{S, ok};
(_, {machine_version, 0, 3}, init_state) ->
{state_v3, ok};
(_, {machine_version, 3, 5}, init_state) ->
{state_v5, ok}
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
Machine = {module, Mod, #{}},
Configs = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 3}
end || Id <- [ServerId]],
% debugger:start(),
% int:i(ra_machine),
% int:i(ra_server_sup_sup),
% int:break(ra_server_sup_sup, 66),
{ok, _, _} = ra:start_cluster(?SYS, Configs, 5000),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
S == state_v5
end, 100),
?assertMatch({ok, #{effective_machine_version := 5}, _},
{ok, _} = ra:delete_cluster([ServerId]),
await(fun () -> whereis(element(1, ServerId)) == undefined end, 100),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(5, MacVer),
meck:expect(Mod, apply, fun (Meta, meta, _S) ->
{state_v5, Meta};
(_, {machine_version, 0, 3}, init_state) ->
{state_v3, ok};
(_, {machine_version, 5, 5}, init_state) ->
{state_v5, ok}
Configs2 = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 9}
end || Id <- [ServerId]],
{error, cluster_not_formed} = ra:start_cluster(?SYS, Configs2, 5000),

initial_machine_version_quorum(Config) ->
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(3, MacVer),
meck:expect(Mod, version, fun () -> 5 end),
meck:expect(Mod, which_module, fun (_) -> Mod end),
meck:expect(Mod, apply, fun (_, dummy, S) ->
{S, ok};
(_, {machine_version, 0, 3}, init_state) ->
{state_v3, ok};
(_, {machine_version, 3, 5}, init_state) ->
{state_v5, ok}
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
Machine = {module, Mod, #{}},
Configs = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 3}
end || Id <- [ServerId]],
% debugger:start(),
% int:i(ra_machine),
% int:i(ra_server_sup_sup),
% int:break(ra_server_sup_sup, 66),
{ok, _, _} = ra:start_cluster(?SYS, Configs, 5000),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
S == state_v5
end, 100),
?assertMatch({ok, #{effective_machine_version := 5}, _},
{ok, _} = ra:delete_cluster([ServerId]),
await(fun () -> whereis(element(1, ServerId)) == undefined end, 100),
meck:expect(Mod, init, fun (#{machine_version := MacVer}) ->
?assertEqual(5, MacVer),
meck:expect(Mod, apply, fun (Meta, meta, _S) ->
{state_v5, Meta};
(_, {machine_version, 0, 3}, init_state) ->
{state_v3, ok};
(_, {machine_version, 5, 5}, init_state) ->
{state_v5, ok}
Configs2 = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
#{id => Id,
uid => UId,
cluster_name => ClusterName,
log_init_args => #{uid => UId},
initial_members => [ServerId],
machine => Machine,
initial_machine_version => 9}
end || Id <- [ServerId]],
{ok, _, _} = ra:start_cluster(?SYS, Configs2, 5000),
{ok, #{machine_version := 5}, _} = ra:process_command(ServerId, meta),
await(fun () ->
{ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1),
ct:pal("S ~p", [S]),
S == state_v5
end, 100),
ct:pal("overview ~p", [ra:member_overview(ServerId)]),
%% Utility

validate_state_enters(States) ->