Skip to content

Commit a1e03af

Browse files
author
Jacob Maes
committed
SAMZA-1250: JobRunner.kill doesn't terminate cleanly with YarnJob.
1. The ClientHelper now checks inactive application IDs so it can get status for terminated jobs in addition to running jobs 2. JobRunner.kill() waits for any finish, not just successful finish. 3. A killed job is now considered successful. Author: Jacob Maes <jmaes@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com> Closes apache#152 from jmakes/samza-1250
1 parent 92ae4c6 commit a1e03af

File tree

3 files changed

+38
-19
lines changed

3 files changed

+38
-19
lines changed

samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class JobRunner(config: Config) extends Logging {
137137
info("waiting for job to terminate")
138138

139139
// Wait until the job has terminated, then exit.
140-
Option(job.waitForStatus(SuccessfulFinish, 5000)) match {
140+
Option(job.waitForFinish(5000)) match {
141141
case Some(appStatus) => {
142142
if (SuccessfulFinish.equals(appStatus)) {
143143
info("job terminated successfully - " + appStatus)

samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -236,22 +236,31 @@ class ClientHelper(conf: Configuration) extends Logging {
236236
* @return the active application ids.
237237
*/
238238
def getActiveApplicationIds(appName: String): List[ApplicationId] = {
239-
val getAppsRsp = yarnClient.getApplications
239+
val applicationReports = yarnClient.getApplications
240240

241-
getAppsRsp
241+
applicationReports
242242
.asScala
243-
.filter(appRep => ((
244-
Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
245-
|| New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
246-
)
247-
&& appName.equals(appRep.getName)))
248-
.map(appRep => appRep.getApplicationId)
243+
.filter(applicationReport => isActiveApplication(applicationReport)
244+
&& appName.equals(applicationReport.getName))
245+
.map(applicationReport => applicationReport.getApplicationId)
249246
.toList
250247
}
251248

249+
def getPreviousApplicationIds(appName: String): List[ApplicationId] = {
250+
val applicationReports = yarnClient.getApplications
251+
252+
applicationReports
253+
.asScala
254+
.filter(applicationReport => (!(isActiveApplication(applicationReport))
255+
&& appName.equals(applicationReport.getName)))
256+
.map(applicationReport => applicationReport.getApplicationId)
257+
.toList
258+
}
259+
252260
def status(appId: ApplicationId): Option[ApplicationStatus] = {
253261
val statusResponse = yarnClient.getApplicationReport(appId)
254-
convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
262+
info("Got state: %s, final status: %s".format(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus))
263+
toAppStatus(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
255264
}
256265

257266
def kill(appId: ApplicationId) {
@@ -271,15 +280,20 @@ class ClientHelper(conf: Configuration) extends Logging {
271280
status match {
272281
case Some(status) => getAppsRsp
273282
.asScala
274-
.filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
283+
.filter(appRep => status.equals(toAppStatus(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
275284
.toList
276285
case None => getAppsRsp.asScala.toList
277286
}
278287
}
279288

280-
private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
289+
private def isActiveApplication(applicationReport: ApplicationReport): Boolean = {
290+
(Running.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get)
291+
|| New.equals(toAppStatus(applicationReport.getYarnApplicationState, applicationReport.getFinalApplicationStatus).get))
292+
}
293+
294+
private def toAppStatus(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
281295
(state, status) match {
282-
case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
296+
case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) | (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) => Some(SuccessfulFinish)
283297
case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
284298
case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
285299
case _ => Some(Running)

samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
140140
client.status(appId).getOrElse(null)
141141
case None =>
142142
logger.info("Unable to report status because no applicationId could be found.")
143-
null
143+
ApplicationStatus.SuccessfulFinish
144144
}
145145
}
146146

@@ -171,12 +171,17 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
171171
logger.info("Fetching status from YARN for application name %s" format applicationName)
172172
val applicationIds = client.getActiveApplicationIds(applicationName)
173173

174-
applicationIds.foreach(applicationId => {
175-
logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
176-
})
174+
if (applicationIds.nonEmpty) {
175+
// Only return latest one, because there should only be one.
176+
logger.info("Matching active ids: " + applicationIds.sorted.reverse.toString())
177+
applicationIds.sorted.reverse.headOption
178+
} else {
179+
// Couldn't find an active applicationID. Use one the latest finished ID.
180+
val pastApplicationIds = client.getPreviousApplicationIds(applicationName)
181+
// Don't log because there could be many, many previous app IDs for an application.
182+
pastApplicationIds.sorted.reverse.headOption // Get latest
183+
}
177184

178-
// Only return one, because there should only be one.
179-
applicationIds.headOption
180185
case None =>
181186
None
182187
}

0 commit comments

Comments
 (0)