Skip to content

Commit a1f581c

Browse files
committed
WIP; Fix Khepri clustering, take #4
1 parent a9b91be commit a1f581c

File tree

1 file changed

+60
-47
lines changed

1 file changed

+60
-47
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ mds_phase1_migration(FeatureName, _FeatureProps, enable) ->
269269
case ensure_khepri_cluster_matches_mnesia(FeatureName) of
270270
ok ->
271271
Tables = ?MDS_PHASE1_TABLES,
272-
case is_mds_migration_done(Tables) of
272+
case is_mds_migration_done(FeatureName) of
273273
false -> migrate_tables_to_khepri(FeatureName, Tables);
274274
true -> ok
275275
end;
@@ -697,9 +697,7 @@ empty_unused_mnesia_tables(FeatureName, [Table | Rest]) ->
697697
"Feature flag `~s`: dropping content from unused Mnesia "
698698
"table ~s",
699699
[FeatureName, Table]),
700-
ok = empty_unused_mnesia_table(Table),
701-
ok = write_migrated_mark_to_mnesia(FeatureName, Table),
702-
?assert(is_table_migrated(Table));
700+
ok = empty_unused_mnesia_table(Table);
703701
{aborted, {already_exists, Table, _}} ->
704702
%% Another node is already taking care of this table.
705703
?LOG_DEBUG(
@@ -725,47 +723,62 @@ empty_unused_mnesia_table(Table, Key) ->
725723
ok = mnesia:dirty_delete(Table, Key),
726724
empty_unused_mnesia_table(Table, NextKey).
727725

728-
-define(MDS_MIGRATION_MARK_KEY, '$migrated_to_khepri').
729-
730-
write_migrated_mark_to_mnesia(FeatureName, Table) ->
731-
TableDefs = rabbit_table:definitions(),
732-
TableDef = proplists:get_value(Table, TableDefs),
733-
Match = proplists:get_value(match, TableDef),
734-
ForgedRecord = create_migrated_marker(Match),
735-
?LOG_DEBUG(
736-
"Feature flag `~s`: write a forged record to Mnesia table ~s to "
737-
"mark it as migrated",
738-
[FeatureName, Table]),
739-
mnesia:dirty_write(Table, ForgedRecord).
740-
741-
create_migrated_marker(Record) ->
742-
insert_migrated_marker(Record).
743-
744-
insert_migrated_marker('_') ->
745-
?MDS_MIGRATION_MARK_KEY;
746-
insert_migrated_marker(Atom) when is_atom(Atom) ->
747-
Atom;
748-
insert_migrated_marker(Tuple) when is_tuple(Tuple) ->
749-
Fields = tuple_to_list(Tuple),
750-
Fields1 = [insert_migrated_marker(Field) || Field <- Fields],
751-
list_to_tuple(Fields1).
752-
753-
is_mds_migration_done(Tables) ->
754-
lists:all(fun is_table_migrated/1, Tables).
755-
756-
is_table_migrated(Table) ->
757-
case mnesia:table_info(Table, size) of
758-
1 ->
759-
Key = mnesia:dirty_first(Table),
760-
is_migration_marker(Key);
761-
_ ->
762-
false
726+
is_mds_migration_done(FeatureName) ->
727+
%% To determine if the migration to Khepri was finished, we look at the
728+
%% state of the feature flag on another node, if any.
729+
ThisNode = node(),
730+
KhepriNodes = rabbit_khepri:nodes(),
731+
case KhepriNodes -- [ThisNode] of
732+
[] ->
733+
%% There are no other nodes. It means the node is unclustered
734+
%% and the migration function is called for the first time. This
735+
%% function returns `false'.
736+
?LOG_DEBUG(
737+
"Feature flag `~s`: migration done? false, the node is "
738+
"unclustered",
739+
[FeatureName]),
740+
false;
741+
[RemoteKhepriNode | _] ->
742+
%% This node is clustered already, either because of peer discovery
743+
%% or because of the `expand_khepri_cluster()' function.
744+
%%
745+
%% We need to distinguish two situations:
746+
%%
747+
%% - The first time the feature flag is enabled in a cluster, we
748+
%% want to migrate records from Mnesia to Khepri. In this case,
749+
%% the state of the feature flag will be `state_changing' on all
750+
%% nodes in the cluster. That's why we can pick any node to query
751+
%% its state.
752+
%%
753+
%% - When a new node is joining an existing cluster which is
754+
%% already using Khepri, we DO NOT want to migrate anything
755+
%% (Mnesia tables are empty, or about to be if the
756+
%% `post_enabled_locally' code is still running). To determine
757+
%% this, we query a remote node (but not this local node) to see
758+
%% the feature flag state. If it's `true' (enabled), it means the
759+
%% migration is either in progress or done. Otherwise, we are in
760+
%% the first situation described above.
761+
?LOG_DEBUG(
762+
"Feature flag `~s`: migration done? unknown, querying node ~p",
763+
[FeatureName, RemoteKhepriNode]),
764+
IsEnabledRemotely = rabbit_misc:rpc_call(
765+
RemoteKhepriNode,
766+
rabbit_feature_flags,
767+
is_enabled,
768+
[FeatureName, non_blocking]),
769+
?LOG_DEBUG(
770+
"Feature flag `~s`: feature flag state on node ~p: ~p",
771+
[FeatureName, RemoteKhepriNode, IsEnabledRemotely]),
772+
773+
%% If the RPC call fails (i.e. returns `{badrpc, ...}'), we throw
774+
%% an exception because we want the migration function to abort.
775+
Ret = case IsEnabledRemotely of
776+
true -> true;
777+
state_changing -> false;
778+
{badrpc, Error} -> throw(Error)
779+
end,
780+
?LOG_DEBUG(
781+
"Feature flag `~s`: migration done? ~s",
782+
[FeatureName, Ret]),
783+
Ret
763784
end.
764-
765-
is_migration_marker(?MDS_MIGRATION_MARK_KEY) ->
766-
true;
767-
is_migration_marker(Tuple) when is_tuple(Tuple) ->
768-
Fields = tuple_to_list(Tuple),
769-
lists:any(fun is_migration_marker/1, Fields);
770-
is_migration_marker(_) ->
771-
false.

0 commit comments

Comments
 (0)