Skip to content

Commit d008e4b

Browse files
committed
WIP; Fix Khepri clustering, take #3
1 parent c7f28cb commit d008e4b

File tree

3 files changed

+141
-85
lines changed

3 files changed

+141
-85
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,20 +364,23 @@ add_nodes_to_khepri_cluster(_FeatureName, _KhepriCluster, []) ->
364364
ok.
365365

366366
add_node_to_khepri_cluster(FeatureName, KhepriCluster, Node) ->
367+
?assertNotEqual([], KhepriCluster),
367368
case lists:member(Node, KhepriCluster) of
368369
true ->
369370
?LOG_DEBUG(
370-
"Feature flag `~s`: this node (~s) is already a member of "
371+
"Feature flag `~s`: node ~p is already a member of "
371372
"the largest cluster: ~p",
372373
[FeatureName, Node, KhepriCluster]),
373374
ok;
374375
false ->
375-
[KhepriNode | _] = KhepriCluster,
376376
?LOG_DEBUG(
377-
"Feature flag `~s`: adding this node (~s) to the largest "
377+
"Feature flag `~s`: adding node ~p to the largest "
378378
"Khepri cluster found among Mnesia nodes: ~p",
379379
[FeatureName, Node, KhepriCluster]),
380-
ok = rabbit_khepri:join_cluster(KhepriNode)
380+
case rabbit_khepri:add_member(Node, KhepriCluster) of
381+
ok -> ok;
382+
{ok, already_member} -> ok
383+
end
381384
end.
382385

383386
find_largest_khepri_cluster(FeatureName) ->

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 98 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515

1616
-export([setup/0,
1717
setup/1,
18-
join_cluster/1,
19-
add_member/1,
18+
add_member/2,
2019
remove_member/1,
2120
members/0,
2221
locally_known_members/0,
@@ -56,7 +55,8 @@
5655
is_enabled/1,
5756
nodes_if_khepri_enabled/0,
5857
try_mnesia_or_khepri/2]).
59-
-export([priv_reset/0]).
58+
-export([do_add_member/1,
59+
priv_reset/0]).
6060

