
Commit ff02034

Merge pull request #622 from dcs4cop/forman-617-levels_on_s3
Introduced parameter "base_dataset_id" for writing multi-level datasets
2 parents f776640 + 37785bc commit ff02034

File tree

7 files changed: +196 -69 lines changed


CHANGES.md

+6 lines changed

@@ -2,6 +2,12 @@

 ### Enhancements

+* Introduced parameter `base_dataset_id` for writing multi-level
+  datasets with the "file", "s3", and "memory" data stores.
+  If given, the base dataset will be linked only with the
+  value of `base_dataset_id`, instead of being copied as-is.
+  This can save large amounts of storage space. (#617)
+
 ### Fixes

 * Fixed `FsDataAccessor.write_data()` implementations,
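Usage sketch (not part of the commit): assuming the store is created via xcube's new_data_store() and a base dataset has already been written as Zarr, the new base_dataset_id write parameter links the multi-level dataset to that Zarr instead of copying its level-0 data. Paths and the chunking below are illustrative; the call pattern mirrors the test added in this commit.

from xcube.core.new import new_cube
from xcube.core.store import new_data_store

store = new_data_store('file', root='/tmp/xcube-demo')   # illustrative root

cube = new_cube(variables=dict(A=8.5, B=9.5)).chunk(
    dict(time=1, lat=90, lon=180))

store.write_data(cube, 'base-ds.zarr')                    # plain Zarr base dataset
store.write_data(cube, 'ds.levels',
                 base_dataset_id='base-ds.zarr',          # link level 0, do not copy it
                 use_saved_levels=True)                   # compute each level from the previously written one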

test/core/store/fs/test_registry.py

+73 -13 lines changed

@@ -1,7 +1,8 @@
 import os.path
 import unittest
+import warnings
 from abc import ABC, abstractmethod
-from typing import Type, Union
+from typing import Any, Dict, Optional, Type, Union

 import fsspec
 import xarray as xr
@@ -38,7 +39,7 @@ def prepare_fs(cls, fs: fsspec.AbstractFileSystem, root: str):
         # print(f'{fs.protocol}: making root {root}')
         fs.mkdirs(root)

-        # Write a text file into each subdirectory so
+        # Write a text file into each subdirectory, so
         # we also test that store.get_data_ids() scans
         # recursively.
         dir_path = root
@@ -54,6 +55,7 @@ def prepare_fs(cls, fs: fsspec.AbstractFileSystem, root: str):
     def test_mldataset_levels(self):
         data_store = self.create_data_store()
         self.assertMultiLevelDatasetFormatSupported(data_store)
+        self.assertMultiLevelDatasetFormatWithLinkSupported(data_store)

     def test_dataset_zarr(self):
         data_store = self.create_data_store()
@@ -73,6 +75,47 @@ def assertMultiLevelDatasetFormatSupported(self,
                                     MultiLevelDataset,
                                     MultiLevelDatasetDescriptor)

+        # Test that use_saved_levels works
+        self.assertDatasetSupported(data_store,
+                                    '.levels',
+                                    'mldataset',
+                                    MultiLevelDataset,
+                                    MultiLevelDatasetDescriptor,
+                                    write_params=dict(
+                                        use_saved_levels=True,
+                                    ))
+
+    def assertMultiLevelDatasetFormatWithLinkSupported(
+            self,
+            data_store: MutableDataStore
+    ):
+        base_dataset = self.new_cube_data()
+        base_dataset_id = f'{DATA_PATH}/base-ds.zarr'
+        data_store.write_data(base_dataset, base_dataset_id)
+
+        # Test that base_dataset_id works
+        self.assertDatasetSupported(data_store,
+                                    '.levels',
+                                    'mldataset',
+                                    MultiLevelDataset,
+                                    MultiLevelDatasetDescriptor,
+                                    write_params=dict(
+                                        base_dataset_id=base_dataset_id,
+                                    ))
+
+        # Test that base_dataset_id + use_saved_levels works
+        self.assertDatasetSupported(data_store,
+                                    '.levels',
+                                    'mldataset',
+                                    MultiLevelDataset,
+                                    MultiLevelDatasetDescriptor,
+                                    write_params=dict(
+                                        base_dataset_id=base_dataset_id,
+                                        use_saved_levels=True,
+                                    ))
+
+        data_store.delete_data(base_dataset_id)
+
     def assertDatasetFormatSupported(self,
                                      data_store: MutableDataStore,
                                      filename_ext: str):
@@ -89,8 +132,12 @@ def assertDatasetSupported(
             expected_data_type_alias: str,
             expected_type: Union[Type[xr.Dataset],
                                  Type[MultiLevelDataset]],
-            expected_descriptor_type: Union[Type[DatasetDescriptor],
-                                            Type[MultiLevelDatasetDescriptor]]
+            expected_descriptor_type: Union[
+                Type[DatasetDescriptor],
+                Type[MultiLevelDatasetDescriptor]
+            ],
+            write_params: Optional[Dict[str, Any]] = None,
+            open_params: Optional[Dict[str, Any]] = None,
     ):
         """
         Call all DataStore operations to ensure data of type
@@ -102,10 +149,15 @@ def assertDatasetSupported(
         :param expected_data_type_alias: The expected data type alias.
         :param expected_type: The expected data type.
         :param expected_descriptor_type: The expected data descriptor type.
+        :param write_params: Optional write parameters
+        :param open_params: Optional open parameters
         """

         data_id = f'{DATA_PATH}/ds{filename_ext}'

+        write_params = write_params or {}
+        open_params = open_params or {}
+
         self.assertIsInstance(data_store, MutableDataStore)

         self.assertEqual({'dataset', 'mldataset', 'geodataframe'},
@@ -114,36 +166,44 @@ def assertDatasetSupported(
         with self.assertRaises(DataStoreError):
             data_store.get_data_types_for_data(data_id)
         self.assertEqual(False, data_store.has_data(data_id))
-        self.assertEqual([], list(data_store.get_data_ids()))
+        self.assertNotIn(data_id, set(data_store.get_data_ids()))

-        data = new_cube(variables=dict(A=8, B=9))
-        written_data_id = data_store.write_data(data, data_id)
+        data = self.new_cube_data()
+        written_data_id = data_store.write_data(data, data_id, **write_params)
         self.assertEqual(data_id, written_data_id)
+
         self.assertEqual({expected_data_type_alias},
                          set(data_store.get_data_types_for_data(data_id)))
         self.assertEqual(True, data_store.has_data(data_id))
-        self.assertEqual([data_id], list(data_store.get_data_ids()))
+        self.assertIn(data_id, set(data_store.get_data_ids()))

-        data_descriptors = list(data_store.search_data())
+        data_descriptors = list(data_store.search_data(
+            data_type=expected_type)
+        )
         self.assertEqual(1, len(data_descriptors))
         self.assertIsInstance(data_descriptors[0], DataDescriptor)
         self.assertIsInstance(data_descriptors[0], expected_descriptor_type)

-        data = data_store.open_data(data_id)
+        data = data_store.open_data(data_id, **open_params)
         self.assertIsInstance(data, expected_type)

         try:
             data_store.delete_data(data_id)
-        except PermissionError:  # Typically occurs on win32 due to fsspec
+        except PermissionError as e:  # May occur on win32 due to fsspec
+            warnings.warn(f'{e}')
             return
         with self.assertRaises(DataStoreError):
             data_store.get_data_types_for_data(data_id)
         self.assertEqual(False, data_store.has_data(data_id))
-        self.assertEqual([], list(data_store.get_data_ids()))
+        self.assertNotIn(data_id, set(data_store.get_data_ids()))

+    @staticmethod
+    def new_cube_data():
+        cube = new_cube(variables=dict(A=8.5, B=9.5))
+        return cube.chunk(dict(time=1, lat=90, lon=180))

-class FileFsDataStoresTest(FsDataStoresTestMixin, unittest.TestCase):

+class FileFsDataStoresTest(FsDataStoresTestMixin, unittest.TestCase):
     def create_data_store(self) -> FsDataStore:
         root = os.path.join(new_temp_dir(prefix='xcube'), ROOT_DIR)
         self.prepare_fs(fsspec.filesystem('file'), root)

xcube/core/store/fs/accessor.py

+20 -9 lines changed

@@ -21,7 +21,7 @@

 import copy
 from abc import ABC, abstractmethod
-from typing import Dict, Any, Tuple
+from typing import Dict, Any, Tuple, Optional

 import fsspec

@@ -52,6 +52,7 @@
 PROTOCOL_PARAM_NAME = 'protocol'
 STORAGE_OPTIONS_PARAM_NAME = 'storage_options'
 FS_PARAM_NAME = 'fs'
+ROOT_PARAM_NAME = 'root'


 class FsAccessor:
@@ -74,7 +75,9 @@ def get_storage_options_schema(cls) -> JsonObjectSchema:

     @classmethod
     def load_fs(cls, params: Dict[str, Any]) \
-            -> Tuple[fsspec.AbstractFileSystem, Dict[str, Any]]:
+            -> Tuple[fsspec.AbstractFileSystem,
+                     Optional[str],
+                     Dict[str, Any]]:
         """
         Load a filesystem instance from *params*.

@@ -85,17 +88,21 @@ def load_fs(cls, params: Dict[str, Any]) \

         :param params: Parameters passed to a filesystem
             data store, opener, or writer call.
-        :return: A tuple comprising the filesystem
+        :return: A tuple comprising the filesystem, an optional root path,
             and the modified *params*.
         """
         params = dict(params)

-        # Filesystem data stores pass "fs" kwarg to
+        # Filesystem data-stores pass "fs" and "root" kwargs to
         # data opener and writer calls.
         fs = params.pop(FS_PARAM_NAME, None)
+        root = params.pop(ROOT_PARAM_NAME, None)
         if fs is not None:
             assert_instance(fs, fsspec.AbstractFileSystem, name=FS_PARAM_NAME)
-            return fs, params
+        if root is not None:
+            assert_instance(root, str, name=ROOT_PARAM_NAME)
+        if fs:
+            return fs, root, params

         protocol = cls.get_protocol()
         if protocol == 'abstract':
@@ -117,9 +124,13 @@ def load_fs(cls, params: Dict[str, Any]) \
                               if storage_options else False)

         try:
-            return fsspec.filesystem(protocol,
-                                     use_listings_cache=use_listings_cache,
-                                     **(storage_options or {})), params
+            return (
+                fsspec.filesystem(protocol,
+                                  use_listings_cache=use_listings_cache,
+                                  **(storage_options or {})),
+                root,
+                params
+            )
         except (ValueError, ImportError):
             raise DataStoreError(f"Cannot instantiate"
                                  f" filesystem {protocol!r}")
@@ -189,5 +200,5 @@ def get_delete_data_params_schema(self, data_id: str = None) \
     def delete_data(self,
                     data_id: str,
                     **delete_params):
-        fs, delete_params = self.load_fs(delete_params)
+        fs, _, delete_params = self.load_fs(delete_params)
         fs.delete(data_id, **delete_params)
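A minimal sketch (not from this commit) of the updated load_fs() contract described in the docstring above: filesystem data stores may pass both 'fs' and 'root' to opener/writer calls, and FsAccessor.load_fs() now pops both and returns them together with the remaining parameters. The parameter values below are illustrative.

import fsspec

from xcube.core.store.fs.accessor import FsAccessor

params = dict(fs=fsspec.filesystem('memory'),  # pre-built filesystem instance
              root='xcube-test',               # optional store root, may be None
              replace=True)                    # any remaining writer kwarg

fs, root, params = FsAccessor.load_fs(params)
assert 'fs' not in params and 'root' not in params  # both were popped from params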

xcube/core/store/fs/impl/dataset.py

+13 -9 lines changed

@@ -20,7 +20,7 @@
 # SOFTWARE.

 from abc import ABC
-from typing import Tuple
+from typing import Tuple, Optional

 import xarray as xr
 import zarr
@@ -121,8 +121,10 @@
             additional_properties=True,
         ),
         consolidated=JsonBooleanSchema(
-            description='If True, apply Zarr’s consolidate_metadata()'
-                        ' function to the store after writing.'
+            description='If True (the default), consolidate all metadata'
+                        ' files ("**/.zarray", "**/.zattrs")'
+                        ' into a single top-level file ".zmetadata"',
+            default=True,
         ),
         append_dim=JsonStringSchema(
             description='If set, the dimension on which the'
@@ -160,9 +162,11 @@ def get_open_data_params_schema(self, data_id: str = None) \
             ZARR_OPEN_DATA_PARAMS_SCHEMA
         )

-    def open_data(self, data_id: str, **open_params) -> xr.Dataset:
+    def open_data(self,
+                  data_id: str,
+                  **open_params) -> xr.Dataset:
         assert_instance(data_id, str, name='data_id')
-        fs, open_params = self.load_fs(open_params)
+        fs, root, open_params = self.load_fs(open_params)
         zarr_store = fs.get_mapper(data_id)
         cache_size = open_params.pop('cache_size', None)
         if isinstance(cache_size, int) and cache_size > 0:
@@ -194,7 +198,7 @@ def write_data(self,
                    **write_params) -> str:
         assert_instance(data, xr.Dataset, name='data')
         assert_instance(data_id, str, name='data_id')
-        fs, write_params = self.load_fs(write_params)
+        fs, root, write_params = self.load_fs(write_params)
         zarr_store = fs.get_mapper(data_id, create=True)
         log_access = write_params.pop('log_access', None)
         if log_access:
@@ -214,7 +218,7 @@ def write_data(self,
     def delete_data(self,
                     data_id: str,
                     **delete_params):
-        fs, delete_params = self.load_fs(delete_params)
+        fs, root, delete_params = self.load_fs(delete_params)
         delete_params.pop('recursive', None)
         fs.delete(data_id, recursive=True, **delete_params)

@@ -253,7 +257,7 @@ def open_data(self,
                   data_id: str,
                   **open_params) -> xr.Dataset:
         assert_instance(data_id, str, name='data_id')
-        fs, open_params = self.load_fs(open_params)
+        fs, root, open_params = self.load_fs(open_params)

         # This doesn't yet work as expected with fsspec and netcdf:
         # engine = open_params.pop('engine', 'scipy')
@@ -281,7 +285,7 @@ def write_data(self,
                    **write_params) -> str:
         assert_instance(data, xr.Dataset, name='data')
         assert_instance(data_id, str, name='data_id')
-        fs, write_params = self.load_fs(write_params)
+        fs, root, write_params = self.load_fs(write_params)
         if not replace and fs.exists(data_id):
             raise DataStoreError(f'Data resource {data_id} already exists')

xcube/core/store/fs/impl/geodataframe.py

+3 -3 lines changed

@@ -20,7 +20,7 @@
 # SOFTWARE.

 from abc import abstractmethod, ABC
-from typing import Tuple
+from typing import Tuple, Optional

 import geopandas as gpd
 import pandas as pd
@@ -60,7 +60,7 @@ def get_open_data_params_schema(self, data_id: str = None) \
     def open_data(self, data_id: str, **open_params) -> gpd.GeoDataFrame:
         # TODO: implement me correctly,
         # this is not valid for shapefile AND geojson
-        fs, open_params = self.load_fs(open_params)
+        fs, root, open_params = self.load_fs(open_params)
         is_local = is_local_fs(fs)
         if is_local:
             file_path = data_id
@@ -86,7 +86,7 @@ def write_data(self,
         # TODO: implement me correctly,
         # this is not valid for shapefile AND geojson
         assert_instance(data, (gpd.GeoDataFrame, pd.DataFrame), 'data')
-        fs, write_params = self.load_fs(write_params)
+        fs, root, write_params = self.load_fs(write_params)
         is_local = is_local_fs(fs)
         if is_local:
             file_path = data_id
