Comparison to Xarray-Beam #117

Open
shoyer opened this issue Aug 31, 2022 · 10 comments

shoyer commented Aug 31, 2022

I would be curious how the cubed approach compares in performance to my Xarray-Beam library, beyond the superficial differences (NumPy vs Xarray data): https://github.com/google/xarray-beam

One issue that comes to mind with storing all data in Zarr is the regular chunk-size limitation. For example, can you efficiently rechunk between arrays with relatively prime chunk sizes (e.g., from 1000 to 999)? I think doing this efficiently requires irregular chunk sizes, or you end up rechunking everything to size 1, which can be super slow.


tomwhite commented Sep 1, 2022

Hi @shoyer, thanks for the great questions!

I haven't done any performance comparisons, but a lot of effort has been made to make Cubed scale horizontally, with conservative modelling/prediction of memory usage. So the emphasis so far has been on reliability over raw speed, with the goal that a user can leave a job running and have high confidence that it will complete successfully. (Hence the inclusion of runtime features like retries, and backup tasks for straggler mitigation, #14.)
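
For illustration, a minimal sketch of what this looks like from the user's side (assumed API: `cubed.Spec` with an `allowed_mem` bound, whose exact name may differ between versions):

```python
import cubed
import cubed.array_api as xp

# Every operation is checked against this bound when the plan is built,
# so a job that would exceed worker memory fails fast rather than mid-run.
spec = cubed.Spec(work_dir="tmp", allowed_mem="2GB")

a = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)
b = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)
c = xp.add(a, b)
result = c.compute()  # executes with retries/backup tasks on supported runtimes
```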

The two main runtimes I have been working with, Modal and Lithops, are "pure serverless" and don't have a shuffle - and I think that is a use case Cubed should support.

But it may be interesting to explore a part of the solution space where intermediate data is passed through a shuffle (like Xarray-Beam does). Early on I did some experiments to get Cubed running on Cloud Dataflow (and the unit tests are run on Apache Beam), but there is a lot more to do to translate a Cubed DAG to a Beam DAG that takes advantage of the worker resources efficiently. I think Cloud Dataflow would be a great runtime target for Cubed, even if the shuffle isn't used (or is available as an option).

> One issue that comes to mind with storing all data in Zarr is the regular chunk-size limitation. For example, can you efficiently rechunk between arrays with relatively prime chunk sizes (e.g., from 1000 to 999)? I think doing this efficiently requires irregular chunk sizes, or you end up rechunking everything to size 1, which can be super slow.

Cubed delegates rechunking to rechunker's algorithm, and I think the pertinent point here is that the target Zarr file is copied from the source (or intermediate) according to the target's chunk sizes, so when reading from the source it may cross (the source's) chunk boundaries.

In the relatively prime case a chunk of size 999 is read from the source (chunksize 1000) and written to the target (chunksize 999) - and some source reads will cross chunk boundaries. (Typically more than one target chunk is processed in one go, for efficiency, but the effect is the same.) Zarr is capable of reading parts of an array that cross chunk boundaries - so I'm not sure this is a problem. I don't see where it needs to rechunk to size 1 - though maybe I'm missing something!
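
A toy 1-D version of the relatively prime case (sizes shrunk for readability) to show the cross-boundary reads:

```python
import numpy as np
import zarr

# Source chunked at 10; target chunked at 9 (relatively prime).
src = zarr.array(np.arange(100), chunks=10)
tgt = zarr.zeros(100, chunks=9, dtype=src.dtype)

# Copy according to the *target's* chunk extents. Zarr assembles each read
# from however many source chunks the slice touches, e.g. src[9:18] spans
# source chunks 0 and 1 - no rechunking to size 1 required.
for start in range(0, 100, 9):
    stop = min(start + 9, 100)
    tgt[start:stop] = src[start:stop]
```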

BTW, while looking at this question I stumbled across your rechunker proposal for multi-stage rechunking (pangeo-data/rechunker#89). That looks like a very useful addition - perhaps we should revive it?

Finally, I should add that Zarr's regular chunk-size limitation does have implications for implementing some array API operations, such as boolean indexing, see #73. So we'll need a solution eventually anyway.


shoyer commented Sep 1, 2022

> Cubed delegates rechunking to rechunker's algorithm, and I think the pertinent point here is that the target Zarr file is copied from the source (or intermediate) according to the target's chunk sizes, so when reading from the source it may cross (the source's) chunk boundaries.

Ah, good point. In that case you should be OK.

> BTW, while looking at this question I stumbled across your rechunker proposal for multi-stage rechunking (pangeo-data/rechunker#89). That looks like a very useful addition - perhaps we should revive it?

Yes, would love to do this! That said, my attempts at formulating the multi-stage algorithm all needed irregular chunking.


rabernat commented Sep 2, 2022

Love to see this conversation happening.

In my latest work on Pangeo Forge, I have implemented a new single-stage rechunking algorithm in Beam that does not rely on any intermediate storage.

https://github.com/rabernat/pangeo-forge-recipes/blob/c59f3520f2882ad1f08d65407270f42565e88c17/pangeo_forge_recipes/transforms.py#L246-L257

It takes advantage of Beam's ability to emit multiple elements from each task, together with GroupByKey.

https://github.com/rabernat/pangeo-forge-recipes/blob/rechunking/pangeo_forge_recipes/rechunking.py

It is tested and seems to work, but I have not tried it at scale.
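
The core pattern (an illustrative sketch, not the actual Pangeo Forge code; `split_for_target` and `combine_fragments` are made-up names) is to split each source chunk into fragments keyed by the target chunk they belong to, then group and reassemble with a single shuffle:

```python
import apache_beam as beam
import numpy as np

def split_for_target(element, target_size=6):
    """Split an (offset, data) source chunk into fragments keyed by target chunk."""
    offset, data = element
    start = offset
    while start < offset + len(data):
        key = start // target_size
        end = min((key + 1) * target_size, offset + len(data))
        yield key, (start, data[start - offset:end - offset])
        start = end

def combine_fragments(kv):
    """Reassemble one target chunk from its grouped fragments."""
    key, fragments = kv
    ordered = sorted(fragments, key=lambda f: f[0])  # order fragments by offset
    return key, np.concatenate([data for _, data in ordered])

with beam.Pipeline() as p:
    (p
     | beam.Create([(0, np.arange(0, 4)), (4, np.arange(4, 8)), (8, np.arange(8, 12))])
     | beam.FlatMap(split_for_target)  # emit multiple elements per input chunk
     | beam.GroupByKey()               # the single shuffle
     | beam.Map(combine_fragments))
```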


shoyer commented Sep 2, 2022

> In my latest work on Pangeo Forge, I have implemented a new single-stage rechunking algorithm in Beam that does not rely on any intermediate storage.

This looks almost exactly like the RechunkStage transform I wrote for Xarray-Beam:
https://github.com/google/xarray-beam/blob/0.3.1/xarray_beam/_src/rechunk.py#L458

I tested it at moderately large scale (ERA5 surface variables, ~1.5 TB each) and it works well, though multi-stage chunking makes it 3x faster/cheaper (see pangeo-data/rechunker#89 (comment) for details). At some point, I'll probably test this on the ERA5 model-level variables, which are ~200 TB each.

@rabernat
Copy link

rabernat commented Sep 2, 2022

Yes, I realize I am rewriting lots of xarray-beam within Pangeo Forge 😝. I am justifying this as:

  • a learning experience for me
  • a more incremental refactor of Pangeo Forge

The long term idea is to align as much functionality as possible and then start to deprecate overlapping features.

Tom, sorry to hijack your issue tracker to discuss other projects! 🙃


tomwhite commented Sep 2, 2022

> Tom, sorry to hijack your issue tracker to discuss other projects! 🙃

That's fine! This is all very interesting - I feel it should be possible to combine efforts somehow...


tomwhite commented Sep 5, 2022

Thinking about this more, it should be possible for Cubed to delegate to Xarray-Beam for its two "primitive ops" (https://github.com/tomwhite/cubed#design): blockwise and rechunk. Cubed implements the whole of the array API using these two ops, so you can think of Cubed as focusing on that side of things, and Xarray-Beam on providing a massively scalable implementation of these primitives.

Of course, Xarray-Beam doesn't provide a blockwise operation (yet), but I have started prototyping what it might look like in Cubed, and it seems promising.

This works because a Cubed array is basically just a "chunked array", and so is Xarray-Beam's core data model. The idea here is to just use a single variable in Xarray-Beam to represent the array. I think that the Xarray interface for users would be layered on top of Cubed, but that needs more thought and can come later (see pydata/xarray#6807).
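
Concretely, one chunk of a plain array would become a (Key, single-variable Dataset) element in the PCollection, something like this (a sketch; the variable name 'array' is arbitrary):

```python
import numpy as np
import xarray
import xarray_beam as xbeam

# The chunk at index (1, 0) of an array with (2, 3)-sized chunks:
# Xarray-Beam keys chunks by element offsets rather than chunk indices.
key = xbeam.Key({'dim_0': 2, 'dim_1': 0})
chunk = xarray.Dataset({'array': (('dim_0', 'dim_1'), np.ones((2, 3)))})
element = (key, chunk)  # one element of the PCollection
```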

BTW, the memory management that comes from Cubed is very relevant too, since it provides bounds on each operation, which means we know that the computation passed to Beam will fit in worker memory.

Thoughts?


tomwhite commented Sep 9, 2022

> it should be possible for Cubed to delegate to Xarray-Beam for its two "primitive ops" (https://github.com/tomwhite/cubed#design): blockwise and rechunk

I've created a prototype that does this here: https://github.com/tomwhite/cubed/tree/xarray-beam

The basic idea for blockwise is to use Dask's make_blockwise_graph and translate between chunk index space and Xarray-Beam's key space (offsets).

Beam's CoGroupByKey is used to bring together input dataset chunks keyed by the output. This is followed by a DoFn that applies the blockwise function (add, transpose, etc.) to the grouped input chunks, and produces an output key and dataset chunk.
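
Schematically, the pattern is something like this (a toy sketch, not the prototype's actual code):

```python
import apache_beam as beam
import numpy as np

def apply_blockwise(kv, fn=np.add):
    # After CoGroupByKey, kv is (key, {'x': [x_chunk], 'y': [y_chunk]}).
    key, grouped = kv
    (x_chunk,), (y_chunk,) = grouped['x'], grouped['y']
    return key, fn(x_chunk, y_chunk)

with beam.Pipeline() as p:
    x = p | 'create x' >> beam.Create([(0, np.ones(3)), (1, np.ones(3))])
    y = p | 'create y' >> beam.Create([(0, np.full(3, 2.0)), (1, np.full(3, 2.0))])
    result = ({'x': x, 'y': y}
              | beam.CoGroupByKey()        # bring input chunks together by output key
              | beam.Map(apply_blockwise)) # apply the blockwise function
```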

This works inasmuch as it passes a large proportion of unit tests. However, when run on Dataflow with 20GB inputs (each chunk being 200MB) I'm getting out-of-memory errors in the CoGroupByKey. I haven't worked out what the problem is, since even taking into account there being several copies of the chunk, the DoFn should not use more than about 1GB of memory.


shoyer commented Sep 9, 2022

> it should be possible for Cubed to delegate to Xarray-Beam for its two "primitive ops" (https://github.com/tomwhite/cubed#design): blockwise and rechunk
>
> I've created a prototype that does this here: https://github.com/tomwhite/cubed/tree/xarray-beam
>
> The basic idea for blockwise is to use Dask's make_blockwise_graph and translate between chunk index space and Xarray-Beam's key space (offsets).
>
> Beam's CoGroupByKey is used to bring together input dataset chunks keyed by the output. This is followed by a DoFn that applies the blockwise function (add, transpose, etc.) to the grouped input chunks, and produces an output key and dataset chunk.

Wow, very cool!

I do wonder whether the overhead of building separate pcollections for each array will turn out to be problematic (vs. putting all arrays in an xarray.Dataset into a single pcollection). This could potentially make relatively large Beam graphs for cases like Datasets of ~20 arrays (although still small relative to what Dask deals with).

I also worry about the overhead of doing a shuffle due to calling CoGroupByKey in each operation. This is potentially a lot of unnecessary shuffles, which are the most expensive part of most data pipelines. You would want an aggressive optimization layer to fuse together operations like abs(x - y) into a single blockwise function, and even then you would likely have unnecessary shuffle to bring together x and y if they came from the same Xarray dataset.
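
(For instance, the fused kernel for that example is just a single per-chunk function, so the two input chunks only need to be brought together once:)

```python
import numpy as np

# One blockwise function per pair of input chunks, instead of a `subtract`
# step and an `abs` step each requiring its own pass over the data.
def fused_abs_sub(x_chunk, y_chunk):
    return np.abs(x_chunk - y_chunk)
```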

> This works inasmuch as it passes a large proportion of unit tests. However, when run on Dataflow with 20GB inputs (each chunk being 200MB) I'm getting out-of-memory errors in the CoGroupByKey. I haven't worked out what the problem is, since even taking into account there being several copies of the chunk, the DoFn should not use more than about 1GB of memory.

Do you know what sort of workers you're running on? The default memory limit on Cloud Dataflow may be pretty small. You might try using Dataflow Prime: https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime

(I can also ask some of my colleagues who use Dataflow more regularly)

tomwhite commented

> I do wonder whether the overhead of building separate pcollections for each array will turn out to be problematic (vs. putting all arrays in an xarray.Dataset into a single pcollection). This could potentially make relatively large Beam graphs for cases like Datasets of ~20 arrays (although still small relative to what Dask deals with).

I hadn't thought of this. It should be possible to put everything into one pcollection, and extend the key to include the array's name.

> I also worry about the overhead of doing a shuffle due to calling CoGroupByKey in each operation. This is potentially a lot of unnecessary shuffles, which are the most expensive part of most data pipelines. You would want an aggressive optimization layer to fuse together operations like abs(x - y) into a single blockwise function, and even then you would likely have unnecessary shuffle to bring together x and y if they came from the same Xarray dataset.

I share your concerns.

On the optimization point, Cubed does have a fuse optimization for maps; however, it's not wired in for the Xarray-Beam prototype I did here. In the prototype, for simplicity I opted to translate array API calls directly to Beam calls, so that the Beam DAG is constructed as the computation is built up. To take advantage of the Cubed optimizations, it would be possible to change this to build up a Cubed DAG first (just like the other executors), which is then optimized, then converted to a Beam DAG. (I will open issues for the optimizations you mention that have not been implemented yet.)

On the broader point though, I do worry about the overhead of a shuffle for a blockwise operation with two or more inputs. (A single input can be efficiently implemented as a ParDo.) The fundamental issue is that Beam doesn't have a zip operation, which is understandable since pcollections are unordered. (Unlike Spark, where RDDs do preserve order.)

In some sense, the approach of the other executors in Cubed, such as the Beam executor (as opposed to the Xarray-Beam prototype), is to provide a zip-like operation on Zarr chunks, so that approach may well be fine. It's the kind of thing that could be benchmarked.
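
To make that concrete, a minimal sketch of the zip-like pattern over Zarr chunks (each task reads the same chunk region from both inputs directly, so no shuffle is needed):

```python
import numpy as np
import zarr

x = zarr.array(np.arange(8), chunks=4)
y = zarr.array(np.arange(8, 16), chunks=4)
out = zarr.zeros(8, chunks=4, dtype='i8')

def process_chunk(i, size=4):
    # Aligned reads from both inputs play the role of zip.
    sl = slice(i * size, (i + 1) * size)
    out[sl] = x[sl] + y[sl]

for i in range(out.nchunks):  # in Cubed these run as independent parallel tasks
    process_chunk(i)
```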

> Do you know what sort of workers you're running on? The default memory limit on Cloud Dataflow may be pretty small. You might try using Dataflow Prime: https://cloud.google.com/dataflow/docs/guides/enable-dataflow-prime
>
> (I can also ask some of my colleagues who use Dataflow more regularly)

Thanks. I am using the default worker, which has 3.75GB of memory. With memory issues it's quite hard to see how much memory individual operations are actually using. I've had some success with Fil and resource.getrusage on other executors, so it may be worth trying those tools to work out what is going on in the CoGroupByKey. (Increasing the memory size may get things working, but doesn't tell you anything about the usage patterns.)
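
(For reference, the resource check is just a stdlib one-liner that can be logged from inside a task:)

```python
import resource

# Peak resident set size of the current process so far;
# ru_maxrss is in KiB on Linux, bytes on macOS.
peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"peak RSS: {peak}")
```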
