2626 normalize /1 ,
2727 append_node_prefix /1 ,
2828 node_prefix /0 ]).
29- -export ([do_query_node_props /1 ,
30- group_leader_proxy /2 ]).
29+ -export ([do_query_node_props /2 ]).
3130
3231-ifdef (TEST ).
3332-export ([query_node_props /1 ,
@@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
378377% % @private
379378
380379query_node_props (Nodes ) when Nodes =/= [] ->
381- {Prefix , Suffix } = rabbit_nodes_common :parts (node ()),
380+ ThisNode = node (),
381+ {Prefix , Suffix } = rabbit_nodes_common :parts (ThisNode ),
382382 PeerName = peer :random_name (Prefix ),
383383 % % We go through a temporary hidden node to query all other discovered
384384 % % peers properties, instead of querying them directly.
@@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] ->
440440 [Peer ],
441441 #{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
442442 try
443- peer :call (Pid , ? MODULE , do_query_node_props , [Nodes ], 180000 )
443+ NodesAndProps1 = peer :call (
444+ Pid ,
445+ ? MODULE , do_query_node_props ,
446+ [Nodes , ThisNode ], 180000 ),
447+ NodesAndProps2 = sort_nodes_and_props (NodesAndProps1 ),
448+ NodesAndProps2
444449 after
445450 peer :stop (Pid )
446451 end ;
@@ -563,80 +568,31 @@ maybe_add_tls_arguments(VMArgs) ->
563568 end ,
564569 VMArgs2 .
565570
566- do_query_node_props (Nodes ) when Nodes =/= [] ->
571+ do_query_node_props (Nodes , FromNode ) when Nodes =/= [] ->
567572 % % Make sure all log messages are forwarded from this temporary hidden
568573 % % node to the upstream node, regardless of their level.
569574 _ = logger :set_primary_config (level , debug ),
570575
571- % % The group leader for all processes on this temporary hidden node is the
572- % % calling process' group leader on the upstream node.
573- % %
574- % % When we use `erpc:call/4' (or the multicall equivalent) to execute code
575- % % on one of the `Nodes', the remotely executed code will also use the
576- % % calling process' group leader by default.
577- % %
578- % % We use this temporary hidden node to ensure the downstream node will
579- % % not connected to the upstream node. Therefore, we must change the group
580- % % leader as well, otherwise any I/O from the downstream node will send a
581- % % message to the upstream node's group leader and thus open a connection.
582- % % This would defeat the entire purpose of this temporary hidden node.
583- % %
584- % % To avoid this, we start a proxy process which we will use as a group
585- % % leader. This process will send all messages it receives to the group
586- % % leader on the upstream node.
587- % %
588- % % There is one caveat: the logger (local to the temporary hidden node)
589- % % forwards log messages to the upstream logger (on the upstream node)
590- % % only if the group leader of that message is a remote PID. Because we
591- % % set a local PID, it stops forwarding log messages originating from that
592- % % temporary hidden node. That's why we use `with_group_leader_proxy/2' to
593- % % set the group leader to our proxy only around the use of `erpc'.
594- % %
595- % % That's a lot just to keep logging working while not reveal the upstream
596- % % node to the downstream node...
597- Parent = self (),
598- UpstreamGroupLeader = erlang :group_leader (),
599- ProxyGroupLeader = spawn_link (
600- ? MODULE , group_leader_proxy ,
601- [Parent , UpstreamGroupLeader ]),
602-
603576 % % TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
604577 % % supported version has it.
605- MembersPerNode = with_group_leader_proxy (
606- ProxyGroupLeader ,
607- fun () ->
608- erpc :multicall (Nodes , rabbit_nodes , all , [])
609- end ),
610- query_node_props1 (Nodes , MembersPerNode , [], ProxyGroupLeader ).
611-
612- with_group_leader_proxy (ProxyGroupLeader , Fun ) ->
613- UpstreamGroupLeader = erlang :group_leader (),
614- try
615- true = erlang :group_leader (ProxyGroupLeader , self ()),
616- Fun ()
617- after
618- true = erlang :group_leader (UpstreamGroupLeader , self ())
619- end .
620-
621- group_leader_proxy (Parent , UpstreamGroupLeader ) ->
622- receive
623- stop_proxy ->
624- erlang :unlink (Parent ),
625- Parent ! proxy_stopped ;
626- Message ->
627- UpstreamGroupLeader ! Message ,
628- group_leader_proxy (Parent , UpstreamGroupLeader )
629- end .
578+ MembersPerNode = [try
579+ {ok ,
580+ erpc_call (Node , rabbit_nodes , all , [], FromNode )}
581+ catch
582+ Class :Reason ->
583+ {Class , Reason }
584+ end || Node <- Nodes ],
585+ query_node_props1 (Nodes , MembersPerNode , [], FromNode ).
630586
631587query_node_props1 (
632588 [Node | Nodes ], [{ok , Members } | MembersPerNode ], NodesAndProps ,
633- ProxyGroupLeader ) ->
589+ FromNode ) ->
634590 NodeAndProps = {Node , Members },
635591 NodesAndProps1 = [NodeAndProps | NodesAndProps ],
636- query_node_props1 (Nodes , MembersPerNode , NodesAndProps1 , ProxyGroupLeader );
592+ query_node_props1 (Nodes , MembersPerNode , NodesAndProps1 , FromNode );
637593query_node_props1 (
638- [Node | Nodes ], [{error , _ } = Error | MembersPerNode ], NodesAndProps ,
639- ProxyGroupLeader ) ->
594+ [Node | Nodes ], [{_ , _ } = Error | MembersPerNode ], NodesAndProps ,
595+ FromNode ) ->
640596 % % We consider that an error means the remote node is unreachable or not
641597 % % ready. Therefore, we exclude it from the list of discovered nodes as we
642598 % % won't be able to join it anyway.
@@ -645,55 +601,51 @@ query_node_props1(
645601 " Peer discovery: node '~ts ' excluded from the discovered nodes" ,
646602 [Node , Error , Node ],
647603 #{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
648- query_node_props1 (Nodes , MembersPerNode , NodesAndProps , ProxyGroupLeader );
649- query_node_props1 ([], [], NodesAndProps , ProxyGroupLeader ) ->
604+ query_node_props1 (Nodes , MembersPerNode , NodesAndProps , FromNode );
605+ query_node_props1 ([], [], NodesAndProps , FromNode ) ->
650606 NodesAndProps1 = lists :reverse (NodesAndProps ),
651- query_node_props2 (NodesAndProps1 , [], ProxyGroupLeader ).
652-
653- query_node_props2 ([{Node , Members } | Rest ], NodesAndProps , ProxyGroupLeader ) ->
654- try
655- erpc :call (
656- Node , logger , debug ,
657- [" Peer discovery: temporary hidden node '~ts ' queries properties "
658- " from node '~ts '" , [node (), Node ]]),
659- StartTime = get_node_start_time (Node , microsecond , ProxyGroupLeader ),
660- IsReady = is_node_db_ready (Node , ProxyGroupLeader ),
661- NodeAndProps = {Node , Members , StartTime , IsReady },
662- NodesAndProps1 = [NodeAndProps | NodesAndProps ],
663- query_node_props2 (Rest , NodesAndProps1 , ProxyGroupLeader )
664- catch
665- _ :Error :_ ->
666- % % If one of the erpc calls we use to get the start time fails,
667- % % there is something wrong with the remote node because it
668- % % doesn't depend on RabbitMQ. We exclude it from the discovered
669- % % nodes.
670- ? LOG_DEBUG (
671- " Peer discovery: failed to query start time of node '~ts ': "
672- " ~0tp~n "
673- " Peer discovery: node '~ts ' excluded from the discovered nodes" ,
674- [Node , Error , Node ],
675- #{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
676- query_node_props2 (Rest , NodesAndProps , ProxyGroupLeader )
677- end ;
678- query_node_props2 ([], NodesAndProps , ProxyGroupLeader ) ->
607+ query_node_props2 (NodesAndProps1 , [], FromNode ).
608+
609+ query_node_props2 ([{Node , Members } | Rest ], NodesAndProps , FromNode ) ->
610+ NodesAndProps2 = try
611+ erpc_call (
612+ Node , logger , debug ,
613+ [" Peer discovery: temporary hidden node '~ts ' "
614+ " queries properties from node '~ts '" ,
615+ [node (), Node ]], FromNode ),
616+ StartTime = get_node_start_time (
617+ Node , microsecond , FromNode ),
618+ IsReady = is_node_db_ready (Node , FromNode ),
619+ NodeAndProps = {Node , Members , StartTime , IsReady },
620+ NodesAndProps1 = [NodeAndProps | NodesAndProps ],
621+ NodesAndProps1
622+ catch
623+ _ :Error :_ ->
624+ % % If one of the erpc calls we use to get the
625+ % % start time fails, there is something wrong with
626+ % % the remote node because it doesn't depend on
627+ % % RabbitMQ. We exclude it from the discovered
628+ % % nodes.
629+ ? LOG_DEBUG (
630+ " Peer discovery: failed to query start time "
631+ " + DB readyness of node '~ts ': ~0tp~n "
632+ " Peer discovery: node '~ts ' excluded from the "
633+ " discovered nodes" ,
634+ [Node , Error , Node ],
635+ #{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
636+ NodesAndProps
637+ end ,
638+ query_node_props2 (Rest , NodesAndProps2 , FromNode );
639+ query_node_props2 ([], NodesAndProps , _FromNode ) ->
679640 NodesAndProps1 = lists :reverse (NodesAndProps ),
680- NodesAndProps2 = sort_nodes_and_props (NodesAndProps1 ),
681- % % Wait for the proxy group leader to flush its inbox.
682- ProxyGroupLeader ! stop_proxy ,
683- receive
684- proxy_stopped ->
685- ok
686- after 120_000 ->
687- ok
688- end ,
689641 ? assertEqual ([], nodes ()),
690- ? assert (length (NodesAndProps2 ) =< length (nodes (hidden ))),
691- NodesAndProps2 .
642+ ? assert (length (NodesAndProps1 ) =< length (nodes (hidden ))),
643+ NodesAndProps1 .
692644
693- -spec get_node_start_time (Node , Unit , ProxyGroupLeader ) -> StartTime when
645+ -spec get_node_start_time (Node , Unit , FromNode ) -> StartTime when
694646 Node :: node (),
695647 Unit :: erlang :time_unit (),
696- ProxyGroupLeader :: pid (),
648+ FromNode :: node (),
697649 StartTime :: non_neg_integer ().
698650% % @doc Returns the start time of the given `Node' in `Unit'.
699651% %
@@ -713,52 +665,60 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
713665% %
714666% % @private
715667
716- get_node_start_time (Node , Unit , ProxyGroupLeader ) ->
717- with_group_leader_proxy (
718- ProxyGroupLeader ,
719- fun () ->
720- NativeStartTime = erpc :call (
721- Node , erlang , system_info , [start_time ]),
722- TimeOffset = erpc :call (Node , erlang , time_offset , []),
723- SystemStartTime = NativeStartTime + TimeOffset ,
724- StartTime = erpc :call (
725- Node , erlang , convert_time_unit ,
726- [SystemStartTime , native , Unit ]),
727- StartTime
728- end ).
729-
730- -spec is_node_db_ready (Node , ProxyGroupLeader ) -> IsReady when
668+ get_node_start_time (Node , Unit , FromNode ) ->
669+ NativeStartTime = erpc_call (
670+ Node , erlang , system_info , [start_time ], FromNode ),
671+ TimeOffset = erpc_call (Node , erlang , time_offset , [], FromNode ),
672+ SystemStartTime = NativeStartTime + TimeOffset ,
673+ StartTime = erpc_call (
674+ Node , erlang , convert_time_unit ,
675+ [SystemStartTime , native , Unit ], FromNode ),
676+ StartTime .
677+
678+ -spec is_node_db_ready (Node , FromNode ) -> IsReady when
731679 Node :: node (),
732- ProxyGroupLeader :: pid (),
680+ FromNode :: node (),
733681 IsReady :: boolean () | undefined .
734682% % @doc Returns if the node's DB layer is ready or not.
735683% %
736684% % @private
737685
738- is_node_db_ready (Node , ProxyGroupLeader ) ->
739- % % This code is running from a temporary hidden node. We derive the real
740- % % node interested in the properties from the group leader.
741- UpstreamGroupLeader = erlang :group_leader (),
742- ThisNode = node (UpstreamGroupLeader ),
743- case Node of
744- ThisNode ->
745- % % The current node is running peer discovery, thus way before we
746- % % mark the DB layer as ready. Consider it ready in this case,
747- % % otherwise if the current node is selected, it will loop forever
748- % % waiting for itself to be ready.
749- true ;
750- _ ->
751- with_group_leader_proxy (
752- ProxyGroupLeader ,
753- fun () ->
754- try
755- erpc :call (Node , rabbit_db , is_init_finished , [])
756- catch
757- _ :{exception , undef ,
758- [{rabbit_db , is_init_finished , _ , _ } | _ ]} ->
759- undefined
760- end
761- end )
686+ is_node_db_ready (FromNode , FromNode ) ->
686+ % % The function is called for the current node running peer discovery, thus
688+ % % way before we mark the DB layer as ready. Consider it ready in this
689+ % % case, otherwise if the current node is selected, it will loop forever
690+ % % waiting for itself to be ready.
691+ true ;
692+ is_node_db_ready (Node , FromNode ) ->
693+ try
694+ erpc_call (Node , rabbit_db , is_init_finished , [], FromNode )
695+ catch
696+ _ :{exception , undef , [{rabbit_db , is_init_finished , _ , _ } | _ ]} ->
697+ undefined
698+ end .
699+
700+ erpc_call (Node , Mod , Fun , Args , FromNode ) ->
701+ erpc_call (Node , Mod , Fun , Args , FromNode , 10000 ).
702+
703+ erpc_call (Node , Mod , Fun , Args , FromNode , Timeout ) when Timeout >= 0 ->
704+ try
705+ erpc :call (Node , Mod , Fun , Args )
706+ catch
707+ error :{erpc , _ } = Reason :Stacktrace ->
708+ Peer = node (),
709+ _ = catch erpc :call (
710+ FromNode ,
711+ logger , debug ,
712+ [" Peer discovery: temporary hidden node '~ts ' "
713+ " failed to connect to '~ts ': ~0p " ,
714+ [Peer , Node , Reason ]]),
715+ Sleep = 1000 ,
716+ timer :sleep (Sleep ),
717+ NewTimeout = Timeout - Sleep ,
718+ case NewTimeout >= 0 of
719+ true -> erpc_call (Node , Mod , Fun , Args , FromNode , NewTimeout );
720+ false -> erlang :raise (error , Reason , Stacktrace )
721+ end
762722 end .
763723
764724-spec sort_nodes_and_props (NodesAndProps ) ->
0 commit comments