LSFCluster worker doesn't execute all threads on the same node #172

Closed
louisabraham opened this issue Oct 8, 2018 · 17 comments · Fixed by #177

Comments

@louisabraham
Contributor

louisabraham commented Oct 8, 2018

I am not 100% sure, but it seems to me that nothing forces an LSF task to execute all the threads on the same node.

For example, PBSCluster uses the option ncpus:n that requests cpus per node, and SLURMCluster specifies -n 1 and uses --cpus-per-task=n to allocate n cpus on each host.

However, LSFCluster uses the -n option. I think that without a span[hosts=1] resource requirement, LSF can use processors from different hosts. See the relevant documentation there.
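
For illustration, here is a rough sketch of the difference (these header lines reflect my reading of the LSF documentation, not output from dask-jobqueue):

# Hypothetical job headers for a 4-core request:
header_without_span = ['#BSUB -n 4']                           # slots may be spread across hosts
header_with_span = ['#BSUB -n 4', '#BSUB -R "span[hosts=1]"']  # all slots on a single host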

Am I missing some LSF internals that make this acceptable?

@guillaumeeb
Member

I think no core maintainer of this project uses LSF.

Looking at the PR which introduced LSFCluster, this was indeed identified: #78 (comment). One way or another, it did not get implemented.

It would be nice to submit a PR to fix this!

@louisabraham
Contributor Author

Ok, I'll do it this week.

@guillaumeeb
Member

@louisabraham I'd like to make a small release by the end of the week before starting to work on bigger changes.

I'd like to take this fix in.
Would you be able to fix this, or would you prefer someone else to do it?

@louisabraham
Contributor Author

I'll do it today.

My editor autoformats the code, and it seems the different files don't use the same formatter.

Is using black, like dask_ml does, OK?

@louisabraham
Contributor Author

Also, currently ncpus is used but I think it should be cores.

I would need some explanation about the difference between cores and processes.

@guillaumeeb
Member

We don't have a code format template or requirement yet, except flake8 checks.

I don't know the black formatter, but it should be OK.

For cores vs ncpus, could you point to the corresponding lines in the code?

@louisabraham
Contributor Author

cores is not used in LSFCluster itself, but it is required by JobQueueCluster, which LSFCluster inherits from.

Other arguments are used like this:

self.worker_memory = parse_bytes(memory) if memory is not None else None
self.worker_processes = processes
self.worker_cores = cores

PBSCluster uses them like this:

if resource_spec is None:
    # Compute default resources specifications
    resource_spec = "select=1:ncpus=%d" % self.worker_cores
    memory_string = pbs_format_bytes_ceil(self.worker_memory)
    resource_spec += ':mem=' + memory_string
    logger.info("Resource specification for PBS not set, initializing it to %s" % resource_spec)

And LSFCluster:

if ncpus is None:
    # Compute default cores specifications
    ncpus = self.worker_cores
    logger.info("ncpus specification for LSF not set, initializing it to %s" % ncpus)
if ncpus is not None:
    header_lines.append('#BSUB -n %s' % ncpus)
if mem is None:
    # Compute default memory specifications
    mem = self.worker_memory
    logger.info("mem specification for LSF not set, initializing it to %s" % mem)

I just find the whole process unclear. When one looks at the documentation of PBSCluster or LSFCluster, they see a very different interface and want to use ncpus. Then they get an error saying that cores is required.
There should be some convention to avoid this mess.

Also, worker_processes and worker_threads are used nowhere in the job scheduler directives, only to build the command:

# dask-worker command line build
dask_worker_command = '%(python)s -m distributed.cli.dask_worker' % dict(python=python)
command_args = [dask_worker_command, self.scheduler.address]
command_args += ['--nthreads', self.worker_threads]
if processes is not None and processes > 1:
    command_args += ['--nprocs', processes]

I might not have understood how dask_worker works, but I am under the impression each worker is supposed to be launched individually on only one computer, so one should ensure that all the allocated CPUs are on the same node.

For example, if you set cores=4 and processes=1 (because you have a numpy code that uses multiple threads), and scale to 5 nodes, then the job scheduler script will be launched 5 times, and will launch one distributed.cli.dask_worker each time. But I think that dask_worker doesn't care about the job scheduler and will use the 4 cores on the same machine even if all the cores were not allocated on the same machine.
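
For concreteness, a minimal usage sketch of that scenario (assuming the current dask_jobqueue API; the memory value is arbitrary):

from dask.distributed import Client
from dask_jobqueue import LSFCluster

# One job per worker: 4 cores, 1 worker process, hence 4 threads per worker
cluster = LSFCluster(cores=4, processes=1, memory='8GB')
cluster.scale(5)  # submits 5 jobs, each launching one dask-worker
client = Client(cluster)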

Also, can you confirm that processes>1 brings no benefit if your code doesn't use multiprocessing or something equivalent?

@louisabraham
Contributor Author

Maybe the points above should be explained in the documentation.

@guillaumeeb
Member

I just find the whole process unclear. When one looks at the documentation of PBSCluster or LSFCluster, they see a very different interface and want to use ncpus. Then they get an error saying that cores is required.

cores is basically an equivalent of ncpus. We want to launch workers in jobs that reserve cores number of cpu resources on a given compute node.

I am under the impression each worker is supposed to be launched individually on only one computer

We launch one dask-worker command per job. But this might actually result in several worker processes if using processes > 1. Grouped workers in dask are somewhat hard to get right at first and can be tricky to understand. We have already had several discussions about this. That's also why we need to use worker_threads and worker_processes.
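
To illustrate the arithmetic (a sketch of my understanding, not code from the repository):

# cores is split across processes; each process gets cores // processes threads
cores, processes = 8, 2
threads_per_process = cores // processes  # 4
# The job script then runs roughly:
#   dask-worker <scheduler-address> --nthreads 4 --nprocs 2
# i.e. one job, two worker processes, four threads each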

I think that dask_worker doesn't care about the job scheduler and will use the 4 cores on the same machine

That is correct, hence this issue for LSF.

can you confirm that processes>1 brings no benefit if your code doesn't use multiprocessing or something equivalent

That, I think, is not correct. processes > 1 allows you to parallelize Python functions that are bound by the GIL, like any pure Python code for example. It will launch several Python worker processes inside one scheduler job.
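
As an illustration of that point (a hypothetical sketch, not taken from the docs), GIL-bound work can be spread over several worker processes:

from dask.distributed import Client

def busy(n):
    # Pure-Python loop: it holds the GIL, so extra threads alone would not help
    total = 0
    for i in range(n):
        total += i
    return total

# client = Client(cluster)                  # e.g. a cluster started with processes > 1
# futures = client.map(busy, [10**7] * 16)  # tasks run in parallel across worker processes
# results = client.gather(futures)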

@louisabraham
Contributor Author

louisabraham commented Oct 17, 2018

cores is basically an equivalent of ncpus. We want to launch workers in jobs that reserve cores number of cpu resources on a given compute node.

Why do we have both then?

I am under the impression each worker is supposed to be launched individually on only one computer

We launch one dask-worker command per job. But this might actually result in several worker processes if using processes > 1. Grouped workers in dask are somewhat hard to get right at first and can be tricky to understand. We have already had several discussions about this. That's also why we need to use worker_threads and worker_processes.

I think that dask_worker doesn't care about the job scheduler and will use the 4 cores on the same machine

That is correct, hence this issue for LSF.

My example did not cover all the cases I am interested in.
If cores=8 and processes=2 (hence threads=4), do we agree the 8 cores have to run on the same machine?
Or is there a system that allows the processes to run on different machines?

That, I think, is not correct. processes > 1 allows you to parallelize Python functions that are bound by the GIL, like any pure Python code for example. It will launch several Python worker processes inside one scheduler job.

I don't think it parallelizes it automatically, does it? (I assume here that no dask data structure is used, because I didn't look up how they are implemented.)
For example, even if I have something parallelizable like sum(range(10**8)), the function will only use 1 process.
I was under the impression that your code actually has to create processes.

I actually have the same question about threads: if you set OPENBLAS_NUM_THREADS=4 but the number of threads on the worker is bigger or smaller, will that change anything?

@louisabraham
Contributor Author

louisabraham commented Oct 17, 2018

Oh, I think I got it right by rereading for the nth time http://distributed.dask.org/en/latest/worker.html

Your code isn't supposed to create more processes.
If you set up processes > 1, then you can just execute more GIL-bound tasks (each of which would use 1 process) in parallel.

So I think in most cases, when you use a cluster, you are supposed to launch with processes=1, and just scale up with more workers, am I right?
Queue rules just make it take longer to allocate more processes on a single machine.

If the user spawns more processes (with multiprocessing) during the computation, I think they will not count as dask processes, and we should provide some mechanism to allocate those processes on the machine without authorizing dask-worker to use them. Or maybe this is already handled by threads (allocated cores that are not used explicitly by dask-worker)?

@guillaumeeb
Member

cores is basically an equivalent of ncpus. We want to launch workers in jobs that reserve cores number of cpu resources on a given compute node.

Why do we have both then?

I did not look at the LSFCluster code earlier. cores is the number of cores declared to the dask-worker. ncpus is the number of cores reserved from the LSF scheduler. They are generally the same, so ncpus does not need to be set. In some rare cases, people overuse their scheduler reservation, for example asking for 36 ncpus but running dask-workers using 72 cores on a full compute node. Then they will need to specify both. I agree this really is an edge case. We should think about removing this possibility to simplify things, and maybe let users deal with job_extra for such things.
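
A sketch of that edge case with hypothetical values (assuming the current LSFCluster keyword arguments):

from dask_jobqueue import LSFCluster

# Declare 72 cores to the dask-workers while only reserving 36 cpus from LSF:
# a deliberate over-subscription of the reservation, as described above
cluster = LSFCluster(cores=72, ncpus=36, memory='128GB')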

If cores=8 and processes=2 (hence threads=4), do we agree the 8 cores have to run on the same machine?

Yes, with dask-jobqueue this is our assumption. I think at first this is mainly because PBS Pro is less versatile than Slurm or LSF, and pretty much imposes this limitation if we want to leave things simple.

even if I have something parallelizable like sum(range(10**8)), the function will only use 1 process.
I was under the impression that your code actually has to create processes.

Yes, you're right: your code has to create tasks through the various dask APIs, and in some cases it can also benefit from pandas or numpy multithreading optimizations, though I do not know how dask threads are linked to OMP_NUM_THREADS or other equivalent threading configuration. This would be something to ask upstream or on Stack Overflow if no answer is to be found in the docs.

@guillaumeeb
Member

If you set up processes > 1, then you can just execute more GIL-bound tasks (each of which would use 1 process) in parallel.

Yes, that's exactly it: your code should not spawn new processes.

So I think in most cases, when you use a cluster, you are supposed to launch with processes=1, and just scale up with more workers, am I right?

I'm not sure I understand what you mean. There are two extremes:

  • If your code is pure Python or bound by the GIL, you'll need to set processes=cores, so that for each core reserved on the scheduler you've got one Python process behind it.
  • If your code works well with Python threads, like maybe big NumPy computations, you should use processes=1 and set cores to something greater, so that you have only one Python process within a job.

A good practice is to have cores equal to the number of cores on one of your compute nodes; this is also the maximum you can set. Otherwise, you should try to use some divisor of your node's core count, e.g. for a 24-core compute node something like 4, 6, 8 or 12.
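
For example, the two extremes could look like this on a 24-core node (a sketch with hypothetical memory values):

from dask_jobqueue import LSFCluster

# Pure-Python / GIL-bound workload: one process per core, one thread each
gil_bound = LSFCluster(cores=24, processes=24, memory='60GB')

# Thread-friendly workload (e.g. large NumPy operations): one process, 24 threads
thread_friendly = LSFCluster(cores=24, processes=1, memory='60GB')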

If the user spawns more processes (with multiprocessing)

Using Dask, a user should never do this. Dask basically acts (in part) as a multi-node multiprocessing library; there is no need to use another parallelization module within it.

See also http://docs.dask.org/en/latest/setup/single-machine.html#single-machine-scheduler, http://distributed.dask.org/en/latest/efficiency.html#adjust-between-threads-and-processes

@louisabraham
Contributor Author

Yes, with dask-jobqueue this is our assumption. I think at first this is mainly because PBS Pro is less versatile than Slurm or LSF, and pretty much imposes this limitation if we want to leave things simple.

Ok, then I got things wrong on #176 because I let the user choose how many processes should be executed on the same node. It is very simple to fix.

Yes, you're right: your code has to create tasks through the various dask APIs, and in some cases it can also benefit from pandas or numpy multithreading optimizations, though I do not know how dask threads are linked to OMP_NUM_THREADS or other equivalent threading configuration. This would be something to ask upstream or on Stack Overflow if no answer is to be found in the docs.

Yes, indeed I don't know what dask threads really are.

If your code is pure Python or bound by the GIL, you'll need to set processes=cores, so that for each core reserved on the scheduler you've got one Python process behind it.

Precisely in this case, I don't see why one would set cores>1.
In my mind, the computational power of n dask-workers with 1 process each is roughly equivalent to that of 1 dask-worker with n processes, once you neglect data transfer between dask-workers on separate nodes and the small additional communication overhead between workers.
On the other hand, I experienced much longer waiting times when requesting more than 1 process on the same node.

Using Dask, a user should never do this. Dask basically acts (in part) as a multi-node multiprocessing library; there is no need to use another parallelization module within it.

I can see examples where one would want to do that. If you have large datasets and want to do computations without copying the data between processes, you have to use the multiprocessing.sharedctypes module.
But I think that is another story, and it would be more convenient to compile the parallel code to OpenMP with numba :) And anyway, the whole point of dask is to forget those low-level considerations.
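
For reference, a minimal sharedctypes sketch (standard library plus numpy, independent of dask, assuming a fork-based platform so the buffer is inherited by the child processes):

import numpy as np
from multiprocessing import Pool
from multiprocessing.sharedctypes import RawArray

# A flat double array in shared memory: child processes can read it without copying
shared = RawArray('d', 1000)
data = np.frombuffer(shared, dtype=np.float64)
data[:] = np.random.rand(1000)

def chunk_sum(bounds):
    # Re-wrap the shared buffer inside the worker; nothing is pickled or copied
    start, stop = bounds
    return np.frombuffer(shared, dtype=np.float64)[start:stop].sum()

if __name__ == '__main__':
    with Pool(2) as pool:
        print(sum(pool.map(chunk_sum, [(0, 500), (500, 1000)])))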

@guillaumeeb
Member

Precisely in this case, I don't see why one would set cores>1.

cores is not threads per process; it is the total number of cores used by all processes. So if processes=cores, you've got one thread per process.

But maybe what you mean is that you would only use job allocations with one ncpus per job? In that case, you're putting some load on the job scheduler, especially when you begin to scale to hundreds or thousands of cores and use dask-jobqueue adaptivity. It's often better to pack workers into bigger jobs, even if it means more waiting time.

Generally, I agree that dask-jobqueue is great for using the holes left by other jobs in the overall cluster resources, but I seldom use job allocations with fewer than 4 ncpus.

you have to use the multiprocessing.sharedctypes module

did not know this one, nice!

@louisabraham
Contributor Author

especially when you begin to scale to hundreds or thousands of cores

I think we don't have the same experience with cluster computing :)
I am allowed to use 48 cores in total on my university's cluster. Even if each worker uses 1 thread (or pretends to 😈), I think 48 workers are acceptable. On the other hand I can wait 40 minutes to allocate a single node with 4 CPUs (I have low privileges).

you have to use the multiprocessing.sharedctypes module

did not know this one, nice!

Maybe it will get integrated into dask.distributed at some point? But if you have already paid for a network transfer, a simple copy in RAM is nothing.

@guillaumeeb
Member

I am allowed to use 48 cores in total on my university's cluster. Even if each worker uses 1 thread (or pretends to 😈), I think 48 workers are acceptable. On the other hand I can wait 40 minutes to allocate a single node with 4 CPUs (I have low privileges).

Okay, then I understand 😁. In the end, dask-jobqueue allows addressing many kinds of needs and cluster constraints!
