Handling of large files #93

Closed
1 of 2 tasks
rabernat opened this issue Apr 6, 2021 · 4 comments · Fixed by #166

Comments

@rabernat
Contributor

rabernat commented Apr 6, 2021

Our base use cases for Pangeo Forge (e.g. pangeo-forge/staged-recipes#20) are many small netCDF files ➡️ one big Zarr. But what if we have just one or a few big netCDF files (e.g. pangeo-forge/staged-recipes#2)? As an extreme example, we might have a single file that is 100 GB or larger. We need the recipe to be able to subset the file lazily to avoid loading the whole thing into memory. There are at least two ways to do this.

Subset by variable

This happens now in a limited way. In store_chunk, we iterate through each variable and store each individually. We are not using dask or async at this level of the code, so these writes are guaranteed to happen in serial for each chunk.

https://github.com/pangeo-forge/pangeo-forge/blob/e5f7e7cc463ea31513088e14047b962a90cb69f9/pangeo_forge/recipe.py#L357-L359

There is an opportunity for parallelism here, especially if each chunk dataset has many different variables. We could add an option to split chunks by variable. In this case, the user would probably have to provide an explicit list of variables they want to store when the recipe is created. Implementing this will unfortunately require a fairly deep refactor, because it affects the _chunks_inputs data structure.
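As a rough illustration of what splitting by variable could look like (none of this is existing API; subset_variables, open_target_array, and region_for_chunk are assumed names), the user-supplied variable list could become part of the task key, so the executor maps over (chunk_key, varname) pairs instead of chunk keys alone:

import numpy as np

# Hypothetical sketch only: `subset_variables`, `open_target_array`, and
# `region_for_chunk` do not exist in recipe.py today.
subset_variables = ["prcp", "tmax", "tmin"]  # explicit list supplied by the user

def store_chunk_variable(recipe, chunk_key, varname):
    """Store one variable of one chunk, so writes for different variables
    of the same chunk can run as separate tasks."""
    with recipe.open_chunk(chunk_key) as ds_chunk:
        zarr_array = recipe.open_target_array(varname)    # assumed helper
        zarr_region = recipe.region_for_chunk(chunk_key)  # assumed helper
        zarr_array[zarr_region] = np.asarray(ds_chunk[varname].data)

# the executor would then map over, e.g.,
#   [(ck, v) for ck in recipe._chunks_inputs for v in subset_variables]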

Also, subsetting by variable can't help us if a single file contains very large arrays. Note that the whole array from each chunk is read into memory:

https://github.com/pangeo-forge/pangeo-forge/blob/e5f7e7cc463ea31513088e14047b962a90cb69f9/pangeo_forge/recipe.py#L377-L382

To avoid this, we need...

Chunked writing of individual variables

Imagine that an individual file contains a single variable of 10 GB. Unless our workers have significantly more than 10 GB of RAM, the execution will fail. We can overcome this by using chunked writing of individual variables, i.e. by writing the 10 GB array in 500 MB pieces.

The easiest way to do this is probably to leverage Xarray's lazily indexed array wrappers (which we get for free when we open the data with Xarray) together with Dask. In this case, Dask would not be providing parallelism per se, but rather a quick way to implement blocked writes. For example, we could do something like

chunk_size = '500MB'  # how to choose?
# chunk only along the concat ("sequence") dimension; zarr_array and zarr_region
# are the target Zarr array and the region this chunk occupies within it
data = ds_chunk[varname].chunk(sequence_dim=chunk_size).data
with dask.config.set(scheduler="single-threaded"):
    dask.array.store(data, zarr_array, lock=False, regions=zarr_region)

Again, we don't want any parallelism within this write operation. So far I am always assuming that all of the key methods in the recipe will execute in a single thread. The parallelism always occurs at a higher level, by mapping out many such tasks simultaneously.
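For comparison, here is a minimal hand-rolled sketch of the same idea: single-threaded, blocked writes along the concat dimension in roughly 500 MB slabs, without going through dask. It assumes var is the lazily indexed DataArray for one variable of the chunk, zarr_array is the open target array, and zarr_region is a tuple of slices locating the chunk in the target; none of these names come from the current codebase:

import math
import numpy as np

def store_variable_blocked(var, zarr_array, zarr_region, target_bytes=500e6):
    """Write one variable into its zarr region in ~500 MB slabs along the
    first (concat) dimension, never materializing the whole array."""
    n = var.shape[0]
    bytes_per_row = var.dtype.itemsize * math.prod(var.shape[1:])
    step = max(1, int(target_bytes // bytes_per_row))
    offset = zarr_region[0].start or 0
    for i in range(0, n, step):
        sel = slice(i, min(i + step, n))
        block = np.asarray(var[sel].data)  # loads only this slab into memory
        region = (slice(offset + sel.start, offset + sel.stop),) + tuple(zarr_region[1:])
        zarr_array[region] = block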

Summary

@TomAugspurger
Contributor

I'm processing Daymet with pangeo forge and have run into this. I believe that my workers are running out of memory at

with dask.config.set(
    scheduler="single-threaded"
):  # make sure we don't use a scheduler
    var = xr.backends.zarr.encode_zarr_variable(var_coded)
    data = np.asarray(
        var.data
    )  # TODO: can we buffer large data rather than loading it all?
with large variables, something like this 86GB array of precipitation:

<xarray.DataArray 'prcp' (time: 365, y: 8075, x: 7814)>
[23030788250 values with dtype=float32]
Coordinates:
  * x        (x) float32 -4.56e+06 -4.559e+06 -4.558e+06 ... 3.252e+06 3.253e+06
  * y        (y) float32 4.984e+06 4.983e+06 4.982e+06 ... -3.089e+06 -3.09e+06
    lat      (y, x) float32 ...
    lon      (y, x) float32 ...
  * time     (time) datetime64[ns] 1980-01-01T12:00:00 ... 1980-12-30T12:00:00
Attributes:
    long_name:     daily total precipitation
    units:         mm/day
    grid_mapping:  lambert_conformal_conic
    cell_methods:  area: mean time: sum

In your snippet you have chunk_size = '500MB'  # how to choose?. We may not have anything to choose here. Assuming that downstream users of these datasets don't want 86 GB arrays (or 10 GB, for that matter), recipe authors will be producing datasets with reasonable chunk sizes (say, 100-300 MB). So if users have set Recipe.target_chunks, then just doing ds.chunk(self.target_chunks) should be fine. I'll test it out on this dataset.
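Reusing the names from the snippet in the opening comment (ds_chunk, varname, zarr_array, zarr_region), and with made-up chunk sizes for illustration, that might look roughly like:

import dask
import dask.array

# Illustrative only: let the recipe's target_chunks drive the block size
# rather than guessing a byte size.
target_chunks = {"time": 365, "y": 1000, "x": 1000}  # i.e. self.target_chunks

data = ds_chunk[varname].chunk(target_chunks).data
with dask.config.set(scheduler="single-threaded"):
    dask.array.store(data, zarr_array, lock=False, regions=zarr_region)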

@rabernat
Contributor Author

Don't have much time today but I thought I would leave a quick comment.

something like this 86GB array of precipitation:

For something like this, I think we really want a third option, not enumerated above, which is to parallelize over the concat dim at the pangeo_forge_recipe level. Imagine an extreme case where we have a single netCDF file with one variable inside that is 1 TB of data. Even if we can get these streaming writes to work (as described above), we would still have no parallelism in the recipe.

Instead, in this case we would want to subset the input along the concat dim and use this to parallelize. To do this at the store_chunks stage, we would need to refactor this function

@contextmanager
def open_chunk(self, chunk_key: ChunkKey):

and have chunk_key somehow encode a subsetting operation, e.g. by adding an extra item to the chunk_key tuple. That would require refactoring here (a rough sketch of the extended key follows the excerpt below):

# now for the fancy bit: we have to define the mappings _inputs_chunks and _chunks_inputs
# this is where refactoring would need to happen to support more complex file patterns
# (e.g. multiple concat dims)
# for now we assume 1:many chunk_keys:input_keys
# theoretically this could handle more than one merge dimension
# list of iterators that iterates over merge dims normally
# but concat dims in chunks
dimension_iterators = [
    range(v)
    if k != self._concat_dim
    else enumerate(chunked_iterable(range(v), self.inputs_per_chunk))
    for k, v in self.file_pattern.dims.items()
]
for k in product(*dimension_iterators):
    # typical k would look like (0, (0, (0, 1)))
    chunk_key = tuple([v[0] if hasattr(v, "__len__") else v for v in k])
    all_as_tuples = tuple([v[1] if hasattr(v, "__len__") else (v,) for v in k])
    input_keys = tuple(v for v in product(*all_as_tuples))
    self._chunks_inputs[chunk_key] = input_keys
    for input_key in input_keys:
        self._inputs_chunks[input_key] = (chunk_key,)
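Purely as a sketch of the extended-key idea (the subset_inputs option, the trailing subset index, and the subset_slice helper are all hypothetical), the keys might be built like this, with open_chunk peeling off the last element to decide which slice of the concat dim to load:

# Hypothetical: split each chunk into `subset_inputs` pieces along the concat dim
# and carry the piece index as an extra element of the chunk key.
subset_inputs = 4  # e.g. break one huge netCDF file into 4 write tasks

extended_chunk_keys = [
    chunk_key + (subset_index,)
    for chunk_key in self._chunks_inputs
    for subset_index in range(subset_inputs)
]

# open_chunk would then interpret the final element, roughly:
#   *base_key, subset_index = chunk_key
#   ds = ds.isel({self._concat_dim: subset_slice(subset_index, subset_inputs, ds)})
# where subset_slice divides the file's concat dimension into
# `subset_inputs` contiguous slices.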

@cisaacstern
Member

Noting that #164 (first listed as a to-do item in the opening comment of this issue, by @rabernat) is a blocker for running the interior recipes of pangeo-forge/staged-recipes#24. Migrated it to its own Issue for clarity, given that there seem to be a handful of different use-cases under consideration on the main thread here.

@rabernat
Contributor Author

I will try to work on this with the goal of completing it by the end of this week.
