diff --git a/docs/consolidated_metadata.rst b/docs/consolidated_metadata.rst new file mode 100644 index 0000000000..480a855831 --- /dev/null +++ b/docs/consolidated_metadata.rst @@ -0,0 +1,74 @@ +Consolidated Metadata +===================== + +Zarr-Python implements the `Consolidated Metadata`_ extension to the Zarr Spec. +Consolidated metadata can reduce the time needed to load the metadata for an +entire hierarchy, especially when the metadata is being served over a network. +Consolidated metadata essentially stores all the metadata for a hierarchy in the +metadata of the root Group. + +Usage +----- + +If consolidated metadata is present in a Zarr Group's metadata then it is used +by default. The initial read to open the group will need to communicate with +the store (reading from a file for a :class:`zarr.store.LocalStore`, making a +network request for a :class:`zarr.store.RemoteStore`). After that, any subsequent +metadata reads to get child Group or Array nodes will *not* require reads from the store. + +In Python, the consolidated metadata is available on the ``.consolidated_metadata`` +attribute of the ``GroupMetadata`` object. + +.. code-block:: python + + >>> import zarr + >>> store = zarr.store.MemoryStore({}, mode="w") + >>> group = zarr.open_group(store=store) + >>> group.create_array(shape=(1,), name="a") + >>> group.create_array(shape=(2, 2), name="b") + >>> group.create_array(shape=(3, 3, 3), name="c") + >>> zarr.consolidate_metadata(store) + +If we open that group, the Group's metadata has a :class:`zarr.ConsolidatedMetadata` +that can be used. + +.. 
code-block:: python + + >>> consolidated = zarr.open_group(store=store) + >>> consolidated.metadata.consolidated_metadata.metadata + {'b': ArrayV3Metadata(shape=(2, 2), fill_value=np.float64(0.0), ...), + 'a': ArrayV3Metadata(shape=(1,), fill_value=np.float64(0.0), ...), + 'c': ArrayV3Metadata(shape=(3, 3, 3), fill_value=np.float64(0.0), ...)} + +Operations on the group to get children automatically use the consolidated metadata. + +.. code-block:: python + + >>> consolidated["a"] # no read / HTTP request to the Store is required + + +With nested groups, the consolidated metadata is available on the children, recursively. + +.. code-block:: python + + >>> child = group.create_group("child", attributes={"kind": "child"}) + >>> grandchild = child.create_group("child", attributes={"kind": "grandchild"}) + >>> consolidated = zarr.consolidate_metadata(store) + + >>> consolidated["child"].metadata.consolidated_metadata + ConsolidatedMetadata(metadata={'child': GroupMetadata(attributes={'kind': 'grandchild'}, zarr_format=3, )}, ...) + +Synchronization and Concurrency +------------------------------- + +Consolidated metadata is intended for read-heavy use cases on slowly changing +hierarchies. For hierarchies where new nodes are constantly being added, +removed, or modified, consolidated metadata may not be desirable. + +1. It will add some overhead to each update operation, since the metadata + would need to be re-consolidated to keep it in sync with the store. +2. Readers using consolidated metadata will regularly see a "past" version + of the metadata, as of the time they read the root node with its consolidated + metadata. + +.. 
_Consolidated Metadata: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#consolidated-metadata \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index da8e9bcf62..fd43d5e411 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,7 @@ Zarr-Python getting_started tutorial + consolidated_metadata api/index spec release diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 12f58a4e31..a6363bc185 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import dataclasses import warnings from typing import TYPE_CHECKING, Any, Literal, cast @@ -9,9 +10,17 @@ from zarr.abc.store import Store from zarr.core.array import Array, AsyncArray, get_array_metadata -from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat +from zarr.core.buffer import NDArrayLike +from zarr.core.chunk_key_encodings import ChunkKeyEncoding +from zarr.core.common import ( + JSON, + AccessModeLiteral, + ChunkCoords, + MemoryOrder, + ZarrFormat, +) from zarr.core.config import config -from zarr.core.group import AsyncGroup +from zarr.core.group import AsyncGroup, ConsolidatedMetadata, GroupMetadata from zarr.core.metadata import ArrayMetadataDict, ArrayV2Metadata, ArrayV3Metadata from zarr.errors import NodeTypeValidationError from zarr.storage import ( @@ -132,8 +141,64 @@ def _default_zarr_version() -> ZarrFormat: return cast(ZarrFormat, int(config.get("default_zarr_version", 3))) -async def consolidate_metadata(*args: Any, **kwargs: Any) -> AsyncGroup: - raise NotImplementedError +async def consolidate_metadata( + store: StoreLike, + path: str | None = None, + zarr_format: ZarrFormat | None = None, +) -> AsyncGroup: + """ + Consolidate the metadata of all nodes in a hierarchy. + + Upon completion, the metadata of the root node in the Zarr hierarchy will be + updated to include all the metadata of child nodes. 
+ + Parameters + ---------- + store: StoreLike + The store-like object whose metadata you wish to consolidate. + path: str, optional + A path to a group in the store to consolidate at. Only children + below that group will be consolidated. + + By default, the root node is used so all the metadata in the + store is consolidated. + zarr_format : {2, 3, None}, optional + The zarr format of the hierarchy. By default the zarr format + is inferred. + + Returns + ------- + group: AsyncGroup + The group, with the ``consolidated_metadata`` field set to include + the metadata of each child node. + """ + store_path = await make_store_path(store) + + if path is not None: + store_path = store_path / path + + group = await AsyncGroup.open(store_path, zarr_format=zarr_format, use_consolidated=False) + group.store_path.store._check_writable() + + members_metadata = {k: v.metadata async for k, v in group.members(max_depth=None)} + + # While consolidating, we want to be explicit about when child groups + # are empty by inserting an empty dict for consolidated_metadata.metadata + for k, v in members_metadata.items(): + if isinstance(v, GroupMetadata) and v.consolidated_metadata is None: + v = dataclasses.replace(v, consolidated_metadata=ConsolidatedMetadata(metadata={})) + members_metadata[k] = v + + ConsolidatedMetadata._flat_to_nested(members_metadata) + + consolidated_metadata = ConsolidatedMetadata(metadata=members_metadata) + metadata = dataclasses.replace(group.metadata, consolidated_metadata=consolidated_metadata) + group = dataclasses.replace( + group, + metadata=metadata, + ) + await group._save_metadata() + return group async def copy(*args: Any, **kwargs: Any) -> tuple[int, int, int]: @@ -256,8 +321,18 @@ async def open( return await open_group(store=store_path, zarr_format=zarr_format, **kwargs) -async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup: - raise NotImplementedError +async def open_consolidated( + *args: Any, use_consolidated: Literal[True] = 
True, **kwargs: Any +) -> AsyncGroup: + """ + Alias for :func:`open_group` with ``use_consolidated=True``. + """ + if use_consolidated is not True: + raise TypeError( + "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with " + "'use_consolidated=False' to bypass consolidated metadata." + ) + return await open_group(*args, use_consolidated=use_consolidated, **kwargs) async def save( @@ -549,6 +624,7 @@ async def open_group( zarr_format: ZarrFormat | None = None, meta_array: Any | None = None, # not used attributes: dict[str, JSON] | None = None, + use_consolidated: bool | str | None = None, ) -> AsyncGroup: """Open a group using file-mode-like semantics. @@ -589,6 +665,22 @@ async def open_group( to users. Use `numpy.empty(())` by default. attributes : dict A dictionary of JSON-serializable values with user-defined attributes. + use_consolidated : bool or str, default None + Whether to use consolidated metadata. + + By default, consolidated metadata is used if it's present in the + store (in the ``zarr.json`` for Zarr v3 and in the ``.zmetadata`` file + for Zarr v2). + + To explicitly require consolidated metadata, set ``use_consolidated=True``, + which will raise an exception if consolidated metadata is not found. + + To explicitly *not* use consolidated metadata, set ``use_consolidated=False``, + which will fall back to using the regular, non consolidated metadata. + + Zarr v2 allowed configuring the key storing the consolidated metadata + (``.zmetadata`` by default). Specify the custom key as ``use_consolidated`` + to load consolidated metadata from a non-default key. 
Returns ------- @@ -615,7 +707,9 @@ async def open_group( attributes = {} try: - return await AsyncGroup.open(store_path, zarr_format=zarr_format) + return await AsyncGroup.open( + store_path, zarr_format=zarr_format, use_consolidated=use_consolidated + ) except (KeyError, FileNotFoundError): return await AsyncGroup.from_store( store_path, @@ -777,7 +871,9 @@ async def create( ) else: warnings.warn( - "dimension_separator is not yet implemented", RuntimeWarning, stacklevel=2 + "dimension_separator is not yet implemented", + RuntimeWarning, + stacklevel=2, ) if write_empty_chunks: warnings.warn("write_empty_chunks is not yet implemented", RuntimeWarning, stacklevel=2) diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index d76216b781..9dcd6fe2d5 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import zarr.api.asynchronous as async_api from zarr._compat import _deprecate_positional_args @@ -90,8 +90,10 @@ def open( return Group(obj) -def open_consolidated(*args: Any, **kwargs: Any) -> Group: - return Group(sync(async_api.open_consolidated(*args, **kwargs))) +def open_consolidated(*args: Any, use_consolidated: Literal[True] = True, **kwargs: Any) -> Group: + return Group( + sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs)) + ) def save( @@ -208,6 +210,7 @@ def open_group( zarr_format: ZarrFormat | None = None, meta_array: Any | None = None, # not used in async api attributes: dict[str, JSON] | None = None, + use_consolidated: bool | str | None = None, ) -> Group: return Group( sync( @@ -223,6 +226,7 @@ def open_group( zarr_format=zarr_format, meta_array=meta_array, attributes=attributes, + use_consolidated=use_consolidated, ) ) ) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index d687c42194..ef4921d46f 100644 --- 
a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -73,6 +73,7 @@ ArrayV3MetadataDict, T_ArrayMetadata, ) +from zarr.core.metadata.v3 import parse_node_type_array from zarr.core.sync import collect_aiterator, sync from zarr.errors import MetadataValidationError from zarr.registry import get_pipeline_class @@ -165,6 +166,9 @@ async def get_array_metadata( # V3 arrays are comprised of a zarr.json object assert zarr_json_bytes is not None metadata_dict = json.loads(zarr_json_bytes.to_bytes()) + + parse_node_type_array(metadata_dict.get("node_type")) + return metadata_dict diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index 80c743cc90..2762b4d2e6 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -24,6 +24,7 @@ ZARRAY_JSON = ".zarray" ZGROUP_JSON = ".zgroup" ZATTRS_JSON = ".zattrs" +ZMETADATA_V2_JSON = ".zmetadata" ByteRangeRequest = tuple[int | None, int | None] BytesLike = bytes | bytearray | memoryview @@ -31,6 +32,7 @@ ChunkCoords = tuple[int, ...] ChunkCoordsLike = Iterable[int] ZarrFormat = Literal[2, 3] +NodeType = Literal["array", "group"] JSON = None | str | int | float | Mapping[str, "JSON"] | tuple["JSON", ...] 
MemoryOrder = Literal["C", "F"] AccessModeLiteral = Literal["r", "r+", "a", "w", "w-"] diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index dd5a25f098..0b15e2f085 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -1,10 +1,12 @@ from __future__ import annotations import asyncio +import itertools import json import logging +from collections import defaultdict from dataclasses import asdict, dataclass, field, fields, replace -from typing import TYPE_CHECKING, Literal, TypeVar, cast, overload +from typing import TYPE_CHECKING, Literal, TypeVar, assert_never, cast, overload import numpy as np import numpy.typing as npt @@ -22,12 +24,16 @@ ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, + ZMETADATA_V2_JSON, ChunkCoords, + NodeType, ShapeLike, ZarrFormat, parse_shapelike, ) from zarr.core.config import config +from zarr.core.metadata import ArrayV2Metadata, ArrayV3Metadata +from zarr.core.metadata.v3 import V3JsonEncoder from zarr.core.sync import SyncMixin, sync from zarr.errors import MetadataValidationError from zarr.storage import StoreLike, make_store_path @@ -40,8 +46,6 @@ from zarr.abc.codec import Codec from zarr.core.buffer import Buffer, BufferPrototype from zarr.core.chunk_key_encodings import ChunkKeyEncoding - from zarr.core.metadata.v2 import ArrayV2Metadata - from zarr.core.metadata.v3 import ArrayV3Metadata logger = logging.getLogger("zarr.group") @@ -51,10 +55,16 @@ def parse_zarr_format(data: Any) -> ZarrFormat: if data in (2, 3): return cast(Literal[2, 3], data) - msg = msg = f"Invalid zarr_format. Expected one 2 or 3. Got {data}." + msg = f"Invalid zarr_format. Expected one of 2 or 3. Got {data}." 
raise ValueError(msg) +def parse_node_type(data: Any) -> NodeType: + if data in ("array", "group"): + return cast(Literal["array", "group"], data) + raise MetadataValidationError("node_type", "array or group", data) + + # todo: convert None to empty dict def parse_attributes(data: Any) -> dict[str, Any]: if data is None: @@ -87,10 +97,207 @@ def _parse_async_node( raise TypeError(f"Unknown node type, got {type(node)}") +@dataclass(frozen=True) +class ConsolidatedMetadata: + """ + Consolidated Metadata for this Group. + + This stores the metadata of child nodes below this group. Any child groups + will have their consolidated metadata set appropriately. + """ + + metadata: dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata] + kind: Literal["inline"] = "inline" + must_understand: Literal[False] = False + + def to_dict(self) -> dict[str, JSON]: + return { + "kind": self.kind, + "must_understand": self.must_understand, + "metadata": {k: v.to_dict() for k, v in self.flattened_metadata.items()}, + } + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata: + data = dict(data) + + kind = data.get("kind") + if kind != "inline": + raise ValueError(f"Consolidated metadata kind='{kind}' is not supported.") + + raw_metadata = data.get("metadata") + if not isinstance(raw_metadata, dict): + raise TypeError(f"Unexpected type for 'metadata': {type(raw_metadata)}") + + metadata: dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata] = {} + if raw_metadata: + for k, v in raw_metadata.items(): + if not isinstance(v, dict): + raise TypeError( + f"Invalid value for metadata items. key='{k}', type='{type(v).__name__}'" + ) + + # zarr_format is present in v2 and v3. 
+ zarr_format = parse_zarr_format(v["zarr_format"]) + + if zarr_format == 3: + node_type = parse_node_type(v.get("node_type", None)) + if node_type == "group": + metadata[k] = GroupMetadata.from_dict(v) + elif node_type == "array": + metadata[k] = ArrayV3Metadata.from_dict(v) + else: + assert_never(node_type) + elif zarr_format == 2: + if "shape" in v: + metadata[k] = ArrayV2Metadata.from_dict(v) + else: + metadata[k] = GroupMetadata.from_dict(v) + else: + assert_never(zarr_format) + + cls._flat_to_nested(metadata) + + return cls(metadata=metadata) + + @staticmethod + def _flat_to_nested( + metadata: dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata], + ) -> None: + """ + Convert a flat metadata representation to a nested one. + + Notes + ----- + Flat metadata is used when persisting the consolidated metadata. The keys + include the full path, not just the node name. The key prefixes can be + used to determine which nodes are children of which other nodes. + + Nested metadata is used in-memory. The outermost level will only have the + *immediate* children of the Group. All nested child groups will be stored + under the consolidated metadata of their immediate parent. + """ + # We have a flat mapping from {k: v} where the keys include the *full* + # path segment: + # { + # "/a/b": { group_metadata }, + # "/a/b/array-0": { array_metadata }, + # "/a/b/array-1": { array_metadata }, + # } + # + # We want to reorganize the metadata such that each Group contains the + # array metadata of its immediate children. + # In the example, the group at `/a/b` will have consolidated metadata + # for its children `array-0` and `array-1`. + # + # metadata = dict(metadata) + + keys = sorted(metadata, key=lambda k: k.count("/")) + grouped = { + k: list(v) for k, v in itertools.groupby(keys, key=lambda k: k.rsplit("/", 1)[0]) + } + + # we go top down and directly manipulate metadata. 
+ for key, children_keys in grouped.items(): + # key is a key like "a", "a/b", "a/b/c" + # The basic idea is to find the immediate parent (so "", "a", or "a/b") + # and update that node's consolidated metadata to include the metadata + # in children_keys + *prefixes, name = key.split("/") + parent = metadata + + while prefixes: + # e.g. a/b/c has a parent "a/b". Walk through to get + # metadata["a"]["b"] + part = prefixes.pop(0) + # we can assume that parent[part] here is a group + # otherwise we wouldn't have a node with this `part` prefix. + # We can also assume that the parent node will have consolidated metadata, + # because we're walking top to bottom. + parent = parent[part].consolidated_metadata.metadata # type: ignore[union-attr] + + node = parent[name] + children_keys = list(children_keys) + + if isinstance(node, ArrayV2Metadata | ArrayV3Metadata): + # These are already present, either thanks to being an array in the + # root, or by being collected as a child in the else clause + continue + children_keys = list(children_keys) + # We pop from metadata, since we're *moving* this under group + children = { + child_key.split("/")[-1]: metadata.pop(child_key) + for child_key in children_keys + if child_key != key + } + parent[name] = replace( + node, consolidated_metadata=ConsolidatedMetadata(metadata=children) + ) + + @property + def flattened_metadata(self) -> dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata]: + """ + Return the flattened representation of Consolidated Metadata. + + The returned dictionary will have a key for each child node in the hierarchy + under this group. Under the default (nested) representation available through + ``self.metadata``, the dictionary only contains keys for immediate children. + + The keys of the dictionary will include the full path to a child node from + the current group, where segments are joined by ``/``. + + Examples + -------- + >>> cm = ConsolidatedMetadata( + ... metadata={ + ... 
"group-0": GroupMetadata( + ... consolidated_metadata=ConsolidatedMetadata( + ... { + ... "group-0-0": GroupMetadata(), + ... } + ... ) + ... ), + ... "group-1": GroupMetadata(), + ... } + ... ) + {'group-0': GroupMetadata(attributes={}, zarr_format=3, consolidated_metadata=None, node_type='group'), + 'group-0/group-0-0': GroupMetadata(attributes={}, zarr_format=3, consolidated_metadata=None, node_type='group'), + 'group-1': GroupMetadata(attributes={}, zarr_format=3, consolidated_metadata=None, node_type='group')} + """ + metadata = {} + + def flatten( + key: str, group: GroupMetadata | ArrayV2Metadata | ArrayV3Metadata + ) -> dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata]: + children: dict[str, ArrayV2Metadata | ArrayV3Metadata | GroupMetadata] = {} + if isinstance(group, ArrayV2Metadata | ArrayV3Metadata): + children[key] = group + else: + if group.consolidated_metadata and group.consolidated_metadata.metadata is not None: + children[key] = replace( + group, consolidated_metadata=ConsolidatedMetadata(metadata={}) + ) + for name, val in group.consolidated_metadata.metadata.items(): + full_key = f"{key}/{name}" + if isinstance(val, GroupMetadata): + children.update(flatten(full_key, val)) + else: + children[full_key] = val + else: + children[key] = replace(group, consolidated_metadata=None) + return children + + for k, v in self.metadata.items(): + metadata.update(flatten(k, v)) + + return metadata + + @dataclass(frozen=True) class GroupMetadata(Metadata): attributes: dict[str, Any] = field(default_factory=dict) zarr_format: ZarrFormat = 3 + consolidated_metadata: ConsolidatedMetadata | None = None node_type: Literal["group"] = field(default="group", init=False) def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: @@ -98,11 +305,11 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: if self.zarr_format == 3: return { ZARR_JSON: prototype.buffer.from_bytes( - json.dumps(self.to_dict(), 
indent=json_indent).encode() + json.dumps(self.to_dict(), cls=V3JsonEncoder).encode() ) } else: - return { + items = { ZGROUP_JSON: prototype.buffer.from_bytes( json.dumps({"zarr_format": self.zarr_format}, indent=json_indent).encode() ), @@ -110,19 +317,58 @@ def to_buffer_dict(self, prototype: BufferPrototype) -> dict[str, Buffer]: json.dumps(self.attributes, indent=json_indent).encode() ), } + if self.consolidated_metadata: + d = { + ZGROUP_JSON: {"zarr_format": self.zarr_format}, + ZATTRS_JSON: self.attributes, + } + consolidated_metadata = self.consolidated_metadata.to_dict()["metadata"] + assert isinstance(consolidated_metadata, dict) + for k, v in consolidated_metadata.items(): + attrs = v.pop("attributes", None) + d[f"{k}/{ZATTRS_JSON}"] = attrs + if "shape" in v: + # it's an array + d[f"{k}/{ZARRAY_JSON}"] = v + else: + d[f"{k}/{ZGROUP_JSON}"] = { + "zarr_format": self.zarr_format, + "consolidated_metadata": { + "metadata": {}, + "must_understand": False, + "kind": "inline", + }, + } + + items[ZMETADATA_V2_JSON] = prototype.buffer.from_bytes( + json.dumps( + {"metadata": d, "zarr_consolidated_format": 1}, + cls=V3JsonEncoder, + ).encode() + ) + + return items def __init__( - self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3 + self, + attributes: dict[str, Any] | None = None, + zarr_format: ZarrFormat = 3, + consolidated_metadata: ConsolidatedMetadata | None = None, ) -> None: attributes_parsed = parse_attributes(attributes) zarr_format_parsed = parse_zarr_format(zarr_format) object.__setattr__(self, "attributes", attributes_parsed) object.__setattr__(self, "zarr_format", zarr_format_parsed) + object.__setattr__(self, "consolidated_metadata", consolidated_metadata) @classmethod def from_dict(cls, data: dict[str, Any]) -> GroupMetadata: + data = dict(data) assert data.pop("node_type", None) in ("group", None) + consolidated_metadata = data.pop("consolidated_metadata", None) + if consolidated_metadata: + data["consolidated_metadata"] 
= ConsolidatedMetadata.from_dict(consolidated_metadata) zarr_format = data.get("zarr_format") if zarr_format == 2 or zarr_format is None: @@ -135,7 +381,10 @@ def from_dict(cls, data: dict[str, Any]) -> GroupMetadata: return cls(**data) def to_dict(self) -> dict[str, Any]: - return asdict(self) + result = asdict(replace(self, consolidated_metadata=None)) + if self.consolidated_metadata: + result["consolidated_metadata"] = self.consolidated_metadata.to_dict() + return result @dataclass(frozen=True) @@ -168,24 +417,71 @@ async def open( cls, store: StoreLike, zarr_format: Literal[2, 3, None] = 3, + use_consolidated: bool | str | None = None, ) -> AsyncGroup: + """ + Open a new AsyncGroup + + Parameters + ---------- + store: StoreLike + zarr_format: {2, 3}, optional + use_consolidated: bool or str, default None + Whether to use consolidated metadata. + + By default, consolidated metadata is used if it's present in the + store (in the ``zarr.json`` for Zarr v3 and in the ``.zmetadata`` file + for Zarr v2). + + To explicitly require consolidated metadata, set ``use_consolidated=True``, + which will raise an exception if consolidated metadata is not found. + + To explicitly *not* use consolidated metadata, set ``use_consolidated=False``, + which will fall back to using the regular, non consolidated metadata. + + Zarr v2 allowed configuring the key storing the consolidated metadata + (``.zmetadata`` by default). Specify the custom key as ``use_consolidated`` + to load consolidated metadata from a non-default key. 
+ """ store_path = await make_store_path(store) + consolidated_key = ZMETADATA_V2_JSON + + if (zarr_format == 2 or zarr_format is None) and isinstance(use_consolidated, str): + consolidated_key = use_consolidated + if zarr_format == 2: - zgroup_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZGROUP_JSON).get(), (store_path / ZATTRS_JSON).get() + paths = [store_path / ZGROUP_JSON, store_path / ZATTRS_JSON] + if use_consolidated or use_consolidated is None: + paths.append(store_path / consolidated_key) + + zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather( + *[path.get() for path in paths] ) if zgroup_bytes is None: raise FileNotFoundError(store_path) + + if use_consolidated or use_consolidated is None: + maybe_consolidated_metadata_bytes = rest[0] + + else: + maybe_consolidated_metadata_bytes = None + elif zarr_format == 3: zarr_json_bytes = await (store_path / ZARR_JSON).get() if zarr_json_bytes is None: raise FileNotFoundError(store_path) elif zarr_format is None: - zarr_json_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather( + ( + zarr_json_bytes, + zgroup_bytes, + zattrs_bytes, + maybe_consolidated_metadata_bytes, + ) = await asyncio.gather( (store_path / ZARR_JSON).get(), (store_path / ZGROUP_JSON).get(), (store_path / ZATTRS_JSON).get(), + (store_path / str(consolidated_key)).get(), ) if zarr_json_bytes is not None and zgroup_bytes is not None: # TODO: revisit this exception type @@ -204,15 +500,87 @@ async def open( raise MetadataValidationError("zarr_format", "2, 3, or None", zarr_format) if zarr_format == 2: - # V2 groups are comprised of a .zgroup and .zattrs objects + # this is checked above, asserting here for mypy assert zgroup_bytes is not None - zgroup = json.loads(zgroup_bytes.to_bytes()) - zattrs = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {} - group_metadata = {**zgroup, "attributes": zattrs} + + if use_consolidated and maybe_consolidated_metadata_bytes is None: + # the user requested 
consolidated metadata, but it was missing + raise ValueError(consolidated_key) + + elif use_consolidated is False: + # the user explicitly opted out of consolidated_metadata. + # Discard anything we might have read. + maybe_consolidated_metadata_bytes = None + + return cls._from_bytes_v2( + store_path, zgroup_bytes, zattrs_bytes, maybe_consolidated_metadata_bytes + ) else: # V3 groups are comprised of a zarr.json object assert zarr_json_bytes is not None - group_metadata = json.loads(zarr_json_bytes.to_bytes()) + if not isinstance(use_consolidated, bool | None): + raise TypeError("use_consolidated must be a bool or None for Zarr V3.") + + return cls._from_bytes_v3( + store_path, + zarr_json_bytes, + use_consolidated=use_consolidated, + ) + + @classmethod + def _from_bytes_v2( + cls, + store_path: StorePath, + zgroup_bytes: Buffer, + zattrs_bytes: Buffer | None, + consolidated_metadata_bytes: Buffer | None, + ) -> AsyncGroup: + # V2 groups are comprised of a .zgroup and .zattrs objects + zgroup = json.loads(zgroup_bytes.to_bytes()) + zattrs = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {} + group_metadata = {**zgroup, "attributes": zattrs} + + if consolidated_metadata_bytes is not None: + v2_consolidated_metadata = json.loads(consolidated_metadata_bytes.to_bytes()) + v2_consolidated_metadata = v2_consolidated_metadata["metadata"] + # We already read zattrs and zgroup. Should we ignore these? 
+ v2_consolidated_metadata.pop(".zattrs") + v2_consolidated_metadata.pop(".zgroup") + + consolidated_metadata: defaultdict[str, dict[str, Any]] = defaultdict(dict) + + # keys like air/.zarray, air/.zattrs + for k, v in v2_consolidated_metadata.items(): + path, kind = k.rsplit("/.", 1) + + if kind == "zarray": + consolidated_metadata[path].update(v) + elif kind == "zattrs": + consolidated_metadata[path]["attributes"] = v + elif kind == "zgroup": + consolidated_metadata[path].update(v) + else: + raise ValueError(f"Invalid file type '{kind}' at path '{path}") + group_metadata["consolidated_metadata"] = { + "metadata": dict(consolidated_metadata), + "kind": "inline", + "must_understand": False, + } + + return cls.from_dict(store_path, group_metadata) + + @classmethod + def _from_bytes_v3( + cls, store_path: StorePath, zarr_json_bytes: Buffer, use_consolidated: bool | None + ) -> AsyncGroup: + group_metadata = json.loads(zarr_json_bytes.to_bytes()) + if use_consolidated and group_metadata.get("consolidated_metadata") is None: + msg = f"Consolidated metadata requested with 'use_consolidated=True' but not found in '{store_path.path}'." + raise ValueError(msg) + + elif use_consolidated is False: + # Drop consolidated metadata if it's there. + group_metadata.pop("consolidated_metadata", None) return cls.from_dict(store_path, group_metadata) @@ -234,13 +602,16 @@ async def getitem( store_path = self.store_path / key logger.debug("key=%s, store_path=%s", key, store_path) + # Consolidated metadata lets us avoid some I/O operations so try that first. + if self.metadata.consolidated_metadata is not None: + return self._getitem_consolidated(store_path, key, prefix=self.name) + # Note: # in zarr-python v2, we first check if `key` references an Array, else if `key` references # a group,using standalone `contains_array` and `contains_group` functions. These functions # are reusable, but for v3 they would perform redundant I/O operations. 
# Not clear how much of that strategy we want to keep here. - - if self.metadata.zarr_format == 3: + elif self.metadata.zarr_format == 3: zarr_json_bytes = await (store_path / ZARR_JSON).get() if zarr_json_bytes is None: raise KeyError(key) @@ -284,19 +655,54 @@ async def getitem( else: raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}") + def _getitem_consolidated( + self, store_path: StorePath, key: str, prefix: str + ) -> AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup: + # getitem, in the special case where we have consolidated metadata. + # Note that this is a regular def (non async) function. + # This shouldn't do any additional I/O. + + # the caller needs to verify this! + assert self.metadata.consolidated_metadata is not None + + try: + metadata = self.metadata.consolidated_metadata.metadata[key] + except KeyError as e: + # The Group Metadata has consolidated metadata, but the key + # isn't present. We trust this to mean that the key isn't in + # the hierarchy, and *don't* fall back to checking the store. + msg = f"'{key}' not found in consolidated metadata." 
+ raise KeyError(msg) from e + + # update store_path to ensure that AsyncArray/Group.name is correct + if prefix != "/": + key = "/".join([prefix.lstrip("/"), key]) + store_path = StorePath(store=store_path.store, path=key) + + if isinstance(metadata, GroupMetadata): + return AsyncGroup(metadata=metadata, store_path=store_path) + else: + return AsyncArray(metadata=metadata, store_path=store_path) + async def delitem(self, key: str) -> None: store_path = self.store_path / key if self.metadata.zarr_format == 3: await (store_path / ZARR_JSON).delete() + elif self.metadata.zarr_format == 2: await asyncio.gather( (store_path / ZGROUP_JSON).delete(), # TODO: missing_ok=False (store_path / ZARRAY_JSON).delete(), # TODO: missing_ok=False (store_path / ZATTRS_JSON).delete(), # TODO: missing_ok=True ) + else: raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}") + if self.metadata.consolidated_metadata: + self.metadata.consolidated_metadata.metadata.pop(key, None) + await self._save_metadata() + async def get( self, key: str, default: DefaultT | None = None ) -> AsyncArray[Any] | AsyncGroup | DefaultT | None: @@ -668,6 +1074,8 @@ async def nmembers( ------- count : int """ + if self.metadata.consolidated_metadata is not None: + return len(self.metadata.consolidated_metadata.flattened_metadata) # TODO: consider using aioitertools.builtins.sum for this # return await aioitertools.builtins.sum((1 async for _ in self.members()), start=0) n = 0 @@ -695,6 +1103,12 @@ async def members( ``max_depth=None`` to include all nodes, and some positive integer to consider children within that many levels of the root Group. + Returns + ------- + path: + A string giving the path to the target, relative to the Group ``self``. + value: AsyncArray or AsyncGroup + The AsyncArray or AsyncGroup that is a child of ``self``. """ if max_depth is not None and max_depth < 0: raise ValueError(f"max_depth must be None or >= 0. 
Got '{max_depth}' instead") @@ -706,6 +1120,14 @@ async def _members( ) -> AsyncGenerator[ tuple[str, AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup], None ]: + if self.metadata.consolidated_metadata is not None: + # we should be able to do members without any additional I/O + members = self._members_consolidated(max_depth, current_depth) + + for member in members: + yield member + return + if not self.store_path.store.supports_listing: msg = ( f"The store associated with this group ({type(self.store_path.store)}) " @@ -719,6 +1141,11 @@ async def _members( # and scoped to specific zarr versions _skip_keys = ("zarr.json", ".zgroup", ".zattrs") + # hmm lots of I/O and logic interleaved here. + # We *could* have an async gen over self.metadata.consolidated_metadata.metadata.keys() + # and plug in here. `getitem` will skip I/O. + # Kinda a shame to have all the asyncio task overhead though, when it isn't needed. + async for key in self.store_path.store.list_dir(self.store_path.path): if key in _skip_keys: continue @@ -743,9 +1170,31 @@ async def _members( # as opposed to a prefix, in the store under the prefix associated with this group # in which case `key` cannot be the name of a sub-array or sub-group. logger.warning( - "Object at %s is not recognized as a component of a Zarr hierarchy.", key + "Object at %s is not recognized as a component of a Zarr hierarchy.", + key, ) + def _members_consolidated( + self, max_depth: int | None, current_depth: int, prefix: str = "" + ) -> Generator[ + tuple[str, AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup], None + ]: + consolidated_metadata = self.metadata.consolidated_metadata + + # we kind of just want the top-level keys. 
+ if consolidated_metadata is not None: + for key in consolidated_metadata.metadata.keys(): + obj = self._getitem_consolidated( + self.store_path, key, prefix=self.name + ) # Metadata -> Group/Array + key = f"{prefix}/{key}".lstrip("/") + yield key, obj + + if ((max_depth is None) or (current_depth < max_depth)) and isinstance( + obj, AsyncGroup + ): + yield from obj._members_consolidated(max_depth, current_depth + 1, prefix=key) + async def keys(self) -> AsyncGenerator[str, None]: async for key, _ in self.members(): yield key diff --git a/src/zarr/core/metadata/v2.py b/src/zarr/core/metadata/v2.py index 90f171fc40..d1dd86880d 100644 --- a/src/zarr/core/metadata/v2.py +++ b/src/zarr/core/metadata/v2.py @@ -207,7 +207,6 @@ def update_attributes(self, attributes: dict[str, JSON]) -> Self: def parse_dtype(data: npt.DTypeLike) -> np.dtype[Any]: - # todo: real validation return np.dtype(data) diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py index c1a58e23e1..b85932ef82 100644 --- a/src/zarr/core/metadata/v3.py +++ b/src/zarr/core/metadata/v3.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, TypedDict, overload from zarr.abc.metadata import Metadata +from zarr.core.buffer.core import default_buffer_prototype if TYPE_CHECKING: from typing import Self @@ -24,10 +25,15 @@ from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec from zarr.core.array_spec import ArraySpec -from zarr.core.buffer import default_buffer_prototype from zarr.core.chunk_grids import ChunkGrid, RegularChunkGrid from zarr.core.chunk_key_encodings import ChunkKeyEncoding -from zarr.core.common import ZARR_JSON, parse_named_configuration, parse_shapelike +from zarr.core.common import ( + JSON, + ZARR_JSON, + ChunkCoords, + parse_named_configuration, + parse_shapelike, +) from zarr.core.config import config from zarr.core.metadata.common import parse_attributes from zarr.core.strings import _STRING_DTYPE as STRING_NP_DTYPE diff --git 
a/tests/v3/test_codecs/test_vlen.py b/tests/v3/test_codecs/test_vlen.py index c6f587931a..ca5ccb92fa 100644 --- a/tests/v3/test_codecs/test_vlen.py +++ b/tests/v3/test_codecs/test_vlen.py @@ -11,7 +11,7 @@ from zarr.core.strings import _NUMPY_SUPPORTS_VLEN_STRING from zarr.storage.common import StorePath -numpy_str_dtypes: list[type | None] = [None, str, np.dtypes.StrDType] +numpy_str_dtypes: list[type | str | None] = [None, str, np.dtypes.StrDType] expected_zarr_string_dtype: np.dtype[Any] if _NUMPY_SUPPORTS_VLEN_STRING: numpy_str_dtypes.append(np.dtypes.StringDType) diff --git a/tests/v3/test_group.py b/tests/v3/test_group.py index 2ff0a497f8..9b686a6618 100644 --- a/tests/v3/test_group.py +++ b/tests/v3/test_group.py @@ -8,14 +8,15 @@ import zarr import zarr.api.asynchronous +import zarr.api.synchronous from zarr import Array, AsyncArray, AsyncGroup, Group from zarr.abc.store import Store from zarr.core.buffer import default_buffer_prototype from zarr.core.common import JSON, ZarrFormat -from zarr.core.group import GroupMetadata +from zarr.core.group import ConsolidatedMetadata, GroupMetadata from zarr.core.sync import sync from zarr.errors import ContainsArrayError, ContainsGroupError -from zarr.storage import LocalStore, MemoryStore, StorePath +from zarr.storage import LocalStore, MemoryStore, StorePath, ZipStore from zarr.storage.common import make_store_path from .conftest import parse_store @@ -129,18 +130,23 @@ def test_group_name_properties(store: Store, zarr_format: ZarrFormat) -> None: assert bar.basename == "bar" -def test_group_members(store: Store, zarr_format: ZarrFormat) -> None: +@pytest.mark.parametrize("consolidated_metadata", [True, False]) +def test_group_members(store: Store, zarr_format: ZarrFormat, consolidated_metadata: bool) -> None: """ Test that `Group.members` returns correct values, i.e. the arrays and groups (explicit and implicit) contained in that group. 
""" + # group/ + # subgroup/ + # subsubgroup/ + # subsubsubgroup + # subarray path = "group" - agroup = AsyncGroup( - metadata=GroupMetadata(zarr_format=zarr_format), - store_path=StorePath(store=store, path=path), + group = Group.from_store( + store=store, + zarr_format=zarr_format, ) - group = Group(agroup) members_expected: dict[str, Array | Group] = {} members_expected["subgroup"] = group.create_group("subgroup") @@ -156,7 +162,10 @@ def test_group_members(store: Store, zarr_format: ZarrFormat) -> None: # add an extra object to the domain of the group. # the list of children should ignore this object. sync( - store.set(f"{path}/extra_object-1", default_buffer_prototype().buffer.from_bytes(b"000000")) + store.set( + f"{path}/extra_object-1", + default_buffer_prototype().buffer.from_bytes(b"000000"), + ) ) # add an extra object under a directory-like prefix in the domain of the group. # this creates a directory with a random key in it @@ -167,6 +176,11 @@ def test_group_members(store: Store, zarr_format: ZarrFormat) -> None: default_buffer_prototype().buffer.from_bytes(b"000000"), ) ) + + if consolidated_metadata: + zarr.consolidate_metadata(store=store, zarr_format=zarr_format) + group = zarr.open_consolidated(store=store, zarr_format=zarr_format) + members_observed = group.members() # members are not guaranteed to be ordered, so sort before comparing assert sorted(dict(members_observed)) == sorted(members_expected) @@ -277,7 +291,8 @@ def test_group_open(store: Store, zarr_format: ZarrFormat, exists_ok: bool) -> N assert group_created_again.store_path == spath -def test_group_getitem(store: Store, zarr_format: ZarrFormat) -> None: +@pytest.mark.parametrize("consolidated", [True, False]) +def test_group_getitem(store: Store, zarr_format: ZarrFormat, consolidated: bool) -> None: """ Test the `Group.__getitem__` method. 
""" @@ -286,6 +301,12 @@ def test_group_getitem(store: Store, zarr_format: ZarrFormat) -> None: subgroup = group.create_group(name="subgroup") subarray = group.create_array(name="subarray", shape=(10,), chunk_shape=(10,)) + if consolidated: + group = zarr.api.synchronous.consolidate_metadata(store=store, zarr_format=zarr_format) + object.__setattr__( + subgroup.metadata, "consolidated_metadata", ConsolidatedMetadata(metadata={}) + ) + assert group["subgroup"] == subgroup assert group["subarray"] == subarray with pytest.raises(KeyError): @@ -311,7 +332,8 @@ def test_group_get_with_default(store: Store, zarr_format: ZarrFormat) -> None: assert result.attrs["foo"] == "bar" -def test_group_delitem(store: Store, zarr_format: ZarrFormat) -> None: +@pytest.mark.parametrize("consolidated", [True, False]) +def test_group_delitem(store: Store, zarr_format: ZarrFormat, consolidated: bool) -> None: """ Test the `Group.__delitem__` method. """ @@ -322,6 +344,12 @@ def test_group_delitem(store: Store, zarr_format: ZarrFormat) -> None: subgroup = group.create_group(name="subgroup") subarray = group.create_array(name="subarray", shape=(10,), chunk_shape=(10,)) + if consolidated: + group = zarr.api.synchronous.consolidate_metadata(store=store, zarr_format=zarr_format) + object.__setattr__( + subgroup.metadata, "consolidated_metadata", ConsolidatedMetadata(metadata={}) + ) + assert group["subgroup"] == subgroup assert group["subarray"] == subarray @@ -371,7 +399,8 @@ def test_group_contains(store: Store, zarr_format: ZarrFormat) -> None: assert "foo" in group -def test_group_child_iterators(store: Store, zarr_format: ZarrFormat): +@pytest.mark.parametrize("consolidate", [True, False]) +def test_group_child_iterators(store: Store, zarr_format: ZarrFormat, consolidate: bool): group = Group.from_store(store, zarr_format=zarr_format) expected_group_keys = ["g0", "g1"] expected_group_values = [group.create_group(name=name) for name in expected_group_keys] @@ -385,6 +414,86 @@ def 
test_group_child_iterators(store: Store, zarr_format: ZarrFormat): group.create_array(name=name, shape=(1,)) for name in expected_array_keys ] expected_arrays = list(zip(expected_array_keys, expected_array_values, strict=False)) + fill_value: float | None + if zarr_format == 2: + fill_value = None + else: + fill_value = np.float64(0.0) + + if consolidate: + group = zarr.consolidate_metadata(store) + if zarr_format == 2: + metadata = { + "subarray": { + "attributes": {}, + "dtype": "float64", + "fill_value": fill_value, + "shape": (1,), + "chunks": (1,), + "order": "C", + "zarr_format": zarr_format, + }, + "subgroup": { + "attributes": {}, + "consolidated_metadata": { + "metadata": {}, + "kind": "inline", + "must_understand": False, + }, + "node_type": "group", + "zarr_format": zarr_format, + }, + } + else: + metadata = { + "subarray": { + "attributes": {}, + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + "chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": "float64", + "fill_value": fill_value, + "node_type": "array", + "shape": (1,), + "zarr_format": zarr_format, + }, + "subgroup": { + "attributes": {}, + "consolidated_metadata": { + "metadata": {}, + "kind": "inline", + "must_understand": False, + }, + "node_type": "group", + "zarr_format": zarr_format, + }, + } + + object.__setattr__( + expected_group_values[0].metadata, + "consolidated_metadata", + ConsolidatedMetadata.from_dict( + { + "kind": "inline", + "metadata": metadata, + "must_understand": False, + } + ), + ) + object.__setattr__( + expected_group_values[1].metadata, + "consolidated_metadata", + ConsolidatedMetadata(metadata={}), + ) + + result = sorted(group.groups(), key=lambda x: x[0]) + assert result == expected_groups assert sorted(group.groups(), key=lambda x: x[0]) == expected_groups assert sorted(group.group_keys()) == 
expected_group_keys @@ -718,7 +827,11 @@ async def test_asyncgroup_delitem(store: Store, zarr_format: ZarrFormat) -> None agroup = await AsyncGroup.from_store(store=store, zarr_format=zarr_format) array_name = "sub_array" _ = await agroup.create_array( - name=array_name, shape=(10,), dtype="uint8", chunk_shape=(2,), attributes={"foo": 100} + name=array_name, + shape=(10,), + dtype="uint8", + chunk_shape=(2,), + attributes={"foo": 100}, ) await agroup.delitem(array_name) @@ -829,14 +942,13 @@ def test_serializable_sync_group(store: LocalStore, zarr_format: ZarrFormat) -> expected = Group.from_store(store=store, attributes={"foo": 999}, zarr_format=zarr_format) p = pickle.dumps(expected) actual = pickle.loads(p) - assert actual == expected -async def test_group_members_async(store: LocalStore | MemoryStore) -> None: - group = AsyncGroup( - GroupMetadata(), - store_path=StorePath(store=store, path="root"), +@pytest.mark.parametrize("consolidated_metadata", [True, False]) +async def test_group_members_async(store: Store, consolidated_metadata: bool) -> None: + group = await AsyncGroup.from_store( + store=store, ) a0 = await group.create_array("a0", shape=(1,)) g0 = await group.create_group("g0") @@ -879,6 +991,10 @@ async def test_group_members_async(store: LocalStore | MemoryStore) -> None: ] assert all_children == expected + if consolidated_metadata: + await zarr.api.asynchronous.consolidate_metadata(store=store) + group = await zarr.api.asynchronous.open_group(store=store) + nmembers = await group.nmembers(max_depth=None) assert nmembers == 6 @@ -935,7 +1051,7 @@ async def test_require_groups(store: LocalStore | MemoryStore, zarr_format: Zarr assert no_group == () -async def test_create_dataset(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None: +async def test_create_dataset(store: Store, zarr_format: ZarrFormat) -> None: root = await AsyncGroup.from_store(store=store, zarr_format=zarr_format) with pytest.warns(DeprecationWarning): foo = await 
root.create_dataset("foo", shape=(10,), dtype="uint8") @@ -949,7 +1065,7 @@ async def test_create_dataset(store: LocalStore | MemoryStore, zarr_format: Zarr await root.create_dataset("bar", shape=(100,), dtype="int8") -async def test_require_array(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None: +async def test_require_array(store: Store, zarr_format: ZarrFormat) -> None: root = await AsyncGroup.from_store(store=store, zarr_format=zarr_format) foo1 = await root.require_array("foo", shape=(10,), dtype="i8", attributes={"foo": 101}) assert foo1.attrs == {"foo": 101} @@ -974,6 +1090,25 @@ async def test_require_array(store: LocalStore | MemoryStore, zarr_format: ZarrF await root.require_array("bar", shape=(10,), dtype="int8") +@pytest.mark.parametrize("consolidate", [True, False]) +def test_members_name(store: Store, consolidate: bool): + group = Group.from_store(store=store) + a = group.create_group(name="a") + a.create_array("array", shape=(1,)) + b = a.create_group(name="b") + b.create_array("array", shape=(1,)) + + if consolidate: + group = zarr.api.synchronous.consolidate_metadata(store) + + result = group["a"]["b"] + assert result.name == "/a/b" + + paths = sorted(x.name for _, x in group.members(max_depth=None)) + expected = ["/a", "/a/array", "/a/b", "/a/b/array"] + assert paths == expected + + async def test_open_mutable_mapping(): group = await zarr.api.asynchronous.open_group(store={}, mode="w") assert isinstance(group.store_path.store, MemoryStore) @@ -984,6 +1119,134 @@ def test_open_mutable_mapping_sync(): assert isinstance(group.store_path.store, MemoryStore) +class TestConsolidated: + async def test_group_getitem_consolidated(self, store: Store) -> None: + root = await AsyncGroup.from_store(store=store) + # Set up the test structure with + # / + # g0/ # group /g0 + # g1/ # group /g0/g1 + # g2/ # group /g0/g1/g2 + # x0/ # group /x0 + # x1/ # group /x0/x1 + # x2/ # group /x0/x1/x2 + + g0 = await root.create_group("g0") + g1 = await 
g0.create_group("g1") + await g1.create_group("g2") + + x0 = await root.create_group("x0") + x1 = await x0.create_group("x1") + await x1.create_group("x2") + + await zarr.api.asynchronous.consolidate_metadata(store) + + # On disk, we've consolidated all the metadata in the root zarr.json + group = await zarr.api.asynchronous.open(store=store) + rg0 = await group.getitem("g0") + + expected = ConsolidatedMetadata( + metadata={ + "g1": GroupMetadata( + attributes={}, + zarr_format=3, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "g2": GroupMetadata( + attributes={}, + zarr_format=3, + consolidated_metadata=ConsolidatedMetadata(metadata={}), + ) + } + ), + ), + } + ) + assert rg0.metadata.consolidated_metadata == expected + + rg1 = await rg0.getitem("g1") + assert rg1.metadata.consolidated_metadata == expected.metadata["g1"].consolidated_metadata + + rg2 = await rg1.getitem("g2") + assert rg2.metadata.consolidated_metadata == ConsolidatedMetadata(metadata={}) + + async def test_group_delitem_consolidated(self, store: Store) -> None: + if isinstance(store, ZipStore): + raise pytest.skip("Not implemented") + + root = await AsyncGroup.from_store(store=store) + # Set up the test structure with + # / + # g0/ # group /g0 + # g1/ # group /g0/g1 + # g2/ # group /g0/g1/g2 + # data # array + # x0/ # group /x0 + # x1/ # group /x0/x1 + # x2/ # group /x0/x1/x2 + # data # array + + g0 = await root.create_group("g0") + g1 = await g0.create_group("g1") + g2 = await g1.create_group("g2") + await g2.create_array("data", shape=(1,)) + + x0 = await root.create_group("x0") + x1 = await x0.create_group("x1") + x2 = await x1.create_group("x2") + await x2.create_array("data", shape=(1,)) + + await zarr.api.asynchronous.consolidate_metadata(store) + + group = await zarr.api.asynchronous.open_consolidated(store=store) + assert len(group.metadata.consolidated_metadata.metadata) == 2 + assert "g0" in group.metadata.consolidated_metadata.metadata + + await group.delitem("g0") + 
assert len(group.metadata.consolidated_metadata.metadata) == 1 + assert "g0" not in group.metadata.consolidated_metadata.metadata + + def test_open_consolidated_raises(self, store: Store) -> None: + if isinstance(store, ZipStore): + raise pytest.skip("Not implemented") + + root = Group.from_store(store=store) + + # fine to be missing by default + zarr.open_group(store=store) + + with pytest.raises(ValueError, match="Consolidated metadata requested."): + zarr.open_group(store=store, use_consolidated=True) + + # Now create consolidated metadata... + root.create_group("g0") + zarr.consolidate_metadata(store) + + # and explicitly ignore it. + group = zarr.open_group(store=store, use_consolidated=False) + assert group.metadata.consolidated_metadata is None + + async def test_open_consolidated_raises_async(self, store: Store) -> None: + if isinstance(store, ZipStore): + raise pytest.skip("Not implemented") + + root = await AsyncGroup.from_store(store=store) + + # fine to be missing by default + await zarr.api.asynchronous.open_group(store=store) + + with pytest.raises(ValueError, match="Consolidated metadata requested."): + await zarr.api.asynchronous.open_group(store=store, use_consolidated=True) + + # Now create consolidated metadata... + await root.create_group("g0") + await zarr.api.asynchronous.consolidate_metadata(store) + + # and explicitly ignore it. 
+ group = await zarr.api.asynchronous.open_group(store=store, use_consolidated=False) + assert group.metadata.consolidated_metadata is None + + class TestGroupMetadata: def test_from_dict_extra_fields(self): data = { diff --git a/tests/v3/test_metadata/test_consolidated.py b/tests/v3/test_metadata/test_consolidated.py new file mode 100644 index 0000000000..c0218602f6 --- /dev/null +++ b/tests/v3/test_metadata/test_consolidated.py @@ -0,0 +1,564 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +import zarr.api.asynchronous +import zarr.api.synchronous +import zarr.storage +from zarr.api.asynchronous import ( + AsyncGroup, + consolidate_metadata, + group, + open, + open_consolidated, +) +from zarr.core.buffer.core import default_buffer_prototype +from zarr.core.group import ConsolidatedMetadata, GroupMetadata +from zarr.core.metadata import ArrayV3Metadata +from zarr.core.metadata.v2 import ArrayV2Metadata +from zarr.storage.common import StorePath + +if TYPE_CHECKING: + from zarr.abc.store import Store + from zarr.core.common import ZarrFormat + + +@pytest.fixture +async def memory_store_with_hierarchy(memory_store: Store) -> None: + g = await group(store=memory_store, attributes={"foo": "bar"}) + await g.create_array(name="air", shape=(1, 2, 3)) + await g.create_array(name="lat", shape=(1,)) + await g.create_array(name="lon", shape=(2,)) + await g.create_array(name="time", shape=(3,)) + + child = await g.create_group("child", attributes={"key": "child"}) + await child.create_array("array", shape=(4, 4), attributes={"key": "child"}) + + grandchild = await child.create_group("grandchild", attributes={"key": "grandchild"}) + await grandchild.create_array("array", shape=(4, 4), attributes={"key": "grandchild"}) + await grandchild.create_group("empty_group", attributes={"key": "empty"}) + return memory_store + + +class TestConsolidated: + async def test_open_consolidated_false_raises(self): + 
store = zarr.storage.MemoryStore() + with pytest.raises(TypeError, match="use_consolidated"): + await zarr.api.asynchronous.open_consolidated(store, use_consolidated=False) + + def test_open_consolidated_false_raises_sync(self): + store = zarr.storage.MemoryStore() + with pytest.raises(TypeError, match="use_consolidated"): + zarr.open_consolidated(store, use_consolidated=False) + + async def test_consolidated(self, memory_store_with_hierarchy: Store) -> None: + # TODO: Figure out desired keys in + # TODO: variety in the hierarchies + # More nesting + # arrays under arrays + # single array + # etc. + await consolidate_metadata(memory_store_with_hierarchy) + group2 = await AsyncGroup.open(memory_store_with_hierarchy) + + array_metadata = { + "attributes": {}, + "chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": "float64", + "fill_value": np.float64(0.0), + "node_type": "array", + # "shape": (1, 2, 3), + "zarr_format": 3, + } + + expected = GroupMetadata( + attributes={"foo": "bar"}, + consolidated_metadata=ConsolidatedMetadata( + kind="inline", + must_understand=False, + metadata={ + "air": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1, 2, 3), + "chunk_grid": { + "configuration": {"chunk_shape": (1, 2, 3)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lat": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1,), + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lon": ArrayV3Metadata.from_dict( + { + **{"shape": (2,)}, + "chunk_grid": { + "configuration": {"chunk_shape": (2,)}, + "name": "regular", + }, + **array_metadata, + } + ), + "time": ArrayV3Metadata.from_dict( + { + **{ + "shape": (3,), + "chunk_grid": { + "configuration": {"chunk_shape": (3,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "child": GroupMetadata( + 
attributes={"key": "child"}, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "child"}, + "shape": (4, 4), + "chunk_grid": { + "configuration": {"chunk_shape": (4, 4)}, + "name": "regular", + }, + }, + } + ), + "grandchild": GroupMetadata( + attributes={"key": "grandchild"}, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + # known to be empty child group + "empty_group": GroupMetadata( + consolidated_metadata=ConsolidatedMetadata( + metadata={} + ), + attributes={"key": "empty"}, + ), + "array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "grandchild"}, + "shape": (4, 4), + "chunk_grid": { + "configuration": { + "chunk_shape": (4, 4) + }, + "name": "regular", + }, + }, + } + ), + } + ), + ), + }, + ), + ), + }, + ), + ) + + assert group2.metadata == expected + group3 = await open(store=memory_store_with_hierarchy) + assert group3.metadata == expected + + group4 = await open_consolidated(store=memory_store_with_hierarchy) + assert group4.metadata == expected + + result_raw = json.loads( + ( + await memory_store_with_hierarchy.get( + "zarr.json", prototype=default_buffer_prototype() + ) + ).to_bytes() + )["consolidated_metadata"] + assert result_raw["kind"] == "inline" + assert sorted(result_raw["metadata"]) == [ + "air", + "child", + "child/array", + "child/grandchild", + "child/grandchild/array", + "child/grandchild/empty_group", + "lat", + "lon", + "time", + ] + + def test_consolidated_sync(self, memory_store): + g = zarr.api.synchronous.group(store=memory_store, attributes={"foo": "bar"}) + g.create_array(name="air", shape=(1, 2, 3)) + g.create_array(name="lat", shape=(1,)) + g.create_array(name="lon", shape=(2,)) + g.create_array(name="time", shape=(3,)) + + zarr.api.synchronous.consolidate_metadata(memory_store) + group2 = zarr.api.synchronous.Group.open(memory_store) + + array_metadata = { + "attributes": {}, + 
"chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": "float64", + "fill_value": np.float64(0.0), + "node_type": "array", + # "shape": (1, 2, 3), + "zarr_format": 3, + } + + expected = GroupMetadata( + attributes={"foo": "bar"}, + consolidated_metadata=ConsolidatedMetadata( + kind="inline", + must_understand=False, + metadata={ + "air": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1, 2, 3), + "chunk_grid": { + "configuration": {"chunk_shape": (1, 2, 3)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lat": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1,), + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lon": ArrayV3Metadata.from_dict( + { + **{"shape": (2,)}, + "chunk_grid": { + "configuration": {"chunk_shape": (2,)}, + "name": "regular", + }, + **array_metadata, + } + ), + "time": ArrayV3Metadata.from_dict( + { + **{ + "shape": (3,), + "chunk_grid": { + "configuration": {"chunk_shape": (3,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + }, + ), + ) + assert group2.metadata == expected + group3 = zarr.api.synchronous.open(store=memory_store) + assert group3.metadata == expected + + group4 = zarr.api.synchronous.open_consolidated(store=memory_store) + assert group4.metadata == expected + + async def test_not_writable_raises(self, memory_store: zarr.storage.MemoryStore) -> None: + await group(store=memory_store, attributes={"foo": "bar"}) + read_store = zarr.storage.MemoryStore(store_dict=memory_store._store_dict) + with pytest.raises(ValueError, match="does not support writing"): + await consolidate_metadata(read_store) + + async def test_non_root_node(self, memory_store_with_hierarchy: Store) -> None: + await consolidate_metadata(memory_store_with_hierarchy, path="child") + root = await AsyncGroup.open(memory_store_with_hierarchy) 
+ child = await AsyncGroup.open(StorePath(memory_store_with_hierarchy) / "child") + + assert root.metadata.consolidated_metadata is None + assert child.metadata.consolidated_metadata is not None + assert "air" not in child.metadata.consolidated_metadata.metadata + assert "grandchild" in child.metadata.consolidated_metadata.metadata + + def test_consolidated_metadata_from_dict(self): + data = {"must_understand": False} + + # missing kind + with pytest.raises(ValueError, match="kind='None'"): + ConsolidatedMetadata.from_dict(data) + + # invalid kind + data["kind"] = "invalid" + with pytest.raises(ValueError, match="kind='invalid'"): + ConsolidatedMetadata.from_dict(data) + + # missing metadata + data["kind"] = "inline" + + with pytest.raises(TypeError, match="Unexpected type for 'metadata'"): + ConsolidatedMetadata.from_dict(data) + + data["kind"] = "inline" + # empty is fine + data["metadata"] = {} + ConsolidatedMetadata.from_dict(data) + + def test_flatten(self): + array_metadata = { + "attributes": {}, + "chunk_key_encoding": { + "configuration": {"separator": "/"}, + "name": "default", + }, + "codecs": ({"configuration": {"endian": "little"}, "name": "bytes"},), + "data_type": "float64", + "fill_value": np.float64(0.0), + "node_type": "array", + # "shape": (1, 2, 3), + "zarr_format": 3, + } + + metadata = ConsolidatedMetadata( + kind="inline", + must_understand=False, + metadata={ + "air": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1, 2, 3), + "chunk_grid": { + "configuration": {"chunk_shape": (1, 2, 3)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "lat": ArrayV3Metadata.from_dict( + { + **{ + "shape": (1,), + "chunk_grid": { + "configuration": {"chunk_shape": (1,)}, + "name": "regular", + }, + }, + **array_metadata, + } + ), + "child": GroupMetadata( + attributes={"key": "child"}, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "child"}, 
+ "shape": (4, 4), + "chunk_grid": { + "configuration": {"chunk_shape": (4, 4)}, + "name": "regular", + }, + }, + } + ), + "grandchild": GroupMetadata( + attributes={"key": "grandchild"}, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "array": ArrayV3Metadata.from_dict( + { + **array_metadata, + **{ + "attributes": {"key": "grandchild"}, + "shape": (4, 4), + "chunk_grid": { + "configuration": {"chunk_shape": (4, 4)}, + "name": "regular", + }, + }, + } + ) + } + ), + ), + }, + ), + ), + }, + ) + result = metadata.flattened_metadata + expected = { + "air": metadata.metadata["air"], + "lat": metadata.metadata["lat"], + "child": GroupMetadata( + attributes={"key": "child"}, consolidated_metadata=ConsolidatedMetadata(metadata={}) + ), + "child/array": metadata.metadata["child"].consolidated_metadata.metadata["array"], + "child/grandchild": GroupMetadata( + attributes={"key": "grandchild"}, + consolidated_metadata=ConsolidatedMetadata(metadata={}), + ), + "child/grandchild/array": ( + metadata.metadata["child"] + .consolidated_metadata.metadata["grandchild"] + .consolidated_metadata.metadata["array"] + ), + } + assert result == expected + + def test_invalid_metadata_raises(self): + payload = { + "kind": "inline", + "must_understand": False, + "metadata": { + "foo": [1, 2, 3] # invalid + }, + } + + with pytest.raises(TypeError, match="key='foo', type='list'"): + ConsolidatedMetadata.from_dict(payload) + + def test_to_dict_empty(self): + meta = ConsolidatedMetadata( + metadata={ + "empty": GroupMetadata( + attributes={"key": "empty"}, + consolidated_metadata=ConsolidatedMetadata(metadata={}), + ) + } + ) + result = meta.to_dict() + expected = { + "kind": "inline", + "must_understand": False, + "metadata": { + "empty": { + "attributes": {"key": "empty"}, + "consolidated_metadata": { + "kind": "inline", + "must_understand": False, + "metadata": {}, + }, + "node_type": "group", + "zarr_format": 3, + } + }, + } + assert result == expected + + 
@pytest.mark.parametrize("zarr_format", [2, 3]) + async def test_open_consolidated_raises_async(self, zarr_format: ZarrFormat): + store = zarr.storage.MemoryStore(mode="w") + await AsyncGroup.from_store(store, zarr_format=zarr_format) + with pytest.raises(ValueError): + await zarr.api.asynchronous.open_consolidated(store, zarr_format=zarr_format) + + with pytest.raises(ValueError): + await zarr.api.asynchronous.open_consolidated(store, zarr_format=None) + + async def test_consolidated_metadata_v2(self): + store = zarr.storage.MemoryStore(mode="w") + g = await AsyncGroup.from_store(store, attributes={"key": "root"}, zarr_format=2) + await g.create_array(name="a", shape=(1,), attributes={"key": "a"}) + g1 = await g.create_group(name="g1", attributes={"key": "g1"}) + await g1.create_group(name="g2", attributes={"key": "g2"}) + + await zarr.api.asynchronous.consolidate_metadata(store) + result = await zarr.api.asynchronous.open_consolidated(store, zarr_format=2) + + expected = GroupMetadata( + attributes={"key": "root"}, + zarr_format=2, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "a": ArrayV2Metadata( + shape=(1,), + dtype="float64", + attributes={"key": "a"}, + chunks=(1,), + fill_value=None, + order="C", + ), + "g1": GroupMetadata( + attributes={"key": "g1"}, + zarr_format=2, + consolidated_metadata=ConsolidatedMetadata( + metadata={ + "g2": GroupMetadata( + attributes={"key": "g2"}, + zarr_format=2, + consolidated_metadata=ConsolidatedMetadata(metadata={}), + ) + } + ), + ), + } + ), + ) + assert result.metadata == expected + + @pytest.mark.parametrize("zarr_format", [2, 3]) + async def test_use_consolidated_false( + self, memory_store: zarr.storage.MemoryStore, zarr_format: ZarrFormat + ) -> None: + with zarr.config.set(default_zarr_version=zarr_format): + g = await group(store=memory_store, attributes={"foo": "bar"}) + await g.create_group(name="a") + + # test a stale read + await zarr.api.asynchronous.consolidate_metadata(memory_store) + await 
g.create_group(name="b") + + stale = await zarr.api.asynchronous.open_group(store=memory_store) + assert len([x async for x in stale.members()]) == 1 + assert stale.metadata.consolidated_metadata + assert list(stale.metadata.consolidated_metadata.metadata) == ["a"] + + # bypass stale data + good = await zarr.api.asynchronous.open_group( + store=memory_store, use_consolidated=False + ) + assert len([x async for x in good.members()]) == 2 + + # reconsolidate + await zarr.api.asynchronous.consolidate_metadata(memory_store) + + good = await zarr.api.asynchronous.open_group(store=memory_store) + assert len([x async for x in good.members()]) == 2 + assert good.metadata.consolidated_metadata + assert sorted(good.metadata.consolidated_metadata.metadata) == ["a", "b"] diff --git a/tests/v3/test_metadata/test_v2.py b/tests/v3/test_metadata/test_v2.py index bf6e246668..089d5c98e1 100644 --- a/tests/v3/test_metadata/test_v2.py +++ b/tests/v3/test_metadata/test_v2.py @@ -1,8 +1,17 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING, Literal -from zarr.core.metadata.v2 import ArrayV2Metadata +import numpy as np +import pytest + +import zarr.api.asynchronous +import zarr.storage +from zarr.core.buffer import cpu +from zarr.core.group import ConsolidatedMetadata, GroupMetadata +from zarr.core.metadata import ArrayV2Metadata +from zarr.core.metadata.v2 import parse_zarr_format if TYPE_CHECKING: from typing import Any @@ -10,9 +19,6 @@ from zarr.abc.codec import Codec import numcodecs -import pytest - -from zarr.core.metadata.v2 import parse_zarr_format def test_parse_zarr_format_valid() -> None: @@ -74,6 +80,184 @@ def test_metadata_to_dict( assert observed == expected +class TestConsolidated: + @pytest.fixture + async def v2_consolidated_metadata( + self, memory_store: zarr.storage.MemoryStore + ) -> zarr.storage.MemoryStore: + zmetadata = { + "metadata": { + ".zattrs": { + "Conventions": "COARDS", + }, + ".zgroup": {"zarr_format": 2}, + 
"air/.zarray": { + "chunks": [730], + "compressor": None, + "dtype": " None: data = { "_nczarr_array": {"dimrefs": ["/dim1", "/dim2"], "storage": "chunked"}, diff --git a/tests/v3/test_metadata/test_v3.py b/tests/v3/test_metadata/test_v3.py index f3b2968680..4e4ba23313 100644 --- a/tests/v3/test_metadata/test_v3.py +++ b/tests/v3/test_metadata/test_v3.py @@ -4,10 +4,22 @@ import re from typing import TYPE_CHECKING, Literal +import numpy as np +import pytest + from zarr.codecs.bytes import BytesCodec from zarr.core.buffer import default_buffer_prototype from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding -from zarr.core.metadata.v3 import ArrayV3Metadata, DataType +from zarr.core.group import parse_node_type +from zarr.core.metadata.v3 import ( + ArrayV3Metadata, + DataType, + default_fill_value, + parse_dimension_names, + parse_fill_value, + parse_zarr_format, +) +from zarr.errors import MetadataValidationError if TYPE_CHECKING: from collections.abc import Sequence @@ -17,15 +29,8 @@ from zarr.core.common import JSON -import numpy as np -import pytest - from zarr.core.metadata.v3 import ( - default_fill_value, - parse_dimension_names, - parse_fill_value, parse_node_type_array, - parse_zarr_format, ) bool_dtypes = ("bool",) @@ -65,15 +70,29 @@ def test_parse_zarr_format_valid() -> None: assert parse_zarr_format(3) == 3 +def test_parse_node_type_valid() -> None: + assert parse_node_type("array") == "array" + assert parse_node_type("group") == "group" + + +@pytest.mark.parametrize("node_type", [None, 2, "other"]) +def test_parse_node_type_invalid(node_type: Any) -> None: + with pytest.raises( + MetadataValidationError, + match=f"Invalid value for 'node_type'. Expected 'array or group'. 
Got '{node_type}'.", + ): + parse_node_type(node_type) + + @pytest.mark.parametrize("data", [None, "group"]) -def test_parse_node_type_arrayinvalid(data: Any) -> None: +def test_parse_node_type_array_invalid(data: Any) -> None: with pytest.raises( ValueError, match=f"Invalid value for 'node_type'. Expected 'array'. Got '{data}'." ): parse_node_type_array(data) -def test_parse_node_typevalid() -> None: +def test_parse_node_type_array_valid() -> None: assert parse_node_type_array("array") == "array"