6161
-ifdef(TEST).
6262
-export([force_metadata_store/1,
@@ -126,37 +126,43 @@ wait_for_leader(Fun, Timeout) ->
126126
Error
127127
end.
128128

129-
join_cluster(RemoteNode) when RemoteNode =/= node() ->
130-
ThisNode = node(),
131-
?LOG_INFO(
132-
"Khepri clustering: Attempt to add node ~p to Khepri cluster through "
133-
"node ~p",
134-
[ThisNode, RemoteNode],
135-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
129+
add_member(JoiningNode, JoinedNode) when JoinedNode =:= node() ->
130+
Ret = do_add_member(JoiningNode),
131+
post_add_member(JoiningNode, JoinedNode, Ret);
132+
add_member(JoiningNode, JoinedNode) when is_atom(JoinedNode) ->
136133
Ret = rabbit_misc:rpc_call(
137-
RemoteNode, rabbit_khepri, add_member, [ThisNode]),
138-
case Ret of
139-
ok ->
140-
?LOG_INFO(
141-
"Khepri clustering: Node ~p successfully added to Khepri "
142-
"cluster through node ~p",
143-
[ThisNode, RemoteNode],
144-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
145-
ok;
146-
Error ->
134+
JoinedNode, rabbit_khepri, do_add_member, [JoiningNode]),
135+
post_add_member(JoiningNode, JoinedNode, Ret);
136+
add_member(JoiningNode, [_ | _] = Cluster) ->
137+
case lists:member(JoiningNode, Cluster) of
138+
false ->
139+
JoinedNode = pick_node_in_cluster(Cluster),
147140
?LOG_INFO(
148-
"Khepri clustering: Failed to add node ~p to Khepri cluster "
149-
"through ~p: ~p",
150-
[ThisNode, RemoteNode, Error],
151-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
152-
Error
153-
end;
154-
join_cluster(ThisNode) when ThisNode =:= node() ->
155-
ok.
141+
"Khepri clustering: Attempt to add node ~p to cluster ~0p "
142+
"through node ~p",
143+
[JoiningNode, Cluster, JoinedNode],
144+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
145+
%% Recurse with a single node taken in the `Cluster' list.
146+
add_member(JoiningNode, JoinedNode);
147+
true ->
148+
?LOG_DEBUG(
149+
"Khepri clustering: Node ~p is already a member of cluster ~p",
150+
[JoiningNode, Cluster]),
151+
ok
152+
end.
153+
154+
%% Chooses the single node of `Cluster' that add_member/2 goes through to
%% add a joining node.
%%
%% Returns the local node (`node()') when it is listed in `Cluster';
%% otherwise returns the first element of the list. `Cluster' must be a
%% non-empty list: this is asserted before anything else.
%%
%% NOTE(review): presumably the local node is preferred so that add_member/2
%% can call do_add_member/1 directly instead of going through an RPC --
%% confirm.
pick_node_in_cluster(Cluster) when is_list(Cluster) ->
155+
?assertNotEqual([], Cluster),
156+
ThisNode = node(),
157+
case lists:member(ThisNode, Cluster) of
158+
true -> ThisNode;
159+
false -> hd(Cluster)
160+
end.
156161

157-
add_member(NewNode) when NewNode =/= node() ->
162+
do_add_member(NewNode) when NewNode =/= node() ->
158163
?LOG_DEBUG(
159-
"Trying to add node ~s to Khepri cluster \"~s\" from node ~s",
164+
"Khepri clustering: Trying to add node ~p to cluster \"~s\" through "
165+
"node ~p",
160166
[NewNode, ?RA_CLUSTER_NAME, node()],
161167
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
162168

@@ -169,24 +175,20 @@ add_member(NewNode) when NewNode =/= node() ->
169175
pong = net_adm:ping(NewNode),
170176

171177
?LOG_DEBUG(
172-
"Resetting Khepri on remote node ~s",
178+
"Khepri clustering: Resetting Khepri on remote node ~p",
173179
[NewNode],
174180
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
181+
175182
%% If the remote node to add is running RabbitMQ, we need to put
176183
%% it in maintenance mode at least. We remember that state to
177184
%% revive the node only if it was fully running before this code.
178185
RemoteIsRunning = rabbit:is_running(NewNode),
179186
RemoteAlreadyBeingDrained =
180187
rabbit_maintenance:is_being_drained_consistent_read(NewNode),
181188
NeedToReviveRemote =
182-
RemoteIsRunning andalso RemoteAlreadyBeingDrained,
183-
case RemoteIsRunning of
184-
true ->
185-
ok = rabbit_misc:rpc_call(
186-
NewNode, rabbit_maintenance, drain, []);
187-
false ->
188-
ok
189-
end,
189+
RemoteIsRunning andalso not RemoteAlreadyBeingDrained,
190+
maybe_drain_node(NewNode, RemoteIsRunning),
191+
190192
Ret1 = rabbit_misc:rpc_call(
191193
NewNode, rabbit_khepri, priv_reset, []),
192194
case Ret1 of
@@ -199,46 +201,74 @@ add_member(NewNode) when NewNode =/= node() ->
199201
Ret2 = khepri:add_member(
200202
?RA_SYSTEM, ?RA_CLUSTER_NAME, ?RA_FRIENDLY_NAME,
201203
NewNode),
204+
202205
%% Revive the remote node if it was running and not under
203206
%% maintenance before we changed the cluster membership.
204-
case NeedToReviveRemote of
205-
true ->
206-
ok = rabbit_misc:rpc_call(
207-
NewNode, rabbit_maintenance, revive, []);
208-
false ->
209-
ok
210-
end,
211-
case Ret2 of
212-
ok ->
213-
?LOG_DEBUG(
214-
"Node ~s added to Khepri cluster \"~s\"",
215-
[NewNode, ?RA_CLUSTER_NAME],
216-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
217-
ok;
218-
{error, _} = Error ->
219-
?LOG_ERROR(
220-
"Failed to add remote node ~s to Khepri "
221-
"cluster \"~s\": ~p",
222-
[NewNode, ?RA_CLUSTER_NAME, Error],
223-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
224-
Error
225-
end;
207+
maybe_revive_node(NewNode, NeedToReviveRemote),
208+
209+
Ret2;
226210
Error ->
227211
?LOG_ERROR(
228-
"Failed to reset Khepri on remote node ~s: ~p",
212+
"Khepri clustering: Failed to reset Khepri on node "
213+
"~p: ~p",
229214
[NewNode, Error],
230215
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
216+
217+
%% Revive the remote node if it was running and not under
218+
%% maintenance before we changed the cluster membership.
219+
maybe_revive_node(NewNode, NeedToReviveRemote),
220+
231221
Error
232222
end;
233223
true ->
234-
?LOG_INFO(
235-
"Asked to add node ~s to Khepri cluster \"~s\" but already a "
236-
"member of it: ~p",
237-
[NewNode, ?RA_CLUSTER_NAME, lists:sort(CurrentNodes)],
238-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
239-
ok
224+
{error, {already_member, CurrentNodes}}
240225
end.
241226

227+
%% Calls rabbit_maintenance:drain/0 on `Node' through rabbit_misc:rpc_call/4
%% when the second argument is `true'; does nothing when it is `false'.
%%
%% Always returns `ok'. Any RPC result other than `ok' fails the `ok ='
%% match and crashes the caller with a `badmatch' error.
maybe_drain_node(Node, true) ->
228+
ok = rabbit_misc:rpc_call(Node, rabbit_maintenance, drain, []);
229+
maybe_drain_node(_Node, false) ->
230+
ok.
231+
232+
%% Calls rabbit_maintenance:revive/0 on `Node' through rabbit_misc:rpc_call/4
%% when the second argument is `true'; does nothing when it is `false'.
%%
%% Always returns `ok'. Any RPC result other than `ok' fails the `ok ='
%% match and crashes the caller with a `badmatch' error.
maybe_revive_node(Node, true) ->
233+
ok = rabbit_misc:rpc_call(Node, rabbit_maintenance, revive, []);
234+
maybe_revive_node(_Node, false) ->
235+
ok.
236+
237+
%% Logs the outcome of an attempt to add `JoiningNode' to the Khepri cluster
%% through `JoinedNode', and maps that outcome (third argument) to the value
%% add_member/2 returns:
%%
%%   ok                                -> ok
%%   {error, {already_member, Nodes}}  -> {ok, already_member}
%%   badrpc/undef on do_add_member or
%%   priv_reset in rabbit_khepri       -> ok
%%   anything else                     -> returned unchanged
%%
%% Every clause logs at info level in the ?RMQLOG_DOMAIN_GLOBAL domain.

%% Success: the node was added.
post_add_member(JoiningNode, JoinedNode, ok) ->
238+
?LOG_INFO(
239+
"Khepri clustering: Node ~p successfully added to cluster \"~s\" "
240+
"through node ~p",
241+
[JoiningNode, ?RA_CLUSTER_NAME, JoinedNode],
242+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
243+
ok;
244+
%% The node was already a member: reported to the caller as a success value
%% rather than as an error. The member list is sorted for logging only.
post_add_member(
245+
JoiningNode, _JoinedNode, {error, {already_member, Cluster}}) ->
246+
?LOG_INFO(
247+
"Khepri clustering: Asked to add node ~p to cluster \"~s\" "
248+
"but already a member of it: ~p",
249+
[JoiningNode, ?RA_CLUSTER_NAME, lists:sort(Cluster)],
250+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
251+
{ok, already_member};
252+
%% The RPC failed with `undef' for rabbit_khepri:do_add_member or
%% rabbit_khepri:priv_reset; this is logged as Khepri being unavailable and
%% deliberately reported as `ok' (see the TODO below).
%%
%% NOTE(review): the pattern only matches a stacktrace list holding exactly
%% one frame; a longer stacktrace would fall through to the generic clause
%% and be returned as an error -- confirm the shape rabbit_misc:rpc_call/4
%% yields on all supported Erlang/OTP versions.
%% NOTE(review): the message names `JoinedNode' as the node lacking Khepri,
%% but do_add_member/1 calls priv_reset on the joining node -- verify the
%% wording for the priv_reset case.
post_add_member(
253+
JoiningNode, JoinedNode,
254+
{badrpc, {'EXIT', {undef, [{rabbit_khepri, Function, _, _}]}}} = Error)
255+
when Function =:= do_add_member orelse
256+
Function =:= priv_reset ->
257+
?LOG_INFO(
258+
"Khepri clustering: Can't add node ~p to cluster \"~s\"; "
259+
"Khepri unavailable on node ~p: ~p",
260+
[JoiningNode, ?RA_CLUSTER_NAME, JoinedNode, Error],
261+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
262+
%% TODO: Should we return an error and let the caller decide?
263+
ok;
264+
%% Any other result is treated as a failure and handed back untouched.
%% NOTE(review): the failure is logged at info level, not error -- confirm
%% this is intended.
post_add_member(JoiningNode, JoinedNode, Error) ->
265+
?LOG_INFO(
266+
"Khepri clustering: Failed to add node ~p to cluster \"~s\" "
267+
"through ~p: ~p",
268+
[JoiningNode, ?RA_CLUSTER_NAME, JoinedNode, Error],
269+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
270+
Error.
271+
242272
remove_member(NodeToRemove) when NodeToRemove =/= node() ->
243273
?LOG_DEBUG(
244274
"Trying to remove node ~s from Khepri cluster \"~s\" on node ~s",

deps/rabbit/src/rabbit_mnesia.erl

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterva
216216

217217
join_cluster(DiscoveryNode, NodeType) ->
218218
case join_mnesia_cluster(DiscoveryNode, NodeType) of
219-
ok -> join_khepri_cluster(DiscoveryNode);
220-
Other -> Other
219+
ok -> join_khepri_cluster(DiscoveryNode);
220+
{ok, already_member} -> join_khepri_cluster(DiscoveryNode);
221+
Other -> Other
221222
end.
222223

223224
join_mnesia_cluster(DiscoveryNode, NodeType) ->
@@ -265,16 +266,8 @@ join_mnesia_cluster(DiscoveryNode, NodeType) ->
265266
end.
266267

267268
join_khepri_cluster(DiscoveryNode) ->
268-
case rabbit_khepri:join_cluster(DiscoveryNode) of
269-
{badrpc, {'EXIT', {undef, [{rabbit_khepri, add_member, _, _}]}}} ->
270-
rabbit_log:debug(
271-
"Skip Khepri cluster expansion; Khepri is unavailable on "
272-
"remote node ~p",
273-
[DiscoveryNode]),
274-
ok;
275-
Other ->
276-
Other
277-
end.
269+
ThisNode = node(),
270+
rabbit_khepri:add_member(ThisNode, [DiscoveryNode]).
278271

279272
%% return node to its virgin state, where it is not member of any
280273
%% cluster, has no cluster configuration, no local database, and no
@@ -929,7 +922,17 @@ stop_mnesia() ->
929922
stopped = mnesia:stop(),
930923
ensure_mnesia_not_running().
931924

932-
change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
925+
%% Delegates to change_extra_mnesia_nodes/2 and returns its result.
%%
%% The Khepri step that follows is currently disabled: `false andalso ...'
%% short-circuits, so rabbit_khepri:is_enabled/0 is never called, the `true'
%% branch is never taken and change_extra_khepri_nodes/2 is not reached from
%% here.
change_extra_db_nodes(ClusterNodes, CheckOtherNodes) ->
926+
Nodes = change_extra_mnesia_nodes(ClusterNodes, CheckOtherNodes),
927+
%% FIXME: Need to cluster Khepri at this point? I don't think so... I keep
928+
%% the code but it does nothing (`false' condition).
929+
case false andalso rabbit_khepri:is_enabled() of
930+
true -> _ = change_extra_khepri_nodes(ClusterNodes, CheckOtherNodes);
931+
false -> ok
932+
end,
933+
Nodes.
934+
935+
change_extra_mnesia_nodes(ClusterNodes0, CheckOtherNodes) ->
933936
ClusterNodes = nodes_excl_me(ClusterNodes0),
934937
case {mnesia:change_config(extra_db_nodes, ClusterNodes), ClusterNodes} of
935938
{{ok, []}, [_|_]} when CheckOtherNodes ->
@@ -939,6 +942,26 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
939942
Nodes
940943
end.
941944

945+
%% Asks rabbit_khepri to add the local node to the cluster described by
%% `ClusterNodes', then returns the list given by
%% rabbit_khepri:locally_known_nodes/0.
%%
%% When `CheckOtherNodes' is `true', every node of `ClusterNodes' must be
%% present in that list; otherwise the missing nodes are logged and
%% `{error, {failed_to_cluster_with, ClusterNodes, Message}}' is thrown.
%% When it is `false', the list is returned without any check.
%%
%% The result of rabbit_khepri:add_member/2 is discarded, so a failed join
%% is only noticed through the membership check above.
%%
%% NOTE(review): an empty `ClusterNodes' matches none of the add_member/2
%% clauses visible in this commit (local node, atom, non-empty list) and
%% would raise `function_clause' -- confirm callers never pass `[]'.
%% NOTE(review): the visible rabbit_khepri export list shows
%% locally_known_members/0 but not locally_known_nodes/0 -- verify the
%% latter is exported.
%% NOTE(review): the "UnclusteredNodes = ..." log line reads like leftover
%% debugging output, and the thrown message says "any nodes" although the
%% throw happens as soon as a single node is missing -- confirm wording.
change_extra_khepri_nodes(ClusterNodes, CheckOtherNodes) ->
946+
ThisNode = node(),
947+
_ = rabbit_khepri:add_member(ThisNode, ClusterNodes),
948+
ActualNodes = rabbit_khepri:locally_known_nodes(),
949+
case CheckOtherNodes of
950+
true ->
951+
UnclusteredNodes = ClusterNodes -- ActualNodes,
952+
case UnclusteredNodes of
953+
[] ->
954+
ActualNodes;
955+
_ ->
956+
rabbit_log:error("UnclusteredNodes = ~p~n", [UnclusteredNodes]),
957+
throw({error,
958+
{failed_to_cluster_with, ClusterNodes,
959+
"Khepri could not connect to any nodes."}})
960+
end;
961+
false ->
962+
ActualNodes
963+
end.
964+
942965
check_consistency(Node, OTP, Rabbit, ProtocolVersion) ->
943966
rabbit_misc:sequence_error(
944967
[check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),

0 commit comments

Comments
 (0)