read zarr-v2 consolidated metadata
TomAugspurger committed Sep 11, 2024
1 parent 79bf235 commit fc901eb
Showing 5 changed files with 194 additions and 10 deletions.
6 changes: 5 additions & 1 deletion src/zarr/api/asynchronous.py
@@ -271,6 +271,7 @@ async def open(


async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
kwargs.setdefault("open_consolidated", True)
return await open_group(*args, **kwargs)


@@ -531,6 +532,7 @@ async def open_group(
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
attributes: dict[str, JSON] | None = None,
open_consolidated: bool = False,
) -> AsyncGroup:
"""Open a group using file-mode-like semantics.
Expand Down Expand Up @@ -590,7 +592,9 @@ async def open_group(
attributes = {}

try:
return await AsyncGroup.open(store_path, zarr_format=zarr_format)
return await AsyncGroup.open(
store_path, zarr_format=zarr_format, open_consolidated=open_consolidated
)
except (KeyError, FileNotFoundError):
return await AsyncGroup.create(
store_path, zarr_format=zarr_format, exists_ok=True, attributes=attributes
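For orientation, a minimal usage sketch of the new entry point (the store path "example.zarr" is hypothetical): open_consolidated simply forwards to open_group with open_consolidated=True, which passes the flag down to AsyncGroup.open.

```python
# Minimal usage sketch; "example.zarr" is a hypothetical zarr-v2 store written
# with consolidated metadata (a ".zmetadata" document at the root).
import zarr

# Forwards to open_group(..., open_consolidated=True), so ".zmetadata" is read
# alongside ".zgroup" and ".zattrs".
group = zarr.open_consolidated(store="example.zarr", zarr_format=2)
print(group.metadata.consolidated_metadata)
```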
1 change: 1 addition & 0 deletions src/zarr/core/common.py
@@ -28,6 +28,7 @@
ZARRAY_JSON = ".zarray"
ZGROUP_JSON = ".zgroup"
ZATTRS_JSON = ".zattrs"
ZMETADATA_v2_JSON = ".zmetadata"

BytesLike = bytes | bytearray | memoryview
ShapeLike = tuple[int, ...] | int
58 changes: 54 additions & 4 deletions src/zarr/core/group.py
@@ -3,6 +3,7 @@
import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import asdict, dataclass, field, replace
from typing import TYPE_CHECKING, Literal, cast, overload

@@ -28,7 +29,7 @@
parse_shapelike,
)
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV3Metadata
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata
from zarr.core.sync import SyncMixin, sync
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import ensure_no_existing_node
@@ -126,7 +127,14 @@ def from_dict(cls, data: dict[str, JSON]) -> ConsolidatedMetadata:
elif node_type == "array":
metadata[k] = ArrayV3Metadata.from_dict(v)
else:
raise ValueError(f"Invalid node_type: '{node_type}'")
# We either have V2 metadata, or invalid metadata
if "shape" in v:
# probably ArrayV2Metadata
metadata[k] = ArrayV2Metadata.from_dict(v)
else:
# probably v2 Group metadata
metadata[k] = GroupMetadata.from_dict(v)

# assert data["kind"] == "inline"
if data["kind"] != "inline":
raise ValueError
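A small illustrative sketch of the fallback above (classify_v2_node is a hypothetical helper, not part of the commit): zarr-v2 metadata has no explicit node_type, so the presence of a "shape" key is what distinguishes array metadata from group metadata.

```python
# Hypothetical helper illustrating the v2 fallback in ConsolidatedMetadata.from_dict:
# v2 metadata carries no "node_type", so arrays are recognized by their "shape" key.
def classify_v2_node(meta: dict) -> str:
    return "array" if "shape" in meta else "group"

assert classify_v2_node({"shape": [730], "chunks": [730], "zarr_format": 2}) == "array"
assert classify_v2_node({"attributes": {"key": "value"}, "zarr_format": 2}) == "group"
```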
@@ -226,15 +234,26 @@ async def open(
cls,
store: StoreLike,
zarr_format: Literal[2, 3, None] = 3,
open_consolidated: bool = False,
) -> AsyncGroup:
store_path = await make_store_path(store)

if zarr_format == 2:
zgroup_bytes, zattrs_bytes = await asyncio.gather(
(store_path / ZGROUP_JSON).get(), (store_path / ZATTRS_JSON).get()
paths = [store_path / ZGROUP_JSON, store_path / ZATTRS_JSON]
if open_consolidated:
paths.append(store_path / ".zmetadata") # todo: configurable

zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
*[path.get() for path in paths]
)
if zgroup_bytes is None:
raise FileNotFoundError(store_path)

if open_consolidated:
consolidated_metadata_bytes = rest[0]
if consolidated_metadata_bytes is None:
raise FileNotFoundError(paths[-1])

elif zarr_format == 3:
zarr_json_bytes = await (store_path / ZARR_JSON).get()
if zarr_json_bytes is None:
@@ -265,6 +284,37 @@ async def open(
zgroup = json.loads(zgroup_bytes.to_bytes())
zattrs = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {}
group_metadata = {**zgroup, "attributes": zattrs}

if open_consolidated:
# this *should* be defined.
assert consolidated_metadata_bytes is not None # already checked above

v2_consolidated_metadata = json.loads(consolidated_metadata_bytes.to_bytes())
v2_consolidated_metadata = v2_consolidated_metadata["metadata"]
# We already read zattrs and zgroup. Should we ignore these?
v2_consolidated_metadata.pop(".zattrs")
v2_consolidated_metadata.pop(".zgroup")

consolidated_metadata: defaultdict[str, dict[str, Any]] = defaultdict(dict)

# keys like air/.zarray, air/.zattrs
for k, v in v2_consolidated_metadata.items():
path, kind = k.rsplit("/.", 1)

if kind == "zarray":
consolidated_metadata[path].update(v)
elif kind == "zattrs":
consolidated_metadata[path]["attributes"] = v
elif kind == "zgroup":
consolidated_metadata[path].update(v)
else:
raise ValueError(f"Invalid file type '{kind}' at path '{path}")
group_metadata["consolidated_metadata"] = {
"metadata": dict(consolidated_metadata),
"kind": "inline",
"must_understand": False,
}

else:
# V3 groups are comprised of a zarr.json object
assert zarr_json_bytes is not None
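The key-merging step in AsyncGroup.open can be shown in isolation (the entries below are made-up samples): keys of the form path/.zarray, path/.zgroup, and path/.zattrs from the v2 .zmetadata document are folded into one dict per path, with the .zattrs content nested under an "attributes" key.

```python
# Standalone sketch of the v2 ".zmetadata" key-merging done in AsyncGroup.open above.
# The input entries are made-up samples, not taken from a real store.
from collections import defaultdict
from typing import Any

v2_entries = {
    "air/.zarray": {"shape": [730], "chunks": [730], "zarr_format": 2},
    "air/.zattrs": {"dataset": "NMC Reanalysis"},
    "nested/.zgroup": {"zarr_format": 2},
}

merged: defaultdict[str, dict[str, Any]] = defaultdict(dict)
for key, value in v2_entries.items():
    path, kind = key.rsplit("/.", 1)
    if kind in ("zarray", "zgroup"):
        merged[path].update(value)          # array/group metadata merges directly
    elif kind == "zattrs":
        merged[path]["attributes"] = value  # attributes nest under "attributes"

assert merged["air"]["attributes"] == {"dataset": "NMC Reanalysis"}
assert merged["nested"] == {"zarr_format": 2}
```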
9 changes: 9 additions & 0 deletions tests/v3/conftest.py
@@ -24,6 +24,9 @@
from zarr.core.common import ChunkCoords, MemoryOrder, ZarrFormat


HERE = pathlib.Path(__file__).parent


async def parse_store(
store: Literal["local", "memory", "remote"], path: str
) -> LocalStore | MemoryStore | RemoteStore:
@@ -91,6 +94,12 @@ async def async_group(request: pytest.FixtureRequest, tmpdir: LEGACY_PATH) -> As
return agroup


@pytest.fixture(scope="function")
async def complex_hierarchy() -> LocalStore:
root = HERE / "examples/complex-hierarchy.zarr"
return LocalStore(root=root)


@pytest.fixture(params=["numpy", "cupy"])
def xp(request: pytest.FixtureRequest) -> Iterator[ModuleType]:
"""Fixture to parametrize over numpy-like libraries"""
130 changes: 125 additions & 5 deletions tests/v3/test_metadata/test_v2.py
@@ -1,17 +1,22 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
from typing import Any

from zarr.abc.codec import Codec

import numpy as np
import pytest

import zarr.store
from zarr.codecs import GzipCodec
from zarr.core.buffer import cpu
from zarr.core.group import ConsolidatedMetadata, GroupMetadata
from zarr.core.metadata import ArrayV2Metadata, parse_zarr_format_v2

if TYPE_CHECKING:
from typing import Any

from zarr.abc.codec import Codec


def test_parse_zarr_format_valid() -> None:
assert parse_zarr_format_v2(2) == 2
@@ -70,3 +75,118 @@ def test_metadata_to_dict(
observed.pop("dimension_separator")

assert observed == expected


async def test_read_consolidated_metadata():
# .zgroup, .zattrs, .metadata
zmetadata = {
"metadata": {
".zattrs": {
"Conventions": "COARDS",
},
".zgroup": {"zarr_format": 2},
"air/.zarray": {
"chunks": [730],
"compressor": {},
"dtype": "<i2",
"fill_value": 0,
"filters": None,
"order": "C",
"shape": [730],
"zarr_format": 2,
},
"air/.zattrs": {
"_ARRAY_DIMENSIONS": ["time"],
"dataset": "NMC Reanalysis",
},
"time/.zarray": {
"chunks": [730],
"compressor": {},
"dtype": "<f4",
"fill_value": "0.0",
"filters": None,
"order": "C",
"shape": [730],
"zarr_format": 2,
},
"time/.zattrs": {
"_ARRAY_DIMENSIONS": ["time"],
"calendar": "standard",
"long_name": "Time",
"standard_name": "time",
"units": "hours since 1800-01-01",
},
"nested/.zattrs": {"key": "value"},
"nested/.zgroup": {"zarr_format": 2},
},
"zarr_consolidated_format": 1,
}
store_dict = {}
store = zarr.store.MemoryStore(store_dict=store_dict, mode="a")
await store.set(
".zattrs", cpu.Buffer.from_bytes(json.dumps({"Conventions": "COARDS"}).encode())
)
await store.set(".zgroup", cpu.Buffer.from_bytes(json.dumps({"zarr_format": 2}).encode()))
await store.set(".zmetadata", cpu.Buffer.from_bytes(json.dumps(zmetadata).encode()))
await store.set(
"air/.zarray",
cpu.Buffer.from_bytes(json.dumps(zmetadata["metadata"]["air/.zarray"]).encode()),
)
await store.set(
"air/.zattrs",
cpu.Buffer.from_bytes(json.dumps(zmetadata["metadata"]["air/.zattrs"]).encode()),
)
await store.set(
"time/.zarray",
cpu.Buffer.from_bytes(json.dumps(zmetadata["metadata"]["time/.zarray"]).encode()),
)
await store.set(
"time/.zattrs",
cpu.Buffer.from_bytes(json.dumps(zmetadata["metadata"]["time/.zattrs"]).encode()),
)

# and a nested group for fun
await store.set("nested/.zattrs", cpu.Buffer.from_bytes(json.dumps({"key": "value"}).encode()))
await store.set(
"nested/.zgroup", cpu.Buffer.from_bytes(json.dumps({"zarr_format": 2}).encode())
)

group = zarr.open_consolidated(store=store, zarr_format=2)
assert group.metadata.consolidated_metadata is not None
expected = ConsolidatedMetadata(
metadata={
"air": ArrayV2Metadata(
shape=(730,),
fill_value=0,
chunks=(730,),
attributes={"_ARRAY_DIMENSIONS": ["time"], "dataset": "NMC Reanalysis"},
dtype=np.dtype("int16"),
order="C",
filters=None,
dimension_separator=".",
compressor={},
),
"time": ArrayV2Metadata(
shape=(730,),
fill_value=0.0,
chunks=(730,),
attributes={
"_ARRAY_DIMENSIONS": ["time"],
"calendar": "standard",
"long_name": "Time",
"standard_name": "time",
"units": "hours since 1800-01-01",
},
dtype=np.dtype("float32"),
order="C",
filters=None,
dimension_separator=".",
compressor={},
),
"nested": GroupMetadata(attributes={"key": "value"}, zarr_format=2),
},
kind="inline",
must_understand=False,
)
result = group.metadata.consolidated_metadata
assert result == expected
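For reference, the .zmetadata document that this test constructs by hand matches what zarr-python 2.x writes when a v2 store is consolidated. A hedged sketch, assuming a zarr-python 2.x environment (the path is hypothetical):

```python
# Hedged sketch assuming zarr-python 2.x; "example-v2.zarr" is a hypothetical path.
import zarr  # zarr-python 2.x

store = zarr.DirectoryStore("example-v2.zarr")
root = zarr.group(store=store)
root.create_dataset("air", shape=(730,), chunks=(730,), dtype="<i2")

# Writes a ".zmetadata" key shaped like the zmetadata dict used in the test above:
# {"zarr_consolidated_format": 1, "metadata": {...}}
zarr.consolidate_metadata(store)
```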
