@@ -227,35 +227,29 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
227227 ApplicationClientProtocol clientRMProxy = null ;
228228 try {
229229 boolean serviceAuthEnabled = getConf ().getBoolean (
230- CommonConfigurationKeys .HADOOP_SECURITY_AUTHORIZATION , false );
230+ CommonConfigurationKeys .HADOOP_SECURITY_AUTHORIZATION , false );
231231 UserGroupInformation realUser = user ;
232232 if (serviceAuthEnabled ) {
233233 realUser = UserGroupInformation .createProxyUser (
234- user .getShortUserName (), UserGroupInformation .getLoginUser ());
234+ user .getShortUserName (), UserGroupInformation .getLoginUser ());
235235 }
236236 clientRMProxy = FederationProxyProviderUtil .createRMProxy (getConf (),
237237 ApplicationClientProtocol .class , subClusterId , realUser );
238238 } catch (Exception e ) {
239239 RouterServerUtil .logAndThrowException (
240- "Unable to create the interface to reach the SubCluster "
241- + subClusterId ,
242- e );
240+ "Unable to create the interface to reach the SubCluster " + subClusterId , e );
243241 }
244-
245242 clientRMProxies .put (subClusterId , clientRMProxy );
246243 return clientRMProxy ;
247244 }
248245
249246 private SubClusterId getRandomActiveSubCluster (
250- Map <SubClusterId , SubClusterInfo > activeSubclusters )
251- throws YarnException {
252-
253- if (activeSubclusters == null || activeSubclusters .size () < 1 ) {
247+ Map <SubClusterId , SubClusterInfo > activeSubClusters ) throws YarnException {
248+ if (activeSubClusters == null || activeSubClusters .size () < 1 ) {
254249 RouterServerUtil .logAndThrowException (
255250 FederationPolicyUtils .NO_ACTIVE_SUBCLUSTER_AVAILABLE , null );
256251 }
257- List <SubClusterId > list = new ArrayList <>(activeSubclusters .keySet ());
258-
252+ List <SubClusterId > list = new ArrayList <>(activeSubClusters .keySet ());
259253 return list .get (rand .nextInt (list .size ()));
260254 }
261255
@@ -280,47 +274,50 @@ private SubClusterId getRandomActiveSubCluster(
280274 public GetNewApplicationResponse getNewApplication (
281275 GetNewApplicationRequest request ) throws YarnException , IOException {
282276
283- long startTime = clock .getTime ();
277+ if (request == null ) {
278+ routerMetrics .incrAppsFailedCreated ();
279+ String errMsg = "Missing getNewApplication request." ;
280+ RouterAuditLogger .logFailure (user .getShortUserName (),
281+ RouterAuditLogger .AuditConstants .GET_NEW_APP , "UNKNOWN" ,
282+ "RouterClientRMService" , errMsg );
283+ RouterServerUtil .logAndThrowException (errMsg , null );
284+ }
284285
286+ long startTime = clock .getTime ();
285287 Map <SubClusterId , SubClusterInfo > subClustersActive =
286288 federationFacade .getSubClusters (true );
287289
290+ GetNewApplicationResponse response = null ;
291+
288292 for (int i = 0 ; i < numSubmitRetries ; ++i ) {
289293 SubClusterId subClusterId = getRandomActiveSubCluster (subClustersActive );
290- LOG .debug (
291- "getNewApplication try #{} on SubCluster {}" , i , subClusterId );
292- ApplicationClientProtocol clientRMProxy =
293- getClientRMProxyForSubCluster (subClusterId );
294- GetNewApplicationResponse response = null ;
294+ LOG .info ("getNewApplication try #{} on SubCluster {}" , i , subClusterId );
295+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster (subClusterId );
296+ response = null ;
295297 try {
296298 response = clientRMProxy .getNewApplication (request );
297299 } catch (Exception e ) {
298- LOG .warn ("Unable to create a new ApplicationId in SubCluster "
299- + subClusterId . getId (), e );
300+ LOG .warn ("Unable to create a new ApplicationId in SubCluster {}." , subClusterId . getId (), e );
301+ subClustersActive . remove ( subClusterId );
300302 }
301303
302304 if (response != null ) {
303-
304305 long stopTime = clock .getTime ();
305306 routerMetrics .succeededAppsCreated (stopTime - startTime );
306307 RouterAuditLogger .logSuccess (user .getShortUserName (),
307308 RouterAuditLogger .AuditConstants .GET_NEW_APP ,
308309 "RouterClientRMService" , response .getApplicationId ());
309310 return response ;
310- } else {
311- // Empty response from the ResourceManager.
312- // Blacklist this subcluster for this request.
313- subClustersActive .remove (subClusterId );
314311 }
315-
316312 }
317313
318314 routerMetrics .incrAppsFailedCreated ();
319- String errMsg = "Fail to create a new application." ;
315+ String errMsg = "Failed to create a new application." ;
320316 RouterAuditLogger .logFailure (user .getShortUserName (),
321317 RouterAuditLogger .AuditConstants .GET_NEW_APP , "UNKNOWN" ,
322318 "RouterClientRMService" , errMsg );
323- throw new YarnException (errMsg );
319+ RouterServerUtil .logAndThrowException (errMsg , null );
320+ return response ;
324321 }
325322
326323 /**
@@ -392,32 +389,31 @@ public GetNewApplicationResponse getNewApplication(
392389 public SubmitApplicationResponse submitApplication (
393390 SubmitApplicationRequest request ) throws YarnException , IOException {
394391
395- long startTime = clock .getTime ();
396-
397392 if (request == null || request .getApplicationSubmissionContext () == null
398- || request .getApplicationSubmissionContext ()
399- .getApplicationId () == null ) {
393+ || request .getApplicationSubmissionContext ().getApplicationId () == null ) {
400394 routerMetrics .incrAppsFailedSubmitted ();
401395 String errMsg =
402- "Missing submitApplication request or applicationSubmissionContext "
403- + "information." ;
396+ "Missing submitApplication request or applicationSubmissionContext information." ;
404397 RouterAuditLogger .logFailure (user .getShortUserName (),
405398 RouterAuditLogger .AuditConstants .SUBMIT_NEW_APP , "UNKNOWN" ,
406399 "RouterClientRMService" , errMsg );
407- throw new YarnException (errMsg );
400+ RouterServerUtil . logAndThrowException (errMsg , null );
408401 }
409402
403+ long startTime = clock .getTime ();
404+
410405 ApplicationId applicationId =
411406 request .getApplicationSubmissionContext ().getApplicationId ();
412407
413- List <SubClusterId > blacklist = new ArrayList <SubClusterId >();
408+ List <SubClusterId > blacklist = new ArrayList <>();
414409
415410 for (int i = 0 ; i < numSubmitRetries ; ++i ) {
416411
417412 SubClusterId subClusterId = policyFacade .getHomeSubcluster (
418413 request .getApplicationSubmissionContext (), blacklist );
419- LOG .info ("submitApplication appId {} try #{} on SubCluster {}." , applicationId , i ,
420- subClusterId );
414+
415+ LOG .info ("submitApplication appId {} try #{} on SubCluster {}." ,
416+ applicationId , i , subClusterId );
421417
422418 ApplicationHomeSubCluster appHomeSubCluster =
423419 ApplicationHomeSubCluster .newInstance (applicationId , subClusterId );
@@ -430,32 +426,34 @@ public SubmitApplicationResponse submitApplication(
430426 federationFacade .addApplicationHomeSubCluster (appHomeSubCluster );
431427 } catch (YarnException e ) {
432428 routerMetrics .incrAppsFailedSubmitted ();
433- String message = "Unable to insert the ApplicationId " + applicationId
434- + " into the FederationStateStore" ;
429+ String message =
430+ String .format ("Unable to insert the ApplicationId %s into the FederationStateStore." ,
431+ applicationId );
435432 RouterAuditLogger .logFailure (user .getShortUserName (),
436433 RouterAuditLogger .AuditConstants .SUBMIT_NEW_APP , "UNKNOWN" ,
437434 "RouterClientRMService" , message , applicationId , subClusterId );
438- throw new YarnException (message , e );
435+ RouterServerUtil . logAndThrowException (message , e );
439436 }
440437 } else {
441438 try {
442439 // update the mapping of applicationId and the home subClusterId to
443440 // the new subClusterId we have selected
444441 federationFacade .updateApplicationHomeSubCluster (appHomeSubCluster );
445442 } catch (YarnException e ) {
446- String message = "Unable to update the ApplicationId " + applicationId
447- + " into the FederationStateStore" ;
443+ String message =
444+ String .format ("Unable to update the ApplicationId %s into the FederationStateStore." ,
445+ applicationId );
448446 SubClusterId subClusterIdInStateStore =
449447 federationFacade .getApplicationHomeSubCluster (applicationId );
450448 if (subClusterId == subClusterIdInStateStore ) {
451- LOG .info ("Application {} already submitted on SubCluster {}." , applicationId ,
452- subClusterId );
449+ LOG .info ("Application {} already submitted on SubCluster {}." ,
450+ applicationId , subClusterId );
453451 } else {
454452 routerMetrics .incrAppsFailedSubmitted ();
455453 RouterAuditLogger .logFailure (user .getShortUserName (),
456454 RouterAuditLogger .AuditConstants .SUBMIT_NEW_APP , "UNKNOWN" ,
457455 "RouterClientRMService" , message , applicationId , subClusterId );
458- throw new YarnException (message , e );
456+ RouterServerUtil . logAndThrowException (message , e );
459457 }
460458 }
461459 }
@@ -489,9 +487,8 @@ public SubmitApplicationResponse submitApplication(
489487 }
490488
491489 routerMetrics .incrAppsFailedSubmitted ();
492- String errMsg = "Application "
493- + request .getApplicationSubmissionContext ().getApplicationName ()
494- + " with appId " + applicationId + " failed to be submitted." ;
490+ String errMsg = String .format ("Application %s with appId %s failed to be submitted." ,
491+ request .getApplicationSubmissionContext ().getApplicationName (), applicationId );
495492 RouterAuditLogger .logFailure (user .getShortUserName (),
496493 RouterAuditLogger .AuditConstants .SUBMIT_NEW_APP , "UNKNOWN" ,
497494 "RouterClientRMService" , errMsg , applicationId );
0 commit comments