
speeding up windowed writes #439

Open
artttt opened this issue Dec 2, 2021 · 7 comments
Labels: dask (Dask related issue.), proposal (Idea for a new feature.)

Comments


artttt commented Dec 2, 2021

I've been writing COGs using the windowed option:

da.rio.to_raster(..., driver="COG", windowed=True)

It works fine but is rather slow.

My array is on a set of distributed dask workers that don't share a file system, so as far as I can tell using a lock isn't an option.

It looks to me like it is slow because the windows being written are very small, as they match the COG internal tiling.

I did a bit of testing, and if you use extra options such as:

        tiled=True,
        driver="GTiff",
        blockxsize=10000,
        blockysize=10000,

the writing is much faster, approaching the speed of the non-windowed write, and it makes better use of the available RAM and CPU for compression. Obviously it's no longer a COG though. A full call with those options would look something like the sketch below.
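For reference, a minimal sketch of the full call (the output path and the DataArray name da are placeholders):

import rioxarray  # noqa: F401  (registers the .rio accessor)

# Sketch: windowed write with large GTiff blocks instead of the COG driver.
da.rio.to_raster(
    "output.tif",
    windowed=True,
    driver="GTiff",
    tiled=True,
    blockxsize=10000,  # block sizes must be multiples of 16 for GTiff
    blockysize=10000,
)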

However, if I change the raster_writer code as follows, I get a COG, I get fast writes, and as far as I've tested it works for arbitrarily large arrays.

def to_raster(self, xarray_dataarray, tags, windowed, lock, compute, **kwargs):
    """
    This method writes to the raster on disk.
    xarray_dataarray: xarray.DataArray
        The input data array to write to disk.
    tags: dict, optional
        A dictionary of tags to write to the raster.
    windowed: bool
        If True and the data array is not a dask array, it will write
        the data to disk using rasterio windows.
    lock: boolean or Lock, optional
        Lock to use to write data using dask.
        If not supplied, it will use a single process.
    compute: bool
        If True (default) and data is a dask array, then compute and save
        the data immediately. If False, return a dask Delayed object.
        Call ".compute()" on the Delayed object to compute the result
        later. Call ``dask.compute(delayed1, delayed2)`` to save
        multiple delayed files at once.
    dtype: np.dtype
        Numpy-compliant dtype used to save raster. If data is not already
        represented by this dtype in memory it is recast. dtype='complex_int16'
        is a special case to write in-memory np.complex64 to CInt16.
    **kwargs
        Keyword arguments to pass into writing the raster.
    """
    kwargs["dtype"], numpy_dtype = _get_dtypes(
        kwargs["dtype"],
        xarray_dataarray.encoding.get("rasterio_dtype"),
        xarray_dataarray.encoding.get("dtype", str(xarray_dataarray.dtype)),
    )

    if kwargs["nodata"] is not None:
        # Ensure dtype of output data matches the expected dtype.
        # This check is added here as the dtype of the data is
        # converted right before writing.
        kwargs["nodata"] = _ensure_nodata_dtype(kwargs["nodata"], numpy_dtype)

    with rasterio.open(self.raster_path, "w", **kwargs) as rds:
        _write_metatata_to_raster(rds, xarray_dataarray, tags)
        if not (lock and is_dask_collection(xarray_dataarray.data)):
            # write data to raster immediately if not a dask array
            if windowed:
                # window_iter = rds.block_windows(1)
                # A bit of a hack to get block windows that are much larger than the COG block windows.
                # I wonder how best to align these.
                # Maybe to some multiple of the COG blocks (is there an order that is better for writing COG blocks?)
                # Maybe align to the dask blocks, as is done when using a lock.
                # To look at: could the encode_cf_variable step occur on the dask workers?
                temp_kwargs = kwargs.copy()
                temp_kwargs['driver'] = 'GTiff'
                temp_kwargs['tiled'] = True
                temp_kwargs['blockxsize'] = 512 * 20
                temp_kwargs['blockysize'] =  512 * 20
                with rasterio.io.MemoryFile() as memfile:
                    with memfile.open(**temp_kwargs) as temp_rds:     
                        window_iter = list(temp_rds.block_windows(1))
            else:
                window_iter = [(None, None)]
            for _, window in window_iter:
                if window is not None:
                    out_data = xarray_dataarray.rio.isel_window(window)
                else:
                    out_data = xarray_dataarray
                data = encode_cf_variable(out_data).values.astype(numpy_dtype)
                if data.ndim == 2:
                    rds.write(data, 1, window=window)
                else:
                    rds.write(data, window=window)

    if lock and is_dask_collection(xarray_dataarray.data):
        return dask.array.store(
            encode_cf_variable(xarray_dataarray).data.astype(numpy_dtype),
            self,
            lock=lock,
            compute=compute,
        )
    return None

Basically, I'm decoupling the block_windows used for writing from the internal block_windows of the COG. It's obviously done in a slightly hacky way, and it would need to be refined so that the windows are suitable for the RAM available on the system.
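One possible refinement for the alignment question (a rough sketch, not tested against rioxarray's internals, and block_aligned_windows is a hypothetical helper): generate write windows whose size is an exact multiple of the COG's internal block size, so each large window covers whole 512x512 tiles:

from rasterio.windows import Window

def block_aligned_windows(width, height, blocksize=512, factor=20):
    # Yield write windows that are a whole multiple of the internal
    # block size, clipped to the raster bounds.
    step = blocksize * factor
    for row_off in range(0, height, step):
        for col_off in range(0, width, step):
            yield Window(
                col_off,
                row_off,
                min(step, width - col_off),
                min(step, height - row_off),
            )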

I think if it were refined enough there is no reason not to make this the standard way of writing (no need for a windowed=False option). It would also be interesting to see how it performs compared to using a lock.

Thanks

@artttt artttt added the proposal Idea for a new feature. label Dec 2, 2021

snowman2 (Member) commented Dec 2, 2021

This is definitely an interesting use case. I am curious why you need to use windowed=True. Does the default method use too much memory?

I think in your scenario, being able to have a lock on a per-machine basis would be your best solution. That way each machine can safely write to the file on its drive. Not sure if such a lock exists though.
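In case it helps the discussion, here is a rough, untested sketch of what such a per-machine lock might look like, keying a dask.distributed Lock on the worker's hostname so that only writers on the same machine serialize against each other (machine_lock is a hypothetical helper, and the name would need to be resolved on the worker rather than the client):

import socket
from dask.distributed import Lock

def machine_lock():
    # Hypothetical per-machine lock: every worker on the same host
    # resolves the same lock name, so only same-host writes serialize.
    return Lock(name=f"rio-write-{socket.gethostname()}")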


artttt (Author) commented Dec 2, 2021

Well, I'm still testing, but windowed=False kills my cluster once the array gets large, and windowed=True does not cause an issue on the cluster.

I was premature to say it could handle arbitrarily large arrays though. Unfortunately, the GDAL COG driver seems to ignore GDAL_CACHEMAX and crashes when it runs out of memory.

On the other hand, the GTiff driver works fine with large arrays, and with 512x512 internal blocks it's not hard to turn the result into a COG afterwards. I just wrote a compressed 47 GB file on a machine with only 32 GB of RAM, without the issues the COG driver had.
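For the record, that post-hoc conversion can be done in one call with GDAL >= 3.1 (a sketch; the paths and compression option are placeholders):

from osgeo import gdal

# Sketch: convert the tiled GTiff into a COG after the fast write.
gdal.Translate(
    "output_cog.tif",
    "output.tif",
    format="COG",
    creationOptions=["COMPRESS=DEFLATE"],
)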

With the modifications in the OP, windowed writes seem to be faster than non-windowed ones; however, with dask I find it hard to isolate everything going on in the background to know whether this is real.
It's definitely a big gain over the current windowed writes.

I've got to admit I've not looked at the lock approach much, as I can't see how it can help me if the workers can't all see the same file, unless I'm writing a whole collection of separate files, in which case I would probably look at using Zarr in an S3 bucket instead.


snowman2 (Member) commented Dec 2, 2021

Thanks for the update, your reasoning makes sense.

I could be wrong, but I don't think the COG driver works for windowed writes; see #410.

For writing multiple files, #433 is relevant. Also, as you said, Zarr is a good choice for that.


artttt (Author) commented Dec 2, 2021

With #410, that issue arises because, when writing with a lock, the file is written in windows in separate sessions for each chunk, on this line:

with rasterio.open(self.raster_path, "r+") as rds:

I'm guessing the COG overview creation is only triggered when the file is first populated, not on subsequent modification when it is opened as r+.
I don't see the issue that was shown in #410 when doing windowed writes to a COG in a single thread.


snowman2 (Member) commented Dec 2, 2021

> I'm guessing the COG overview creation is only triggered when the file is first populated, not on subsequent modification when it is opened as r+.

https://gdal.org/drivers/raster/cog.html

I am wondering if it works with dask if you use OVERVIEWS=IGNORE_EXISTING?
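If anyone wants to try that, the creation option should pass straight through to_raster's **kwargs (an untested sketch; da is a dask-backed DataArray and the lock name is arbitrary):

from dask.distributed import Lock

# Untested sketch: OVERVIEWS is a COG driver creation option and should
# be forwarded to rasterio.open via to_raster's **kwargs.
da.rio.to_raster(
    "output.tif",
    driver="COG",
    lock=Lock("rio-write"),
    OVERVIEWS="IGNORE_EXISTING",
)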


artttt (Author) commented Dec 6, 2021

I've been exploring some refinements.
I'm now using the xarray chunks as the windows for writing, and this seems to be working well.

            if windowed:
                # Requires: from dask.distributed import get_client, as_completed
                # and: from rasterio.windows import Window
                client = get_client()
                ac = as_completed(client.compute(xarray_dataarray.data.to_delayed().ravel().tolist()), with_results=True)
                offsets = [np.cumsum(c) for c in xarray_dataarray.chunks]
                # todo: deal with data with more than 2 dimensions
                # see here for an example: https://github.com/corteva/rioxarray/blob/199c05143bb9a68b42090bb7143121214b517626/rioxarray/raster_writer.py#L196

                for future, data in ac:
                    # future.key is (array_name, row_block, col_block) for a 2D dask array
                    height = xarray_dataarray.chunks[0][future.key[1]]
                    width = xarray_dataarray.chunks[1][future.key[2]]
                    row_off = offsets[0][future.key[1]] - height
                    col_off = offsets[1][future.key[2]] - width
                    window = Window(col_off, row_off, width, height)
                    if data.ndim == 2:
                        rds.write(data, 1, window=window)
                    else:
                        rds.write(data, window=window)
            else:
                out_data = xarray_dataarray
                data = encode_cf_variable(out_data).values.astype(numpy_dtype)
                if data.ndim == 2:
                    rds.write(data, 1)
                else:
                    rds.write(data)
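To make the offset arithmetic concrete, here is the same window computation on a toy chunk layout (a standalone sketch):

import numpy as np
from rasterio.windows import Window

# Toy chunk layout: rows split (1000, 1000, 500), cols split (2048, 2048).
chunks = ((1000, 1000, 500), (2048, 2048))
offsets = [np.cumsum(c) for c in chunks]

# Window for the chunk at row block 2, col block 1:
i, j = 2, 1
height, width = chunks[0][i], chunks[1][j]
row_off = offsets[0][i] - height  # 2500 - 500 = 2000
col_off = offsets[1][j] - width   # 4096 - 2048 = 2048
print(Window(col_off, row_off, width, height))
# Window(col_off=2048, row_off=2000, width=2048, height=500)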


snowman2 (Member) commented Dec 6, 2021

That makes sense to me as a way to write dask chunks to disk when processing on multiple machines that don't share a file system. I think this would be a good mechanism for writing a raster with dask when a lock isn't provided.

@snowman2 snowman2 added the dask Dask related issue. label Sep 20, 2022