Run local code remotely on a worker #4003

Open
mrocklin opened this issue Jul 31, 2020 · 35 comments

Comments

@mrocklin
Member

I find myself often wanting to run code on a worker, rather than on my local client. This happens in a few settings:

  1. My workers have access to a data store that I don't, so I need to call something like dd.read_parquet remotely (cc @martindurant @jcrist)
  2. My workers are far away from my client, so client-heavy operations like joblib or Hyperband incur a serious bottleneck from client-scheduler communication (cc @stsievert)
  3. My workers have hardware or libraries like GPUs/RAPIDS that I don't have locally (cc @quasiben @kkraus14)

Today I can do this by writing a function and submitting that function as a task:

def f():
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    return df.sum().compute()

result = client.submit(f).result()

It might make sense to provide syntax around this to make it more magical (or it might not). We might do something like the following:

with dask.distributed.remote as result:
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    result = df.sum().compute()

I know that @eriknw has done magic like this in the past. We could enlist this help. However, we may not want to do this due to the magical and novel behavior.

@martindurant
Member

"explicit is better than implicit". :)

This feels something like working in a remote ipython kernel, which I suppose does still work (although I haven't seen anyone do it for quite some time).

@stsievert
Member

What advantages would the additional syntax provide over submitting a function? Could it supply any debugging utility?

@eriknw
Member

eriknw commented Aug 10, 2020

I took the bait and may have gotten carried away. This probably isn't quite what you're looking for, but it's... something ;)

https://github.com/eriknw/innerscope

@drorspei
Contributor

drorspei commented Aug 10, 2020

Sounds a lot like ipyparallel, specifically the sync_imports context manager and the %px magic.

@quasiben
Member

What advantages would the additional syntax provide over submitting a function?

The advantages here would be ergonomic. In the past I've done some work where I submitted tasks to a remote cluster where GPUs are available and the edge/login node has no GPU. Having some syntactic sugar here would be helpful.

@mrocklin
Member Author

mrocklin commented Jul 4, 2021

I took the bait and may have gotten carried away. This probably isn't quite what you're looking for, but it's... something ;)

@eriknw It certainly shows that magic is possible. I'm curious, do you have any thoughts on how this might be used to provide a context-manager like experience? My objective here is to get away from constructing and then calling functions, and towards remote execution.

The closest thing we have today in common usage is IPython magics (indeed, I think that IPyParallel had remote execution cell magics that were awesome). I'd love to find some way to achieve this, but in a way that doesn't require an IPython kernel.

@eriknw
Member

eriknw commented Jul 5, 2021

As always, the big challenge with context managers is that they don't provide a separate block of the code frame (edit: it is a separate block within the frame, but not a separate frame) that runs within the context.

Nevertheless, here are some (probably totally doable) syntax ideas that use context managers:

with get_result as rv, remotely:
    ...
    rv = ...
# rv is a Future

# specify the number or names of results we want to keep
with get_results(2) as (x, y), remotely:
    ...
    x = ...
    y = ...
# or
with get_results('x', 'y') as (x, y), remotely:
    ...
    x = ...
    y = ...
# x, y are now futures

The key here is context manager chaining. remotely will raise an exception that is caught by the first context manager, and then we can do magic.
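
The chaining trick can be demonstrated with the standard library alone. In this minimal sketch, Run, Remotely, and _SkipBody are hypothetical names, not the innerscope or afar implementation: the second context manager raises from __enter__, so the body never runs, and the first one's __exit__ swallows the exception.

```python
class _SkipBody(Exception):
    """Hypothetical signal: raised to prevent the with-block body from running."""


class Remotely:
    def __enter__(self):
        # Raising here aborts the with statement before the body starts.
        raise _SkipBody

    def __exit__(self, exc_type, exc, tb):
        return False


class Run:
    def __init__(self):
        self.intercepted = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Called because the inner manager failed while this one was active.
        if exc_type is _SkipBody:
            self.intercepted = True  # this is where the "magic" would happen
            return True              # suppress the exception
        return False


run, remotely = Run(), Remotely()
body_ran = []

with run, remotely:
    body_ran.append(True)  # never executed
```

After the block, run.intercepted is set and body_ran is still empty, which is the point at which a real tool could ship the body's source elsewhere.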

We can come up with variations of this spelling, such as:

with running_remotely as futures, get_results('x', 'y'):
    ...
# futures.x and futures.y are now Futures

The magic is easier if we actually put the code we want to run inside of a function body, such as:

@with_running_remotely
def rv():
    ...
    rv = ...
# rv (obtained from the name of the function) is a Future


@with_running_remotely('x', 'y')
def futures():
    ...
    x = ...
    y = ...
# futures.x and futures.y are now Future objects

Maybe something along one of these lines would provide for a convenient user experience. Thoughts? Reactions? I'm open to suggestions for how to improve any particular spelling to make it clearer.

@eriknw
Member

eriknw commented Jul 5, 2021

I've started to whip this up. I don't think it'll be too hard or take too long, but I have other things I'm doing today, so please be patient :)

Here's the spelling I'm targeting:

from somewhere import runcode

with runcode() as result, remotely:
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    result = df.sum().compute()

Here, we get the name to retrieve, result, from the target of the as clause. I kind of like , remotely:, which makes the intent explicit (even if it's rather nonstandard usage of syntax).

We can also be more specific and ask to get multiple results:

x = 1

with runcode('y', 'result') as (y, result), remotely:
    y = 2
    result = x + y

# Or
with runcode('y', 'result') as results, remotely:
    y = 2
    result = x + y

Questions

What do you want to do about variables such as x above that are used within the context, but defined outside it?

We have no constraints. We can send them to the remote worker, do the right thing with dask futures, or raise. Or let the user specify the desired behavior.

Do you want the results to be dask futures or the final result (e.g., future.result())?

Futures seem more natural to me most of the time. Perhaps we could find suitable names if both behaviors are wanted.

More variations and brainstorming

As a function decorator, use the return value:

@runcode.remotely
def result():
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    return df.sum().compute()

Specify where to run programmatically:

if run_locally:
    where = somewhere.locally
else:
    where = somewhere.remotely

with runcode() as result, where:
    ...

Also support locally, which may be useful for testing and debugging the magic:

with runcode() as result, locally:
    ...

@runcode.locally
def result():
    ...

Recall remotely and locally don't need to be defined when used as context managers, and we can distinguish them by name.
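
One way an undefined remotely could still work, shown purely as an assumption about the mechanism: evaluating an undefined name raises NameError after the first context manager has been entered, so that manager's __exit__ can catch the error and read the name from the message. Run is a hypothetical class here.

```python
import re


class Run:
    """Hypothetical first context manager that interprets an undefined second name."""

    def __init__(self):
        self.where = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is NameError:
            match = re.search(r"name '(\w+)' is not defined", str(exc))
            if match and match.group(1) in {"remotely", "locally"}:
                self.where = match.group(1)
                return True  # suppress the NameError; the body is skipped
        return False


run = Run()
with run, remotely:  # `remotely` is deliberately undefined in this sketch
    raise AssertionError("the body should not run")
```

Any other undefined name still propagates as a normal NameError, so ordinary typos are not silently hidden.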

The common misspelling (missing remotely) can run the code as normal and emit a warning, or raise before running code:

with runcode() as result:
    ...

If you only want to run the code and don't need the result, simply do:

with runcode(), remotely:
    ...

I'm not sold on the name runcode. Suggestions?

Also, this will be pretty magical and experimental. It would probably be best for it to live outside dask and distributed for now.

If anybody hates (or loves) this spelling (and I'm sure some people will), feel free to chime in even if you don't think you can do so constructively 😎

@eriknw
Member

eriknw commented Jul 6, 2021

Started: https://github.com/eriknw/afar

Oh, god in heaven, what have I done?

@mrocklin
Member Author

mrocklin commented Jul 6, 2021

Sorry for the slow response. I'm on vacation this week and checking GitHub infrequently. (My apologies for restarting this conversation and then ghosting, by the way.)

I'm excited by this. I have a couple of questions:

  1. Do we need the locally/remotely addition, or is it possible to do this with just a single context manager?

  2. It seems like there is an open question about how to ask for a result. In afar it looks like you're returning all named variables. This seems potentially challenging because there might be many intermediate variables

    Personally, I would probably use either

    1. the expression on the last line
    2. a standard name, like result
    3. Optionally with a list of names of variables to collect that defaults to result, but could be overridden, like def runcode(output=["result"])

Questions from Erik

What do you want to do about variables such as x above that are used within the context, but defined outside it?

We have no constraints. We can send them to the remote worker, do the right thing with dask futures, or raise. Or let the user specify the desired behavior.

I think that it's likely that people will want to reach outside of local scope for local variables. Sending state somehow, either as pickled state or as Dask futures seems fine to me.
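
To decide what to send, a tool first has to know which names the block reads but does not define. A rough sketch with the standard ast module follows; outer_names is a hypothetical helper, and it ignores statement order, nested scopes, and del, all of which a real implementation would have to handle.

```python
import ast
import builtins


def outer_names(source):
    """Return names that the code reads but never assigns or imports."""
    loaded, stored = set(), set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                loaded.add(node.id)
            else:
                stored.add(node.id)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            # `import a.b` binds `a`; `import a as c` binds `c`.
            for alias in node.names:
                stored.add((alias.asname or alias.name).split(".")[0])
    return loaded - stored - set(dir(builtins))


block = """
y = 2
result = x + y
"""
needed = outer_names(block)  # names that must come from the enclosing scope
```

Each name found this way could then be pickled, passed as a Dask future, or rejected, which are the options discussed above.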

Do you want the results to be dask futures or the final result (e.g., future.result())?
Futures seem more natural to me most of the time. Perhaps we could find suitable names if both behaviors are wanted.

This is interesting, I think, and gets at a larger question of how we handle repeated computations:

with runcode() as df:
    df = dd.read_parquet(...)
    result = df
with runcode() as result:
    result = df.groupby(...).mean().compute()

In this case returning the result as a Dask future seems best. This saves us from having to pull back a result that we may not be able to deal with locally (because, for example, we don't have cudf installed or an appropriate GPU device locally). How to pass handles between remote blocks is an interesting question though.

@eriknw
Member

eriknw commented Jul 6, 2021

Thanks for the reply and interest!

  1. Do we need the locally/remotely addition, or is it possible to do this with just a single context manager?

Yes/maybe. We currently need , remotely: given the magic I use. A single context manager's __enter__ can't both return a value (to be used by as result) and raise an exception to interrupt execution. There is likely other (darker?) magic that we could use to circumvent this limitation. For now, adding , remotely: is easiest and lets us continue exploring.
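
The limitation is easy to confirm with plain Python: when __enter__ raises, the as target is never bound and that manager's own __exit__ is not called, so it has no chance to suppress the exception. Single is a hypothetical class used only for this check.

```python
class Single:
    exit_called = False

    def __enter__(self):
        # Raising means nothing is returned, so `as result` is never bound.
        raise RuntimeError("skip the body")

    def __exit__(self, exc_type, exc, tb):
        # Not reached: __exit__ only runs after __enter__ has succeeded.
        Single.exit_called = True
        return True


outcome = None
try:
    with Single() as result:
        outcome = "body ran"
except RuntimeError:
    outcome = "exception escaped"
```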

  2. It seems like there is an open question about how to ask for a result. In afar it looks like you're returning all named variables. This seems potentially challenging because there might be many intermediate variables

I agree: returning all named variables is not the end target. That was the easiest thing to do that let me demonstrate functionality and upload a package to PyPI that isn't name-squatting.

I don't have a strong opinion on how to specify which result to get, or whether the result in as result should be a single future or collection of futures. I can try your suggestions. I'll let you know when I have something working with dask that you can play around with.

@eriknw
Member

eriknw commented Jul 7, 2021

Actually, as results is completely unnecessary. We can do the following:

with afar.run, remotely:
    x = 1

with afar.run, remotely:
    y = 2
    z = x + y

result = z.result()

I like your idea of using the final assignment as the only value to "return" by default.
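
Finding the final assignment is straightforward when the block's source is available. The sketch below uses the standard ast module; final_assignment_names is a hypothetical helper and not necessarily how afar does it.

```python
import ast


def final_assignment_names(source):
    """Return the names bound by the last statement if it is an assignment."""
    body = ast.parse(source).body
    if not body:
        return []
    last = body[-1]
    if isinstance(last, ast.Assign):
        targets = last.targets
    elif isinstance(last, (ast.AnnAssign, ast.AugAssign)):
        targets = [last.target]
    else:
        return []  # last statement is an expression, loop, etc.
    return [
        node.id
        for target in targets
        for node in ast.walk(target)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Store)
    ]


names = final_assignment_names("y = 2\nz = x + y\n")
```

Tuple targets such as x, y = ... yield both names, and a block that ends in a bare expression yields none.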

Note that x and y are dask futures when used in the main scope, and x.result() is needed to pull the result locally. However, note how the value of x is used in the second context. It's as if we did client.submit(func, x), which is indeed what we'll do.

We can still support with afar.run("x", "y") as results, but even here as results is unnecessary.

Given your initial example, we could execute the code on a worker, and copy the result locally. Is this syntax clear for that behavior?

with afar.run, locally:
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    result = df.sum().compute()

Because we used locally, result is a value, not a dask Future. Perhaps locally is not the right word to use here, because the code was still executed on a dask worker; only the result was copied locally.

We can also put arguments for client.submit in remotely (my preference) or run, such as:

from afar import remotely
on_gpus = remotely(resources={"GPU": 1})

with afar.run, on_gpus:
    ...

with afar.run(resources={"GPU": 1}), remotely:
    ...
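
For an object like remotely to work both bare and called with keyword arguments, one simple pattern is a callable marker that returns a configured copy of itself. This is only a sketch of that pattern; Where and submit_kwargs are hypothetical names.

```python
class Where:
    """Hypothetical marker object that carries keyword arguments for submit."""

    def __init__(self, **submit_kwargs):
        self.submit_kwargs = submit_kwargs

    def __call__(self, **submit_kwargs):
        # Return a new marker so the shared default stays unconfigured.
        return Where(**{**self.submit_kwargs, **submit_kwargs})


remotely = Where()
on_gpus = remotely(resources={"GPU": 1})
```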

I'm feeling a little better about this. There may be something useful (or at least convenient) here.

@mrocklin
Member Author

mrocklin commented Jul 7, 2021

This looks really slick to me:

with afar.run, remotely:
    x = 1

with afar.run, remotely:
    y = 2
    z = x + y

result = z.result()

We can also put arguments for client.submit in remotely (my preference) or run, such as:

I agree that having a place to support constraints would be good. I don't currently have thoughts on where is best.

@mrocklin
Member Author

This code is highly experimental and magical!

I like the warning by the way :)

It's also impressive that it's 99 lines of code (although I suppose that innerscope handles a bit)

@eriknw
Member

eriknw commented Jul 16, 2021

oh, the trouble you get me into with your encouragement ;)

I have this minimally working. You can pip install afar to try it out. See my simple (and pretty much only) test here.

I'm sure there are sharp corners and severe/weird limitations with afar. Please report any that are encountered.

It would be super-duper handy if we could easily and reliably modify frame.f_locals. This may be coming as soon as Python 3.11. See PEP 558 and the PR. afar probably isn't what they have in mind for this change :)

@mrocklin
Member Author

Question: does the remote side need to have afar installed?

@mrocklin
Member Author

I was getting an error to that effect, however I suspect that it may be caused by trying to pull in and serialize local state

@eriknw
Member

eriknw commented Jul 16, 2021

It shouldn't need to, but, well, it may need to right now. innerscope is currently required locally and remotely. If afar is worth pursuing, I could make it (and test it) such that neither is required remotely.

Thanks for giving afar a try!

@eriknw
Member

eriknw commented Jul 16, 2021

Oh wait, yeah, afar is required remotely right now, because we need to run this function on the worker.

@mrocklin
Member Author

OK. Good to know.

Also, big +1 on the name.

@mrocklin
Member Author

Thanks for giving afar a try!

It's fun! And it's scratching a long-standing itch of mine.

oh, the trouble you get me into with your encouragement ;)

You are a particularly easy mark it turns out :)

@eriknw
Member

eriknw commented Jul 17, 2021

:-P

I'm also fond of the name. from afar import ... sounds almost poetic to me.

Anyway, I added two more features to 0.1.1:

  • afar.get automatically gathers the data locally. No need for .result()
  • remotely(**kwargs) passes keyword arguments to client.submit

Your initial example can now be:

import afar

on_gpus = afar.remotely(resources={"GPU": 1})

with afar.get, on_gpus:
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    result = df.sum().compute()

# Now use `result` directly.  No need for `result.result()`!

I don't know if afar.get is the right word to use, but we'll see. It's short.

I think my itch is satisfied. I'll probably broadcast this out a bit more, but I don't plan to work much more on it. Unless, you know, I'm encouraged to ;)

@mrocklin
Member Author

mrocklin commented Jul 17, 2021 via email

@mrocklin
Member Author

mrocklin commented Jul 17, 2021 via email

@mrocklin
Member Author

I recommend using different keynames, maybe afar-run and afar-get?

[Screenshot: Screen Shot 2021-07-16 at 7 18 21 PM]

@eriknw
Member

eriknw commented Jul 17, 2021

Will do! Give me a few minutes.

@eriknw
Member

eriknw commented Jul 17, 2021

Done. I also set up CI, which took a bit longer than expected (b/c had to fix some things). But, tests now pass for Python 3.7, 3.8, 3.9, and PyPy--hooray! afar 0.1.2 released.

@eriknw
Member

eriknw commented Jul 17, 2021

0.1.3 released.

Since we can't always update the locals of a frame (and I'm not sure I want to write the hack to do so), it may be more convenient at times to use our own mapping. For example:

def some_func():
    ...
    run = afar.run(data={"a": 1})
    with run, remotely:
        b = a + 1
    # b doesn't exist locally, because we can't update `frame.f_locals`
    assert run.data["b"].result() == 2

    # continue using the data...
    with run, remotely:
        c = a + b

Note that the singleton afar.run doesn't keep data around, because I'm paranoid.
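
The underlying limitation can be seen without afar: inside a function, writing to the mapping returned by locals() does not create or rebind a local variable, which is why an explicit mapping is the dependable place to store results. The demo function is only an illustration, and its data dict stands in for run.data.

```python
def demo():
    a = 1
    locals()["a"] = 100  # does not rebind the real local `a`
    locals()["b"] = 2    # does not create a local `b`
    try:
        b  # compiled as a global lookup, so this raises NameError
        b_exists = True
    except NameError:
        b_exists = False

    data = {"a": a}            # an explicit mapping is always writable
    data["b"] = data["a"] + 1
    return a, b_exists, data


a_value, b_exists, data = demo()
```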

And for warm-fuzzies, I now test on Linux, OS X, and Windows, and with pip and conda (I was seeing differences between these two).

@mrocklin
Member Author

Another suggestion, if the last statement isn't an assignment, but just an expression like the following:

with afar.run, remotely:
    df

Maybe we should call repr or _repr_html_ (depending on context) and print the result out. I'm actually not sure how systems like IPython and Jupyter handle context managers when repr'ing. Maybe this would be atypical.

@eriknw
Member

eriknw commented Jul 17, 2021

Yeah, that's pretty atypical, but so is afar: eriknw/afar#2

This shows I can return the final expression and display it (but it's not 100% reliable yet). What I don't know is how to compute the appropriate repr remotely and copy that instead of the original object.

Would you expect anything else to be done with the final statement? Should this be made available to the user somehow, or is the repr enough?
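
One possible approach to the remote-repr question, offered as an assumption rather than what afar does: build the display strings where the object lives and send back a small stand-in that carries only those strings. ReprOnly and summarize are hypothetical names; _repr_html_ is the hook Jupyter looks for on displayed objects.

```python
import html


class ReprOnly:
    """Hypothetical lightweight stand-in holding precomputed display strings."""

    def __init__(self, text, html_text=None):
        self._text = text
        self._html = html_text

    def __repr__(self):
        return self._text

    def _repr_html_(self):
        # Fall back to escaped plain text when the object had no HTML repr.
        if self._html is not None:
            return self._html
        return "<pre>" + html.escape(self._text) + "</pre>"


def summarize(obj):
    """Run this where the object lives; only strings travel back."""
    html_method = getattr(obj, "_repr_html_", None)
    html_text = html_method() if callable(html_method) else None
    return ReprOnly(repr(obj), html_text)


preview = summarize({"rows": 3, "tag": "<demo>"})
```

The stand-in pickles cheaply and needs none of the worker-side libraries, at the cost of not being usable as the original object.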

@eriknw
Member

eriknw commented Jul 18, 2021

Also, I'd wait a bit before broadcasting this out. I'm testing and fixing bugs.

@mrocklin
Member Author

Yup. No problem. I'm iterating with a GPU workload on Coiled and it's interesting getting both smooth at the same time.

FWIW this is already a very useful tool for me. I'm finding that GPUs-on-the-cloud feel closer at hand already.

@eriknw
Member

eriknw commented Jul 20, 2021

Great to hear!

afar 0.2.0 released. It's more reliable and trustworthy for me. Hopefully it is for you too. Please let me know if it's not.

See release notes here: https://github.com/eriknw/afar/releases

Notably, you can now look at afar.run.context_body to see the source lines that it uses.

Also, added with afar.run, later:, which doesn't execute the context block.

Note that the main limitation of afar is that it requires the source lines to be available, which isn't always the case. This makes things so much easier. We could try to manipulate bytecode instead, but doing so is very dirty and likely to be a headache for each new Python release. As it is, the current approach actually seems pretty reliable. It does peek at bytecode a little bit, so afar probably only works with CPython and PyPy. I am continually amazed by PyPy.
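
To make the source-based approach concrete, here is a minimal sketch that turns a block of source text into a function and submits it to an executor. It uses concurrent.futures.ThreadPoolExecutor as a local stand-in for a Dask client, and block_to_function is a hypothetical helper; afar's real implementation differs.

```python
import textwrap
from concurrent.futures import ThreadPoolExecutor


def block_to_function(source, arg_names, return_name):
    """Wrap block source in a function that takes inputs and returns one name."""
    body = textwrap.indent(textwrap.dedent(source).strip(), "    ")
    code = (
        f"def _block({', '.join(arg_names)}):\n"
        f"{body}\n"
        f"    return {return_name}\n"
    )
    namespace = {}
    exec(code, namespace)  # defines _block inside `namespace`
    return namespace["_block"]


source = """
    y = 2
    z = x + y
"""
func = block_to_function(source, arg_names=["x"], return_name="z")

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(func, 1)  # analogous to client.submit(func, x)
    result = future.result()
```

Without the source lines there is nothing to wrap, which is why this style of tool depends on them being available.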

@eriknw
Member

eriknw commented Aug 30, 2021

Update: afar 0.5 now supports IPython magics!

%load_ext afar
%%afar
import dask_cudf
df = dask_cudf.read_parquet("s3://...")
result = df.sum().compute()

instead of the original

def f():
    import dask_cudf
    df = dask_cudf.read_parquet("s3://...")
    return df.sum().compute()

result = client.submit(f).result()

More examples:

%%afar x, y  # save both x and y as Dask Futures
x = 1
y = x + 1

z = %afar x + y

or

%afar z = x + y

I think this is starting to get pretty nice.

@jrbourbeau
Member

Ah great, I look forward to taking them for a spin. Thanks @eriknw!
