Parallelism - what do libraries offer, and is there an API aspect to it #776
Replies: 14 comments
-
sklearn now actually uses So in conclusion: just in scikit-learn, this is already a mess, 'only' dealing with 4 types of parallelism (n_jobs processes, n_jobs threads, OpenMP and BLAS). We could have our own 'library' solution, but I don't think anyone of us has the expertise to do this; it's probably pretty hard to actually know how to allocate cores across different ML algorithms. I'm not sure where to even start on that. I'm not sure I understand proposal 2: is that a python library? How would that integrate with the C & Fortran code? If it's a C library: how does it integrate with numba? |
Beta Was this translation helpful? Give feedback.
-
|
Beta Was this translation helpful? Give feedback.
-
Some parallelism issues that come to mind. My comments assume built in threading is always available (as opposed to dependency to external threading packages). Even if a package does not want to support threading.. they can just return a warning or error.
At least three types of thread work division (do we want to expose this at a lower level API)?
Is the user allowed to call into the symmetric divider for 1. ? What about 2. - this is similar to nb.prange(). And 3. is used for all reductions. I believe requiring an external threading package creates an unnec. dependency and thus recommend against it. If the user tries to configure a thread in a manner not supported - API can be quiet, warning, or error based on some setting. I think the parallelism API is in another package that we control the API for. We also control all the configuration for it. If our threading API package uses another threading package like TBB or OpenMP --the dependencies are abstracted away. We could just wave our hands and say outside package, outside scope -- problem gone -- but i encourage the team to solve the problem. |
Beta Was this translation helpful? Give feedback.
-
Thanks for the detailed feedback @tdimitri.
This to me is the most important question. I don't have an answer (there's multiple possible answers though); maybe step 1 would be to figure out if an API for this should be included in this RFC. Or if not, how do we move forward in the libraries of interest.
My vote would be no - that's giving too much control to the user, and I can't think of any library of interest that allows this.
A global flag is what PyTorch is introducing right now (still experimental).
Not sure, it combines with the first question - how granular should an API be? Most libraries don't give this level of control. Instead, max number of threads is set, and the rest is up to the library.
Aren't the first and last sentence here contradictory? |
Beta Was this translation helpful? Give feedback.
-
I think doing it optimally is quite hard indeed. However, doing better than we're doing today isn't too difficult. I do think figuring out an API pattern is a lot easier than a separate library, and is already quite valuable. Attempting the library (solution 2) would be ambitious - at some point it needs doing, but I think it's mostly beyond the scope of the current API standard goals. We could describe the approach though, and make sure any developments in libraries converges rather than diverges. |
Beta Was this translation helpful? Give feedback.
-
sounds good
This is for the threading ecosystem. Extra threads just slows down the calculation and consumes a threading resource another process could be using. Too few threads (when allowed) also slows down calculations. Different calculations have different compute times. For instance reduce operations often only read from memory returning a single scalar result (and thus are less memory bound and can use more threads). If might take 3 threads to cap out on add float32, while sqrt caps out at 8 threads. The code is a lookup table (not difficult). I know it helps because it has been tested in riptide. Readers of this issue can play with threads today to see the asymmetry in calculations. To me, it is a small amount of code (LUT) to handle a known issue. The risk is low and the reward is high. Two concepts: how many threads in the thread pool and how many threads needed per given calculation. On a similar note there is also a threading threshold -- for instance if we are adding 30,000 float32 numbers, the cost to release an additional worker thread is often not worth it. However at 50,000 numbers one extra thread may be helpful, but releasing all 8 would be wasteful. The linux futex can dynamically release a thread count in one OS API.
Apologize for the confusion. Requiring an external threading package (not controlled by this API) creates an unnec dependency to a foreign package. Rather if we control the threading package and API, then it is a local dependency (with versions changing in sync) that can be bundled together. |
Beta Was this translation helpful? Give feedback.
-
@aregm posted a nice writeup / summary on threading APIs in another thread, and it feels very relevant to this discussion of parallelism as well: |
Beta Was this translation helpful? Give feedback.
-
@aregm Thank you for sharing that writeup, it contains some great insights into the various trade-offs in these threading layers; I thought the comparison of compute vs. I/O handling is especially interesting. You may be interested to read the Why HPX? writeup for the HPX library, as it covers some of the same issues and how HPX is trying to address them through their programming model. One general approach not included in your report but could be interesting is to implement a lightweight portability layer on top of an OS-integrated threadpool like libdispatch (a.k.a Grand Central Dispatch) on macOS or the Thread Pool API on Windows. These APIs are already meant to provide support for both computational and IO tasks, and their integration with the OS (and scheduler) could help avoid oversubscription issues. There would be some downsides to this approach -- for starters, explicit control over thread/core affinity is more difficult or impossible, and we'd likely want to incorporate task-splitting logic ("auto-balancing") in the portability layer since the queues won't support that natively. It feels like it's worth some experimentation though, if only to provide some data for comparison against the other threading libraries. |
Beta Was this translation helpful? Give feedback.
-
Yes, it's been around a long time. I didn't follow the latest developments there, but I think it is conceptually the same. Basically, there was a thrust in the HPC community that we needed to completely rethink how we wrote software in HPC for the upcoming exascale era. The idea was that we needed to create a vast swarm of tasks which would then migrate to the data to do the computation. In practice, however, it was a pain in the butt to work with. It required massive change to your software that made the code more complex than legacy code (message passing between a fixed group of processes). Intel in collaboration with Rice University contributed Open Compute Runtime to this movement, which then evolved to the Habanero-C https://github.com/habanero-rice/hclib. Such approaches require massive changes, and when you look at the code, it is IMHO ugly and it didn't perform well. MPI was superior in every way, though very low-level. GCD is old and works only on MacOS, and was never intended to work well for NUMA, custom allocators, etc. Windows Thread Pool is a good pool API, but for Windows only. With all of the problems specific to Thread Pool described in the doc. |
Beta Was this translation helpful? Give feedback.
-
In pydata/xarray#7019 we're working on generalising xarray to wrap parallel array types other than dask. It's a WIP, and a lot of the discussion is also happening the pangeo distributed array computing working group meetings, but the general idea is that any array type that conforms to the numpy API standard and offers certain additional methods (e.g. We also need to apply versions of key functions that dask defines (particularly What would be amazing would be to eventually have a standard for "parallel numpy-like arrays" that adds There is interest from the ramba, Arkouda, and cubed libraries so far too. |
Beta Was this translation helpful? Give feedback.
-
@TomNicholas thanks for bringing this up! I think there definitely is interest to figuring out a holistic story here. From the last comments on pydata/xarray#7019 it seems like explicit chunking isn't needed for Ramba and Arkouda. A similar discussion came up in data-apis/consortium-feedback#7, where the Dask model appears to be too restrictive for cuNumeric. It's also worth considering whether a design (whatever that ends up looking like) should distinguish between single-node parallelism and distributed execution. This issue is about the former. I believe the distinction is important. Most array libraries (including NumPy) already deal with multi-threaded execution on a single node. And if that applies, it's inherently nicer than using a Dask-like model. Distributed execution is very different, and is completely out of scope for some libraries (NumPy, CuPy) and done as an add-on or again with a separate library for others (e.g., PyTorch offers |
Beta Was this translation helpful? Give feedback.
-
@emcastillo had built basic support for dense/sparse CuPy arrays under |
Beta Was this translation helpful? Give feedback.
-
Ah okay! Makes sense. From xarray's perspective if all the parallelism happens behind a fully numpy-like API then we don't have to do anything else in order to benefit (e.g. we can already wrap CuPy arrays). Standardising dask and other libraries like cubed which copy dask's API is more complicated. Anyway I just wanted to make everyone aware of the use-case of xarray and the work that's going on there. |
Beta Was this translation helpful? Give feedback.
-
There's some other work going on in Intel that is kind of a successor to ramba. Not sure how much I can say so I'll leave it to Also, there was a comment that leaving multi-node to something like Dask and then doing within the node by some other package is likely optimal. The Ramba experience might say that isn't the case because in Ramba we frequently saw it was best to have multiple workers per node (and here weren't talking big machines with 70+ cores) and then the infrastructure parallelizing across threads only then goes up to 20 or so. If a bunch of Python packages did standardize their options and internally used the same parallelism package then it seems most packages are thread-like. However, those are known not to compose well. |
Beta Was this translation helpful? Give feedback.
-
Several people have expressed a strong interest in talking about and working on (auto-)parallelization. Here is an attempt at summarizing this topic.
Current status
Linear algebra libraries
The main accelerated linear algebra libraries that are in use (for CPU based
code) are OpenBLAS and
MKL.
Both of those libraries auto-parallelize function calls.
OpenBLAS can be built with either its own pthreads-based thread pool, or with
OpenMP support. The number of threads can be controlled with an environment
variable (
OPENBLAS_NUM_THREADS
orOMP_NUM_THREADS
), or from Python viathreadpoolctl. The conda-forge
OpenBLAS package uses OpenMP; the OpenBLAS builds linked into NumPy and SciPy
wheels on PyPI use pthreads.
MKL supports OpenMP and Intel TBB as the threading control mechanisms. The
number of threads can be controlled with an environment variable
(
MKL_NUM_THREADS
orOMP_NUM_THREADS
), or from Python withthreadpoolctl
.NumPy
NumPy does not provide parallelization, with the exception of linear algebra
routines which inherit the auto-parallelization of the underlying library
(OpenBLAS or MKL typically). NumPy does however release the GIL consistently
where it can.
Scikit-learn
Scikit-learn provides a keyword
n_jobs=1
in many estimators and otherfunctions to let users enable parallel execution. This is done via the
joblib library, which provides both
multiprocessing (default) and threading backends that can be selected with a
context manager.
Scikit-learn also contains C and Cython code that uses OpenMP. OpenMP is
enabled in both wheels on PyPI and in conda-forge packages. The number of
threads used can be controlled with the
OMP_NUM_THREADS
environment variable.Scikit-learn has good documentation on parallelism and resource management.
SciPy
SciPy provides a
workers=1
keyword in a (still limited) number of functionsto let users enable parallel execution. It is similar to scikit-learn's
n_jobs
keyword, except that it also accepts amap
-like callable (e.g.multiprocess.Pool.map
to allow using a custom pool. C++ code in SciPy usespthreads; the use of OpenMP was
discussed and rejected.
scipy.linalg
also provides a Cython API for BLAS and LAPACK. This lets otherlibraries use linear algebra routines without having to ship or build against
an accelerated linear algebra library directly. Scikit-learn, statsmodels and
other libraries do this - thereby again inheriting the auto-parallelization
behavior from OpenBLAS or MKL.
Deep learning frameworks
TensorFlow, PyTorch, MXNet and JAX all have auto-parallelization behavior.
Furthermore they provide support for distributed computing (with the exception
of JAX). These frameworks are very performance-focused, and aim to optimally
use all available hardware. They typically allow building with different
backends like NCCL or GLOO for GPU support, and use OpenMP, MPI, gRPC and more.
The advantage these frameworks have is that users typically only use this one
framework for their whole program, so the parallelism used can be optimized
without having to play well with other Python packages that also execute code
in parallel.
Dask
Dask provides parallel arrays, dataframes and machine learning algorithms with
APIs that match NumPy, Pandas and scikit-learn as much as possible. Dask is a
pure Python library and uses blocked algorithms; each block contains a single
NumPy array or Pandas dataframe. Scaling to hundreds of nodes is possible; Dask
is a good solution to obtain distributed arrays. When used as a method to
obtain parallelism on a single node however, it is not very efficient.
Auto-parallelization and nested parallelism
Some libraries, like the deep learning frameworks, do auto-parallelization.
Most non deep learning libraries do not do this. When a single library or
framework is used to execute an end user program, auto-parallelization is
usually a good thing to have. It uses all available hardware resources in an
optimal fashion.
Problems can occur when multiple libraries are involved. What often happens is
oversubscription of resources. For example, if an end user would write code
using scikit-learn with
n_jobs=-1
, and NumPy would auto-parallelizeoperations, then scikit-learn will use
N
processes (on anN
-core machine)and NumPy will use
N
threads per process - leading toN^2
threads beingused. On machines with a large number of cores, the overhead of this quickly
becomes problematic. Given that NumPy uses OpenBLAS or MKL, this problem
already occurs today. For a while Anaconda and Intel shipped a modified NumPy
version that had auto-parallelization behavior for functions other than linear
algebra - and the problem occurred more frequently.
The paper Composable Multi-Threading and Multi-Processing for Numeric
Libraries
from Malakhov et al. contains a good overview with examples and comparisons
between different parallelization methods. It uses NumPy, SciPy, Dask, and
Numba, and uses
multiprocessing
,concurrent.futures
, OpenMP, Intel TBB(Threading Building Blocks), and a custom library SMP (symmetric
multi-processing).
Limitations due to Python package distribution mechanisms
When one wants to use auto-parallelization, it's important to have control over
the complete set of packages that a user gets installed on their machine. That
way one can ensure there's a single linear algebra library installed, and a
single OpenMP runtime is used.
That control over the full set of packages is common in HPC type situations,
where admins need to deal with build and install requirements to make libraries
work well together. Both packages managers (e.g. Apt in Debian) and Conda have
the ability to do this right as well - both because of dependency resolution
and because of a common build infrastructure.
A large fraction of Python users install packages from PyPI with Pip however.
The binary installers (wheels) on PyPI are not built on a common
infrastructure, and because there's no real support for non-Python
dependencies, libraries like OpenMP and OpenBLAS are bundled into the wheels
and installed into end user environments multiple times. This makes it
very difficult to reliably use, e.g., OpenMP. For this reason SciPy uses custom
pthreads thread pools rather than OpenMP.
The need for a better API pattern or library
The default behavior for libraries like NumPy and SciPy given the status of the
ecosystem today should be to be single-threaded, otherwise it composes badly
with multiprocessing, scikit-learn (joblib), Dask, etc. However, there's
room for improvement here. Two things that could help improve the coordination
of parallelization behavior in a stack of Python libraries are:
A common API pattern is the simpler of the two options. It could be a keyword
like
n_jobs
orworkers
that gets used consistently between libraries, or acontext manager to achieve the same level of per-function or per-code-block
control.
A common library would be more powerful and enable auto-parallelization rather
than giving the user control (which is what the API pattern does). From a
performance perspective, having arrays and dataframes auto-parallelize their
functions as much as possible over all cores on a single node, and then letting
a separate library like Dask deal with multi-node coordination, seems optimal.
Introducing a new dependency into multiple libraries at the core of the PyData
ecosystem is a nontrivial exercise however.
The above attempts to summarize the state of affairs today. The topic of
parallelization is largely an implementation rather than an API question,
however there is an API component to it with option (1) above. How to move
forward here is worth discussing.
Note: there's also a lot of room left in NumPy also for optimizing
single-threaded performance. There's ongoing work on making better use of
intrinsics (this is a large effort, ongoing), or using SLEEF for vector math
(discussed in the past, no one is working on it).
Beta Was this translation helpful? Give feedback.
All reactions