Skip to content

Commit

Permalink
Merge pull request #93 from dcs4cop/forman-92-xcube_prune
Browse files Browse the repository at this point in the history
Forman 92 xcube prune
  • Loading branch information
forman authored Jun 17, 2019
2 parents 2f3c9b0 + 1be98be commit bbef679
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 8 deletions.
21 changes: 15 additions & 6 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
## Changes in 0.2.0 (in dev)

* Fixed problem where CTRL+C didn't function anymore with `xcube serve`. (#87)
* Fixed error `indexes along dimension 'y' are not equal` occurred when using
`xcube gen` with processed variables that used flag values (#86)
### Enhancements

* Added new CLI command `xcube prune`. The tool deletes all block files associated with empty (NaN-
only) chunks in the given INPUT cube, which must have ZARR format. This can drastically reduce the
number of files in sparse cubes and improve cube reading performance. (#92)
* `xcube serve` has a new `prefix` option which is a path appended to the server's host.
The `prefix` option replaces the `name` option which is now deprecated but kept
for backward compatibility. (#79)
* Fixed `xcube serve` WMTS KVP API to allow for case-insensitive query parameters. (#77)
* Added new CLI command `xcube resample` that is used to generate temporally up- or downsampled
data cubes from other data cubes.
* `xcube serve` can now be run with data cube paths and styling information given via the CLI rather
Expand All @@ -29,14 +30,22 @@
* Added `xcube gen` option `--sort` when input data list should be sorted (#33)
* Added `xcube vars2dim` command to make variables a cube dimension (#31)
* Added `xcube serve` option `--traceperf` that allows switching on performance diagnostics.
* Fixed error in plugins when importing `xcube.api.gen` (#62)
* Fixed import of plugins only when executing `xcube.cli` (#66)
* Included possibility to read the input file paths from a text file. (#47)
* Restructured and clarified code base (#27)
* Moved to Python 3.7 (#25)
* Excluded all input processors except for the default one. They are now plugins and have their own
repositories within xcube's organisation. (#49)

### Fixes

* Fixed problem where CTRL+C didn't function anymore with `xcube serve`. (#87)
* Fixed error `indexes along dimension 'y' are not equal` that occurred when using
`xcube gen` with processed variables that used flag values (#86)
* Fixed `xcube serve` WMTS KVP API to allow for case-insensitive query parameters. (#77)
* Fixed error in plugins when importing `xcube.api.gen` (#62)
* Fixed import of plugins only when executing `xcube.cli` (#66)


## Changes in 0.1.0

* Respecting chunk sizes when computing tile sizes [#44](https://github.com/dcs4cop/xcube-server/issues/44)
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Data cubes with [xarray](http://xarray.pydata.org/).
- [`xcube gen`](#xcube-gen)
- [`xcube grid`](#xcube-grid)
- [`xcube level`](#xcube-level)
- [`xcube prune`](#xcube-prune)
- [`xcube resample`](#xcube-resample)
- [`xcube serve`](#xcube-serve)
- [`xcube vars2dim`](#xcube-vars2dim)
Expand Down Expand Up @@ -484,6 +485,19 @@ Example:

$ xcube level -l -t 720 data/cubes/test-cube.zarr

## `xcube prune`

$ xcube prune --help
Usage: xcube prune [OPTIONS] INPUT

Delete empty chunks. Deletes all block files associated with empty (NaN-
only) chunks in given INPUT cube, which must have ZARR format.

Options:
--dry-run Just read and process input, but don't produce any outputs.
--help Show this message and exit.



## `xcube vars2dim`

Expand Down
80 changes: 79 additions & 1 deletion test/api/test_chunk.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import unittest

import numpy as np

from test.sampledata import new_test_dataset
from xcube.api.chunk import chunk_dataset
from xcube.api.chunk import chunk_dataset, get_empty_dataset_chunks, compute_chunk_slices


class ChunkDatasetTest(unittest.TestCase):
Expand Down Expand Up @@ -37,3 +39,79 @@ def test_unchunk_dataset(self):
chunked_dataset = chunk_dataset(dataset, format_name="zarr")
self.assertEqual({"_FillValue": -999.0}, chunked_dataset.precipitation.encoding)
self.assertEqual({"_FillValue": -999.0}, chunked_dataset.temperature.encoding)


class GetEmptyDatasetChunksTest(unittest.TestCase):
    """Tests for ``get_empty_dataset_chunks`` on unchunked, filled and all-NaN datasets."""

    def test_not_chunked(self):
        # A dataset that was never chunked must be rejected with a ValueError.
        unchunked = new_test_dataset(["2010-01-01", "2010-01-02"], precipitation=0.4, temperature=275.2)
        with self.assertRaises(ValueError) as cm:
            get_empty_dataset_chunks(unchunked)
        self.assertEqual('data array not chunked', f'{cm.exception}')

    def test_non_empty(self):
        # Constant, non-NaN variables: no chunk may be reported as empty.
        filled = new_test_dataset(["2010-01-01", "2010-01-02"], precipitation=0.4, temperature=275.2)
        chunked = filled.chunk(dict(time=1, lat=90, lon=90))
        self.assertEqual({'precipitation': (), 'temperature': ()},
                         get_empty_dataset_chunks(chunked))

    def test_all_empty(self):
        # All-NaN variables: every block index of the 2 x 2 x 4 chunk grid is expected,
        # with the last dimension varying fastest.
        nan_filled = new_test_dataset(["2010-01-01", "2010-01-02"], precipitation=np.nan, temperature=np.nan)
        chunked = nan_filled.chunk(dict(time=1, lat=90, lon=90))
        every_chunk = tuple((t, y, x)
                            for t in range(2)
                            for y in range(2)
                            for x in range(4))
        self.assertEqual({'precipitation': every_chunk, 'temperature': every_chunk},
                         get_empty_dataset_chunks(chunked))


class ComputeChunkSlicesTest(unittest.TestCase):
    """Tests for ``compute_chunk_slices``."""

    def test_compute_chunk_slices(self):
        # Chunk sizes: two chunks of 1, two chunks of 90, four chunks of 90.
        # Each expected entry pairs a block index with its per-dimension (start, end) bounds;
        # the last dimension varies fastest.
        expected = [((t, y, x), ((t, t + 1), (90 * y, 90 * (y + 1)), (90 * x, 90 * (x + 1))))
                    for t in range(2)
                    for y in range(2)
                    for x in range(4)]
        actual = compute_chunk_slices(((1, 1), (90, 90), (90, 90, 90, 90)))
        self.assertEqual(expected, list(actual))
114 changes: 114 additions & 0 deletions test/cli/test_prune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os.path
import sys
import numpy as np
import xarray as xr

from test.cli.helpers import CliTest
from xcube.api import assert_cube, new_cube
from xcube.api.readwrite import write_cube
from xcube.cli.prune import _delete_block_file
from xcube.util.dsio import rimraf


class PruneTest(CliTest):
    """Smoke test for the ``xcube prune`` command."""

    def test_help_option(self):
        # Asking for help must succeed with exit code 0.
        cli_result = self.invoke_cli(['prune', '--help'])
        self.assertEqual(0, cli_result.exit_code)


class PruneDataTest(CliTest):
    """Runs ``xcube prune`` against an all-NaN ZARR cube that is written in ``setUp``."""

    TEST_CUBE = "test.zarr"

    def setUp(self) -> None:
        rimraf(self.TEST_CUBE)
        nan_variables = dict(precipitation=np.nan, temperature=np.nan)
        chunk_sizes = dict(time=1, lat=90, lon=90)
        cube = new_cube(time_periods=3, variables=nan_variables).chunk(chunk_sizes)
        write_cube(cube, self.TEST_CUBE, "zarr", cube_asserted=True)

    def tearDown(self) -> None:
        rimraf(self.TEST_CUBE)

    def _assert_prune_report(self, result):
        """Check exit code and console output, which are the same for dry and real runs."""
        self.assertEqual(0, result.exit_code)
        self.assertEqual("Opening cube from 'test.zarr'...\n"
                         "Identifying empty blocks...\n"
                         "Deleting 24 empty block file(s) for variable 'precipitation'...\n"
                         "Deleting 24 empty block file(s) for variable 'temperature'...\n"
                         "Done, 48 block file(s) deleted.\n",
                         result.stdout)

    def _assert_variable_dirs_contain(self, expected_file_names):
        """Check the directory listing of both variable directories."""
        for var_dir in ('test.zarr/precipitation', 'test.zarr/temperature'):
            self.assertEqual(expected_file_names, sorted(os.listdir(var_dir)))

    def _assert_cube_still_valid(self):
        """Re-open the cube and check that both variables keep their shape and dimensions."""
        ds = xr.open_zarr('test.zarr')
        assert_cube(ds)
        for var_name in ('precipitation', 'temperature'):
            self.assertIn(var_name, ds)
            self.assertEqual((3, 180, 360), ds[var_name].shape)
            self.assertEqual(('time', 'lat', 'lon'), ds[var_name].dims)

    def test_dry_run(self):
        self._assert_prune_report(self.invoke_cli(['prune', self.TEST_CUBE, "--dry-run"]))
        # A dry run must leave all 3 x 2 x 4 block files in place.
        block_file_names = [f'{t}.{y}.{x}'
                            for t in range(3)
                            for y in range(2)
                            for x in range(4)]
        self._assert_variable_dirs_contain(sorted(['.zarray', '.zattrs'] + block_file_names))
        self._assert_cube_still_valid()

    def test_no_dry_run(self):
        self._assert_prune_report(self.invoke_cli(['prune', self.TEST_CUBE]))
        # A real run leaves only the array metadata files behind.
        self._assert_variable_dirs_contain(sorted(['.zarray', '.zattrs']))
        self._assert_cube_still_valid()

    def test_delete_block_file(self):
        # Single-slot holder for the most recent message passed to the monitor.
        last_message = [None]

        def monitor(message):
            last_message[0] = message

        def delete(block_index, dry_run):
            # Forget any earlier message so each call is judged on its own.
            last_message[0] = None
            return _delete_block_file(self.TEST_CUBE, 'precipitation', block_index, dry_run, monitor=monitor)

        # A block that exists under neither naming scheme is reported as an error.
        self.assertFalse(delete((0, 3, 76), True))
        self.assertEqual(f"error: could neither find block file "
                         f"{os.path.join(self.TEST_CUBE, 'precipitation', '0.3.76')} nor "
                         f"{os.path.join(self.TEST_CUBE, 'precipitation', '0', '3', '76')}", last_message[0])

        # Dry run on an existing block: success and no message.
        self.assertTrue(delete((1, 1, 0), True))
        self.assertEqual(None, last_message[0])

        # Real run on the same block: success, no message, file is gone.
        self.assertTrue(delete((1, 1, 0), False))
        self.assertEqual(None, last_message[0])
        self.assertFalse(os.path.exists(os.path.join(self.TEST_CUBE, 'precipitation', '1.1.0')))

        if sys.platform == 'win32':
            path = os.path.join(self.TEST_CUBE, 'precipitation', '1.1.1')
            # Open block, so we cannot delete (Windows only)
            # noinspection PyUnusedLocal
            with open(path, 'wb') as fp:
                self.assertFalse(delete((1, 1, 1), False))
                self.assertIsNotNone(last_message[0])
                self.assertTrue(last_message[0].startswith(f'error: failed to delete block file {path}: '))
56 changes: 55 additions & 1 deletion xcube/api/chunk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict
import itertools
from typing import Dict, Tuple, Iterable

import numpy as np
import xarray as xr

from xcube.util.dsio import FORMAT_NAME_ZARR, FORMAT_NAME_NETCDF4
Expand Down Expand Up @@ -37,3 +39,55 @@ def chunk_dataset(dataset: xr.Dataset,
del var.encoding[chunk_sizes_attr_name]

return chunked_ds


def get_empty_dataset_chunks(dataset: xr.Dataset) -> Dict[str, Tuple[Tuple[int, ...], ...]]:
    """
    Identify empty dataset chunks and return their indices.

    Every data variable of *dataset* is passed to ``get_empty_var_chunks``.

    :param dataset: The dataset.
    :return: A mapping from variable name to a tuple of block indices. Each block
        index is itself a tuple of ints. The tuple is empty for a variable that
        has no empty chunks.
    :raise ValueError: if a data variable is not chunked.
    """
    return {var_name: get_empty_var_chunks(dataset[var_name]) for var_name in dataset.data_vars}


def get_empty_var_chunks(var: xr.DataArray) -> Tuple[Tuple[int, ...], ...]:
    """
    Identify empty variable chunks and return their indices.

    A chunk is considered empty if all of its values are NaN.

    :param var: The variable. It must be chunked, that is, ``var.chunks`` must not be None.
    :return: A tuple of block indices, one per empty chunk, in the order in which
        ``compute_chunk_slices`` yields the chunks. Each block index is a tuple of ints.
    :raise ValueError: if *var* is not chunked.
    """
    chunks = var.chunks
    if chunks is None:
        raise ValueError('data array not chunked')

    empty_chunk_indexes = []
    for chunk_index, chunk_slice in compute_chunk_slices(chunks):
        # Turn the per-dimension (start, end) pairs into slices that select exactly this chunk.
        data_index = tuple(slice(start, end) for start, end in chunk_slice)
        if np.all(np.isnan(var[data_index])):
            empty_chunk_indexes.append(chunk_index)

    return tuple(empty_chunk_indexes)


def compute_chunk_slices(chunks: Tuple[Tuple[int, ...], ...]) \
        -> Iterable[Tuple[Tuple[int, ...], Tuple[Tuple[int, int], ...]]]:
    """
    Compute the block index and the element bounds of every chunk.

    :param chunks: For each dimension, the sizes of the chunks along that dimension.
    :return: A one-shot iterator of pairs ``(chunk_index, chunk_bounds)``.
        ``chunk_index`` holds the chunk's position along each dimension and
        ``chunk_bounds`` holds the matching half-open ``(start, end)`` element range
        for each dimension. Pairs are produced in ``itertools.product`` order, so the
        last dimension varies fastest.
    """
    per_dim_indices = []
    per_dim_bounds = []
    for sizes in chunks:
        # The running total of the chunk sizes gives each chunk's exclusive end offset;
        # each chunk starts where its predecessor ended, the first one at 0.
        ends = tuple(itertools.accumulate(sizes))
        starts = (0,) + ends[:-1]
        per_dim_indices.append(tuple(range(len(sizes))))
        per_dim_bounds.append(tuple(zip(starts, ends)))
    return zip(itertools.product(*per_dim_indices), itertools.product(*per_dim_bounds))
2 changes: 2 additions & 0 deletions xcube/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from xcube.cli.apply import apply
from xcube.cli.gen import gen
from xcube.cli.grid import grid
from xcube.cli.prune import prune
from xcube.cli.resample import resample
from xcube.cli.serve import serve
from xcube.cli.timeit import timeit
Expand Down Expand Up @@ -244,6 +245,7 @@ def cli(traceback=False, scheduler=None):
cli.add_command(gen)
cli.add_command(grid)
cli.add_command(level)
cli.add_command(prune)
cli.add_command(resample)
cli.add_command(serve)
cli.add_command(timeit)
Expand Down
Loading

0 comments on commit bbef679

Please sign in to comment.