Skip to content

Commit b283c5c

Browse files
committed
WIP; Fix Khepri clustering, take #4
1 parent f7c0c37 commit b283c5c

File tree

1 file changed

+60
-47
lines changed

1 file changed

+60
-47
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ mds_phase1_migration(FeatureName, _FeatureProps, enable) ->
258258
case ensure_khepri_cluster_matches_mnesia(FeatureName) of
259259
ok ->
260260
Tables = ?MDS_PHASE1_TABLES,
261-
case is_mds_migration_done(Tables) of
261+
case is_mds_migration_done(FeatureName) of
262262
false -> migrate_tables_to_khepri(FeatureName, Tables);
263263
true -> ok
264264
end;
@@ -686,9 +686,7 @@ empty_unused_mnesia_tables(FeatureName, [Table | Rest]) ->
686686
"Feature flag `~s`: dropping content from unused Mnesia "
687687
"table ~s",
688688
[FeatureName, Table]),
689-
ok = empty_unused_mnesia_table(Table),
690-
ok = write_migrated_mark_to_mnesia(FeatureName, Table),
691-
?assert(is_table_migrated(Table));
689+
ok = empty_unused_mnesia_table(Table);
692690
{aborted, {already_exists, Table, _}} ->
693691
%% Another node is already taking care of this table.
694692
?LOG_DEBUG(
@@ -714,47 +712,62 @@ empty_unused_mnesia_table(Table, Key) ->
714712
ok = mnesia:dirty_delete(Table, Key),
715713
empty_unused_mnesia_table(Table, NextKey).
716714

717-
-define(MDS_MIGRATION_MARK_KEY, '$migrated_to_khepri').
718-
719-
write_migrated_mark_to_mnesia(FeatureName, Table) ->
720-
TableDefs = rabbit_table:definitions(),
721-
TableDef = proplists:get_value(Table, TableDefs),
722-
Match = proplists:get_value(match, TableDef),
723-
ForgedRecord = create_migrated_marker(Match),
724-
?LOG_DEBUG(
725-
"Feature flag `~s`: write a forged record to Mnesia table ~s to "
726-
"mark it as migrated",
727-
[FeatureName, Table]),
728-
mnesia:dirty_write(Table, ForgedRecord).
729-
730-
create_migrated_marker(Record) ->
731-
insert_migrated_marker(Record).
732-
733-
insert_migrated_marker('_') ->
734-
?MDS_MIGRATION_MARK_KEY;
735-
insert_migrated_marker(Atom) when is_atom(Atom) ->
736-
Atom;
737-
insert_migrated_marker(Tuple) when is_tuple(Tuple) ->
738-
Fields = tuple_to_list(Tuple),
739-
Fields1 = [insert_migrated_marker(Field) || Field <- Fields],
740-
list_to_tuple(Fields1).
741-
742-
is_mds_migration_done(Tables) ->
743-
lists:all(fun is_table_migrated/1, Tables).
744-
745-
is_table_migrated(Table) ->
746-
case mnesia:table_info(Table, size) of
747-
1 ->
748-
Key = mnesia:dirty_first(Table),
749-
is_migration_marker(Key);
750-
_ ->
751-
false
715+
is_mds_migration_done(FeatureName) ->
716+
%% To determine if the migration to Khepri was finished, we look at the
717+
%% state of the feature flag on another node, if any.
718+
ThisNode = node(),
719+
KhepriNodes = rabbit_khepri:nodes(),
720+
case KhepriNodes -- [ThisNode] of
721+
[] ->
722+
%% There are no other nodes. It means the node is unclustered
723+
%% and the migration function is called for the first time. This
724+
%% function returns `false'.
725+
?LOG_DEBUG(
726+
"Feature flag `~s`: migration done? false, the node is "
727+
"unclustered",
728+
[FeatureName]),
729+
false;
730+
[RemoteKhepriNode | _] ->
731+
%% This node is clustered already, either because of peer discovery
732+
%% or because of the `expand_khepri_cluster()' function.
733+
%%
734+
%% We need to distinguish two situations:
735+
%%
736+
%% - The first time the feature flag is enabled in a cluster, we
737+
%% want to migrate records from Mnesia to Khepri. In this case,
738+
%% the state of the feature flag will be `state_changing' on all
739+
%% nodes in the cluster. That's why we can pick any node to query
740+
%% its state.
741+
%%
742+
%% - When a new node is joining an existing cluster which is
743+
%% already using Khepri, we DO NOT want to migrate anything
744+
%% (Mnesia tables are empty, or about to be if the
745+
%% `post_enabled_locally' code is still running). To determine
746+
%% this, we query a remote node (but not this local node) to see
747+
%% the feature flag state. If it's `true' (enabled), it means the
748+
%% migration is either in progress or done. Otherwise, we are in
749+
%% the first situation described above.
750+
?LOG_DEBUG(
751+
"Feature flag `~s`: migration done? unknown, querying node ~p",
752+
[FeatureName, RemoteKhepriNode]),
753+
IsEnabledRemotely = rabbit_misc:rpc_call(
754+
RemoteKhepriNode,
755+
rabbit_feature_flags,
756+
is_enabled,
757+
[FeatureName, non_blocking]),
758+
?LOG_DEBUG(
759+
"Feature flag `~s`: feature flag state on node ~p: ~p",
760+
[FeatureName, RemoteKhepriNode, IsEnabledRemotely]),
761+
762+
%% If the RPC call fails (i.e. returns `{badrpc, ...}'), we throw
763+
%% an exception because we want the migration function to abort.
764+
Ret = case IsEnabledRemotely of
765+
true -> true;
766+
state_changing -> false;
767+
{badrpc, Error} -> throw(Error)
768+
end,
769+
?LOG_DEBUG(
770+
"Feature flag `~s`: migration done? ~s",
771+
[FeatureName, Ret]),
772+
Ret
752773
end.
753-
754-
is_migration_marker(?MDS_MIGRATION_MARK_KEY) ->
755-
true;
756-
is_migration_marker(Tuple) when is_tuple(Tuple) ->
757-
Fields = tuple_to_list(Tuple),
758-
lists:any(fun is_migration_marker/1, Fields);
759-
is_migration_marker(_) ->
760-
false.

0 commit comments

Comments
 (0)