@@ -282,7 +282,7 @@ public Response createNewApplication(HttpServletRequest hsr)
282282 .entity (e .getLocalizedMessage ()).build ();
283283 }
284284
285- List <SubClusterId > blacklist = new ArrayList <SubClusterId >();
285+ List <SubClusterId > blacklist = new ArrayList <>();
286286
287287 for (int i = 0 ; i < numSubmitRetries ; ++i ) {
288288
@@ -295,7 +295,7 @@ public Response createNewApplication(HttpServletRequest hsr)
295295 .entity (e .getLocalizedMessage ()).build ();
296296 }
297297
298- LOG .debug ("getNewApplication try #{} on SubCluster {}" , i , subClusterId );
298+ LOG .debug ("getNewApplication try #{} on SubCluster {}. " , i , subClusterId );
299299
300300 DefaultRequestInterceptorREST interceptor =
301301 getOrCreateInterceptorForSubCluster (subClusterId ,
@@ -304,7 +304,7 @@ public Response createNewApplication(HttpServletRequest hsr)
304304 try {
305305 response = interceptor .createNewApplication (hsr );
306306 } catch (Exception e ) {
307- LOG .warn ("Unable to create a new ApplicationId in SubCluster {}" ,
307+ LOG .warn ("Unable to create a new ApplicationId in SubCluster {}. " ,
308308 subClusterId .getId (), e );
309309 }
310310
@@ -424,7 +424,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
424424 .build ();
425425 }
426426
427- List <SubClusterId > blacklist = new ArrayList <SubClusterId >();
427+ List <SubClusterId > blacklist = new ArrayList <>();
428428
429429 for (int i = 0 ; i < numSubmitRetries ; ++i ) {
430430
@@ -441,7 +441,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
441441 .entity (e .getLocalizedMessage ())
442442 .build ();
443443 }
444- LOG .info ("submitApplication appId {} try #{} on SubCluster {}" ,
444+ LOG .info ("submitApplication appId {} try #{} on SubCluster {}. " ,
445445 applicationId , i , subClusterId );
446446
447447 ApplicationHomeSubCluster appHomeSubCluster =
@@ -482,7 +482,7 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,
482482 .build ();
483483 }
484484 if (subClusterId == subClusterIdInStateStore ) {
485- LOG .info ("Application {} already submitted on SubCluster {}" ,
485+ LOG .info ("Application {} already submitted on SubCluster {}. " ,
486486 applicationId , subClusterId );
487487 } else {
488488 routerMetrics .incrAppsFailedSubmitted ();
@@ -712,8 +712,7 @@ public AppsInfo call() {
712712
713713 if (rmApps == null ) {
714714 routerMetrics .incrMultipleAppsFailedRetrieved ();
715- LOG .error ("Subcluster {} failed to return appReport." ,
716- info .getSubClusterId ());
715+ LOG .error ("Subcluster {} failed to return appReport." , info .getSubClusterId ());
717716 return null ;
718717 }
719718 return rmApps ;
@@ -873,8 +872,7 @@ private Map<SubClusterInfo, NodeInfo> getNode(
873872 subclusterId , subcluster .getRMWebServiceAddress ());
874873 return interceptor .getNode (nodeId );
875874 } catch (Exception e ) {
876- LOG .error ("Subcluster {} failed to return nodeInfo." ,
877- subclusterId );
875+ LOG .error ("Subcluster {} failed to return nodeInfo." , subclusterId , e );
878876 return null ;
879877 }
880878 });
@@ -953,58 +951,28 @@ private SubClusterInfo getNodeSubcluster(String nodeId)
953951 public NodesInfo getNodes (String states ) {
954952
955953 NodesInfo nodes = new NodesInfo ();
956-
957- final Map <SubClusterId , SubClusterInfo > subClustersActive ;
958954 try {
959- subClustersActive = getActiveSubclusters ();
960- } catch (Exception e ) {
961- LOG .error ("Cannot get nodes: {}" , e .getMessage ());
962- return new NodesInfo ();
963- }
964-
965- // Send the requests in parallel
966- CompletionService <NodesInfo > compSvc =
967- new ExecutorCompletionService <NodesInfo >(this .threadpool );
968-
969- for (final SubClusterInfo info : subClustersActive .values ()) {
970- compSvc .submit (new Callable <NodesInfo >() {
971- @ Override
972- public NodesInfo call () {
973- DefaultRequestInterceptorREST interceptor =
974- getOrCreateInterceptorForSubCluster (
975- info .getSubClusterId (), info .getRMWebServiceAddress ());
976- try {
977- NodesInfo nodesInfo = interceptor .getNodes (states );
978- return nodesInfo ;
979- } catch (Exception e ) {
980- LOG .error ("Subcluster {} failed to return nodesInfo." ,
981- info .getSubClusterId ());
982- return null ;
983- }
984- }
955+ Map <SubClusterId , SubClusterInfo > subClustersActive = getActiveSubclusters ();
956+ Class [] argsClasses = new Class []{String .class };
957+ Object [] args = new Object []{states };
958+ ClientMethod remoteMethod = new ClientMethod ("getNodes" , argsClasses , args );
959+ Map <SubClusterInfo , NodesInfo > nodesMap =
960+ invokeConcurrent (subClustersActive .values (), remoteMethod , NodesInfo .class );
961+ nodesMap .values ().stream ().forEach (nodesInfo -> {
962+ nodes .addAll (nodesInfo .getNodes ());
985963 });
986- }
987-
988- // Collect all the responses in parallel
989-
990- for (int i = 0 ; i < subClustersActive .size (); i ++) {
991- try {
992- Future <NodesInfo > future = compSvc .take ();
993- NodesInfo nodesResponse = future .get ();
994-
995- if (nodesResponse != null ) {
996- nodes .addAll (nodesResponse .getNodes ());
997- }
998- } catch (Throwable e ) {
999- LOG .warn ("Failed to get nodes report " , e );
1000- }
964+ } catch (NotFoundException e ) {
965+ LOG .error ("Get all active sub cluster(s) error." , e );
966+ } catch (YarnException e ) {
967+ LOG .error ("getNodes error." , e );
968+ } catch (IOException e ) {
969+ LOG .error ("getNodes error with io error." , e );
1001970 }
1002971
1003972 // Delete duplicate from all the node reports got from all the available
1004973 // YARN RMs. Nodes can be moved from one subclusters to another. In this
1005974 // operation they result LOST/RUNNING in the previous SubCluster and
1006975 // NEW/RUNNING in the new one.
1007-
1008976 return RouterWebServiceUtil .deleteDuplicateNodesInfo (nodes .getNodes ());
1009977 }
1010978
@@ -1172,7 +1140,22 @@ public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
11721140 @ Override
11731141 public NodeToLabelsInfo getNodeToLabels (HttpServletRequest hsr )
11741142 throws IOException {
1175- throw new NotImplementedException ("Code is not implemented" );
1143+ try {
1144+ Map <SubClusterId , SubClusterInfo > subClustersActive = getActiveSubclusters ();
1145+ final HttpServletRequest hsrCopy = clone (hsr );
1146+ Class [] argsClasses = new Class []{HttpServletRequest .class };
1147+ Object [] args = new Object []{hsrCopy };
1148+ ClientMethod remoteMethod = new ClientMethod ("getNodeToLabels" , argsClasses , args );
1149+ Map <SubClusterInfo , NodeToLabelsInfo > nodeToLabelsInfoMap =
1150+ invokeConcurrent (subClustersActive .values (), remoteMethod , NodeToLabelsInfo .class );
1151+ return RouterWebServiceUtil .mergeNodeToLabels (nodeToLabelsInfoMap );
1152+ } catch (NotFoundException e ) {
1153+ LOG .error ("Get all active sub cluster(s) error." , e );
1154+ throw new IOException ("Get all active sub cluster(s) error." , e );
1155+ } catch (YarnException e ) {
1156+ LOG .error ("getNodeToLabels error." , e );
1157+ throw new IOException ("getNodeToLabels error." , e );
1158+ }
11761159 }
11771160
11781161 @ Override
@@ -1395,7 +1378,7 @@ public void shutdown() {
13951378 }
13961379
13971380 private <R > Map <SubClusterInfo , R > invokeConcurrent (Collection <SubClusterInfo > clusterIds ,
1398- ClientMethod request , Class <R > clazz ) {
1381+ ClientMethod request , Class <R > clazz ) throws YarnException {
13991382
14001383 Map <SubClusterInfo , R > results = new HashMap <>();
14011384
@@ -1413,8 +1396,8 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
14131396 R ret = clazz .cast (retObj );
14141397 return ret ;
14151398 } catch (Exception e ) {
1416- LOG .error ("SubCluster {} failed to call {} method." , info . getSubClusterId () ,
1417- request .getMethodName (), e );
1399+ LOG .error ("SubCluster %s failed to call %s method." ,
1400+ info . getSubClusterId (), request .getMethodName (), e );
14181401 return null ;
14191402 }
14201403 });
@@ -1428,7 +1411,10 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
14281411 results .put (clusterId , response );
14291412 }
14301413 } catch (Throwable e ) {
1431- LOG .warn ("SubCluster {} failed to {} report." , clusterId , request .getMethodName (), e );
1414+ String msg = String .format ("SubCluster %s failed to %s report." ,
1415+ clusterId , request .getMethodName ());
1416+ LOG .warn (msg , e );
1417+ throw new YarnRuntimeException (msg , e );
14321418 }
14331419 });
14341420 return results ;
0 commit comments