
Multiple cores per process/thread #181

Closed
dgasmith opened this issue Oct 23, 2018 · 65 comments
Labels
enhancement (New feature or request), usage question (Question about using jobqueue)

Comments

@dgasmith

We have a use case where we would like to use dask.distributed to parallelize a Python-bound C++ program that works best when it can consume 8-32 threads, depending on the problem size, and that manages threading internally. Normally, a Dask worker runs on a node that has 32 cores, with the worker using 2 processes at 1 thread each so that we can give each process 16 cores.

Looking at the code, it currently seems that nthreads = ncores / nprocesses without exception. Is there a canonical way to change this so that we can orchestrate our normal Dask worker setup with dask-jobqueue?

@guillaumeeb
Member

Funnily enough, I think we just had a conversation on this point in #179.

As you point out, the cores/processes kwargs are used both to configure the dask-worker command and to reserve job queue system resources. However, the second part is by default only guessed from these kwargs. In every JobQueueCluster implementation there should be another kwarg allowing you to force a different resource reservation on the underlying scheduling system. For example, with PBSCluster:

cluster = PBSCluster(cores=2, processes=2, memory='100GB', 
                     resource_spec='select=1:ncpus=32:mem=100GB')

or LSFCluster:

cluster = LSFCluster(cores=2, processes=2, ncpus=32)
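
Something similar should also be possible for Slurm through the job_cpu kwarg; a hedged sketch, assuming that kwarg is what books the CPUs on the Slurm side:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=2, processes=2, memory='100GB', job_cpu=32)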

It seems we should document this better, though. And it does seem useful; we were previously thinking of removing this possibility...

@dgasmith
Author

I saw that on LSF you could select CPUs, but I wasn't aware of PBSCluster's ability to write out resource specs. To make things simpler, would it be possible to set the number of threads manually, with something like nthreads=None defaulting to the current behavior?

It is worth considering this in a larger context where one might wish to start heterogeneous workers with labels; expanding def start_workers(self, n=1): to allow a user to override the internal settings would be very useful. I'm very happy to code and document here; I'm mostly trying to figure out the current bounds of the project.

@guillaumeeb
Member

I think there is something to do with cores/processes/threads/ncpus/resource_spec, at least some standardisation between JobQueueCluster implementations. I'm not sure what it should look like, though; we previously removed the use of threads in order to replace it with cores. But maybe we should remove ncpus and the like and add threads back...

It would be more than welcome if you could propose some simple design choices here so we can discuss them, and then, if we agree on something, propose a PR!

For the second part on the larger context, I'm not sure I get your point, but maybe it has another scope and should be discussed in another issue?

@dgasmith
Author

I think my proposal would be similar to what has been discussed. Scanning through core.py, I see the following points:

JobQueueCluster(..., threads_per_process=None, ...):

    self.threads_per_process = threads_per_process or int(self.worker_cores / self.worker_processes)

...

    @property
    def worker_threads(self):
        return self.threads_per_process

    def __repr__(self):
        running_workers = self._count_active_workers()
        running_cores = running_workers * self.worker_threads # maybe change this to self.cores?

I am not entirely sure what mental model you want to convey so that this project remains relatively easy for new users while staying flexible for complex scenarios, but the above would cover our primary use case.

I will hold off on the second part for later.

@guillaumeeb
Member

Ping @mrocklin @jhamman @willirath @lesteve for opinion on how to best handle this.

running_cores = running_workers * self.worker_threads # maybe change this to self.cores?

No, it's threads, or maybe cores divided by processes. We actually have one running worker per worker process.

@lesteve
Member

lesteve commented Oct 24, 2018

It feels like what you want is an advanced use case, so I don't think resource_spec should be automatically inferred from cores and processes. I would recommend:

  • setting cores and processes in terms of how many cores/processes/threads you want each dask-worker to have
  • specifying explicitly your scheduler-specific resource_spec to say that each job would get the number of threads you want your C++ code to use

@louisabraham
Contributor

louisabraham commented Oct 24, 2018

Replying to the above: I think resource_spec is per worker, not per task, and it is specific to some queue systems.

IMO we have three levels:

  • the real number of cores that will be allocated, which should always be specified by design
  • the number of processes, i.e. the number of GIL-bound tasks that can be executed, and the number of threads per process, i.e. how many tasks each process can execute
  • the number of threads per task

The problem is that the number of cores is supposed to be the number of tasks in our current model, i.e. threads_by_task=1.

I propose two alternatives:

  • a new parameter threads_by_task that divides threads
  • a boolean one_task_per_process defaulting to False

I suppose that when handling parallelism manually, one doesn't want to share a process with another task.
one_task_per_process=True sets threads=1 in the dask-worker options while still allocating cores/processes threads per process. Since there is then only one task per process, the handcrafted parallelism can use up to that many threads without lying to the scheduler.

I would be more in favor of a boolean than of threads_by_task, which adds complexity to the system and might break existing parallelism mechanisms if the handcrafted code is executed by a thread (not sure about that).
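
For illustration, a minimal sketch of how the proposed flag could translate cluster kwargs into dask-worker options (the flag, the helper, and the key names are hypothetical; nothing here exists in dask-jobqueue):

# Hypothetical translation of the proposed one_task_per_process flag
def dask_worker_options(cores, processes, one_task_per_process=False):
    nthreads = 1 if one_task_per_process else cores // processes
    return {"nprocs": processes, "nthreads": nthreads, "reserved_cpus": cores}

# OP's case (32-core node, 2 processes):
print(dask_worker_options(32, 2, one_task_per_process=True))
# {'nprocs': 2, 'nthreads': 1, 'reserved_cpus': 32} -> each task may use up to 16 cores internally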

@lesteve
Member

lesteve commented Oct 24, 2018

@louisabraham it is not completely straightforward to follow what you are saying, and from what I understand it feels like dask.distributed is the right level to address some of your points (for example, I guess by task you mean a function that is used in client.submit, right?)

Granted there are plenty of things that could be added/improved in dask-jobqueue and suggestions are always more than welcome! I would suggest we start from precise use cases like the one in the original post and see how we can tackle them.

For anything non-trivial, my opinion is that you will have to specify resource_spec, and so you will have to understand a bit of the underlying scheduler's specificities. Given the variability of the different schedulers (and even sometimes the variability of the installs for a given scheduler), I would be a bit hesitant about providing a layer that lets you forget about scheduler-specific things.

My personal opinion is that being able to easily create heterogeneous clusters would be the right direction in which to extend (but even then maybe some of this work may happen in dask.distributed).

@louisabraham
Contributor

louisabraham commented Oct 24, 2018

it feels like dask.distributed is the right level to address some of your points

I didn't intend it this way.

by task you mean a function that is used in client.submit

Yes.

I would suggest we start from precise use cases like the one in the original post and see how we can tackle them.

Totally agree. To achieve what the author wants with my proposal, the parameters are

cluster = ANYCluster(cores=32, processes=2, one_task_per_process=True)

What disturbs me in @guillaumeeb's immediate fix (it is the only way to do it currently, which is the real problem)

PBSCluster:

cluster = PBSCluster(cores=2, processes=2, memory='100GB', 
                     resource_spec='select=1:ncpus=32:mem=100GB')

or LSFCluster:

cluster = LSFCluster(cores=2, processes=2, ncpus=32)

is that there is no variable that can track the total number of CPUs allocated and that the code is scheduler specific.

On the other hand, cores equals the total number of tasks that can be executed concurrently.
If you don't want to break this, threads_by_task would allocate threads_by_task * cores CPUs.

For the OP's problem:

cluster = ANYCluster(cores=2, processes=2, threads_by_task=16)

I think the question we want to ask now is: should cores be the number of CPUs or the number of tasks that can be executed at the same time?

@ocaisa
Member

ocaisa commented Oct 24, 2018

We have this use case of heterogeneous clusters at our site. We have modular supercomputing, with pure CPU nodes, nodes with GPUs, and KNL nodes all available.

While those are heterogeneous architectures, we also have the use case of heterogeneous resources within a cluster. We want the ability to self-manage some of the resources (see #84 (comment) for the use case). At present we simply use job_extra to override the resource request that dask-jobqueue makes:

        job_extra=[
            '-N %s' % nodes, 
            '--tasks-per-node=%s' % tasks_per_node,
            '--cpus-per-task=%s' % cpus_per_task,
            # Override the default -n 1 value with the calculated value
            '-n %s' % total_tasks
        ],

This results in a submission script like

#SBATCH -J mpi_tasks
#SBATCH -p booster
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=12G
#SBATCH -t 00:19:00
#SBATCH -N 2
#SBATCH --tasks-per-node=68
#SBATCH --cpus-per-task=1
#SBATCH -n 136

I don't know if the same approach would work with other schedulers.

@dgasmith
Author

It seems that ultimately you do not want users touching resource_spec or similar where possible, as it invalidates all of your internal tracking, in addition to being custom per scheduler.

I think the question we want to ask now is: should cores be the number of CPUs or the number of tasks that can be executed at the same time?

I would be hesitant to override the "core" definition, as it is well defined for supercomputers, whereas "task" is well defined in Dask.

Heterogeneous clusters would be difficult to tackle in a generic way, I think. It might be worth adding a start_worker() that allows full specification of the dask-worker CLI as an intermediate step.

PS: Hey @ocaisa!

@louisabraham
Contributor

Ok, then my proposal is the following:

A threads_by_task defaulting to 1.

Each dask worker receives the parameters processes and threads, as it does now.

Jobqueue allocates cores * threads_by_task CPUs where cores = processes * threads.
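
A quick worked example of this allocation rule with the OP's numbers (threads_by_task is the proposed name, not an existing kwarg):

# Proposed allocation rule, worked through for the OP's case
processes = 2
threads = 1                               # dask threads per process
threads_by_task = 16                      # threads each task manages internally
cores = processes * threads               # 2 tasks can run concurrently
allocated_cpus = cores * threads_by_task  # 32 CPUs reserved in the job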

I think heterogeneous clusters are out of scope here.

@guillaumeeb
Member

I think heterogeneous clusters are out of scope here.

Yep, I agree; we should address this in another issue if it is something that interests people. See #103, which is already open.

For the other part of the rich discussion here, it's a complicated design choice, but I think I'm more in favor of avoiding job queue system specific kwargs as much as possible. I think I would prefer what @dgasmith proposes: having three kwargs:

  • cores: the ncpus reserved in the job queue system, and the default total cores used by the dask-worker command, e.g. worker_threads = cores / processes.
  • processes: defaults to one, only used for the dask-worker command.
  • threads_per_process or something like that: defaults to None, only used for the dask-worker command.

For job scheduler specificities, we would rely only on job_extra, as @ocaisa is doing. This would mean removing resource_spec from PBSCluster or ncpus from LSFCluster.

I've two remaining concerns:

  • How should we handle the memory part, which may not be expressed identically between dask and job queue systems? Can we easily have only one parameter?
  • Would we be able to override resource_spec in job_extra if really needed? E.g. if there are two #PBS -l select=1:ncpus=24 lines in the job script, would the second be prioritized over the first? Or should we put job_extra first, depending on the scheduler?

Any thought?
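
To make this concrete, a minimal sketch of how the three kwargs could map onto the dask-worker command line (threads_per_process is only a proposed name; the helper and scheduler address are purely illustrative):

# Sketch of the proposed mapping: `cores` is what the job scheduler books,
# while nprocs/nthreads are what the dask-worker command receives.
def dask_worker_cli(cores, processes=1, threads_per_process=None):
    nthreads = threads_per_process or cores // processes
    return f"dask-worker <scheduler-address> --nprocs {processes} --nthreads {nthreads}"

print(dask_worker_cli(32, processes=2, threads_per_process=1))
# dask-worker <scheduler-address> --nprocs 2 --nthreads 1  (while 32 cores stay reserved)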

@louisabraham
Contributor

@guillaumeeb I'm not sure I follow.

In your proposal, isn't cores = processes * threads_per_process?

Maybe your threads_per_process is the same as my threads_by_task, aka the number of threads that execute one task.

@guillaumeeb
Member

In your proposal, isn't cores = processes * threads_per_process?

Not always, no. In what I propose, your threads_by_task would be cores / (processes * threads_per_process). Or to put it differently, and perhaps more clearly: cores = processes * threads_per_process * threads_per_task.

In my opinion, threads_per_task is application specific and should not appear in dask-jobqueue. dask-jobqueue books scheduler resources, and then starts dask.distributed with a given set of parameters: possibly fewer dask threads than reserved cores, though in some cases we've also seen more threads than reserved cores.

@louisabraham
Contributor

louisabraham commented Oct 27, 2018

In my opinion, threads_per_task is application specific and should not appear in dask-jobqueue.

I see and approve.

threads_per_process is a bit confusing, however, because threads could mean anything.
worker_threads_per_process is better IMO, but too long.

For the sake of clarity, I think we could adopt the dask-worker naming (cores, nprocs, nthreads), with nthreads being "Number of threads per process" as defined by dask-worker.

This way, it becomes much simpler to explain what dask-jobqueue does: allocate cores CPUs and launch a dask-worker with the parameters nprocs and nthreads.

@dgasmith
Author

+1 for the language on cores, nprocs, and nthreads, which would match dask-worker. The cognitive model that a job/block/submission is just a single dask-worker underneath would be easier to handle for people familiar with Dask. Memory in this case would be memory per submission.

Agreed that allowing the flexibility of threads * processes being less than, equal to, or greater than the assigned cores is good. For jobs that do a lot of pure Python, hyperthreading works very well, and understanding how many cores a given process needs (in the C++ threading case) is beyond the scope of Dask, but it can still be supported in this model. The assumption threads * processes = ncores is a very good default, however.

I would note that, in an ideal world, the ability to convey the dask-worker settings such as nprocs, nthreads, and ncores to the called function would be good, so that the function could scale its own threads accordingly. I do not think this is technically feasible at the moment, however.

@louisabraham
Contributor

louisabraham commented Oct 27, 2018

I would note that, in an ideal world, the ability to convey the dask-worker settings such as nprocs, nthreads, and ncores to the called function would be good, so that the function could scale its own threads accordingly. I do not think this is technically feasible at the moment, however.

I'm not totally familiar with how dask workers work, but from my understanding this kind of information could be stored in os.environ.
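
For what it's worth, a small sketch of that idea; the environment variable name here is an assumption, not an existing dask convention:

import os

def run_multithreaded_kernel(arg):
    # If the worker exported its core count (variable name assumed), the task
    # could size its own thread pool, e.g. for an OpenMP-backed extension.
    n_threads = int(os.environ.get("DASK_WORKER_CORES", "1"))
    os.environ["OMP_NUM_THREADS"] = str(n_threads)
    return arg, n_threads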

@guillaumeeb
Member

I'd like to have the opinion of other maintainers here, such as @lesteve (again, now that this is clearer), @jhamman, and @mrocklin, who introduced the cores notion replacing the threads one (#55) and also had questions about scheduler-specific resource kwargs (#82).

@guillaumeeb added the enhancement (New feature or request) label on Oct 27, 2018
@guillaumeeb
Member

@dgasmith (or @louisabraham): could you provide a PR to illustrate this? That would probably make it easier for others to give an opinion.

+1 for the language on cores, nprocs, and nthreads, which would match dask-worker

The only problem I see with that is that we would replace processes with nprocs; I don't like changing kwargs like this, even if jobqueue is still a young module...

@louisabraham
Contributor

@guillaumeeb Ok I can do this.

The only problem I see with that is that we would replace processes with nprocs; I don't like changing kwargs like this, even if jobqueue is still a young module...

We can put a warning and use processes as nprocs for backward compatibility.
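
A minimal shim along these lines could keep the old name working (the helper name and warning category are just one possible choice, not an existing dask-jobqueue API):

import warnings

def resolve_nprocs(nprocs=None, processes=None, default=1):
    # Accept the deprecated `processes` kwarg, warn, and map it onto `nprocs`.
    if processes is not None:
        warnings.warn("'processes' is deprecated, please use 'nprocs' instead",
                      FutureWarning)
        if nprocs is None:
            nprocs = processes
    return default if nprocs is None else nprocs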

@guillaumeeb
Member

Would we be able to override resource_spec in job_extra if really needed? E.g. if there are two #PBS -l select=1:ncpus=24 lines in the job script, would the second be prioritized over the first? Or should we put job_extra first, depending on the scheduler?

I've tested this with PBS: it takes the first defined value. So we might need (but in another issue/PR) to put the job_extra settings first in the job script.

@lesteve
Member

lesteve commented Nov 7, 2018

I am still unconvinced that dask-jobqueue has to change to accommodate this use case. One reason I am not a big fan of the general direction taken by this issue: suppose some of your tasks do multi-threading internally but some don't (e.g. in the OP's case, once your C++ code has run you need to run additional analysis code in Python). Basically, you have booked some resources that your dask-worker is unable to use during the Python analysis phase.

I only recently realised that worker resources could be used for the original OP's use case, see this related SO question and answer.

@dgasmith
Author

dgasmith commented Nov 7, 2018

You are making the assumption that there is additional analysis that needs to be run and/or that it takes a significant amount of computing time, which is not always the case. The issue is not about changing your average case, but about allowing more flexibility in the project.

As you note, dask.distributed has this functionality, as shown in the SO question. However, the adaptive cluster components and the ability to submit to PBS/Slurm/etc. are quite appealing in this project and are not present in dask.distributed.

@lesteve
Member

lesteve commented Nov 7, 2018

About your second point, let me try to develop a bit more what I had in mind to see if that clarifies things a bit. What I would suggest is to do something along these lines:

from dask.distributed import Client
from dask_jobqueue import FooCluster

cluster = FooCluster(cores=32, processes=1, extra=['--resources threads=32'])
client = Client(cluster)

n_workers = 10
cluster.scale(n_workers)

for arg in arg_list:
    # resources ensure that at most two tasks run concurrently on the same dask-worker
    client.submit(function_calling_multithreaded_cpp_code, arg, resources={'threads': 16})

My understanding of the dask model (and the SO answer goes in the same direction) is that each task consumes one thread. One thing you can do if that's not the case is to restrict the number of concurrent tasks that can run on a dask-worker through resources.

@guillaumeeb
Member

Thanks @lesteve, I was not aware of this alternative solution that is really interesting.

As a result, I'm really hesitant here, with no strong opinion... On the one hand, I don't really like the fact that resource reservation is really scheduler specific in our API (but that's also a mirror of reality); on the other hand, there are already two solutions available for the OP's case, one being generic across all JobQueueCluster implementations.

@louisabraham
Contributor

Using resources is indeed very nice.
Regarding the code in @lesteve's last comment, I am not sure about what

cluster = FooCluster(cores=32, processes=1, extra=['--resources threads=32'])

does.

I think --nthreads 32 and --resources threads=32 are the same.

If processes=2 then dask-worker will be launched with --nthreads=16 --nprocs=2, am I right?

IMO we should let the user set nthreads and nprocs directly, or check if they are present in the extra.

@guillaumeeb
Member

@lesteve even if the change is not really required for the original problem, I like the idea because:

  • In some cases, it can be clearer to ask for some cores but only a few threads than to rely on dask resources or scheduler-specific kwargs, especially for people who are not PBS/LSF/Slurm/SGE(...) or Dask experts.
  • I like the idea of standardizing kwargs across job queue systems and getting rid of resource_spec or job_cpu.

This could also simplify a possible implementation of #133.

Your proposed solution is nice, but it could lead to confusion between the nthreads dask kwarg and the declared resources, which are totally abstract. A fix for this could be to make a change upstream in distributed and always automatically add a resource called threads and one called memory, for example, that are not abstract. But I'm not sure this is the philosophy of dask resources.

I think I slightly lean towards a modification that harmonizes the JobQueueCluster implementations and allows specifying nthreads, but again, I don't see this as fundamental.

@dgasmith
Author

An issue with the proposed snippet is that you are unable to run thread-unsafe OMP tasks on 8 cores at 4 tasks per node, which is easily doable with dask.distributed (although, granted, not in the cleanest way possible). Typically when we are running OMP processes, we are using dask merely to map a large number of computations without dependencies onto worker nodes without clogging the queue.

Dask LocalCluster has the following parameters:

n_workers: int
Number of workers to start

threads_per_worker: int
Number of threads per worker

The ability to affect this set of parameters from dask_jobqueue would completely satisfy the use case here. I would argue that what we are looking for is more "dask-like", in that advanced users who are familiar with Dask can easily access the options already present in distributed.

Having distributed understand threads and processes, or even allowing a task to be flagged as needing a private process, would be great, but I strongly suspect that is a much longer-term project. In short, I would very much like to handle this via resources, but I do not believe it is currently possible, unless I have some fundamental misunderstanding here.

@lesteve
Member

lesteve commented Dec 21, 2018

OK, I think I get what you are saying, sorry for not getting it earlier: if you have multiple threads per dask worker, you can limit the number of concurrent tasks on your worker, but you cannot ensure that each task is run in a separate process, which is a problem for thread-unsafe tasks.

In the case where you have a thread-unsafe task, do I understand correctly that you are always going to use nthreads=1 in your snippet from #181 (comment)? I guess you could have a similar work-around with my snippet by having cores == processes.

I am fine with adding a threads_per_worker argument in JobQueueCluster (whose name I prefer to nthreads, I have to say). It would be great to open an issue in distributed asking whether limiting the number of tasks per process is something they would consider.

I am not very familiar with OpenMP, I have to admit, so out of curiosity, how common would you say thread-unsafe programs are?

@dgasmith
Author

I should separate OMP and thread-unsafe tasks, as they are not always related. Thread-unsafety usually comes from the fact that some scientific codes write files that use the PID as an identifier, or have shared globals that should not, in fact, be shared. This stems from the days when these codes were separate binaries called via bash with data passed in files, or from original design choices that were perhaps not advisable.

You can certainly use cores == processes, but it is quite opaque. I would also advise trying to keep the mental model close to distributed, so that dask-jobqueue is not yet another way of viewing distributed compute.

@dgasmith
Author

So to be clear, threads_per_worker is favored, which will mean that the dask-worker nthreads needs to be computed as nthreads = int(threads_per_worker / processes) to make sure we conform to dask-worker args:

  --nthreads INTEGER     Number of threads per process. Defaults to number of
                         cores
  --nprocs INTEGER       Number of worker processes to launch.  Defaults to one.

I don't feel this is great, but again, I'm happy to have this either way.

@guillaumeeb
Member

I'd like to see this advance as a PR, so that we can ask other maintainers for advice.

nthreads needs to be computed as nthreads = int(threads_per_worker / processes) to make sure we conform to dask-worker

I don't think this is true. In Dask, a worker is equivalent to one process. With dask-jobqueue (and distributed), we sometimes launch grouped workers (with processes > 1, multiple workers at once), which might be confusing.

So whatever we call the kwarg, it should be the equivalent of nthreads.

@dgasmith
Author

dgasmith commented Jan 3, 2019

There may be some misunderstanding between the dask-worker command and a worker. In general, I am not sure how much folks refer to a worker as an individual worker object and process vs. the dask-worker nanny that manages multiple processes/workers.

I would be happy to make a PR, but I do not seem to follow your naming conventions and worker/process mental models. Please state these conventions and how you would like this to work, and I can make the changes.

@guillaumeeb
Member

In general, I am not sure how much folks refer to a worker as an individual worker object and process vs. the dask-worker nanny that manages multiple processes/workers.

This is definitely unclear. If I'm not mistaken, dask-worker with nprocs > 1 will launch multiple processes, each with its own nanny process. I have trouble understanding this too. It's the same as calling dask-worker multiple times.

The convention for me is what you stated in #181 (comment) or with your example in #181 (comment).

So we would go with an nthreads kwarg, or threads_per_worker_process, or whatever, but not with something where we have to compute the option to give to the dask-worker command.

@lesteve
Member

lesteve commented Jan 15, 2019

From #181 (comment):

One drawback is that I do not believe --resources is per-process, but per-worker where not all C++/OMP applications are thread safe due to globals.

I did some tests and it appears that resources are per worker process and not per dask-worker, as I (and maybe others in this thread) originally thought. When you do dask-worker --nprocs 3 --nthreads 2 --resources slots=1 (which is equivalent to LocalCluster(n_workers=3, threads_per_worker=2, resources={'slots': 1})), the slots=1 resource is applied to each of the 3 worker processes and not globally to the dask-worker.

This means that there is a way with resources to ensure that tasks are run in separate processes. Unless I am missing something, this should get rid of the concern about using resources with non-threadsafe tasks.

This is a snippet to show what I mean in more detail. I am using LocalCluster for simplicity, but this should hold for any dask-jobqueue FooCluster (I double-checked with SGECluster and it does):

import os
import time
import threading
from pprint import pprint

from dask.distributed import Client, LocalCluster
from distributed.client import get_task_stream

cluster = LocalCluster(n_workers=3, threads_per_worker=2, resources={'slots': 1})
client = Client(cluster)
print('dashboard port:', client.scheduler_info()['services']['bokeh'])

def func():
    time.sleep(1)
    return (os.getpid(), threading.current_thread().name)

t0 = time.time()
with get_task_stream() as ts:
    futures = [client.submit(func, pure=False, resources={'slots': 1}) for i in range(21)]
    result = client.gather(futures)
print('time: ', int(time.time() - t0))

Looking at the timing (7 seconds is 21 / 3) and at the dashboard, you can see that there are three tasks running concurrently:

[dashboard task stream screenshot]

The thing is, with the dashboard I don't think there is a way to figure out which row corresponds to which worker process. So here is a little bit of hacky code to show that tasks were indeed run in different worker processes:

# hacky code to show that there are three tasks running concurrently and
# that they run on different worker ports (i.e. different worker processes)
worker_port_and_start_list = [
    [each['worker'].split(':')[2], int(each['startstops'][0][1]) % 100]
    for each in ts.data]
pprint(worker_port_and_start_list)

Output (first column is the worker port, second column is the start time (rounded) of the task):

[['33379', 51],
 ['44557', 51],
 ['40765', 51],
 ['44557', 52],
 ['33379', 52],
 ['40765', 52],
 ['44557', 53],
 ['33379', 53],
 ['40765', 53],
 ['44557', 54],
 ['33379', 54],
 ['40765', 54],
 ['44557', 55],
 ['33379', 55],
 ['40765', 55],
 ['33379', 56],
 ['44557', 56],
 ['40765', 56],
 ['33379', 57],
 ['44557', 57],
 ['40765', 57]]

@guillaumeeb
Member

I did some tests and it appears that resources are per worker process and not per dask-worker

Thanks @lesteve.

Generally speaking, I think there is no such thing as a dask-worker entity. If I'm not wrong, a Worker is equivalent to a worker process. dask-worker just provides a means to launch several processes using --nprocs, but in the end this is equivalent to launching dask-worker nprocs times.

So yes

this should get rid of the concern about using resources with non-threadsafe tasks

But anyway, I still think the proposed modification could be useful.

@lesteve
Member

lesteve commented Jan 18, 2019

But anyway, I still think the proposed modification could be useful.

Sure. I mainly wanted to make the point that dask resources can be used for the OP's original problem: executing a Python function that manages multi-threading internally. This was not obvious to me for quite a long time, and probably not for others either.

It would be great to have confirmation that it actually works on "production" use cases rather than only on my toy example above.

IMO if we add a threads_per_worker argument, it would be very nice to:

  • mention, maybe in the docstring, that dask resources can be used too
  • point to a slightly simpler example in the docs using resources (maybe this should be a FAQ). The example we have about resources now uses compute, and I find it a lot less clear than using submit.

@dgasmith
Author

It is quite interesting that resources are per worker (the nprocs arg to the dask-worker CLI), and ultimately that is the way we should go, I think. Let me experiment a bit with resources; I should be able to interface our cluster management tech to hook into this. It may be worth adding this back to distributed with a kwarg like client.submit(..., thread_unsafe=True), which could translate to a resource like {"_thread_unsafe_task": 1} applied to all workers using existing technology.
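
A rough sketch of what that sugar could look like on the client side (the flag and the resource name are assumptions, not an existing distributed API; workers would also need to be started with --resources _thread_unsafe_task=1 for it to have any effect):

def submit_thread_unsafe(client, func, *args, **kwargs):
    # Translate a hypothetical thread_unsafe flag into a dask resource request,
    # so that at most one such task runs per worker process.
    return client.submit(func, *args, resources={"_thread_unsafe_task": 1}, **kwargs)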

The fact that this is not per node brings up some interesting questions about heterogeneous resources. Say you have a 16-core CPU and tasks that require 12 and 4 cores respectively. I think this kind of heterogeneous cluster usage is a bit out of scope for Dask at the moment, however (it is not really possible with simple threads/processes either). I'm simply mentioning this as something to think about; it might also go back to a distributed kwarg like client.submit(..., thread_unsafe=True).

I am still open to making a PR if this change is still wanted.

Thank you for the many comments and discussion!

@guillaumeeb
Member

This discussion gives me headaches! I think we are overthinking it 🙂. At the very least, we should document how the OP's case can be solved both with resources and with resource_spec.

For the nthreads PR, I leave it up to you guys. My opinion is that it can be useful, because I'd like to get almost entirely rid of resource_spec if possible, and I feel the resources solution is a bit too much.

@wkerzendorf

I'm getting headaches about this as well 😉: I have a similar problem. Each of my tasks (embarrassingly parallel) manages threading internally with OpenMP and needs 8 cores. I did start 2 workers using

SLURMCluster(walltime='01:00:00', memory='7 GB', 
                       job_extra=['--nodes=1', '--ntasks-per-node=1'], cores=8, processes=1)

but when I then map my (OpenMP-parallelized) function using client.map with 16 parameter sets, it runs many tasks at the same time on each worker. I want to have a maximum of one task per worker (that means two instances of the function running at the same time).


@lesteve
Member

lesteve commented Feb 1, 2019

Can you try this and report back?

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=8, processes=1, memory='7GB',
                       # each dask worker declares it has an amount 1
                       # of the resource named "processes"
                       extra=['--resources processes=1'])
client = Client(cluster)

n_workers = 10
cluster.scale(n_workers)

# resources ensure that at most one task (i.e. function in `.submit`)
# will run on each dask worker
futures = [client.submit(function_calling_multithreaded_cpp_code, arg,
                         # each task declares that they need an amount 1
                         # of the resource named "processes" 
                         resources={'processes': 1})
           for arg in arg_list]

@wkerzendorf

@lesteve won't the problem be that the futures go out of scope? Do I have to use fire_and_forget?

@lesteve
Member

lesteve commented Feb 1, 2019

Sorry, I edited my snippet above. You may be able to use .map with resources, but I have not checked...

@wkerzendorf

@lesteve it doesn't work when leaving the resources out of the map command; trying with resources in map. I found a fix and am going to post it after this next test.

@wkerzendorf

@lesteve resources is not a documented keyword for either submit or map, but it does work. Thanks! It would be nice to somehow be able to tell the cluster that one worker has 8 CPUs and can only schedule one task.
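
For reference, the combination that appears to work here, sketched with placeholder names (run_openmp_model and parameter_sets stand in for the user's function and arguments, and the SLURMCluster/Client setup from the snippet above is assumed):

# resources passed to client.map, mirroring the client.submit example above
futures = client.map(run_openmp_model, parameter_sets, resources={'processes': 1})
results = client.gather(futures)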

@lesteve
Member

lesteve commented Feb 1, 2019

IMO dask resources is the "dask way" of doing what you want. In particular, note that resources is not at the cluster level but at the task (i.e. function in .submit) level, which makes a lot of sense because some of the tasks may use multiple threads while others may not.

I completely agree that resources are not very easy to discover when you are new to dask. I'd be in favour of adding an example in docs/source/examples.rst that explains how to execute a function that uses multi-threading. PR more than welcome!

@wkerzendorf

@lesteve I guess it makes sense if you want to be so fine-grained that different tasks can have different resource requests. I still don't understand what resources means for the cluster and for submit. It seems one worker can do more than one task at the same time (I've seen the word slot mentioned). What do I tell the workers at cluster creation (that they have one slot?)

thanks for your help btw!

@lesteve
Member

lesteve commented Feb 1, 2019

I still don't understand what resources means for the cluster and for submit.

Right, sorry, I am going to try to explain better (I'll edit my snippet above too to make it clearer):

# each dask worker declares it has processes=1
cluster = SLURMCluster(cores=8, processes=1, memory='7GB', extra=['--resources processes=1'])

# each task (e.g. function submission) declares it needs processes=1;
# because each dask worker declared it has processes=1, this ensures that
# at most one task runs on each dask worker
futures = [client.submit(function_calling_multithreaded_cpp_code, arg,
                         resources={'processes': 1})
           for arg in arg_list]

You can look at http://distributed.dask.org/en/latest/resources.html for a more detailed explanation of dask resources.

@wkerzendorf

Thank you!! That helped. So now I do adaptive scaling and it seems to only run on one worker. I've had that before, where it doesn't seem to load-balance. Is this something that should happen, or do I need to set this up?

@lesteve
Member

lesteve commented Feb 1, 2019

I don't think that is expected; can you open a new issue about this?

@wkerzendorf

I'm first trying to see if that is a problem with manual scaling as well. Ah... the resources: it seems the name I give the resources is arbitrary. As my code is called TARDIS, I could say that each worker has one heart_of_tardis, which is needed for each computation; there is nothing special about the keyword processes, right?
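
Indeed, the resource name is just an arbitrary label; it only has to match between the worker declaration and the submit/map call. A hedged sketch using that name (function and variable names are placeholders, and the cluster/client setup follows the earlier snippets):

cluster = SLURMCluster(cores=8, processes=1, memory='7GB',
                       extra=['--resources heart_of_tardis=1'])
futures = client.map(run_tardis_model, parameter_sets,
                     resources={'heart_of_tardis': 1})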

@guillaumeeb
Member

Okay, I'm going to close this discussion; it is too long already. I've created #231 as an outcome we all agree with.

I'm still open to someone proposing a PR to add an nthreads-like kwarg, so we can see how it looks.
