
Commit 6983732

sryza authored and pwendell committed
SPARK-1183. Don't use "worker" to mean executor
Author: Sandy Ryza <sandy@cloudera.com>

Closes #120 from sryza/sandy-spark-1183 and squashes the following commits:

5066a4a [Sandy Ryza] Remove "worker" in a couple comments
0bd1e46 [Sandy Ryza] Remove --am-class from usage
bfc8fe0 [Sandy Ryza] Remove am-class from doc and fix yarn-alpha
607539f [Sandy Ryza] Address review comments
74d087a [Sandy Ryza] SPARK-1183. Don't use "worker" to mean executor
1 parent e4e8d8f commit 6983732

21 files changed, with 312 additions and 294 deletions

docs/cluster-overview.md

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ object in your main program (called the _driver program_).
 Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_
 (either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across
 applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are
-worker processes that run computations and store data for your application.
+processes that run computations and store data for your application.
 Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to
 the executors. Finally, SparkContext sends *tasks* for the executors to run.
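For context, the driver/executor split described in this paragraph looks like the following minimal Scala sketch (the app name and input data are made up): the SparkContext is the driver program, and the `map`/`reduce` work it submits runs as tasks inside the executor processes acquired from the cluster manager.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClusterOverviewSketch {
  def main(args: Array[String]): Unit = {
    // The driver program: creates the SparkContext, which connects to the
    // cluster manager and acquires executors on nodes in the cluster.
    val conf = new SparkConf().setAppName("cluster-overview-sketch")
    val sc = new SparkContext(conf)

    // These transformations become *tasks* that SparkContext sends to the
    // executors; the executors run the computation and can cache the data.
    val squares = sc.parallelize(1 to 1000).map(x => x * x)
    println(squares.reduce(_ + _))

    sc.stop()
  }
}
```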

docs/graphx-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Chan
 structure of the graph are accomplished by producing a new graph with the desired changes. Note
 that substantial parts of the original graph (i.e., unaffected structure, attributes, and indicies)
 are reused in the new graph reducing the cost of this inherently functional data-structure. The
-graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with
+graph is partitioned across the executors using a range of vertex-partitioning heuristics. As with
 RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.

 Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the
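As an illustration of the partitioning described above, here is a small GraphX sketch (the vertex and edge data are made up) that builds a property graph from RDDs and repartitions it across the executors with one of the built-in vertex-partitioning strategies.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

object GraphPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("graph-partition-sketch"))

    // A tiny property graph: vertex attributes are names, edge attributes are labels.
    val vertices = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
    val graph = Graph(vertices, edges)

    // Repartition the edges across the executors with a built-in heuristic.
    // This produces a new graph rather than mutating the old one.
    val partitioned = graph.partitionBy(PartitionStrategy.RandomVertexCut)
    println(partitioned.edges.count())

    sc.stop()
  }
}
```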

docs/job-scheduling.md

Lines changed: 2 additions & 2 deletions
@@ -39,8 +39,8 @@ Resource allocation can be configured as follows, based on the cluster type:
 * **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
   and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode.
   You should also set `spark.executor.memory` to control the executor memory.
-* **YARN:** The `--num-workers` option to the Spark YARN client controls how many workers it will allocate
-  on the cluster, while `--worker-memory` and `--worker-cores` control the resources per worker.
+* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it will allocate
+  on the cluster, while `--executor-memory` and `--executor-cores` control the resources per executor.

 A second option available on Mesos is _dynamic sharing_ of CPU cores. In this mode, each Spark application
 still has a fixed and independent memory allocation (set by `spark.executor.memory`), but when the
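A minimal sketch of the Mesos static-partitioning knobs named in the first bullet (the values are illustrative only); the YARN equivalents are the renamed `--num-executors`/`--executor-memory`/`--executor-cores` client options shown in the diff above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StaticPartitioningSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; tune these for your cluster.
    val conf = new SparkConf()
      .setAppName("static-partitioning-sketch")
      .set("spark.mesos.coarse", "true")   // static partitioning on Mesos
      .set("spark.cores.max", "8")         // cap this application's core share
      .set("spark.executor.memory", "2g")  // memory per executor

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}
```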

docs/mllib-classification-regression.md

Lines changed: 2 additions & 2 deletions
@@ -77,8 +77,8 @@ between the two goals of small loss and small model complexity.

 **Distributed Datasets.**
 For all currently implemented optimization methods for classification, the data must be
-distributed between the worker machines *by examples*. Every machine holds a consecutive block of
-the `$n$` example/label pairs `$(\x_i,y_i)$`.
+distributed between processes on the worker machines *by examples*. Machines hold consecutive
+blocks of the `$n$` example/label pairs `$(\x_i,y_i)$`.
 In other words, the input distributed dataset
 ([RDD](scala-programming-guide.html#resilient-distributed-datasets-rdds)) must be the set of
 vectors `$\x_i\in\R^d$`.
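The "distributed by examples" layout can be sketched with plain RDD operations (toy data, no MLlib API used): each partition holds a block of example/label pairs, and each partition lives inside an executor process on a worker machine.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ByExampleLayoutSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("by-example-layout-sketch"))

    // Toy (label, features) pairs; a real dataset would be far larger.
    val examples = Seq(
      (1.0, Array(0.0, 1.1, 0.1)),
      (0.0, Array(2.0, 1.0, -1.0)),
      (1.0, Array(0.5, 0.3, 0.2)),
      (0.0, Array(1.5, -0.2, 0.4)))

    // Partitioned *by examples*: each of the 2 partitions gets a block of pairs.
    val data = sc.parallelize(examples, numSlices = 2)
    println(data.glom().map(_.length).collect().mkString(", ")) // examples per partition

    sc.stop()
  }
}
```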

docs/python-programming-guide.md

Lines changed: 3 additions & 3 deletions
@@ -43,9 +43,9 @@ def is_error(line):
 errors = logData.filter(is_error)
 {% endhighlight %}

-PySpark will automatically ship these functions to workers, along with any objects that they reference.
-Instances of classes will be serialized and shipped to workers by PySpark, but classes themselves cannot be automatically distributed to workers.
-The [Standalone Use](#standalone-use) section describes how to ship code dependencies to workers.
+PySpark will automatically ship these functions to executors, along with any objects that they reference.
+Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors.
+The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors.

 In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell.

docs/running-on-yarn.md

Lines changed: 14 additions & 15 deletions
@@ -41,7 +41,7 @@ System Properties:
 * `spark.yarn.submit.file.replication`, the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
 * `spark.yarn.preserve.staging.files`, set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
 * `spark.yarn.scheduler.heartbeat.interval-ms`, the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
-* `spark.yarn.max.worker.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.
+* `spark.yarn.max.executor.failures`, the maximum number of executor failures before failing the application. Default is the number of executors requested times 2 with minimum of 3.

 # Launching Spark on YARN

@@ -60,11 +60,10 @@ The command to launch the Spark application on the cluster is as follows:
       --jar <YOUR_APP_JAR_FILE> \
       --class <APP_MAIN_CLASS> \
       --args <APP_MAIN_ARGUMENTS> \
-      --num-workers <NUMBER_OF_EXECUTORS> \
-      --master-class <ApplicationMaster_CLASS>
-      --master-memory <MEMORY_FOR_MASTER> \
-      --worker-memory <MEMORY_PER_EXECUTOR> \
-      --worker-cores <CORES_PER_EXECUTOR> \
+      --num-executors <NUMBER_OF_EXECUTOR_PROCESSES> \
+      --driver-memory <MEMORY_FOR_ApplicationMaster> \
+      --executor-memory <MEMORY_PER_EXECUTOR> \
+      --executor-cores <CORES_PER_EXECUTOR> \
       --name <application_name> \
       --queue <queue_name> \
       --addJars <any_local_files_used_in_SparkContext.addJar> \
@@ -85,10 +84,10 @@ For example:
       --jar examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \
       --class org.apache.spark.examples.SparkPi \
       --args yarn-cluster \
-      --num-workers 3 \
-      --master-memory 4g \
-      --worker-memory 2g \
-      --worker-cores 1
+      --num-executors 3 \
+      --driver-memory 4g \
+      --executor-memory 2g \
+      --executor-cores 1

 The above starts a YARN client program which starts the default Application Master. Then SparkPi will be run as a child thread of Application Master. The client will periodically poll the Application Master for status updates and display them in the console. The client will exit once your application has finished running. Refer to the "Viewing Logs" section below for how to see driver and executor logs.

@@ -100,12 +99,12 @@ With yarn-client mode, the application will be launched locally, just like runni

 Configuration in yarn-client mode:

-In order to tune worker cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.
+In order to tune executor cores/number/memory etc., you need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options.

-* `SPARK_WORKER_INSTANCES`, Number of executors to start (Default: 2)
-* `SPARK_WORKER_CORES`, Number of cores per executor (Default: 1).
-* `SPARK_WORKER_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
-* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
+* `SPARK_EXECUTOR_INSTANCES`, Number of executors to start (Default: 2)
+* `SPARK_EXECUTOR_CORES`, Number of cores per executor (Default: 1).
+* `SPARK_EXECUTOR_MEMORY`, Memory per executor (e.g. 1000M, 2G) (Default: 1G)
+* `SPARK_DRIVER_MEMORY`, Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
 * `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark)
 * `SPARK_YARN_QUEUE`, The YARN queue to use for allocation requests (Default: 'default')
 * `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job.
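A hedged sketch of how a launcher might consume the renamed environment variables in the table above; the object and its logic are hypothetical, and only the variable names and their documented defaults come from the doc.

```scala
object YarnClientEnvSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical helper: read executor sizing from the renamed variables,
    // falling back to the documented defaults (2, 1, 1G, 512 MB).
    val numExecutors   = sys.env.getOrElse("SPARK_EXECUTOR_INSTANCES", "2").toInt
    val executorCores  = sys.env.getOrElse("SPARK_EXECUTOR_CORES", "1").toInt
    val executorMemory = sys.env.getOrElse("SPARK_EXECUTOR_MEMORY", "1g")
    val driverMemory   = sys.env.getOrElse("SPARK_DRIVER_MEMORY", "512m")

    println(s"executors=$numExecutors cores=$executorCores " +
      s"executorMemory=$executorMemory driverMemory=$driverMemory")
  }
}
```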

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 19 additions & 19 deletions
@@ -61,9 +61,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true

-  // Default to numWorkers * 2, with minimum of 3
-  private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
-    math.max(args.numWorkers * 2, 3))
+  // Default to numExecutors * 2, with minimum of 3
+  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
+    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

   private var registered = false

@@ -96,7 +96,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

     // Call this to force generation of secret so it gets populated into the
     // hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the worker containers.
+    // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)

     // Start the user's JAR
@@ -115,7 +115,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }

     // Allocate all containers
-    allocateWorkers()
+    allocateExecutors()

     // Wait for the user class to Finish
     userThread.join()
@@ -215,7 +215,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }

-  // this need to happen before allocateWorkers
+  // this need to happen before allocateExecutors
   private def waitForSparkContextInitialized() {
     logInfo("Waiting for spark context initialization")
     try {
@@ -260,21 +260,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     }
   }

-  private def allocateWorkers() {
+  private def allocateExecutors() {
     try {
-      logInfo("Allocating " + args.numWorkers + " workers.")
+      logInfo("Allocating " + args.numExecutors + " executors.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
       // TODO: Handle container failure

       // Exists the loop if the user thread exits.
-      while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
-        if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
           finishApplicationMaster(FinalApplicationStatus.FAILED,
-            "max number of worker failures reached")
+            "max number of executor failures reached")
         }
         yarnAllocator.allocateContainers(
-          math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+          math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(100)
       }
@@ -283,7 +283,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
-    logInfo("All workers have launched.")
+    logInfo("All executors have launched.")

     // Launch a progress reporter thread, else the app will get killed after expiration
     // (def: 10mins) timeout.
@@ -309,15 +309,15 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
-          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
             finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of worker failures reached")
+              "max number of executor failures reached")
           }
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+          if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
-              format(missingWorkerCount))
-            yarnAllocator.allocateContainers(missingWorkerCount)
+              format(missingExecutorCount))
+            yarnAllocator.allocateContainers(missingExecutorCount)
           }
           else sendProgress()
           Thread.sleep(sleepTime)
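The new `maxNumExecutorFailures` default in the first hunk keeps the old `spark.yarn.max.worker.failures` key as a fallback, so existing configurations keep working. A minimal standalone sketch of that nested-default pattern, with a hypothetical `numExecutors` value standing in for `args.numExecutors`:

```scala
import org.apache.spark.SparkConf

object ConfigFallbackSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val numExecutors = 2 // hypothetical stand-in for args.numExecutors

    // Prefer the new executor-named key; fall back to the deprecated
    // worker-named key; otherwise default to numExecutors * 2, minimum 3.
    val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
      sparkConf.getInt("spark.yarn.max.worker.failures", math.max(numExecutors * 2, 3)))

    println(s"maxNumExecutorFailures = $maxNumExecutorFailures")
  }
}
```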

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala renamed to yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

Lines changed: 14 additions & 14 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo

-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {

   def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
@@ -89,7 +89,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()

     if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+      val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
       val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)

       if (numCore > 0) {
@@ -102,7 +102,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     waitForSparkMaster()

     // Allocate all containers
-    allocateWorkers()
+    allocateExecutors()

     // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
     // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
@@ -199,7 +199,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
   }


-  private def allocateWorkers() {
+  private def allocateExecutors() {

     // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
     val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -208,16 +208,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId,
       args, preferredNodeLocationData, sparkConf)

-    logInfo("Allocating " + args.numWorkers + " workers.")
+    logInfo("Allocating " + args.numExecutors + " executors.")
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while ((yarnAllocator.getNumWorkersRunning < args.numWorkers) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(100)
     }

-    logInfo("All workers have launched.")
+    logInfo("All executors have launched.")

   }

@@ -228,10 +228,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     val t = new Thread {
       override def run() {
         while (!driverClosed) {
-          val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
-          if (missingWorkerCount > 0) {
-            logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
-            yarnAllocator.allocateContainers(missingWorkerCount)
+          val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
+          if (missingExecutorCount > 0) {
+            logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+            yarnAllocator.allocateContainers(missingExecutorCount)
          }
           else sendProgress()
           Thread.sleep(sleepTime)
@@ -264,9 +264,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
   }


-object WorkerLauncher {
+object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    new WorkerLauncher(args).run()
+    new ExecutorLauncher(args).run()
   }
 }
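For readers unfamiliar with the launcher, the reporter thread above periodically compares the number of running executors against the requested count and asks the allocator for the difference. A minimal standalone sketch of that pattern, using a hypothetical `Allocator` trait in place of `YarnAllocationHandler`:

```scala
// Hypothetical allocator interface; only the method names echo the diff above.
trait Allocator {
  def getNumExecutorsRunning: Int
  def allocateContainers(count: Int): Unit
}

class ReporterThread(allocator: Allocator, numExecutors: Int, sleepTime: Long) extends Thread {
  @volatile var driverClosed = false

  override def run(): Unit = {
    while (!driverClosed) {
      // Request however many executors are still missing (e.g. lost containers).
      val missingExecutorCount = numExecutors - allocator.getNumExecutorsRunning
      if (missingExecutorCount > 0) {
        allocator.allocateContainers(missingExecutorCount)
      }
      Thread.sleep(sleepTime)
    }
  }
}
```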

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala renamed to yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 7 additions & 7 deletions
@@ -38,24 +38,24 @@ import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 import org.apache.spark.{SparkConf, Logging}


-class WorkerRunnable(
+class ExecutorRunnable(
     container: Container,
     conf: Configuration,
     spConf: SparkConf,
     masterAddress: String,
     slaveId: String,
     hostname: String,
-    workerMemory: Int,
-    workerCores: Int)
-  extends Runnable with WorkerRunnableUtil with Logging {
+    executorMemory: Int,
+    executorCores: Int)
+  extends Runnable with ExecutorRunnableUtil with Logging {

   var rpc: YarnRPC = YarnRPC.create(conf)
   var cm: ContainerManager = _
   val sparkConf = spConf
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)

   def run = {
-    logInfo("Starting Worker Container")
+    logInfo("Starting Executor Container")
     cm = connectToCM
     startContainer
   }
@@ -81,8 +81,8 @@ class WorkerRunnable(
     credentials.writeTokenStorageToStream(dob)
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))

-    val commands = prepareCommand(masterAddress, slaveId, hostname, workerMemory, workerCores)
-    logInfo("Setting up worker with commands: " + commands)
+    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores)
+    logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)

     // Send the start request to the ContainerManager
