
Update examples.rst #211

Merged: 4 commits into dask:master on Jan 3, 2019

Conversation

@leej3 (Contributor) commented Dec 17, 2018

Add an example of passing arguments to workers using the extra keyword.

@guillaumeeb (Member) left a comment

LGTM, just fix the spacing and this could go in, thanks!


cluster = SLURMCluster(
    queue='norm',
    memory = '8g',

@guillaumeeb (Member):

Be careful with spaces around the equals sign.
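For reference, the full call with the spacing fixed would look roughly like this (a sketch assembled from the lines quoted in this review; only the keyword-argument spacing changes):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue='norm',
    memory='8g',           # no spaces around '=' in keyword arguments
    processes=1,
    cores=8,
    extra=['--resources foo=2'],
    job_extra=["--time=03:00:00 "],
)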

@guillaumeeb (Member):

Test failure seems unrelated, something must be broken with PBS.

    processes=1,
    cores=8,
    extra=['--resources foo=2'],
    job_extra=["--time=03:00:00 "],

@guillaumeeb (Member):

Is the space after the time needed?

@lesteve (Member) commented Dec 18, 2018

Would it be possible to show in the example, through some code, what the resources can be useful for, or maybe just mention it in the text? At the moment it looks quite abstract...

@leej3 (Contributor, Author) commented Dec 18, 2018

@guillaumeeb you are correct, apologies.

@lesteve, what do you think of the updated version? I did want to emphasize that the resources are indeed abstract, but I agree that an example makes it much clearer why someone might wish to do this. I also wanted it to be clear what the extra keyword is doing. Is this clearer now?

@guillaumeeb (Member):

@lesteve glad to see you chiming in! It's really helpful to have another look at all this.

@leej3 could you maybe add a line of code on how you submit jobs/tasks using the resources?
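For example, something roughly along these lines (just a sketch; process is a placeholder, and the cluster is assumed to have been created with extra=['--resources foo=2'] as in the snippet above):

from dask import delayed
from distributed import Client

client = Client(cluster)  # cluster created with extra=['--resources foo=2']

def process(x):
    return x * 2

# A single task constrained to workers that advertise the 'foo' resource:
future = client.submit(process, 1, resources={'foo': 1})

# Or a batch of delayed tasks with the same constraint:
processed = [delayed(process)(i) for i in range(10)]
futures = client.compute(processed, resources={'foo': 1})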

@leej3 (Contributor, Author) commented Dec 18, 2018

@guillaumeeb how about this? I could make it a minimal example by setting it up with something like:

from dask import delayed

def load(fn):
    pass

def process(data):  # needs some foo
    import time
    time.sleep(5)

def reduce(*data):  # foo intensive
    return "Processing finished..."

raw = [delayed(load)(i) for i in range(10)]
processed = [delayed(process)(r) for r in raw]
reduced = delayed(reduce)(*processed)

That would be useful to people. Perhaps too much to digest though...

@willirath (Collaborator):

I might be wrong about this, but I don't see the point of having resources specified when dask-jobqueue does not yet support heterogeneous clusters.

Having said that, I'd be very interested in joining any effort towards implementing / documenting heterogeneous clusters.

@guillaumeeb (Member):

@willirath, this is more or less needed right now to nicely limit the number of tasks on a given node while still reserving all the computing resources, see #181 (comment).
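For illustration, a minimal sketch of that pattern (the resource name 'slots' and the node sizes are placeholders): reserve a full node per worker, but advertise only one unit of an artificial resource so that at most one tagged task runs per node at a time.

from dask_jobqueue import SLURMCluster
from distributed import Client

# One worker per node, reserving all cores and memory, but exposing a single
# unit of an artificial 'slots' resource to cap concurrent tagged tasks at one.
cluster = SLURMCluster(cores=24, processes=1, memory='120GB',
                       extra=['--resources slots=1'])
cluster.scale(4)
client = Client(cluster)

def heavy_step(i):
    return i  # placeholder for a memory-hungry computation

futures = [client.submit(heavy_step, i, resources={'slots': 1}) for i in range(20)]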

> Having said that, I'd be very interested in joining any effort towards implementing / documenting heterogeneous clusters.

That would be really cool!

I don't know if something is feasible by hand right now, maybe it is if we modify JobQueueCluster object between scale calls. But the real idea is to properly implement that in ClusterManager. See dask/distributed#2118 and dask/distributed#2208 (comment) if you've not already.

@guillaumeeb (Member):

@leej3 your example feels hard to understand. Maybe we can start with something easier? I'm not sure.

@leej3 (Contributor, Author) commented Dec 18, 2018

Is something like this more along the lines of what you mean?


processed = [delayed(process)(i) for i in range(10)]
futures = client.compute(processed,
                         resources={tuple(processed): {'foo': 1}})

Member:

Why do you use {tuple(processed): {'foo': 1}}, and not directly {'foo': 1}?

@leej3 (Contributor, Author):

Hmmm. This may just be due to my own ignorance, so correct me if so. I was under the impression that since resources is passed as a dictionary, its keys need to be a hashable type; passed as a list, the call fails.

Or are you referring to the fact that, in general, the entire task graph could be executed with a single resource specification? That would work for the example of constraining resources for the full task graph, but it would not generalize beyond that (where one specifies different constraints at different steps of the pipeline).

I found few examples of specifying resource constraints (at the level of futures), which I felt was not easy to intuit. Perhaps this is not the place to demonstrate it, though. Happy to simplify it further.
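In code, the two forms would look roughly like this (a sketch; process, the 'foo' resource, and the client are the placeholders from earlier in the thread):

from dask import delayed

def process(x):
    return x + 1

processed = [delayed(process)(i) for i in range(10)]

# Single specification applied to everything being computed:
futures = client.compute(processed, resources={'foo': 1})

# Per-collection specification; dict keys must be hashable, hence tuple(...).
# This form allows different constraints at different stages of a pipeline:
futures = client.compute(processed,
                         resources={tuple(processed): {'foo': 1}})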

@willirath (Collaborator) commented Dec 18, 2018

> @willirath, this is more or less needed right now to nicely limit the number of tasks on a given node while still reserving all the computing resources, see #181 (comment).

My suggestion would be not to try and come up with an abstract example but present one or two from the real world. So what about:

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
                       processes=1,
                       cores=8,
                       extra=['--resources "ssdGB=200 GPU=2"'])

cluster.start_workers(2)
client = Client(cluster)


def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})

(I did not test this!)

@willirath (Collaborator):

> I don't know if something is feasible by hand right now, maybe it is if we modify JobQueueCluster object between scale calls. But the real idea is to properly implement that in ClusterManager. See dask/distributed#2118 and dask/distributed#2208 (comment) if you've not already.

Thanks for these pointers! I'll definitely read into this.

@leej3 (Contributor, Author) commented Dec 18, 2018

Sounds good. I tested it to confirm it works.

@guillaumeeb (Member) left a comment

Remove the part on the "foo" resource.

argument is for the specification of abstract resources, described `here
<http://distributed.dask.org/en/latest/resources.html>`_. This might be to
specify special hardware availability that the scheduler is not aware of, for
example GPUs. Below, an arbitrary resource "foo" is specified. Notice that the

@guillaumeeb (Member):

It is not an arbitrary resource anymore!

@leej3 (Contributor, Author) commented Jan 3, 2019

Apologies, missed that. Fixed now. Thanks!

@guillaumeeb (Member):

Thanks @leej3! Merging.

@guillaumeeb merged commit fa62c0e into dask:master on Jan 3, 2019