@@ -952,56 +952,29 @@ public NodesInfo getNodes(String states) {
952952
953953 NodesInfo nodes = new NodesInfo ();
954954
955- final Map <SubClusterId , SubClusterInfo > subClustersActive ;
956955 try {
957- subClustersActive = getActiveSubclusters ();
958- } catch (Exception e ) {
959- LOG .error ("Cannot get nodes." , e );
960- return new NodesInfo ();
961- }
962-
963- // Send the requests in parallel
964- CompletionService <NodesInfo > compSvc =
965- new ExecutorCompletionService <NodesInfo >(this .threadpool );
966-
967- for (final SubClusterInfo info : subClustersActive .values ()) {
968- compSvc .submit (new Callable <NodesInfo >() {
969- @ Override
970- public NodesInfo call () {
971- DefaultRequestInterceptorREST interceptor =
972- getOrCreateInterceptorForSubCluster (
973- info .getSubClusterId (), info .getRMWebServiceAddress ());
974- try {
975- NodesInfo nodesInfo = interceptor .getNodes (states );
976- return nodesInfo ;
977- } catch (Exception e ) {
978- LOG .error ("Subcluster {} failed to return nodesInfo." , info .getSubClusterId (), e );
979- return null ;
980- }
981- }
956+ Map <SubClusterId , SubClusterInfo > subClustersActive = getActiveSubclusters ();
957+ Class [] argsClasses = new Class []{String .class };
958+ Object [] args = new Object []{states };
959+ ClientMethod remoteMethod = new ClientMethod ("getNodes" , argsClasses , args );
960+ Map <SubClusterInfo , NodesInfo > nodesMap =
961+ invokeConcurrent (subClustersActive .values (), remoteMethod , NodesInfo .class );
962+ nodesMap .values ().stream ().forEach (nodesInfo -> {
963+ nodes .addAll (nodesInfo .getNodes ());
982964 });
983- }
984-
985- // Collect all the responses in parallel
986-
987- for (int i = 0 ; i < subClustersActive .size (); i ++) {
988- try {
989- Future <NodesInfo > future = compSvc .take ();
990- NodesInfo nodesResponse = future .get ();
991965
992- if ( nodesResponse != null ) {
993- nodes . addAll ( nodesResponse . getNodes () );
994- }
995- } catch ( Throwable e ) {
996- LOG . warn ( "Failed to get nodes report " , e );
997- }
966+ } catch ( NotFoundException e ) {
967+ LOG . error ( "Get all active sub cluster(s) error." , e );
968+ } catch ( YarnException e ) {
969+ LOG . error ( "getNodes error." , e );
970+ } catch ( IOException e ) {
971+ LOG . error ( "getNodes error." , e );
998972 }
999973
1000974 // Delete duplicate from all the node reports got from all the available
1001975 // YARN RMs. Nodes can be moved from one subclusters to another. In this
1002976 // operation they result LOST/RUNNING in the previous SubCluster and
1003977 // NEW/RUNNING in the new one.
1004-
1005978 return RouterWebServiceUtil .deleteDuplicateNodesInfo (nodes .getNodes ());
1006979 }
1007980
0 commit comments