
[Tasks from tasks] Create collections with number of chunks not known at graph definition time #6204

Open
crusaderky opened this issue Apr 26, 2022 · 1 comment
Labels
discussion Discussing a topic with no specific actions yet feature Something is missing

Comments


crusaderky commented Apr 26, 2022

A common issue is that a user does not know in advance how many chunks a collection needs to have. The typical workaround is to generate as many chunks as the collection could possibly have and then leave some (often most) of them either empty or oversplit.

The same problem applies to:

  • da.Arrays with an unknown number of dimensions
  • df.DataFrame with unknown columns

An elegant solution, which would require #5671 as a prerequisite, is the following:

Proposed core design

Enhance Client.publish_dataset to accept nameless collection(s). When given nameless collections, the function returns a single future and the collection is not listed by Client.list_datasets.
Calling result() on the future returns the persisted collection or tuple of collections.
When the future goes out of scope, the published collection is automatically forgotten.

Sample usage

import dask.array as da
from distributed import Client, get_client


def dynamic_ones(shape, chunks):
    # Build the collection on a worker, where shape and chunks are concrete
    a = da.ones(shape, chunks=chunks)
    # Nameless publish (proposed): returns a future to the persisted collection
    return get_client().publish_dataset(a)


c = Client()
a = c.submit(
    dynamic_ones,
    c.submit(dynamic_shape),   # user-defined task computing the shape
    c.submit(dynamic_chunks),  # user-defined task computing the chunks
).result()  # Return persisted collection

Proposed extension: compatibility with pure dask

In dask/dask, add dask.publish(*collections) and the shorthand collection.publish().
If scheduler == "distributed", call Client.publish_dataset under the hood and return a Delayed wrapping its output distributed.Future.
If scheduler in ("threads", "synchronous", "processes"), call persist() and then return a single dummy delayed object pointing to the persisted collection(s).
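A minimal sketch of how that dispatch could look. Note that dask.publish, the nameless publish_dataset form, and the name publish below are all proposed API, not anything that exists in dask today; only the local-scheduler fallback path works as written:

```python
import dask
from dask.delayed import Delayed


def publish(*collections):
    # Hypothetical dask.publish: dispatch on whether a distributed client
    # is active in this process.
    try:
        from distributed import get_client
        client = get_client()
    except (ImportError, ValueError):
        client = None  # no distributed scheduler available

    if client is not None:
        # Proposed nameless form: returns a single distributed.Future
        future = client.publish_dataset(*collections)
        return dask.delayed(future)

    # Local schedulers ("threads", "synchronous", "processes"): persist
    # eagerly and wrap the persisted collection(s) in a dummy Delayed
    persisted = dask.persist(*collections)
    return dask.delayed(persisted[0] if len(persisted) == 1 else persisted)
```

With a local scheduler, publish() persists immediately, so computing the returned Delayed simply yields the already-materialized result.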

Proposed extension: rechunk() nan chunks

Array.rechunk can't be applied to an array where one or more chunks have size nan, since it would create an unknown number of output chunks. To fix this, it could gain an optional bool parameter delay:

import math
from typing import Literal, overload

import numpy as np
from dask import delayed
from dask.array import Array
from dask.core import flatten


@overload
def rechunk(self: Array, new_chunks: int | tuple, delay: Literal[False] = False) -> Array:
    ...

@overload
def rechunk(self: Array, new_chunks: int | tuple, delay: Literal[True]) -> "Delayed[Array]":
    ...

def rechunk(self, new_chunks, delay=False):
    if not delay:
        return ...  # current implementation
    if not any(math.isnan(j) for i in self.chunks for j in i):
        # All chunk sizes are already known; just defer the current rechunk
        return delayed(self.rechunk)(new_chunks)

    @delayed
    def dshape(chunk: np.ndarray) -> tuple[int, ...]:
        return chunk.shape

    @delayed
    def drechunk(arr: Array, chunk_shapes: list[tuple[int, ...]], new_chunks: int | tuple) -> "Delayed[Array]":
        # Replace nan chunks with the now-known chunk sizes.
        # (to_chunks is a hypothetical helper that rebuilds the chunks tuple
        # from the concrete per-chunk shapes.)
        arr = Array(arr.dask, arr.name, to_chunks(chunk_shapes, arr), meta=arr)
        arr = arr.rechunk(new_chunks)
        return arr.publish()

    persisted = self.persist()
    return drechunk(
        persisted.publish(),
        [dshape(c) for c in flatten(persisted.to_delayed())],
        new_chunks,
    )

Similar treatment could be done for all functions in dask.array and dask.dataframe that currently don't work with nan chunks.

crusaderky (Collaborator, Author) commented

Coming back to this, I think the high-level wrapper in the example, rechunk, is quite a lazy design.
A much better approach would be to write a DelayedArray class, which replicates the API of Array and retains all possible metadata.

Same for dask.dataframe.repartition(partition_size=...): it should return a DelayedDataframe that knows (and pretty-prints) the column header, but doesn't know about the number of partitions.
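A rough, hypothetical sketch of the metadata such a DelayedArray could retain. None of these names exist in dask today; they only illustrate that dtype and ndim can be known even when shape and chunks are not:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DelayedArray:
    """Hypothetical stand-in for a lazily-published dask Array.

    A real implementation would replicate the Array API and forward
    operations to the underlying future.
    """

    future: object               # e.g. a distributed.Future to the published Array
    dtype: str                   # typically always known
    ndim: int                    # known when dimensionality is fixed
    shape: tuple | None = None   # None while the sizes are unknown

    def __repr__(self) -> str:
        shape = self.shape if self.shape is not None else ("?",) * self.ndim
        return f"DelayedArray<dtype={self.dtype}, shape={shape}>"
```

A DelayedDataframe counterpart would likewise retain and pretty-print the column header while leaving the number of partitions unknown.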
