From 30048ac7ac9fc95edc1b936076415dea335848ef Mon Sep 17 00:00:00 2001
From: zhonghaihua <793507405@qq.com>
Date: Sun, 17 Jan 2016 20:46:44 +0800
Subject: [PATCH 1/5] Initialize executorIdCounter after the ApplicationMaster
 is killed for reaching the max number of executor failures

---
 .../cluster/CoarseGrainedClusterMessage.scala |  2 ++
 .../CoarseGrainedSchedulerBackend.scala       |  9 ++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala |  1 +
 .../spark/deploy/yarn/YarnAllocator.scala     | 23 ++++++++++++++++++-
 4 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d8547677..7ebcefc7a19c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
+  case object RetrieveCurrentExecutorIdCounter extends CoarseGrainedClusterMessage
+
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6cd..f3bda8e2ce75 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -78,6 +78,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
+  // The current max executor ID, used when the ApplicationMaster re-registers
+  private var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -155,6 +158,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
+          if (currentExecutorIdCounter < Integer.parseInt(executorId)) {
+            currentExecutorIdCounter = Integer.parseInt(executorId)
+          }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -184,6 +190,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
+
+      case RetrieveCurrentExecutorIdCounter =>
+        context.reply(currentExecutorIdCounter)
     }
 
     // Make fake resource offers on all executors
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index cccc061647a7..8752a72663c6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -292,6 +292,7 @@ private[spark] class ApplicationMaster(
       historyAddress,
       securityMgr)
 
+    allocator.initExecutorIdCounter()
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 11426eb07c7e..b80ad3e9ff60 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,11 +33,14 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveCurrentExecutorIdCounter
 import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.Utils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -168,6 +171,24 @@
       .toSeq
   }
 
+  /**
+   * Initialize `executorIdCounter`.
+   */
+  def initExecutorIdCounter(): Unit = {
+    val port = sparkConf.getInt("spark.yarn.am.port", 0)
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      val init = RpcEnv.create(
+        "executorIdCounterInit",
+        Utils.localHostName,
+        port,
+        sparkConf,
+        new SecurityManager(sparkConf))
+      val driver = init.setupEndpointRefByURI(driverUrl)
+      executorIdCounter = driver.askWithRetry[Integer](RetrieveCurrentExecutorIdCounter)
+      init.shutdown()
+    }
+  }
+
   /**
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
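The mechanism this patch adds can be boiled down to a short, Spark-free sketch
(all names below are hypothetical stand-ins, not the real Spark classes): the
driver remembers the highest executor ID it has ever registered, and a
restarted allocator seeds its counter from that value instead of from zero.

    // Minimal sketch of the recovery idea; DriverState / Allocator stand in
    // for CoarseGrainedSchedulerBackend and YarnAllocator respectively.
    object ExecutorIdRecoverySketch {
      // Driver side: bump the recorded maximum whenever an executor registers.
      class DriverState {
        private var currentExecutorIdCounter = 0
        def registerExecutor(executorId: String): Unit = synchronized {
          if (currentExecutorIdCounter < executorId.toInt) {
            currentExecutorIdCounter = executorId.toInt
          }
        }
        def maxExecutorId: Int = synchronized(currentExecutorIdCounter)
      }

      // AM side: a freshly (re)started allocator asks the driver before issuing IDs.
      class Allocator(driver: DriverState) {
        private var executorIdCounter = driver.maxExecutorId
        def nextExecutorId(): String = {
          executorIdCounter += 1
          executorIdCounter.toString
        }
      }

      def main(args: Array[String]): Unit = {
        val driver = new DriverState
        Seq("1", "2", "3").foreach(driver.registerExecutor) // first AM attempt's executors
        val restarted = new Allocator(driver)               // second AM attempt
        println(restarted.nextExecutorId())                 // "4": no clash with 1..3
      }
    }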
From fa7d54bbb51b71663e6008f0cded3c929275b41a Mon Sep 17 00:00:00 2001
From: zhonghaihua <793507405@qq.com>
Date: Mon, 22 Feb 2016 11:28:16 +0800
Subject: [PATCH 2/5] Rename RetrieveCurrentExecutorIdCounter to
 RetrieveMaxExecutorId and add some comments

---
 .../cluster/CoarseGrainedClusterMessage.scala        |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 12 +++++++++---
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 7ebcefc7a19c..27f51c79f32e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
-  case object RetrieveCurrentExecutorIdCounter extends CoarseGrainedClusterMessage
+  case object RetrieveMaxExecutorId extends CoarseGrainedClusterMessage
 
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f3bda8e2ce75..9c5ed860ef1f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -191,7 +191,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
 
-      case RetrieveCurrentExecutorIdCounter =>
+      case RetrieveMaxExecutorId =>
         context.reply(currentExecutorIdCounter)
     }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b80ad3e9ff60..0c794d9bd32f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveCurrentExecutorIdCounter
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveMaxExecutorId
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils
 
@@ -172,7 +172,13 @@
   }
 
   /**
    * Initialize `executorIdCounter`.
+   *
+   * When the AM restarts, `executorIdCounter` is reset to 0, so IDs for new executors
+   * would start from 1 again and conflict with executors created before the restart.
+   * We therefore seed `executorIdCounter` with the max executor ID from the driver.
+   *
+   * @see SPARK-12864
    */
   def initExecutorIdCounter(): Unit = {
     val port = sparkConf.getInt("spark.yarn.am.port", 0)
@@ -184,7 +190,7 @@ private[yarn] class YarnAllocator(
         sparkConf,
         new SecurityManager(sparkConf))
       val driver = init.setupEndpointRefByURI(driverUrl)
-      executorIdCounter = driver.askWithRetry[Integer](RetrieveCurrentExecutorIdCounter)
+      executorIdCounter = driver.askWithRetry[Integer](RetrieveMaxExecutorId)
       init.shutdown()
     }
   }
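The conflict the new doc comment describes is easy to reproduce in miniature.
A hedged sketch using plain collections (executorDataMap below is only a
stand-in for the driver's real registration map):

    import scala.collection.mutable

    // Failure mode sketch: after an AM restart, a counter naively reset to 0
    // re-issues an ID that a still-running executor already holds.
    object IdConflictSketch {
      def main(args: Array[String]): Unit = {
        val executorDataMap = mutable.Map("1" -> "host-a", "2" -> "host-b")
        var executorIdCounter = 0                // reset by the AM restart
        executorIdCounter += 1
        val newId = executorIdCounter.toString   // "1" again
        println(executorDataMap.contains(newId)) // true: the SPARK-12864 conflict
      }
    }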
From 3a1724c19ad3eb9e87d9a5b10007b6c53424aac0 Mon Sep 17 00:00:00 2001
From: zhonghaihua <793507405@qq.com>
Date: Thu, 25 Feb 2016 14:19:24 +0800
Subject: [PATCH 3/5] Change how the max executor ID is fetched from the
 driver

---
 .../CoarseGrainedSchedulerBackend.scala       |  5 +--
 .../spark/deploy/yarn/ApplicationMaster.scala |  1 -
 .../spark/deploy/yarn/YarnAllocator.scala     | 44 +++++++------------
 .../cluster/YarnSchedulerBackend.scala        |  3 ++
 4 files changed, 19 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9c5ed860ef1f..19e72a7cdf4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -79,7 +79,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val executorsPendingLossReason = new HashSet[String]
 
   // The current max executor ID, used when the ApplicationMaster re-registers
-  private var currentExecutorIdCounter = 0
+  var currentExecutorIdCounter = 0
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
@@ -190,9 +190,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
-
-      case RetrieveMaxExecutorId =>
-        context.reply(currentExecutorIdCounter)
     }
 
     // Make fake resource offers on all executors
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8752a72663c6..cccc061647a7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -292,7 +292,6 @@ private[spark] class ApplicationMaster(
       historyAddress,
       securityMgr)
 
-    allocator.initExecutorIdCounter()
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0c794d9bd32f..01a94a2cbb55 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,14 +33,12 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveMaxExecutorId
 import org.apache.spark.util.ThreadUtils
-import org.apache.spark.util.Utils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -84,8 +82,20 @@ private[yarn] class YarnAllocator(
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
   @volatile private var numExecutorsRunning = 0
-  // Used to generate a unique ID per executor
-  private var executorIdCounter = 0
+
+  /**
+   * Used to generate a unique ID per executor.
+   *
+   * When the AM restarts, `executorIdCounter` is reset to 0, so IDs for new executors
+   * would start from 1 again and conflict with executors created before the restart.
+   * We therefore seed `executorIdCounter` with the max executor ID from the driver.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int = {
+    driverRef.askWithRetry[Int](RetrieveMaxExecutorId) + 1
+  }
 
   @volatile private var numExecutorsFailed = 0
   @volatile private var targetNumExecutors =
@@ -171,30 +181,6 @@ private[yarn] class YarnAllocator(
       .toSeq
   }
 
-  /**
-   * Initialize `executorIdCounter`.
-   *
-   * When the AM restarts, `executorIdCounter` is reset to 0, so IDs for new executors
-   * would start from 1 again and conflict with executors created before the restart.
-   * We therefore seed `executorIdCounter` with the max executor ID from the driver.
-   *
-   * @see SPARK-12864
-   */
-  def initExecutorIdCounter(): Unit = {
-    val port = sparkConf.getInt("spark.yarn.am.port", 0)
-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      val init = RpcEnv.create(
-        "executorIdCounterInit",
-        Utils.localHostName,
-        port,
-        sparkConf,
-        new SecurityManager(sparkConf))
-      val driver = init.setupEndpointRefByURI(driverUrl)
-      executorIdCounter = driver.askWithRetry[Integer](RetrieveMaxExecutorId)
-      init.shutdown()
-    }
-  }
-
   /**
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1431bceb256a..53d48f678092 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -287,6 +287,9 @@ private[spark] abstract class YarnSchedulerBackend(
           logWarning("Attempted to kill executors before the AM has registered!")
           context.reply(false)
         }
+
+      case RetrieveMaxExecutorId =>
+        context.reply(currentExecutorIdCounter)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
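This revision replaces the throwaway RpcEnv with a single blocking ask issued
while the field initializes. Its shape, reduced to plain Scala futures
(askDriver is a hypothetical stand-in for driverRef.askWithRetry; nothing
below is actual Spark API):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    // The counter is seeded during construction by one synchronous round-trip,
    // so no separate init method or extra RpcEnv is needed.
    class AllocatorSketch(askDriver: () => Future[Int]) {
      private var executorIdCounter: Int = Await.result(askDriver(), 30.seconds) + 1
      def currentSeed: Int = executorIdCounter
    }

    object AllocatorSketch {
      def main(args: Array[String]): Unit = {
        val alloc = new AllocatorSketch(() => Future.successful(3)) // driver reports max ID 3
        println(alloc.currentSeed)                                  // 4
      }
    }

The tradeoff is that constructing the allocator now blocks until the driver
answers, which is presumably why the retrying ask variant is used here.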
From 659c5050a2433fc2ddb056895be30b12985481b9 Mon Sep 17 00:00:00 2001
From: zhonghaihua <793507405@qq.com>
Date: Thu, 25 Feb 2016 21:40:33 +0800
Subject: [PATCH 4/5] Make currentExecutorIdCounter protected

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 19e72a7cdf4f..039b4f5877f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -79,7 +79,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   protected val executorsPendingLossReason = new HashSet[String]
 
   // The current max executor ID, used when the ApplicationMaster re-registers
-  var currentExecutorIdCounter = 0
+  protected var currentExecutorIdCounter = 0
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
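Why protected rather than private or public: YarnSchedulerBackend extends
CoarseGrainedSchedulerBackend and must read the counter inside its own message
handler, while outside callers still should not see it. A two-class sketch of
just the visibility rule (class names hypothetical):

    // `protected` exposes the counter to subclasses but keeps it out of the
    // public API; BaseBackend / YarnBackend are illustrative only.
    class BaseBackend {
      protected var currentExecutorIdCounter = 0
    }

    class YarnBackend extends BaseBackend {
      // Legal: a subclass may read the protected member, e.g. to reply to the AM.
      def lastAllocatedId: Int = currentExecutorIdCounter
    }

    object VisibilitySketch {
      def main(args: Array[String]): Unit = {
        val backend = new YarnBackend
        println(backend.lastAllocatedId)      // 0
        // backend.currentExecutorIdCounter   // would not compile outside the hierarchy
      }
    }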
From ebe3c7f290929588c822137b8bf27b18fe75393f Mon Sep 17 00:00:00 2001
From: zhonghaihua <793507405@qq.com>
Date: Fri, 1 Apr 2016 12:51:15 +0800
Subject: [PATCH 5/5] Simplify the initializer and expand the comments

---
 .../cluster/CoarseGrainedClusterMessage.scala        |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala      |  4 ++--
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 11 +++++++----
 .../scheduler/cluster/YarnSchedulerBackend.scala     |  2 +-
 4 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 27f51c79f32e..1d92efab84d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,7 +30,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
-  case object RetrieveMaxExecutorId extends CoarseGrainedClusterMessage
+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
 
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 039b4f5877f4..6c06e1909b61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -158,8 +158,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
-          if (currentExecutorIdCounter < Integer.parseInt(executorId)) {
-            currentExecutorIdCounter = Integer.parseInt(executorId)
+          if (currentExecutorIdCounter < executorId.toInt) {
+            currentExecutorIdCounter = executorId.toInt
           }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 01a94a2cbb55..8a50c1884e71 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -37,7 +37,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveMaxExecutorId
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -91,11 +91,14 @@ private[yarn] class YarnAllocator(
    * would start from 1 again and conflict with executors created before the restart.
    * We therefore seed `executorIdCounter` with the max executor ID from the driver.
    *
+   * This conflict can only happen in YARN client mode: in cluster mode the driver runs
+   * inside the AM and restarts with it, so there are no surviving executors to clash
+   * with. See the JIRA ticket for details.
+   *
    * @see SPARK-12864
    */
-  private var executorIdCounter: Int = {
-    driverRef.askWithRetry[Int](RetrieveMaxExecutorId) + 1
-  }
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 53d48f678092..77f96501dbb1 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -288,7 +288,7 @@ private[spark] abstract class YarnSchedulerBackend(
           context.reply(false)
         }
 
-      case RetrieveMaxExecutorId =>
+      case RetrieveLastAllocatedExecutorId =>
         context.reply(currentExecutorIdCounter)
     }
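The net effect of the series, as a closing sketch (names hypothetical): the
driver replies with the last allocated ID and the earlier "+ 1" disappears.
That only works if the allocator increments its counter before assigning each
new ID, which the dropped "+ 1" in this final revision suggests it does.

    // End-to-end sketch of the final shape: the reply to
    // RetrieveLastAllocatedExecutorId seeds the counter directly.
    object FinalShapeSketch {
      def main(args: Array[String]): Unit = {
        val lastAllocatedByDriver = 3          // driver's reply after an AM restart
        var executorIdCounter = lastAllocatedByDriver
        def launchExecutor(): String = {
          executorIdCounter += 1               // increment first, then assign
          executorIdCounter.toString
        }
        println(launchExecutor())              // "4": continues after 1..3
        println(launchExecutor())              // "5"
      }
    }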