Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runner #4710

Closed
wants to merge 2 commits into from
Closed

Add runner #4710

wants to merge 2 commits into from

Conversation

jacobtomlinson
Copy link
Member

@jacobtomlinson jacobtomlinson commented Apr 16, 2021

This PR adds a new Runner base class with a similar intention to Cluster in that it is for other projects to subclass and flesh out platform-specific implementation.

This work was inspired by dask-mpi which has a deployment paradigm unlike any of the existing cluster management tooling.

Dask MPI assumes that the user wants to submit a single Python script to a parallel batch scheduler, resulting in all processes executing this script. Dask MPI then handles deciding which process will be the scheduler, which will be the workers and which will continue executing the user's client code. Negotiation between processes is done via the MPI ranking and comm.

This kind of functionality would also be useful on similar systems which do not use MPI. The concept of running a single script and having all but one invocations bootstrap a Dask cluster, leaving one to complete the client work, is appealing.

The new Runner class takes the concepts from Dask MPI and attempts to mix them with the structure of the Cluster object in terms of asyncio support and the use of context managers.

Usage

The general usage for a runner should look like this.

from dask_foo import FooRunner

from dask.distributed import Client

with FooRunner() as runner:
    with Client(runner) as c:
        # Do client work

This script should be submitted many time in parallel using an appropriate execution engine.

When FooRunner is created each instance of the script will negotiate via some platform specific means to decide which role each process will take. All but the client will block here and run their components. Then when the client finishes and the runner context manager closes all components will be shut down via the Dask comm.

Reference implementation

This PR also includes a reference implementation of the runner which uses asyncio to concurrently execute four instances of the AsyncioRunner concurrently. These instances will negotiate via an asyncio.Lock to decide who is the scheduler, who continues with client code and who are workers.

Dask MPI

See dask/dask-mpi#69 for an implementation of this to replace the current code in dask-mpi.

@fjetter
Copy link
Member

fjetter commented Apr 16, 2021

I'm still a bit confused about the intention of Runner and the distinction to Cluster. Is this intended to be an interface for some cluster manager? Looks like this would be the deply analogue to the ServerNode (worker, scheduler, nanny all inherit from ServerNode)

Will a Cluster instance exist in these cases, and if so, do you have an example where both Cluster and Runner are used? Or are these two classes orthogonal?

I'm asking so many questions since I'm currently reviewing the interfaces around LocalCluster, SpecCluster, Cluster, Adaptive and AdaptiveCore. Looking at these classes, they still feel a bit fragile and I'm wondering if we shouldn't rather invest in a improved Cluster instead of introducing yet another interface.

@mrocklin
Copy link
Member

I have similar thoughts to @fjetter

I'm also curious about the motivations for this and why Cluster doesn't work in those situations. I haven't perceived any frustration around having MPICluster inherit from Cluster. I may not be well attuned there though.

@jacobtomlinson
Copy link
Member Author

jacobtomlinson commented Apr 19, 2021

I started out using Cluster as the base for Runner, but it doesn't fit the same paradigm so I removed it and just copied a couple of chunks of code over. I could imagine that both Cluster and Runner could inherit from some higher level base (similar to ServerNode), but wanted to start a discussion here before taking something like that on.

Typically all Cluster classes create clusters from within an existing parent process. The simplest example is an IPython process on my laptop. They either create subprocesses, establish SSH connections, start VMs or containers or something. They also tell each process what to be, scheduler or worker.

The paradigm in Dask MPI is different. There is no parent process. Instead the same script is submitted and executed many times in parallel via mpirun. These processes need to work out between them who does what. One is the scheduler, one may be a client, others are workers. The client process is effectively the parent process from the Cluster paradigm, except no processes need to be spawned, they already exist. Instead we need to discover where to find the already running scheduler.

I started writing an MPICluster class, but to me it felt like that class should be the one calling mpirun. And that doesn't make a huge amount of sense.

Azure ML example

An example of where this would be useful to me is Azure ML. That platform has an API which allows you to submit batch jobs which are run via MPI. Constructing the job definition means deciding how many processes should run, what datasets get mounted, what workspaces are available, what the runtime is, etc. But you have to provide a single script or command for all the processes to run.

