diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 0e25ccf512c02..6e0f3e65f6525 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -303,6 +303,14 @@ If you need a reference to the proper location to put log files in the YARN so t
It should be no larger than the global number of max attempts in the YARN configuration.
  </td>
</tr>
+<tr>
+  <td><code>spark.yarn.attemptFailuresValidityInterval</code></td>
+  <td>-1</td>
+  <td>
+  Ignores application attempt failures that occurred outside the validity interval (in milliseconds).
+  The default value of -1 means the validity interval is disabled.
+  </td>
+</tr>
<tr>
  <td><code>spark.yarn.submit.waitAppCompletion</code></td>
  <td><code>true</code></td>
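
For context, a minimal sketch of how a client could combine the new property with `spark.yarn.maxAppAttempts` when building a `SparkConf`; the object name and all values below are illustrative placeholders, not part of the patch:

```scala
import org.apache.spark.SparkConf

object AttemptValidityDemo {
  def main(args: Array[String]): Unit = {
    // Allow up to 4 AM attempts, but ignore attempt failures that
    // occurred more than one hour (3600000 ms) ago. Values are illustrative.
    val conf = new SparkConf()
      .set("spark.yarn.maxAppAttempts", "4")
      .set("spark.yarn.attemptFailuresValidityInterval", "3600000")
    println(conf.get("spark.yarn.attemptFailuresValidityInterval"))
  }
}
```

For a long-running application, this keeps sporadic AM failures spread over days from eventually exhausting the attempt budget, since only failures inside the interval count against `spark.yarn.maxAppAttempts`.
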
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a2c4bc2f5480b..9e997f4239db7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -185,6 +185,23 @@ private[spark] class Client(
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
"Cluster's default value will be used.")
}
+ sparkConf.getOption("spark.yarn.attemptFailuresValidityInterval").map(_.toLong) match {
+ case Some(v) =>
+ require(v > 0, "spark.yarn.attemptFailuresValidityInterval should be greater than 0")
+ try {
+ val method = appContext.getClass().getMethod(
+ "setAttemptFailuresValidityInterval", classOf[Long])
+ method.invoke(appContext, v: java.lang.Long)
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning("Ignoring spark.yarn.attemptFailuresValidityInterval because the version " +
+ "of YARN does not support it")
+ }
+ case None =>
+        logDebug("spark.yarn.attemptFailuresValidityInterval is not set; " +
+          "only spark.yarn.maxAppAttempts will be used to control application attempt failures")
+ }
+
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setVirtualCores(args.amCores)
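
The try/catch around `getMethod` is a capability probe: `setAttemptFailuresValidityInterval` only exists in newer YARN releases, so the client degrades gracefully when it is absent instead of failing at class-load time. Below is a standalone sketch of the same pattern, with a hypothetical `Target` class standing in for YARN's `ApplicationSubmissionContext`:

```scala
// Standalone sketch: invoke an optional setter only if the class provides it.
// `Target` is a hypothetical stand-in for ApplicationSubmissionContext.
class Target {
  private var interval: Long = -1L
  def setAttemptFailuresValidityInterval(v: Long): Unit = { interval = v }
  override def toString: String = s"attemptFailuresValidityInterval=$interval"
}

object ReflectionProbeDemo {
  def main(args: Array[String]): Unit = {
    val target = new Target
    try {
      // classOf[Long] is the primitive `long` class, matching the JVM
      // signature (J)V of the setter above.
      val method = target.getClass.getMethod(
        "setAttemptFailuresValidityInterval", classOf[Long])
      method.invoke(target, Long.box(3600000L))
    } catch {
      case _: NoSuchMethodException =>
        // Mirrors the logWarning branch in the patch: the feature is
        // skipped when the method does not exist.
        println("setter not available; skipping")
    }
    println(target) // prints attemptFailuresValidityInterval=3600000
  }
}
```
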