Specification of the ClusterManagers.jl workflow #145

Closed

juliohm opened this issue Oct 7, 2020 · 16 comments

@juliohm
Collaborator

juliohm commented Oct 7, 2020

Currently the LSFManager adopts a strategy where bsub is called multiple times to launch a job for each worker process. It is also possible to create a single job with multiple processes using, for example, bsub -n 10. In general, when is it appropriate to create a single job with multiple processes? Shouldn't we adopt that strategy instead?
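
For concreteness, the two submission shapes being compared look roughly like this when driven from Julia (a sketch only; the flags and the use of blaunch are illustrative, and the package's actual submission mechanism is discussed further down):

# Many small jobs: one bsub submission per worker process.
for _ in 1:10
    run(`bsub julia --worker`)
end

# One multi-slot job: reserve 10 slots in a single submission. A parallel
# launcher (e.g. blaunch on plain LSF, or jsrun on Summit) is then needed to
# start a worker on every reserved slot.
run(`bsub -n 10 blaunch julia --worker`)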

cc: @bjarthur @DrChainsaw

@vchuravy
Member

vchuravy commented Oct 7, 2020

I strongly favor the single-job-with-multiple-processes strategy. If feasible, one should allocate the resources before starting Julia.
Most cluster systems have an associated queue, so multiple jobs can easily time out.

@DrChainsaw
Collaborator

What does the -n flag do? I found this in the manual but it was not clear to me what it means exactly:

-n
Submits a parallel job and specifies the number of tasks in the job.

From what I have seen, the current strategy is to use a job array, so that one bsub command puts N jobs in the queue.

Assuming -n N actually gives you N cores/machines for a single bsub, how does the scheduler typically prioritize such a request vs. e.g. a job array of N jobs?

Getting all resources before starting any computation does not always seem feasible to me. At my place it is not uncommon for the scheduler to just give you a subset of the requested cores if the cluster is loaded, and this is by design. If -n could circumvent this policy, I'm sure that would be considered abuse.

@juliohm
Collaborator Author

juliohm commented Oct 8, 2020

@DrChainsaw in most workflows I've encountered in HPC clusters, all the resources are allocated a priori, and at once, with an option like -n, and only then does the job start with everything available (e.g. the set of CPUs and GPUs needed). I never liked the approach of multiple jobs with separate job IDs, as if they were completely independent jobs.

Schedulers have different rules to prioritise jobs. On my current cluster I was using the -n approach and getting all the machines pretty quickly. Other clusters may have a busier queue that prioritises small jobs, so launching tiny jobs may be preferable. We should probably add an option to the LSFManager to handle these two cases.
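
A purely hypothetical sketch of such an option (the strategy keyword does not exist in addprocs_lsf today; it is shown only to make the two cases concrete):

addprocs_lsf(32; strategy = :single_job)      # one multi-slot submission (bsub -n 32)
addprocs_lsf(32; strategy = :job_per_worker)  # many small jobs (roughly today's job-array behaviour)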

I will try to devote some time today to reading the source code of ClusterManagers.jl for similar patterns, and also try to dive into Distributed's source code to learn more about what is happening.

@vchuravy
Member

vchuravy commented Oct 8, 2020

Getting all resources before starting any computation does not always seem feasible to me.

That is a very different job model from the one Distributed.jl expects. Right now the goal of ClusterManagers.jl is to create a working environment for Distributed.jl; as such, if you request more resources than the cluster can provide, the call to addprocs will fail and time out, since it will not be possible to spin up a job able to fulfill your request.

If you want to use ClusterManagers as a batch-processing system or as a job queue built on top of a cluster queue, that is a valid but distinctly different use case which I think is out of scope for this package. In the past I have used Snakemake for scenarios like that.

@DrChainsaw
Collaborator

That is a very different job model from the one Distributed.jl expects. Right now the goal of ClusterManagers.jl is to create a working environment for Distributed.jl; as such, if you request more resources than the cluster can provide, the call to addprocs will fail and time out, since it will not be possible to spin up a job able to fulfill your request.

But if it works right now for this type of workflow, should we then take steps to explicitly forbid it? Maybe I wasn't clear enough: it is not so much a matter of jobs being refused, but that they don't all start at the same time, and if the load is high and the request is large this could take days (basically until the load decreases, which in theory could be never).

I just put the call to addprocs_lsf in a separate task and then use another task to look for and set up new workers, which, once set up, are added to the worker pool.

This way, if I have a task which can use N cores effectively, I can safely ask for N cores instead of having to guess what number of cores will minimize the wall-clock time given my current (cluster) scheduler priority. Whatever jobs have not started when the task is finished are then simply cancelled. This could of course be achieved by calling addprocs several times with a small number of jobs each time, but that has the disadvantage that jobs are not queued as soon as you need the workers.
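
A minimal sketch of that pattern, assuming an LSF cluster and that workers registered by Distributed become visible through workers() while addprocs_lsf is still waiting for the remaining jobs:

using Distributed, ClusterManagers

# Ask for the full number of workers without blocking the main task.
alloc = @async addprocs_lsf(64)   # 64 is an arbitrary example request

pool = WorkerPool(Int[])          # start with an empty pool
seen = Set{Int}()

# A second task feeds workers into the pool as they come online, so work
# submitted against `pool` (pmap, remotecall_fetch, ...) can start as soon
# as the first worker arrives.
feeder = @async while !istaskdone(alloc)
    for w in workers()
        if w != 1 && !(w in seen)
            push!(seen, w)
            push!(pool, w)
        end
    end
    sleep(1)
end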

I'd be interested to hear about use cases where this is not desired, since I might be a bit tunnel-visioned here. Are there distributed programs which need exactly N workers or else they can't do any work? I thought the idea of a cluster was that multiple people should be able to share it effectively.

@vchuravy
Member

vchuravy commented Oct 9, 2020

But if it works right now for this type of workflow, should we then take steps to explicitly forbid it?

I didn't catch that during the original review of addprocs_lsf; it is a significant divergence from how the rest of ClusterManagers works.

I'd be interested to hear about use cases where this is not desired, since I might be a bit tunnel-visioned here. Are there distributed programs which need exactly N workers or else they can't do any work?

As an example, for CLIMA I set up a job that needs exactly 96 GPUs, since I will set up the problem domain to max out the memory usage. If I get fewer than 96 GPUs, I will run out of memory and crash.

In my mind a cluster scheduler is a resource manager: I ask for N resources and I have to wait for those N resources to become available; the code I write assumes that those resources are there once my request has been fulfilled.

In Slurm I express that by doing an sbatch/salloc to allocate resources and then using srun to launch jobs on the machines.
On Summit, which uses LSF (https://docs.olcf.ornl.gov/systems/summit_user_guide.html#running-jobs), this maps to bsub and jsrun.

@juliohm
Collaborator Author

juliohm commented Oct 10, 2020

I fully agree with @vchuravy. We should aim for this "canonical" use case and get it right for the various clusters.

Also, it seems that this pattern could be abstracted out of the current implementations. We could have a single struct, say HPCCluster, with a set of common options like run_cmd (e.g. srun, jsrun) and alloc_cmd (e.g. salloc), and then make the implementations for different clusters specific instances of this struct with options pre-filled. That way the logic is kept in a single place, as opposed to the current state of the package where each file implements its own separate logic.

@DrChainsaw
Collaborator

Ok, I see. I guess I was seeing Distributed as more of an extension of threads with some extra overhead for marshalling etc. Even though I can't see why it couldn't be, I'll consider myself downvoted and outranked on this subject, so I'm not going to argue it further :)

So in the canonical all-or-nothing type of task, the drawbacks of the current way are the risk of timing out while waiting for the last few jobs to start, and perhaps just the awkwardness of having jobs idle while waiting for others to start?

What is the way things should work with the -n flag? If I run bsub -n N 'julia --worker' and then bpeek the output, I see the IP and port for a single worker. Is it enough to connect to this worker, or does one need some other flag/script to start workers on the other processes?

@bjarthur
Collaborator

I think there is a confusion here between -n and -Jjobname[1-N]. n is limited to how many "slots" there are on each machine (on my cluster a slot is 15 GB and 1 core). N is limited to how many slots you have on your cluster, divided by whatever you have set n to. LSFManager currently just launches one job array for each call to addprocs_lsf. If your Julia workers need more than one slot, because the code they run uses lots of memory or is multi-threaded, then set n appropriately.
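
For example, with the LSF manager that could look roughly like this (a sketch assuming the bsub_flags keyword of addprocs_lsf for passing extra bsub options and the generic exeflags option of addprocs; adjust to your cluster's slot definition):

using Distributed, ClusterManagers

# Each worker runs 4 threads, so request 4 slots per job via bsub's -n flag.
addprocs_lsf(16;
             bsub_flags = `-n 4`,
             exeflags   = `--threads=4`)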

@juliohm juliohm changed the title A single job with multiple processes or multiple jobs with a single process? Specification of the ClusterManagers.jl workflow Oct 14, 2020
@juliohm
Collaborator Author

juliohm commented Oct 14, 2020

I've renamed the issue so that we can brainstorm together about the workflow we intend to support, regardless of the actual implementation with platform-specific commands.

Let me start by sharing the workflow that most people coming from MPI and HPC simulations expect. I will distinguish between the user workflow and the cluster manager workflow. The user workflow that I think we should officially support is a single call to addprocs at the top of the script:

using ClusterManagers

addprocs_foo(N, flags=gpus, memory, etc.) # waits for resources to become available

heavy_simulation() # do the heavy simulation which is only possible with all resources

rmprocs() # finish the entire job (i.e. a collection of workers)

In this workflow, we assume that the user of the package won't be updating the pool of processes often, and that the addprocs call is blocking. The user can only perform the heavy simulation when every requested resource is available. For example, some code may only work with a given amount of memory, or parts of the code may communicate with specific workers via some arithmetic rule: say the code assumes that the number of workers is even and partitions the job so that even and odd workers never communicate, yet all of them need to be present to avoid deadlocks.
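
As a toy illustration of that last assumption (hypothetical code, just to show why a partial allocation would break it):

using Distributed

ws = workers()
@assert iseven(length(ws)) "this partition needs an even number of workers"
pairs = zip(ws[1:2:end], ws[2:2:end])   # each pair works independently of the others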

Now, for the cluster manager workflow, we need to brainstorm what will happen inside the addprocs_foo call. We need a pseudo-algorithm that abstracts out the specific implementations for the LSF, SLURM, etc. managers. For example, we could have a single manager, let's call it ClusterManager, with a couple of fields:

struct ClusterManager
  subcmd # command used to submit the job
  verbosity # verbosity option
  stdout # option to control IO from workers
end

With this common set of options, we can then create specific managers as type aliases or as a dictionary:

LSFManager = ClusterManager(subcmd=`bsub`, ...)
SLURMManager = ClusterManager(subcmd=`srun`, ...)

The logic should be implemented in a single place, for instance the launch method of the generic ClusterManager:

function launch(..., ::ClusterManager, ...)
  # implement logic with common options
end
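
As a purely hypothetical concretization of this idea (names such as GenericHPCManager, submit_cmd and run_cmd are invented here to avoid clashing with Distributed's abstract ClusterManager type; nothing below exists in the package today):

using Distributed

Base.@kwdef struct GenericHPCManager <: Distributed.ClusterManager
    submit_cmd::Cmd          # command used to submit/allocate the job (e.g. `bsub`, `sbatch`)
    run_cmd::Cmd = ``        # launcher used inside the allocation (e.g. `jsrun`, `srun`)
    verbosity::Int = 0
    out_dir::String = "."    # where worker IO is written
end

# Platform-specific managers become pre-filled instances instead of separate types:
lsf_manager()   = GenericHPCManager(submit_cmd = `bsub`,   run_cmd = `jsrun`)
slurm_manager() = GenericHPCManager(submit_cmd = `sbatch`, run_cmd = `srun`)

# All shared logic then lives in a single launch method:
function Distributed.launch(mgr::GenericHPCManager, params::Dict,
                            launched::Array, c::Condition)
    # submit via mgr.submit_cmd, start workers via mgr.run_cmd,
    # push WorkerConfig entries onto `launched`, then notify(c)
end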

That way we will have a clear and shared vision of what we can do with different clusters.

What do you think of this proposal? Anyone more experienced with HPC clusters and with the codebase here would like to draft a PR? It would be great to have your input.

@juliohm juliohm added enhancement and removed LSF labels Oct 14, 2020
@bjarthur
Collaborator

People also use clusters for embarrassingly parallel workflows, where each job is independent and doesn't communicate with the others (e.g. a distributed pmap). Not everything is a simulation. In this case you don't have to wait for all the workers to come online; the async nature of addprocs is nice here, and it's nice to make additional calls to addprocs if things are finishing fast enough.

More generally, I think of ClusterManagers.jl as simply a package which provides an addprocs method for various cluster schedulers. No more, no less.

Lastly, do you, @juliohm, really have the time to rewrite everything? Would that really bring anything new to the table? ClusterManagers.jl more or less works as is. The only real problem, as I see it, is that we don't have maintainers for each cluster scheduler. I took care of qsub when I had access to one, and now I take care of LSF since my workplace switched to that. People come and go; it's the nature of the business and I'm not sure there is anything to be done about it.

@juliohm
Collaborator Author

juliohm commented Oct 14, 2020

Thanks for the input @bjarthur. Can you be more specific about which part of which workflow in the specification above needs to change to accommodate this other use case you have in mind? Perhaps it could be translated into a field of the ClusterManager struct, something like block=true by default?
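
For instance (still using the hypothetical addprocs_foo placeholder from the specification above):

addprocs_foo(N; block = true)   # default: return only when all N workers are up
addprocs_foo(N; block = false)  # return immediately; workers join as they start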

Regarding the time to rewrite everything: I'd love to, and would already have done it if I had more experience with the different managers. The proposal above addresses exactly the maintenance issue, since we would be able to concentrate our efforts and become familiar with a single source file containing all the logic. Right now, most cluster managers don't have maintainers because they don't necessarily follow the same logic or the same underlying workflow. If everyone starts looking at a single place when submitting PRs, it will be much harder to introduce bugs. The code would naturally become more robust with time, and maintainers would be able to transfer their knowledge to new maintainers more easily.

@juliohm
Collaborator Author

juliohm commented Oct 14, 2020

My suggestion to start would be to do the above refactoring for the 3 major managers. I've noticed that most issues and PRs are for SLURM, LSF and PBS, so if we could get these 3 onto a single logic, that would be great. Afterwards, we would gradually move the other managers to the same logic.

@manuel-rhdt

From the discussion so far I think it has become clear that people generally take two different approaches to cluster computation. These are not at all specific to Julia, even though I will make Julia-specific comments in my observations below. I presume that both are valid ways to think about a cluster. Let me try to give a brief description of each approach.

I would like to call the first approach the MPI-like workflow. Here you ask the scheduler for a fixed amount of resources (often across multiple nodes) and once these resources become available it launches your job script. Thus, usually you run a single submission command (e.g., sbatch, qsub or bsub) for an entire simulation. When your job script has launched you have guaranteed access to all of the requested resources for the duration of your job. Inside the job script you usually arrange for processes to be launched on every core that was allocated for you. If you are using MPI you usually call something like mpirun or if you don’t use MPI you can launch a process on every core using srun in SLURM or pbsdsh in PBS. In this scenario there can often be a significant delay between submitting your job request to the scheduler and the launch of your job. Essentially, the scheduler has to wait until enough resources become available before running the job.

As far as I can tell, this is the approach that @juliohm is describing as the canonical scenario for many simulations.

The second approach views the HPC cluster as a large pool of cores from which you can dynamically allocate some for your Julia session. I would call this the Dask-like workflow, since it is how the Python Dask library handles HPC clusters. It sends one submit command (sbatch, qsub, …) per Julia worker to the HPC scheduler (i.e. one job per Julia worker, as opposed to one job per simulation). Libraries like Dask (or the Julia equivalent Dagger.jl) can start the computation before all requested cores become available, since they can dynamically adapt to the addition or removal of nodes during the computation. This approach is also very useful in interactive Julia sessions, since you can quickly launch a few additional cores from the HPC cluster (particularly if you do not ask for too many resources, the scheduler can often launch your worker processes immediately). I would also note that in some embarrassingly parallel problems, like Monte Carlo simulations, it is very easy to deal with Julia workers being dynamically added over time, since no communication between the workers is required.

I think the functionality currently provided by ClusterManagers.jl is more tuned towards the latter scenario. Let me explain how I would imagine ClusterManagers.jl working if it were optimised for just one of the above use cases.

For the first, MPI-like scenario I don't see a reason why it would be beneficial for the main Julia instance to run outside the cluster and perform the job submission itself. Instead, I would prefer to have Julia started by the job script. Then I would expect a call to addprocs(ClusterManager, ...) from within the job to spawn a subprocess for srun or pbsdsh that launches a Julia worker on every allocated core. Thus in this scenario I would never expect ClusterManagers.jl to perform calls to the likes of sbatch or qsub! I also want to note that this functionality is already partially provided by MPIClusterManagers.jl, where the addprocs function essentially calls mpiexec julia --worker. Of course this only works if an MPI implementation is installed on the cluster. Indeed, this is what I am currently using successfully on a small in-house PBS cluster.
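
A short sketch of that MPI-like pattern, assuming the MPIManager API from the MPIClusterManagers.jl README (this is the script started by the job script inside the allocation; check that package for the exact keyword names):

using Distributed, MPIClusterManagers

manager = MPIManager(np = 4)   # mpiexec spawns 4 worker processes
addprocs(manager)

@everywhere println("hello from worker $(myid())")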

In the second, Dask-like scenario it is actually expected that the main Julia script is not running inside a job script. Rather, I imagine the main Julia script, through a call to addprocs(manager), submits jobs to the scheduler itself. In other words, it issues calls to sbatch or qsub, once for every desired worker, and then waits until all jobs have been successfully launched by the scheduler. If one does not want to wait for all jobs to launch, one would instead call @async addprocs(manager) and then dynamically incorporate the workers into the computation as they come online. I would expect there to be good integration with Dagger.jl to simplify this.

At least for SLURM and PBS, the current ClusterManagers.jl code seems to behave as explained in the preceding paragraph. In my current HPC use case I prefer the MPI-like approach of requesting all resources ahead of time, and MPIClusterManagers.jl works well enough for me there.

Therefore my conclusion would be to double down on the current "Dask-like" behaviour of this library and explain it clearly in the documentation. An important use case of ClusterManagers.jl is the ability to dynamically add workers to the pool, which complements Dagger.jl well. For users wishing to use their cluster in a way closer to scenario 1, maybe MPIClusterManagers.jl is worth a look?

@kleinhenz

kleinhenz commented Jan 22, 2021

@manuel-rhdt that's a really helpful summary of the two approaches. I agree that clearly delineating these two approaches and deciding explicitly which workflow should be supported by which library would really improve the user experience. Right now I think the behavior of the second approach seems quite weird to people expecting the behavior of the first approach, and vice versa.

I made SlurmClusterManager.jl to be more explicitly aligned with the first approach.
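
For reference, the intended usage there is roughly the following (a sketch based on that package's README; the script is started by sbatch inside an allocation, and SlurmManager starts one worker per allocated Slurm task):

using Distributed, SlurmClusterManager

addprocs(SlurmManager())

@everywhere println("hello from worker $(myid())")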

@juliohm
Collaborator Author

juliohm commented Jul 23, 2023

Useful discussion. Thanks to all.

@juliohm juliohm closed this as completed Jul 23, 2023