diff --git a/examples/serve/demo/config-with-stores.yml b/examples/serve/demo/config-with-stores.yml
index 43858fae6..6a714f02d 100644
--- a/examples/serve/demo/config-with-stores.yml
+++ b/examples/serve/demo/config-with-stores.yml
@@ -36,7 +36,7 @@ DataStores:
       # client_kwargs:
       #   endpoint_url: https://s3.eu-central-1.amazonaws.com
     Datasets:
-      - Identifier: "*.zarr"
+      - Path: "*.zarr"
         Style: "default"
         # ChunkCacheSize: 1G
diff --git a/test/core/store/test_storepool.py b/test/core/store/test_storepool.py
index 6f2e8fe30..e3ad83b86 100644
--- a/test/core/store/test_storepool.py
+++ b/test/core/store/test_storepool.py
@@ -326,6 +326,34 @@ def test_multi_stores_with_params(self):
         self.assertIsInstance(pool, DataStorePool)
         self.assertEqual(["local-1", "local-2", "ram-1", "ram-2"], pool.store_instance_ids)
         for instance_id in pool.store_instance_ids:
-            self.assertTrue(pool.has_store_config(instance_id))
+            self.assertTrue(pool.has_store_instance(instance_id))
             self.assertIsInstance(pool.get_store_config(instance_id), DataStoreConfig)
             self.assertIsInstance(pool.get_store(instance_id), DataStore)
+
+    def test_get_store_instance_id(self):
+        store_params_1 = {
+            "root": "./bibo"
+        }
+        ds_config_1 = DataStoreConfig(store_id='file',
+                                      store_params=store_params_1)
+        ds_configs = {'dir-1': ds_config_1}
+        pool = DataStorePool(ds_configs)
+
+        store_params_2 = {
+            "root": "./babo"
+        }
+        ds_config_2 = DataStoreConfig(store_id='file',
+                                      store_params=store_params_2)
+        ds_config_3 = DataStoreConfig(store_id='file',
+                                      store_params=store_params_1,
+                                      title='A third configuration')
+
+        self.assertEqual('dir-1', pool.get_store_instance_id(ds_config_1))
+        self.assertEqual('dir-1', pool.get_store_instance_id(ds_config_1,
+                                                             strict_check=True))
+
+        self.assertIsNone(pool.get_store_instance_id(ds_config_2))
+
+        self.assertEqual('dir-1', pool.get_store_instance_id(ds_config_3))
+        self.assertIsNone(pool.get_store_instance_id(ds_config_3,
+                                                     strict_check=True))
diff --git a/test/webapi/test_config.py b/test/webapi/test_config.py
index 059161ffa..77b220fcc 100644
--- a/test/webapi/test_config.py
+++ b/test/webapi/test_config.py
@@ -60,7 +60,7 @@ def test_from_dict(self):
             },
             "Datasets": [
                 {
-                    "Identifier": "*.zarr",
+                    "Path": "*.zarr",
                     "Style": "default"
                 }
             ]
diff --git a/test/webapi/test_context.py b/test/webapi/test_context.py
index 6546d8544..ea3e818d5 100644
--- a/test/webapi/test_context.py
+++ b/test/webapi/test_context.py
@@ -211,3 +211,246 @@ def test_interpolates_vars(self):
                          normalize_prefix('/${name}'))
         self.assertEqual(f'/xcube/v{version}',
                          normalize_prefix('/${name}/v${version}'))
+
+
+class MaybeAssignStoreInstanceIdsTest(unittest.TestCase):
+
+    def test_find_common_store(self):
+        ctx = new_test_service_context()
+        dataset_configs = [
+            {
+                'Identifier': 'z_0',
+                'FileSystem': 'local',
+                'Path': '/one/path/abc.zarr'
+            },
+            {
+                'Identifier': 'z_1',
+                'FileSystem': 'local',
+                'Path': '/one/path/def.zarr'
+            },
+            {
+                'Identifier': 'z_4',
+                'FileSystem': 'obs',
+                'Path': '/one/path/mno.zarr'
+            },
+            {
+                'Identifier': 'z_2',
+                'FileSystem': 'local',
+                'Path': '/another/path/ghi.zarr'
+            },
+            {
+                'Identifier': 'z_3',
+                'FileSystem': 'local',
+                'Path': '/one/more/path/jkl.zarr'
+            },
+            {
+                'Identifier': 'z_5',
+                'FileSystem': 'obs',
+                'Path': '/one/path/pqr.zarr'
+            },
+            {
+                'Identifier': 'z_6',
+                'FileSystem': 'local',
+                'Path': '/one/path/stu.zarr'
+            },
+            {
+                'Identifier': 'z_7',
+                'FileSystem': 'local',
+                'Path': '/one/more/path/vwx.zarr'
+            },
+        ]
+        ctx.config['Datasets'] = dataset_configs
+        adjusted_dataset_configs = ctx.get_dataset_configs()
+
+        expected_dataset_configs = [
+            {
+                'Identifier': 'z_0',
+                'FileSystem': 'local',
+                'Path': 'path/abc.zarr',
+                'StoreInstanceId': 'local_2'
+            },
+            {
+                'Identifier': 'z_1',
+                'FileSystem': 'local',
+                'Path': 'path/def.zarr',
+                'StoreInstanceId': 'local_2'
+            },
+            {
+                'Identifier': 'z_4',
+                'FileSystem': 'obs',
+                'Path': 'mno.zarr',
+                'StoreInstanceId': 'obs_1'
+            },
+            {
+                'Identifier': 'z_2',
+                'FileSystem': 'local',
+                'Path': 'ghi.zarr',
+                'StoreInstanceId': 'local_1'
+            },
+            {
+                'Identifier': 'z_3',
+                'FileSystem': 'local',
+                'Path': 'more/path/jkl.zarr',
+                'StoreInstanceId': 'local_2'
+            },
+            {
+                'Identifier': 'z_5',
+                'FileSystem': 'obs',
+                'Path': 'pqr.zarr',
+                'StoreInstanceId': 'obs_1'
+            },
+            {
+                'Identifier': 'z_6',
+                'FileSystem': 'local',
+                'Path': 'path/stu.zarr',
+                'StoreInstanceId': 'local_2'
+            },
+            {
+                'Identifier': 'z_7',
+                'FileSystem': 'local',
+                'Path': 'more/path/vwx.zarr',
+                'StoreInstanceId': 'local_2'
+            },
+        ]
+        self.assertEqual(expected_dataset_configs, adjusted_dataset_configs)
+
+    def test_with_instance_id(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'zero',
+                          'Title': 'Test 0',
+                          'FileSystem': 'local',
+                          'StoreInstanceId': 'some_id'}
+        dataset_config_copy = dataset_config.copy()
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(dataset_config_copy, dataset_config)
+
+    def test_local(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'one',
+                          'Title': 'Test 1',
+                          'FileSystem': 'local',
+                          'Path': 'cube-1-250-250.zarr'}
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(['Identifier', 'Title', 'FileSystem', 'Path',
+                          'StoreInstanceId'],
+                         list(dataset_config.keys()))
+        self.assertEqual('one',
+                         dataset_config['Identifier'])
+        self.assertEqual('Test 1', dataset_config['Title'])
+        self.assertEqual('local', dataset_config['FileSystem'])
+        self.assertEqual('cube-1-250-250.zarr', dataset_config["Path"])
+        self.assertEqual('local_1', dataset_config['StoreInstanceId'])
+
+    def test_s3(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'two',
+                          'Title': 'Test 2',
+                          'FileSystem': 'obs',
+                          'Endpoint': 'https://s3.eu-central-1.amazonaws.com',
+                          'Path': 'xcube-examples/OLCI-SNS-RAW-CUBE-2.zarr',
+                          'Region': 'eu-central-1'}
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(['Identifier', 'Title', 'FileSystem', 'Endpoint',
+                          'Path', 'Region', 'StoreInstanceId'],
+                         list(dataset_config.keys()))
+        self.assertEqual('two', dataset_config['Identifier'])
+        self.assertEqual('Test 2', dataset_config['Title'])
+        self.assertEqual('obs', dataset_config['FileSystem'])
+        self.assertEqual('https://s3.eu-central-1.amazonaws.com',
+                         dataset_config['Endpoint'])
+        self.assertEqual('OLCI-SNS-RAW-CUBE-2.zarr', dataset_config['Path'])
+        self.assertEqual('eu-central-1', dataset_config['Region'])
+        self.assertEqual('obs_1', dataset_config['StoreInstanceId'])
+
+    def test_memory(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'three',
+                          'Title': 'Test 3',
+                          'FileSystem': 'memory'}
+        dataset_config_copy = dataset_config.copy()
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(dataset_config_copy, dataset_config)
+
+    def test_missing_file_system(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'five',
+                          'Title': 'Test 5',
+                          'Path': 'cube-1-250-250.zarr'}
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(['Identifier', 'Title', 'Path', 'StoreInstanceId'],
+                         list(dataset_config.keys()))
+        self.assertEqual('five', dataset_config['Identifier'])
+        self.assertEqual('Test 5', dataset_config['Title'])
+        self.assertEqual('cube-1-250-250.zarr', dataset_config['Path'])
+        self.assertEqual('local_1', dataset_config['StoreInstanceId'])
+
+    def test_invalid_file_system(self):
+        ctx = new_test_service_context()
+        dataset_config = {'Identifier': 'five',
+                          'Title': 'Test 5a',
+                          'FileSystem': 'invalid',
+                          'Path': 'cube-1-250-250.zarr'}
+
+        ctx.config['Datasets'] = [dataset_config]
+        dataset_config = ctx.get_dataset_configs()[0]
+
+        self.assertEqual(['Identifier', 'Title', 'FileSystem', 'Path'],
+                         list(dataset_config.keys()))
+        self.assertEqual('five', dataset_config['Identifier'])
+        self.assertEqual('Test 5a', dataset_config['Title'])
+        self.assertEqual('invalid', dataset_config['FileSystem'])
+        self.assertEqual('cube-1-250-250.zarr', dataset_config['Path'])
+
+    def test_local_store_already_existing(self):
+        ctx = new_test_service_context()
+        dataset_config_1 = {'Identifier': 'six',
+                            'Title': 'Test 6',
+                            'FileSystem': 'local',
+                            'Path': 'cube-1-250-250.zarr'}
+        dataset_config_2 = {'Identifier': 'six_a',
+                            'Title': 'Test 6 a',
+                            'FileSystem': 'local',
+                            'Path': 'cube-5-100-200.zarr'}
+
+        ctx.config['Datasets'] = [dataset_config_1, dataset_config_2]
+        dataset_configs = ctx.get_dataset_configs()
+
+        self.assertEqual(dataset_configs[0]['StoreInstanceId'],
+                         dataset_configs[1]['StoreInstanceId'])
+
+    def test_s3_store_already_existing(self):
+        ctx = new_test_service_context()
+        dataset_config_1 = {'Identifier': 'seven',
+                            'Title': 'Test 7',
+                            'FileSystem': 'obs',
+                            'Endpoint': 'https://s3.eu-central-1.amazonaws.com',
+                            'Path': 'xcube-examples/OLCI-SNS-RAW-CUBE-2.zarr',
+                            'Region': 'eu-central-1'}
+
+        dataset_config_2 = {'Identifier': 'seven_a',
+                            'Title': 'Test 7 a',
+                            'FileSystem': 'obs',
+                            'Endpoint': 'https://s3.eu-central-1.amazonaws.com',
+                            'Path': 'xcube-examples/OLCI-SNS-RAW-CUBE-3.zarr',
+                            'Region': 'eu-central-1'}
+
+        ctx.config['Datasets'] = [dataset_config_1, dataset_config_2]
+        dataset_configs = ctx.get_dataset_configs()
+
+        self.assertEqual(dataset_configs[0]['StoreInstanceId'],
+                         dataset_configs[1]['StoreInstanceId'])
diff --git a/xcube/core/store/storepool.py b/xcube/core/store/storepool.py
index 793f17b58..6160397f8 100644
--- a/xcube/core/store/storepool.py
+++ b/xcube/core/store/storepool.py
@@ -262,7 +262,25 @@ def store_instance_ids(self) -> List[str]:
 
     def store_configs(self) -> List[DataStoreConfig]:
         return [v.store_config for k, v in self._instances.items()]
 
-    def has_store_config(self, store_instance_id: str) -> bool:
+    def get_store_instance_id(self,
+                              store_config: DataStoreConfig,
+                              strict_check: bool = False) -> Optional[str]:
+        assert_instance(store_config, DataStoreConfig, 'store_config')
+        for id, instance in self._instances.items():
+            if strict_check:
+                if instance.store_config == store_config:
+                    return id
+            else:
+                if instance.store_config.store_id == store_config.store_id and \
+                        instance.store_config.store_params == \
+                        store_config.store_params:
+                    return id
+        return None
+
+    def has_store_config(self, store_config: DataStoreConfig) -> bool:
+        return self.get_store_instance_id(store_config) is not None
+
+    def has_store_instance(self, store_instance_id: str) -> bool:
         assert_instance(store_instance_id, str, 'store_instance_id')
         return store_instance_id in self._instances
diff --git a/xcube/webapi/config.py b/xcube/webapi/config.py
index 99b247a80..af5400df4 100644
--- a/xcube/webapi/config.py
+++ b/xcube/webapi/config.py
@@ -91,6 +91,7 @@ def get_schema(cls) -> JsonObjectSchema:
             factory=DatasetConfig,
             required=[
                 'Identifier',
+                'Path'
             ],
             properties=dict(
                 Identifier=IdentifierSchema,
@@ -181,10 +182,11 @@ def get_schema(cls) -> JsonObjectSchema:
         return JsonObjectSchema(
             factory=DataStoreDatasetConfig,
             required=[
-                'Identifier'
+                'Path'
             ],
             properties=dict(
                 Identifier=IdentifierSchema,
+                Path=PathSchema,
                 StoreInstanceId=IdentifierSchema,  # will be set by server
                 StoreOpenParams=JsonObjectSchema(additional_properties=True),
                 **_get_common_dataset_properties()
diff --git a/xcube/webapi/context.py b/xcube/webapi/context.py
index f807268f2..f765a28f8 100644
--- a/xcube/webapi/context.py
+++ b/xcube/webapi/context.py
@@ -35,13 +35,10 @@
 import pyproj
 import xarray as xr
 
-from xcube.constants import FORMAT_NAME_ZARR
 from xcube.constants import LOG
 from xcube.core.mldataset import BaseMultiLevelDataset
 from xcube.core.mldataset import MultiLevelDataset
 from xcube.core.mldataset import augment_ml_dataset
-from xcube.core.mldataset import open_ml_dataset_from_local_fs
-from xcube.core.mldataset import open_ml_dataset_from_object_storage
 from xcube.core.mldataset import open_ml_dataset_from_python_code
 from xcube.core.normalize import decode_cube
 from xcube.core.store import DATASET_TYPE
@@ -69,6 +66,10 @@
 
 # We use tilde, because it is not a reserved URI characters
 STORE_DS_ID_SEPARATOR = '~'
+FS_TYPE_TO_PROTOCOL = {
+    'local': 'file',
+    'obs': 's3'
+}
 
 ALL_PLACES = "all"
 
@@ -269,8 +270,107 @@ def get_dataset_configs(self) -> List[DatasetConfigDict]:
             dataset_configs += \
                 self.get_dataset_configs_from_stores()
             self._dataset_configs = dataset_configs
+            self._maybe_assign_store_instance_ids()
         return self._dataset_configs
 
+    def _maybe_assign_store_instance_ids(self):
+        assignable_dataset_configs = [dc for dc in self._dataset_configs
+                                      if 'StoreInstanceId' not in dc
+                                      and dc.get('FileSystem', 'local')
+                                      in FS_TYPE_TO_PROTOCOL.keys()]
+        # split into sublists according to file system and non-root store params
+        config_lists = []
+        for config in assignable_dataset_configs:
+            store_params = self._get_other_store_params_than_root(config)
+            file_system = config.get('FileSystem', 'local')
+            appended = False
+            for config_list in config_lists:
+                if config_list[0] == file_system and \
+                        config_list[1] == store_params:
+                    config_list[2].append(config)
+                    appended = True
+                    break
+            if not appended:
+                config_lists.append((file_system, store_params, [config]))
+
+        data_store_pool = self.get_data_store_pool()
+        if not data_store_pool:
+            data_store_pool = self._data_store_pool = DataStorePool()
+
+        for file_system, store_params, config_list in config_lists:
+            # Retrieve paths per configuration
+            paths = [dc['Path'] for dc in config_list]
+            list.sort(paths)
+            # Determine common prefixes of paths (and call them roots)
+            prefixes = _get_common_prefixes(paths)
+            if len(prefixes) < 1:
+                roots = prefixes
+            else:
+                # perform further step to merge prefixes with same start
+                prefixes = list(set(prefixes))
+                prefixes.sort()
+                roots = []
+                root_candidate = prefixes[0]
+                for root in prefixes[1:]:
+                    common_root = os.path.commonprefix([root_candidate, root])
+                    if _is_not_empty(common_root):
+                        root_candidate = common_root
+                    else:
+                        roots.append(root_candidate)
+                        root_candidate = root
+                roots.append(root_candidate)
+            for root in roots:
+                # ensure root does not end with full or partial directory
+                # or file name
+                while not root.endswith("/") and not root.endswith("\\") and \
+                        len(root) > 0:
+                    root = root[:-1]
+                abs_root = root
+                # For local file systems: Determine absolute root from base dir
+                if file_system == 'local' and not os.path.isabs(abs_root):
+                    abs_root = os.path.join(self._base_dir, abs_root)
+                    abs_root = os.path.normpath(abs_root)
+                store_params_for_root = store_params.copy()
+                store_params_for_root['root'] = abs_root
+                # See if there already is a store with this configuration
+                data_store_config = DataStoreConfig(
+                    store_id=FS_TYPE_TO_PROTOCOL.get(file_system),
+                    store_params=store_params_for_root)
+                store_instance_id = data_store_pool.\
+                    get_store_instance_id(data_store_config)
+                if not store_instance_id:
+                    # Create new store with new unique store instance id
+                    counter = 1
+                    while data_store_pool.has_store_instance(
+                            f'{file_system}_{counter}'):
+                        counter += 1
+                    store_instance_id = f'{file_system}_{counter}'
+                    data_store_pool.add_store_config(store_instance_id,
+                                                     data_store_config)
+
+                for config in config_list:
+                    if config['Path'].startswith(root):
+                        config['StoreInstanceId'] = store_instance_id
+                        config['Path'] = config['Path'][len(root):]
+
+    def _get_other_store_params_than_root(self,
+                                          dataset_config: DatasetConfigDict) \
+            -> Dict:
+        if dataset_config.get('FileSystem', 'local') != 'obs':
+            return {}
+        store_params = dict()
+        if 'Anonymous' in dataset_config:
+            store_params['anon'] = dataset_config['Anonymous']
+        client_kwargs = dict()
+        if 'Endpoint' in dataset_config:
+            client_kwargs['endpoint_url'] = dataset_config['Endpoint']
+        if 'Region' in dataset_config:
+            client_kwargs['region_name'] = dataset_config['Region']
+        store_params['client_kwargs'] = client_kwargs
+        return store_params
+
     def get_dataset_configs_from_stores(self) \
             -> List[DatasetConfigDict]:
@@ -295,7 +395,7 @@ def get_dataset_configs_from_stores(self) \
             if store_dataset_configs:
                 for store_dataset_config in store_dataset_configs:
                     dataset_id_pattern = store_dataset_config.get(
-                        'Identifier', '*'
+                        'Path', '*'
                     )
                     if fnmatch.fnmatch(store_dataset_id,
                                        dataset_id_pattern):
@@ -308,6 +408,7 @@ def get_dataset_configs_from_stores(self) \
                             StoreInstanceId=store_instance_id,
                             **dataset_config_base
                         )
+                        dataset_config['Path'] = store_dataset_id
                         dataset_config['Identifier'] = \
                             f'{store_instance_id}{STORE_DS_ID_SEPARATOR}' \
                             f'{store_dataset_id}'
@@ -341,22 +442,21 @@ def new_dataset_metadata(self,
 
     def get_data_store_pool(self) -> Optional[DataStorePool]:
         data_store_configs = self._config.get('DataStores', [])
-        if not data_store_configs:
-            self._data_store_pool = None
-        elif self._data_store_pool is None:
-            if not isinstance(data_store_configs, list):
-                raise ServiceConfigError('DataStores must be a list')
-            store_configs: Dict[str, DataStoreConfig] = {}
-            for data_store_config_dict in data_store_configs:
-                store_instance_id = data_store_config_dict.get('Identifier')
-                store_id = data_store_config_dict.get('StoreId')
-                store_params = data_store_config_dict.get('StoreParams', {})
-                dataset_configs = data_store_config_dict.get('Datasets')
-                store_config = DataStoreConfig(store_id,
-                                               store_params=store_params,
-                                               user_data=dataset_configs)
-                store_configs[store_instance_id] = store_config
-            self._data_store_pool = DataStorePool(store_configs)
+        if not data_store_configs or self._data_store_pool:
+            return self._data_store_pool
+        if not isinstance(data_store_configs, list):
+            raise ServiceConfigError('DataStores must be a list')
+        store_configs: Dict[str, DataStoreConfig] = {}
+        for data_store_config_dict in data_store_configs:
+            store_instance_id = data_store_config_dict.get('Identifier')
+            store_id = data_store_config_dict.get('StoreId')
+            store_params = data_store_config_dict.get('StoreParams', {})
+            dataset_configs = data_store_config_dict.get('Datasets')
+            store_config = DataStoreConfig(store_id,
+                                           store_params=store_params,
+                                           user_data=dataset_configs)
+            store_configs[store_instance_id] = store_config
+        self._data_store_pool = DataStorePool(store_configs)
         return self._data_store_pool
 
     def get_dataset_config(self, ds_id: str) -> Dict[str, Any]:
@@ -374,9 +474,18 @@ def get_s3_bucket_mapping(self):
             ds_id = dataset_config.get('Identifier')
             file_system = dataset_config.get('FileSystem', 'local')
             if file_system == 'local':
-                local_path = self.get_config_path(dataset_config,
-                                                  f'dataset configuration'
-                                                  f' {ds_id!r}')
+                store_instance_id = dataset_config.get('StoreInstanceId')
+                if store_instance_id:
+                    data_store_pool = self.get_data_store_pool()
+                    store_root = data_store_pool.get_store_config(
+                        store_instance_id). \
+                        store_params.get('root')
+                    data_id = dataset_config.get('Path')
+                    local_path = os.path.join(store_root, data_id)
+                else:
+                    local_path = self.get_config_path(dataset_config,
+                                                      f'dataset configuration'
+                                                      f' {ds_id!r}')
                 local_path = os.path.normpath(local_path)
                 if os.path.isdir(local_path):
                     s3_bucket_mapping[ds_id] = local_path
@@ -469,14 +578,15 @@ def _open_ml_dataset(self, dataset_config: DatasetConfigDict) \
         if store_instance_id:
             data_store_pool = self.get_data_store_pool()
             data_store = data_store_pool.get_store(store_instance_id)
-            _, data_id = ds_id.split(STORE_DS_ID_SEPARATOR, maxsplit=1)
+            data_id = dataset_config.get('Path')
             open_params = dataset_config.get('StoreOpenParams') or {}
             # Inject chunk_cache_capacity into open parameters
             chunk_cache_capacity = self.get_dataset_chunk_cache_capacity(
                 dataset_config
             )
-            if (ds_id.endswith('.zarr') or ds_id.endswith('.levels')) \
-                    and 'cache_size' not in open_params:
+            if (data_id.endswith('.zarr') or data_id.endswith('.levels')) \
+                    and 'cache_size' not in open_params \
+                    and chunk_cache_capacity is not None:
                 open_params['cache_size'] = chunk_cache_capacity
             with self.measure_time(tag=f"opened dataset {ds_id!r}"
                                    f" from data store"
@@ -491,19 +601,15 @@ def _open_ml_dataset(self, dataset_config: DatasetConfigDict) \
                                       force_geographic=True)
             ml_dataset = BaseMultiLevelDataset(cube, ds_id=ds_id)
         else:
-            fs_type = dataset_config.get('FileSystem', 'local')
-            if self._ml_dataset_openers \
-                    and fs_type in self._ml_dataset_openers:
-                ml_dataset_opener = self._ml_dataset_openers[fs_type]
-            elif fs_type in _MULTI_LEVEL_DATASET_OPENERS:
-                ml_dataset_opener = _MULTI_LEVEL_DATASET_OPENERS[fs_type]
-            else:
+            fs_type = dataset_config.get('FileSystem')
+            if fs_type != 'memory':
                 raise ServiceConfigError(f"Invalid FileSystem {fs_type!r}"
                                          f" in dataset configuration"
                                          f" {ds_id!r}")
             with self.measure_time(tag=f"opened dataset {ds_id!r}"
                                    f" from {fs_type!r}"):
-                ml_dataset = ml_dataset_opener(self, dataset_config)
+                ml_dataset = _open_ml_dataset_from_python_code(self,
+                                                               dataset_config)
             augmentation = dataset_config.get('Augmentation')
             if augmentation:
                 script_path = self.get_config_path(
@@ -823,56 +929,6 @@ def normalize_prefix(prefix: Optional[str]) -> str:
     return prefix
 
 
-# noinspection PyUnusedLocal
-def _open_ml_dataset_from_object_storage(
-        ctx: ServiceContext,
-        dataset_config: DatasetConfigDict
-) -> MultiLevelDataset:
-    ds_id = dataset_config.get('Identifier')
-    path = ctx.get_config_path(dataset_config,
-                               f"dataset configuration {ds_id!r}",
-                               is_url=True)
-    data_format = dataset_config.get('Format', FORMAT_NAME_ZARR)
-
-    s3_kwargs = dict()
-    if 'Anonymous' in dataset_config:
-        s3_kwargs['anon'] = bool(dataset_config['Anonymous'])
-    if 'AccessKeyId' in dataset_config:
-        s3_kwargs['key'] = dataset_config['AccessKeyId']
-    if 'SecretAccessKey' in dataset_config:
-        s3_kwargs['secret'] = dataset_config['SecretAccessKey']
-
-    s3_client_kwargs = dict()
-    if 'Endpoint' in dataset_config:
-        s3_client_kwargs['endpoint_url'] = dataset_config['Endpoint']
-    if 'Region' in dataset_config:
-        s3_client_kwargs['region_name'] = dataset_config['Region']
-
-    chunk_cache_capacity = ctx.get_dataset_chunk_cache_capacity(dataset_config)
-    return open_ml_dataset_from_object_storage(path,
-                                               data_format=data_format,
-                                               ds_id=ds_id,
-                                               exception_type=ServiceConfigError,
-                                               s3_kwargs=s3_kwargs,
-                                               s3_client_kwargs=s3_client_kwargs,
-                                               chunk_cache_capacity=chunk_cache_capacity)
-
-
-def _open_ml_dataset_from_local_fs(ctx: ServiceContext,
-                                   dataset_config: DatasetConfigDict) -> MultiLevelDataset:
-    ds_id = dataset_config.get('Identifier')
-    path = ctx.get_config_path(dataset_config,
-                               f"dataset configuration {ds_id}")
-    data_format = dataset_config.get('Format')
-    chunk_cache_capacity = ctx.get_dataset_chunk_cache_capacity(dataset_config)
-    if chunk_cache_capacity:
-        warnings.warn('chunk cache size is not effective for datasets stored in local file systems')
-    return open_ml_dataset_from_local_fs(path,
-                                         data_format=data_format,
-                                         ds_id=ds_id,
-                                         exception_type=ServiceConfigError)
-
-
 def _open_ml_dataset_from_python_code(ctx: ServiceContext,
                                       dataset_config: DatasetConfigDict) -> MultiLevelDataset:
     ds_id = dataset_config.get('Identifier')
@@ -898,8 +954,22 @@ def _open_ml_dataset_from_python_code(ctx: ServiceContext,
                                       exception_type=ServiceConfigError)
 
 
+def _is_not_empty(prefix):
+    return prefix != '' and prefix != '/' and prefix != '\\'
+
+
+def _get_common_prefixes(p):
+    # Recursively examine a list of paths for common prefixes:
+    # If no common prefix is found, split the list in half and
+    # examine each half separately
+    prefix = os.path.commonprefix(p)
+    if _is_not_empty(prefix) or len(p) == 1:
+        return [prefix]
+    else:
+        return _get_common_prefixes(p[:int(len(p) / 2)]) + \
+               _get_common_prefixes(p[int(len(p) / 2):])
+
+
 _MULTI_LEVEL_DATASET_OPENERS = {
-    "obs": _open_ml_dataset_from_object_storage,
-    "local": _open_ml_dataset_from_local_fs,
     "memory": _open_ml_dataset_from_python_code,
 }
diff --git a/xcube/webapi/handlers.py b/xcube/webapi/handlers.py
index d4fab4ddc..f0cf3057a 100644
--- a/xcube/webapi/handlers.py
+++ b/xcube/webapi/handlers.py
@@ -280,11 +280,9 @@ def _get_key_and_local_path(self, ds_id: str, path: str):
         if path and '..' in path.split('/'):
             raise ServiceBadRequestError(f'AWS S3 data access: received illegal key {key!r}')
-        local_path = dataset_config.get('Path')
-        if os.path.isabs(local_path):
-            local_path = os.path.join(local_path, path)
-        else:
-            local_path = os.path.join(self.service_context.base_dir, local_path, path)
+        bucket_mapping = self.service_context.get_s3_bucket_mapping()
+        local_path = bucket_mapping.get(ds_id)
+        local_path = os.path.join(local_path, path)
         local_path = os.path.normpath(local_path)
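
Usage sketch (outside the patch): the `xcube/core/store/storepool.py` hunk above turns `has_store_config()` into a config-based lookup and adds `get_store_instance_id()` and `has_store_instance()`. The snippet below mirrors the new `test_get_store_instance_id` test; the store id `'file'`, the `root` parameters, and the import path are illustrative assumptions, not something this patch defines.

```python
# Illustrative sketch based on the new unit test; values are assumptions.
from xcube.core.store import DataStoreConfig, DataStorePool

config = DataStoreConfig(store_id='file', store_params={'root': './data'})
pool = DataStorePool({'dir-1': config})

# Lenient lookup compares store_id and store_params only.
assert pool.get_store_instance_id(config) == 'dir-1'

# A config that differs only in its title still matches leniently,
# but not with strict_check=True, which compares the whole config.
titled = DataStoreConfig(store_id='file',
                         store_params={'root': './data'},
                         title='Same store, different title')
assert pool.get_store_instance_id(titled) == 'dir-1'
assert pool.get_store_instance_id(titled, strict_check=True) is None

# has_store_config() now takes a DataStoreConfig; the old id-based
# check lives on as has_store_instance().
assert pool.has_store_config(titled)
assert pool.has_store_instance('dir-1')
```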