In that case I expect we would want an AzureMLCluster cluster manager based on Cluster which handles all the setup and submission of the job. But we would also want an AzureMLRunner (or probably just MPIRunner from dask/dask-mpi#69) which would handle the setup of the processes.

Faster VM startup example

In many of the VM based cluster managers in dask-cloudprovider they use SpecCluster. This means that when the cluster manager is created one VM is launched with the scheduler process, then once that is up and running and we can connect to the scheduler we also launch the worker VMs in parallel. In some cases VMs can take many minutes to start, and by launching the scheduler first and waiting for it to be ready means we pay this penalty twice.

I would prefer to write a runner for each cloud platform which can use platform native methods to have all the processes negotiate about who is the scheduler.

In that case we would still need a cluster manager to create the processes, but we would leverage a runner to do the actual setup of the cluster from within those processes.


Homogenous cluster startup

This is a future thought and a bit of a tangent but I would love to see a homogenous command for starting Dask clusters in general which leverage some distributed locking/synchronization or leadership election method. Platform agnostic tools such as etcd, consul or zookeeper could be useful here.

The current paradigm of running a cluster means that the scheduler must be started first.

$ dask-scheduler
$ # wait for scheduler
$ dask-worker <scheduler IP>
$ dask-worker <scheduler IP>
$ dask-worker <scheduler IP>

Instead it would be prefereable to run :

$ dask-etcd <etcd address>
$ dask-etcd <etcd address>
$ dask-etcd <etcd address>
$ dask-etcd <etcd address>

Etcd would handle which process is the scheduler and which are the workers. This allows all processes to be started concurrently without worrying about race conditions.

Workers could proxy port 8786 to the scheduler process, which would allow a client to connect to any process in the cluster.

In the case of the scheduler process being lost a worker could switch roles and start a new scheduler. This would effectively be a cluster restart and all work would be lost, but it would allow reuse of resources in a scheduler failure state.

@fjetter
Copy link
Member

fjetter commented Apr 19, 2021

I'm still a bit struggling to understand the ultimate intention. From what I understand is that you want a class which is actually not managing the cluster but rather manages a process/node but what does it mean to manage this process/node?

What I mean with managing a cluster is particularly to start, stop and observe cluster node instances (scheduler, worker). That's about it in terms of functionality I see in a Cluster instance. Down the road, I could see some health checks implemented here as well (like a cluster nanny) to restart node instances if necessary but we're ultimately still at start, stop, observe.

From what I understand is that you do not want any of this since MPI (I'm only superficially aware at how MPI works) doesn't work this way. What is this Runner class supposed to be capable of other than starting/stopping a single Worker/Scheduler? Why is not an actual script sufficient?
There is still the need for some kind of controller process to schedule the MPI jobs. Why can't this controller process not be a cluster? The only difference I can really see is that for these kind of workflows, the lifetime of Cluster (i.e. the user control process) is smaller than the computation and it will therefore not take care of the shutdown nor any adaptive scaling.


If I take the homogeneous deployment as an example or long term goal, I would actually argue that we would not want to have a Cluster instance but rather an actual subclass of ServerNode (Nanny is also a ServerNode) and is responsible for leader election and starts the corresponding subprocesses, etc. I would imagine this class to look different to the runner you are suggesting. Even with this kind of node, there might still be viable use cases for a Cluster to control how many nodes/instances we have running. However, I think it's best to move the HA deployment to a different discussion, unless this is the ultimate goal of your proposal.

@mrocklin
Copy link
Member

I can certainly see the motivation for faster VM startup (we've run into this recently at Coiled). I've raised #4715 with some other thoughts and maybe an alternative for that one.

The dask-etcd thing sounds fun too. This sounds like an extension/generalization of the use-another-service-to-find-the-scheduler approach that would be useful for faster VM startup time.

In both cases I'm curious if there is a more focused abstraction we could add around "something somewhere else that allows Dask servers to coordinate safely". This seems more orthogonal to our current abstractions, and so might be easier to motivate. Thoughts?

@jacobtomlinson
Copy link
Member Author

@fjetter

What is this Runner class supposed to be capable of other than starting/stopping a single Worker/Scheduler? Why is not an actual script sufficient?

Today Dask MPI uses a method to handle this coordination. Users import and call this method at the top of this script. Only the client process continues beyond this point, all other processes start up the scheduler and workers.

I dislike that this is a bit magic. The scheduler address is updated in the dask.config object for example. So the user is expect to run client = Client() with no config. It's not very transparent what is actually happening without digging into the code.

Part of my motivation here was to move to a context manager for this, to give transparency to what is going on. I could just make that PR to the dask-mpi project, but I want to build similar utilities in dask-cloudprovider which do not use MPI. So it made sense to base class it here.

There is still the need for some kind of controller process to schedule the MPI jobs.

Maybe. But that should probably be implemented as a Cluster object. It's pretty typical for users to run mpirun to start their script, or batch systems may abstract this in various ways.

I feel like this is orthogonal to this discussion though.

The goal here is to coordinate Dask components from within an existing parallel job via a context manager.

However, I think it's best to move the HA deployment to a different discussion, unless this is the ultimate goal of your proposal.

Yeah I was thinking out loud here. Happy to discuss another time.

@mrocklin

... I'm curious if there is a more focused abstraction we could add around "something somewhere else that allows Dask servers to coordinate safely".

I agree that moving away from the scheduler/worker startup model would remove the need for this completely.

I tacked that on to this discussion as a bit of an afterthought, but perhaps that is a better route to go down than the dask-mpi model.

assert await c.submit(lambda x: x + 1, 10).result() == 11
assert await c.submit(lambda x: x + 1, 20).result() == 21

await asyncio.gather(*[run_code(commworld) for _ in range(4)])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this spawns four async tasks (or equivalently four MPI jobs, four processes, etc.) of which one decides to become a scheduler, two decide to become a worker and one will become a client.
Scheduler and worker will effectively ignore the ctx mgr body and act as if they were ordinary server nodes. While the client will actually connect and execute the code. So, effectively this is an async local cluster with two workers in disguise.

This implementation will negotiate the "role" (which I was calling node type) via AsyncCommWorld which in this case is simply an async lock but in general this interface would need to implement some sort of leader election via a shared filesystem, a distribtued KV store (zookeeper, etcd,...). This inteface is currently discussed in #4715

Is this an accurate summary of what's going on?

Copy link
Member Author

@jacobtomlinson jacobtomlinson Apr 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's it!

The key difference between this and a Cluster is that the roles are worked out after the processes (or coroutines in the reference implementation) have been created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key difference between this and a Cluster is that the roles are worked out after the processes

From an implementation perspective, the two may look alike but conceptually I feel these are very different concepts which is why I had a hard time following.

Conceptually I see the Cluster as the entity which is allowed, or even required, to talk to the hardware or resource manager (trying to avoid the "cluster" term here). It will talk to this hardware manager via some kind of API to spawn new ServerNode instances of type {Scheduler|Worker} somewhere else (different process, different VM, ...) while the Runner will spawn one ServerNode next to it, similar to what the Nanny does with the Worker.

Something like
image

Anyhow, I think that's a nice concept, it just took me while to understand and I think it is valuable to document this properly.
I'm wondering if we need some standardized interface for AsyncCommWorld or whether this is too backend specific

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyhow, I think that's a nice concept, it just took me while to understand and I think it is valuable to document this properly.

Great. I think this is useful, but may not be the final incarnation of it. I had a chat with @mrocklin yesterday about service discovery and leadership election which is related to this change.

I'm wondering if we need some standardized interface for AsyncCommWorld or whether this is too backend specific

I think that may be too specific. That class was more of a necessary evil because the lock had to live somewhere.

from ..worker import Worker


class Role(Enum):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would prefer a term like "type". In deploy environments, role reminds me of some IAM entity. Maybe that's just me :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps ServerRole or ServerType would be better here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both would work for me. Dealer's choice

Comment on lines +91 to +92
async def before_scheduler_start(self) -> None:
return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have an example what kind of functionality would be executed in these hooks? Adding more functions later on is usually simpler than removing them again later on

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a look at the MPI PR some of the hooks are used there, but not all of them.

@mrocklin
Copy link
Member

Summarizing my viewpoint from my conversation with @jacobtomlinson I think that we might not want to combine the Scheduler/Worker/Client hybrid class with the get/set-scheduler-address coordinator. I think that those two abstractions might be better living apart.

commworld = AsyncCommWorld()

async def run_code(commworld):
with suppress(SystemExit):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems atypical in Dask tests. If we go with some sort of dask-runner or dask-server class I think that we should reconsider explicitly calling sys.exit(0) when that class finishes up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants