Skip to content

Commit

Permalink
Merge pull request #273 from dcs4cop/forman-272-computed_vars
Browse files Browse the repository at this point in the history
Forman 272 computed vars
  • Loading branch information
forman authored Feb 7, 2020
2 parents 6fa0c98 + 1740e5a commit 2216eb6
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 52 deletions.
19 changes: 19 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@
...
```

* The configuration of `xcube serve` has been enhanced to support
augmentation of data cubes by new variables computed on-the-fly (#272).
You can now add a section `Augmentation` into a dataset descriptor, e.g.:

```yaml
Datasets:
- Identifier: abc
...
Augmentation:
Path: compute_new_vars.py
Function: compute_variables
InputParameters:
...
- ...
```
where `compute_variables` is a function that receives the parent xcube dataset
and is expected to return a new dataset with new variables.

* `xcube serve` now provides basic access control via OAuth2 bearer tokens (#263).
To configure a service instance with access control, add the following to the
`xcube serve` configuration file:
Expand Down
9 changes: 9 additions & 0 deletions examples/serve/demo/compute_extra_vars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import xarray as xr


def compute_variables(ds, factor_chl, factor_tsm):
    """
    Compute a new variable ``chl_tsm_sum`` as the weighted sum of the
    ``conc_chl`` and ``conc_tsm`` variables of dataset *ds*.

    Used by ``xcube serve`` as an ``Augmentation`` function: it receives the
    parent xcube dataset and returns a new dataset with the new variable(s).

    :param ds: The parent xcube dataset; must provide data variables
        ``conc_chl`` and ``conc_tsm``.
    :param factor_chl: Weight factor applied to ``conc_chl``.
    :param factor_tsm: Weight factor applied to ``conc_tsm``.
    :return: A new ``xarray.Dataset`` containing the single data variable
        ``chl_tsm_sum``.
    """
    chl_tsm_sum = factor_chl * ds.conc_chl + factor_tsm * ds.conc_tsm
    # Fixed typo in the user-facing long_name: "nd" --> "and".
    chl_tsm_sum.attrs.update(dict(units='-',
                                  long_name='Weighted sum of CHL and TSM conc.',
                                  description='Nonsense variable, for demo purpose only'))
    return xr.Dataset(dict(chl_tsm_sum=chl_tsm_sum))
6 changes: 6 additions & 0 deletions examples/serve/demo/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ Datasets:
Path: cube-1-250-250.zarr
Style: default
TimeSeriesDataset: local_ts
Augmentation:
Path: "compute_extra_vars.py"
Function: "compute_variables"
InputParameters:
factor_chl: 0.2
factor_tsm: 0.7
PlaceGroups:
- PlaceGroupRef: inside-cube
- PlaceGroupRef: outside-cube
Expand Down
48 changes: 45 additions & 3 deletions test/core/test_mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,48 @@
import pandas as pd
import xarray as xr

from xcube.core.mldataset import BaseMultiLevelDataset, ComputedMultiLevelDataset
from xcube.core.mldataset import BaseMultiLevelDataset
from xcube.core.mldataset import CombinedMultiLevelDataset
from xcube.core.mldataset import ComputedMultiLevelDataset
from xcube.util.tilegrid import TileGrid


class CombinedMultiLevelDatasetTest(unittest.TestCase):
    """Tests combining several multi-level datasets into a single one."""

    def test_it(self):
        # Three source datasets, each contributing two distinct variables.
        sources = [
            BaseMultiLevelDataset(_get_test_dataset(('noise_1', 'noise_2'))),
            BaseMultiLevelDataset(_get_test_dataset(('noise_3', 'noise_4'))),
            BaseMultiLevelDataset(_get_test_dataset(('noise_5', 'noise_6'))),
        ]

        ml_ds = CombinedMultiLevelDataset(sources)

        self.assertEqual(3, ml_ds.num_levels)
        self.assertEqual(TileGrid(3, 2, 1, 180, 180, (-180, -90, 180, 90), inv_y=False),
                         ml_ds.tile_grid)

        # The combined dataset must expose the union of all source variables.
        expected_var_names = {'noise_1', 'noise_2',
                              'noise_3', 'noise_4',
                              'noise_5', 'noise_6'}

        # Expected dimension sizes per level; only level 0 carries the
        # bounds dimension.
        expected_dims = [
            {'time': 14, 'lat': 720, 'lon': 1440, 'bnds': 2},
            {'time': 14, 'lat': 360, 'lon': 720},
            {'time': 14, 'lat': 180, 'lon': 360},
        ]

        level_datasets = []
        for level, dims in enumerate(expected_dims):
            ds = ml_ds.get_dataset(level)
            self.assertEqual(dims, ds.dims)
            self.assertEqual(expected_var_names, set(map(str, ds.data_vars)))
            self.assertTrue(all(v.dims == ('time', 'lat', 'lon')
                                for v in ds.data_vars.values()))
            level_datasets.append(ds)

        self.assertEqual(level_datasets, ml_ds.datasets)

        ml_ds.close()


class BaseMultiLevelDatasetTest(unittest.TestCase):
def test_it(self):
ds = _get_test_dataset()
Expand All @@ -29,6 +67,8 @@ def test_it(self):
self.assertIsNot(ds, ds2)
self.assertEqual({'time': 14, 'lat': 180, 'lon': 360}, ds2.dims)

self.assertEqual([ds0, ds1, ds2], ml_ds.datasets)

ml_ds.close()


Expand Down Expand Up @@ -63,11 +103,13 @@ def input_ml_dataset_getter(ds_id):
ds2 = ml_ds2.get_dataset(2)
self.assertEqual({'time': 3, 'lat': 180, 'lon': 360}, ds2.dims)

self.assertEqual([ds0, ds1, ds2], ml_ds2.datasets)

ml_ds1.close()
ml_ds2.close()


def _get_test_dataset():
def _get_test_dataset(var_names=('noise',)):
w = 1440
h = 720
p = 14
Expand All @@ -79,7 +121,7 @@ def _get_test_dataset():
dx = (x2 - x1) / w
dy = (y2 - y1) / h

data_vars = dict(noise=(("time", "lat", "lon"), np.random.rand(p, h, w)))
data_vars = {var_name: (("time", "lat", "lon"), np.random.rand(p, h, w)) for var_name in var_names}

lat_bnds = np.array(list(zip(np.linspace(y2, y1 + dy, num=h),
np.linspace(y2 - dy, y1, num=h))))
Expand Down
13 changes: 3 additions & 10 deletions test/webapi/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,10 @@
from xcube.webapi.reqparams import RequestParams


def new_test_service_context(ml_dataset_openers: Dict[str, MultiLevelDatasetOpener] = None) -> ServiceContext:
def new_test_service_context(config_file_name: str = 'config.yml',
ml_dataset_openers: Dict[str, MultiLevelDatasetOpener] = None) -> ServiceContext:
ctx = ServiceContext(base_dir=get_res_test_dir(), ml_dataset_openers=ml_dataset_openers)
config_file = os.path.join(ctx.base_dir, 'config.yml')
with open(config_file) as fp:
ctx.config = yaml.safe_load(fp)
return ctx


def new_demo_service_context(ml_dataset_openers: Dict[str, MultiLevelDatasetOpener] = None) -> ServiceContext:
ctx = ServiceContext(base_dir=get_res_demo_dir(), ml_dataset_openers=ml_dataset_openers)
config_file = os.path.join(ctx.base_dir, 'config.yml')
config_file = os.path.join(ctx.base_dir, config_file_name)
with open(config_file) as fp:
ctx.config = yaml.safe_load(fp)
return ctx
Expand Down
11 changes: 11 additions & 0 deletions test/webapi/res/test/config-aug.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Datasets:
- Identifier: demo-aug
Title: xcube-server Demonstration L2C Cube with augmentation
Path: ../../../../examples/serve/demo/cube-1-250-250.zarr
Style: default
Augmentation:
Path: script.py
Function: compute_variables
InputParameters:
factor_chl: 0.2
factor_tsm: 0.7
1 change: 1 addition & 0 deletions test/webapi/res/test/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Datasets:
Title: xcube-server Demonstration L3 Cube
FileSystem: memory
Path: script.py
Function: compute_dataset
InputDatasets: ["demo"]
InputParameters:
period: "1W"
Expand Down
8 changes: 8 additions & 0 deletions test/webapi/res/test/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ def compute_dataset(ds, period='1W', incl_stdev=False):
return ds_merged
else:
return ds.resample(time=period).mean(dim='time')


def compute_variables(ds, factor_chl, factor_tsm):
    """
    Test fixture for the ``Augmentation`` feature of ``xcube serve``:
    compute a new variable ``chl_tsm_sum`` as the weighted sum of the
    ``conc_chl`` and ``conc_tsm`` variables of dataset *ds*.

    :param ds: The parent xcube dataset; must provide data variables
        ``conc_chl`` and ``conc_tsm``.
    :param factor_chl: Weight factor applied to ``conc_chl``.
    :param factor_tsm: Weight factor applied to ``conc_tsm``.
    :return: A new ``xarray.Dataset`` containing the single data variable
        ``chl_tsm_sum``.
    """
    chl_tsm_sum = factor_chl * ds.conc_chl + factor_tsm * ds.conc_tsm
    # Fixed typo in the user-facing long_name: "nd" --> "and".
    chl_tsm_sum.attrs.update(dict(units='-',
                                  long_name='Weighted sum of CHL and TSM conc.',
                                  description='Nonsense variable, for demo purpose only'))
    return xr.Dataset(dict(chl_tsm_sum=chl_tsm_sum))
36 changes: 36 additions & 0 deletions test/webapi/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import xarray as xr

from test.webapi.helpers import new_test_service_context
from xcube.core.mldataset import MultiLevelDataset
from xcube.webapi.errors import ServiceResourceNotFoundError


Expand All @@ -14,6 +15,25 @@ def test_get_dataset_and_variable(self):
ds = ctx.get_dataset('demo')
self.assertIsInstance(ds, xr.Dataset)

ml_ds = ctx.get_ml_dataset('demo')
self.assertIsInstance(ml_ds, MultiLevelDataset)
self.assertIs(3, ml_ds.num_levels)
self.assertIs(ds, ml_ds.get_dataset(0))

for var_name in ('conc_chl', 'conc_tsm'):
for z in range(ml_ds.num_levels):
conc_chl_z = ctx.get_variable_for_z('demo', var_name, z)
self.assertIsInstance(conc_chl_z, xr.DataArray)
with self.assertRaises(ServiceResourceNotFoundError) as cm:
ctx.get_variable_for_z('demo', var_name, 3)
self.assertEqual(404, cm.exception.status_code)
self.assertEqual(f'Variable "{var_name}" has no z-index 3 in dataset "demo"', cm.exception.reason)

with self.assertRaises(ServiceResourceNotFoundError) as cm:
ctx.get_variable_for_z('demo', 'conc_ys', 0)
self.assertEqual(404, cm.exception.status_code)
self.assertEqual('Variable "conc_ys" not found in dataset "demo"', cm.exception.reason)

with self.assertRaises(ServiceResourceNotFoundError) as cm:
ctx.get_dataset('demox')
self.assertEqual(404, cm.exception.status_code)
Expand All @@ -24,6 +44,22 @@ def test_get_dataset_and_variable(self):
self.assertEqual(404, cm.exception.status_code)
self.assertEqual('Variable "conc_ys" not found in dataset "demo"', cm.exception.reason)

def test_get_dataset_with_augmentation(self):
    """
    An augmented dataset ('config-aug.yml') must expose both the original
    variables and the on-the-fly computed variable 'chl_tsm_sum' at every
    pyramid level.
    """
    ctx = new_test_service_context(config_file_name='config-aug.yml')

    ds = ctx.get_dataset('demo-aug')
    self.assertIsInstance(ds, xr.Dataset)

    ml_ds = ctx.get_ml_dataset('demo-aug')
    self.assertIsInstance(ml_ds, MultiLevelDataset)
    # assertEqual, not assertIs: identity of small ints is a CPython
    # implementation detail and must not be relied upon.
    self.assertEqual(3, ml_ds.num_levels)
    # Level 0 must be the very same object as the dataset returned above.
    self.assertIs(ds, ml_ds.get_dataset(0))

    for var_name in ('conc_chl', 'conc_tsm', 'chl_tsm_sum'):
        for z in range(ml_ds.num_levels):
            var_z = ctx.get_variable_for_z('demo-aug', var_name, z)
            self.assertIsInstance(var_z, xr.DataArray)

def test_config_and_dataset_cache(self):
ctx = new_test_service_context()
self.assertNotIn('demo', ctx.dataset_cache)
Expand Down
40 changes: 38 additions & 2 deletions xcube/core/mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def _get_dataset_lazily(self, index: int, **kwargs) -> xr.Dataset:
:return: the dataset for the level at *index*.
"""

def _get_tile_grid_lazily(self):
def _get_tile_grid_lazily(self) -> TileGrid:
"""
Retrieve, i.e. read or compute, the tile grid used by the multi-level dataset.
Expand All @@ -134,6 +134,40 @@ def close(self):
dataset.close()


class CombinedMultiLevelDataset(LazyMultiLevelDataset):
    """
    A multi-level dataset that combines the levels of several other
    multi-level datasets into one.

    :param ml_datasets: The multi-level datasets to be combined.
        At least two must be provided.
    :param combiner_function: A function used to combine the level datasets.
        It receives a list of datasets (``xarray.Dataset`` instances) and
        *combiner_params* as keyword arguments.
        Defaults to function ``xarray.merge()`` with default parameters.
    :param combiner_params: Parameters to the *combiner_function*
        passed as keyword arguments.
    """

    def __init__(self,
                 ml_datasets: Sequence[MultiLevelDataset],
                 combiner_function: Callable = None,
                 combiner_params: Dict[str, Any] = None):
        super().__init__()
        # Rejects None, empty, and single-element sequences alike.
        if not ml_datasets or len(ml_datasets) < 2:
            raise ValueError('ml_datasets must have at least two elements')
        self._ml_datasets = ml_datasets
        if combiner_function is None:
            combiner_function = xr.merge
        self._combiner_function = combiner_function
        if combiner_params is None:
            combiner_params = {}
        self._combiner_params = combiner_params

    def _get_tile_grid_lazily(self) -> TileGrid:
        # The combined dataset adopts the tile grid of the first source;
        # all sources are assumed to share a compatible grid.
        return self._ml_datasets[0].tile_grid

    def _get_dataset_lazily(self, index: int, **kwargs) -> xr.Dataset:
        # Collect the level dataset at *index* from every source, then
        # delegate the actual combination to the configured function.
        level_datasets = [source.get_dataset(index)
                          for source in self._ml_datasets]
        return self._combiner_function(level_datasets, **self._combiner_params)

    def close(self):
        for source in self._ml_datasets:
            source.close()


class FileStorageMultiLevelDataset(LazyMultiLevelDataset):
"""
A stored multi-level dataset whose level datasets are lazily read from storage location.
Expand Down Expand Up @@ -320,6 +354,8 @@ def __init__(self,
input_ml_dataset_getter: Callable[[str], MultiLevelDataset],
input_parameters: Dict[str, Any],
exception_type=ValueError):

input_parameters = input_parameters or {}
super().__init__(kwargs=input_parameters)

try:
Expand Down Expand Up @@ -384,7 +420,7 @@ def _get_dataset_lazily(self, index: int, **kwargs) -> xr.Dataset:
return assert_cube(computed_value, name=self._ds_id)


def _get_dataset_tile_grid(dataset: xr.Dataset, num_levels: int = None):
def _get_dataset_tile_grid(dataset: xr.Dataset, num_levels: int = None) -> TileGrid:
geo_extent = get_dataset_bounds(dataset)
inv_y = float(dataset.lat[0]) < float(dataset.lat[-1])
width, height, tile_width, tile_height = _get_cube_spatial_sizes(dataset)
Expand Down
3 changes: 0 additions & 3 deletions xcube/webapi/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,8 @@ def get_id_token(self, require_auth: bool = False) -> Optional[Mapping[str, str]

# TODO: read jwks from cache
response = requests.get(auth_config.well_known_jwks)
print(response.content)
jwks = json.loads(response.content)

print(json.dumps(jwks))

rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
Expand Down
Loading

0 comments on commit 2216eb6

Please sign in to comment.