From 08641872d5feb768aaba6beaff73eb5aab087241 Mon Sep 17 00:00:00 2001 From: Joseph Hamman Date: Fri, 3 Nov 2023 16:39:33 -0700 Subject: [PATCH] use abcs plus implementation notes --- zarr/v3/abc/codec.py | 10 ++ zarr/v3/abc/store.py | 14 +- zarr/v3/array.py | 336 +++++++++++++++++++++++++------------------ zarr/v3/codecs.py | 64 +-------- zarr/v3/store.py | 14 +- 5 files changed, 227 insertions(+), 211 deletions(-) diff --git a/zarr/v3/abc/codec.py b/zarr/v3/abc/codec.py index 383d26fd0d..f84fc74af9 100644 --- a/zarr/v3/abc/codec.py +++ b/zarr/v3/abc/codec.py @@ -1,3 +1,13 @@ +# Notes: +# 1. These are missing methods described in the spec. I expected to see these method definitions: +# def compute_encoded_representation_type(self, decoded_representation_type): +# def encode(self, decoded_value): +# def decode(self, encoded_value, decoded_representation_type): +# def partial_decode(self, input_handle, decoded_representation_type, decoded_regions): +# def compute_encoded_size(self, input_size): +# 2. Understand why array metadata is included on all codecs + + from __future__ import annotations from abc import abstractmethod, ABC diff --git a/zarr/v3/abc/store.py b/zarr/v3/abc/store.py index fc275801fa..5469cafe6d 100644 --- a/zarr/v3/abc/store.py +++ b/zarr/v3/abc/store.py @@ -9,7 +9,7 @@ class Store(ABC): class ReadStore(Store): @abstractmethod - def get(self, key: str) -> bytes: + async def get(self, key: str) -> bytes: """Retrieve the value associated with a given key. Parameters @@ -23,7 +23,7 @@ def get(self, key: str) -> bytes: ... @abstractmethod - def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes: + async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes: """Retrieve possibly partial values from given key_ranges. Parameters @@ -41,7 +41,7 @@ def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> bytes: class WriteStore(ReadStore): @abstractmethod - def set(self, key: str, value: bytes) -> None: + async def set(self, key: str, value: bytes) -> None: """Store a (key, value) pair. Parameters @@ -52,7 +52,7 @@ def set(self, key: str, value: bytes) -> None: ... @abstractmethod - def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: """Store values at a given key, starting at byte range_start. Parameters @@ -67,7 +67,7 @@ def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> class ListMixin: @abstractmethod - def list(self) -> List[str]: + async def list(self) -> List[str]: """Retrieve all keys in the store. Returns @@ -77,7 +77,7 @@ def list(self) -> List[str]: ... @abstractmethod - def list_prefix(self, prefix: str) -> List[str]: + async def list_prefix(self, prefix: str) -> List[str]: """Retrieve all keys in the store. Parameters @@ -91,7 +91,7 @@ def list_prefix(self, prefix: str) -> List[str]: ... @abstractmethod - def list_dir(self, prefix: str) -> List[str]: + async def list_dir(self, prefix: str) -> List[str]: """ Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix. diff --git a/zarr/v3/array.py b/zarr/v3/array.py index e69f306eca..3c0d7eba5c 100644 --- a/zarr/v3/array.py +++ b/zarr/v3/array.py @@ -1,3 +1,14 @@ +# Notes on what I've changed here: +# 1. Split Array into AsyncArray and Array +# 2. Inherit from abc (SynchronousArray, AsynchronousArray) +# 3. Added .size and .attrs methods +# 4. Temporarily disabled the creation of ArrayV2 +# 5. Added from_json to AsyncArray + +# Questions to consider: +# 1. Was splitting the array into two classes really necessary? +# 2. Do we really need runtime_configuration? Specifically, the asyncio_loop seems problematic + from __future__ import annotations import json @@ -6,7 +17,9 @@ import numpy as np from attr import evolve, frozen -from zarr.v3.array_v2 import ArrayV2 +from zarr.v3.abc.array import SynchronousArray, AsynchronousArray + +# from zarr.v3.array_v2 import ArrayV2 from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec from zarr.v3.common import ( ZARR_JSON, @@ -34,40 +47,14 @@ @frozen -class _AsyncArrayProxy: - array: Array - - def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: - return _AsyncArraySelectionProxy(self.array, selection) - - -@frozen -class _AsyncArraySelectionProxy: - array: Array - selection: Selection - - async def get(self) -> np.ndarray: - return await self.array._get_async(self.selection) - - async def set(self, value: np.ndarray): - return await self.array._set_async(self.selection, value) - - -def _json_convert(o): - if isinstance(o, DataType): - return o.name - raise TypeError - - -@frozen -class Array: +class AsyncArray(AsynchronousArray): metadata: ArrayMetadata store_path: StorePath runtime_configuration: RuntimeConfiguration codec_pipeline: CodecPipeline @classmethod - async def create_async( + async def create( cls, store: StoreLike, *, @@ -84,7 +71,7 @@ async def create_async( attributes: Optional[Dict[str, Any]] = None, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), exists_ok: bool = False, - ) -> Array: + ) -> AsyncArray: store_path = make_store_path(store) if not exists_ok: assert not await (store_path / ZARR_JSON).exists_async() @@ -140,92 +127,45 @@ async def create_async( return array @classmethod - def create( + def from_json( cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: Union[str, np.dtype], - chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[CodecMetadata]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - exists_ok: bool = False, - ) -> Array: - return sync( - cls.create_async( - store=store, - shape=shape, - dtype=dtype, - chunk_shape=chunk_shape, - fill_value=fill_value, - chunk_key_encoding=chunk_key_encoding, - codecs=codecs, - dimension_names=dimension_names, - attributes=attributes, - runtime_configuration=runtime_configuration, - exists_ok=exists_ok, + store_path: StorePath, + zarr_json: Any, + runtime_configuration: RuntimeConfiguration, + ) -> AsyncArray: + metadata = ArrayMetadata.from_json(zarr_json) + async_array = cls( + metadata=metadata, + store_path=store_path, + runtime_configuration=runtime_configuration, + codec_pipeline=CodecPipeline.from_metadata( + metadata.codecs, metadata.get_core_metadata(runtime_configuration) ), - runtime_configuration.asyncio_loop, ) + async_array._validate_metadata() + return async_array @classmethod - async def open_async( + async def open( cls, store: StoreLike, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Array: + ) -> AsyncArray: store_path = make_store_path(store) zarr_json_bytes = await (store_path / ZARR_JSON).get_async() assert zarr_json_bytes is not None return cls.from_json( store_path, json.loads(zarr_json_bytes), - runtime_configuration=runtime_configuration or RuntimeConfiguration(), - ) - - @classmethod - def open( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Array: - return sync( - cls.open_async(store, runtime_configuration=runtime_configuration), - runtime_configuration.asyncio_loop, - ) - - @classmethod - def from_json( - cls, - store_path: StorePath, - zarr_json: Any, - runtime_configuration: RuntimeConfiguration, - ) -> Array: - metadata = ArrayMetadata.from_json(zarr_json) - out = cls( - metadata=metadata, - store_path=store_path, runtime_configuration=runtime_configuration, - codec_pipeline=CodecPipeline.from_metadata( - metadata.codecs, metadata.get_core_metadata(runtime_configuration) - ), ) - out._validate_metadata() - return out @classmethod - async def open_auto_async( + async def open_auto( cls, store: StoreLike, runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Union[Array, ArrayV2]: + ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] store_path = make_store_path(store) v3_metadata_bytes = await (store_path / ZARR_JSON).get_async() if v3_metadata_bytes is not None: @@ -234,32 +174,9 @@ async def open_auto_async( json.loads(v3_metadata_bytes), runtime_configuration=runtime_configuration or RuntimeConfiguration(), ) - return await ArrayV2.open_async(store_path) - - @classmethod - def open_auto( - cls, - store: StoreLike, - runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), - ) -> Union[Array, ArrayV2]: - return sync( - cls.open_auto_async(store, runtime_configuration), - runtime_configuration.asyncio_loop, - ) - - async def _save_metadata(self) -> None: - self._validate_metadata() - - await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) - - def _validate_metadata(self) -> None: - assert len(self.metadata.shape) == len( - self.metadata.chunk_grid.configuration.chunk_shape - ), "`chunk_shape` and `shape` need to have the same number of dimensions." - assert self.metadata.dimension_names is None or len(self.metadata.shape) == len( - self.metadata.dimension_names - ), "`dimension_names` and `shape` need to have the same number of dimensions." - assert self.metadata.fill_value is not None, "`fill_value` is required." + else: + raise ValueError("no v2 support yet") + # return await ArrayV2.open_async(store_path) @property def ndim(self) -> int: @@ -269,18 +186,19 @@ def ndim(self) -> int: def shape(self) -> ChunkCoords: return self.metadata.shape + @property + def size(self) -> int: + return np.prod(self.metadata.shape) + @property def dtype(self) -> np.dtype: return self.metadata.dtype @property - def async_(self) -> _AsyncArrayProxy: - return _AsyncArrayProxy(self) - - def __getitem__(self, selection: Selection): - return sync(self._get_async(selection), self.runtime_configuration.asyncio_loop) + def attrs(self) -> dict: + return self.metadata.attributes - async def _get_async(self, selection: Selection): + async def getitem(self, selection: Selection): indexer = BasicIndexer( selection, shape=self.metadata.shape, @@ -309,6 +227,20 @@ async def _get_async(self, selection: Selection): else: return out[()] + async def _save_metadata(self) -> None: + self._validate_metadata() + + await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) + + def _validate_metadata(self) -> None: + assert len(self.metadata.shape) == len( + self.metadata.chunk_grid.configuration.chunk_shape + ), "`chunk_shape` and `shape` need to have the same number of dimensions." + assert self.metadata.dimension_names is None or len(self.metadata.shape) == len( + self.metadata.dimension_names + ), "`dimension_names` and `shape` need to have the same number of dimensions." + assert self.metadata.fill_value is not None, "`fill_value` is required." + async def _read_chunk( self, chunk_coords: ChunkCoords, @@ -339,10 +271,7 @@ async def _read_chunk( else: out[out_selection] = self.metadata.fill_value - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - sync(self._set_async(selection, value), self.runtime_configuration.asyncio_loop) - - async def _set_async(self, selection: Selection, value: np.ndarray) -> None: + async def setitem(self, selection: Selection, value: np.ndarray) -> None: chunk_shape = self.metadata.chunk_grid.configuration.chunk_shape indexer = BasicIndexer( selection, @@ -444,7 +373,7 @@ async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.nda else: await store_path.set_async(chunk_bytes) - async def resize_async(self, new_shape: ChunkCoords) -> Array: + async def resize(self, new_shape: ChunkCoords) -> Array: assert len(new_shape) == len(self.metadata.shape) new_metadata = evolve(self.metadata, shape=new_shape) @@ -470,21 +399,152 @@ async def _delete_key(key: str) -> None: await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) return evolve(self, metadata=new_metadata) - def resize(self, new_shape: ChunkCoords) -> Array: - return sync(self.resize_async(new_shape), self.runtime_configuration.asyncio_loop) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> Array: + async def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: new_metadata = evolve(self.metadata, attributes=new_attributes) # Write new metadata await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) return evolve(self, metadata=new_metadata) + def __repr__(self): + return f"" + + async def info(self): + return NotImplemented + + +@frozen +class Array(SynchronousArray): + _async_array: AsyncArray + + @classmethod + def create( + cls, + store: StoreLike, + *, + shape: ChunkCoords, + dtype: Union[str, np.dtype], + chunk_shape: ChunkCoords, + fill_value: Optional[Any] = None, + chunk_key_encoding: Union[ + Tuple[Literal["default"], Literal[".", "/"]], + Tuple[Literal["v2"], Literal[".", "/"]], + ] = ("default", "/"), + codecs: Optional[Iterable[CodecMetadata]] = None, + dimension_names: Optional[Iterable[str]] = None, + attributes: Optional[Dict[str, Any]] = None, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + exists_ok: bool = False, + ) -> Array: + async_array = sync( + AsyncArray.create( + store=store, + shape=shape, + dtype=dtype, + chunk_shape=chunk_shape, + fill_value=fill_value, + chunk_key_encoding=chunk_key_encoding, + codecs=codecs, + dimension_names=dimension_names, + attributes=attributes, + runtime_configuration=runtime_configuration, + exists_ok=exists_ok, + ), + runtime_configuration.asyncio_loop, + ) + return cls(async_array) + + @classmethod + def from_json( + cls, + store_path: StorePath, + zarr_json: Any, + runtime_configuration: RuntimeConfiguration, + ) -> Array: + async_array = AsyncArray.from_json( + store_path=store_path, zarr_json=zarr_json, runtime_configuration=runtime_configuration + ) + return cls(async_array) + + @classmethod + def open( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Array: + + async_array = sync( + AsyncArray.open(store, runtime_configuration=runtime_configuration), + runtime_configuration.asyncio_loop, + ) + async_array._validate_metadata() + return cls(async_array) + + @classmethod + def open_auto( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Array: # TODO: Union[Array, ArrayV2]: + async_array = sync( + AsyncArray.open_auto(store, runtime_configuration), + runtime_configuration.asyncio_loop, + ) + return cls(async_array) + + @property + def ndim(self) -> int: + return self._async_array.ndim + + @property + def shape(self) -> ChunkCoords: + return self._async_array.shape + + @property + def size(self) -> int: + return self._async_array.size + + @property + def dtype(self) -> np.dtype: + return self._async_array.dtype + + @property + def attrs(self) -> dict: + return self._async_array.attrs + + @property + def store_path(self) -> str: + return self._async_array.store_path + + def __getitem__(self, selection: Selection): + return sync( + self._async_array.getitem(selection), + self._async_array.runtime_configuration.asyncio_loop, + ) + + def __setitem__(self, selection: Selection, value: np.ndarray) -> None: + sync( + self._async_array.setitem(selection, value), + self._async_array.runtime_configuration.asyncio_loop, + ) + + def resize(self, new_shape: ChunkCoords) -> Array: + return sync( + self._async_array.resize(new_shape), + self._async_array.runtime_configuration.asyncio_loop, + ) + def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: return sync( - self.update_attributes_async(new_attributes), - self.runtime_configuration.asyncio_loop, + self._async_array.update_attributes(new_attributes), + self._async_array.runtime_configuration.asyncio_loop, ) def __repr__(self): return f"" + + def info(self): + return sync( + self._async_array.info(), + self._async_array.runtime_configuration.asyncio_loop, + ) diff --git a/zarr/v3/codecs.py b/zarr/v3/codecs.py index ff15f2ebf9..ff913c42b2 100644 --- a/zarr/v3/codecs.py +++ b/zarr/v3/codecs.py @@ -1,6 +1,5 @@ from __future__ import annotations -from abc import ABC, abstractmethod from functools import reduce from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Tuple, Union from warnings import warn @@ -13,6 +12,7 @@ from numcodecs.gzip import GZip from zstandard import ZstdCompressor, ZstdDecompressor +from zarr.v3.abc.codec import Codec, ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec from zarr.v3.common import BytesLike, to_thread from zarr.v3.metadata import ( BloscCodecConfigurationMetadata, @@ -38,68 +38,6 @@ numcodecs.blosc.use_threads = False -class Codec(ABC): - supports_partial_decode: bool - supports_partial_encode: bool - is_fixed_size: bool - array_metadata: CoreArrayMetadata - - @abstractmethod - def compute_encoded_size(self, input_byte_length: int) -> int: - pass - - def resolve_metadata(self) -> CoreArrayMetadata: - return self.array_metadata - - -class ArrayArrayCodec(Codec): - @abstractmethod - async def decode( - self, - chunk_array: np.ndarray, - ) -> np.ndarray: - pass - - @abstractmethod - async def encode( - self, - chunk_array: np.ndarray, - ) -> Optional[np.ndarray]: - pass - - -class ArrayBytesCodec(Codec): - @abstractmethod - async def decode( - self, - chunk_array: BytesLike, - ) -> np.ndarray: - pass - - @abstractmethod - async def encode( - self, - chunk_array: np.ndarray, - ) -> Optional[BytesLike]: - pass - - -class BytesBytesCodec(Codec): - @abstractmethod - async def decode( - self, - chunk_array: BytesLike, - ) -> BytesLike: - pass - - @abstractmethod - async def encode( - self, - chunk_array: BytesLike, - ) -> Optional[BytesLike]: - pass - - @frozen class CodecPipeline: codecs: List[Codec] diff --git a/zarr/v3/store.py b/zarr/v3/store.py index 67e54340d0..f7472c68d2 100644 --- a/zarr/v3/store.py +++ b/zarr/v3/store.py @@ -1,3 +1,10 @@ +# TODO: +# 1. Stores should inherit from zarr.v3.abc.store classes +# 2. remove "_async" suffix from all methods? + +# Changes I've made here: +# 1. Make delay import of fsspec + from __future__ import annotations import asyncio @@ -5,13 +12,11 @@ from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union -import fsspec -from fsspec.asyn import AsyncFileSystem - from zarr.v3.common import BytesLike, to_thread if TYPE_CHECKING: from upath import UPath + from fsspec.asyn import AsyncFileSystem def _dereference_path(root: str, path: str) -> str: @@ -205,6 +210,7 @@ class RemoteStore(Store): def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): from upath import UPath + import fsspec if isinstance(url, str): self.root = UPath(url, **storage_options) @@ -219,6 +225,8 @@ def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): assert fs.__class__.async_impl, "FileSystem needs to support async operations." def make_fs(self) -> Tuple[AsyncFileSystem, str]: + import fsspec + storage_options = self.root._kwargs.copy() storage_options.pop("_url", None) fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)