
Cluster should support many worker types #2118

Open
mrocklin opened this issue Jul 15, 2018 · 16 comments
Labels
discussion Discussing a topic with no specific actions yet

Comments

@mrocklin (Member)

The various Cluster objects often allow the user to provide a specification for a worker (cores, memory, software environment, ...) and then provide mechanisms for increasing and decreasing the number of workers.

However, sometimes a dask deployment has a few different kinds of workers: for example, machines with GPUs or high memory, or machines from a queue that is more or less expensive or reliable in some way.

This suggests that maybe the Cluster object should accept a list of worker pools, and provide common functionality around them.

Things like the widget are easy to scale to multiple pools. Adaptivity is a bit weirder.
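As a purely hypothetical sketch (none of this API exists today; SomeCluster, the worker_pools= keyword, and the pool= argument to scale() are invented for illustration), the interface might look something like:

# Hypothetical multi-pool Cluster API; not an existing distributed interface.
cluster = SomeCluster(
    worker_pools={
        "cpu": {"cores": 8, "memory": "16 GB"},
        "gpu": {"cores": 8, "memory": "64 GB", "resources": {"GPU": 1}},
    }
)
cluster.scale(20, pool="cpu")  # scale each pool independently
cluster.scale(2, pool="gpu")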

cc @lesteve @jhamman (dask-jobqueue) @jcrist (dask-yarn) @jacobtomlinson (dask-kubernetes)

Credit for this thought goes to @lesteve

@lesteve (Member) commented Jul 15, 2018

When I mentioned this I was thinking about it in a dask-jobqueue context, but it would be nice to have a feature like this in distributed for sure!

Just for completeness, a few remarks about the dask-jobqueue context I was thinking of:

  • the main use case I had in mind was CPU vs GPU with a workflow where some of the tasks need a GPU but others don't. On our cluster this means there is a queue for CPU nodes and a queue for GPU nodes (GPU nodes are a scarcer resource, as you can imagine, so you want to do as much work as possible on CPU nodes if you can).
  • my initial attempt would have been to hack something together quickly in dask-jobqueue (essentially I was hoping that you could just pass arguments to create_worker and override the parameters that were passed to the JobQueueCluster constructor). In this context, adaptivity would be nice to have but not crucial, I reckon.

@jhamman (Member) commented Jul 15, 2018

I'll just say that I generally like this idea. Currently, in jobqueue, it seems like it might be easier to create multiple Clusters with different configurations, but if there were a nice way to orchestrate this sort of functionality within a single Cluster, that would be cool.
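For reference, the multiple-clusters workaround might look roughly like this with dask-jobqueue (the queue names, sizes, and worker counts below are made up for illustration):

from dask.distributed import Client
from dask_jobqueue import PBSCluster  # SLURMCluster etc. work similarly

# One cluster per worker type; each gets its own scheduler and dashboard.
cpu_cluster = PBSCluster(queue="cpu", cores=24, memory="100 GB")
cpu_cluster.scale(20)

gpu_cluster = PBSCluster(queue="gpu", cores=6, memory="60 GB")
gpu_cluster.scale(2)

cpu_client = Client(cpu_cluster)
gpu_client = Client(gpu_cluster)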

@sjperkins (Member)

Another use case is creating separate workers for I/O and compute (see e.g. http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/).

  • Compute threads are usually equal in number to the cores and have their affinity set to a particular core to prevent thread migration. They also generally don't block.
  • I/O threads (disk access/network reads) are usually some multiple of the number of cores and block while waiting for data.

The ability to distinguish between the two would be useful for ensuring that "the cores are always fed".

Given the description above, would it be possible to create workers with separate I/O and compute threads? In the current distributed paradigm, one might specify these workers like so:

dask-worker scheduler:8786 --resources "io=32; compute=8" --nprocs=1 --nthreads=40

However, manually specifying io/compute resources for each task is somewhat laborious and error-prone.
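For illustration, that manual bookkeeping looks roughly like this today (fetch, process, and urls are placeholders): every submit call has to carry its own resources= argument, matching whatever was declared on the dask-worker command line.

from dask.distributed import Client

client = Client("scheduler:8786")

# Each call must repeat the resource constraint by hand.
pages = [client.submit(fetch, url, resources={"io": 1}) for url in urls]
results = [client.submit(process, page, resources={"compute": 1}) for page in pages]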

@vincentschut

Another use case is creating separate workers to download data from third-party servers that allow only a limited number of connections. To make things more interesting, the limit could be a maximum number of connections per IP, and thus per worker node/machine/instance. So when running on, say, a single 32-core node, one would have 32 CPU-bound workers and, for example, 8 download workers for a server with a maximum of 8 connections per IP. When adding a new node to the cluster, the number of CPU workers would scale up with the number of extra CPUs, but the number of download workers would always increment by 8.
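One way to approximate this today is to give each worker a fixed "download" resource budget and constrain download tasks to it; the numbers and the fetch/urls names below are illustrative only, and the cap is per worker process rather than truly per IP.

dask-worker scheduler:8786 --nthreads 32 --resources "download=8"

from dask.distributed import Client

client = Client("scheduler:8786")
# At most 8 download tasks run concurrently per worker, regardless of thread count.
futures = [client.submit(fetch, url, resources={"download": 1}) for url in urls]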

@nicolls1 (Contributor) commented Jul 16, 2018

A couple things that would be nice for my use case:

  1. I also have CPU and GPU workers and would like a less strict resource mapping, as my tasks can still run on CPU machines, only more slowly. In Kubernetes there is the concept of affinity, where a pod can prefer to be scheduled on a certain node but can still run on other nodes if that one doesn't exist. It would be nice to have an analogous concept for tasks (see the sketch below for the closest existing mechanism).

  2. I would also like the ability to create a node or worker pool that is reserved for time-sensitive tasks. My cluster needs to process some jobs as fast as possible, and I would like to be able to reserve node(s) and possibly monitor them separately from the rest of the workload. Going back to Kubernetes, there is the concept of taints, which prevent pods from being placed on a node. Again, it would be nice to have an analogous concept for tasks.

Curious how these multiple worker types could or could not support this.
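For what it's worth, the closest existing mechanism to the "soft affinity" in (1) is probably the workers=/allow_other_workers= arguments to submit, which express a preference rather than a hard requirement (train_model, data, and the worker name below are placeholders):

from dask.distributed import Client

client = Client("scheduler:8786")
# Prefer the GPU worker, but allow the task to run elsewhere if needed.
future = client.submit(train_model, data,
                       workers=["gpu-worker-1"],
                       allow_other_workers=True)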

@sjperkins (Member)

I think this issue is related to #2127, where @kkraus14 wants to run certain tasks on CPU/GPU workers. I've also wanted to run tasks on specific workers, or to require that resources be exclusive to certain tasks.

Currently, these task dependencies must be specified as additional arguments to compute/persist etc. rather than at the point of graph construction -- embedding resource/worker dependencies in the graph is not currently possible.

To support this, how about adding a TaskAnnotation type? This could be a namedtuple, itself containing nested tuples representing key-value pairs, e.g.

annot = TaskAnnotation(an=(('resource', ('GPU', '1')), ('worker', 'alice')))

dask array graphs tend to have the following structure:

dsk = {
    (tsk_name, 0) : (fn, arg1, arg2, ..., argn),
    (tsk_name, 1) : (fn, arg1, arg2, ..., argn),
}

How about embedding annotations within value tuples?

dsk = {
    (tsk_name, 0) : (fn, arg1, arg2, ..., argn, annotation1),
    (tsk_name, 1) : (fn, arg1, arg2, ..., argn, annotation2),
}

If the scheduler discovers an annotation in the tuple, it could remove it from the argument list and attempt to satisfy the requested constraints. In the above example, annotations are placed at the end of the tuple, but the location could be arbitrary and multiple annotations are possible. Alternatively, it might be better to put them at the start.
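As a rough sketch of that idea (TaskAnnotation is hypothetical and this is not how the scheduler currently works), stripping annotations out of a task tuple could look like:

from collections import namedtuple

# Hypothetical type from the proposal above; not part of dask or distributed.
TaskAnnotation = namedtuple("TaskAnnotation", ["an"])

def strip_annotations(task):
    # Split a task tuple into its real arguments and any embedded annotations.
    args = tuple(a for a in task if not isinstance(a, TaskAnnotation))
    annots = [a for a in task if isinstance(a, TaskAnnotation)]
    return args, annots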

I realise the above example is somewhat specific to dask arrays (I'm not too familiar with the dataframe and bag collections) so there may be issues I'm not seeing.

One problem I can immediately identify would be modifying existing graph construction functions to support the above annotations (atop/top support is probably the first place to look).

@mrocklin (Member, Author)

The cluster object is about starting and stopping workers, not about assigning workers to tasks. I don't think that this is related.

@sjperkins (Member) commented Jul 19, 2018

@mrocklin Ah sorry. If you think the above is useful, should the discussion be moved to #2127 or a new issue be opened?

EDIT: Created issue in dask/dask#3783

@mrocklin (Member, Author) commented Jul 19, 2018 via email

@Winand commented Feb 4, 2019

I cannot figure out how one deals with network-bound vs compute-bound tasks now. I use a ThreadPoolExecutor for network tasks and then feed the results to ProcessPoolExecutor workers for computation. If I understand correctly, I would need to start two LocalClusters to do the same thing in Dask.
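For concreteness, the two-cluster version would look roughly like this (worker counts are illustrative, and fetch_page/parse_page/urls are placeholders):

from dask.distributed import Client, LocalCluster

io_cluster = LocalCluster(n_workers=1, threads_per_worker=10, processes=False)
cpu_cluster = LocalCluster(n_workers=5, threads_per_worker=1)
io_client = Client(io_cluster)
cpu_client = Client(cpu_cluster)

pages = io_client.gather(io_client.map(fetch_page, urls))  # network-bound, threads
results = cpu_client.map(parse_page, pages)                # CPU-bound, processes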

@mrocklin (Member, Author) commented Feb 4, 2019 via email

@Winand commented Feb 5, 2019

@mrocklin well, currently I load HTML pages using 10 parallel threads, then parse and process the results with BeautifulSoup using 5 worker processes. Of course the number of workers can be tuned, but in general the first stage mostly waits and the second one mostly computes.

As I understand it, I have to use different clusters for this workflow (and therefore separate dashboards, if I want to monitor the whole process somehow).

I recommend using the defaults until difficulties arise

Do you mean just use one cluster, tune its settings, and look at the overall execution time?

@mrocklin (Member, Author) commented Feb 5, 2019 via email

@arpit1997 (Contributor)

@mrocklin Worker pools, similar to Airflow's implementation, could certainly be useful.

The use case I encountered was:

  • We have some very long-running tasks (using the dask core library), on the order of a day, and some very short tasks on the same cluster.
  • Pools could provide a friendly way to group workers and assign them dedicated resources.

@therc commented Aug 29, 2020

Another scenario: you have a worker size that accommodates 95% of tasks or more, but the occasional pathological case uses 2x the memory. I'd love to retry tasks that were killed by the nanny, giving them X% more memory each time. This assumes dask-kubernetes and some kind of autoscaling to bring up beefier machines on demand.

@alexandervaneck

Hello 👋 I'm interested in this topic since the project I'm working on needs different types of resources per worker.

@mrocklin @lesteve @jhamman @jacobtomlinson Is there still interest in this? And if so, could someone point me in the direction of where improvements/implementations could be made?

@GenevieveBuckley GenevieveBuckley added the discussion Discussing a topic with no specific actions yet label Oct 18, 2021