Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate concat_dim earlier in prepare_target #229

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ def inputs_for_chunk(
return input_keys


def expand_target_dim(target: FSSpecTarget, concat_dim: Optional[str], dimsize: int) -> None:
def expand_target_dim(
target: FSSpecTarget,
concat_dim: Optional[str],
dimsize: int,
consolidate_dimension_coordinates: bool = True,
) -> None:
target_mapper = target.get_mapper()
zgroup = zarr.open_group(target_mapper)
ds = open_target(target)
Expand All @@ -106,7 +111,10 @@ def expand_target_dim(target: FSSpecTarget, concat_dim: Optional[str], dimsize:
logger.debug(f"resizing array {v} to shape {shape}")
arr.resize(shape)

# now explicity write the sequence coordinate to avoid missing data
if v == concat_dim and concat_dim in zgroup and consolidate_dimension_coordinates:
_consolidate_dimension_coordinate(target_mapper, dims={v})

# now explicitly write the sequence coordinate to avoid missing data
# when reopening
if concat_dim in zgroup:
zgroup[concat_dim][:] = 0
Expand Down Expand Up @@ -234,6 +242,15 @@ def region_and_conflicts_for_chunk(
)
conflicts = chunk_grid.chunk_conflicts(chunk_index, target_grid)

if config.consolidate_dimension_coordinates:
# with config.consolidate_dimension_coordinates we consolidate the concat dim (e.g. time)
# during prepare_target. This means that we *always* have conflicts when writing the
# concat dim coordinate labels, and so *always* need to acquire a lock before writing.
# Since we're writing small values for coordinates, it's hoped that updating the
# coordinates will be a small fraction of the overall `store_chunk`, and so the locking
# of this coordinate won't be too contentious.
conflicts[config.concat_dim] = {0}

return region, conflicts


Expand Down Expand Up @@ -475,7 +492,12 @@ def filter_init_chunks(chunk_key):
)
n_sequence = sum(input_sequence_lens)
logger.info(f"Expanding target concat dim '{config.concat_dim}' to size {n_sequence}")
expand_target_dim(config.target, config.concat_dim, n_sequence)
expand_target_dim(
config.target,
config.concat_dim,
n_sequence,
consolidate_dimension_coordinates=config.consolidate_dimension_coordinates,
)
# TODO: handle possible subsetting
# The init chunks might not cover the whole dataset along multiple dimensions!

Expand Down Expand Up @@ -549,20 +571,20 @@ def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
)


def finalize_target(*, config: XarrayZarrRecipe) -> None:
if config.target is None:
raise ValueError("target has not been set.")

if config.consolidate_dimension_coordinates:
logger.info("Consolidating dimension coordinate arrays")
target_mapper = config.target.get_mapper()
group = zarr.open(target_mapper, mode="a")
# https://github.com/pangeo-forge/pangeo-forge-recipes/issues/214
# intersect the dims from the array metadata with the Zarr group
# to handle coordinateless dimensions.
def _consolidate_dimension_coordinate(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff here looks strange.

All I did was

  1. move the if config.consolidate_dimension_coordinates to its own function so that I could call it from two places (here and earlier in prepare_target for concat_dim)
  2. Let the caller pass in the dims to check (so that we can do just concat_dim early on, and the other dims later; I think this is what we want).
  3. Added a check that arr.chunks != arr.shape (to avoid rewriting concat_diminfinalize_target`, since it would already be consolidated)

target_mapper: fsspec.mapping.FSMap, dims: Optional[Set[str]] = None
) -> None:
group = zarr.open(target_mapper, mode="a")
# https://github.com/pangeo-forge/pangeo-forge-recipes/issues/214
# intersect the dims from the array metadata with the Zarr group
# to handle coordinateless dimensions.
if dims is None:
dims = set(_gather_coordinate_dimensions(group)) & set(group)
for dim in dims:
arr = group[dim]

for dim in dims:
arr = group[dim]
if arr.chunks != arr.shape:
logger.info("Consolidating dimension coordinate %s", dim)
attrs = dict(arr.attrs)
new = group.array(
dim,
Expand All @@ -577,6 +599,14 @@ def finalize_target(*, config: XarrayZarrRecipe) -> None:
)
new.attrs.update(attrs)


def finalize_target(*, config: XarrayZarrRecipe) -> None:
if config.target is None:
raise ValueError("target has not been set.")

if config.consolidate_dimension_coordinates:
_consolidate_dimension_coordinate(config.target.get_mapper())

if config.consolidate_zarr:
logger.info("Consolidating Zarr metadata")
target_mapper = config.target.get_mapper()
Expand Down
18 changes: 17 additions & 1 deletion tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
# need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
from pytest_lazyfixture import lazy_fixture

from pangeo_forge_recipes.executors.function import FunctionPipelineExecutor
from pangeo_forge_recipes.patterns import FilePattern
from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe
from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe, xarray_zarr_recipe_compiler


def make_netCDFtoZarr_recipe(
Expand Down Expand Up @@ -359,6 +360,21 @@ def test_chunks_distributed_locking(
)


def test_early_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
RecipeClass, file_pattern, kwargs, ds_expected, target = netCDFtoZarr_recipe

rec = RecipeClass(file_pattern, **kwargs)
pipeline = xarray_zarr_recipe_compiler(rec)
# grab just cache_input and prepare_target, to make sure we consolidate early
pipeline = type(pipeline)(pipeline.stages[:2], config=rec)
executor = FunctionPipelineExecutor.compile(pipeline)
executor()

target_mapper = rec.target.get_mapper()
zgroup = zarr.open_group(target_mapper)
assert zgroup[rec.concat_dim].chunks == zgroup[rec.concat_dim].shape


def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
RecipeClass, file_pattern, kwargs, ds_expected, target = netCDFtoZarr_recipe

Expand Down