Simplify blocked reprojection implementation by using dask and improve efficiency of parallel reprojection #314
Conversation
Codecov Report
@@            Coverage Diff             @@
##             main     #314      +/-   ##
==========================================
+ Coverage   92.02%   92.62%   +0.59%
==========================================
  Files          24       24
  Lines         803      786      -17
==========================================
- Hits          739      728      -11
+ Misses         64       58       -6
# to avoid 0 length block sizes when num_cpu_cores is greater than the side of the image
for dim_idx in range(min(len(shape_out), 2)):
    if block_size[dim_idx] == 0:
        block_size[dim_idx] = shape_out[dim_idx]
I've removed this to instead let dask decide how to chunk the array, though we might want to provide a keyword argument that specifies the typical number of elements in a chunk.
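For illustration, a minimal sketch (not code from this PR) of how a target chunk size could be steered while still letting dask pick the chunk layout; the array shape and the "128 MiB" value are illustrative assumptions:

```python
import dask
import dask.array as da
import numpy as np

arr = np.zeros((4000, 4000))

# Let dask choose the chunk layout, optionally nudging it with a target
# chunk size; the 128 MiB figure here is just an example value.
with dask.config.set({"array.chunk-size": "128 MiB"}):
    darr = da.from_array(arr, chunks="auto")

print(darr.chunks)
```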
@@ -674,48 +674,3 @@ def test_blocked_against_single(parallel, block_size):
    np.testing.assert_allclose(array_test, array_reference, equal_nan=True)
    np.testing.assert_allclose(footprint_test, footprint_reference, equal_nan=True)


def test_blocked_corner_cases():
This test is no longer relevant if we don't try to set the chunk size ourselves.
@@ -630,7 +630,7 @@ def test_identity_with_offset(roundtrip_coords):


 @pytest.mark.parametrize("parallel", [True, 2, False])
-@pytest.mark.parametrize("block_size", [[10, 10], [500, 500], [500, 100], None])
+@pytest.mark.parametrize("block_size", [[30, 30], [500, 500], [500, 100], None])
Changed this as the test was quite slow before
There's still some work to be done, and of course I will need to run some performance benchmarks to compare this to the previous implementation - some of the tests seem slower, which would not be ideal if that turns out to be true.
I've now rebased this - I still need to figure out how to adapt the code in

In the case where the block size isn't passed, should we encourage dask not to chunk over the broadcast dimensions by passing e.g. (-1, 'auto', 'auto') as the chunk size (with as many -1s as there are extra dimensions)? @svank - any thoughts?
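For reference, a minimal sketch (not from the PR itself) of what that chunk specification looks like in dask; the 3-D shape is an illustrative assumption:

```python
import dask.array as da
import numpy as np

# Hypothetical (n_broadcast, ny, nx) cube: keep the leading broadcast
# dimension in a single chunk (-1) and let dask choose the spatial chunks.
cube = np.zeros((16, 2048, 2048))
dcube = da.from_array(cube, chunks=(-1, "auto", "auto"))

print(dcube.chunks)  # broadcast axis stays whole; spatial chunking depends on config
```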
Just a note that I am going to go ahead and do a release of reproject with the current implementation of the blocking, because this PR doesn't fundamentally change any user-facing API. After this PR the default block size might change, but to some extent that is an implementation detail.
I don't have any knowledge of how dask works, but I bet the right choice is indeed to chunk only over the WCS dimensions. That would let each chunk compute its coordinate transform information in parallel and then re-use the transformation everywhere it's applicable. I suspect chunking the broadcast dimensions would result in unnecessary repetition of coordinate transformations.

On your comment about the risk if the broadcast dimensions are quite large: is that much of a risk in this PR, where the input and output arrays are still both numpy arrays, so we know the broadcast dimensions must be small enough that both arrays fit in memory? Or is it that we need memory to hold the whole input, the whole output, plus n_procs * chunk_size of temporary memory, so chunk_size can't be too large? If that's the case, I think it would be better to limit chunk_size by making the chunks very small along the WCS axes and keeping them full-size along the broadcast axes. The numbers I was seeing suggested that the coordinate transformations are by far the bulk of the runtime for the interpolating algorithm, so avoiding repeated transformations at all costs may be well worth it.
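To make that suggestion concrete, a small sketch under assumed shapes and chunk sizes (not code from the PR): keep the broadcast axis whole and chunk only the two WCS axes, so each chunk's coordinate transformation is computed once and reused across the entire broadcast dimension.

```python
import dask.array as da
import numpy as np

# Hypothetical (n_broadcast, ny, nx) stack of images sharing one WCS.
data = np.zeros((16, 4096, 4096))

# Full-size along the broadcast axis, small explicit chunks along the WCS axes.
ddata = da.from_array(data, chunks=(-1, 512, 512))

print(ddata.numblocks)  # (1, 8, 8)
```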
Co-authored-by: Stuart Mumford <stuart@cadair.com>
… required scipy, and set minimum required dask version
Ok, so I think this is now working well - one issue I ran into (which was also happening with the original implementation before this PR) was that the input array would get copied to all the processes, which made memory usage grow a lot. I've now made it so that in parallel mode we save the input array to a memmap and then load the memmap (as a memmap) inside each process, which speeds things up and reduces memory usage.
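A rough sketch of that memmap pattern (the file name and helper functions are illustrative, not the actual code in this PR):

```python
import numpy as np

def dump_to_memmap(array, path="reproject_input.npy"):
    # Write the input array to disk once, so worker processes can map it
    # read-only instead of receiving a pickled copy.
    memmapped = np.lib.format.open_memmap(
        path, mode="w+", dtype=array.dtype, shape=array.shape
    )
    memmapped[:] = array
    memmapped.flush()
    return path

def load_in_worker(path):
    # Inside each worker process, reopen the file as a memmap; pages are
    # loaded lazily, so memory usage stays proportional to what is touched.
    return np.lib.format.open_memmap(path, mode="r")
```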
I stress-tested this by reprojecting a ~10k by 30k array to a 30k by 30k image (in a different coordinate system), and with 8 processes the reprojection is 4x faster than the serial version (still some overhead, but it is going to be hard to be 100% efficient!).
A big part of the remaining inefficiency in parallel mode is due to #342 - if we switch to using vanilla scipy map_coordinates, the speedup is 6.5x.
This is an experiment to simplify the implementation of blocked reprojection added in #214 by using dask.
For now the use of dask is internal and does not extend to the input/output arrays. However, with this in place we could potentially add an option to return the data as a dask array rather than a Numpy array. Having dask inputs/outputs is a separate topic, so I will leave it to another PR.
I am running into an issue where da.store() is not working but calling compute() directly is; see the FIXME in utils.py. I wonder if this might be a dask bug, but it's not clear.
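For context, the two patterns being compared look roughly like this (the shapes, chunking, and stand-in computation are illustrative, not taken from utils.py):

```python
import dask.array as da
import numpy as np

source = da.random.random((2048, 2048), chunks=(512, 512))
result = source * 2  # stand-in for the per-block reprojection graph

target = np.empty(result.shape, dtype=result.dtype)

# Pattern 1: materialise the whole result, then copy it into the output array.
target[:] = result.compute()

# Pattern 2: stream blocks directly into the pre-allocated output array.
da.store(result, target, lock=True)
```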