Spark worker scaling parameters #260

Open
ceresek opened this issue Jun 7, 2021 · 10 comments

@ceresek
Collaborator

ceresek commented Jun 7, 2021

Currently, the performance of the Spark benchmarks does not change with the configured number of executors, except for ALS, which partitions the input data based on the configuration. This may be a relevant note in the Spark documentation:

Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options; another is mainly related to Spark runtime control, like “spark.task.maxFailures”, this kind of properties can be set in either way.

This is quite vague, but may explain why our code

.config("spark.executor.instances", s"$executorCount")

does not behave as expected.

@ceresek
Collaborator Author

ceresek commented Jun 7, 2021

[Plot: workers-page-rank]

@ceresek
Collaborator Author

ceresek commented Jun 7, 2021

... seeing how our Spark instance is hardcoded to run locally, I'm not sure what we're trying to do with the worker instance count?

@lbulej
Member

lbulej commented Jun 7, 2021

I was under the impression that the number of executors influenced the default number of partitions created when RDDs are built directly from files on disk. That was also one of the reasons I avoided creating RDDs by preparing data collections in memory and calling parallelize() on them, which would avoid I/O but use a different class underneath. Maybe it's something we should actually do (or maybe we should repartition the existing RDDs explicitly).

@lbulej
Member

lbulej commented Jun 7, 2021

... seeing how our Spark instance is hardcoded to run locally, I'm not sure what we're trying to do with the worker instance count?

This was in the original code from the very beginning. I probably did a cursory check of the documentation at some point to see what that means, but at this point I only recall that the whole idea was to control the number of executors (thread pools) and the number of threads per executor. If this does not work as expected, then we should revisit this completely.

@ceresek
Collaborator Author

ceresek commented Jun 7, 2021

I think in local mode there is always only one (in-process) Executor (with one thread pool), created here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/SparkContext.scala#L2873 and here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L62. We can only tweak the number of threads.

I'm not sure whether other functions (such as partitioning) react to the spark.executor.instances setting even in local mode. My guess is that this is unlikely, since workers can also be allocated dynamically, in which case a static value would possibly be wrong.

If we aim for single-JVM execution, then I think we can drop the executor count thing, as well as a few other config bits, and just set the master to local[N], where N is the number of cores used.
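
A minimal sketch of how the master URL for single-JVM execution could be derived from a thread count. The `LocalMaster` object and its `url` method are hypothetical names used only for illustration and are not part of the project; only the `local[N]` and `local[*]` URL forms come from Spark itself.

```scala
// Hypothetical helper (not part of the benchmark suite): builds a Spark
// master URL for running everything inside a single JVM.
object LocalMaster {

  /** Returns "local[N]" for a fixed thread count, or "local[*]" to use all cores. */
  def url(threads: Option[Int]): String = threads match {
    case Some(n) =>
      // A non-positive thread count makes no sense for a local master.
      require(n > 0, s"thread count must be positive, got $n")
      s"local[$n]"
    case None =>
      "local[*]"
  }
}
```

The resulting string would then be passed to `SparkSession.builder().master(...)`, replacing the separate executor count setting.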

@lbulej
Member

lbulej commented Jun 7, 2021

The idea of controlling the number of executor instances appears to originate from #145 and #147, and at that time it seemed to work for @farquet.

Looking at the allowed master URLs, the recommended setting for local master appears to be to set the number of worker threads to the number of cores on the machine, i.e., local[*], which however led to the problem @farquet observed in #145 (on a big machine) in the first place.

I wonder if we should perhaps set the master to local[*] and see if we can limit the level of parallelism by explicitly setting the number of partitions? It's probably not much different from using local[N] with a benchmark-specific N, I was just thinking that if partitioning can control the parallelism, I would not push it into Spark configuration and instead make it part of "job description".

@lbulej
Member

lbulej commented Jun 21, 2021

#274 removes the configuration of executor instances (along with the benchmark parameter) as well as explicit input data partitioning (for now).

I was wondering whether it would make sense to have a parameter, e.g., input_partitions_scaling, as a multiplier of spark_thread_count to control the number of input data partitions. Reasonable values of the parameter would probably be limited to 0 (producing 1 partition), 0.5, 1, 1.5 (maybe), and 2.
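
As a rough sketch, the partition count implied by such a scaling parameter could be computed as follows. The `InputPartitions` object is a hypothetical name, and rounding to the nearest integer with a floor of one partition is an assumption; the proposal above only fixes that a multiplier of 0 yields 1 partition.

```scala
// Hypothetical helper illustrating the proposed scaling parameter.
object InputPartitions {

  /**
   * Number of input partitions for a given thread count and scaling factor.
   * Assumption: the product is rounded to the nearest integer, and the
   * result is never less than 1 (so a scaling of 0 yields one partition).
   */
  def count(threadCount: Int, scaling: Double): Int = {
    require(threadCount > 0, s"thread count must be positive, got $threadCount")
    require(scaling >= 0.0, s"scaling must be non-negative, got $scaling")
    math.max(1, math.round(threadCount * scaling).toInt)
  }
}
```

With 8 threads, the suggested values 0, 0.5, 1, 1.5, and 2 map to 1, 4, 8, 12, and 16 partitions. The result could be handed to an explicit `repartition` call on the input RDD.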

@lbulej
Member

lbulej commented Jun 23, 2021

I have updated the PR and the measurement bundle (plugins work now). For testing, I added the als-ml benchmark, which uses ml.ALS instead of mllib.ALS. Both do a conversion to RDD, but the mllib version seems more efficient.

@ceresek
Collaborator Author

ceresek commented Jul 1, 2021

Adding a whole bunch of plots showing how performance depends on the number of threads for individual benchmarks, collected on a dual-socket Xeon Gold 6230 machine (2 packages, 20 cores per package, 2 threads per core).


[Plots: threads-als, threads-als-ml, threads-chi-square, threads-dec-tree, threads-gauss-mix, threads-log-regression, threads-movie-lens, threads-naive-bayes, threads-page-rank]

@ceresek
Collaborator Author

ceresek commented Jul 1, 2021

Assuming other machines behave similarly, I think we should cap the number of threads used as follows (with a warning if more cores are available):

  • 2 or slightly more (exact value not important) - chi-square, gauss-mix
  • 4 or slightly more (exact value not important) - dec-tree
  • 8 - movie-lens
  • 12 - als, als-ml, log-regression, page-rank
  • unlimited - naive-bayes

That is until we tackle the scaling issue more systematically.
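
The caps listed above could be encoded roughly like this. This is only a sketch: `ThreadCaps` and `threadsFor` are hypothetical names, "2 or slightly more" and "4 or slightly more" are taken as exactly 2 and 4, and treating unknown benchmarks as unlimited is an assumption.

```scala
// Hypothetical encoding of the proposed per-benchmark thread caps.
object ThreadCaps {

  // None means unlimited. Assumption: the "or slightly more" caps are
  // taken as exactly 2 and 4.
  val caps: Map[String, Option[Int]] = Map(
    "chi-square" -> Some(2),
    "gauss-mix" -> Some(2),
    "dec-tree" -> Some(4),
    "movie-lens" -> Some(8),
    "als" -> Some(12),
    "als-ml" -> Some(12),
    "log-regression" -> Some(12),
    "page-rank" -> Some(12),
    "naive-bayes" -> None
  )

  /**
   * Returns the thread count to use, plus a warning message when more
   * cores are available than the cap allows. Benchmarks without an entry
   * are treated as unlimited (an assumption).
   */
  def threadsFor(benchmark: String, availableCores: Int): (Int, Option[String]) = {
    require(availableCores > 0, s"core count must be positive, got $availableCores")
    caps.getOrElse(benchmark, None) match {
      case Some(cap) if availableCores > cap =>
        (cap, Some(s"$benchmark: using $cap of $availableCores available cores"))
      case _ =>
        (availableCores, None)
    }
  }
}
```

On the 80-thread machine used for the plots, `ThreadCaps.threadsFor("dec-tree", 80)` would yield 4 threads together with a warning, while `naive-bayes` would keep all 80.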
