diff --git a/CHANGES.md b/CHANGES.md
index 442d64aab..d116536a2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,12 +1,13 @@
 ## Changes in 0.2.0 (in dev)
 
-* Fixed problem where CTRL+C didn't function anymore with `xcube serve`. (#87)
-* Fixed error `indexes along dimension 'y' are not equal` occurred when using
-  `xcube gen` with processed variables that used flag values (#86)
+### Enhancements
+
+* Added new CLI command `xcube prune`. The tool deletes all block files associated with empty
+  (NaN-only) chunks in the given INPUT cube, which must have ZARR format. This can drastically
+  reduce the number of files in sparse cubes and improve cube reading performance. (#92)
 * `xcube serve` has a new `prefix` option which is a path appended to the server's host.
   The `prefix` option replaces the `name` option which is now deprecated but kept for
   backward compatibility. (#79)
-* Fixed `xcube serve` WMTS KVP API to allow for case-insensitive query parameters. (#77)
 * Added new CLI command `xcube resample` that is used to generate temporarily up- or downsampled
   data cubes from other data cubes.
 * `xcube serve` can now be run with data cube paths and styling information given via the CLI rather
@@ -29,14 +30,22 @@
 * Added `xcube gen` option `--sort` when input data list should be sorted (#33)
 * Added `xcube vars2dim` command to make variables a cube dimension (#31)
 * Added `xcube serve` option `--traceperf` that allows switching on performance diagnostics.
-* Fixed error in plugins when importing `xcube.api.gen` (#62)
-* Fixed import of plugins only when executing `xcube.cli` (#66)
 * Included possibility to read the input file paths from a text file. (#47)
 * Restructured and clarified code base (#27)
 * Moved to Python 3.7 (#25)
 * Excluding all input processors except for the default one. They are now plugins and have own
   repositories within the xcube's organisation. (#49)
 
+### Fixes
+
+* Fixed problem where CTRL+C didn't function anymore with `xcube serve`. (#87)
+* Fixed error `indexes along dimension 'y' are not equal` that occurred when using
+  `xcube gen` with processed variables that used flag values. (#86)
+* Fixed `xcube serve` WMTS KVP API to allow for case-insensitive query parameters. (#77)
+* Fixed error in plugins when importing `xcube.api.gen` (#62)
+* Fixed import of plugins only when executing `xcube.cli` (#66)
+
+
 ## Changes in 0.1.0
 
 * Respecting chunk sizes when computing tile sizes [#44](https://github.com/dcs4cop/xcube-server/issues/44)
diff --git a/README.md b/README.md
index 3db144de0..af1c40f99 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@ Data cubes with [xarray](http://xarray.pydata.org/).
   - [`xcube gen`](#xcube-gen)
   - [`xcube grid`](#xcube-grid)
   - [`xcube level`](#xcube-level)
+  - [`xcube prune`](#xcube-prune)
   - [`xcube resample`](#xcube-resample)
   - [`xcube serve`](#xcube-serve)
   - [`xcube vars2dim`](#xcube-vars2dim)
@@ -484,6 +485,19 @@ Example:
 
     $ xcube level -l -t 720 data/cubes/test-cube.zarr
 
+## `xcube prune`
+
+    $ xcube prune --help
+    Usage: xcube prune [OPTIONS] INPUT
+
+      Delete empty chunks. Deletes all block files associated with empty (NaN-
+      only) chunks in given INPUT cube, which must have ZARR format.
+
+    Options:
+      --dry-run  Just read and process input, but don't produce any outputs.
+      --help     Show this message and exit.
+ + ## `xcube vars2dim` diff --git a/test/api/test_chunk.py b/test/api/test_chunk.py index 0cca5f47b..947474477 100644 --- a/test/api/test_chunk.py +++ b/test/api/test_chunk.py @@ -1,7 +1,9 @@ import unittest +import numpy as np + from test.sampledata import new_test_dataset -from xcube.api.chunk import chunk_dataset +from xcube.api.chunk import chunk_dataset, get_empty_dataset_chunks, compute_chunk_slices class ChunkDatasetTest(unittest.TestCase): @@ -37,3 +39,79 @@ def test_unchunk_dataset(self): chunked_dataset = chunk_dataset(dataset, format_name="zarr") self.assertEqual({"_FillValue": -999.0}, chunked_dataset.precipitation.encoding) self.assertEqual({"_FillValue": -999.0}, chunked_dataset.temperature.encoding) + + +class GetEmptyDatasetChunksTest(unittest.TestCase): + + def test_not_chunked(self): + dataset = new_test_dataset(["2010-01-01", "2010-01-02"], precipitation=0.4, temperature=275.2) + with self.assertRaises(ValueError) as cm: + get_empty_dataset_chunks(dataset) + self.assertEqual('data array not chunked', f'{cm.exception}') + + def test_non_empty(self): + dataset = new_test_dataset(["2010-01-01", "2010-01-02"], + precipitation=0.4, temperature=275.2).chunk(dict(time=1, lat=90, lon=90)) + empty_dataset_chunks = get_empty_dataset_chunks(dataset) + self.assertEqual({'precipitation': (), 'temperature': ()}, + empty_dataset_chunks) + + def test_all_empty(self): + dataset = new_test_dataset(["2010-01-01", "2010-01-02"], + precipitation=np.nan, temperature=np.nan).chunk(dict(time=1, lat=90, lon=90)) + empty_dataset_chunks = get_empty_dataset_chunks(dataset) + self.assertEqual({'precipitation': ((0, 0, 0), + (0, 0, 1), + (0, 0, 2), + (0, 0, 3), + (0, 1, 0), + (0, 1, 1), + (0, 1, 2), + (0, 1, 3), + (1, 0, 0), + (1, 0, 1), + (1, 0, 2), + (1, 0, 3), + (1, 1, 0), + (1, 1, 1), + (1, 1, 2), + (1, 1, 3)), + 'temperature': ((0, 0, 0), + (0, 0, 1), + (0, 0, 2), + (0, 0, 3), + (0, 1, 0), + (0, 1, 1), + (0, 1, 2), + (0, 1, 3), + (1, 0, 0), + (1, 0, 1), + (1, 0, 2), + (1, 0, 3), + (1, 1, 0), + (1, 1, 1), + (1, 1, 2), + (1, 1, 3))}, empty_dataset_chunks) + + +class ComputeChunkSlicesTest(unittest.TestCase): + + def test_compute_chunk_slices(self): + chunk_slices = compute_chunk_slices(((1, 1), (90, 90), (90, 90, 90, 90))) + self.assertEqual([((0, 0, 0), ((0, 1), (0, 90), (0, 90))), + ((0, 0, 1), ((0, 1), (0, 90), (90, 180))), + ((0, 0, 2), ((0, 1), (0, 90), (180, 270))), + ((0, 0, 3), ((0, 1), (0, 90), (270, 360))), + ((0, 1, 0), ((0, 1), (90, 180), (0, 90))), + ((0, 1, 1), ((0, 1), (90, 180), (90, 180))), + ((0, 1, 2), ((0, 1), (90, 180), (180, 270))), + ((0, 1, 3), ((0, 1), (90, 180), (270, 360))), + ((1, 0, 0), ((1, 2), (0, 90), (0, 90))), + ((1, 0, 1), ((1, 2), (0, 90), (90, 180))), + ((1, 0, 2), ((1, 2), (0, 90), (180, 270))), + ((1, 0, 3), ((1, 2), (0, 90), (270, 360))), + ((1, 1, 0), ((1, 2), (90, 180), (0, 90))), + ((1, 1, 1), ((1, 2), (90, 180), (90, 180))), + ((1, 1, 2), ((1, 2), (90, 180), (180, 270))), + ((1, 1, 3), ((1, 2), (90, 180), (270, 360)))], + list(chunk_slices)) diff --git a/test/cli/test_prune.py b/test/cli/test_prune.py new file mode 100644 index 000000000..f869f144c --- /dev/null +++ b/test/cli/test_prune.py @@ -0,0 +1,114 @@ +import os.path +import sys +import numpy as np +import xarray as xr + +from test.cli.helpers import CliTest +from xcube.api import assert_cube, new_cube +from xcube.api.readwrite import write_cube +from xcube.cli.prune import _delete_block_file +from xcube.util.dsio import rimraf + + +class PruneTest(CliTest): + + def test_help_option(self): + 
result = self.invoke_cli(['prune', '--help']) + self.assertEqual(0, result.exit_code) + + +class PruneDataTest(CliTest): + TEST_CUBE = "test.zarr" + + def setUp(self) -> None: + rimraf(self.TEST_CUBE) + cube = new_cube(time_periods=3, + variables=dict(precipitation=np.nan, + temperature=np.nan)).chunk(dict(time=1, lat=90, lon=90)) + + write_cube(cube, self.TEST_CUBE, "zarr", cube_asserted=True) + + def tearDown(self) -> None: + rimraf(self.TEST_CUBE) + + def test_dry_run(self): + result = self.invoke_cli(['prune', self.TEST_CUBE, "--dry-run"]) + self.assertEqual(0, result.exit_code) + self.assertEqual("Opening cube from 'test.zarr'...\n" + "Identifying empty blocks...\n" + "Deleting 24 empty block file(s) for variable 'precipitation'...\n" + "Deleting 24 empty block file(s) for variable 'temperature'...\n" + "Done, 48 block file(s) deleted.\n", + result.stdout) + expected_file_names = sorted(['.zarray', + '.zattrs', + '0.0.0', '0.0.1', '0.0.2', '0.0.3', '0.1.0', '0.1.1', '0.1.2', '0.1.3', + '1.0.0', '1.0.1', '1.0.2', '1.0.3', '1.1.0', '1.1.1', '1.1.2', '1.1.3', + '2.0.0', '2.0.1', '2.0.2', '2.0.3', '2.1.0', '2.1.1', '2.1.2', '2.1.3']) + self.assertEqual(expected_file_names, sorted(os.listdir('test.zarr/precipitation'))) + self.assertEqual(expected_file_names, sorted(os.listdir('test.zarr/temperature'))) + ds = xr.open_zarr('test.zarr') + assert_cube(ds) + self.assertIn('precipitation', ds) + self.assertEqual((3, 180, 360), ds.precipitation.shape) + self.assertEqual(('time', 'lat', 'lon'), ds.precipitation.dims) + self.assertIn('temperature', ds) + self.assertEqual((3, 180, 360), ds.temperature.shape) + self.assertEqual(('time', 'lat', 'lon'), ds.temperature.dims) + + def test_no_dry_run(self): + result = self.invoke_cli(['prune', self.TEST_CUBE]) + self.assertEqual(0, result.exit_code) + self.assertEqual("Opening cube from 'test.zarr'...\n" + "Identifying empty blocks...\n" + "Deleting 24 empty block file(s) for variable 'precipitation'...\n" + "Deleting 24 empty block file(s) for variable 'temperature'...\n" + "Done, 48 block file(s) deleted.\n", + result.stdout) + expected_file_names = sorted(['.zarray', '.zattrs']) + self.assertEqual(expected_file_names, sorted(os.listdir('test.zarr/precipitation'))) + self.assertEqual(expected_file_names, sorted(os.listdir('test.zarr/temperature'))) + ds = xr.open_zarr('test.zarr') + assert_cube(ds) + self.assertIn('precipitation', ds) + self.assertEqual((3, 180, 360), ds.precipitation.shape) + self.assertEqual(('time', 'lat', 'lon'), ds.precipitation.dims) + self.assertIn('temperature', ds) + self.assertEqual((3, 180, 360), ds.temperature.shape) + self.assertEqual(('time', 'lat', 'lon'), ds.temperature.dims) + + def test_delete_block_file(self): + actual_message = None + + def monitor(message): + nonlocal actual_message + actual_message = message + + actual_message = None + ok = _delete_block_file(self.TEST_CUBE, 'precipitation', (0, 3, 76), True, monitor=monitor) + self.assertFalse(ok) + self.assertEqual(f"error: could neither find block file " + f"{os.path.join(self.TEST_CUBE, 'precipitation', '0.3.76')} nor " + f"{os.path.join(self.TEST_CUBE, 'precipitation', '0', '3', '76')}", actual_message) + + actual_message = None + ok = _delete_block_file(self.TEST_CUBE, 'precipitation', (1, 1, 0), True, monitor=monitor) + self.assertTrue(ok) + self.assertEqual(None, actual_message) + + actual_message = None + ok = _delete_block_file(self.TEST_CUBE, 'precipitation', (1, 1, 0), False, monitor=monitor) + self.assertTrue(ok) + self.assertEqual(None, 
actual_message) + self.assertFalse(os.path.exists(os.path.join(self.TEST_CUBE, 'precipitation', '1.1.0'))) + + if sys.platform == 'win32': + path = os.path.join(self.TEST_CUBE, 'precipitation', '1.1.1') + # Open block, so we cannot delete (Windows only) + # noinspection PyUnusedLocal + with open(path, 'wb') as fp: + actual_message = None + ok = _delete_block_file(self.TEST_CUBE, 'precipitation', (1, 1, 1), False, monitor=monitor) + self.assertFalse(ok) + self.assertIsNotNone(actual_message) + self.assertTrue(actual_message.startswith(f'error: failed to delete block file {path}: ')) diff --git a/xcube/api/chunk.py b/xcube/api/chunk.py index a83790b24..026293996 100644 --- a/xcube/api/chunk.py +++ b/xcube/api/chunk.py @@ -1,5 +1,7 @@ -from typing import Dict +import itertools +from typing import Dict, Tuple, Iterable +import numpy as np import xarray as xr from xcube.util.dsio import FORMAT_NAME_ZARR, FORMAT_NAME_NETCDF4 @@ -37,3 +39,55 @@ def chunk_dataset(dataset: xr.Dataset, del var.encoding[chunk_sizes_attr_name] return chunked_ds + + +def get_empty_dataset_chunks(dataset: xr.Dataset) -> Dict[str, Tuple[Tuple[int, ...]]]: + """ + Identify empty dataset chunks and return their indices. + + :param dataset: The dataset. + :return: A mapping from variable name to a list of block indices. + """ + return {var_name: get_empty_var_chunks(dataset[var_name]) for var_name in dataset.data_vars} + + +def get_empty_var_chunks(var: xr.DataArray) -> Tuple[Tuple[int, ...]]: + """ + Identify empty variable chunks and return their indices. + + :param var: The variable. + :return: A list of block indices. + """ + chunks = var.chunks + if chunks is None: + raise ValueError('data array not chunked') + + chunk_slices = compute_chunk_slices(chunks) + + empty_chunk_indexes = [] + for chunk_index, chunk_slice in chunk_slices: + data_index = tuple(slice(start, end) for start, end in chunk_slice) + data = var[data_index] + if np.all(np.isnan(data)): + empty_chunk_indexes.append(chunk_index) + # print(f'empty: {var.name}/{".".join(map(str, chunk_index))}') + + # noinspection PyTypeChecker + return tuple(empty_chunk_indexes) + + +def compute_chunk_slices(chunks: Tuple[Tuple[int, ...], ...]) -> Iterable: + + chunk_indices = [] + for c in chunks: + chunk_indices.append(tuple(i for i in range(len(c)))) + + chunk_slices = [] + for c in chunks: + x = [] + o = 0 + for s in c: + x.append((o, o + s)) + o += s + chunk_slices.append(tuple(x)) + return zip(itertools.product(*chunk_indices), itertools.product(*chunk_slices)) diff --git a/xcube/cli/cli.py b/xcube/cli/cli.py index c9c94c538..16fc3600b 100644 --- a/xcube/cli/cli.py +++ b/xcube/cli/cli.py @@ -5,6 +5,7 @@ from xcube.cli.apply import apply from xcube.cli.gen import gen from xcube.cli.grid import grid +from xcube.cli.prune import prune from xcube.cli.resample import resample from xcube.cli.serve import serve from xcube.cli.timeit import timeit @@ -244,6 +245,7 @@ def cli(traceback=False, scheduler=None): cli.add_command(gen) cli.add_command(grid) cli.add_command(level) +cli.add_command(prune) cli.add_command(resample) cli.add_command(serve) cli.add_command(timeit) diff --git a/xcube/cli/prune.py b/xcube/cli/prune.py new file mode 100644 index 000000000..2c8911de6 --- /dev/null +++ b/xcube/cli/prune.py @@ -0,0 +1,93 @@ +# The MIT License (MIT) +# Copyright (c) 2019 by the xcube development team and contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to 
deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import os +import os.path +import warnings + +import click + +# TODO (forman): move FORMAT_NAME_ZARR to constants, +from xcube.api.chunk import get_empty_dataset_chunks +from xcube.util.dsio import FORMAT_NAME_ZARR + + +# noinspection PyShadowingBuiltins +@click.command(name='prune') +@click.argument('input') +@click.option('--dry-run', is_flag=True, + help='Just read and process input, but don\'t produce any outputs.') +def prune(input, dry_run): + """ + Delete empty chunks. + Deletes all data files associated with empty (NaN-only) chunks in given INPUT cube, + which must have ZARR format. + """ + _prune(input_path=input, dry_run=dry_run, monitor=print) + return 0 + + +def _prune(input_path: str = None, + dry_run: bool = False, + monitor=None): + from xcube.api import open_cube + from xcube.util.dsio import guess_dataset_format + + input_format = guess_dataset_format(input_path) + if input_format != FORMAT_NAME_ZARR: + raise click.ClickException("input must be a cube in ZARR format") + + monitor(f'Opening cube from {input_path!r}...') + with open_cube(input_path) as cube: + monitor('Identifying empty blocks...') + empty_chunks = get_empty_dataset_chunks(cube) + + num_deleted = 0 + for var_name, chunk_indices in empty_chunks.items(): + monitor(f'Deleting {len(chunk_indices)} empty block file(s) for variable {var_name!r}...') + for chunk_index in chunk_indices: + ok = _delete_block_file(input_path, var_name, chunk_index, dry_run, monitor) + if ok: + num_deleted += 1 + + monitor(f'Done, {num_deleted} block file(s) deleted.') + + +def _delete_block_file(input_path, var_name, chunk_index, dry_run, monitor) -> bool: + block_path = None + block_path_1 = block_path_2 = os.path.join(input_path, var_name, '.'.join(map(str, chunk_index))) + if os.path.isfile(block_path_1): + block_path = block_path_1 + else: + block_path_2 = os.path.join(input_path, var_name, *map(str, chunk_index)) + if os.path.isfile(block_path_2): + block_path = block_path_2 + if block_path: + if dry_run: + return True + try: + os.remove(block_path) + return True + except OSError as e: + monitor(f'error: failed to delete block file {block_path}: {e}') + else: + monitor(f'error: could neither find block file {block_path_1} nor {block_path_2}') + return False
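A minimal sketch of how the new `get_empty_dataset_chunks` helper relates to the block files that `xcube prune` removes. The cube shape, chunking, and the `test.zarr` store name simply mirror the fixtures in `test/cli/test_prune.py`; the snippet only prints candidate block paths and deletes nothing:

```python
import os.path

import numpy as np

from xcube.api import new_cube
from xcube.api.chunk import get_empty_dataset_chunks

# An all-NaN cube, chunked as in the tests: 3 time steps x 2 lat blocks x 4 lon blocks,
# so every one of the 24 chunks per variable is "empty".
cube = new_cube(time_periods=3,
                variables=dict(precipitation=np.nan,
                               temperature=np.nan)).chunk(dict(time=1, lat=90, lon=90))

empty_chunks = get_empty_dataset_chunks(cube)
# -> {'precipitation': ((0, 0, 0), (0, 0, 1), ..., (2, 1, 3)), 'temperature': (...)}

# For a cube written to 'test.zarr', chunk index (0, 0, 1) corresponds to the block file
# 'test.zarr/precipitation/0.0.1' (or the nested variant 'test.zarr/precipitation/0/0/1'),
# which is exactly what _delete_block_file looks up before removing it.
for var_name, chunk_indices in empty_chunks.items():
    for chunk_index in chunk_indices:
        print(os.path.join('test.zarr', var_name, '.'.join(map(str, chunk_index))))
```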
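Against the all-NaN test cube built in `PruneDataTest.setUp` (three time steps, two variables, 1×90×90 chunks written to `test.zarr`), the dry run is expected to print the following, taken from the assertions in `test_dry_run`:

```
$ xcube prune test.zarr --dry-run
Opening cube from 'test.zarr'...
Identifying empty blocks...
Deleting 24 empty block file(s) for variable 'precipitation'...
Deleting 24 empty block file(s) for variable 'temperature'...
Done, 48 block file(s) deleted.
```

Note that the progress messages say "Deleting …" and "deleted" even in dry-run mode; the test confirms that all block files are in fact still present afterwards.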