Expand (and pass) nested FSStore tests #709

Merged
merged 19 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
33 changes: 28 additions & 5 deletions zarr/storage.py
@@ -1081,8 +1081,11 @@ def _normalize_key(self, key):
         return key.lower() if self.normalize_keys else key
 
     def getitems(self, keys, **kwargs):
-        keys = [self._normalize_key(key) for key in keys]
-        return self.map.getitems(keys, on_error="omit")
+        keys_transformed = [self._normalize_key(key) for key in keys]
+        results = self.map.getitems(keys_transformed, on_error="omit")
+        # The function calling this method may not recognize the transformed keys,
+        # so we send the values returned by self.map.getitems back into the original key space.
+        return {keys[keys_transformed.index(rk)]: rv for rk, rv in results.items()}
 
     def __getitem__(self, key):
         key = self._normalize_key(key)
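
For illustration, a minimal sketch of the key round-trip this change guarantees, assuming zarr ~2.8 with fsspec installed; the path and the array name 'Foo' are illustrative, not from the PR:

    import zarr
    from zarr.storage import FSStore

    store = FSStore('/tmp/demo.zarr', normalize_keys=True)
    root = zarr.group(store=store, overwrite=True)
    root.create_dataset('Foo', shape=(4,), chunks=(2,), dtype='i4')

    # Internally the store lower-cases 'Foo/.zarray' to 'foo/.zarray' before
    # touching the filesystem, but the dict returned by getitems is keyed by
    # the original mixed-case key the caller asked for.
    out = store.getitems(['Foo/.zarray'])
    assert 'Foo/.zarray' in out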
@@ -1144,9 +1147,28 @@ def dir_path(self, path=None):
     def listdir(self, path=None):
         dir_path = self.dir_path(path)
         try:
-            out = sorted(p.rstrip('/').rsplit('/', 1)[-1]
-                         for p in self.fs.ls(dir_path, detail=False))
-            return out
+            children = sorted(p.rstrip('/').rsplit('/', 1)[-1]
+                              for p in self.fs.ls(dir_path, detail=False))
+            if self.key_separator != "/":
+                return children
+            else:
+                if array_meta_key in children:
+                    # special handling of directories containing an array to map nested chunk
+                    # keys back to standard chunk keys
+                    new_children = []
+                    root_path = self.dir_path(path)
+                    for entry in children:
+                        entry_path = os.path.join(root_path, entry)
+                        if _prog_number.match(entry) and self.fs.isdir(entry_path):
+                            for file_name in self.fs.find(entry_path):
+                                file_path = os.path.join(dir_path, file_name)
+                                rel_path = file_path.split(root_path)[1]
+                                new_children.append(rel_path.replace(os.path.sep, '.'))
+                        else:
+                            new_children.append(entry)
+                    return sorted(new_children)
+                else:
+                    return children
         except IOError:
             return []

Review thread on the nested-chunk handling above (d-v-b marked this conversation as resolved):

Member: @martindurant: can you comment if this matches your expectations?

Member: @martindurant: any thoughts?

Member: OK, now I have reminded myself what it's doing, and I suppose this is reasonable. As to its accuracy in all cases, I can only guess it looks right.

It makes you wonder how useful listdir actually is for the case of fsspec. I suppose it remains true that a user might want to start exploring a dataset at some high-level group and descend to a specific array, yet make sure they never list the entire set of files (which could be expensive).

Member: Thanks, @martindurant. I'm going to take that as a 👍 and get this into a 2.8.1 release. It seems that this may be something that needs a re-evaluation down the line.
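
To make the mapping discussed in the thread concrete: with key_separator="/", chunk (0, 1) of an array is stored at the path "0/1", and the branch above rewrites such nested paths so that listdir reports the conventional dotted key "0.1". A self-contained sketch of the same transformation, where the helper, its inputs, and the stand-in for the fs.isdir/fs.find calls are all hypothetical:

    import re

    # mirrors the digit-matching pattern used in zarr.storage (an assumption)
    _prog_number = re.compile(r'^\d+$')

    def flatten_nested_keys(entries):
        """Map nested chunk paths like '0/1' to dotted keys like '0.1'.

        `entries` maps each directory entry to the relative file paths
        found beneath it, standing in for fs.isdir/fs.find.
        """
        flat = []
        for entry, files_below in entries.items():
            if _prog_number.match(entry) and files_below:
                # a numbered directory: everything below it is a nested chunk
                flat.extend(p.replace('/', '.') for p in files_below)
            else:
                flat.append(entry)
        return sorted(flat)

    # directory listing for an array stored with key_separator='/'
    listing = {'.zarray': [], '0': ['0/0', '0/1'], '1': ['1/0', '1/1']}
    assert flatten_nested_keys(listing) == ['.zarray', '0.0', '0.1', '1.0', '1.1']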

@@ -2739,6 +2761,7 @@ class ConsolidatedMetadataStore(MutableMapping):
     zarr.convenience.consolidate_metadata, zarr.convenience.open_consolidated
     """
+
     def __init__(self, store, metadata_key='.zmetadata'):
         self.store = store
174 changes: 165 additions & 9 deletions zarr/tests/test_core.py
@@ -35,8 +35,9 @@
 from zarr.util import buffer_size
 from zarr.tests.util import skip_test_env_var, have_fsspec
 
-
 # noinspection PyMethodMayBeStatic
+
+
 class TestArray(unittest.TestCase):
 
     def test_array_init(self):
@@ -1079,7 +1080,7 @@ def test_structured_array_nested(self):
                       (1, (1, ((1, 2), (2, 3), (3, 4)), 1), b'bbb'),
                       (2, (2, ((2, 3), (3, 4), (4, 5)), 2), b'ccc')],
                      dtype=[('foo', 'i8'), ('bar', [('foo', 'i4'), ('bar', '(3, 2)f4'),
-                            ('baz', 'u1')]), ('baz', 'S3')])
+                                                    ('baz', 'u1')]), ('baz', 'S3')])
        fill_values = None, b'', (0, (0, ((0, 0), (1, 1), (2, 2)), 0), b'zzz')
        self.check_structured_array(d, fill_values)
@@ -1802,7 +1803,7 @@ def test_structured_array_nested(self):
                       (1, (1, ((1, 2), (2, 3), (3, 4)), 1), b'bbb'),
                       (2, (2, ((2, 3), (3, 4), (4, 5)), 2), b'ccc')],
                      dtype=[('foo', 'i8'), ('bar', [('foo', 'i4'), ('bar', '(3, 2)f4'),
-                            ('baz', 'u1')]), ('baz', 'S3')])
+                                                    ('baz', 'u1')]), ('baz', 'S3')])
        fill_values = None, b'', (0, (0, ((0, 0), (1, 1), (2, 2)), 0), b'zzz')
        with pytest.raises(TypeError):
            self.check_structured_array(d, fill_values)
@@ -2469,36 +2470,50 @@ class TestArrayWithFSStore(TestArray):
     def create_array(read_only=False, **kwargs):
         path = mkdtemp()
         atexit.register(shutil.rmtree, path)
-        store = FSStore(path)
+        key_separator = kwargs.pop('key_separator', ".")
+        store = FSStore(path, key_separator=key_separator, auto_mkdir=True)
         cache_metadata = kwargs.pop('cache_metadata', True)
         cache_attrs = kwargs.pop('cache_attrs', True)
         kwargs.setdefault('compressor', Blosc())
         init_array(store, **kwargs)
         return Array(store, read_only=read_only, cache_metadata=cache_metadata,
                      cache_attrs=cache_attrs)
 
+    def expected(self):
+        return [
+            "ab753fc81df0878589535ca9bad2816ba88d91bc",
+            "c16261446f9436b1e9f962e57ce3e8f6074abe8a",
+            "c2ef3b2fb2bc9dcace99cd6dad1a7b66cc1ea058",
+            "6e52f95ac15b164a8e96843a230fcee0e610729b",
+            "091fa99bc60706095c9ce30b56ce2503e0223f56",
+        ]
+
     def test_hexdigest(self):
+        found = []
+
         # Check basic 1-D array
         z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
-        assert 'f710da18d45d38d4aaf2afd7fb822fdd73d02957' == z.hexdigest()
+        found.append(z.hexdigest())
 
         # Check basic 1-D array with different type
         z = self.create_array(shape=(1050,), chunks=100, dtype='<f4')
-        assert '1437428e69754b1e1a38bd7fc9e43669577620db' == z.hexdigest()
+        found.append(z.hexdigest())
 
         # Check basic 2-D array
         z = self.create_array(shape=(20, 35,), chunks=10, dtype='<i4')
-        assert '6c530b6b9d73e108cc5ee7b6be3d552cc994bdbe' == z.hexdigest()
+        found.append(z.hexdigest())
 
         # Check basic 1-D array with some data
         z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
         z[200:400] = np.arange(200, 400, dtype='i4')
-        assert '4c0a76fb1222498e09dcd92f7f9221d6cea8b40e' == z.hexdigest()
+        found.append(z.hexdigest())
 
         # Check basic 1-D array with attributes
         z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
         z.attrs['foo'] = 'bar'
-        assert '05b0663ffe1785f38d3a459dec17e57a18f254af' == z.hexdigest()
+        found.append(z.hexdigest())
+
+        assert self.expected() == found
 
 
 @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
@@ -2573,3 +2588,144 @@ def test_read_from_all_blocks(self):
         z[2:99_000] = 1
         b = Array(z.store, read_only=True, partial_decompress=True)
         assert (b[2:99_000] == 1).all()
+
+
+@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
+class TestArrayWithFSStoreNested(TestArray):
+
+    @staticmethod
+    def create_array(read_only=False, **kwargs):
+        path = mkdtemp()
+        atexit.register(shutil.rmtree, path)
+        key_separator = kwargs.pop('key_separator', "/")
+        store = FSStore(path, key_separator=key_separator, auto_mkdir=True)
+        cache_metadata = kwargs.pop('cache_metadata', True)
+        cache_attrs = kwargs.pop('cache_attrs', True)
+        kwargs.setdefault('compressor', Blosc())
+        init_array(store, **kwargs)
+        return Array(store, read_only=read_only, cache_metadata=cache_metadata,
+                     cache_attrs=cache_attrs)
+
+    def expected(self):
+        return [
+            "94884f29b41b9beb8fc99ad7bf9c0cbf0f2ab3c9",
+            "077aa3bd77b8d354f8f6c15dce5ae4f545788a72",
+            "22be95d83c097460adb339d80b2d7fe19c513c16",
+            "85131cec526fa46938fd2c4a6083a58ee11037ea",
+            "c3167010c162c6198cb2bf3c1da2c46b047c69a1",
+        ]
+
+    def test_hexdigest(self):
+        found = []
+
+        # Check basic 1-D array
+        z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with different type
+        z = self.create_array(shape=(1050,), chunks=100, dtype='<f4')
+        found.append(z.hexdigest())
+
+        # Check basic 2-D array
+        z = self.create_array(shape=(20, 35,), chunks=10, dtype='<i4')
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with some data
+        z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
+        z[200:400] = np.arange(200, 400, dtype='i4')
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with attributes
+        z = self.create_array(shape=(1050,), chunks=100, dtype='<i4')
+        z.attrs['foo'] = 'bar'
+        found.append(z.hexdigest())
+
+        assert self.expected() == found
+
+
+@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
+class TestArrayWithFSStoreNestedPartialRead(TestArray):
+
+    @staticmethod
+    def create_array(read_only=False, **kwargs):
+        path = mkdtemp()
+        atexit.register(shutil.rmtree, path)
+        key_separator = kwargs.pop('key_separator', "/")
+        store = FSStore(path, key_separator=key_separator, auto_mkdir=True)
+        cache_metadata = kwargs.pop("cache_metadata", True)
+        cache_attrs = kwargs.pop("cache_attrs", True)
+        kwargs.setdefault("compressor", Blosc())
+        init_array(store, **kwargs)
+        return Array(
+            store,
+            read_only=read_only,
+            cache_metadata=cache_metadata,
+            cache_attrs=cache_attrs,
+            partial_decompress=True,
+        )
+
+    def expected(self):
+        return [
+            "94884f29b41b9beb8fc99ad7bf9c0cbf0f2ab3c9",
+            "077aa3bd77b8d354f8f6c15dce5ae4f545788a72",
+            "22be95d83c097460adb339d80b2d7fe19c513c16",
+            "85131cec526fa46938fd2c4a6083a58ee11037ea",
+            "c3167010c162c6198cb2bf3c1da2c46b047c69a1",
+        ]
+
+    def test_hexdigest(self):
+        found = []
+
+        # Check basic 1-D array
+        z = self.create_array(shape=(1050,), chunks=100, dtype="<i4")
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with different type
+        z = self.create_array(shape=(1050,), chunks=100, dtype="<f4")
+        found.append(z.hexdigest())
+
+        # Check basic 2-D array
+        z = self.create_array(
+            shape=(
+                20,
+                35,
+            ),
+            chunks=10,
+            dtype="<i4",
+        )
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with some data
+        z = self.create_array(shape=(1050,), chunks=100, dtype="<i4")
+        z[200:400] = np.arange(200, 400, dtype="i4")
+        found.append(z.hexdigest())
+
+        # Check basic 1-D array with attributes
+        z = self.create_array(shape=(1050,), chunks=100, dtype="<i4")
+        z.attrs["foo"] = "bar"
+        found.append(z.hexdigest())
+
+        assert self.expected() == found
+
+    def test_non_cont(self):
+        z = self.create_array(shape=(500, 500, 500), chunks=(50, 50, 50), dtype="<i4")
+        z[:, :, :] = 1
+        # actually go through the partial read by accessing a single item
+        assert z[0, :, 0].any()
+
+    def test_read_nitems_less_than_blocksize_from_multiple_chunks(self):
+        '''Tests to make sure decompression doesn't fail when `nitems` is
+        less than a compressed block size, but covers multiple blocks
+        '''
+        z = self.create_array(shape=1000000, chunks=100_000)
+        z[40_000:80_000] = 1
+        b = Array(z.store, read_only=True, partial_decompress=True)
+        assert (b[40_000:80_000] == 1).all()
+
+    def test_read_from_all_blocks(self):
+        '''Tests to make sure `PartialReadBuffer.read_part` doesn't fail when
+        stop isn't in the `start_points` array
+        '''
+        z = self.create_array(shape=1000000, chunks=100_000)
+        z[2:99_000] = 1
+        b = Array(z.store, read_only=True, partial_decompress=True)
+        assert (b[2:99_000] == 1).all()
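
For readers trying this locally, a minimal sketch of what the two new test classes exercise, assuming zarr ~2.8 with fsspec and a throwaway temp directory: with key_separator="/" the chunks land in nested directories while the array API is unchanged.

    import os
    import tempfile

    import numpy as np
    import zarr
    from zarr.storage import FSStore

    path = tempfile.mkdtemp()
    store = FSStore(path, key_separator='/', auto_mkdir=True)
    z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
    z[:] = np.arange(100).reshape(10, 10)

    # chunks are nested files ('0/0', '0/1', ...) rather than flat
    # dotted files ('0.0', '0.1', ...)
    assert os.path.exists(os.path.join(path, '0', '0'))
    np.testing.assert_array_equal(zarr.open(store)[:], z[:])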
57 changes: 51 additions & 6 deletions zarr/tests/test_hierarchy.py
@@ -21,13 +21,13 @@
 from zarr.core import Array
 from zarr.creation import open_array
 from zarr.hierarchy import Group, group, open_group
-from zarr.storage import (ABSStore, DBMStore, DirectoryStore, LMDBStore,
-                          LRUStoreCache, MemoryStore, NestedDirectoryStore,
-                          SQLiteStore, ZipStore, array_meta_key, atexit_rmglob,
-                          atexit_rmtree, group_meta_key, init_array,
-                          init_group)
+from zarr.storage import (ABSStore, DBMStore, DirectoryStore, FSStore,
+                          LMDBStore, LRUStoreCache, MemoryStore,
+                          NestedDirectoryStore, SQLiteStore, ZipStore,
+                          array_meta_key, atexit_rmglob, atexit_rmtree,
+                          group_meta_key, init_array, init_group)
 from zarr.util import InfoReporter
-from zarr.tests.util import skip_test_env_var
+from zarr.tests.util import skip_test_env_var, have_fsspec
 
 
 # noinspection PyStatementEffect
@@ -971,6 +971,51 @@ def create_store():
         return store, None
 
 
+@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
+class TestGroupWithFSStore(TestGroup):
+
+    @staticmethod
+    def create_store():
+        path = tempfile.mkdtemp()
+        atexit.register(atexit_rmtree, path)
+        store = FSStore(path)
+        return store, None
+
+    def test_round_trip_nd(self):
+        data = np.arange(1000).reshape(10, 10, 10)
+        name = 'raw'
+
+        store, _ = self.create_store()
+        f = open_group(store, mode='w')
+        f.create_dataset(name, data=data, chunks=(5, 5, 5),
+                         compressor=None)
+        h = open_group(store, mode='r')
+        np.testing.assert_array_equal(h[name][:], data)
+
+
+@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec")
+class TestGroupWithNestedFSStore(TestGroupWithFSStore):
+
+    @staticmethod
+    def create_store():
+        path = tempfile.mkdtemp()
+        atexit.register(atexit_rmtree, path)
+        store = FSStore(path, key_separator='/', auto_mkdir=True)
+        return store, None
+
+    def test_inconsistent_dimension_separator(self):
+        data = np.arange(1000).reshape(10, 10, 10)
+        name = 'raw'
+
+        store, _ = self.create_store()
+        f = open_group(store, mode='w')
+
+        # cannot specify dimension_separator that conflicts with the store
+        with pytest.raises(ValueError):
+            f.create_dataset(name, data=data, chunks=(5, 5, 5),
+                             compressor=None, dimension_separator='.')
+
+
 class TestGroupWithZipStore(TestGroup):
 
     @staticmethod
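
A short sketch of the conflict that test_inconsistent_dimension_separator pins down, under the same assumptions as above (zarr ~2.8 with fsspec, throwaway temp path): the store's key_separator and a dataset-level dimension_separator must agree.

    import tempfile

    import numpy as np
    import pytest
    import zarr
    from zarr.storage import FSStore

    store = FSStore(tempfile.mkdtemp(), key_separator='/', auto_mkdir=True)
    grp = zarr.group(store=store, overwrite=True)

    # '/' (from the store) and '.' (requested here) cannot both apply,
    # so dataset creation refuses the mismatch
    with pytest.raises(ValueError):
        grp.create_dataset('raw', data=np.arange(8), chunks=(4,),
                           dimension_separator='.')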
7 changes: 6 additions & 1 deletion zarr/util.py
@@ -554,14 +554,19 @@ def __init__(self, store_key, chunk_store):
         self.map = self.chunk_store.map
         self.fs = self.chunk_store.fs
         self.store_key = store_key
-        self.key_path = self.map._key_to_str(store_key)
         self.buff = None
         self.nblocks = None
         self.start_points = None
         self.n_per_block = None
         self.start_points_max = None
         self.read_blocks = set()
 
+        _key_path = self.map._key_to_str(store_key)
+        _key_path = _key_path.split('/')
+        _chunk_path = [self.chunk_store._normalize_key(_key_path[-1])]
+        _key_path = '/'.join(_key_path[:-1] + _chunk_path)
+        self.key_path = _key_path
+
     def prepare_chunk(self):
         assert self.buff is None
         header = self.fs.read_block(self.key_path, 0, 16)
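
A simplified, hypothetical rendering of what the rewrite above computes: the directory portion of the mapped path is kept as-is and only the final component (the chunk key) goes through the store's key normalization. The helper name and example paths are illustrative, not from the PR.

    def build_key_path(mapped_path, normalize_key):
        parts = mapped_path.split('/')
        return '/'.join(parts[:-1] + [normalize_key(parts[-1])])

    # e.g. a store whose normalizer lower-cases keys must not touch the
    # bucket/root portion of the path
    assert build_key_path('Bucket/data.zarr/foo/0.0', str.lower) == 'Bucket/data.zarr/foo/0.0'
    assert build_key_path('Bucket/data.zarr/FOO', str.lower) == 'Bucket/data.zarr/foo'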