Skip to content

Commit caca695

Browse files
committed
Add application attempt window for Spark on Yarn
1 parent 7286988 commit caca695

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

docs/running-on-yarn.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ If you need a reference to the proper location to put log files in the YARN so t
303303
It should be no larger than the global number of max attempts in the YARN configuration.
304304
</td>
305305
</tr>
306+
<tr>
307+
<td><code>spark.yarn.attemptFailuresValidityInterval</code></td>
308+
<td>-1</td>
309+
<td>
310+
Ignore the failure number which happens out the validity interval (in millisecond).
311+
Default value -1 means this validity interval is not enabled.
312+
</td>
313+
</tr>
306314
<tr>
307315
<td><code>spark.yarn.submit.waitAppCompletion</code></td>
308316
<td><code>true</code></td>

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,23 @@ private[spark] class Client(
185185
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
186186
"Cluster's default value will be used.")
187187
}
188+
sparkConf.getOption("spark.yarn.attemptFailuresValidityInterval").map(_.toLong) match {
189+
case Some(v) =>
190+
require(v > 0, "spark.yarn.attemptFailuresValidityInterval should be greater than 0")
191+
try {
192+
val method = appContext.getClass().getMethod(
193+
"setAttemptFailuresValidityInterval", classOf[Long])
194+
method.invoke(appContext, v: java.lang.Long)
195+
} catch {
196+
case e: NoSuchMethodException =>
197+
logWarning("Ignoring spark.yarn.attemptFailuresValidityInterval because the version " +
198+
"of YARN does not support it")
199+
}
200+
case None =>
201+
logDebug("spark.yarn.attemptFailuresValidityInterval is not set, " +
202+
"only use spark.yarn.maxAppAppAttempts to control the application failure attempts")
203+
}
204+
188205
val capability = Records.newRecord(classOf[Resource])
189206
capability.setMemory(args.amMemory + amMemoryOverhead)
190207
capability.setVirtualCores(args.amCores)

0 commit comments

Comments
 (0)