diff --git a/pyproject.toml b/pyproject.toml index 922b10346d..3933376b12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,8 +92,12 @@ extra-dependencies = [ "coverage", "pytest", "pytest-cov", + "msgpack", + "lmdb", + "zstandard", + "crc32c", "pytest-asyncio", - "mypy", + "typing_extensions" ] features = ["extra"] diff --git a/src/zarr/v3/__init__.py b/src/zarr/v3/__init__.py index 038dff89be..3441fa67be 100644 --- a/src/zarr/v3/__init__.py +++ b/src/zarr/v3/__init__.py @@ -3,10 +3,11 @@ from typing import Union import zarr.v3.codecs # noqa: F401 -from zarr.v3.array import Array # noqa: F401 -from zarr.v3.array_v2 import ArrayV2 # noqa: F401 -from zarr.v3.group import Group # noqa: F401 -from zarr.v3.metadata import RuntimeConfiguration, runtime_configuration # noqa: F401 +from zarr.v3.array import Array, AsyncArray # noqa: F401 +from zarr.v3.array_v2 import ArrayV2 +from zarr.v3.config import RuntimeConfiguration # noqa: F401 +from zarr.v3.group import AsyncGroup, Group # noqa: F401 +from zarr.v3.metadata import runtime_configuration # noqa: F401 from zarr.v3.store import ( # noqa: F401 StoreLike, make_store_path, @@ -17,19 +18,24 @@ async def open_auto_async( store: StoreLike, runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), -) -> Union[Array, ArrayV2, Group]: +) -> Union[AsyncArray, AsyncGroup]: store_path = make_store_path(store) try: - return await Array.open(store_path, runtime_configuration=runtime_configuration_) + return await AsyncArray.open(store_path, runtime_configuration=runtime_configuration_) except KeyError: - return await Group.open(store_path, runtime_configuration=runtime_configuration_) + return await AsyncGroup.open(store_path, runtime_configuration=runtime_configuration_) def open_auto( store: StoreLike, runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), ) -> Union[Array, ArrayV2, Group]: - return _sync( + object = _sync( open_auto_async(store, runtime_configuration_), runtime_configuration_.asyncio_loop, ) + if isinstance(object, AsyncArray): + return Array(object) + if isinstance(object, AsyncGroup): + return Group(object) + raise TypeError(f"Unexpected object type. 
Got {type(object)}.") diff --git a/src/zarr/v3/abc/codec.py b/src/zarr/v3/abc/codec.py index 0a7c68784f..d0e51ff894 100644 --- a/src/zarr/v3/abc/codec.py +++ b/src/zarr/v3/abc/codec.py @@ -1,37 +1,25 @@ from __future__ import annotations -from abc import abstractmethod, ABC -from typing import TYPE_CHECKING, Optional, Type +from abc import abstractmethod +from typing import TYPE_CHECKING, Optional import numpy as np +from zarr.v3.abc.metadata import Metadata -from zarr.v3.common import BytesLike, SliceSelection +from zarr.v3.common import ArraySpec from zarr.v3.store import StorePath if TYPE_CHECKING: - from zarr.v3.metadata import ( - ArraySpec, - ArrayMetadata, - DataType, - CodecMetadata, - RuntimeConfiguration, - ) + from typing_extensions import Self + from zarr.v3.common import BytesLike, SliceSelection + from zarr.v3.metadata import ArrayMetadata + from zarr.v3.config import RuntimeConfiguration -class Codec(ABC): +class Codec(Metadata): is_fixed_size: bool - @classmethod - @abstractmethod - def get_metadata_class(cls) -> Type[CodecMetadata]: - pass - - @classmethod - @abstractmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> Codec: - pass - @abstractmethod def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int: pass @@ -39,7 +27,7 @@ def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: return chunk_spec - def evolve(self, *, ndim: int, data_type: DataType) -> Codec: + def evolve(self, array_spec: ArraySpec) -> Self: return self def validate(self, array_metadata: ArrayMetadata) -> None: diff --git a/src/zarr/v3/abc/metadata.py b/src/zarr/v3/abc/metadata.py new file mode 100644 index 0000000000..bdd2f86d59 --- /dev/null +++ b/src/zarr/v3/abc/metadata.py @@ -0,0 +1,44 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Sequence + +if TYPE_CHECKING: + from typing import Dict + from typing_extensions import Self + +from dataclasses import fields + +from zarr.v3.common import JSON + + +class Metadata: + def to_dict(self) -> JSON: + """ + Recursively serialize this model to a dictionary. + This method inspects the fields of self and calls `x.to_dict()` for any fields that + are instances of `Metadata`. Sequences of `Metadata` are similarly recursed into, and + the output of that recursion is collected in a list. + """ + ... + out_dict = {} + for field in fields(self): + key = field.name + value = getattr(self, key) + if isinstance(value, Metadata): + out_dict[field.name] = getattr(self, field.name).to_dict() + elif isinstance(value, str): + out_dict[key] = value + elif isinstance(value, Sequence): + out_dict[key] = [v.to_dict() if isinstance(v, Metadata) else v for v in value] + else: + out_dict[key] = value + + return out_dict + + @classmethod + def from_dict(cls, data: Dict[str, JSON]) -> Self: + """ + Create an instance of the model from a dictionary + """ + ... + + return cls(**data) diff --git a/src/zarr/v3/array.py b/src/zarr/v3/array.py index dadde1658a..632f7d8ec7 100644 --- a/src/zarr/v3/array.py +++ b/src/zarr/v3/array.py @@ -2,7 +2,7 @@ # 1. Split Array into AsyncArray and Array # 3. Added .size and .attrs methods # 4. Temporarily disabled the creation of ArrayV2 -# 5. Added from_json to AsyncArray +# 5. Added from_dict to AsyncArray # Questions to consider: # 1. Was splitting the array into two classes really necessary? 
@@ -10,47 +10,65 @@ from __future__ import annotations +from dataclasses import dataclass, replace + import json from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union import numpy as np -from attr import evolve, frozen +from zarr.v3.abc.codec import Codec # from zarr.v3.array_v2 import ArrayV2 -from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec -from zarr.v3.codecs.registry import get_codec_from_metadata +from zarr.v3.codecs import BytesCodec from zarr.v3.common import ( ZARR_JSON, + ArraySpec, ChunkCoords, Selection, SliceSelection, concurrent_map, ) +from zarr.v3.config import RuntimeConfiguration + from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice -from zarr.v3.metadata import ( - ArrayMetadata, - ArraySpec, - DataType, - DefaultChunkKeyEncodingConfigurationMetadata, - DefaultChunkKeyEncodingMetadata, - RegularChunkGridConfigurationMetadata, - RegularChunkGridMetadata, - RuntimeConfiguration, - V2ChunkKeyEncodingConfigurationMetadata, - V2ChunkKeyEncodingMetadata, - dtype_to_data_type, -) +from zarr.v3.chunk_grids import RegularChunkGrid +from zarr.v3.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding +from zarr.v3.metadata import ArrayMetadata from zarr.v3.store import StoreLike, StorePath, make_store_path from zarr.v3.sync import sync -@frozen +def parse_array_metadata(data: Any): + if isinstance(data, ArrayMetadata): + return data + elif isinstance(data, dict): + return ArrayMetadata.from_dict(data) + else: + raise TypeError + + +@dataclass(frozen=True) class AsyncArray: metadata: ArrayMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration - codec_pipeline: CodecPipeline + + @property + def codecs(self): + return self.metadata.codecs + + def __init__( + self, + metadata: ArrayMetadata, + store_path: StorePath, + runtime_configuration: RuntimeConfiguration, + ): + metadata_parsed = parse_array_metadata(metadata) + + object.__setattr__(self, "metadata", metadata_parsed) + object.__setattr__(self, "store_path", store_path) + object.__setattr__(self, "runtime_configuration", runtime_configuration) @classmethod async def create( @@ -65,7 +83,7 @@ async def create( Tuple[Literal["default"], Literal[".", "/"]], Tuple[Literal["v2"], Literal[".", "/"]], ] = ("default", "/"), - codecs: Optional[Iterable[CodecMetadata]] = None, + codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, dimension_names: Optional[Iterable[str]] = None, attributes: Optional[Dict[str, Any]] = None, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), @@ -75,36 +93,22 @@ async def create( if not exists_ok: assert not await (store_path / ZARR_JSON).exists() - data_type = ( - DataType[dtype] if isinstance(dtype, str) else DataType[dtype_to_data_type[dtype.str]] - ) - - codecs = list(codecs) if codecs is not None else [bytes_codec()] + codecs = list(codecs) if codecs is not None else [BytesCodec()] if fill_value is None: - if data_type == DataType.bool: + if dtype == np.dtype("bool"): fill_value = False else: fill_value = 0 metadata = ArrayMetadata( shape=shape, - data_type=data_type, - chunk_grid=RegularChunkGridMetadata( - configuration=RegularChunkGridConfigurationMetadata(chunk_shape=chunk_shape) - ), + data_type=dtype, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), chunk_key_encoding=( - V2ChunkKeyEncodingMetadata( - configuration=V2ChunkKeyEncodingConfigurationMetadata( - separator=chunk_key_encoding[1] - ) - ) + V2ChunkKeyEncoding(separator=chunk_key_encoding[1]) if 
chunk_key_encoding[0] == "v2" - else DefaultChunkKeyEncodingMetadata( - configuration=DefaultChunkKeyEncodingConfigurationMetadata( - separator=chunk_key_encoding[1] - ) - ) + else DefaultChunkKeyEncoding(separator=chunk_key_encoding[1]) ), fill_value=fill_value, codecs=codecs, @@ -117,38 +121,22 @@ async def create( metadata=metadata, store_path=store_path, runtime_configuration=runtime_configuration, - codec_pipeline=CodecPipeline.create( - [ - get_codec_from_metadata(codec).evolve(ndim=len(shape), data_type=data_type) - for codec in codecs - ] - ), ) await array._save_metadata() return array @classmethod - def from_json( + def from_dict( cls, store_path: StorePath, - zarr_json: Any, + data: Dict[str, Any], runtime_configuration: RuntimeConfiguration, ) -> AsyncArray: - metadata = ArrayMetadata.from_json(zarr_json) - codecs = [ - get_codec_from_metadata(codec).evolve( - ndim=len(metadata.shape), data_type=metadata.data_type - ) - for codec in metadata.codecs - ] + metadata = ArrayMetadata.from_dict(data) async_array = cls( - metadata=metadata, - store_path=store_path, - runtime_configuration=runtime_configuration, - codec_pipeline=CodecPipeline.create(codecs), + metadata=metadata, store_path=store_path, runtime_configuration=runtime_configuration ) - async_array._validate_metadata() return async_array @classmethod @@ -160,7 +148,7 @@ async def open( store_path = make_store_path(store) zarr_json_bytes = await (store_path / ZARR_JSON).get() assert zarr_json_bytes is not None - return cls.from_json( + return cls.from_dict( store_path, json.loads(zarr_json_bytes), runtime_configuration=runtime_configuration, @@ -175,7 +163,7 @@ async def open_auto( store_path = make_store_path(store) v3_metadata_bytes = await (store_path / ZARR_JSON).get() if v3_metadata_bytes is not None: - return cls.from_json( + return cls.from_dict( store_path, json.loads(v3_metadata_bytes), runtime_configuration=runtime_configuration or RuntimeConfiguration(), @@ -205,10 +193,11 @@ def attrs(self) -> dict: return self.metadata.attributes async def getitem(self, selection: Selection): + assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=self.metadata.chunk_grid.configuration.chunk_shape, + chunk_shape=self.metadata.chunk_grid.chunk_shape, ) # setup output array @@ -234,20 +223,8 @@ async def getitem(self, selection: Selection): return out[()] async def _save_metadata(self) -> None: - self._validate_metadata() - await (self.store_path / ZARR_JSON).set(self.metadata.to_bytes()) - def _validate_metadata(self) -> None: - assert len(self.metadata.shape) == len( - self.metadata.chunk_grid.configuration.chunk_shape - ), "`chunk_shape` and `shape` need to have the same number of dimensions." - assert self.metadata.dimension_names is None or len(self.metadata.shape) == len( - self.metadata.dimension_names - ), "`dimension_names` and `shape` need to have the same number of dimensions." - assert self.metadata.fill_value is not None, "`fill_value` is required." 
- self.codec_pipeline.validate(self.metadata) - async def _read_chunk( self, chunk_coords: ChunkCoords, @@ -260,8 +237,8 @@ async def _read_chunk( chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) store_path = self.store_path / chunk_key - if self.codec_pipeline.supports_partial_decode: - chunk_array = await self.codec_pipeline.decode_partial( + if self.codecs.supports_partial_decode: + chunk_array = await self.codecs.decode_partial( store_path, chunk_selection, chunk_spec, self.runtime_configuration ) if chunk_array is not None: @@ -271,7 +248,7 @@ async def _read_chunk( else: chunk_bytes = await store_path.get() if chunk_bytes is not None: - chunk_array = await self.codec_pipeline.decode( + chunk_array = await self.codecs.decode( chunk_bytes, chunk_spec, self.runtime_configuration ) tmp = chunk_array[chunk_selection] @@ -280,7 +257,8 @@ async def _read_chunk( out[out_selection] = self.metadata.fill_value async def setitem(self, selection: Selection, value: np.ndarray) -> None: - chunk_shape = self.metadata.chunk_grid.configuration.chunk_shape + assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) + chunk_shape = self.metadata.chunk_grid.chunk_shape indexer = BasicIndexer( selection, shape=self.metadata.shape, @@ -341,9 +319,9 @@ async def _write_chunk( chunk_array = value[out_selection] await self._write_chunk_to_store(store_path, chunk_array, chunk_spec) - elif self.codec_pipeline.supports_partial_encode: + elif self.codecs.supports_partial_encode: # print("encode_partial", chunk_coords, chunk_selection, repr(self)) - await self.codec_pipeline.encode_partial( + await self.codecs.encode_partial( store_path, value[out_selection], chunk_selection, @@ -364,9 +342,7 @@ async def _write_chunk( chunk_array.fill(self.metadata.fill_value) else: chunk_array = ( - await self.codec_pipeline.decode( - chunk_bytes, chunk_spec, self.runtime_configuration - ) + await self.codecs.decode(chunk_bytes, chunk_spec, self.runtime_configuration) ).copy() # make a writable copy chunk_array[chunk_selection] = value[out_selection] @@ -379,7 +355,7 @@ async def _write_chunk_to_store( # chunks that only contain fill_value will be removed await store_path.delete() else: - chunk_bytes = await self.codec_pipeline.encode( + chunk_bytes = await self.codecs.encode( chunk_array, chunk_spec, self.runtime_configuration ) if chunk_bytes is None: @@ -388,11 +364,17 @@ async def _write_chunk_to_store( await store_path.set(chunk_bytes) async def resize(self, new_shape: ChunkCoords) -> AsyncArray: - assert len(new_shape) == len(self.metadata.shape) - new_metadata = evolve(self.metadata, shape=new_shape) + if len(new_shape) != len(self.metadata.shape): + raise ValueError( + "The new shape must have the same number of dimensions " + + f"(={len(self.metadata.shape)})." 
+ ) + + new_metadata = replace(self.metadata, shape=new_shape) # Remove all chunks outside of the new shape - chunk_shape = self.metadata.chunk_grid.configuration.chunk_shape + assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) + chunk_shape = self.metadata.chunk_grid.chunk_shape chunk_key_encoding = self.metadata.chunk_key_encoding old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) @@ -411,14 +393,14 @@ async def _delete_key(key: str) -> None: # Write new metadata await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) - return evolve(self, metadata=new_metadata) + return replace(self, metadata=new_metadata) - async def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: - new_metadata = evolve(self.metadata, attributes=new_attributes) + async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray: + new_metadata = replace(self.metadata, attributes=new_attributes) # Write new metadata await (self.store_path / ZARR_JSON).set(new_metadata.to_bytes()) - return evolve(self, metadata=new_metadata) + return replace(self, metadata=new_metadata) def __repr__(self): return f"" @@ -427,7 +409,7 @@ async def info(self): return NotImplemented -@frozen +@dataclass(frozen=True) class Array: _async_array: AsyncArray @@ -444,7 +426,7 @@ def create( Tuple[Literal["default"], Literal[".", "/"]], Tuple[Literal["v2"], Literal[".", "/"]], ] = ("default", "/"), - codecs: Optional[Iterable[CodecMetadata]] = None, + codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, dimension_names: Optional[Iterable[str]] = None, attributes: Optional[Dict[str, Any]] = None, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), @@ -469,14 +451,14 @@ def create( return cls(async_array) @classmethod - def from_json( + def from_dict( cls, store_path: StorePath, - zarr_json: Any, + data: Dict[str, Any], runtime_configuration: RuntimeConfiguration, ) -> Array: - async_array = AsyncArray.from_json( - store_path=store_path, zarr_json=zarr_json, runtime_configuration=runtime_configuration + async_array = AsyncArray.from_dict( + store_path=store_path, data=data, runtime_configuration=runtime_configuration ) return cls(async_array) @@ -490,7 +472,6 @@ def open( AsyncArray.open(store, runtime_configuration=runtime_configuration), runtime_configuration.asyncio_loop, ) - async_array._validate_metadata() return cls(async_array) @classmethod @@ -530,7 +511,7 @@ def metadata(self) -> ArrayMetadata: return self._async_array.metadata @property - def store_path(self) -> str: + def store_path(self) -> StorePath: return self._async_array.store_path def __getitem__(self, selection: Selection): diff --git a/src/zarr/v3/array_v2.py b/src/zarr/v3/array_v2.py index dc4cbebd5e..f150d2dbd2 100644 --- a/src/zarr/v3/array_v2.py +++ b/src/zarr/v3/array_v2.py @@ -1,12 +1,13 @@ from __future__ import annotations import asyncio +from dataclasses import dataclass, replace import json from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union import numcodecs import numpy as np -from attr import evolve, frozen + from numcodecs.compat import ensure_bytes, ensure_ndarray from zarr.v3.common import ( @@ -19,8 +20,9 @@ concurrent_map, to_thread, ) +from zarr.v3.config import RuntimeConfiguration from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice -from zarr.v3.metadata import ArrayV2Metadata, CodecMetadata, RuntimeConfiguration +from zarr.v3.metadata import 
ArrayV2Metadata from zarr.v3.store import StoreLike, StorePath, make_store_path from zarr.v3.sync import sync @@ -28,7 +30,7 @@ from zarr.v3.array import Array -@frozen +@dataclass(frozen=True) class _AsyncArrayProxy: array: ArrayV2 @@ -36,7 +38,7 @@ def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: return _AsyncArraySelectionProxy(self.array, selection) -@frozen +@dataclass(frozen=True) class _AsyncArraySelectionProxy: array: ArrayV2 selection: Selection @@ -48,7 +50,7 @@ async def set(self, value: np.ndarray): return await self.array.set_async(self.selection, value) -@frozen +@dataclass(frozen=True) class ArrayV2: metadata: ArrayV2Metadata attributes: Optional[Dict[str, Any]] @@ -74,7 +76,7 @@ async def create_async( ) -> ArrayV2: store_path = make_store_path(store) if not exists_ok: - assert not await (store_path / ZARRAY_JSON).exists_async() + assert not await (store_path / ZARRAY_JSON).exists() metadata = ArrayV2Metadata( shape=shape, @@ -144,11 +146,11 @@ async def open_async( ) -> ArrayV2: store_path = make_store_path(store) zarray_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZARRAY_JSON).get_async(), - (store_path / ZATTRS_JSON).get_async(), + (store_path / ZARRAY_JSON).get(), + (store_path / ZATTRS_JSON).get(), ) assert zarray_bytes is not None - return cls.from_json( + return cls.from_dict( store_path, zarray_json=json.loads(zarray_bytes), zattrs_json=json.loads(zattrs_bytes) if zattrs_bytes is not None else None, @@ -167,14 +169,14 @@ def open( ) @classmethod - def from_json( + def from_dict( cls, store_path: StorePath, zarray_json: Any, zattrs_json: Optional[Any], runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), ) -> ArrayV2: - metadata = ArrayV2Metadata.from_json(zarray_json) + metadata = ArrayV2Metadata.from_dict(zarray_json) out = cls( store_path=store_path, metadata=metadata, @@ -187,13 +189,13 @@ def from_json( async def _save_metadata(self) -> None: self._validate_metadata() - await (self.store_path / ZARRAY_JSON).set_async(self.metadata.to_bytes()) + await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes()) if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set_async( + await (self.store_path / ZATTRS_JSON).set( json.dumps(self.attributes).encode(), ) else: - await (self.store_path / ZATTRS_JSON).delete_async() + await (self.store_path / ZATTRS_JSON).delete() def _validate_metadata(self) -> None: assert len(self.metadata.shape) == len( @@ -256,7 +258,7 @@ async def _read_chunk( ): store_path = self.store_path / self._encode_chunk_key(chunk_coords) - chunk_array = await self._decode_chunk(await store_path.get_async()) + chunk_array = await self._decode_chunk(await store_path.get()) if chunk_array is not None: tmp = chunk_array[chunk_selection] out[out_selection] = tmp @@ -357,7 +359,7 @@ async def _write_chunk( else: # writing partial chunks # read chunk first - tmp = await self._decode_chunk(await store_path.get_async()) + tmp = await self._decode_chunk(await store_path.get()) # merge new value if tmp is None: @@ -379,13 +381,13 @@ async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.nda chunk_bytes: Optional[BytesLike] if np.all(chunk_array == self.metadata.fill_value): # chunks that only contain fill_value will be removed - await store_path.delete_async() + await store_path.delete() else: chunk_bytes = await self._encode_chunk(chunk_array) if chunk_bytes is None: - await store_path.delete_async() + await store_path.delete() 
else: - await store_path.set_async(chunk_bytes) + await store_path.set(chunk_bytes) async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: chunk_array = chunk_array.ravel(order=self.metadata.order) @@ -411,7 +413,7 @@ def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: assert len(new_shape) == len(self.metadata.shape) - new_metadata = evolve(self.metadata, shape=new_shape) + new_metadata = replace(self.metadata, shape=new_shape) # Remove all chunks outside of the new shape chunk_shape = self.metadata.chunks @@ -419,7 +421,7 @@ async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) async def _delete_key(key: str) -> None: - await (self.store_path / key).delete_async() + await (self.store_path / key).delete() await concurrent_map( [ @@ -430,8 +432,8 @@ async def _delete_key(key: str) -> None: ) # Write new metadata - await (self.store_path / ZARRAY_JSON).set_async(new_metadata.to_bytes()) - return evolve(self, metadata=new_metadata) + await (self.store_path / ZARRAY_JSON).set(new_metadata.to_bytes()) + return replace(self, metadata=new_metadata) def resize(self, new_shape: ChunkCoords) -> ArrayV2: return sync(self.resize_async(new_shape), self.runtime_configuration.asyncio_loop) @@ -439,36 +441,22 @@ def resize(self, new_shape: ChunkCoords) -> ArrayV2: async def convert_to_v3_async(self) -> Array: from sys import byteorder as sys_byteorder + from zarr.v3.abc.codec import Codec from zarr.v3.array import Array from zarr.v3.common import ZARR_JSON - from zarr.v3.metadata import ( - ArrayMetadata, - DataType, - RegularChunkGridConfigurationMetadata, - RegularChunkGridMetadata, - V2ChunkKeyEncodingConfigurationMetadata, - V2ChunkKeyEncodingMetadata, - dtype_to_data_type, - ) - from zarr.v3.codecs.blosc import ( - BloscCodecConfigurationMetadata, - BloscCodecMetadata, - blosc_shuffle_int_to_str, - ) - from zarr.v3.codecs.bytes import ( - BytesCodecConfigurationMetadata, - BytesCodecMetadata, - ) - from zarr.v3.codecs.gzip import ( - GzipCodecConfigurationMetadata, - GzipCodecMetadata, - ) - from zarr.v3.codecs.transpose import ( - TransposeCodecConfigurationMetadata, - TransposeCodecMetadata, + from zarr.v3.chunk_grids import RegularChunkGrid + from zarr.v3.chunk_key_encodings import V2ChunkKeyEncoding + from zarr.v3.metadata import ArrayMetadata, DataType + + from zarr.v3.codecs import ( + BloscCodec, + BloscShuffle, + BytesCodec, + GzipCodec, + TransposeCodec, ) - data_type = DataType[dtype_to_data_type[self.metadata.dtype.str]] + data_type = DataType.from_dtype(self.metadata.dtype) endian: Literal["little", "big"] if self.metadata.dtype.byteorder == "=": endian = sys_byteorder @@ -481,19 +469,11 @@ async def convert_to_v3_async(self) -> Array: self.metadata.filters is None or len(self.metadata.filters) == 0 ), "Filters are not supported by v3." 
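The v2-to-v3 conversion below relies on the new `V2ChunkKeyEncoding`; both encodings are now plain dataclasses whose key grammar is easy to check directly. A short sketch:

```python
from zarr.v3.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding

default = DefaultChunkKeyEncoding(separator="/")
assert default.encode_chunk_key((0, 1, 2)) == "c/0/1/2"
assert default.encode_chunk_key(()) == "c"  # zero-dimensional arrays

v2 = V2ChunkKeyEncoding(separator=".")
assert v2.encode_chunk_key((0, 1, 2)) == "0.1.2"
assert v2.encode_chunk_key(()) == "0"  # v2 maps the empty tuple to "0"
assert v2.decode_chunk_key("0.1.2") == (0, 1, 2)
```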
- codecs: List[CodecMetadata] = [] + codecs: List[Codec] = [] if self.metadata.order == "F": - codecs.append( - TransposeCodecMetadata( - configuration=TransposeCodecConfigurationMetadata( - order=tuple(reversed(range(self.metadata.ndim))) - ) - ) - ) - codecs.append( - BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian=endian)) - ) + codecs.append(TransposeCodec(order=tuple(reversed(range(self.metadata.ndim))))) + codecs.append(BytesCodec(endian=endian)) if self.metadata.compressor is not None: v2_codec = numcodecs.get_codec(self.metadata.compressor).get_config() @@ -502,55 +482,41 @@ async def convert_to_v3_async(self) -> Array: "gzip", ), "Only blosc and gzip are supported by v3." if v2_codec["id"] == "blosc": - shuffle = blosc_shuffle_int_to_str[v2_codec.get("shuffle", 0)] codecs.append( - BloscCodecMetadata( - configuration=BloscCodecConfigurationMetadata( - typesize=data_type.byte_count, - cname=v2_codec["cname"], - clevel=v2_codec["clevel"], - shuffle=shuffle, - blocksize=v2_codec.get("blocksize", 0), - ) + BloscCodec( + typesize=data_type.byte_count, + cname=v2_codec["cname"], + clevel=v2_codec["clevel"], + shuffle=BloscShuffle.from_int(v2_codec.get("shuffle", 0)), + blocksize=v2_codec.get("blocksize", 0), ) ) elif v2_codec["id"] == "gzip": - codecs.append( - GzipCodecMetadata( - configuration=GzipCodecConfigurationMetadata(level=v2_codec.get("level", 5)) - ) - ) + codecs.append(GzipCodec(level=v2_codec.get("level", 5))) new_metadata = ArrayMetadata( shape=self.metadata.shape, - chunk_grid=RegularChunkGridMetadata( - configuration=RegularChunkGridConfigurationMetadata( - chunk_shape=self.metadata.chunks - ) - ), + chunk_grid=RegularChunkGrid(chunk_shape=self.metadata.chunks), data_type=data_type, fill_value=0 if self.metadata.fill_value is None else self.metadata.fill_value, - chunk_key_encoding=V2ChunkKeyEncodingMetadata( - configuration=V2ChunkKeyEncodingConfigurationMetadata( - separator=self.metadata.dimension_separator - ) - ), + chunk_key_encoding=V2ChunkKeyEncoding(separator=self.metadata.dimension_separator), codecs=codecs, attributes=self.attributes or {}, + dimension_names=None, ) new_metadata_bytes = new_metadata.to_bytes() - await (self.store_path / ZARR_JSON).set_async(new_metadata_bytes) + await (self.store_path / ZARR_JSON).set(new_metadata_bytes) - return Array.from_json( + return Array.from_dict( store_path=self.store_path, - zarr_json=json.loads(new_metadata_bytes), + data=json.loads(new_metadata_bytes), runtime_configuration=self.runtime_configuration, ) async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2: - await (self.store_path / ZATTRS_JSON).set_async(json.dumps(new_attributes).encode()) - return evolve(self, attributes=new_attributes) + await (self.store_path / ZATTRS_JSON).set(json.dumps(new_attributes).encode()) + return replace(self, attributes=new_attributes) def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2: return sync( diff --git a/src/zarr/v3/chunk_grids.py b/src/zarr/v3/chunk_grids.py new file mode 100644 index 0000000000..6c48323798 --- /dev/null +++ b/src/zarr/v3/chunk_grids.py @@ -0,0 +1,47 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Dict +from dataclasses import dataclass +from zarr.v3.abc.metadata import Metadata + +from zarr.v3.common import ( + JSON, + ChunkCoords, + ChunkCoordsLike, + parse_named_configuration, + parse_shapelike, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(frozen=True) +class 
ChunkGrid(Metadata): + @classmethod + def from_dict(cls, data: Dict[str, JSON]) -> ChunkGrid: + if isinstance(data, ChunkGrid): + return data # type: ignore + + name_parsed, _ = parse_named_configuration(data) + if name_parsed == "regular": + return RegularChunkGrid.from_dict(data) + raise ValueError(f"Unknown chunk grid. Got {name_parsed}.") + + +@dataclass(frozen=True) +class RegularChunkGrid(ChunkGrid): + chunk_shape: ChunkCoords + + def __init__(self, *, chunk_shape: ChunkCoordsLike) -> None: + chunk_shape_parsed = parse_shapelike(chunk_shape) + + object.__setattr__(self, "chunk_shape", chunk_shape_parsed) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "regular") + + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + return {"name": "regular", "configuration": {"chunk_shape": list(self.chunk_shape)}} diff --git a/src/zarr/v3/chunk_key_encodings.py b/src/zarr/v3/chunk_key_encodings.py new file mode 100644 index 0000000000..e4339240e3 --- /dev/null +++ b/src/zarr/v3/chunk_key_encodings.py @@ -0,0 +1,81 @@ +from __future__ import annotations +from abc import abstractmethod +from typing import TYPE_CHECKING, Dict, Literal +from dataclasses import dataclass +from zarr.v3.abc.metadata import Metadata + +from zarr.v3.common import ( + JSON, + ChunkCoords, + parse_named_configuration, +) + +if TYPE_CHECKING: + pass + +SeparatorLiteral = Literal[".", "/"] + + +def parse_separator(data: JSON) -> SeparatorLiteral: + if data not in (".", "/"): + raise ValueError(f"Expected an '.' or '/' separator. Got {data} instead.") + return data # type: ignore + + +@dataclass(frozen=True) +class ChunkKeyEncoding(Metadata): + name: str + separator: SeparatorLiteral = "." + + def __init__(self, *, separator: SeparatorLiteral) -> None: + separator_parsed = parse_separator(separator) + + object.__setattr__(self, "separator", separator_parsed) + + @classmethod + def from_dict(cls, data: Dict[str, JSON]) -> ChunkKeyEncoding: + if isinstance(data, ChunkKeyEncoding): + return data # type: ignore + + name_parsed, configuration_parsed = parse_named_configuration(data) + if name_parsed == "default": + return DefaultChunkKeyEncoding(**configuration_parsed) # type: ignore[arg-type] + if name_parsed == "v2": + return V2ChunkKeyEncoding(**configuration_parsed) # type: ignore[arg-type] + raise ValueError(f"Unknown chunk key encoding. 
Got {name_parsed}.") + + def to_dict(self) -> Dict[str, JSON]: + return {"name": self.name, "configuration": {"separator": self.separator}} + + @abstractmethod + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + pass + + @abstractmethod + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + pass + + +@dataclass(frozen=True) +class DefaultChunkKeyEncoding(ChunkKeyEncoding): + name: Literal["default"] = "default" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + if chunk_key == "c": + return () + return tuple(map(int, chunk_key[1:].split(self.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + return self.separator.join(map(str, ("c",) + chunk_coords)) + + +@dataclass(frozen=True) +class V2ChunkKeyEncoding(ChunkKeyEncoding): + name: Literal["v2"] = "v2" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + return tuple(map(int, chunk_key.split(self.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + chunk_identifier = self.separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier diff --git a/src/zarr/v3/codecs/__init__.py b/src/zarr/v3/codecs/__init__.py index 40c71f6807..474344ec25 100644 --- a/src/zarr/v3/codecs/__init__.py +++ b/src/zarr/v3/codecs/__init__.py @@ -1,323 +1,9 @@ from __future__ import annotations -from typing import ( - TYPE_CHECKING, - Iterable, - Iterator, - List, - Literal, - Optional, - Tuple, - Union, -) -from warnings import warn -from attr import frozen - -import numpy as np - -from zarr.v3.abc.codec import ( - ArrayBytesCodecPartialDecodeMixin, - ArrayBytesCodecPartialEncodeMixin, - Codec, - ArrayArrayCodec, - ArrayBytesCodec, - BytesBytesCodec, -) -from zarr.v3.common import BytesLike, SliceSelection -from zarr.v3.metadata import CodecMetadata, ShardingCodecIndexLocation, RuntimeConfiguration -from zarr.v3.store import StorePath - -if TYPE_CHECKING: - from zarr.v3.metadata import ArrayMetadata, ArraySpec - from zarr.v3.codecs.sharding import ShardingCodecMetadata - from zarr.v3.codecs.blosc import BloscCodecMetadata - from zarr.v3.codecs.bytes import BytesCodecMetadata - from zarr.v3.codecs.transpose import TransposeCodecMetadata - from zarr.v3.codecs.gzip import GzipCodecMetadata - from zarr.v3.codecs.zstd import ZstdCodecMetadata - from zarr.v3.codecs.crc32c_ import Crc32cCodecMetadata - - -def _find_array_bytes_codec( - codecs: Iterable[Tuple[Codec, ArraySpec]] -) -> Tuple[ArrayBytesCodec, ArraySpec]: - for codec, array_spec in codecs: - if isinstance(codec, ArrayBytesCodec): - return (codec, array_spec) - raise KeyError - - -@frozen -class CodecPipeline: - array_array_codecs: List[ArrayArrayCodec] - array_bytes_codec: ArrayBytesCodec - bytes_bytes_codecs: List[BytesBytesCodec] - - @classmethod - def create(cls, codecs: List[Codec]) -> CodecPipeline: - from zarr.v3.codecs.sharding import ShardingCodec - - assert any( - isinstance(codec, ArrayBytesCodec) for codec in codecs - ), "Exactly one array-to-bytes codec is required." - - prev_codec: Optional[Codec] = None - for codec in codecs: - if prev_codec is not None: - assert not isinstance(codec, ArrayBytesCodec) or not isinstance( - prev_codec, ArrayBytesCodec - ), ( - f"ArrayBytesCodec '{type(codec)}' cannot follow after " - + f"ArrayBytesCodec '{type(prev_codec)}' because exactly " - + "1 ArrayBytesCodec is allowed." 
- ) - assert not isinstance(codec, ArrayBytesCodec) or not isinstance( - prev_codec, BytesBytesCodec - ), ( - f"ArrayBytesCodec '{type(codec)}' cannot follow after " - + f"BytesBytesCodec '{type(prev_codec)}'." - ) - assert not isinstance(codec, ArrayArrayCodec) or not isinstance( - prev_codec, ArrayBytesCodec - ), ( - f"ArrayArrayCodec '{type(codec)}' cannot follow after " - + f"ArrayBytesCodec '{type(prev_codec)}'." - ) - assert not isinstance(codec, ArrayArrayCodec) or not isinstance( - prev_codec, BytesBytesCodec - ), ( - f"ArrayArrayCodec '{type(codec)}' cannot follow after " - + f"BytesBytesCodec '{type(prev_codec)}'." - ) - prev_codec = codec - - if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: - warn( - "Combining a `sharding_indexed` codec disables partial reads and " - + "writes, which may lead to inefficient performance." - ) - - return CodecPipeline( - array_array_codecs=[codec for codec in codecs if isinstance(codec, ArrayArrayCodec)], - array_bytes_codec=[codec for codec in codecs if isinstance(codec, ArrayBytesCodec)][0], - bytes_bytes_codecs=[codec for codec in codecs if isinstance(codec, BytesBytesCodec)], - ) - - @property - def supports_partial_decode(self) -> bool: - return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( - self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin - ) - - @property - def supports_partial_encode(self) -> bool: - return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( - self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin - ) - - def __iter__(self) -> Iterator[Codec]: - for aa_codec in self.array_array_codecs: - yield aa_codec - - yield self.array_bytes_codec - - for bb_codec in self.bytes_bytes_codecs: - yield bb_codec - - def validate(self, array_metadata: ArrayMetadata) -> None: - for codec in self: - codec.validate(array_metadata) - - def _codecs_with_resolved_metadata( - self, array_spec: ArraySpec - ) -> Tuple[ - List[Tuple[ArrayArrayCodec, ArraySpec]], - Tuple[ArrayBytesCodec, ArraySpec], - List[Tuple[BytesBytesCodec, ArraySpec]], - ]: - aa_codecs_with_spec: List[Tuple[ArrayArrayCodec, ArraySpec]] = [] - for aa_codec in self.array_array_codecs: - aa_codecs_with_spec.append((aa_codec, array_spec)) - array_spec = aa_codec.resolve_metadata(array_spec) - - ab_codec_with_spec = (self.array_bytes_codec, array_spec) - array_spec = self.array_bytes_codec.resolve_metadata(array_spec) - - bb_codecs_with_spec: List[Tuple[BytesBytesCodec, ArraySpec]] = [] - for bb_codec in self.bytes_bytes_codecs: - bb_codecs_with_spec.append((bb_codec, array_spec)) - array_spec = bb_codec.resolve_metadata(array_spec) - - return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) - - async def decode( - self, - chunk_bytes: BytesLike, - array_spec: ArraySpec, - runtime_configuration: RuntimeConfiguration, - ) -> np.ndarray: - ( - aa_codecs_with_spec, - ab_codec_with_spec, - bb_codecs_with_spec, - ) = self._codecs_with_resolved_metadata(array_spec) - - for bb_codec, array_spec in bb_codecs_with_spec[::-1]: - chunk_bytes = await bb_codec.decode(chunk_bytes, array_spec, runtime_configuration) - - ab_codec, array_spec = ab_codec_with_spec - chunk_array = await ab_codec.decode(chunk_bytes, array_spec, runtime_configuration) - - for aa_codec, array_spec in aa_codecs_with_spec[::-1]: - chunk_array = await aa_codec.decode(chunk_array, array_spec, runtime_configuration) - - return chunk_array - - async def decode_partial( - self, - store_path: StorePath, 
- selection: SliceSelection, - chunk_spec: ArraySpec, - runtime_configuration: RuntimeConfiguration, - ) -> Optional[np.ndarray]: - assert self.supports_partial_decode - assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin) - return await self.array_bytes_codec.decode_partial( - store_path, selection, chunk_spec, runtime_configuration - ) - - async def encode( - self, - chunk_array: np.ndarray, - array_spec: ArraySpec, - runtime_configuration: RuntimeConfiguration, - ) -> Optional[BytesLike]: - ( - aa_codecs_with_spec, - ab_codec_with_spec, - bb_codecs_with_spec, - ) = self._codecs_with_resolved_metadata(array_spec) - - for aa_codec, array_spec in aa_codecs_with_spec: - chunk_array_maybe = await aa_codec.encode( - chunk_array, array_spec, runtime_configuration - ) - if chunk_array_maybe is None: - return None - chunk_array = chunk_array_maybe - - ab_codec, array_spec = ab_codec_with_spec - chunk_bytes_maybe = await ab_codec.encode(chunk_array, array_spec, runtime_configuration) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - for bb_codec, array_spec in bb_codecs_with_spec: - chunk_bytes_maybe = await bb_codec.encode( - chunk_bytes, array_spec, runtime_configuration - ) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - return chunk_bytes - - async def encode_partial( - self, - store_path: StorePath, - chunk_array: np.ndarray, - selection: SliceSelection, - chunk_spec: ArraySpec, - runtime_configuration: RuntimeConfiguration, - ) -> None: - assert self.supports_partial_encode - assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin) - await self.array_bytes_codec.encode_partial( - store_path, chunk_array, selection, chunk_spec, runtime_configuration - ) - - def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: - for codec in self: - byte_length = codec.compute_encoded_size(byte_length, array_spec) - array_spec = codec.resolve_metadata(array_spec) - return byte_length - - -def blosc_codec( - typesize: int, - cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd", - clevel: int = 5, - shuffle: Literal["noshuffle", "shuffle", "bitshuffle"] = "noshuffle", - blocksize: int = 0, -) -> "BloscCodecMetadata": - from zarr.v3.codecs.blosc import BloscCodecMetadata, BloscCodecConfigurationMetadata - - return BloscCodecMetadata( - configuration=BloscCodecConfigurationMetadata( - cname=cname, - clevel=clevel, - shuffle=shuffle, - blocksize=blocksize, - typesize=typesize, - ) - ) - - -def bytes_codec(endian: Optional[Literal["big", "little"]] = "little") -> "BytesCodecMetadata": - from zarr.v3.codecs.bytes import BytesCodecMetadata, BytesCodecConfigurationMetadata - - return BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian)) - - -def transpose_codec( - order: Union[Tuple[int, ...], Literal["C", "F"]], ndim: Optional[int] = None -) -> "TransposeCodecMetadata": - from zarr.v3.codecs.transpose import TransposeCodecMetadata, TransposeCodecConfigurationMetadata - - if order == "C" or order == "F": - assert ( - isinstance(ndim, int) and ndim > 0 - ), 'When using "C" or "F" the `ndim` argument needs to be provided.' 
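The factory helpers removed in this file (`blosc_codec`, `bytes_codec`, `transpose_codec`, `gzip_codec`, `zstd_codec`, `crc32c_codec`, `sharding_codec`) are superseded by the codec classes themselves. For the transpose case being deleted here, a sketch of the replacement using the explicit-tuple form (whether string orders like "C"/"F" remain accepted is not shown in this diff):

```python
from zarr.v3.codecs import TransposeCodec

ndim = 3
# Old: transpose_codec("F", ndim=3) built nested metadata objects.
# New: pass the order as an explicit tuple.
codec = TransposeCodec(order=tuple(ndim - i - 1 for i in range(ndim)))  # "F"-style order
```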
- if order == "C": - order = tuple(range(ndim)) - if order == "F": - order = tuple(ndim - i - 1 for i in range(ndim)) - - return TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order)) - - -def gzip_codec(level: int = 5) -> "GzipCodecMetadata": - from zarr.v3.codecs.gzip import GzipCodecMetadata, GzipCodecConfigurationMetadata - - return GzipCodecMetadata(configuration=GzipCodecConfigurationMetadata(level)) - - -def zstd_codec(level: int = 0, checksum: bool = False) -> "ZstdCodecMetadata": - from zarr.v3.codecs.zstd import ZstdCodecMetadata, ZstdCodecConfigurationMetadata - - return ZstdCodecMetadata(configuration=ZstdCodecConfigurationMetadata(level, checksum)) - - -def crc32c_codec() -> "Crc32cCodecMetadata": - from zarr.v3.codecs.crc32c_ import Crc32cCodecMetadata - - return Crc32cCodecMetadata() - - -def sharding_codec( - chunk_shape: Tuple[int, ...], - codecs: Optional[Iterable[CodecMetadata]] = None, - index_codecs: Optional[Iterable[CodecMetadata]] = None, - index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end, -) -> "ShardingCodecMetadata": - from zarr.v3.codecs.sharding import ShardingCodecMetadata, ShardingCodecConfigurationMetadata - - codecs = tuple(codecs) if codecs is not None else (bytes_codec(),) - index_codecs = ( - tuple(index_codecs) if index_codecs is not None else (bytes_codec(), crc32c_codec()) - ) - return ShardingCodecMetadata( - configuration=ShardingCodecConfigurationMetadata( - chunk_shape, codecs, index_codecs, index_location - ) - ) +from zarr.v3.codecs.blosc import BloscCodec, BloscCname, BloscShuffle # noqa: F401 +from zarr.v3.codecs.bytes import BytesCodec, Endian # noqa: F401 +from zarr.v3.codecs.crc32c_ import Crc32cCodec # noqa: F401 +from zarr.v3.codecs.gzip import GzipCodec # noqa: F401 +from zarr.v3.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation # noqa: F401 +from zarr.v3.codecs.transpose import TransposeCodec # noqa: F401 +from zarr.v3.codecs.zstd import ZstdCodec # noqa: F401 diff --git a/src/zarr/v3/codecs/blosc.py b/src/zarr/v3/codecs/blosc.py index efc862e636..479865241f 100644 --- a/src/zarr/v3/codecs/blosc.py +++ b/src/zarr/v3/codecs/blosc.py @@ -1,85 +1,161 @@ from __future__ import annotations -from functools import lru_cache +from dataclasses import dataclass, replace +from enum import Enum +from functools import cached_property -from typing import ( - TYPE_CHECKING, - Dict, - Literal, - Optional, - Type, -) +from typing import TYPE_CHECKING, Union import numcodecs import numpy as np -from attr import evolve, frozen, field from numcodecs.blosc import Blosc from zarr.v3.abc.codec import BytesBytesCodec from zarr.v3.codecs.registry import register_codec -from zarr.v3.common import BytesLike, to_thread +from zarr.v3.common import parse_enum, parse_named_configuration, to_thread if TYPE_CHECKING: - from zarr.v3.metadata import ArraySpec, CodecMetadata, DataType, RuntimeConfiguration + from typing import Dict, Optional + from typing_extensions import Self + from zarr.v3.common import JSON, ArraySpec, BytesLike + from zarr.v3.config import RuntimeConfiguration -BloscShuffle = Literal["noshuffle", "shuffle", "bitshuffle"] +class BloscShuffle(Enum): + noshuffle = "noshuffle" + shuffle = "shuffle" + bitshuffle = "bitshuffle" + + @classmethod + def from_int(cls, num: int) -> BloscShuffle: + blosc_shuffle_int_to_str = { + 0: "noshuffle", + 1: "shuffle", + 2: "bitshuffle", + } + if num not in blosc_shuffle_int_to_str: + raise ValueError(f"Value must be between 0 and 2. 
Got {num}.") + return BloscShuffle[blosc_shuffle_int_to_str[num]] + + +class BloscCname(Enum): + lz4 = "lz4" + lz4hc = "lz4hc" + blosclz = "blosclz" + zstd = "zstd" + snappy = "snappy" + zlib = "zlib" + # See https://zarr.readthedocs.io/en/stable/tutorial.html#configuring-blosc numcodecs.blosc.use_threads = False -@frozen -class BloscCodecConfigurationMetadata: - typesize: int - cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd" - clevel: int = 5 - shuffle: BloscShuffle = "noshuffle" - blocksize: int = 0 +def parse_typesize(data: JSON) -> int: + if isinstance(data, int): + if data > 0: + return data + else: + raise ValueError( + f"Value must be greater than 0. Got {data}, which is less or equal to 0." + ) + raise TypeError(f"Value must be an int. Got {type(data)} instead.") -blosc_shuffle_int_to_str: Dict[int, BloscShuffle] = { - 0: "noshuffle", - 1: "shuffle", - 2: "bitshuffle", -} +# todo: real validation +def parse_clevel(data: JSON) -> int: + if isinstance(data, int): + return data + raise TypeError(f"Value should be an int. Got {type(data)} instead.") -@frozen -class BloscCodecMetadata: - configuration: BloscCodecConfigurationMetadata - name: Literal["blosc"] = field(default="blosc", init=False) +def parse_blocksize(data: JSON) -> int: + if isinstance(data, int): + return data + raise TypeError(f"Value should be an int. Got {type(data)} instead.") -@frozen +@dataclass(frozen=True) class BloscCodec(BytesBytesCodec): - configuration: BloscCodecConfigurationMetadata is_fixed_size = False - @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> BloscCodec: - assert isinstance(codec_metadata, BloscCodecMetadata) - return cls(configuration=codec_metadata.configuration) + typesize: int + cname: BloscCname = BloscCname.zstd + clevel: int = 5 + shuffle: BloscShuffle = BloscShuffle.noshuffle + blocksize: int = 0 + + def __init__( + self, + *, + typesize: Optional[int] = None, + cname: Union[BloscCname, str] = BloscCname.zstd, + clevel: int = 5, + shuffle: Union[BloscShuffle, str, None] = None, + blocksize: int = 0, + ) -> None: + typesize_parsed = parse_typesize(typesize) if typesize is not None else None + cname_parsed = parse_enum(cname, BloscCname) + clevel_parsed = parse_clevel(clevel) + shuffle_parsed = parse_enum(shuffle, BloscShuffle) if shuffle is not None else None + blocksize_parsed = parse_blocksize(blocksize) + + object.__setattr__(self, "typesize", typesize_parsed) + object.__setattr__(self, "cname", cname_parsed) + object.__setattr__(self, "clevel", clevel_parsed) + object.__setattr__(self, "shuffle", shuffle_parsed) + object.__setattr__(self, "blocksize", blocksize_parsed) @classmethod - def get_metadata_class(cls) -> Type[BloscCodecMetadata]: - return BloscCodecMetadata + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "blosc") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + if self.typesize is None: + raise ValueError("`typesize` needs to be set for serialization.") + if self.shuffle is None: + raise ValueError("`shuffle` needs to be set for serialization.") + return { + "name": "blosc", + "configuration": { + "typesize": self.typesize, + "cname": self.cname, + "clevel": self.clevel, + "shuffle": self.shuffle, + "blocksize": self.blocksize, + }, + } - def evolve(self, *, data_type: DataType, **_kwargs) -> BloscCodec: + def evolve(self, array_spec: ArraySpec) -> Self: new_codec = self - if 
new_codec.configuration.typesize == 0: - new_configuration = evolve(new_codec.configuration, typesize=data_type.byte_count) - new_codec = evolve(new_codec, configuration=new_configuration) + if new_codec.typesize is None: + new_codec = replace(new_codec, typesize=array_spec.dtype.itemsize) + if new_codec.shuffle is None: + new_codec = replace( + new_codec, + shuffle=( + BloscShuffle.bitshuffle + if array_spec.dtype.itemsize == 1 + else BloscShuffle.shuffle + ), + ) return new_codec - @lru_cache - def get_blosc_codec(self) -> Blosc: - map_shuffle_str_to_int = {"noshuffle": 0, "shuffle": 1, "bitshuffle": 2} + @cached_property + def _blosc_codec(self) -> Blosc: + if self.shuffle is None: + raise ValueError("`shuffle` needs to be set for decoding and encoding.") + map_shuffle_str_to_int = { + BloscShuffle.noshuffle: 0, + BloscShuffle.shuffle: 1, + BloscShuffle.bitshuffle: 2, + } config_dict = { - "cname": self.configuration.cname, - "clevel": self.configuration.clevel, - "shuffle": map_shuffle_str_to_int[self.configuration.shuffle], - "blocksize": self.configuration.blocksize, + "cname": self.cname.name, + "clevel": self.clevel, + "shuffle": map_shuffle_str_to_int[self.shuffle], + "blocksize": self.blocksize, } return Blosc.from_config(config_dict) @@ -89,7 +165,7 @@ async def decode( _chunk_spec: ArraySpec, _runtime_configuration: RuntimeConfiguration, ) -> BytesLike: - return await to_thread(self.get_blosc_codec().decode, chunk_bytes) + return await to_thread(self._blosc_codec.decode, chunk_bytes) async def encode( self, @@ -98,7 +174,7 @@ async def encode( _runtime_configuration: RuntimeConfiguration, ) -> Optional[BytesLike]: chunk_array = np.frombuffer(chunk_bytes, dtype=chunk_spec.dtype) - return await to_thread(self.get_blosc_codec().encode, chunk_array) + return await to_thread(self._blosc_codec.encode, chunk_array) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/v3/codecs/bytes.py b/src/zarr/v3/codecs/bytes.py index de7c750bc9..f92fe5606d 100644 --- a/src/zarr/v3/codecs/bytes.py +++ b/src/zarr/v3/codecs/bytes.py @@ -1,65 +1,72 @@ from __future__ import annotations +from dataclasses import dataclass, replace +from enum import Enum +import sys -from typing import ( - TYPE_CHECKING, - Literal, - Optional, - Type, -) +from typing import TYPE_CHECKING, Dict, Optional, Union import numpy as np -from attr import frozen, field from zarr.v3.abc.codec import ArrayBytesCodec from zarr.v3.codecs.registry import register_codec -from zarr.v3.common import BytesLike +from zarr.v3.common import parse_enum, parse_named_configuration if TYPE_CHECKING: - from zarr.v3.metadata import CodecMetadata, ArraySpec, ArrayMetadata, RuntimeConfiguration + from zarr.v3.common import JSON, ArraySpec, BytesLike + from zarr.v3.config import RuntimeConfiguration + from typing_extensions import Self -Endian = Literal["big", "little"] +class Endian(Enum): + big = "big" + little = "little" -@frozen -class BytesCodecConfigurationMetadata: - endian: Optional[Endian] = "little" +default_system_endian = Endian(sys.byteorder) -@frozen -class BytesCodecMetadata: - configuration: BytesCodecConfigurationMetadata - name: Literal["bytes"] = field(default="bytes", init=True) - - -@frozen +@dataclass(frozen=True) class BytesCodec(ArrayBytesCodec): - configuration: BytesCodecConfigurationMetadata is_fixed_size = True - @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> BytesCodec: - assert isinstance(codec_metadata, 
BytesCodecMetadata) - return cls(configuration=codec_metadata.configuration) + endian: Optional[Endian] - @classmethod - def get_metadata_class(cls) -> Type[BytesCodecMetadata]: - return BytesCodecMetadata + def __init__(self, *, endian: Union[Endian, str, None] = default_system_endian) -> None: + endian_parsed = None if endian is None else parse_enum(endian, Endian) - def validate(self, array_metadata: ArrayMetadata) -> None: - assert ( - not array_metadata.data_type.has_endianness or self.configuration.endian is not None - ), "The `endian` configuration needs to be specified for multi-byte data types." + object.__setattr__(self, "endian", endian_parsed) + + @classmethod + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration( + data, "bytes", require_configuration=False + ) + configuration_parsed = configuration_parsed or {} + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + if self.endian is None: + return {"name": "bytes"} + else: + return {"name": "bytes", "configuration": {"endian": self.endian}} + + def evolve(self, array_spec: ArraySpec) -> Self: + if array_spec.dtype.itemsize == 0: + if self.endian is not None: + return replace(self, endian=None) + elif self.endian is None: + raise ValueError( + "The `endian` configuration needs to be specified for multi-byte data types." + ) + return self def _get_byteorder(self, array: np.ndarray) -> Endian: if array.dtype.byteorder == "<": - return "little" + return Endian.little elif array.dtype.byteorder == ">": - return "big" + return Endian.big else: - import sys - - return sys.byteorder + return default_system_endian async def decode( self, @@ -68,13 +75,13 @@ _runtime_configuration: RuntimeConfiguration, ) -> np.ndarray: if chunk_spec.dtype.itemsize > 0: - if self.configuration.endian == "little": + if self.endian == Endian.little: prefix = "<" else: prefix = ">" - dtype = np.dtype(f"{prefix}{chunk_spec.data_type.to_numpy_shortname()}") + dtype = np.dtype(f"{prefix}{chunk_spec.dtype.str[1:]}") else: - dtype = np.dtype(f"|{chunk_spec.data_type.to_numpy_shortname()}") + dtype = np.dtype(f"|{chunk_spec.dtype.str[1:]}") chunk_array = np.frombuffer(chunk_bytes, dtype) # ensure correct chunk shape @@ -92,8 +99,8 @@ async def encode( ) -> Optional[BytesLike]: if chunk_array.dtype.itemsize > 1: byteorder = self._get_byteorder(chunk_array) - if self.configuration.endian != byteorder: - new_dtype = chunk_array.dtype.newbyteorder(self.configuration.endian) + if self.endian is not None and self.endian != byteorder: + new_dtype = chunk_array.dtype.newbyteorder(self.endian.name) chunk_array = chunk_array.astype(new_dtype) return chunk_array.tobytes() diff --git a/src/zarr/v3/codecs/crc32c_.py b/src/zarr/v3/codecs/crc32c_.py index 4f8b9c7b0b..555bdeae3b 100644 --- a/src/zarr/v3/codecs/crc32c_.py +++ b/src/zarr/v3/codecs/crc32c_.py @@ -1,41 +1,34 @@ from __future__ import annotations +from dataclasses import dataclass -from typing import ( - TYPE_CHECKING, - Literal, - Optional, - Type, -) +from typing import TYPE_CHECKING import numpy as np -from attr import frozen, field + from crc32c import crc32c from zarr.v3.abc.codec import BytesBytesCodec from zarr.v3.codecs.registry import register_codec -from zarr.v3.common import BytesLike +from zarr.v3.common import parse_named_configuration if TYPE_CHECKING: - from zarr.v3.metadata import ArraySpec, CodecMetadata, RuntimeConfiguration - + from typing import Dict, Optional
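The checksum layout handled by `Crc32cCodec` is unchanged: the encoded chunk is the payload followed by a 4-byte CRC32C trailer (native byte order, as produced by `np.uint32(...).tobytes()`), which `decode` in this file now verifies with a proper `ValueError` instead of a bare assert. A sketch of the framing:

```python
import numpy as np
from crc32c import crc32c

payload = b"hello"
stored = payload + np.uint32(crc32c(payload)).tobytes()  # payload + 4-byte trailer

# decode() recomputes the checksum over stored[:-4] and compares it
# against stored[-4:], raising ValueError on mismatch.
assert np.uint32(crc32c(stored[:-4])).tobytes() == stored[-4:]
```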
+ from typing_extensions import Self + from zarr.v3.common import JSON, BytesLike, ArraySpec + from zarr.v3.config import RuntimeConfiguration -@frozen -class Crc32cCodecMetadata: - name: Literal["crc32c"] = field(default="crc32c", init=False) - -@frozen +@dataclass(frozen=True) class Crc32cCodec(BytesBytesCodec): is_fixed_size = True @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> Crc32cCodec: - assert isinstance(codec_metadata, Crc32cCodecMetadata) + def from_dict(cls, data: Dict[str, JSON]) -> Self: + parse_named_configuration(data, "crc32c", require_configuration=False) return cls() - @classmethod - def get_metadata_class(cls) -> Type[Crc32cCodecMetadata]: - return Crc32cCodecMetadata + def to_dict(self) -> Dict[str, JSON]: + return {"name": "crc32c"} async def decode( self, @@ -46,7 +39,13 @@ async def decode( crc32_bytes = chunk_bytes[-4:] inner_bytes = chunk_bytes[:-4] - assert np.uint32(crc32c(inner_bytes)).tobytes() == bytes(crc32_bytes) + computed_checksum = np.uint32(crc32c(inner_bytes)).tobytes() + stored_checksum = bytes(crc32_bytes) + if computed_checksum != stored_checksum: + raise ValueError( + "Stored and computed checksum do not match. " + + f"Stored: {stored_checksum!r}. Computed: {computed_checksum!r}." + ) return inner_bytes async def encode( diff --git a/src/zarr/v3/codecs/gzip.py b/src/zarr/v3/codecs/gzip.py index a3fafc1382..478eee90c1 100644 --- a/src/zarr/v3/codecs/gzip.py +++ b/src/zarr/v3/codecs/gzip.py @@ -1,48 +1,48 @@ from __future__ import annotations +from dataclasses import dataclass -from typing import ( - TYPE_CHECKING, - Literal, - Optional, - Type, -) +from typing import TYPE_CHECKING -from attr import frozen, field from numcodecs.gzip import GZip - from zarr.v3.abc.codec import BytesBytesCodec from zarr.v3.codecs.registry import register_codec -from zarr.v3.common import BytesLike, to_thread +from zarr.v3.common import parse_named_configuration, to_thread if TYPE_CHECKING: - from zarr.v3.metadata import ArraySpec, CodecMetadata, RuntimeConfiguration - - -@frozen -class GzipCodecConfigurationMetadata: - level: int = 5 + from typing import Optional, Dict + from typing_extensions import Self + from zarr.v3.common import JSON, ArraySpec, BytesLike + from zarr.v3.config import RuntimeConfiguration -@frozen -class GzipCodecMetadata: - configuration: GzipCodecConfigurationMetadata - name: Literal["gzip"] = field(default="gzip", init=False) +def parse_gzip_level(data: JSON) -> int: + if not isinstance(data, (int)): + raise TypeError(f"Expected int, got {type(data)}") + if data not in range(0, 10): + raise ValueError( + f"Expected an integer from the inclusive range (0, 9). Got {data} instead." 
+ ) + return data -@frozen +@dataclass(frozen=True) class GzipCodec(BytesBytesCodec): - configuration: GzipCodecConfigurationMetadata - is_fixed_size = True + is_fixed_size = False - @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> GzipCodec: - assert isinstance(codec_metadata, GzipCodecMetadata) + level: int = 5 - return cls(configuration=codec_metadata.configuration) + def __init__(self, *, level: int = 5) -> None: + level_parsed = parse_gzip_level(level) + + object.__setattr__(self, "level", level_parsed) @classmethod - def get_metadata_class(cls) -> Type[GzipCodecMetadata]: - return GzipCodecMetadata + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "gzip") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + return {"name": "gzip", "configuration": {"level": self.level}} async def decode( self, @@ -50,7 +50,7 @@ async def decode( _chunk_spec: ArraySpec, _runtime_configuration: RuntimeConfiguration, ) -> BytesLike: - return await to_thread(GZip(self.configuration.level).decode, chunk_bytes) + return await to_thread(GZip(self.level).decode, chunk_bytes) async def encode( self, @@ -58,7 +58,7 @@ async def encode( _chunk_spec: ArraySpec, _runtime_configuration: RuntimeConfiguration, ) -> Optional[BytesLike]: - return await to_thread(GZip(self.configuration.level).encode, chunk_bytes) + return await to_thread(GZip(self.level).encode, chunk_bytes) def compute_encoded_size( self, diff --git a/src/zarr/v3/codecs/pipeline.py b/src/zarr/v3/codecs/pipeline.py new file mode 100644 index 0000000000..7bb872eb79 --- /dev/null +++ b/src/zarr/v3/codecs/pipeline.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Iterable +import numpy as np +from dataclasses import dataclass +from warnings import warn + +from zarr.v3.abc.codec import ( + ArrayArrayCodec, + ArrayBytesCodec, + ArrayBytesCodecPartialDecodeMixin, + ArrayBytesCodecPartialEncodeMixin, + BytesBytesCodec, + Codec, +) +from zarr.v3.abc.metadata import Metadata +from zarr.v3.codecs.registry import get_codec_class +from zarr.v3.common import parse_named_configuration + +if TYPE_CHECKING: + from typing import Iterator, List, Optional, Tuple, Union + from zarr.v3.store import StorePath + from zarr.v3.metadata import ArrayMetadata + from zarr.v3.config import RuntimeConfiguration + from zarr.v3.common import JSON, ArraySpec, BytesLike, SliceSelection + + +@dataclass(frozen=True) +class CodecPipeline(Metadata): + array_array_codecs: Tuple[ArrayArrayCodec, ...] + array_bytes_codec: ArrayBytesCodec + bytes_bytes_codecs: Tuple[BytesBytesCodec, ...] 
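
# A sketch, assuming the codec classes defined elsewhere in this diff, of how
# from_list (below) partitions a flat codec list into these three fields:
#
#     pipeline = CodecPipeline.from_list(
#         [TransposeCodec(order=(1, 0)), BytesCodec(), GzipCodec(level=5)]
#     )
#     # array_array_codecs == (TransposeCodec(order=(1, 0)),)
#     # array_bytes_codec  == BytesCodec()
#     # bytes_bytes_codecs == (GzipCodec(level=5),)
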
+ + @classmethod + def from_dict(cls, data: Iterable[Union[JSON, Codec]]) -> CodecPipeline: + out: List[Codec] = [] + if not isinstance(data, Iterable): + raise TypeError(f"Expected iterable, got {type(data)}") + + for c in data: + if isinstance(c, Codec): + out.append(c) + else: + name_parsed, _ = parse_named_configuration(c, require_configuration=False) + out.append(get_codec_class(name_parsed).from_dict(c)) # type: ignore[arg-type] + return CodecPipeline.from_list(out) + + def to_dict(self) -> JSON: + return [c.to_dict() for c in self] + + def evolve(self, array_spec: ArraySpec) -> CodecPipeline: + return CodecPipeline.from_list([c.evolve(array_spec) for c in self]) + + @classmethod + def from_list(cls, codecs: List[Codec]) -> CodecPipeline: + from zarr.v3.codecs.sharding import ShardingCodec + + if not any(isinstance(codec, ArrayBytesCodec) for codec in codecs): + raise ValueError("Exactly one array-to-bytes codec is required.") + + prev_codec: Optional[Codec] = None + for codec in codecs: + if prev_codec is not None: + if isinstance(codec, ArrayBytesCodec) and isinstance(prev_codec, ArrayBytesCodec): + raise ValueError( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}' because exactly " + + "1 ArrayBytesCodec is allowed." + ) + if isinstance(codec, ArrayBytesCodec) and isinstance(prev_codec, BytesBytesCodec): + raise ValueError( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + if isinstance(codec, ArrayArrayCodec) and isinstance(prev_codec, ArrayBytesCodec): + raise ValueError( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}'." + ) + if isinstance(codec, ArrayArrayCodec) and isinstance(prev_codec, BytesBytesCodec): + raise ValueError( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + prev_codec = codec + + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: + warn( + "Combining a `sharding_indexed` codec disables partial reads and " + + "writes, which may lead to inefficient performance." 
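
#         (Sketch of the ordering rule enforced above, assuming the codec
#         classes from this diff:
#             CodecPipeline.from_list([BytesCodec(), TransposeCodec(order=(0,))])
#         raises ValueError, because an ArrayArrayCodec may not follow the
#         ArrayBytesCodec.)
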
+ ) + + return CodecPipeline( + array_array_codecs=tuple( + codec for codec in codecs if isinstance(codec, ArrayArrayCodec) + ), + array_bytes_codec=[codec for codec in codecs if isinstance(codec, ArrayBytesCodec)][0], + bytes_bytes_codecs=tuple( + codec for codec in codecs if isinstance(codec, BytesBytesCodec) + ), + ) + + @property + def supports_partial_decode(self) -> bool: + return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( + self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin + ) + + @property + def supports_partial_encode(self) -> bool: + return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( + self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin + ) + + def __iter__(self) -> Iterator[Codec]: + for aa_codec in self.array_array_codecs: + yield aa_codec + + yield self.array_bytes_codec + + for bb_codec in self.bytes_bytes_codecs: + yield bb_codec + + def validate(self, array_metadata: ArrayMetadata) -> None: + for codec in self: + codec.validate(array_metadata) + + def _codecs_with_resolved_metadata( + self, array_spec: ArraySpec + ) -> Tuple[ + List[Tuple[ArrayArrayCodec, ArraySpec]], + Tuple[ArrayBytesCodec, ArraySpec], + List[Tuple[BytesBytesCodec, ArraySpec]], + ]: + aa_codecs_with_spec: List[Tuple[ArrayArrayCodec, ArraySpec]] = [] + for aa_codec in self.array_array_codecs: + aa_codecs_with_spec.append((aa_codec, array_spec)) + array_spec = aa_codec.resolve_metadata(array_spec) + + ab_codec_with_spec = (self.array_bytes_codec, array_spec) + array_spec = self.array_bytes_codec.resolve_metadata(array_spec) + + bb_codecs_with_spec: List[Tuple[BytesBytesCodec, ArraySpec]] = [] + for bb_codec in self.bytes_bytes_codecs: + bb_codecs_with_spec.append((bb_codec, array_spec)) + array_spec = bb_codec.resolve_metadata(array_spec) + + return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + + async def decode( + self, + chunk_bytes: BytesLike, + array_spec: ArraySpec, + runtime_configuration: RuntimeConfiguration, + ) -> np.ndarray: + ( + aa_codecs_with_spec, + ab_codec_with_spec, + bb_codecs_with_spec, + ) = self._codecs_with_resolved_metadata(array_spec) + + for bb_codec, array_spec in bb_codecs_with_spec[::-1]: + chunk_bytes = await bb_codec.decode(chunk_bytes, array_spec, runtime_configuration) + + ab_codec, array_spec = ab_codec_with_spec + chunk_array = await ab_codec.decode(chunk_bytes, array_spec, runtime_configuration) + + for aa_codec, array_spec in aa_codecs_with_spec[::-1]: + chunk_array = await aa_codec.decode(chunk_array, array_spec, runtime_configuration) + + return chunk_array + + async def decode_partial( + self, + store_path: StorePath, + selection: SliceSelection, + chunk_spec: ArraySpec, + runtime_configuration: RuntimeConfiguration, + ) -> Optional[np.ndarray]: + assert self.supports_partial_decode + assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin) + return await self.array_bytes_codec.decode_partial( + store_path, selection, chunk_spec, runtime_configuration + ) + + async def encode( + self, + chunk_array: np.ndarray, + array_spec: ArraySpec, + runtime_configuration: RuntimeConfiguration, + ) -> Optional[BytesLike]: + ( + aa_codecs_with_spec, + ab_codec_with_spec, + bb_codecs_with_spec, + ) = self._codecs_with_resolved_metadata(array_spec) + + for aa_codec, array_spec in aa_codecs_with_spec: + chunk_array_maybe = await aa_codec.encode( + chunk_array, array_spec, runtime_configuration + ) + if chunk_array_maybe is None: + return None + 
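            # A codec returning None short-circuits the pipeline: the chunk is
            # treated as having no stored representation (e.g. an all-fill-value
            # chunk) and nothing is written; otherwise the transformed chunk is
            # passed to the next stage.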
chunk_array = chunk_array_maybe + + ab_codec, array_spec = ab_codec_with_spec + chunk_bytes_maybe = await ab_codec.encode(chunk_array, array_spec, runtime_configuration) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + for bb_codec, array_spec in bb_codecs_with_spec: + chunk_bytes_maybe = await bb_codec.encode( + chunk_bytes, array_spec, runtime_configuration + ) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + return chunk_bytes + + async def encode_partial( + self, + store_path: StorePath, + chunk_array: np.ndarray, + selection: SliceSelection, + chunk_spec: ArraySpec, + runtime_configuration: RuntimeConfiguration, + ) -> None: + assert self.supports_partial_encode + assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin) + await self.array_bytes_codec.encode_partial( + store_path, chunk_array, selection, chunk_spec, runtime_configuration + ) + + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + for codec in self: + byte_length = codec.compute_encoded_size(byte_length, array_spec) + array_spec = codec.resolve_metadata(array_spec) + return byte_length diff --git a/src/zarr/v3/codecs/registry.py b/src/zarr/v3/codecs/registry.py index bdd9a5765d..4cf2736685 100644 --- a/src/zarr/v3/codecs/registry.py +++ b/src/zarr/v3/codecs/registry.py @@ -1,18 +1,14 @@ from __future__ import annotations +from typing import TYPE_CHECKING -from typing import Dict, NamedTuple, Type -from importlib.metadata import EntryPoint, entry_points as get_entry_points - -from zarr.v3.abc.codec import Codec -from zarr.v3.metadata import CodecMetadata - +if TYPE_CHECKING: + from typing import Dict, Type + from zarr.v3.abc.codec import Codec -class CodecRegistryItem(NamedTuple): - codec_cls: Type[Codec] - codec_metadata_cls: Type[CodecMetadata] +from importlib.metadata import EntryPoint, entry_points as get_entry_points -__codec_registry: Dict[str, CodecRegistryItem] = {} +__codec_registry: Dict[str, Type[Codec]] = {} __lazy_load_codecs: Dict[str, EntryPoint] = {} @@ -29,10 +25,10 @@ def _collect_entrypoints() -> None: def register_codec(key: str, codec_cls: Type[Codec]) -> None: - __codec_registry[key] = CodecRegistryItem(codec_cls, codec_cls.get_metadata_class()) + __codec_registry[key] = codec_cls -def _get_codec_item(key: str) -> CodecRegistryItem: +def get_codec_class(key: str) -> Type[Codec]: item = __codec_registry.get(key) if item is None: if key in __lazy_load_codecs: @@ -45,17 +41,4 @@ def _get_codec_item(key: str) -> CodecRegistryItem: raise KeyError(key) -def get_codec_from_metadata(val: CodecMetadata) -> Codec: - key = val.name - return _get_codec_item(key).codec_cls.from_metadata(val) - - -def get_codec_metadata_class(key: str) -> Type[CodecMetadata]: - return _get_codec_item(key).codec_metadata_cls - - -def get_codec_class(key: str) -> Type[Codec]: - return _get_codec_item(key).codec_cls - - _collect_entrypoints() diff --git a/src/zarr/v3/codecs/sharding.py b/src/zarr/v3/codecs/sharding.py index 26020f160f..0385154c0f 100644 --- a/src/zarr/v3/codecs/sharding.py +++ b/src/zarr/v3/codecs/sharding.py @@ -1,37 +1,31 @@ from __future__ import annotations -from functools import cached_property, lru_cache - -from typing import ( - Awaitable, - Callable, - Iterator, - List, - Literal, - Mapping, - NamedTuple, - Optional, - Set, - Tuple, - Type, -) -from attr import field, frozen +from enum import Enum +from typing import TYPE_CHECKING, Iterable, Mapping, NamedTuple, Union +from dataclasses 
import dataclass, replace +from functools import lru_cache + import numpy as np from zarr.v3.abc.codec import ( + Codec, ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin, ) - -from zarr.v3.codecs import CodecPipeline -from zarr.v3.codecs.registry import get_codec_from_metadata, register_codec +from zarr.v3.codecs.bytes import BytesCodec +from zarr.v3.codecs.crc32c_ import Crc32cCodec +from zarr.v3.codecs.pipeline import CodecPipeline +from zarr.v3.codecs.registry import register_codec from zarr.v3.common import ( - BytesLike, - ChunkCoords, - SliceSelection, + ArraySpec, + ChunkCoordsLike, concurrent_map, + parse_enum, + parse_named_configuration, + parse_shapelike, product, ) +from zarr.v3.chunk_grids import RegularChunkGrid from zarr.v3.indexing import ( BasicIndexer, c_order_iter, @@ -40,31 +34,33 @@ ) from zarr.v3.metadata import ( ArrayMetadata, - ArraySpec, - DataType, - CodecMetadata, - RegularChunkGridMetadata, - ShardingCodecIndexLocation, - RuntimeConfiguration, runtime_configuration as make_runtime_configuration, + parse_codecs, ) -from zarr.v3.store import StorePath + +if TYPE_CHECKING: + from typing import Awaitable, Callable, Dict, Iterator, List, Optional, Set, Tuple + from typing_extensions import Self + + from zarr.v3.store import StorePath + from zarr.v3.common import ( + JSON, + ChunkCoords, + BytesLike, + SliceSelection, + ) + from zarr.v3.config import RuntimeConfiguration MAX_UINT_64 = 2**64 - 1 -@frozen -class ShardingCodecConfigurationMetadata: - chunk_shape: ChunkCoords - codecs: Tuple[CodecMetadata, ...] - index_codecs: Tuple[CodecMetadata, ...] - index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end +class ShardingCodecIndexLocation(Enum): + start = "start" + end = "end" -@frozen -class ShardingCodecMetadata: - configuration: ShardingCodecConfigurationMetadata - name: Literal["sharding_indexed"] = field(default="sharding_indexed", init=False) +def parse_index_location(data: JSON) -> ShardingCodecIndexLocation: + return parse_enum(data, ShardingCodecIndexLocation) class _ShardIndex(NamedTuple): @@ -141,7 +137,7 @@ async def from_bytes( shard_index_size = codec._shard_index_size(chunks_per_shard) obj = cls() obj.buf = memoryview(buf) - if codec.configuration.index_location == ShardingCodecIndexLocation.start: + if codec.index_location == ShardingCodecIndexLocation.start: shard_index_bytes = obj.buf[:shard_index_size] else: shard_index_bytes = obj.buf[-shard_index_size:] @@ -222,42 +218,85 @@ async def finalize( return out_buf -@frozen +@dataclass(frozen=True) class ShardingCodec( ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin ): - configuration: ShardingCodecConfigurationMetadata + chunk_shape: ChunkCoords + codecs: CodecPipeline + index_codecs: CodecPipeline + index_location: ShardingCodecIndexLocation = ShardingCodecIndexLocation.end - @classmethod - def from_metadata( - cls, - codec_metadata: CodecMetadata, - ) -> ShardingCodec: - assert isinstance(codec_metadata, ShardingCodecMetadata) - return cls(configuration=codec_metadata.configuration) + def __init__( + self, + *, + chunk_shape: ChunkCoordsLike, + codecs: Optional[Iterable[Union[Codec, JSON]]] = None, + index_codecs: Optional[Iterable[Union[Codec, JSON]]] = None, + index_location: Optional[ShardingCodecIndexLocation] = ShardingCodecIndexLocation.end, + ) -> None: + chunk_shape_parsed = parse_shapelike(chunk_shape) + codecs_parsed = ( + parse_codecs(codecs) if codecs is not None else 
CodecPipeline.from_list([BytesCodec()]) + ) + index_codecs_parsed = ( + parse_codecs(index_codecs) + if index_codecs is not None + else CodecPipeline.from_list([BytesCodec(), Crc32cCodec()]) + ) + index_location_parsed = ( + parse_index_location(index_location) + if index_location is not None + else ShardingCodecIndexLocation.end + ) + + object.__setattr__(self, "chunk_shape", chunk_shape_parsed) + object.__setattr__(self, "codecs", codecs_parsed) + object.__setattr__(self, "index_codecs", index_codecs_parsed) + object.__setattr__(self, "index_location", index_location_parsed) @classmethod - def get_metadata_class(cls) -> Type[ShardingCodecMetadata]: - return ShardingCodecMetadata + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "sharding_indexed") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + return { + "name": "sharding_indexed", + "configuration": { + "chunk_shape": list(self.chunk_shape), + "codecs": self.codecs.to_dict(), + "index_codecs": self.index_codecs.to_dict(), + "index_location": self.index_location, + }, + } + + def evolve(self, array_spec: ArraySpec) -> Self: + shard_spec = self._get_chunk_spec(array_spec) + evolved_codecs = self.codecs.evolve(shard_spec) + if evolved_codecs != self.codecs: + return replace(self, codecs=evolved_codecs) + return self def validate(self, array_metadata: ArrayMetadata) -> None: - assert len(self.configuration.chunk_shape) == array_metadata.ndim, ( - "The shard's `chunk_shape` and array's `shape` need to have the " - + "same number of dimensions." - ) - assert isinstance( - array_metadata.chunk_grid, RegularChunkGridMetadata - ), "Sharding is only compatible with regular chunk grids." - assert all( + if len(self.chunk_shape) != array_metadata.ndim: + raise ValueError( + "The shard's `chunk_shape` and array's `shape` need to have the " + + "same number of dimensions." + ) + if not isinstance(array_metadata.chunk_grid, RegularChunkGrid): + raise ValueError("Sharding is only compatible with regular chunk grids.") + if not all( s % c == 0 for s, c in zip( - array_metadata.chunk_grid.configuration.chunk_shape, - self.configuration.chunk_shape, + array_metadata.chunk_grid.chunk_shape, + self.chunk_shape, + ) + ): + raise ValueError( + "The array's `chunk_shape` needs to be divisible by the " + + "shard's inner `chunk_shape`." ) - ), ( - "The array's `chunk_shape` needs to be divisible by the " - + "shard's inner `chunk_shape`." 
- ) async def decode( self, @@ -267,7 +306,7 @@ async def decode( ) -> np.ndarray: # print("decode") shard_shape = shard_spec.shape - chunk_shape = self.configuration.chunk_shape + chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) indexer = BasicIndexer( @@ -316,7 +355,7 @@ async def decode_partial( runtime_configuration: RuntimeConfiguration, ) -> Optional[np.ndarray]: shard_shape = shard_spec.shape - chunk_shape = self.configuration.chunk_shape + chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) indexer = BasicIndexer( @@ -389,9 +428,7 @@ async def _read_chunk( chunk_spec = self._get_chunk_spec(shard_spec) chunk_bytes = shard_dict.get(chunk_coords, None) if chunk_bytes is not None: - chunk_array = await self._codec_pipeline.decode( - chunk_bytes, chunk_spec, runtime_configuration - ) + chunk_array = await self.codecs.decode(chunk_bytes, chunk_spec, runtime_configuration) tmp = chunk_array[chunk_selection] out[out_selection] = tmp else: @@ -404,7 +441,7 @@ async def encode( runtime_configuration: RuntimeConfiguration, ) -> Optional[BytesLike]: shard_shape = shard_spec.shape - chunk_shape = self.configuration.chunk_shape + chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) indexer = list( @@ -435,9 +472,7 @@ async def _write_chunk( chunk_spec = self._get_chunk_spec(shard_spec) return ( chunk_coords, - await self._codec_pipeline.encode( - chunk_array, chunk_spec, runtime_configuration - ), + await self.codecs.encode(chunk_array, chunk_spec, runtime_configuration), ) return (chunk_coords, None) @@ -458,9 +493,7 @@ async def _write_chunk( if chunk_bytes is not None: shard_builder.append(chunk_coords, chunk_bytes) - return await shard_builder.finalize( - self.configuration.index_location, self._encode_shard_index - ) + return await shard_builder.finalize(self.index_location, self._encode_shard_index) async def encode_partial( self, @@ -472,7 +505,7 @@ async def encode_partial( ) -> None: # print("encode_partial") shard_shape = shard_spec.shape - chunk_shape = self.configuration.chunk_shape + chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) chunk_spec = self._get_chunk_spec(shard_spec) @@ -496,7 +529,7 @@ async def _write_chunk( out_selection: SliceSelection, ) -> Tuple[ChunkCoords, Optional[BytesLike]]: chunk_array = None - if is_total_slice(chunk_selection, self.configuration.chunk_shape): + if is_total_slice(chunk_selection, self.chunk_shape): chunk_array = shard_array[out_selection] else: # handling writing partial chunks @@ -506,24 +539,20 @@ async def _write_chunk( # merge new value if chunk_bytes is None: chunk_array = np.empty( - self.configuration.chunk_shape, + self.chunk_shape, dtype=shard_spec.dtype, ) chunk_array.fill(shard_spec.fill_value) else: chunk_array = ( - await self._codec_pipeline.decode( - chunk_bytes, chunk_spec, runtime_configuration - ) + await self.codecs.decode(chunk_bytes, chunk_spec, runtime_configuration) ).copy() # make a writable copy chunk_array[chunk_selection] = shard_array[out_selection] if not np.array_equiv(chunk_array, shard_spec.fill_value): return ( chunk_coords, - await self._codec_pipeline.encode( - chunk_array, chunk_spec, runtime_configuration - ), + await self.codecs.encode(chunk_array, chunk_spec, runtime_configuration), ) else: return (chunk_coords, None) @@ -559,7 +588,7 @@ async def _write_chunk( else: await store_path.set( await shard_builder.finalize( - self.configuration.index_location, 
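
#         A sketch of constructing the refactored codec directly, using the
#         defaults wired up in __init__ above (codecs falls back to
#         [BytesCodec()], index_codecs to [BytesCodec(), Crc32cCodec()]):
#
#             codec = ShardingCodec(chunk_shape=(32, 32, 32))
#             assert codec.to_dict()["name"] == "sharding_indexed"
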
+                        self.index_location,
                         self._encode_shard_index,
                     )
                 )

@@ -575,7 +604,7 @@ async def _decode_shard_index(
         self, index_bytes: BytesLike, chunks_per_shard: ChunkCoords
     ) -> _ShardIndex:
         return _ShardIndex(
-            await self._index_codec_pipeline.decode(
+            await self.index_codecs.decode(
                 index_bytes,
                 self._get_index_chunk_spec(chunks_per_shard),
                 make_runtime_configuration("C"),
@@ -583,7 +612,7 @@ async def _encode_shard_index(self, index: _ShardIndex) -> BytesLike:
-        index_bytes = await self._index_codec_pipeline.encode(
+        index_bytes = await self.index_codecs.encode(
             index.offsets_and_lengths,
             self._get_index_chunk_spec(index.chunks_per_shard),
             make_runtime_configuration("C"),
@@ -592,7 +621,7 @@ async def _encode_shard_index(self, index: _ShardIndex) -> BytesLike:
         return index_bytes

     def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int:
-        return self._index_codec_pipeline.compute_encoded_size(
+        return self.index_codecs.compute_encoded_size(
             16 * product(chunks_per_shard), self._get_index_chunk_spec(chunks_per_shard)
         )

@@ -600,15 +629,15 @@ def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int:
     def _get_index_chunk_spec(self, chunks_per_shard: ChunkCoords) -> ArraySpec:
         return ArraySpec(
             shape=chunks_per_shard + (2,),
-            data_type=DataType.uint64,
+            dtype=np.dtype("<u8"),
             fill_value=MAX_UINT_64,
         )

     def _get_chunk_spec(self, shard_spec: ArraySpec) -> ArraySpec:
         return ArraySpec(
-            shape=self.configuration.chunk_shape,
-            data_type=shard_spec.data_type,
+            shape=self.chunk_shape,
+            dtype=shard_spec.dtype,
             fill_value=shard_spec.fill_value,
         )

@@ -618,25 +647,15 @@ def _get_chunks_per_shard(self, shard_spec: ArraySpec) -> ChunkCoords:
             s // c
             for s, c in zip(
                 shard_spec.shape,
-                self.configuration.chunk_shape,
+                self.chunk_shape,
             )
         )

-    @cached_property
-    def _index_codec_pipeline(self) -> CodecPipeline:
-        return CodecPipeline.create(
-            [get_codec_from_metadata(c) for c in self.configuration.index_codecs]
-        )
-
-    @cached_property
-    def _codec_pipeline(self) -> CodecPipeline:
-        return CodecPipeline.create([get_codec_from_metadata(c) for c in self.configuration.codecs])
-
     async def _load_shard_index_maybe(
         self, store_path: StorePath, chunks_per_shard: ChunkCoords
     ) -> Optional[_ShardIndex]:
         shard_index_size = self._shard_index_size(chunks_per_shard)
-        if self.configuration.index_location == ShardingCodecIndexLocation.start:
+        if self.index_location == ShardingCodecIndexLocation.start:
            index_bytes = await store_path.get((0, shard_index_size))
         else:
             index_bytes = await store_path.get((-shard_index_size, None))
diff --git a/src/zarr/v3/codecs/transpose.py b/src/zarr/v3/codecs/transpose.py
index de6eb0a480..f214d1e7f1 100644
--- a/src/zarr/v3/codecs/transpose.py
+++ b/src/zarr/v3/codecs/transpose.py
@@ -1,80 +1,75 @@
 from __future__ import annotations
+from typing import TYPE_CHECKING, Dict, Iterable

-from typing import (
-    TYPE_CHECKING,
-    Literal,
-    Optional,
-    Tuple,
-    Type,
-)
+from dataclasses import dataclass, replace
+
+from zarr.v3.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration
+
+if TYPE_CHECKING:
+    from zarr.v3.config import RuntimeConfiguration
+    from typing import Optional, Tuple
+    from typing_extensions import Self

 import numpy as np
-from attr import evolve, frozen, field

 from zarr.v3.abc.codec import ArrayArrayCodec
 from zarr.v3.codecs.registry import register_codec

-if TYPE_CHECKING:
-    from zarr.v3.metadata import ArraySpec, CodecMetadata, RuntimeConfiguration

+def parse_transpose_order(data: JSON) -> Tuple[int, ...]:
+    if not isinstance(data, Iterable):
+        raise TypeError(f"Expected an
iterable. Got {data} instead.") + if not all(isinstance(a, int) for a in data): + raise TypeError(f"Expected an iterable of integers. Got {data} instead.") + return tuple(data) # type: ignore[return-value] -@frozen -class TransposeCodecConfigurationMetadata: - order: Tuple[int, ...] +@dataclass(frozen=True) +class TransposeCodec(ArrayArrayCodec): + is_fixed_size = True -@frozen -class TransposeCodecMetadata: - configuration: TransposeCodecConfigurationMetadata - name: Literal["transpose"] = field(default="transpose", init=False) + order: Tuple[int, ...] + def __init__(self, *, order: ChunkCoordsLike) -> None: + order_parsed = parse_transpose_order(order) # type: ignore[arg-type] -@frozen -class TransposeCodec(ArrayArrayCodec): - order: Tuple[int, ...] - is_fixed_size = True + object.__setattr__(self, "order", order_parsed) @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> TransposeCodec: - assert isinstance(codec_metadata, TransposeCodecMetadata) - return cls(order=codec_metadata.configuration.order) + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "transpose") + return cls(**configuration_parsed) # type: ignore[arg-type] - def evolve(self, *, ndim: int, **_kwargs) -> TransposeCodec: - # Compatibility with older version of ZEP1 - if self.order == "F": # type: ignore - order = tuple(ndim - x - 1 for x in range(ndim)) + def to_dict(self) -> Dict[str, JSON]: + return {"name": "transpose", "configuration": {"order": list(self.order)}} - elif self.order == "C": # type: ignore - order = tuple(range(ndim)) - - else: - assert len(self.order) == ndim, ( + def evolve(self, array_spec: ArraySpec) -> Self: + if len(self.order) != array_spec.ndim: + raise ValueError( "The `order` tuple needs have as many entries as " - + f"there are dimensions in the array. Got: {self.order}" + + f"there are dimensions in the array. Got {self.order}." ) - assert len(self.order) == len(set(self.order)), ( - "There must not be duplicates in the `order` tuple. " + f"Got: {self.order}" + if len(self.order) != len(set(self.order)): + raise ValueError( + f"There must not be duplicates in the `order` tuple. Got {self.order}." ) - assert all(0 <= x < ndim for x in self.order), ( + if not all(0 <= x < array_spec.ndim for x in self.order): + raise ValueError( "All entries in the `order` tuple must be between 0 and " - + f"the number of dimensions in the array. Got: {self.order}" + + f"the number of dimensions in the array. Got {self.order}." 
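
#         For example, against a 3-d array spec the checks above accept
#         order=(2, 1, 0) but reject (3, 3, 1), which has duplicate and
#         out-of-range entries; these are the same cases exercised by
#         test_transpose_invalid at the end of this diff.
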
) - order = tuple(self.order) + order = tuple(self.order) if order != self.order: - return evolve(self, order=order) + return replace(self, order=order) return self - @classmethod - def get_metadata_class(cls) -> Type[TransposeCodecMetadata]: - return TransposeCodecMetadata - def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: - from zarr.v3.metadata import ArraySpec + from zarr.v3.common import ArraySpec return ArraySpec( shape=tuple(chunk_spec.shape[self.order[i]] for i in range(chunk_spec.ndim)), - data_type=chunk_spec.data_type, + dtype=chunk_spec.dtype, fill_value=chunk_spec.fill_value, ) diff --git a/src/zarr/v3/codecs/zstd.py b/src/zarr/v3/codecs/zstd.py index 59ce1cf088..774bb8bdbb 100644 --- a/src/zarr/v3/codecs/zstd.py +++ b/src/zarr/v3/codecs/zstd.py @@ -1,53 +1,59 @@ from __future__ import annotations +from typing import TYPE_CHECKING +from dataclasses import dataclass -from typing import ( - TYPE_CHECKING, - Literal, - Optional, - Type, -) -from attr import frozen, field from zstandard import ZstdCompressor, ZstdDecompressor from zarr.v3.abc.codec import BytesBytesCodec from zarr.v3.codecs.registry import register_codec -from zarr.v3.common import BytesLike, to_thread +from zarr.v3.common import parse_named_configuration, to_thread if TYPE_CHECKING: - from zarr.v3.metadata import ArraySpec, CodecMetadata, RuntimeConfiguration + from typing import Dict, Optional + from typing_extensions import Self + from zarr.v3.config import RuntimeConfiguration + from zarr.v3.common import BytesLike, JSON, ArraySpec -@frozen -class ZstdCodecConfigurationMetadata: - level: int = 0 - checksum: bool = False +def parse_zstd_level(data: JSON) -> int: + if isinstance(data, int): + if data >= 23: + raise ValueError(f"Value must be less than or equal to 22. Got {data} instead.") + return data + raise TypeError(f"Got value with type {type(data)}, but expected an int.") -@frozen -class ZstdCodecMetadata: - configuration: ZstdCodecConfigurationMetadata - name: Literal["zstd"] = field(default="zstd", init=False) +def parse_checksum(data: JSON) -> bool: + if isinstance(data, bool): + return data + raise TypeError(f"Expected bool. 
Got {type(data)}.") -@frozen +@dataclass(frozen=True) class ZstdCodec(BytesBytesCodec): - configuration: ZstdCodecConfigurationMetadata is_fixed_size = True - @classmethod - def from_metadata(cls, codec_metadata: CodecMetadata) -> ZstdCodec: - assert isinstance(codec_metadata, ZstdCodecMetadata) - return cls(configuration=codec_metadata.configuration) + level: int = 0 + checksum: bool = False + + def __init__(self, *, level: int = 0, checksum: bool = False) -> None: + level_parsed = parse_zstd_level(level) + checksum_parsed = parse_checksum(checksum) + + object.__setattr__(self, "level", level_parsed) + object.__setattr__(self, "checksum", checksum_parsed) @classmethod - def get_metadata_class(cls) -> Type[ZstdCodecMetadata]: - return ZstdCodecMetadata + def from_dict(cls, data: Dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration(data, "zstd") + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> Dict[str, JSON]: + return {"name": "zstd", "configuration": {"level": self.level, "checksum": self.checksum}} def _compress(self, data: bytes) -> bytes: - ctx = ZstdCompressor( - level=self.configuration.level, write_checksum=self.configuration.checksum - ) + ctx = ZstdCompressor(level=self.level, write_checksum=self.checksum) return ctx.compress(data) def _decompress(self, data: bytes) -> bytes: diff --git a/src/zarr/v3/common.py b/src/zarr/v3/common.py index e91356c4e2..1caf83a764 100644 --- a/src/zarr/v3/common.py +++ b/src/zarr/v3/common.py @@ -1,23 +1,15 @@ from __future__ import annotations - +from typing import TYPE_CHECKING, Union, Tuple, Iterable, Dict, List, TypeVar, overload import asyncio import contextvars +from dataclasses import dataclass +from enum import Enum import functools -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Literal, - Optional, - Tuple, - TypeVar, - Union, -) + +if TYPE_CHECKING: + from typing import Any, Awaitable, Callable, Iterator, Optional, Type import numpy as np -from cattr import Converter ZARR_JSON = "zarr.json" ZARRAY_JSON = ".zarray" @@ -26,83 +18,10 @@ BytesLike = Union[bytes, bytearray, memoryview] ChunkCoords = Tuple[int, ...] +ChunkCoordsLike = Iterable[int] SliceSelection = Tuple[slice, ...] 
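
# A sketch of the eager validation the new ZstdCodec performs, per the parsers
# defined above:
#
#     ZstdCodec(level=3, checksum=True)    # ok
#     ZstdCodec(level=23)                  # ValueError: must be <= 22
#     ZstdCodec(checksum="yes")            # TypeError: expected bool
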
Selection = Union[slice, SliceSelection] - - -def make_cattr(): - from zarr.v3.metadata import ( - ChunkKeyEncodingMetadata, - CodecMetadata, - DefaultChunkKeyEncodingMetadata, - V2ChunkKeyEncodingMetadata, - ) - from zarr.v3.codecs.registry import get_codec_metadata_class - - converter = Converter() - - def _structure_chunk_key_encoding_metadata(d: Dict[str, Any], _t) -> ChunkKeyEncodingMetadata: - if d["name"] == "default": - return converter.structure(d, DefaultChunkKeyEncodingMetadata) - if d["name"] == "v2": - return converter.structure(d, V2ChunkKeyEncodingMetadata) - raise KeyError - - converter.register_structure_hook( - ChunkKeyEncodingMetadata, _structure_chunk_key_encoding_metadata - ) - - def _structure_codec_metadata(d: Dict[str, Any], _t=None) -> CodecMetadata: - codec_metadata_cls = get_codec_metadata_class(d["name"]) - return converter.structure(d, codec_metadata_cls) - - converter.register_structure_hook(CodecMetadata, _structure_codec_metadata) - - converter.register_structure_hook_factory( - lambda t: str(t) == "ForwardRef('CodecMetadata')", - lambda t: _structure_codec_metadata, - ) - - def _structure_order(d: Any, _t=None) -> Union[Literal["C", "F"], Tuple[int, ...]]: - if d == "C": - return "C" - if d == "F": - return "F" - if isinstance(d, list): - return tuple(d) - raise KeyError - - converter.register_structure_hook_factory( - lambda t: str(t) == "typing.Union[typing.Literal['C', 'F'], typing.Tuple[int, ...]]", - lambda t: _structure_order, - ) - - # Needed for v2 fill_value - def _structure_fill_value(d: Any, _t=None) -> Union[None, int, float]: - if d is None: - return None - try: - return int(d) - except ValueError: - pass - try: - return float(d) - except ValueError: - pass - raise ValueError - - converter.register_structure_hook_factory( - lambda t: str(t) == "typing.Union[NoneType, int, float]", - lambda t: _structure_fill_value, - ) - - # Needed for v2 dtype - converter.register_structure_hook( - np.dtype, - lambda d, _: np.dtype(d), - ) - - return converter +JSON = Union[str, None, int, float, Enum, Dict[str, "JSON"], List["JSON"], Tuple["JSON", ...]] def product(tup: ChunkCoords) -> int: @@ -134,3 +53,111 @@ async def to_thread(func, /, *args, **kwargs): ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) return await loop.run_in_executor(None, func_call) + + +E = TypeVar("E", bound=Enum) + + +def enum_names(enum: Type[E]) -> Iterator[str]: + for item in enum: + yield item.name + + +def parse_enum(data: JSON, cls: Type[E]) -> E: + if isinstance(data, cls): + return data + if not isinstance(data, str): + raise TypeError(f"Expected str, got {type(data)}") + if data in enum_names(cls): + return cls(data) + raise ValueError(f"Value must be one of {repr(list(enum_names(cls)))}. Got {data} instead.") + + +@dataclass(frozen=True) +class ArraySpec: + shape: ChunkCoords + dtype: np.dtype + fill_value: Any + + def __init__(self, shape, dtype, fill_value): + shape_parsed = parse_shapelike(shape) + dtype_parsed = parse_dtype(dtype) + fill_value_parsed = parse_fill_value(fill_value) + + object.__setattr__(self, "shape", shape_parsed) + object.__setattr__(self, "dtype", dtype_parsed) + object.__setattr__(self, "fill_value", fill_value_parsed) + + @property + def ndim(self) -> int: + return len(self.shape) + + +def parse_name(data: JSON, expected: Optional[str] = None) -> str: + if isinstance(data, str): + if expected is None or data == expected: + return data + raise ValueError(f"Expected '{expected}'. 
Got {data} instead.")
+    else:
+        raise TypeError(f"Expected a string, got an instance of {type(data)}.")
+
+
+def parse_configuration(data: JSON) -> dict:
+    if not isinstance(data, dict):
+        raise TypeError(f"Expected dict, got {type(data)}")
+    return data
+
+
+@overload
+def parse_named_configuration(
+    data: JSON, expected_name: Optional[str] = None
+) -> Tuple[str, Dict[str, JSON]]:
+    ...
+
+
+@overload
+def parse_named_configuration(
+    data: JSON, expected_name: Optional[str] = None, *, require_configuration: bool = True
+) -> Tuple[str, Optional[Dict[str, JSON]]]:
+    ...
+
+
+def parse_named_configuration(
+    data: JSON, expected_name: Optional[str] = None, *, require_configuration: bool = True
+) -> Tuple[str, Optional[JSON]]:
+    if not isinstance(data, dict):
+        raise TypeError(f"Expected dict, got {type(data)}")
+    if "name" not in data:
+        raise ValueError(f"Named configuration does not have a 'name' key. Got {data}.")
+    name_parsed = parse_name(data["name"], expected_name)
+    if "configuration" in data:
+        configuration_parsed = parse_configuration(data["configuration"])
+    elif require_configuration:
+        raise ValueError(f"Named configuration does not have a 'configuration' key. Got {data}.")
+    else:
+        configuration_parsed = None
+    return name_parsed, configuration_parsed
+
+
+def parse_shapelike(data: Any) -> Tuple[int, ...]:
+    if not isinstance(data, Iterable):
+        raise TypeError(f"Expected an iterable. Got {data} instead.")
+    data_tuple = tuple(data)
+    if len(data_tuple) == 0:
+        raise ValueError("Expected at least one element. Got 0.")
+    if not all(isinstance(v, int) for v in data_tuple):
+        msg = f"Expected an iterable of integers. Got {type(data)} instead."
+        raise TypeError(msg)
+    if not all(v > 0 for v in data_tuple):
+        raise ValueError(f"All values must be greater than 0. Got {data}.")
+    return data_tuple
+
+
+def parse_dtype(data: Any) -> np.dtype:
+    # todo: real validation
+    return np.dtype(data)
+
+
+def parse_fill_value(data: Any) -> Any:
+    # todo: real validation
+    return data
diff --git a/src/zarr/v3/config.py b/src/zarr/v3/config.py
index 28df25899a..98a25994c4 100644
--- a/src/zarr/v3/config.py
+++ b/src/zarr/v3/config.py
@@ -1,19 +1,53 @@
 from __future__ import annotations
 from asyncio import AbstractEventLoop
-from typing import Literal, Optional

-from attr import frozen
+from dataclasses import dataclass
+from typing import Any, Literal, Optional

-@frozen
+@dataclass(frozen=True)
 class SyncConfiguration:
     concurrency: Optional[int] = None
     asyncio_loop: Optional[AbstractEventLoop] = None

-@frozen
+def parse_indexing_order(data: Any) -> Literal["C", "F"]:
+    if data in ("C", "F"):
+        return data
+    msg = f"Expected one of ('C', 'F'), got {data} instead."
+    raise ValueError(msg)
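
# A minimal, self-contained sketch of the parse-and-return pattern these
# helpers follow (values are hypothetical; parse_named_configuration and
# parse_shapelike are the functions defined in zarr/v3/common.py above):
from zarr.v3.common import parse_named_configuration, parse_shapelike

name, config = parse_named_configuration(
    {"name": "gzip", "configuration": {"level": 1}}, expected_name="gzip"
)
assert (name, config) == ("gzip", {"level": 1})
assert parse_shapelike([32, 32]) == (32, 32)
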
+# todo: handle negative values?
+def parse_concurrency(data: Any) -> int | None:
+    if data is None or isinstance(data, int):
+        return data
+    raise TypeError(f"Expected int or None, got {type(data)}")
+
+
+def parse_asyncio_loop(data: Any) -> AbstractEventLoop | None:
+    if data is None or isinstance(data, AbstractEventLoop):
+        return data
+    raise TypeError(f"Expected AbstractEventLoop or None, got {type(data)}")
+
+
+@dataclass(frozen=True)
 class RuntimeConfiguration:
     order: Literal["C", "F"] = "C"
-    # TODO: remove these in favor of the SyncConfiguration object
     concurrency: Optional[int] = None
     asyncio_loop: Optional[AbstractEventLoop] = None
+
+    def __init__(
+        self,
+        order: Literal["C", "F"] = "C",
+        concurrency: Optional[int] = None,
+        asyncio_loop: Optional[AbstractEventLoop] = None,
+    ):
+        order_parsed = parse_indexing_order(order)
+        concurrency_parsed = parse_concurrency(concurrency)
+        asyncio_loop_parsed = parse_asyncio_loop(asyncio_loop)
+
+        object.__setattr__(self, "order", order_parsed)
+        object.__setattr__(self, "concurrency", concurrency_parsed)
+        object.__setattr__(self, "asyncio_loop", asyncio_loop_parsed)
diff --git a/src/zarr/v3/group.py b/src/zarr/v3/group.py
index a5d0e68165..acd5ca0d62 100644
--- a/src/zarr/v3/group.py
+++ b/src/zarr/v3/group.py
@@ -1,15 +1,15 @@
 from __future__ import annotations
+from dataclasses import asdict, dataclass, field, replace
 import asyncio
 import json
 import logging
 from typing import Any, Dict, Literal, Optional, Union, AsyncIterator, Iterator, List
-
-from attr import asdict, field, frozen  # , validators
+from zarr.v3.abc.metadata import Metadata

 from zarr.v3.array import AsyncArray, Array
 from zarr.v3.attributes import Attributes
-from zarr.v3.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr
+from zarr.v3.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON
 from zarr.v3.config import RuntimeConfiguration, SyncConfiguration
 from zarr.v3.store import StoreLike, StorePath, make_store_path
 from zarr.v3.sync import SyncMixin, sync

@@ -17,29 +17,56 @@
 logger = logging.getLogger("zarr.group")

-@frozen
-class GroupMetadata:
-    attributes: Dict[str, Any] = field(factory=dict)
+def parse_zarr_format(data: Any) -> Literal[2, 3]:
+    if data in (2, 3):
+        return data
+    msg = f"Invalid zarr_format. Expected one of 2 or 3. Got {data}."
+    raise ValueError(msg)
+
+
+# todo: convert None to empty dict
+def parse_attributes(data: Any) -> Dict[str, Any]:
+    if data is None:
+        return {}
+    elif isinstance(data, dict) and all(map(lambda v: isinstance(v, str), data.keys())):
+        return data
+    msg = f"Expected dict with string keys. Got {type(data)} instead."
+    raise TypeError(msg)
+
+
+@dataclass(frozen=True)
+class GroupMetadata(Metadata):
+    attributes: Dict[str, Any] = field(default_factory=dict)
     zarr_format: Literal[2, 3] = 3
-    node_type: Literal["group"] = field(default="group", init=True)
+    node_type: Literal["group"] = field(default="group", init=False)

+    # todo: rename this, since it doesn't return bytes
     def to_bytes(self) -> Dict[str, bytes]:
         if self.zarr_format == 3:
-            return {ZARR_JSON: json.dumps(asdict(self)).encode()}
-        elif self.zarr_format == 2:
+            return {ZARR_JSON: json.dumps(self.to_dict()).encode()}
+        else:
             return {
-                ZGROUP_JSON: self.zarr_format,
+                ZGROUP_JSON: json.dumps({"zarr_format": self.zarr_format}).encode(),
                 ZATTRS_JSON: json.dumps(self.attributes).encode(),
             }
-        else:
-            raise ValueError(f"unexpected zarr_format: {self.zarr_format}")
+
+    def __init__(self, attributes: Optional[Dict[str, Any]] = None, zarr_format: Literal[2, 3] = 3):
+        attributes_parsed = parse_attributes(attributes)
+        zarr_format_parsed = parse_zarr_format(zarr_format)
+
+        object.__setattr__(self, "attributes", attributes_parsed)
+        object.__setattr__(self, "zarr_format", zarr_format_parsed)

     @classmethod
-    def from_json(cls, zarr_json: Any) -> GroupMetadata:
-        return make_cattr().structure(zarr_json, GroupMetadata)
+    def from_dict(cls, data: Dict[str, Any]) -> GroupMetadata:
+        assert data.pop("node_type", None) in ("group", None)
+        return cls(**data)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return asdict(self)

-@frozen
+@dataclass(frozen=True)
 class AsyncGroup:
     metadata: GroupMetadata
     store_path: StorePath
@@ -77,6 +104,8 @@ async def open(
         zarr_format: Literal[2, 3] = 3,
     ) -> AsyncGroup:
         store_path = make_store_path(store)
+        zarr_json_bytes = await (store_path / ZARR_JSON).get_async()
+        assert zarr_json_bytes is not None

         # TODO: consider trying to autodiscover the zarr-format here
         if zarr_format == 3:
@@ -102,17 +131,17 @@ async def open(
             zarr_json = {**zgroup, "attributes": zattrs}
         else:
             raise ValueError(f"unexpected zarr_format: {zarr_format}")
-        return cls.from_json(store_path, zarr_json, runtime_configuration)
+        return cls.from_dict(store_path, zarr_json, runtime_configuration)

     @classmethod
-    def from_json(
+    def from_dict(
         cls,
         store_path: StorePath,
-        zarr_json: Any,
+        data: Dict[str, Any],
         runtime_configuration: RuntimeConfiguration,
     ) -> Group:
         group = cls(
-            metadata=GroupMetadata.from_json(zarr_json),
+            metadata=GroupMetadata.from_dict(data),
             store_path=store_path,
             runtime_configuration=runtime_configuration,
         )
@@ -138,9 +167,9 @@ async def getitem(
         else:
             zarr_json = json.loads(zarr_json_bytes)
             if zarr_json["node_type"] == "group":
-                return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
+                return type(self).from_dict(store_path, zarr_json, self.runtime_configuration)
             if zarr_json["node_type"] == "array":
-                return AsyncArray.from_json(
+                return AsyncArray.from_dict(
                     store_path, zarr_json, runtime_configuration=self.runtime_configuration
                 )
         elif self.metadata.zarr_format == 2:
@@ -160,7 +189,7 @@ async def getitem(
             if zarray is not None:
                 # TODO: update this once the V2 array support is part of the primary array class
                 zarr_json = {**zarray, "attributes": zattrs}
-                return AsyncArray.from_json(
+                return AsyncArray.from_dict(
                     store_path, zarray, runtime_configuration=self.runtime_configuration
                 )
             else:
@@ -173,7 +202,7 @@ async def getitem(
                 else {"zarr_format": self.metadata.zarr_format}
             )
             zarr_json = {**zgroup, "attributes": zattrs}
-            return type(self).from_json(store_path, zarr_json, self.runtime_configuration)
+            return type(self).from_dict(store_path, zarr_json, self.runtime_configuration)
         else:
             raise
ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}") @@ -291,7 +320,7 @@ async def move(self, source: str, dest: str) -> None: raise NotImplementedError -@frozen +@dataclass(frozen=True) class Group(SyncMixin): _async_group: AsyncGroup _sync_configuration: SyncConfiguration = field(init=True, default=SyncConfiguration()) @@ -348,6 +377,13 @@ def __setitem__(self, key, value): """__setitem__ is not supported in v3""" raise NotImplementedError + async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> Group: + new_metadata = replace(self.metadata, attributes=new_attributes) + + # Write new metadata + await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) + return replace(self, metadata=new_metadata) + @property def metadata(self) -> GroupMetadata: return self._async_group.metadata diff --git a/src/zarr/v3/metadata.py b/src/zarr/v3/metadata.py index c6dd9f1f46..de3055abdc 100644 --- a/src/zarr/v3/metadata.py +++ b/src/zarr/v3/metadata.py @@ -1,21 +1,31 @@ from __future__ import annotations - -import json -from asyncio import AbstractEventLoop from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Protocol, Tuple, Union - +from typing import TYPE_CHECKING, cast, Dict, Iterable +from dataclasses import dataclass, field +import json import numpy as np -from attr import asdict, field, frozen -from zarr.v3.common import ChunkCoords, make_cattr +from zarr.v3.chunk_grids import ChunkGrid, RegularChunkGrid +from zarr.v3.chunk_key_encodings import ChunkKeyEncoding, parse_separator -@frozen -class RuntimeConfiguration: - order: Literal["C", "F"] = "C" - concurrency: Optional[int] = None - asyncio_loop: Optional[AbstractEventLoop] = None +if TYPE_CHECKING: + from typing import Any, Literal, Union, List, Optional, Tuple + from zarr.v3.codecs.pipeline import CodecPipeline + + +from zarr.v3.abc.codec import Codec +from zarr.v3.abc.metadata import Metadata + +from zarr.v3.common import ( + JSON, + ArraySpec, + ChunkCoords, + parse_dtype, + parse_fill_value, + parse_shapelike, +) +from zarr.v3.config import RuntimeConfiguration, parse_indexing_order def runtime_configuration( @@ -79,120 +89,95 @@ def to_numpy_shortname(self) -> str: } return data_type_to_numpy[self] - -dtype_to_data_type = { - "|b1": "bool", - "bool": "bool", - "|i1": "int8", - " ChunkCoords: - if chunk_key == "c": - return () - return tuple(map(int, chunk_key[1:].split(self.configuration.separator))) - - def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - return self.configuration.separator.join(map(str, ("c",) + chunk_coords)) - - -@frozen -class V2ChunkKeyEncodingConfigurationMetadata: - separator: Literal[".", "/"] = "." 
- - -@frozen -class V2ChunkKeyEncodingMetadata: - configuration: V2ChunkKeyEncodingConfigurationMetadata = ( - V2ChunkKeyEncodingConfigurationMetadata() - ) - name: Literal["v2"] = "v2" - - def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: - return tuple(map(int, chunk_key.split(self.configuration.separator))) - - def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.configuration.separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - - -ChunkKeyEncodingMetadata = Union[DefaultChunkKeyEncodingMetadata, V2ChunkKeyEncodingMetadata] - - -class CodecMetadata(Protocol): - @property - def name(self) -> str: - pass - - -class ShardingCodecIndexLocation(Enum): - start = "start" - end = "end" - - -@frozen -class ArraySpec: - shape: ChunkCoords - data_type: DataType - fill_value: Any - - @property - def dtype(self) -> np.dtype: - return np.dtype(self.data_type.value) - - @property - def ndim(self) -> int: - return len(self.shape) + @classmethod + def from_dtype(cls, dtype: np.dtype) -> DataType: + dtype_to_data_type = { + "|b1": "bool", + "bool": "bool", + "|i1": "int8", + " None: + if isinstance(self.chunk_grid, RegularChunkGrid) and len(self.shape) != len( + self.chunk_grid.chunk_shape + ): + raise ValueError( + "`chunk_shape` and `shape` need to have the same number of dimensions." + ) + if self.dimension_names is not None and len(self.shape) != len(self.dimension_names): + raise ValueError( + "`dimension_names` and `shape` need to have the same number of dimensions." + ) + if self.fill_value is None: + raise ValueError("`fill_value` is required.") + self.codecs.validate(self) @property def dtype(self) -> np.dtype: - return np.dtype(self.data_type.value) + return self.data_type @property def ndim(self) -> int: @@ -200,35 +185,57 @@ def ndim(self) -> int: def get_chunk_spec(self, _chunk_coords: ChunkCoords) -> ArraySpec: assert isinstance( - self.chunk_grid, RegularChunkGridMetadata + self.chunk_grid, RegularChunkGrid ), "Currently, only regular chunk grid is supported" return ArraySpec( - shape=self.chunk_grid.configuration.chunk_shape, - data_type=self.data_type, + shape=self.chunk_grid.chunk_shape, + dtype=self.dtype, fill_value=self.fill_value, ) def to_bytes(self) -> bytes: def _json_convert(o): + if isinstance(o, np.dtype): + return str(o) if isinstance(o, Enum): return o.name + # this serializes numcodecs compressors + # todo: implement to_dict for codecs + elif hasattr(o, "get_config"): + return o.get_config() raise TypeError return json.dumps( - asdict( - self, - filter=lambda attr, value: attr.name != "dimension_names" or value is not None, - ), + self.to_dict(), default=_json_convert, ).encode() @classmethod - def from_json(cls, zarr_json: Any) -> ArrayMetadata: - return make_cattr().structure(zarr_json, cls) + def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata: + # check that the zarr_format attribute is correct + _ = parse_zarr_format_v3(data.pop("zarr_format")) + # check that the node_type attribute is correct + _ = parse_node_type_array(data.pop("node_type")) + + dimension_names = data.pop("dimension_names", None) + + return cls(**data, dimension_names=dimension_names) + + def to_dict(self) -> Dict[str, Any]: + out_dict = super().to_dict() + + if not isinstance(out_dict, dict): + raise TypeError(f"Expected dict. 
Got {type(out_dict)}.")
+        # if `dimension_names` is `None`, we do not include it in
+        # the metadata document
+        if out_dict["dimension_names"] is None:
+            out_dict.pop("dimension_names")
+        return out_dict

-@frozen
-class ArrayV2Metadata:
+
+@dataclass(frozen=True)
+class ArrayV2Metadata(Metadata):
     shape: ChunkCoords
     chunks: ChunkCoords
     dtype: np.dtype
@@ -237,7 +244,47 @@ class ArrayV2Metadata:
     filters: Optional[List[Dict[str, Any]]] = None
     dimension_separator: Literal[".", "/"] = "."
     compressor: Optional[Dict[str, Any]] = None
-    zarr_format: Literal[2] = 2
+    attributes: Optional[Dict[str, Any]] = field(default_factory=dict)
+    zarr_format: Literal[2] = field(init=False, default=2)
+
+    def __init__(
+        self,
+        *,
+        shape: ChunkCoords,
+        dtype: np.dtype,
+        chunks: ChunkCoords,
+        fill_value: Any,
+        order: Literal["C", "F"],
+        dimension_separator: Literal[".", "/"] = ".",
+        compressor: Optional[Dict[str, Any]] = None,
+        filters: Optional[List[Dict[str, Any]]] = None,
+        attributes: Optional[Dict[str, JSON]] = None,
+    ):
+        """
+        Metadata for a Zarr version 2 array.
+        """
+        shape_parsed = parse_shapelike(shape)
+        dtype_parsed = parse_dtype(dtype)
+        chunks_parsed = parse_shapelike(chunks)
+        compressor_parsed = parse_compressor(compressor)
+        order_parsed = parse_indexing_order(order)
+        dimension_separator_parsed = parse_separator(dimension_separator)
+        filters_parsed = parse_filters(filters)
+        fill_value_parsed = parse_fill_value(fill_value)
+        attributes_parsed = parse_attributes(attributes)
+
+        object.__setattr__(self, "shape", shape_parsed)
+        object.__setattr__(self, "dtype", dtype_parsed)
+        object.__setattr__(self, "chunks", chunks_parsed)
+        object.__setattr__(self, "compressor", compressor_parsed)
+        object.__setattr__(self, "order", order_parsed)
+        object.__setattr__(self, "dimension_separator", dimension_separator_parsed)
+        object.__setattr__(self, "filters", filters_parsed)
+        object.__setattr__(self, "fill_value", fill_value_parsed)
+        object.__setattr__(self, "attributes", attributes_parsed)
+
+        # ensure that the metadata document is consistent
+        _ = parse_v2_metadata(self)

     @property
     def ndim(self) -> int:
@@ -252,8 +299,78 @@ def _json_convert(o):
             return o.descr
         raise TypeError

-        return json.dumps(asdict(self), default=_json_convert).encode()
+        return json.dumps(self.to_dict(), default=_json_convert).encode()

     @classmethod
-    def from_json(cls, zarr_json: Any) -> ArrayV2Metadata:
-        return make_cattr().structure(zarr_json, cls)
+    def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata:
+        # check that the zarr_format attribute is correct
+        _ = parse_zarr_format_v2(data.pop("zarr_format"))
+        return cls(**data)
+
+
+def parse_dimension_names(data: Any) -> Tuple[str, ...] | None:
+    if data is None:
+        return data
+    if isinstance(data, Iterable) and all([isinstance(x, str) for x in data]):
+        return tuple(data)
+    msg = f"Expected either None or an iterable of str, got {type(data)}"
+    raise TypeError(msg)
+
+
+# todo: real validation
+def parse_attributes(data: Any) -> Dict[str, JSON]:
+    if data is None:
+        return {}
+
+    data_json = cast(Dict[str, JSON], data)
+    return data_json
+
+
+# todo: move to its own module and drop _v3 suffix
+# todo: consider folding all the literal parsing into a single function
+# that takes 2 arguments
+def parse_zarr_format_v3(data: Any) -> Literal[3]:
+    if data == 3:
+        return data
+    raise ValueError(f"Invalid value. Expected 3.
Got {data}.") + + +# todo: move to its own module and drop _v2 suffix +def parse_zarr_format_v2(data: Any) -> Literal[2]: + if data == 2: + return data + raise ValueError(f"Invalid value. Expected 2. Got {data}.") + + +def parse_node_type_array(data: Any) -> Literal["array"]: + if data == "array": + return data + raise ValueError(f"Invalid value. Expected 'array'. Got {data}.") + + +# todo: real validation +def parse_filters(data: Any) -> List[Codec]: + return data + + +# todo: real validation +def parse_compressor(data: Any) -> Codec: + return data + + +def parse_v2_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata: + if (l_chunks := len(data.chunks)) != (l_shape := len(data.shape)): + msg = ( + f"The `shape` and `chunks` attributes must have the same length. " + f"`chunks` has length {l_chunks}, but `shape` has length {l_shape}." + ) + raise ValueError(msg) + return data + + +def parse_codecs(data: Iterable[Union[Codec, JSON]]) -> CodecPipeline: + from zarr.v3.codecs.pipeline import CodecPipeline + + if not isinstance(data, Iterable): + raise TypeError(f"Expected iterable, got {type(data)}") + return CodecPipeline.from_dict(data) diff --git a/src/zarr/v3/store/memory.py b/src/zarr/v3/store/memory.py index 1370375851..afacfa4321 100644 --- a/src/zarr/v3/store/memory.py +++ b/src/zarr/v3/store/memory.py @@ -49,7 +49,7 @@ async def set( ) -> None: assert isinstance(key, str) if not isinstance(value, (bytes, bytearray, memoryview)): - raise TypeError(f"expected BytesLike, got {type(value)}") + raise TypeError(f"Expected BytesLike. Got {type(value)}.") if byte_range is not None: buf = bytearray(self._store_dict[key]) diff --git a/src/zarr/v3/sync.py b/src/zarr/v3/sync.py index e88c8e93f2..f0996c019e 100644 --- a/src/zarr/v3/sync.py +++ b/src/zarr/v3/sync.py @@ -2,7 +2,16 @@ import asyncio import threading -from typing import Any, Coroutine, List, Optional +from typing import ( + Any, + AsyncIterator, + Callable, + Coroutine, + List, + Optional, + TypeVar, +) +from typing_extensions import ParamSpec from zarr.v3.config import SyncConfiguration @@ -90,18 +99,24 @@ def _get_loop(): return loop[0] +P = ParamSpec("P") +T = TypeVar("T") + + class SyncMixin: _sync_configuration: SyncConfiguration - def _sync(self, coroutine: Coroutine): # TODO: type this + def _sync(self, coroutine: Coroutine[Any, Any, T]) -> T: # TODO: refactor this to to take *args and **kwargs and pass those to the method # this should allow us to better type the sync wrapper return sync(coroutine, loop=self._sync_configuration.asyncio_loop) - def _sync_iter(self, func: Coroutine, *args, **kwargs) -> List[Any]: # TODO: type this - async def iter_to_list() -> List[Any]: + def _sync_iter( + self, func: Callable[P, AsyncIterator[T]], *args: P.args, **kwargs: P.kwargs + ) -> List[T]: + async def iter_to_list() -> List[T]: # TODO: replace with generators so we don't materialize the entire iterator at once return [item async for item in func(*args, **kwargs)] - return self._sync(iter_to_list) + return self._sync(iter_to_list()) diff --git a/tests/test_codecs_v3.py b/tests/test_codecs_v3.py index 2b18969874..333c2094bf 100644 --- a/tests/test_codecs_v3.py +++ b/tests/test_codecs_v3.py @@ -1,23 +1,33 @@ from __future__ import annotations +from dataclasses import dataclass import json -from typing import Iterator, List, Literal, Optional -from attr import frozen +from typing import Iterator, List, Literal, Optional, Tuple + import numpy as np import pytest import zarr -from zarr.v3 import codecs +from zarr.v3.abc.codec import Codec from 
zarr.v3.array import Array, AsyncArray from zarr.v3.common import Selection from zarr.v3.indexing import morton_order_iter -from zarr.v3.metadata import CodecMetadata, ShardingCodecIndexLocation, runtime_configuration +from zarr.v3.codecs import ( + ShardingCodec, + ShardingCodecIndexLocation, + BloscCodec, + BytesCodec, + GzipCodec, + TransposeCodec, + ZstdCodec, +) +from zarr.v3.metadata import runtime_configuration from zarr.v3.abc.store import Store from zarr.v3.store import MemoryStore, StorePath -@frozen +@dataclass(frozen=True) class _AsyncArrayProxy: array: AsyncArray @@ -25,7 +35,7 @@ def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: return _AsyncArraySelectionProxy(self.array, selection) -@frozen +@dataclass(frozen=True) class _AsyncArraySelectionProxy: array: AsyncArray selection: Selection @@ -47,9 +57,14 @@ def sample_data() -> np.ndarray: return np.arange(0, 128 * 128 * 128, dtype="uint16").reshape((128, 128, 128), order="F") -@pytest.mark.parametrize( - "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] -) +def order_from_dim(order: Literal["F", "C"], ndim: int) -> Tuple[int, ...]: + if order == "F": + return tuple(ndim - x - 1 for x in range(ndim)) + else: + return tuple(range(ndim)) + + +@pytest.mark.parametrize("index_location", ["start", "end"]) def test_sharding( store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation ): @@ -60,12 +75,12 @@ def test_sharding( dtype=sample_data.dtype, fill_value=0, codecs=[ - codecs.sharding_codec( - (32, 32, 32), - [ - codecs.transpose_codec("F", sample_data.ndim), - codecs.bytes_codec(), - codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ShardingCodec( + chunk_shape=(32, 32, 32), + codecs=[ + TransposeCodec(order=order_from_dim("F", sample_data.ndim)), + BytesCodec(), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -79,9 +94,7 @@ def test_sharding( assert np.array_equal(sample_data, read_data) -@pytest.mark.parametrize( - "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] -) +@pytest.mark.parametrize("index_location", ["start", "end"]) def test_sharding_partial( store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation ): @@ -92,12 +105,12 @@ def test_sharding_partial( dtype=sample_data.dtype, fill_value=0, codecs=[ - codecs.sharding_codec( - (32, 32, 32), - [ - codecs.transpose_codec("F", sample_data.ndim), - codecs.bytes_codec(), - codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ShardingCodec( + chunk_shape=(32, 32, 32), + codecs=[ + TransposeCodec(order=order_from_dim("F", sample_data.ndim)), + BytesCodec(), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -114,9 +127,7 @@ def test_sharding_partial( assert np.array_equal(sample_data, read_data) -@pytest.mark.parametrize( - "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] -) +@pytest.mark.parametrize("index_location", ["start", "end"]) def test_sharding_partial_read( store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation ): @@ -127,12 +138,12 @@ def test_sharding_partial_read( dtype=sample_data.dtype, fill_value=1, codecs=[ - codecs.sharding_codec( - (32, 32, 32), - [ - codecs.transpose_codec("F", sample_data.ndim), - codecs.bytes_codec(), - codecs.blosc_codec(typesize=sample_data.dtype.itemsize, cname="lz4"), + ShardingCodec( + chunk_shape=(32, 32, 32), + codecs=[ +
TransposeCodec(order=order_from_dim("F", sample_data.ndim)), + BytesCodec(), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -143,9 +154,7 @@ def test_sharding_partial_read( assert np.all(read_data == 1) -@pytest.mark.parametrize( - "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] -) +@pytest.mark.parametrize("index_location", ["start", "end"]) def test_sharding_partial_overwrite( store: Store, sample_data: np.ndarray, index_location: ShardingCodecIndexLocation ): @@ -158,12 +167,12 @@ def test_sharding_partial_overwrite( dtype=data.dtype, fill_value=1, codecs=[ - codecs.sharding_codec( - (32, 32, 32), - [ - codecs.transpose_codec("F", data.ndim), - codecs.bytes_codec(), - codecs.blosc_codec(typesize=data.dtype.itemsize, cname="lz4"), + ShardingCodec( + chunk_shape=(32, 32, 32), + codecs=[ + TransposeCodec(order=order_from_dim("F", data.ndim)), + BytesCodec(), + BloscCodec(cname="lz4"), ], index_location=index_location, ) @@ -183,11 +192,11 @@ def test_sharding_partial_overwrite( @pytest.mark.parametrize( "outer_index_location", - [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end], + ["start", "end"], ) @pytest.mark.parametrize( "inner_index_location", - [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end], + ["start", "end"], ) def test_nested_sharding( store: Store, @@ -202,9 +211,11 @@ def test_nested_sharding( dtype=sample_data.dtype, fill_value=0, codecs=[ - codecs.sharding_codec( - (32, 32, 32), - [codecs.sharding_codec((16, 16, 16), index_location=inner_index_location)], + ShardingCodec( + chunk_shape=(32, 32, 32), + codecs=[ + ShardingCodec(chunk_shape=(16, 16, 16), index_location=inner_index_location) + ], index_location=outer_index_location, ) ], @@ -233,15 +244,15 @@ async def test_order( ): data = np.arange(0, 256, dtype="uint16").reshape((32, 8), order=input_order) - codecs_: List[CodecMetadata] = ( + codecs_: List[Codec] = ( [ - codecs.sharding_codec( - (16, 8), - codecs=[codecs.transpose_codec(store_order, data.ndim), codecs.bytes_codec()], + ShardingCodec( + chunk_shape=(16, 8), + codecs=[TransposeCodec(order=order_from_dim(store_order, data.ndim)), BytesCodec()], ) ] if with_sharding - else [codecs.transpose_codec(store_order, data.ndim), codecs.bytes_codec()] + else [TransposeCodec(order=order_from_dim(store_order, data.ndim)), BytesCodec()] ) a = await AsyncArray.create( @@ -300,9 +311,7 @@ def test_order_implicit( ): data = np.arange(0, 256, dtype="uint16").reshape((16, 16), order=input_order) - codecs_: Optional[List[CodecMetadata]] = ( - [codecs.sharding_codec((8, 8))] if with_sharding else None - ) + codecs_: Optional[List[Codec]] = [ShardingCodec(chunk_shape=(8, 8))] if with_sharding else None a = Array.create( store / "order_implicit", @@ -345,15 +354,15 @@ async def test_transpose( ): data = np.arange(0, 256, dtype="uint16").reshape((1, 32, 8), order=input_order) - codecs_: List[CodecMetadata] = ( + codecs_: List[Codec] = ( [ - codecs.sharding_codec( - (1, 16, 8), - codecs=[codecs.transpose_codec((2, 1, 0)), codecs.bytes_codec()], + ShardingCodec( + chunk_shape=(1, 16, 8), + codecs=[TransposeCodec(order=(2, 1, 0)), BytesCodec()], ) ] if with_sharding - else [codecs.transpose_codec((2, 1, 0)), codecs.bytes_codec()] + else [TransposeCodec(order=(2, 1, 0)), BytesCodec()] ) a = await AsyncArray.create( @@ -405,7 +414,7 @@ def test_transpose_invalid( data = np.arange(0, 256, dtype="uint16").reshape((1, 32, 8)) for order in [(1, 0), (3, 2, 1), (3, 3, 1)]: - with
pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "transpose_invalid", shape=data.shape, @@ -413,7 +422,7 @@ def test_transpose_invalid( dtype=data.dtype, fill_value=0, chunk_key_encoding=("v2", "."), - codecs=[codecs.transpose_codec(order), codecs.bytes_codec()], + codecs=[TransposeCodec(order=order), BytesCodec()], ) @@ -437,12 +446,12 @@ def test_open_sharding(store: Store): dtype="int32", fill_value=0, codecs=[ - codecs.sharding_codec( - (8, 8), - [ - codecs.transpose_codec("F", 2), - codecs.bytes_codec(), - codecs.blosc_codec(typesize=4), + ShardingCodec( + chunk_shape=(8, 8), + codecs=[ + TransposeCodec(order=order_from_dim("F", 2)), + BytesCodec(), + BloscCodec(), ], ) ], @@ -580,11 +589,11 @@ def test_write_partial_sharded_chunks(store: Store): dtype=data.dtype, fill_value=1, codecs=[ - codecs.sharding_codec( + ShardingCodec( chunk_shape=(10, 10), codecs=[ - codecs.bytes_codec(), - codecs.blosc_codec(typesize=data.dtype.itemsize), + BytesCodec(), + BloscCodec(), ], ) ], @@ -618,7 +627,7 @@ async def test_delete_empty_sharded_chunks(store: Store): chunk_shape=(8, 16), dtype="uint16", fill_value=1, - codecs=[codecs.sharding_codec(chunk_shape=(8, 8))], + codecs=[ShardingCodec(chunk_shape=(8, 8))], ) await _AsyncArrayProxy(a)[:, :].set(np.zeros((16, 16))) await _AsyncArrayProxy(a)[8:, :].set(np.ones((8, 16))) @@ -679,7 +688,7 @@ async def test_zarr_compat_F(store: Store): dtype=data.dtype, chunk_key_encoding=("v2", "."), fill_value=1, - codecs=[codecs.transpose_codec("F", data.ndim), codecs.bytes_codec()], + codecs=[TransposeCodec(order=order_from_dim("F", data.ndim)), BytesCodec()], ) z2 = zarr.create( @@ -743,7 +752,7 @@ def test_gzip(store: Store): chunk_shape=(16, 16), dtype=data.dtype, fill_value=0, - codecs=[codecs.bytes_codec(), codecs.gzip_codec()], + codecs=[BytesCodec(), GzipCodec()], ) a[:, :] = data @@ -760,7 +769,7 @@ def test_zstd(store: Store, checksum: bool): chunk_shape=(16, 16), dtype=data.dtype, fill_value=0, - codecs=[codecs.bytes_codec(), codecs.zstd_codec(level=0, checksum=checksum)], + codecs=[BytesCodec(), ZstdCodec(level=0, checksum=checksum)], ) a[:, :] = data @@ -779,7 +788,7 @@ async def test_endian(store: Store, endian: Literal["big", "little"]): dtype=data.dtype, fill_value=0, chunk_key_encoding=("v2", "."), - codecs=[codecs.bytes_codec(endian)], + codecs=[BytesCodec(endian=endian)], ) await _AsyncArrayProxy(a)[:, :].set(data) @@ -815,7 +824,7 @@ async def test_endian_write( dtype="uint16", fill_value=0, chunk_key_encoding=("v2", "."), - codecs=[codecs.bytes_codec(dtype_store_endian)], + codecs=[BytesCodec(endian=dtype_store_endian)], ) await _AsyncArrayProxy(a)[:, :].set(data) @@ -835,7 +844,7 @@ async def test_endian_write( def test_invalid_metadata(store: Store): - with pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "invalid_chunk_shape", shape=(16, 16, 16), @@ -844,7 +853,7 @@ def test_invalid_metadata(store: Store): fill_value=0, ) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "invalid_endian", shape=(16, 16), @@ -852,12 +861,12 @@ def test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.bytes_codec("big"), - codecs.transpose_codec("F", 2), + BytesCodec(endian="big"), + TransposeCodec(order=order_from_dim("F", 2)), ], ) - with pytest.raises(AssertionError): + with pytest.raises(TypeError): Array.create( store / "invalid_order", shape=(16, 16), @@ -865,12 +874,12 @@ def
test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.bytes_codec(), - codecs.transpose_codec("F"), + BytesCodec(), + TransposeCodec(order="F"), ], ) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "invalid_missing_bytes_codec", shape=(16, 16), @@ -878,11 +887,11 @@ def test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.transpose_codec("F", 2), + TransposeCodec(order=order_from_dim("F", 2)), ], ) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "invalid_inner_chunk_shape", shape=(16, 16), @@ -890,10 +899,10 @@ def test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.sharding_codec(chunk_shape=(8,)), + ShardingCodec(chunk_shape=(8,)), ], ) - with pytest.raises(AssertionError): + with pytest.raises(ValueError): Array.create( store / "invalid_inner_chunk_shape", shape=(16, 16), @@ -901,7 +910,7 @@ def test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.sharding_codec(chunk_shape=(8, 7)), + ShardingCodec(chunk_shape=(8, 7)), ], ) @@ -913,8 +922,8 @@ def test_invalid_metadata(store: Store): dtype=np.dtype("uint8"), fill_value=0, codecs=[ - codecs.sharding_codec(chunk_shape=(8, 8)), - codecs.gzip_codec(), + ShardingCodec(chunk_shape=(8, 8)), + GzipCodec(), ], ) @@ -933,17 +942,62 @@ async def test_resize(store: Store): ) await _AsyncArrayProxy(a)[:16, :18].set(data) - assert await (store / "resize/0.0").get() is not None - assert await (store / "resize/0.1").get() is not None - assert await (store / "resize/1.0").get() is not None - assert await (store / "resize/1.1").get() is not None + assert await (store / "resize" / "0.0").get() is not None + assert await (store / "resize" / "0.1").get() is not None + assert await (store / "resize" / "1.0").get() is not None + assert await (store / "resize" / "1.1").get() is not None a = await a.resize((10, 12)) assert a.metadata.shape == (10, 12) - assert await (store / "resize/0.0").get() is not None - assert await (store / "resize/0.1").get() is not None - assert await (store / "resize/1.0").get() is None - assert await (store / "resize/1.1").get() is None + assert await (store / "resize" / "0.0").get() is not None + assert await (store / "resize" / "0.1").get() is not None + assert await (store / "resize" / "1.0").get() is None + assert await (store / "resize" / "1.1").get() is None + + +@pytest.mark.asyncio +async def test_blosc_evolve(store: Store): + await AsyncArray.create( + store / "blosc_evolve_u1", + shape=(16, 16), + chunk_shape=(16, 16), + dtype="uint8", + fill_value=0, + codecs=[BytesCodec(), BloscCodec()], + ) + + zarr_json = json.loads(await (store / "blosc_evolve_u1" / "zarr.json").get()) + blosc_configuration_json = zarr_json["codecs"][1]["configuration"] + assert blosc_configuration_json["typesize"] == 1 + assert blosc_configuration_json["shuffle"] == "bitshuffle" + + await AsyncArray.create( + store / "blosc_evolve_u2", + shape=(16, 16), + chunk_shape=(16, 16), + dtype="uint16", + fill_value=0, + codecs=[BytesCodec(), BloscCodec()], + ) + + zarr_json = json.loads(await (store / "blosc_evolve_u2" / "zarr.json").get()) + blosc_configuration_json = zarr_json["codecs"][1]["configuration"] + assert blosc_configuration_json["typesize"] == 2 + assert blosc_configuration_json["shuffle"] == "shuffle" + + await AsyncArray.create( + store / "sharding_blosc_evolve", + shape=(16, 16), +
chunk_shape=(16, 16), + dtype="uint16", + fill_value=0, + codecs=[ShardingCodec(chunk_shape=(16, 16), codecs=[BytesCodec(), BloscCodec()])], + ) + + zarr_json = json.loads(await (store / "sharding_blosc_evolve" / "zarr.json").get()) + blosc_configuration_json = zarr_json["codecs"][0]["configuration"]["codecs"][1]["configuration"] + assert blosc_configuration_json["typesize"] == 2 + assert blosc_configuration_json["shuffle"] == "shuffle" def test_exists_ok(store: Store): diff --git a/tests/v3/test_common.py b/tests/v3/test_common.py new file mode 100644 index 0000000000..33e91d793f --- /dev/null +++ b/tests/v3/test_common.py @@ -0,0 +1,97 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Iterable + +if TYPE_CHECKING: + from typing import Literal, Any, Tuple + +import numpy as np +from zarr.v3.config import parse_indexing_order +from zarr.v3.common import parse_shapelike +from zarr.v3.common import parse_name, parse_dtype, product  # parse_dtype is assumed to live in zarr.v3.common alongside the other parsers +import pytest + + +@pytest.mark.parametrize("data", [(0, 0, 0, 0), (1, 3, 4, 5, 6), (2, 4)]) +def test_product(data: Tuple[int, ...]): + assert product(data) == np.prod(data) + + +# todo: test +def test_concurrent_map(): + ... + + +# todo: test +def test_to_thread(): + ... + + +# todo: test +def test_enum_names(): + ... + + +# todo: test +def test_parse_enum(): + ... + + +@pytest.mark.parametrize("data", [("foo", "bar"), (10, 11)]) +def test_parse_name_invalid(data: Tuple[Any, Any]): + observed, expected = data + if isinstance(observed, str): + with pytest.raises(ValueError, match=f"Expected '{expected}'. Got {observed} instead."): + parse_name(observed, expected) + else: + with pytest.raises( + TypeError, match=f"Expected a string, got an instance of {type(observed)}." + ): + parse_name(observed, expected) + + +@pytest.mark.parametrize("data", [("foo", "foo"), ("10", "10")]) +def test_parse_name_valid(data: Tuple[Any, Any]): + observed, expected = data + assert parse_name(observed, expected) == observed + + +@pytest.mark.parametrize("data", [0, 1, "hello", "f"]) +def test_parse_indexing_order_invalid(data): + with pytest.raises(ValueError, match="Expected one of"): + parse_indexing_order(data) + + +@pytest.mark.parametrize("data", ["C", "F"]) +def test_parse_indexing_order_valid(data: Literal["C", "F"]): + assert parse_indexing_order(data) == data + + +@pytest.mark.parametrize("data", [10, ("0", 1, 2, 3), {"0": "0"}, []]) +def test_parse_shapelike_invalid(data: Any): + if isinstance(data, Iterable): + if len(data) == 0: + with pytest.raises(ValueError, match="Expected at least one element."): + parse_shapelike(data) + else: + with pytest.raises(TypeError, match="Expected an iterable of integers"): + parse_shapelike(data) + else: + with pytest.raises(TypeError, match="Expected an iterable."): + parse_shapelike(data) + + +@pytest.mark.parametrize("data", [range(10), [0, 1, 2, 3], (3, 4, 5)]) +def test_parse_shapelike_valid(data: Iterable[Any]): + assert parse_shapelike(data) == tuple(data) + + +# todo: more dtypes +@pytest.mark.parametrize("data", [("uint8", np.uint8), ("float64", np.float64)]) +def test_parse_dtype(data: Tuple[str, np.dtype]): + unparsed, parsed = data + assert parse_dtype(unparsed) == parsed + + +# todo: figure out what it means to test this +def test_parse_fill_value(): + ...
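The tests above pin down the error contract of the parse_* validators without showing their implementations, which live in zarr.v3.config and zarr.v3.common. For orientation, a minimal sketch of parse_shapelike that is consistent with the assertions in test_parse_shapelike_valid and test_parse_shapelike_invalid; this illustrates the contract only and is not the code in this diff:

from typing import Any, Iterable, Tuple


def parse_shapelike(data: Any) -> Tuple[int, ...]:
    # Non-iterables are rejected outright ("Expected an iterable.").
    if not isinstance(data, Iterable):
        raise TypeError(f"Expected an iterable. Got {data} instead.")
    data_tuple = tuple(data)
    # Empty shapes are invalid ("Expected at least one element.").
    if len(data_tuple) == 0:
        raise ValueError("Expected at least one element. Got 0.")
    # Every element must be an int ("Expected an iterable of integers").
    if not all(isinstance(v, int) for v in data_tuple):
        raise TypeError(f"Expected an iterable of integers. Got {data} instead.")
    return data_tuple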
diff --git a/tests/v3/test_metadata.py b/tests/v3/test_metadata.py new file mode 100644 index 0000000000..c7ca0f2e1a --- /dev/null +++ b/tests/v3/test_metadata.py @@ -0,0 +1,60 @@ +from __future__ import annotations +import pytest +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Sequence, Any + +from zarr.v3.metadata import parse_dimension_names, parse_zarr_format_v2, parse_zarr_format_v3 + +# todo: test +def test_datatype_enum(): + ... + + +# todo: test +# this will almost certainly be a collection of tests +def test_array_metadata_v3(): + ... + + +# todo: test +# this will almost certainly be a collection of tests +def test_array_metadata_v2(): + ... + + +@pytest.mark.parametrize("data", [None, ("a", "b", "c"), ["a", "a", "a"]]) +def test_parse_dimension_names_valid(data: Sequence[str] | None) -> None: + assert parse_dimension_names(data) == data + + +@pytest.mark.parametrize("data", [(), [1, 2, "a"], {"foo": 10}]) +def test_parse_dimension_names_invalid(data: Any) -> None: + with pytest.raises(TypeError, match="Expected either None or iterable of str,"): + parse_dimension_names(data) + + +# todo: test +def test_parse_attributes() -> None: + ... + + +def test_parse_zarr_format_v3_valid() -> None: + assert parse_zarr_format_v3(3) == 3 + + +@pytest.mark.parametrize("data", [None, 1, 2, 4, 5, "3"]) +def test_parse_zarr_format_v3_invalid(data: Any) -> None: + with pytest.raises(ValueError, match=f"Invalid value. Expected 3. Got {data}"): + parse_zarr_format_v3(data) + + +def test_parse_zarr_format_v2_valid() -> None: + assert parse_zarr_format_v2(2) == 2 + + +@pytest.mark.parametrize("data", [None, 1, 3, 4, 5, "3"]) +def test_parse_zarr_format_v2_invalid(data: Any) -> None: + with pytest.raises(ValueError, match=f"Invalid value. Expected 2. Got {data}"): + parse_zarr_format_v2(data)
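Taken together, these tests exercise the central API change in this diff: the codecs.*_codec() factory functions are gone, and codecs are dataclass-based Metadata instances configured by keyword and passed directly to Array.create. A hedged end-to-end sketch of the new style, mirroring test_sharding above; the StorePath(MemoryStore()) line is an assumption standing in for the tests' store fixture, which this diff does not show:

import numpy as np

from zarr.v3.array import Array
from zarr.v3.codecs import BloscCodec, BytesCodec, ShardingCodec, TransposeCodec
from zarr.v3.store import MemoryStore, StorePath

store = StorePath(MemoryStore())  # assumed equivalent of the tests' store fixture
data = np.arange(0, 128 * 128, dtype="uint16").reshape((128, 128))

a = Array.create(
    store / "example",
    shape=data.shape,
    chunk_shape=(64, 64),
    dtype=data.dtype,
    fill_value=0,
    codecs=[
        # Codec configuration is now keyword arguments on dataclass instances.
        # BloscCodec is left partially configured here; per test_blosc_evolve,
        # it derives typesize and shuffle from the array's dtype when the
        # array metadata is built.
        ShardingCodec(
            chunk_shape=(32, 32),
            codecs=[TransposeCodec(order=(1, 0)), BytesCodec(), BloscCodec(cname="lz4")],
        )
    ],
)
a[:, :] = data
assert np.array_equal(a[:, :], data)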