
Merge pull request #1056 from xcube-dev/forman-server_to_require_mldataset

Force xcube Server to open multi-level datasets
konstntokas authored Aug 7, 2024
2 parents 976bddb + 210c6c4 commit 06e46b0
Showing 5 changed files with 65 additions and 23 deletions.
9 changes: 6 additions & 3 deletions CHANGES.md
@@ -35,9 +35,10 @@
* The `open_data` method of xcube's default `xcube.core.store.DataStore` implementations
now supports a keyword argument `data_type`, which determines the
data type of the return value. Note that `opener_id` includes the `data_type`
at its first position and will override the `date_type` argument.
at its first position and will override the `data_type` argument.
To preserve backward compatibility, the keyword argument `data_type`
has not yet been added to the `open_data()` method arguments. (#1030)
has not yet been literally specified as `open_data()` method argument,
but may be passed as part of `**open_params`. (#1030)
* The `xcube.core.store.DataDescriptor` class now supports specifying time ranges
using both `datetime.date` and `datetime.datetime` objects. Previously,
only `datetime.date` objects were supported.
@@ -60,7 +61,9 @@
(#1053)
* When opening a GeoTIFF file using a file system data store, the default return value
is changed from `MultiLevelDataset` to `xr.Dataset`, if no `data_type` is assigned
in the `open_params` of the `store.open_data()` method. (#1054)
in the `open_params` of the `store.open_data()` method. (#1054)
xcube server has been adapted to always open `MultiLevelDataset`s from
a specified data store, if that data type is supported.

### Other changes

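The two change-log entries above (#1030 and #1054) are easiest to see side by side in code. The sketch below is illustrative only: the store root, the data IDs, and the opener identifier are assumptions for the example, not part of this commit.

```python
from xcube.core.store import new_data_store

# Hypothetical local filesystem store; root and data IDs are placeholders.
store = new_data_store("file", root="/path/to/data")

# #1054: without an explicit data type, a GeoTIFF now opens as xr.Dataset.
dataset = store.open_data("demo.tif")

# #1030: data_type may be passed via **open_params to request another type.
ml_dataset = store.open_data("demo.tif", data_type="mldataset")

# An opener_id carries the data type at its first position and overrides
# a conflicting data_type argument (opener id shown here as an example).
ml_dataset = store.open_data("demo.tif", opener_id="mldataset:geotiff:file")
```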
17 changes: 11 additions & 6 deletions test/core/store/fs/test_plugin.py
@@ -11,6 +11,7 @@
from xcube.core.store import find_data_store_extensions
from xcube.core.store import find_data_writer_extensions
from xcube.util.jsonschema import JsonObjectSchema
from xcube.util.jsonschema import JsonStringSchema

expected_fs_data_accessor_ids: set = {
"dataset:netcdf:file",
@@ -60,6 +61,7 @@ def test_find_data_store_extensions(self):
params_schema = data_store.get_data_store_params_schema()
self.assertParamsSchemaIncludesFsParams(params_schema)
params_schema = data_store.get_open_data_params_schema()
self.assertParamsSchemaIncludesDataTypeParam(params_schema)
self.assertParamsSchemaExcludesFsParams(params_schema)
params_schema = data_store.get_delete_data_params_schema()
self.assertParamsSchemaExcludesFsParams(params_schema)
@@ -69,9 +71,7 @@
def test_find_data_opener_extensions(self):
extensions = find_data_opener_extensions()
self.assertTrue(len(extensions) >= len(expected_fs_data_accessor_ids))
self.assertEqual(
{"xcube.core.store.opener"}, {ext.point for ext in extensions}
)
self.assertEqual({"xcube.core.store.opener"}, {ext.point for ext in extensions})
self.assertTrue(
expected_fs_data_accessor_ids.issubset({ext.name for ext in extensions})
)
@@ -88,9 +88,7 @@ def test_find_data_opener_extensions(self):
def test_find_data_writer_extensions(self):
extensions = find_data_writer_extensions()
self.assertTrue(len(extensions) >= len(expected_fs_data_accessor_ids))
self.assertEqual(
{"xcube.core.store.writer"}, {ext.point for ext in extensions}
)
self.assertEqual({"xcube.core.store.writer"}, {ext.point for ext in extensions})
self.assertTrue(
expected_fs_data_accessor_ids.issubset({ext.name for ext in extensions})
)
@@ -106,6 +104,13 @@ def test_find_data_writer_extensions(self):
params_schema = data_writer.get_delete_data_params_schema()
self.assertParamsSchemaIncludesFsParams(params_schema)

def assertParamsSchemaIncludesDataTypeParam(self, params_schema):
# print(params_schema.to_dict())
self.assertIsInstance(params_schema, JsonObjectSchema)
self.assertIsInstance(params_schema.properties, dict)
self.assertIn("data_type", params_schema.properties)
self.assertIsInstance(params_schema.properties["data_type"], JsonStringSchema)

def assertParamsSchemaIncludesFsParams(self, params_schema):
# print(params_schema.to_dict())
self.assertIsInstance(params_schema, JsonObjectSchema)
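The new `assertParamsSchemaIncludesDataTypeParam` helper above asserts that the open-data parameter schema now advertises a string-typed `data_type` property. Outside the test suite, the same check might look like this (a sketch; the store root is a placeholder):

```python
from xcube.core.store import new_data_store
from xcube.util.jsonschema import JsonStringSchema

store = new_data_store("file", root="/path/to/data")  # placeholder root
params_schema = store.get_open_data_params_schema()

# "data_type" is now one of the schema's string-typed properties.
assert "data_type" in params_schema.properties
assert isinstance(params_schema.properties["data_type"], JsonStringSchema)
```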
3 changes: 1 addition & 2 deletions test/core/store/fs/test_registry.py
@@ -415,8 +415,7 @@ def _assert_dataset_supported(
)

with pytest.raises(
DataStoreError,
match=f'Data resource "{data_id}"' f" does not exist in store",
DataStoreError, match=f'Data resource "{data_id}" does not exist in store'
):
data_store.get_data_types_for_data(data_id)
self.assertEqual(False, data_store.has_data(data_id))
35 changes: 26 additions & 9 deletions xcube/core/store/fs/store.py
@@ -2,6 +2,7 @@
# Permissions are hereby granted under the terms of the MIT License:
# https://opensource.org/licenses/MIT.

import copy
import fnmatch
import os.path
import pathlib
@@ -85,6 +86,21 @@
"shapefile": (GEO_DATA_FRAME_TYPE.alias,),
}

_DATA_TYPES = tuple(
{
data_type
for types_tuple in _FORMAT_TO_DATA_TYPE_ALIASES.values()
for data_type in types_tuple
}
)

_COMMON_OPEN_DATA_PARAMS_SCHEMA_PROPERTIES = dict(
data_type=JsonStringSchema(
enum=list(_DATA_TYPES),
title="Optional data type",
)
)

_DataId = str
_DataIdTuple = tuple[_DataId, dict[str, Any]]
_DataIdIter = Iterator[_DataId]
@@ -232,13 +248,7 @@ def get_data_store_params_schema(cls) -> JsonObjectSchema:

@classmethod
def get_data_types(cls) -> tuple[str, ...]:
return tuple(
{
data_type
for types_tuple in _FORMAT_TO_DATA_TYPE_ALIASES.values()
for data_type in types_tuple
}
)
return _DATA_TYPES

def get_data_types_for_data(self, data_id: str) -> tuple[str, ...]:
self._assert_valid_data_id(data_id)
@@ -309,7 +319,14 @@ def get_open_data_params_schema(
self, data_id: str = None, opener_id: str = None
) -> JsonObjectSchema:
opener = self._find_opener(opener_id=opener_id, data_id=data_id)
return self._get_open_data_params_schema(opener, data_id)
schema = self._get_open_data_params_schema(opener, data_id)
if opener_id is None:
# If the schema for a specific opener was requested, we
# return the opener's schema. Otherwise, we enhance schema
# for parameters, such as "data_type".
schema = copy.deepcopy(schema)
schema.properties |= _COMMON_OPEN_DATA_PARAMS_SCHEMA_PROPERTIES
return schema

def open_data(
self, data_id: str, opener_id: str = None, **open_params
@@ -648,7 +665,7 @@ def _guess_accessor_id_parts(
if data_type_aliases is None or format_id is None:
if require:
raise DataStoreError(
f"Cannot determine data type for " f" data resource {data_id!r}"
f"Cannot determine data type for data resource {data_id!r}"
)
return None
return data_type_aliases[0], format_id, self.protocol
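Two details of the `store.py` change above are worth noting: the module-level `_DATA_TYPES` tuple is now computed once instead of on every `get_data_types()` call, and `get_open_data_params_schema()` deep-copies the opener's schema before merging in the common `data_type` property, so the opener's own schema object is never mutated (the dict `|=` operator requires Python 3.9+). A minimal standalone sketch of that copy-then-merge pattern, with illustrative names only:

```python
import copy

from xcube.util.jsonschema import JsonObjectSchema, JsonStringSchema

# Illustrative opener schema with a single made-up parameter.
opener_schema = JsonObjectSchema(
    properties=dict(some_param=JsonStringSchema(title="Some opener-specific parameter"))
)

common_properties = dict(
    data_type=JsonStringSchema(enum=["dataset", "mldataset"], title="Optional data type")
)

# Copy first, then merge, so the opener's schema stays untouched.
merged_schema = copy.deepcopy(opener_schema)
merged_schema.properties |= common_properties

assert "data_type" in merged_schema.properties
assert "data_type" not in opener_schema.properties
```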
24 changes: 21 additions & 3 deletions xcube/webapi/datasets/context.py
@@ -592,26 +592,42 @@ def _open_ml_dataset(self, dataset_config: DatasetConfig) -> MultiLevelDataset:
data_store_pool = self.get_data_store_pool()
data_store = data_store_pool.get_store(store_instance_id)
data_id = dataset_config.get("Path")
open_params = dataset_config.get("StoreOpenParams") or {}
# Inject chunk_cache_capacity into open parameters
open_params = dict(dataset_config.get("StoreOpenParams") or {})
open_params_schema = data_store.get_open_data_params_schema(data_id=data_id)

# Inject cache_size=chunk_cache_capacity, if given and possible
chunk_cache_capacity = self.get_dataset_chunk_cache_capacity(dataset_config)
if (
chunk_cache_capacity
and (data_id.endswith(".zarr") or data_id.endswith(".levels"))
and "cache_size" not in open_params
and "cache_size" in open_params_schema.properties
):
open_params["cache_size"] = chunk_cache_capacity

# Inject data_type="mldataset", if possible
if (
"data_type" not in open_params
and "data_type" in open_params_schema.properties
and "mldataset" in data_store.get_data_types()
):
open_params["data_type"] = "mldataset"

with self.measure_time(
tag=f"Opened dataset {ds_id!r}"
f" from data store"
f" {store_instance_id!r}"
):
dataset = data_store.open_data(data_id, **open_params)

if isinstance(dataset, MultiLevelDataset):
# Expected, nominal case.
ml_dataset: MultiLevelDataset = dataset
else:
# Fallback. Usually results in poor tile computation performance.
ml_dataset = BaseMultiLevelDataset(dataset)

ml_dataset.ds_id = ds_id

else:
fs_type = dataset_config.get("FileSystem")
if fs_type != "memory":
Expand All @@ -620,10 +636,12 @@ def _open_ml_dataset(self, dataset_config: DatasetConfig) -> MultiLevelDataset:
f" in dataset configuration"
f" {ds_id!r}"
)

with self.measure_time(
tag=f"Opened dataset {ds_id!r}" f" from {fs_type!r}"
):
ml_dataset = _open_ml_dataset_from_python_code(self, dataset_config)

augmentation = dataset_config.get("Augmentation")
if augmentation:
script_path = self.get_config_path(
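The server-side change above makes `DatasetsContext._open_ml_dataset` prefer multi-level datasets whenever the configured store offers the `mldataset` data type, and wraps anything else in a `BaseMultiLevelDataset` as a fallback. A simplified, standalone sketch of that decision (not the actual server code; `store` and `data_id` are placeholders):

```python
from xcube.core.mldataset import BaseMultiLevelDataset, MultiLevelDataset


def open_as_ml_dataset(store, data_id: str, **open_params) -> MultiLevelDataset:
    # Simplified restatement of the logic added in this commit.
    open_params = dict(open_params)
    schema = store.get_open_data_params_schema(data_id=data_id)

    # Request a multi-level dataset if the store supports that data type
    # and the caller did not ask for something else.
    if (
        "data_type" not in open_params
        and "data_type" in schema.properties
        and "mldataset" in store.get_data_types()
    ):
        open_params["data_type"] = "mldataset"

    dataset = store.open_data(data_id, **open_params)
    if isinstance(dataset, MultiLevelDataset):
        return dataset  # nominal case
    # Fallback; usually means poorer tile computation performance.
    return BaseMultiLevelDataset(dataset)
```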
