Best practice for handing off persisted collection partitions #988

Closed
rjzamora opened this issue Mar 19, 2024 · 5 comments

@rjzamora (Member)

While working on rapidsai/dask-cuda#1311, I noticed that a common practice used in downstream libraries no longer works (cleanly) with the move to dask-expr.

The common practice:

  1. Persist a collection (df = df.persist())
  2. Find the worker-to-partition mapping for the persisted collection using mapping = client.who_has() and df.__dask_keys__() (see the sketch below)
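
For concreteness, here is a minimal sketch of that pattern as it worked with the legacy backend (assuming a running dask.distributed Client bound to client; the worker_to_parts name is just illustrative):

    df = df.persist()
    mapping = client.who_has()           # {task key: [worker addresses]} for data held on the cluster
    partition_keys = df.__dask_keys__()  # one key per persisted partition
    # Group partition keys by the worker holding them; depending on the
    # distributed version, who_has() may report stringified keys, hence the str(key) fallback
    worker_to_parts = {}
    for key in partition_keys:
        for worker in mapping.get(key, mapping.get(str(key), [])):
            worker_to_parts.setdefault(worker, []).append(key)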

The problem with dask-expr:

In dask-expr, calling df.persist() will change the "name" (and therefore the keys) of the collection. The name change is a result of both expression optimization and the creation of a new FromGraph expression. Therefore, you cannot call df = df.persist() and then search the cluster for the keys of df.

The question: What is the new "best practice" for patterns like this?

For reference, here is something that seems to work for now:

    df = df.persist()
    try:
        # Only works for a FromGraph-backed collection
        persisted_keys = df.keys
    except AttributeError:
        # Only works for a legacy collection
        persisted_keys = df.__dask_keys__()
@mrocklin (Member) commented Mar 19, 2024 via email

@rjzamora (Member, Author)

Okay, thanks - I suppose this approach is backward compatible:

    df = df.persist()
    persisted_keys = [f.key for f in c.client.futures_of(df)]
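
If the end goal is still the worker-to-partition mapping, those futures can also be passed straight to who_has (a sketch under the same assumptions as above; worker_to_parts is an illustrative name):

    futures = c.client.futures_of(df)
    # who_has accepts a list of futures and returns {key: [worker addresses]}
    key_to_workers = c.client.who_has(futures)
    worker_to_parts = {}
    for key, workers in key_to_workers.items():
        for worker in workers:
            worker_to_parts.setdefault(worker, []).append(key)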

@fjetter (Member) commented Mar 20, 2024

Could you provide a little more context for what you're doing? This feels to me like an abstraction leak that bites us whenever we touch this API. I am touching this API again with the scheduler integration, and this shortcoming could be fixed, but it would be helpful to know a little about the application.

@mrocklin (Member)

I recommend just using the dask.distributed.futures_of function. It's been around for a while and is generally how this problem gets solved.

@rjzamora (Member, Author) commented Mar 20, 2024

This feels to me like an abstraction leak that bites us whenever we touch this API.

By "this" API, are you referring to futures_of or who_has? I'm happy to use whatever you all recommend moving forward.

it would be helpful to know a little about the application

I've seen this used in a few downstream libraries. The specific application I am looking at right now is just a custom shuffling algorithm that I am very comfortable experimenting with. However, other downstream libraries (e.g. cugraph, nemo) also use who_has to temporarily hand off execution and communication to something other than Dask. For example, cugraph will persist the collection, figure out where all the data is, and then execute a collective operation in C++/NCCL land. This is a very common pattern in RAPIDS.

I recommend just using the dask.distributed.futures_of function. It's been around for a while and is generally how this problem gets solved.

Great. I'm not familiar with this API, but happy to use it and recommend it if it works.
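
For anyone landing on this issue later, here is a minimal, self-contained sketch of the futures_of-based approach (untested as written; the toy DataFrame and the variable names are just for illustration):

    import pandas as pd
    import dask.dataframe as dd
    from dask.distributed import Client, futures_of

    client = Client()  # or connect to an existing cluster

    df = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)
    df = df.persist()

    # futures_of returns the futures that actually back the persisted
    # partitions, so their keys match what the scheduler knows about
    futures = futures_of(df)
    persisted_keys = [f.key for f in futures]
    key_to_workers = client.who_has(futures)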
