Spatial chunking for concurrent dask regridding #26
Codecov Report

@@            Coverage Diff            @@
##           master      #26   +/-   ##
=======================================
  Coverage   95.19%   95.19%
=======================================
  Files           6        6
  Lines         229      229
=======================================
  Hits          218      218
  Misses         11       11

Continue to review the full report at Codecov.
That's fast work! Interesting that ... Seems like the current code only does spatial chunking but not regridding yet? It would be necessary to test whether this method scales with the number of CPUs (on a single multi-core computer, not talking about distributed yet). I am concerned about calling complicated library functions in each dask worker. Do all ocgis functions (chunking, regridding...) release the GIL?
Probably in OCGIS's repo & docs? Given that you are not using xarray here, this seems like a general OCGIS example. If dask really works it might become the standard way of parallelizing ESMPy/OCGIS. Not having to spawn MPI processes is a big plus for users.
I suspect that pure numpy arrays will suffice. Metadata can be easily fixed after all heavy computations are done. If you only care about chunking coordinates (for parallel weight generation), not chunking input data, the workflow is simply:
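A minimal sketch of that coordinate-only workflow, assuming simple 1-D rectilinear latitude/longitude arrays; `compute_weights_for_chunk` below is just a hypothetical placeholder for whatever ESMPy/OCGIS call actually generates the weights:

```python
import numpy as np

# Hypothetical sketch: split only the coordinates into latitude bands so
# each band can be handed to a separate worker for weight generation.
lat = np.linspace(-90, 90, 1801)     # ~0.1 degree global grid
lon = np.linspace(-180, 180, 3601)

lat_chunks = np.array_split(lat, 8)  # 8 spatial chunks along latitude

def compute_weights_for_chunk(lat_chunk, lon):
    # Placeholder: call the actual regridder here and return its weights.
    return {"lat_bounds": (float(lat_chunk[0]), float(lat_chunk[-1]))}

chunk_weights = [compute_weights_for_chunk(c, lon) for c in lat_chunks]
```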
Again, I am just concerned about scalability. Not sure how much heavy lifting that step involves.
Yes, in the pure ESMPy/OCGIS approach you would apply ESMPy's black-box regridders to each "chunked array" (not a standard chunked dask array, actually; each source "chunk" overlaps with the others). For xESMF I am in favor of native scipy/numba SMM functions (which require in-memory weight retrieval), as they generally incur less overhead, particularly when working with dask...
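For reference, a minimal sketch of what the scipy SMM application looks like, assuming the weights are already available in memory as (row, col, value) triplets; the toy triplets below are made up:

```python
import numpy as np
import scipy.sparse as sps

# Toy example: regrid a flattened 16-cell source field to 4 destination cells.
n_src, n_dst = 16, 4

# Assume these triplets came from a weight-generation step (made up here:
# each destination cell is the average of 4 consecutive source cells).
rows = np.repeat(np.arange(n_dst), 4)   # destination indices
cols = np.arange(n_src)                 # source indices
vals = np.full(n_src, 0.25)             # weights

weights = sps.coo_matrix((vals, (rows, cols)), shape=(n_dst, n_src)).tocsr()

src_field = np.random.rand(n_src)
dst_field = weights.dot(src_field)      # the regridding step is just an SMM
```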
Threads should be much faster than multi-processing if all OCGIS/ESMPy functions release the GIL. Given that they've been relying on MPI, I guess the GIL hasn't been a concern at all?
I am actually having trouble running the code: NCPP/ocgis#489
Thanks for the timely response. You gave me a number of things to consider.
Ha, well, thankfully it was mostly ready to go. That being said, I was planning to develop this example with xESMF, so I didn't test other regridding. Investigating threads a bit more, I learned that ESMF must be built with ESMF_COMM='mpiuni', ESMF_PTHREAD='OFF', and potentially ESMF_OPENMP='OFF' for it to play nicely with the dask scheduler. So yeah, it should have been slower. I'm honestly a little surprised at these issues, and solving them will probably mean digging deep into dask. The ocgis/xarray code works with no problems.
Yes, the plan is to use dask for situations where the regridding operation requires out-of-core memory. Dask will also be useful for testing with pangeo to see about elastic scaling (i.e. running the operation on four VMs concurrently as opposed to four times on one VM). It would be nice to get the hybrid dask/MPI working on independent VMs. That comes later though! I'm thinking this may have to go the route of one process per VM as opposed to multiple processes/threads per VM, at least for now. Dask concurrency is causing some fun, undefined memory issues when running under both the "processes" and "threads" contexts. The "synchronous" scheduler seems okay. The "processes" issues really surprise me as I expected process isolation, and I think there should be a way to address this.
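For context, those three schedulers can be selected explicitly when debugging; a minimal sketch, with a stand-in work function in place of any real per-chunk call:

```python
import dask.bag as db

def work(i):
    # Stand-in for a per-chunk regridding/weight-generation call.
    return i * i

bag = db.from_sequence(range(8), npartitions=4).map(work)

# Compare schedulers one at a time:
print(bag.compute(scheduler="synchronous"))  # single-threaded, easiest to debug
print(bag.compute(scheduler="threads"))      # needs GIL-releasing library code
print(bag.compute(scheduler="processes"))    # process isolation, pickling overhead
```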
I was thinking you might want to try xESMF as the regridder. I'll need to get appropriate builds for dask-friendly ESMF on conda unless you want to build your own without threading support.
You're right that threads may be a total non-starter. I really didn't think that through.
I do already have it in the ocgis repo. This code was designed to be part of the dask/xESMF workflow to avoid computing weights on the main process only. Maybe I should move this PR over to that repo? Both ESMPy and ocgis use MPI pretty densely at points, but the goal will be to use dask where possible, in ocgis at least. The async MPI is the hardest to deal with... ESMPy dask-parallelization will probably require something like this for the foreseeable future. Hopefully the dask-related memory issues can be resolved.
Sure. The xarray (or ocgis) conversion is optional if pure numpy is fine.
Makes sense. What I have now is how I would use the workflow, but it is good to think about how it may be adapted to other scenarios. Are you able to work with xESMF in the file-based context? Eventually, I imagine chunking along all dimensions would be desirable since long time-series are often the primary bottleneck.
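As a side note, chunking along time at read is straightforward with xarray/dask; the file and variable names below are hypothetical:

```python
import xarray as xr

# Open lazily with dask chunks along the time dimension only;
# the spatial dimensions are kept whole here.
ds = xr.open_dataset("tas_daily.nc", chunks={"time": 365})
print(ds["tas"].chunks)
```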
I think we are on the same page here. Unless we are working with certain unstructured grids, the GridChunker is doing comparatively little - especially with well-behaved rectilinear grids. Reassembling a weight file is tricky since offsets must be tracked for each subset to ensure the local-global indices are equivalent. This would need to be adapted to the dask workflow. You are probably interested in creating global weight files.
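A minimal sketch of that local-to-global bookkeeping, assuming each chunk reports its weight triplets in chunk-local indices plus the offsets of its subsets within the global grids (illustrative structure only, not the GridChunker API):

```python
import numpy as np

def merge_chunk_weights(chunks):
    """Combine per-chunk weight triplets into global-index triplets."""
    rows, cols, vals = [], [], []
    for c in chunks:
        rows.append(c["row"] + c["dst_offset"])  # local -> global destination index
        cols.append(c["col"] + c["src_offset"])  # local -> global source index
        vals.append(c["val"])
    return np.concatenate(rows), np.concatenate(cols), np.concatenate(vals)

# Two made-up chunks, each covering a different part of the global grids.
chunks = [
    {"row": np.array([0, 1]), "col": np.array([0, 1]),
     "val": np.array([1.0, 1.0]), "dst_offset": 0, "src_offset": 0},
    {"row": np.array([0, 1]), "col": np.array([0, 1]),
     "val": np.array([1.0, 1.0]), "dst_offset": 2, "src_offset": 2},
]
rows, cols, vals = merge_chunk_weights(chunks)
```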
If only the regridding needs this overlap, it probably makes sense not to worry about the source data and to chunk only the destination, since that's what we care about. Again, not so much a concern with the file-based approach. File locks would be the way to go for reassembling the global file to avoid shipping data around. This all hinges on each worker having access to the same storage, of course...
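If the file-lock route were taken, dask-distributed's Lock primitive could serialize the writes; a sketch only, with the actual weight computation and file append left as placeholders:

```python
from dask.distributed import Client, Lock

def write_chunk_weights(chunk_index):
    # Placeholder: compute this chunk's weights, then append them to the
    # shared global weight file while holding a cluster-wide lock.
    with Lock("global-weight-file"):
        pass  # append rows/cols/values for this chunk here
    return chunk_index

if __name__ == "__main__":
    client = Client()  # assumes all workers see the same storage
    futures = client.map(write_chunk_weights, range(4))
    client.gather(futures)
```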
Yep, that's fine.
You got it. I've worked within the GIL as it relates to Python.
Let me re-organize the topics because the discussion is getting convoluted...

1. Using dask to parallelize chunked weight generation

My understanding of the current status is that:
Regarding the two comments:
Given that weight generation is very compute-intensive, it seems quite wasteful to use just one core per VM. For a ~10 km global grid, I/O is not an issue at all (only ~100 MB for the coordinate values), while computing weights can take many minutes on a single core. For a ~1 km global grid, I/O and memory become a pain (~10 GB for the coordinate values), but computing weights is an even bigger pain... Here you need both out-of-core computation and parallelism, otherwise the process will be compute-bound. I think we must pick one way:
The hybrid one feels very tricky, so you might try whether pure dask can work...

2. Reading chunks from files versus in-memory split

I see that ...

Regarding the two comments:
Do you mean that ...?

I absolutely want to work with in-memory numpy arrays. Reasons include:
It would be best if everything could be done in memory, including chunking the grid coordinates, in-memory weight retrieval (#11), and reassembling the chunked weights. I am worried that ...

3. How to plug the regridding step into the current chunking framework

In response to the comment
I probably shouldn't call high-level xESMF functions inside ... How would you deal with the chunk ...?
👍 Good call. The discussion was starting to need a TOC. I added a section 4 at the end for next steps.

1. Using dask to parallelize chunked weight generation

I agree that ...
It is definitely wasteful depending on the size of the VMs! A swarm of small VMs could potentially be used but it all comes down to performance tradeoffs within workflows.
This makes sense to me and is the goal of the spatial chunking. We'll need to work out process localization, and I'll need to debug why ESMF initialization does not like the dask process scheduler. Hybrid MPI/dask would be super cool, but that's future stuff.

2. Reading chunks from files versus in-memory split
The command line interface works only for NetCDF files. The GridChunker object can work with in-memory arrays; this is the object to use for spatial chunk creation. I don't think it makes sense to use the CLI within this dask example. However, one could imagine cheating by executing an MPI-enabled executable through "mpirun" on dask workers. Your reasons for using in-memory arrays make sense, and I also think this is where the example should go. Regarding ...
Sure. It will be good to minimize data transfers but since we're working with a naive example we can shelve this concern for the moment.
Yes, we are honestly nearly there and will get this out as soon as we can. Our internal three-week deadline has passed, I know...

3. How to plug the regridding step into the current chunking framework
This will turn into a nuanced discussion that we should have at another time. Once ESMPy in-memory weight retrieval is in, I think performance will be boosted for certain grids and cluster configurations. This is all situation-dependent of course. I agree with you given the current state of this example!
Xarray is not used at all in the "traditional approach". In the case where the global regridding operation can fit in machine memory, one just calls an ocgis script using ...
The RegridOperation handles all the ESMPy interaction and can write weights, etc. Some of the more advanced ESMPy features are in a branch related to NCPP/ocgis#488 which should be in the ocgis trunk soon.
I suppose we would bring it in, but I hadn't really thought about it. The weights-to-file or direct ESMF SMM has been sufficient. In sum, the file-based MPI regridding approach solves the big-grid problem using "old school" tools. It works well enough, but I consider it a static solution. I believe xESMF is the right front-end for dask integration, which will offer a dynamic treatment of big grids. Ocgis could be used in this example I suppose, but it doesn't offer threaded SMM, for example. I think ocgis can offer a number of useful features moving forward.

4. Next steps

Here are some potential next steps. What do you think?
I agree with most of your comments -- glad that we've reached a consensus 😃 Also agree with the next steps. Let me order them in a progressive way, so we can get some obvious outcome at each stage (instead of fighting with ESMF+dask for months!).

Stage 1. In-memory weight retrieval

I want to solve #11 before making other major updates to xESMF. Serial computation is totally fine at this stage. Then I would hope any later parallel-computing enhancement will be based on this in-memory approach instead of the file-I/O approach. It can indeed cause some pain during initial development, but it is good for the long term. Pure numpy arrays are a lot easier to handle than "NetCDF files with a specific format requirement" (for both users and developers). Outcome:
Stage 2. Parallelized weight generation with ESMF + dask

Outcome:
~1 km grids are still rare right now, so at this stage I care more about computational speed-up than memory constraints. This stage does not involve weight application, which might still be done in an embarrassingly parallel way if the weights aren't too large. Working towards a ~1 km grid is a future direction, but I want to make sure that the effort we put in shows immediate benefit for the majority of users, who are dealing with 10~100 km grids. Bringing the weight computation time down from several minutes (common for a ~10 km grid) to tens of seconds would be a quite decent achievement.

Stage 3. Spatial-chunked regridding (weight application)

Outcome: Out-of-core regridding for ultra-high-horizontal-resolution data.

This is only for ~1 km grids over a large region, where even a single layer of input data and the regridding weights themselves cannot comfortably fit into memory. Mere out-of-core computation on a single machine (which needs to swap both data and weights!) would be deadly slow, so this approach should be used with dask-distributed, where each chunk of weights lives on a different worker. I bet that pulling the correct input data chunk to each worker will cause lots of fun and trouble. This stage is too early to discuss, anyway.

4. OCGIS <-> xarray conversion
Disclaimer: This outline is based on my understanding of general scientific users' needs. If you need to process ~1 km data you may indeed want to prioritize Stage 3 🤔
This is a great summary! Thanks. I agree that time could easily be spent in the wrong places. The ESMF/dask compatibility could be a rabbit trail indeed. Your prioritization makes sense. I know #11 will be critical for you to move forward. So, I think we should make tickets for the orphan topics and probably close this PR (we can reopen it later if the code is relevant). Did you want me to take a crack at that (I'll mostly just use your text anyway)?
Parallel weight generation on the master worker could help here if you were looking for something more immediate... I think we had talked about this.
Sure, please do so!
Do you mean MPI? I'd like to avoid adding MPI as a dependency if there's a pure dask solution (I feel that most Earth scientists don't know MPI programming). If dask+ESMF cannot work, then I will fall back to the old MPI solution.
masking notebook edits from zeros2nan PR
@JiaweiZhuang, this notebook PR implements example spatial chunking with conversion to xarray. I thought it would be pretty easy for you to add xESMF code to this. I can give the xESMF code a shot if you don't have time.
There are a number of general features that should be added, but I'm not quite sure how to address them yet. I thought it best to let this evolve a little before jumping to conclusions (also new to dask in general). I am definitely not expecting to merge this right away. Some topics:
Note: You'll need to use the ocgis master branch for this. I needed to add the chunk creation for a specific index and the "decode_cf" option to xarray conversion. Conversion to datetime objects in xarray can be a bit slow, especially if it is not needed!
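For reference, skipping CF decoding in plain xarray looks like this (independent of the ocgis conversion path; the file name is hypothetical):

```python
import xarray as xr

# decode_cf=False leaves time coordinates as raw numbers instead of
# datetime objects, avoiding the decoding cost mentioned above.
ds_raw = xr.open_dataset("chunk_000.nc", decode_cf=False)
```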