Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use compressor, filters, post_compressor for Array v3 create #1944

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
24 changes: 17 additions & 7 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import numpy as np
import numpy.typing as npt

from zarr.abc.codec import Codec
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec
from zarr.array import Array, AsyncArray
from zarr.buffer import NDArrayLike
from zarr.chunk_key_encodings import ChunkKeyEncoding
from zarr.codecs.bytes import BytesCodec
from zarr.common import JSON, ChunkCoords, MemoryOrder, OpenMode, ZarrFormat
from zarr.group import AsyncGroup
from zarr.metadata import ArrayV2Metadata, ArrayV3Metadata
Expand All @@ -20,6 +21,8 @@
make_store_path,
)

default_pre_compressor = BytesCodec()

# TODO: this type could use some more thought, noqa to avoid "Variable "asynchronous.ArrayLike" is not valid as a type"
ArrayLike = Union[AsyncArray | Array | npt.NDArray[Any]] # noqa
PathLike = str
Expand Down Expand Up @@ -532,15 +535,16 @@ async def create(
*, # Note: this is a change from v2
chunks: ChunkCoords | None = None, # TODO: v2 allowed chunks=True
dtype: npt.DTypeLike | None = None,
compressor: dict[str, JSON] | None = None, # TODO: default and type change
compressor: dict[str, JSON] | BytesBytesCodec | None = None, # TODO: default and type change
filters: Iterable[dict[str, JSON] | ArrayArrayCodec] = (),
pre_compressor: dict[str, JSON] | ArrayBytesCodec = default_pre_compressor,
fill_value: Any = 0, # TODO: need type
order: MemoryOrder | None = None, # TODO: default change
store: str | StoreLike | None = None,
synchronizer: Any | None = None,
overwrite: bool = False,
path: PathLike | None = None,
chunk_store: StoreLike | None = None,
filters: list[dict[str, JSON]] | None = None, # TODO: type has changed
cache_metadata: bool | None = None,
cache_attrs: bool | None = None,
read_only: bool | None = None,
Expand All @@ -559,7 +563,6 @@ async def create(
| tuple[Literal["v2"], Literal[".", "/"]]
| None
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
**kwargs: Any,
) -> AsyncArray:
Expand Down Expand Up @@ -683,20 +686,27 @@ async def create(
if path is not None:
store_path = store_path / path

compressor_out: tuple[dict[str, JSON] | BytesBytesCodec, ...]
# normalize compressor to a tuple
if compressor is None:
compressor_out = ()
else:
compressor_out = (compressor,)

return await AsyncArray.create(
store_path,
shape=shape,
chunks=chunks,
dtype=dtype,
compressor=compressor,
compressors=compressor_out,
filters=filters,
pre_compressor=pre_compressor,
fill_value=fill_value,
exists_ok=overwrite, # TODO: name change
filters=filters,
dimension_separator=dimension_separator,
zarr_format=zarr_format,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
attributes=attributes,
**kwargs,
Expand Down
60 changes: 28 additions & 32 deletions src/zarr/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import numpy as np
import numpy.typing as npt

from zarr.abc.codec import Codec
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec
from zarr.abc.store import set_or_delete
from zarr.attributes import Attributes
from zarr.buffer import BufferPrototype, NDArrayLike, NDBuffer, default_buffer_prototype
Expand Down Expand Up @@ -113,14 +113,14 @@ async def create(
| tuple[Literal["v2"], Literal[".", "/"]]
| None
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ChunkCoords | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
compressor: dict[str, JSON] | None = None,
filters: Iterable[dict[str, JSON] | ArrayArrayCodec] = (),
pre_compressor: dict[str, JSON] | ArrayBytesCodec | None = None,
compressors: Iterable[dict[str, JSON] | BytesBytesCodec] = (),
# runtime
exists_ok: bool = False,
) -> AsyncArray:
Expand All @@ -142,31 +142,21 @@ async def create(
raise ValueError(
"order cannot be used for arrays with version 3. Use a transpose codec instead."
)
if filters is not None:
raise ValueError(
"filters cannot be used for arrays with version 3. Use array-to-array codecs instead."
)
if compressor is not None:
raise ValueError(
"compressor cannot be used for arrays with version 3. Use bytes-to-bytes codecs instead."
)
return await cls._create_v3(
store_path,
shape=shape,
dtype=dtype,
chunk_shape=chunk_shape,
fill_value=fill_value,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
filters=filters,
pre_compressor=pre_compressor,
compressors=compressors,
dimension_names=dimension_names,
attributes=attributes,
exists_ok=exists_ok,
)
elif zarr_format == 2:
if codecs is not None:
raise ValueError(
"codecs cannot be used for arrays with version 2. Use filters and compressor instead."
)
if chunk_key_encoding is not None:
raise ValueError(
"chunk_key_encoding cannot be used for arrays with version 2. Use dimension_separator instead."
Expand All @@ -182,7 +172,7 @@ async def create(
fill_value=fill_value,
order=order,
filters=filters,
compressor=compressor,
compressor=pre_compressor,
attributes=attributes,
exists_ok=exists_ok,
)
Expand All @@ -204,15 +194,25 @@ async def _create_v3(
| tuple[Literal["v2"], Literal[".", "/"]]
| None
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
pre_compressor: dict[str, JSON] | ArrayBytesCodec | None = None,
filters: Iterable[dict[str, JSON] | ArrayArrayCodec] = (),
compressors: Iterable[dict[str, JSON] | BytesBytesCodec] = (),
dimension_names: Iterable[str] | None = None,
attributes: dict[str, JSON] | None = None,
exists_ok: bool = False,
) -> AsyncArray:
if not exists_ok:
assert not await (store_path / ZARR_JSON).exists()

codecs = list(codecs) if codecs is not None else [BytesCodec()]
codecs: tuple[dict[str, JSON] | Codec, ...]
_pre_compressor: dict[str, JSON] | ArrayBytesCodec

if pre_compressor is None:
_pre_compressor = BytesCodec()
else:
_pre_compressor = pre_compressor

codecs = (*filters, _pre_compressor, *compressors)

if fill_value is None:
if dtype == np.dtype("bool"):
Expand Down Expand Up @@ -258,8 +258,8 @@ async def _create_v2(
dimension_separator: Literal[".", "/"] | None = None,
fill_value: None | int | float = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
compressor: dict[str, JSON] | None = None,
filters: Iterable[dict[str, JSON] | Codec] = (),
compressor: dict[str, JSON] | ArrayBytesCodec | None = None,
attributes: dict[str, JSON] | None = None,
exists_ok: bool = False,
) -> AsyncArray:
Expand All @@ -284,11 +284,7 @@ async def _create_v2(
compressor=(
numcodecs.get_codec(compressor).get_config() if compressor is not None else None
),
filters=(
[numcodecs.get_codec(filter).get_config() for filter in filters]
if filters is not None
else None
),
filters=tuple(numcodecs.get_codec(filter).get_config() for filter in filters),
attributes=attributes,
)
array = cls(metadata=metadata, store_path=store_path)
Expand Down Expand Up @@ -596,14 +592,14 @@ def create(
| tuple[Literal["v2"], Literal[".", "/"]]
| None
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ChunkCoords | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
compressor: dict[str, JSON] | None = None,
compressor: dict[str, JSON] | ArrayBytesCodec | None = None,
filters: Iterable[dict[str, JSON] | ArrayArrayCodec] = (),
post_compressors: Iterable[dict[str, JSON] | BytesBytesCodec] = (),
# runtime
exists_ok: bool = False,
) -> Array:
Expand All @@ -617,13 +613,13 @@ def create(
fill_value=fill_value,
chunk_shape=chunk_shape,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
dimension_names=dimension_names,
chunks=chunks,
dimension_separator=dimension_separator,
order=order,
pre_compressor=compressor,
filters=filters,
compressor=compressor,
compressors=post_compressors,
exists_ok=exists_ok,
),
)
Expand Down
14 changes: 7 additions & 7 deletions src/zarr/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import numpy.typing as npt

from zarr.abc.codec import Codec
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec
from zarr.abc.metadata import Metadata
from zarr.abc.store import set_or_delete
from zarr.array import Array, AsyncArray
Expand Down Expand Up @@ -327,14 +327,14 @@ async def create_array(
| tuple[Literal["v2"], Literal[".", "/"]]
| None
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
# v2 only
chunks: ChunkCoords | None = None,
dimension_separator: Literal[".", "/"] | None = None,
order: Literal["C", "F"] | None = None,
filters: list[dict[str, JSON]] | None = None,
compressor: dict[str, JSON] | None = None,
compressor: dict[str, JSON] | ArrayBytesCodec | None = None,
filters: Iterable[dict[str, JSON] | ArrayArrayCodec] = (),
post_compressors: Iterable[dict[str, JSON] | BytesBytesCodec] = (),
# runtime
exists_ok: bool = False,
) -> AsyncArray:
Expand All @@ -345,14 +345,14 @@ async def create_array(
chunk_shape=chunk_shape,
fill_value=fill_value,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
pre_compressor=compressor,
filters=filters,
compressors=post_compressors,
dimension_names=dimension_names,
attributes=attributes,
chunks=chunks,
dimension_separator=dimension_separator,
order=order,
filters=filters,
compressor=compressor,
exists_ok=exists_ok,
zarr_format=self.metadata.zarr_format,
)
Expand Down
23 changes: 14 additions & 9 deletions src/zarr/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from collections.abc import Iterable
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, cast

if TYPE_CHECKING:
from typing import Any, Literal

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -282,7 +285,7 @@ def from_dict(cls, data: dict[str, JSON]) -> ArrayV3Metadata:
# check that the zarr_format attribute is correct
_ = parse_zarr_format_v3(data.pop("zarr_format")) # type: ignore[arg-type]
# check that the node_type attribute is correct
_ = parse_node_type_array(data.pop("node_type")) # type: ignore[arg-type]
_ = parse_node_type_array(data.pop("node_type"))

data["dimension_names"] = data.pop("dimension_names", None)

Expand Down Expand Up @@ -330,7 +333,7 @@ def __init__(
order: Literal["C", "F"],
dimension_separator: Literal[".", "/"] = ".",
compressor: dict[str, JSON] | None = None,
filters: list[dict[str, JSON]] | None = None,
filters: Iterable[dict[str, JSON]] | None = None,
attributes: dict[str, JSON] | None = None,
):
"""
Expand Down Expand Up @@ -463,26 +466,28 @@ def parse_attributes(data: None | dict[str, JSON]) -> dict[str, JSON]:
# that takes 2 arguments
def parse_zarr_format_v3(data: Literal[3]) -> Literal[3]:
if data == 3:
return data
return 3
raise ValueError(f"Invalid value. Expected 3. Got {data}.")


# todo: move to its own module and drop _v2 suffix
def parse_zarr_format_v2(data: Literal[2]) -> Literal[2]:
if data == 2:
return data
return 2
raise ValueError(f"Invalid value. Expected 2. Got {data}.")


def parse_node_type_array(data: Literal["array"]) -> Literal["array"]:
def parse_node_type_array(data: Any) -> Literal["array"]:
if data == "array":
return data
return "array"
raise ValueError(f"Invalid value. Expected 'array'. Got {data}.")


# todo: real validation
def parse_filters(data: list[dict[str, JSON]] | None) -> list[dict[str, JSON]] | None:
return data
def parse_filters(data: Any) -> tuple[dict[str, JSON]] | None:
if isinstance(data, Iterable):
result = tuple(data)
return cast(tuple[dict[str, JSON]], result)


# todo: real validation
Expand Down
11 changes: 3 additions & 8 deletions tests/v3/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,9 @@ async def test_codecs_use_of_prototype():
chunk_shape=(5, 5),
dtype=expect.dtype,
fill_value=0,
codecs=[
TransposeCodec(order=(1, 0)),
BytesCodec(),
BloscCodec(),
Crc32cCodec(),
GzipCodec(),
ZstdCodec(),
],
filters=[TransposeCodec(order=(1, 0))],
pre_compressor=BytesCodec(),
compressors=(BloscCodec(), Crc32cCodec(), GzipCodec(), ZstdCodec()),
)
expect[:] = np.arange(100).reshape(10, 10)

Expand Down
Loading