@@ -594,16 +594,50 @@ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
594594 .getTaskReports (jobID , taskType );
595595 }
596596
597+ private void killUnFinishedApplication (ApplicationId appId )
598+ throws IOException {
599+ ApplicationReport application = null ;
600+ try {
601+ application = resMgrDelegate .getApplicationReport (appId );
602+ } catch (YarnException e ) {
603+ throw new IOException (e );
604+ }
605+ if (application .getYarnApplicationState () == YarnApplicationState .FINISHED
606+ || application .getYarnApplicationState () == YarnApplicationState .FAILED
607+ || application .getYarnApplicationState () == YarnApplicationState .KILLED ) {
608+ return ;
609+ }
610+ killApplication (appId );
611+ }
612+
613+ private void killApplication (ApplicationId appId ) throws IOException {
614+ try {
615+ resMgrDelegate .killApplication (appId );
616+ } catch (YarnException e ) {
617+ throw new IOException (e );
618+ }
619+ }
620+
621+ private boolean isJobInTerminalState (JobStatus status ) {
622+ return status .getState () == JobStatus .State .KILLED
623+ || status .getState () == JobStatus .State .FAILED
624+ || status .getState () == JobStatus .State .SUCCEEDED ;
625+ }
626+
597627 @ Override
598628 public void killJob (JobID arg0 ) throws IOException , InterruptedException {
599629 /* check if the status is not running, if not send kill to RM */
600630 JobStatus status = clientCache .getClient (arg0 ).getJobStatus (arg0 );
631+ ApplicationId appId = TypeConverter .toYarn (arg0 ).getAppId ();
632+
633+ // get status from RM and return
634+ if (status == null ) {
635+ killUnFinishedApplication (appId );
636+ return ;
637+ }
638+
601639 if (status .getState () != JobStatus .State .RUNNING ) {
602- try {
603- resMgrDelegate .killApplication (TypeConverter .toYarn (arg0 ).getAppId ());
604- } catch (YarnException e ) {
605- throw new IOException (e );
606- }
640+ killApplication (appId );
607641 return ;
608642 }
609643
@@ -612,26 +646,26 @@ public void killJob(JobID arg0) throws IOException, InterruptedException {
612646 clientCache .getClient (arg0 ).killJob (arg0 );
613647 long currentTimeMillis = System .currentTimeMillis ();
614648 long timeKillIssued = currentTimeMillis ;
615- while ((currentTimeMillis < timeKillIssued + 10000L ) && (status .getState ()
616- != JobStatus .State .KILLED )) {
617- try {
618- Thread .sleep (1000L );
619- } catch (InterruptedException ie ) {
620- /** interrupted, just break */
621- break ;
622- }
623- currentTimeMillis = System .currentTimeMillis ();
624- status = clientCache .getClient (arg0 ).getJobStatus (arg0 );
649+ while ((currentTimeMillis < timeKillIssued + 10000L )
650+ && !isJobInTerminalState (status )) {
651+ try {
652+ Thread .sleep (1000L );
653+ } catch (InterruptedException ie ) {
654+ /** interrupted, just break */
655+ break ;
656+ }
657+ currentTimeMillis = System .currentTimeMillis ();
658+ status = clientCache .getClient (arg0 ).getJobStatus (arg0 );
659+ if (status == null ) {
660+ killUnFinishedApplication (appId );
661+ return ;
662+ }
625663 }
626664 } catch (IOException io ) {
627665 LOG .debug ("Error when checking for application status" , io );
628666 }
629- if (status .getState () != JobStatus .State .KILLED ) {
630- try {
631- resMgrDelegate .killApplication (TypeConverter .toYarn (arg0 ).getAppId ());
632- } catch (YarnException e ) {
633- throw new IOException (e );
634- }
667+ if (status != null && !isJobInTerminalState (status )) {
668+ killApplication (appId );
635669 }
636670 }
637671
0 commit comments