SpecCluster rewrite #306

Closed
mrocklin opened this issue Aug 4, 2019 · 1 comment

mrocklin commented Aug 4, 2019

Hi All,

I would like to experiment with an implementation of dask-jobqueue on top of SpecCluster. I think that we are ready for this now. If possible, I would like help, both to share the work and to make sure other maintainers feel at ease with this design and want to develop it in the future. With that goal in mind, I propose a plan here, and hopefully other people can help implement pieces of it.

A Worker Job Class

Rather than focusing on a Cluster class, let's focus on a class for a single Job. This will allow us both to compose with SpecCluster and to support more heterogeneity in the future. This class needs a specific API, which I outline below:

class Job:
    """
    An interface for Scheduler and Worker processes for use in SpecCluster

    This interface is responsible for submitting a worker or scheduler process to a
    resource manager like Kubernetes, Yarn, or SLURM/PBS/SGE/...
    It should implement the methods below, like ``start`` and ``close``
    """
    async def start(self):
        """ Submit the process to the resource manager

        For workers this doesn't have to wait until the process actually starts;
        it can return once the resource manager has received the request and will
        work to make the job exist in the future

        For the scheduler we will expect the scheduler's ``.address`` attribute
        to be available after this completes.
        """
        # this will call `qsub/sbatch`
        self.status = "running"

    async def close(self):
        """ Close the process

        This will be called by the Cluster object when we scale down a node,
        but only after we ask the Scheduler to close the worker gracefully.
        This method should kill the process a bit more forcefully and does not
        need to worry about shutting down gracefully
        """
        # this will call `qdel/...`
        self.status = "closed"

(I took these docstrings from the ProcessInterface class in https://github.com/dask/distributed/blob/master/distributed/deploy/spec.py)

So we would build a new generic class with a constructor that handles all of the keyword arguments and generates a job script. We would probably just copy and paste the current code from the core JobQueueCluster, but remove all of the cluster handling.
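
As a very rough, hypothetical sketch (the attribute and method names here, like ``submit_command``, ``cancel_command``, and ``job_script``, are illustrative; the real keyword handling and script generation would be copied from the existing JobQueueCluster):

import asyncio


class Job:
    """ Generic interface for a single batch job that runs one dask-worker """

    submit_command = None   # e.g. "qsub" or "sbatch", set by subclasses
    cancel_command = None   # e.g. "qdel" or "scancel", set by subclasses

    def __init__(self, scheduler=None, name=None, cores=None, memory=None,
                 queue=None, project=None, walltime=None, **kwargs):
        # Store the keyword arguments needed to render the job script.
        # In practice this would be copied from the current JobQueueCluster.
        self.scheduler = scheduler
        self.name = name
        self.cores = cores
        self.memory = memory
        self.queue = queue
        self.project = project
        self.walltime = walltime
        self.job_id = None
        self.status = "created"

    def job_script(self):
        """ Render the batch script that launches one dask-worker """
        raise NotImplementedError

    async def _call(self, cmd, stdin=None):
        # Run a command asynchronously and return its stdout as text
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, _ = await proc.communicate(stdin)
        return out.decode()

    async def start(self):
        # Submit the rendered script to the resource manager and record the
        # job id so that we can cancel it later
        out = await self._call([self.submit_command],
                               stdin=self.job_script().encode())
        self.job_id = out.strip()  # real job-id parsing is system-specific
        self.status = "running"

    async def close(self):
        # Forcefully remove the job from the resource manager's queue
        await self._call([self.cancel_command, self.job_id])
        self.status = "closed"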

Specialize this class to particular job queuing systems

We would then need to make specific implementations of this for PBS, SLURM, SGE, and so on. My guess is that this involves copy-pasting from the existing implementations.
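
For illustration only (the actual PBS directives and worker command line would be copy-pasted from the existing PBSCluster), a PBS-flavoured subclass of the sketch above might look like:

class PBSJob(Job):
    submit_command = "qsub"
    cancel_command = "qdel"

    def job_script(self):
        # Minimal example; the real script would also handle resource
        # directives, environment setup, and extra worker options
        return "\n".join([
            "#!/usr/bin/env bash",
            "#PBS -N %s" % (self.name or "dask-worker"),
            "#PBS -q %s" % self.queue,
            "#PBS -A %s" % self.project,
            "#PBS -l walltime=%s" % self.walltime,
            "dask-worker %s --nthreads %d --memory-limit %s"
            % (self.scheduler, self.cores, self.memory),
        ])

A SLURMJob, SGEJob, and so on would look the same, differing only in the submit/cancel commands and the script header.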

At this point something like the following should work:

import asyncio

from distributed import Client, Scheduler


async def f():
    async with Scheduler() as s:
        async with Client(s.address, asynchronous=True) as client:
            # PBSJob is the Job subclass described above
            job = PBSJob(project=..., queue=...)  # plus any other options
            job = await job
            # verify that this job shows up in qstat (or similar)
            await client.wait_for_workers(1)
            await job.close()


asyncio.get_event_loop().run_until_complete(f())

Build a function to construct a SpecCluster

This should be relatively straightforward. The SSH equivalent is here as a model:

https://github.com/dask/distributed/blob/b68660cb873e8efb9d9b47a92e39ab78f2fd7573/distributed/deploy/ssh2.py#L158-L210
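
A hypothetical sketch of what this could look like for PBS (the SpecCluster keyword names here follow my reading of the current distributed.deploy.spec code and may differ in detail; PBSJob is the class sketched above):

from distributed import Scheduler
from distributed.deploy.spec import SpecCluster


def PBSCluster(n_workers=0, **worker_options):
    """ Build a SpecCluster whose scheduler runs locally and whose workers are PBS jobs """
    scheduler = {"cls": Scheduler, "options": {}}
    worker = {"cls": PBSJob, "options": worker_options}
    workers = {i: worker for i in range(n_workers)}
    return SpecCluster(workers=workers, scheduler=scheduler, worker=worker)

Usage would then presumably mirror the existing cluster classes, e.g. cluster = PBSCluster(queue="regular", project="ABC123", cores=24, memory="100GB") followed by cluster.scale(10).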
