From 338a660c4f9f1d0258c58078893ea6b504fffae0 Mon Sep 17 00:00:00 2001
From: Joseph Gonzalez
Date: Mon, 27 Nov 2023 11:50:33 -0400
Subject: [PATCH] Update requirements; pydantic v2 is now required

---
 requirements.txt                             |  22 +-
 setup.py                                     |   2 +-
 tensordb/__init__.py                         |   6 +-
 tensordb/algorithms.py                       | 303 +++++++++----------
 tensordb/clients/base.py                     | 271 ++++++++++-------
 tensordb/clients/file_cache_tensor_client.py | 189 ++++++------
 tensordb/clients/tensor_client.py            | 130 ++++----
 tensordb/storages/json_storage.py            |   8 +-
 tensordb/storages/lock.py                    |   1 -
 tensordb/storages/variables.py               |   5 +-
 tensordb/storages/zarr_storage.py            |   3 +-
 tensordb/tensor_definition.py                |  12 +-
 tensordb/tensor_translator.py                |   2 +-
 tensordb/tests/conftest.py                   |   2 +-
 tensordb/tests/test_algorithms.py            |  44 ++-
 tensordb/tests/test_tools.py                 |  61 ++--
 tensordb/tests/test_zarr_storage.py          | 204 ++++++-------
 tensordb/utils/method_inspector.py           |  20 +-
 tensordb/utils/tools.py                      |   4 +-
 19 files changed, 675 insertions(+), 614 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index a351166..65c12d9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,9 @@
-pandas==2.*
-xarray==2023.*
-numpy==1.*
-dask[complete]==2023.*
-zarr==2.*
-bottleneck==1.*
-fsspec==2023.*
-orjson==3.*
-pydantic==1.*
-more-itertools==9.*
-scipy==1.*
-flox==0.*
-numba==0.*
+pandas>=2.0.0
+xarray[accel]>=2023.0.0
+numpy>=1.26.0
+dask[complete]>=2023.0.0
+zarr>=2.0.0
+orjson>=3.0.0
+pydantic>=2.0.0
+more-itertools>=10.0.0
+loguru>=0.7.0
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 52d6a7a..ef9c931 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ setup(
     name='TensorDB',
-    version='0.29.2',
+    version='0.30.0',
     description='Database based in a file system storage combined with Xarray and Zarr',
     author='Joseph Nowak',
     author_email='josephgonowak97@gmail.com',
diff --git a/tensordb/__init__.py b/tensordb/__init__.py
index 8daa798..a6aed44 100644
--- a/tensordb/__init__.py
+++ b/tensordb/__init__.py
@@ -1,10 +1,6 @@
 from . import tensor_definition
 from . 
import utils from .algorithms import Algorithms -from .clients import ( - TensorClient, - FileCacheTensorClient, - BaseTensorClient -) +from .clients import TensorClient, FileCacheTensorClient, BaseTensorClient from .tensor_definition import TensorDefinition from .utils.tools import extract_paths_from_formula diff --git a/tensordb/algorithms.py b/tensordb/algorithms.py index 753a169..997e8e2 100644 --- a/tensordb/algorithms.py +++ b/tensordb/algorithms.py @@ -23,13 +23,13 @@ def shift_on_valid(a, shift): @staticmethod def apply_rolling_operator( - x, - drop_nan, - window, - min_periods, - operator, - fill_method: Literal["ffill", None], - inplace=False + x, + drop_nan, + window, + min_periods, + operator, + fill_method: Literal["ffill", None], + inplace=False, ): min_periods = window if min_periods is None else min_periods @@ -89,11 +89,7 @@ def replace(x, sorted_key_groups, group_values, default_replace): @staticmethod def cumulative_on_sort( - x, - axis, - cum_func, - ascending: bool = False, - keep_nan: bool = True + x, axis, cum_func, ascending: bool = False, keep_nan: bool = True ): arg_x = x # The sort factor allows to modify the order of the sort in numpy @@ -140,13 +136,13 @@ def multi_rank(x, axis, tie_axis): class Algorithms: @classmethod def map_blocks_along_axis( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - func, - dim: str, - dtype, - drop_dim: bool = False, - **kwargs + cls, + new_data: Union[xr.DataArray, xr.Dataset], + func, + dim: str, + dtype, + drop_dim: bool = False, + **kwargs, ) -> xr.DataArray: template = new_data.chunk({dim: -1}) data = template.data @@ -160,26 +156,21 @@ def map_blocks_along_axis( return xr.DataArray( data.map_blocks( - dtype=dtype, - drop_axis=drop_axis, - func=func, - chunks=chunks, - **kwargs + dtype=dtype, drop_axis=drop_axis, func=func, chunks=chunks, **kwargs ), coords=template.coords, dims=template.dims, - attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def ffill( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - limit: int = None, - until_last_valid: Union[xr.DataArray, bool] = False, + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + limit: int = None, + until_last_valid: Union[xr.DataArray, bool] = False, ) -> xr.DataArray: - result = new_data.ffill(dim=dim, limit=limit) if isinstance(until_last_valid, bool) and until_last_valid: @@ -192,13 +183,13 @@ def ffill( @classmethod def rank( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - method: Literal['average', 'min', 'max', 'dense', 'ordinal'] = 'ordinal', - ascending=True, - nan_policy: Literal["omit", "propagate", "error"] = "omit", - use_bottleneck: bool = False + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + method: Literal["average", "min", "max", "dense", "ordinal"] = "ordinal", + ascending=True, + nan_policy: Literal["omit", "propagate", "error"] = "omit", + use_bottleneck: bool = False, ) -> Union[xr.DataArray, xr.Dataset]: """ This is an implementation of scipy rankdata on xarray, with the possibility to avoid the rank of the nans. 
@@ -212,7 +203,7 @@ def rank( dim=dim, ascending=ascending, nan_policy=nan_policy, - use_bottleneck=use_bottleneck + use_bottleneck=use_bottleneck, ) return cls.map_blocks_along_axis( @@ -224,15 +215,15 @@ def rank( method=method, ascending=ascending, nan_policy=nan_policy, - use_bottleneck=use_bottleneck + use_bottleneck=use_bottleneck, ) @classmethod def multi_rank( - cls, - new_data: xr.DataArray, - tie_dim: str, - dim: str, + cls, + new_data: xr.DataArray, + tie_dim: str, + dim: str, ) -> Union[xr.DataArray, xr.Dataset]: """ Multi rank implemented using the lexsort of numpy, the nan are keep. @@ -252,15 +243,12 @@ def multi_rank( ), coords=new_data.coords, dims=new_data.dims, - attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def shift_on_valid( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - shift: int + cls, new_data: Union[xr.DataArray, xr.Dataset], dim: str, shift: int ): if isinstance(new_data, xr.Dataset): return new_data.map( @@ -280,19 +268,19 @@ def shift_on_valid( ), coords=new_data.coords, dims=new_data.dims, - attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def rolling_along_axis( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - window: int, - operator: str, - min_periods: int = None, - drop_nan: bool = True, - fill_method: str = None + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + window: int, + operator: str, + min_periods: int = None, + drop_nan: bool = True, + fill_method: str = None, ): if isinstance(new_data, xr.Dataset): return new_data.map( @@ -302,7 +290,7 @@ def rolling_along_axis( operator=operator, min_periods=min_periods, drop_nan=drop_nan, - fill_method=fill_method + fill_method=fill_method, ) return xr.DataArray( @@ -317,27 +305,27 @@ def rolling_along_axis( fill_method=fill_method, dtype=new_data.dtype, shape=(new_data.sizes[dim],), - inplace=True + inplace=True, ), coords=new_data.coords, dims=new_data.dims, - attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def replace( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - to_replace: Dict, - dtype: Any = None, - default_replace=None + cls, + new_data: Union[xr.DataArray, xr.Dataset], + to_replace: Dict, + dtype: Any = None, + default_replace=None, ): if isinstance(new_data, xr.Dataset): return new_data.map( cls.replace, to_replace=to_replace, dtype=dtype, - default_replace=default_replace + default_replace=default_replace, ) dtype = dtype if dtype else new_data.dtype @@ -352,18 +340,18 @@ def replace( dtype=dtype, group_values=group_values, chunks=new_data.chunks, - default_replace=default_replace + default_replace=default_replace, ), coords=new_data.coords, dims=new_data.dims, - attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def vindex( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - coords: Dict, + cls, + new_data: Union[xr.DataArray, xr.Dataset], + coords: Dict, ): """ Implementation of dask vindex using xarray @@ -379,7 +367,11 @@ def vindex( for i, dim in enumerate(new_data.dims): if dim in coords and not np.array_equal(coords[dim], new_data.coords[dim]): int_coord = new_data.indexes[dim].get_indexer(coords[dim]) - data_slices = (slice(None),) * i + (int_coord,) + (slice(None),) * (len(new_data.dims) - i - 1) + data_slices = ( + (slice(None),) * i + + (int_coord,) + + (slice(None),) * (len(new_data.dims) - i - 1) + ) arr = da.moveaxis(arr.vindex[data_slices], 0, i) equal = False @@ -392,19 +384,19 @@ def vindex( coords={ dim: coords.get(dim, coord) for dim, coord in new_data.coords.items() }, - 
attrs=new_data.attrs + attrs=new_data.attrs, ) @classmethod def apply_on_groups( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - groups: Union[Dict, xr.DataArray], - dim: str, - func: Union[str, Callable], - keep_shape: bool = False, - unique_groups: np.ndarray = None, - **kwargs + cls, + new_data: Union[xr.DataArray, xr.Dataset], + groups: Union[Dict, xr.DataArray], + dim: str, + func: Union[str, Callable], + keep_shape: bool = False, + unique_groups: np.ndarray = None, + **kwargs, ): """ This method was created as a replacement of the groupby of Xarray when the group is only @@ -450,21 +442,19 @@ def apply_on_groups( dim=dim, func=func, keep_shape=keep_shape, - unique_groups=unique_groups + unique_groups=unique_groups, ) if isinstance(groups, dict): groups = xr.DataArray( - list(groups.values()), - dims=[dim], - coords={dim: list(groups.keys())} + list(groups.values()), dims=[dim], coords={dim: list(groups.keys())} ) if len(groups.dims) != 1 and groups.dims != new_data.dims: raise ValueError( - f'The dimension of the groups must be the same as the dimension of the new_data ' - f'or it must has only one of its dimensions, ' - f'but got {groups.dims} and {new_data.dims}' + f"The dimension of the groups must be the same as the dimension of the new_data " + f"or it must has only one of its dimensions, " + f"but got {groups.dims} and {new_data.dims}" ) axis = new_data.dims.index(dim) @@ -478,7 +468,9 @@ def apply_on_groups( # In case of grouping by an array of more than 1 dimension and the keep_shape is False. output_coord = unique_groups - chunks = new_data.chunks[:axis] + (len(output_coord),) + new_data.chunks[axis + 1:] + chunks = ( + new_data.chunks[:axis] + (len(output_coord),) + new_data.chunks[axis + 1 :] + ) def _reduce(x, g, func, **kwargs): if len(g.dims) == 1: @@ -491,7 +483,7 @@ def _reduce(x, g, func, **kwargs): arr = grouped.map( lambda v: getattr(Algorithms, func)( v, dim=dim, **kwargs - ).compute(scheduler='single-threaded') + ).compute(scheduler="single-threaded") ) if "group" not in arr.dims: @@ -508,19 +500,24 @@ def _reduce(x, g, func, **kwargs): return arr f_dim = next(d for d in x.dims if d != dim) - arr = xr.concat([ - _reduce(x.sel({f_dim: v}), g.sel({f_dim: v}), func, **kwargs) - for v in x.coords[f_dim].values - ], dim=f_dim) + arr = xr.concat( + [ + _reduce(x.sel({f_dim: v}), g.sel({f_dim: v}), func, **kwargs) + for v in x.coords[f_dim].values + ], + dim=f_dim, + ) arr = arr.transpose(*x.dims) return arr data = new_data.chunk({dim: -1}) if len(groups.dims) == len(data.dims): - groups = groups.chunk(data.chunks) + groups = groups.chunk(data.chunksizes) else: groups = groups.compute() - new_coords = {k: output_coord if k == dim else v for k, v in new_data.coords.items()} + new_coords = { + k: output_coord if k == dim else v for k, v in new_data.coords.items() + } data = data.map_blocks( _reduce, @@ -530,20 +527,20 @@ def _reduce(x, g, func, **kwargs): da.empty( dtype=np.float64, chunks=chunks, - shape=[len(new_coords[v]) for v in new_data.dims] + shape=[len(new_coords[v]) for v in new_data.dims], ), coords=new_coords, dims=new_data.dims, - ) + ), ) return data @classmethod def merge_duplicates_coord( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - func: str, + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + func: str, ): """ Group and merge duplicates coord base on a function, this can be a sum or a max. 
Read numpy-groupies @@ -558,47 +555,38 @@ def merge_duplicates_coord( new_data.coords[dim] = np.arange(new_data.sizes[dim]) return cls.apply_on_groups( - new_data=new_data, - groups=groups, - dim=dim, - func=func, - keep_shape=False + new_data=new_data, groups=groups, dim=dim, func=func, keep_shape=False ) @classmethod def dropna( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dims: List[str], - how: Literal['all'] = 'all', - client: Client = None + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dims: List[str], + how: Literal["all"] = "all", + client: Client = None, ): """ Equivalent of xarray dropna but for multiple dimension and restricted to the all option """ # TODO: Add unit testing - dropped_data = cls.drop_unmarked( - new_data.notnull(), - dims=dims, - client=client - ) + dropped_data = cls.drop_unmarked(new_data.notnull(), dims=dims, client=client) return new_data.sel(dropped_data.coords) @classmethod def drop_unmarked( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dims: List[str], - how: Literal['all'] = 'all', - client: Client = None + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dims: List[str], + how: Literal["all"] = "all", + client: Client = None, ): """ Equivalent of xarray dropna but for boolean and for multiple dimension and restricted to the all option """ # TODO: Add unit testing valid_coords = [ - new_data.any([d for d in new_data.dims if d != dim]) - for dim in dims + new_data.any([d for d in new_data.dims if d != dim]) for dim in dims ] if client is None: valid_coords = dask.compute(*valid_coords) @@ -608,10 +596,10 @@ def drop_unmarked( @classmethod def append_previous( - cls, - old_data: Union[xr.DataArray, xr.Dataset], - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, + cls, + old_data: Union[xr.DataArray, xr.Dataset], + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, ): """ This method only add at the beginning of the new_data the previous data, and this only @@ -619,22 +607,22 @@ def append_previous( """ # Find the nearest coord that is smaller than the first one of the new_data - position = old_data.indexes[dim][old_data.indexes[dim] < new_data.indexes[dim][0]] + position = old_data.indexes[dim][ + old_data.indexes[dim] < new_data.indexes[dim][0] + ] if len(position) == 0: return new_data position = position[-1] - return xr.concat([ - old_data.sel({dim: [position]}).compute(), new_data - ], dim=dim) + return xr.concat([old_data.sel({dim: [position]}).compute(), new_data], dim=dim) @classmethod def cumulative_on_sort( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - func: Union[str, Callable], - ascending=False, - keep_nan=True + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + func: Union[str, Callable], + ascending=False, + keep_nan=True, ): """ Apply cumulative calculation like cumsum and so on but on a sorted way, @@ -655,11 +643,11 @@ def cumulative_on_sort( @classmethod def bitmask_topk( - cls, - new_data: Union[xr.DataArray, xr.Dataset], - dim: str, - top_size, - tie_breaker_dim: str = None + cls, + new_data: Union[xr.DataArray, xr.Dataset], + dim: str, + top_size, + tie_breaker_dim: str = None, ): """ Create a bitmask where the True values means that the cell is on the top N on the dim. 
@@ -687,7 +675,7 @@ def bitmask_topk( top_data, func=lambda x, axis: rfn.unstructured_to_structured( np.moveaxis(x, axis, -1), - [(f"f{i}", x.dtype) for i in range(x.shape[axis])] + [(f"f{i}", x.dtype) for i in range(x.shape[axis])], ), dim=tie_breaker_dim, dtype=[ @@ -695,7 +683,7 @@ def bitmask_topk( for i in range(new_data.sizes[tie_breaker_dim]) ], axis=new_data.dims.index(tie_breaker_dim), - drop_dim=True + drop_dim=True, ) if top_size >= new_data.sizes[dim]: @@ -710,7 +698,7 @@ def bitmask_topk( topk = xr.DataArray( topk, dims=[d for d in top_data.dims if d != dim], - coords={d: v for d, v in top_data.coords.items() if d != dim} + coords={d: v for d, v in top_data.coords.items() if d != dim}, ) if tie_breaker_dim is None: @@ -719,11 +707,7 @@ def bitmask_topk( def split_structured(x): raw = x.values.view(new_data.dtype).reshape(x.shape + (-1,)) return [ - xr.DataArray( - np.take(raw, i, axis=-1), - coords=x.coords, - dims=x.dims - ) + xr.DataArray(np.take(raw, i, axis=-1), coords=x.coords, dims=x.dims) for i in range(raw.shape[-1]) ] @@ -739,12 +723,11 @@ def structured_inequality(x, top): return bitmask | ties - bitmask = top_data.map_blocks( - structured_inequality, - args=(topk,), - template=first_level.notnull() - ) & first_level.notnull() + bitmask = ( + top_data.map_blocks( + structured_inequality, args=(topk,), template=first_level.notnull() + ) + & first_level.notnull() + ) - return bitmask.expand_dims({ - tie_breaker_dim: new_data.coords[tie_breaker_dim] - }) + return bitmask.expand_dims({tie_breaker_dim: new_data.coords[tie_breaker_dim]}) diff --git a/tensordb/clients/base.py b/tensordb/clients/base.py index 4e9c82d..ff28633 100644 --- a/tensordb/clients/base.py +++ b/tensordb/clients/base.py @@ -1,5 +1,4 @@ import abc -from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from typing import Dict, List, Any, Union, Literal, Callable import dask @@ -11,25 +10,19 @@ from dask.distributed import Client from dask.highlevelgraph import HighLevelGraph from loguru import logger -from pydantic import validate_arguments +from pydantic import validate_call from xarray.backends.common import AbstractWritableDataStore from tensordb.algorithms import Algorithms -from tensordb.storages import ( - BaseStorage, - CachedStorage -) +from tensordb.storages import BaseStorage, CachedStorage from tensordb.tensor_definition import TensorDefinition, MethodDescriptor, Definition from tensordb.utils import dag from tensordb.utils.method_inspector import get_parameters -from tensordb.utils.tools import ( - groupby_chunks, - extract_paths_from_formula -) +from tensordb.utils.tools import groupby_chunks, extract_paths_from_formula class BaseTensorClient(Algorithms): - internal_actions = ['store', 'update', 'append', 'upsert', 'drop'] + internal_actions = ["store", "update", "append", "upsert", "drop"] def add_custom_data(self, path, new_data: Dict): pass @@ -49,7 +42,7 @@ def upsert_tensor(self, definition: TensorDefinition): def get_tensor_definition(self, path: str) -> TensorDefinition: pass - @validate_arguments + @validate_call def update_tensor_metadata(self, path: str, new_metadata: Dict[str, Any]): tensor_definition = self.get_tensor_definition(path) tensor_definition.metadata.update(new_metadata) @@ -63,7 +56,7 @@ def get_all_tensors_definition(self) -> List[TensorDefinition]: def delete_tensor(self, path: str, only_data: bool = False) -> Any: pass - @validate_arguments + @validate_call def delete_tensors(self, paths: List[str], only_data: bool = False): """ Delete multiple 
tensors, in case that some of them does not exist it is not going to raise an error. @@ -90,9 +83,9 @@ def get_storage(self, path: Union[str, TensorDefinition]) -> BaseStorage: @staticmethod def _exec_on_dask( - func: Callable, - params, - *prev_tasks, + func: Callable, + params, + *prev_tasks, ): try: return func(**params) @@ -102,11 +95,11 @@ def _exec_on_dask( @staticmethod def exec_on_parallel( - method: Callable, - paths_kwargs: Dict[str, Dict[str, Any]], - max_parallelization: int = None, - client: Client = None, - compute_kwargs: Dict[str, Any] = None, + method: Callable, + paths_kwargs: Dict[str, Dict[str, Any]], + max_parallelization: int = None, + client: Client = None, + compute_kwargs: Dict[str, Any] = None, ): """ This method was designed to execute multiple methods of the client on parallel @@ -133,32 +126,36 @@ def exec_on_parallel( """ paths = list(paths_kwargs.keys()) - max_parallelization = np.inf if max_parallelization is None else max_parallelization + max_parallelization = ( + np.inf if max_parallelization is None else max_parallelization + ) max_parallelization = min(max_parallelization, len(paths)) compute_kwargs = compute_kwargs or {} client = dask if client is None else client for sub_paths in mit.chunked(paths, max_parallelization): logger.info(f"Processing the following tensors: {sub_paths}") - client.compute([ - dask.delayed(BaseTensorClient._exec_on_dask)( - func=method, - params={"path": path, **paths_kwargs[path]} - ) - for path in sub_paths - ], **compute_kwargs) + client.compute( + [ + dask.delayed(BaseTensorClient._exec_on_dask)( + func=method, params={"path": path, **paths_kwargs[path]} + ) + for path in sub_paths + ], + **compute_kwargs, + ) def exec_on_dag_order( - self, - method: Union[str, Callable], - kwargs_groups: Dict[str, Dict[str, Any]] = None, - tensors_path: List[str] = None, - parallelization_kwargs: Dict[str, Any] = None, - max_parallelization_per_group: Dict[str, int] = None, - autofill_dependencies: bool = False, - only_on_groups: set = None, - check_dependencies: bool = True, - omit_first_n_levels: int = 0, + self, + method: Union[str, Callable], + kwargs_groups: Dict[str, Dict[str, Any]] = None, + tensors_path: List[str] = None, + parallelization_kwargs: Dict[str, Any] = None, + max_parallelization_per_group: Dict[str, int] = None, + autofill_dependencies: bool = False, + only_on_groups: set = None, + check_dependencies: bool = True, + omit_first_n_levels: int = 0, ): """ This method was designed to execute multiple methods of the client on parallel and following the dag @@ -206,42 +203,57 @@ def exec_on_dag_order( method = getattr(self, method) if isinstance(method, str) else method if tensors_path is None: - tensors = [tensor for tensor in self.get_all_tensors_definition() if tensor.dag is not None] + tensors = [ + tensor + for tensor in self.get_all_tensors_definition() + if tensor.dag is not None + ] else: tensors = [self.get_tensor_definition(path) for path in tensors_path] if autofill_dependencies: - tensors = dag.add_dependencies(tensors, self.get_all_tensors_definition()) + tensors = dag.add_dependencies( + tensors, self.get_all_tensors_definition() + ) for i, level in enumerate(dag.get_tensor_dag(tensors, check_dependencies)): if i < omit_first_n_levels: continue - logger.info(f'Executing the {i} level of the DAG') + logger.info(f"Executing the {i} level of the DAG") # Filter the tensors base on the omit parameter - level = [tensor for tensor in level if method.__name__ not in tensor.dag.omit_on] + level = [ + tensor for tensor 
in level if method.__name__ not in tensor.dag.omit_on + ] if only_on_groups: # filter the invalid groups - level = [tensor for tensor in level if tensor.dag.group in only_on_groups] + level = [ + tensor for tensor in level if tensor.dag.group in only_on_groups + ] if not level: continue - for tensors in groupby_chunks(level, max_parallelization_per_group, lambda tensor: tensor.dag.group): + for tensors in groupby_chunks( + level, max_parallelization_per_group, lambda tensor: tensor.dag.group + ): self.exec_on_parallel( method=method, - paths_kwargs={tensor.path: kwargs_groups.get(tensor.dag.group, {}) for tensor in tensors}, - **parallelization_kwargs + paths_kwargs={ + tensor.path: kwargs_groups.get(tensor.dag.group, {}) + for tensor in tensors + }, + **parallelization_kwargs, ) def get_dag_for_dask( - self, - method: Union[str, Callable], - kwargs_groups: Dict[str, Dict[str, Any]] = None, - tensors: List[TensorDefinition] = None, - max_parallelization_per_group: Dict[str, int] = None, - map_paths: Dict[str, str] = None, - task_prefix: str = 'task-', - final_task_name: str = 'WAIT', + self, + method: Union[str, Callable], + kwargs_groups: Dict[str, Dict[str, Any]] = None, + tensors: List[TensorDefinition] = None, + max_parallelization_per_group: Dict[str, int] = None, + map_paths: Dict[str, str] = None, + task_prefix: str = "task-", + final_task_name: str = "WAIT", ) -> HighLevelGraph: """ This method was designed to create a Dask DAG for the given method, this is useful for parallelization @@ -270,29 +282,35 @@ def get_dag_for_dask( depends = set(tensor.dag.depends) | new_dependencies.get(path, set()) params = kwargs_groups.get(groups[path], {}) params["path"] = path - func = none_func if method.__name__ in tensor.dag.omit_on else self._exec_on_dask + func = ( + none_func + if method.__name__ in tensor.dag.omit_on + else self._exec_on_dask + ) graph[map_paths.get(path, task_prefix + path)] = ( func, method, params, - *tuple(map_paths.get(p, task_prefix + p) for p in depends) + *tuple(map_paths.get(p, task_prefix + p) for p in depends), ) final_tasks = dag.get_leaf_tasks(tensors, new_dependencies) - final_tasks = tuple(map_paths.get(path, task_prefix + path) for path in final_tasks) + final_tasks = tuple( + map_paths.get(path, task_prefix + path) for path in final_tasks + ) graph[final_task_name] = (none_func, *tuple(final_tasks)) return HighLevelGraph.from_collections(final_task_name, graph) def apply_data_transformation( - self, - data_transformation: List[MethodDescriptor], - storage: BaseStorage, - definition: Dict[str, Definition], - parameters: Dict[str, Any], - debug: bool = False + self, + data_transformation: List[MethodDescriptor], + storage: BaseStorage, + definition: Dict[str, Definition], + parameters: Dict[str, Any], + debug: bool = False, ): - parameters = {**{'new_data': None}, **parameters} + parameters = {**{"new_data": None}, **parameters} for descriptor in data_transformation: func = getattr(storage, descriptor.method_name, None) if func is None: @@ -301,24 +319,26 @@ def apply_data_transformation( method_parameters = descriptor.parameters if descriptor.method_name in definition: method_parameters = { - **definition[descriptor.method_name].dict(exclude_unset=True), - **method_parameters + **definition[descriptor.method_name].model_dump(exclude_unset=True), + **method_parameters, } result = func(**get_parameters(func, parameters, method_parameters)) if descriptor.result_name is not None: parameters.update({descriptor.result_name: result}) else: - 
parameters.update(result if isinstance(result, dict) else {'new_data': result}) + parameters.update( + result if isinstance(result, dict) else {"new_data": result} + ) - return parameters['new_data'] + return parameters["new_data"] - @validate_arguments + @validate_call def storage_method_caller( - self, - path: Union[str, TensorDefinition], - method_name: str, - parameters: Dict[str, Any] + self, + path: Union[str, TensorDefinition], + method_name: str, + parameters: Dict[str, Any], ) -> Any: """ Calls a specific method of a Storage, this includes send the parameters specified in the tensor_definition @@ -351,11 +371,13 @@ def storage_method_caller( definition = tensor_definition.definition storage = self.get_storage(path=tensor_definition) - parameters.update({ - 'definition': definition, - 'original_path': path if isinstance(path, str) else path.path, - 'storage': storage - }) + parameters.update( + { + "definition": definition, + "original_path": path if isinstance(path, str) else path.path, + "storage": storage, + } + ) if method_name in definition: method_settings = definition[method_name] @@ -364,51 +386,63 @@ def storage_method_caller( func = getattr(self, method_settings.substitute_method) if func.__name__ in self.internal_actions: return func(path=path, **parameters) - return func(**get_parameters( - func, - parameters, - definition[func.__name__].dict(exclude_unset=True) if func.__name__ in definition else {} - )) + return func( + **get_parameters( + func, + parameters, + definition[func.__name__].model_dump(exclude_unset=True) + if func.__name__ in definition + else {}, + ) + ) if method_settings.data_transformation is not None: - parameters['new_data'] = self.apply_data_transformation( + parameters["new_data"] = self.apply_data_transformation( data_transformation=method_settings.data_transformation, storage=storage, definition=definition, - parameters=parameters + parameters=parameters, ) - if method_name == 'read': - return parameters['new_data'] + if method_name == "read": + return parameters["new_data"] func = getattr(storage, method_name) return func(**get_parameters(func, parameters)) @abc.abstractmethod def read( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: raise ValueError @abc.abstractmethod - def append(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def append( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: pass @abc.abstractmethod - def update(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritableDataStore: + def update( + self, path: Union[str, TensorDefinition], **kwargs + ) -> AbstractWritableDataStore: pass @abc.abstractmethod - def store(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritableDataStore: + def store( + self, path: Union[str, TensorDefinition], **kwargs + ) -> AbstractWritableDataStore: pass @abc.abstractmethod - def upsert(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def upsert( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: pass @abc.abstractmethod - def drop(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def drop( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: pass @abc.abstractmethod @@ -416,14 
+450,14 @@ def exist(self, path: str, only_definition: bool = False, **kwargs) -> bool: pass def get_cached_storage( - self, - path, - max_cached_in_dim: int, - dim: str, - sort_dims: List[str], - merge_cache: bool = False, - update_logic: Literal["keep_last", "combine_first"] = "combine_first", - **kwargs + self, + path, + max_cached_in_dim: int, + dim: str, + sort_dims: List[str], + merge_cache: bool = False, + update_logic: Literal["keep_last", "combine_first"] = "combine_first", + **kwargs, ): """ Create a `CachedStorage` object which is used for multiples writes of the same file. @@ -459,22 +493,24 @@ def get_cached_storage( dim=dim, sort_dims=sort_dims, merge_cache=merge_cache, - update_logic=update_logic + update_logic=update_logic, ) def read_from_formula( - self, - formula: str, - use_exec: bool = False, - original_path: str = None, - storage: BaseStorage = None, - **kwargs: Dict[str, Any] + self, + formula: str, + use_exec: bool = False, + original_path: str = None, + storage: BaseStorage = None, + **kwargs: Dict[str, Any], ) -> Union[xr.DataArray, xr.Dataset]: data_fields = {} for path in extract_paths_from_formula(formula): if original_path is not None and original_path == path: if storage is None: - raise ValueError(f'You can not make a self read without sending the storage parameter') + raise ValueError( + f"You can not make a self read without sending the storage parameter" + ) data_fields[path] = storage.read() else: data_fields[path] = self.read(path) @@ -483,13 +519,18 @@ def read_from_formula( formula = formula.replace(f"`{path}`", f"data_fields['{path}']") formula_globals = { - 'xr': xr, 'np': np, 'pd': pd, 'da': da, 'dask': dask, 'self': self + "xr": xr, + "np": np, + "pd": pd, + "da": da, + "dask": dask, + "self": self, } - kwargs.update({'data_fields': data_fields}) + kwargs.update({"data_fields": data_fields}) if use_exec: exec(formula, formula_globals, kwargs) - return kwargs['new_data'] + return kwargs["new_data"] formula = formula.replace("\n", "") return eval(formula, formula_globals, kwargs) diff --git a/tensordb/clients/file_cache_tensor_client.py b/tensordb/clients/file_cache_tensor_client.py index f6e3b28..8f3c088 100644 --- a/tensordb/clients/file_cache_tensor_client.py +++ b/tensordb/clients/file_cache_tensor_client.py @@ -2,13 +2,11 @@ import pandas as pd import xarray as xr -from pydantic import validate_arguments +from pydantic import validate_call from xarray.backends.common import AbstractWritableDataStore from tensordb.clients.tensor_client import BaseTensorClient, TensorClient -from tensordb.storages import ( - BaseStorage -) +from tensordb.storages import BaseStorage from tensordb.storages import Mapping, PrefixLock from tensordb.tensor_definition import TensorDefinition @@ -32,13 +30,13 @@ class FileCacheTensorClient(BaseTensorClient): """ def __init__( - self, - remote_client: TensorClient, - local_client: TensorClient, - tensor_lock: PrefixLock, - checksum_path: str, - synchronizer_mode: Literal["delayed", "automatic"] = "automatic", - default_client: Literal["local", "remote"] = "remote" + self, + remote_client: TensorClient, + local_client: TensorClient, + tensor_lock: PrefixLock, + checksum_path: str, + synchronizer_mode: Literal["delayed", "automatic"] = "automatic", + default_client: Literal["local", "remote"] = "remote", ): self.remote_client = remote_client self.local_client = local_client @@ -50,7 +48,7 @@ def __init__( self.default_client = local_client def __getattr__(self, item): - if item.startswith('_'): + if item.startswith("_"): 
raise AttributeError(item) return getattr(self.default_client, item) @@ -65,7 +63,9 @@ def upsert_tensor(self, definition: TensorDefinition): self.local_client.upsert_tensor(definition) self.remote_client.upsert_tensor(definition) - def get_all_tensors_definition(self, include_local: bool = True) -> List[TensorDefinition]: + def get_all_tensors_definition( + self, include_local: bool = True + ) -> List[TensorDefinition]: definitions = self.remote_client.get_all_tensors_definition() if include_local: local_definitions = self.local_client.get_all_tensors_definition() @@ -76,23 +76,22 @@ def get_all_tensors_definition(self, include_local: bool = True) -> List[TensorD return definitions - @validate_arguments + @validate_call def create_tensor(self, definition: TensorDefinition, keep_metadata: bool = True): if self.remote_client.exist(definition.path) and keep_metadata: - actual_definition = self.remote_client.get_tensor_definition(definition.path) + actual_definition = self.remote_client.get_tensor_definition( + definition.path + ) definition.metadata.update(actual_definition.metadata) for client in [self.remote_client, self.local_client]: client.create_tensor(definition) - @validate_arguments + @validate_call def get_tensor_definition(self, path: str) -> TensorDefinition: return self.remote_client.get_tensor_definition(path) - @validate_arguments - def get_storage( - self, - path: Union[str, TensorDefinition] - ) -> BaseStorage: + @validate_call + def get_storage(self, path: Union[str, TensorDefinition]) -> BaseStorage: path = path.path if isinstance(path, TensorDefinition) else path return self._exec_callable( "get_storage", @@ -104,16 +103,18 @@ def get_storage( ) def merge( - self, - path: str, - force: bool = False, - only_definition: bool = False, + self, + path: str, + force: bool = False, + only_definition: bool = False, ): if not force and self.synchronizer_mode == "delayed": return if not self.local_client.exist(path): - raise ValueError(f"The path {path} does not exist in local so it is impossible to merge it") + raise ValueError( + f"The path {path} does not exist in local so it is impossible to merge it" + ) if not self.remote_client.exist(path): force = True @@ -121,20 +122,28 @@ def merge( local_definition = self.local_client.get_tensor_definition(path) if "modification_date" not in local_definition.metadata: - raise ValueError(f"There is no modification date on the local definition metadata") + raise ValueError( + f"There is no modification date on the local definition metadata" + ) if not force: - local_modification_date = pd.Timestamp(local_definition.metadata["modification_date"]) + local_modification_date = pd.Timestamp( + local_definition.metadata["modification_date"] + ) remote_definition = self.remote_client.get_tensor_definition(path) - remote_modification_date = pd.Timestamp(remote_definition.metadata["modification_date"]) + remote_modification_date = pd.Timestamp( + remote_definition.metadata["modification_date"] + ) if local_modification_date == remote_modification_date: return if local_modification_date < remote_modification_date: - raise ValueError(f"The remote tensor was updated last time on {remote_modification_date} " - f"while the local copy was updated on {local_modification_date}, please update " - f"first the local copy before merging it.") + raise ValueError( + f"The remote tensor was updated last time on {remote_modification_date} " + f"while the local copy was updated on {local_modification_date}, please update " + f"first the local copy before merging 
it." + ) self.remote_client.create_tensor(local_definition) @@ -144,14 +153,14 @@ def merge( local_map=self.local_client.base_map.sub_map(path), checksum_map=self.checksum_map.sub_map(path), force=force, - to_local=False + to_local=False, ) def fetch( - self, - path: str, - force: bool = False, - only_definition: bool = False, + self, + path: str, + force: bool = False, + only_definition: bool = False, ): if not self.remote_client.exist(path): return @@ -162,13 +171,19 @@ def fetch( if "modification_date" not in remote_definition.metadata: force = True - self.remote_client.update_tensor_metadata(path, {"modification_date": str(pd.Timestamp.now())}) + self.remote_client.update_tensor_metadata( + path, {"modification_date": str(pd.Timestamp.now())} + ) remote_definition = self.remote_client.get_tensor_definition(path) if not force: - remote_modification_date = pd.Timestamp(remote_definition.metadata["modification_date"]) + remote_modification_date = pd.Timestamp( + remote_definition.metadata["modification_date"] + ) local_definition = self.local_client.get_tensor_definition(path) - local_modification_date = pd.Timestamp(local_definition.metadata["modification_date"]) + local_modification_date = pd.Timestamp( + local_definition.metadata["modification_date"] + ) if local_modification_date == remote_modification_date: return @@ -177,10 +192,12 @@ def fetch( if self.synchronizer_mode == "delayed": return - raise ValueError(f"The remote tensor was updated last time on {remote_modification_date} " - f"while the local copy was updated on {local_modification_date}, this means " - f"that they are out of sync and this is not possible " - f"if the synchronizer mode is not delayed") + raise ValueError( + f"The remote tensor was updated last time on {remote_modification_date} " + f"while the local copy was updated on {local_modification_date}, this means " + f"that they are out of sync and this is not possible " + f"if the synchronizer mode is not delayed" + ) self.local_client.create_tensor(remote_definition) if not only_definition: @@ -189,28 +206,29 @@ def fetch( local_map=self.local_client.base_map.sub_map(path), checksum_map=self.checksum_map.sub_map(path), force=force, - to_local=True + to_local=True, ) def _exec_callable( - self, - func: str, - path: str, - fetch: bool, - merge: bool, - only_read: bool, - apply_client: TensorClient, - force: bool = False, - drop_checksums: bool = False, - **kwargs - + self, + func: str, + path: str, + fetch: bool, + merge: bool, + only_read: bool, + apply_client: TensorClient, + force: bool = False, + drop_checksums: bool = False, + **kwargs, ): with self.tensor_lock[path]: exist_local = self.local_client.exist(path) if fetch: self.fetch(path, force=force) if not only_read: - self.local_client.update_tensor_metadata(path, {"modification_date": str(pd.Timestamp.now())}) + self.local_client.update_tensor_metadata( + path, {"modification_date": str(pd.Timestamp.now())} + ) if drop_checksums and exist_local: self.checksum_map.rmdir(path) @@ -222,9 +240,7 @@ def _exec_callable( return result def read( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: return self._exec_callable( "read", @@ -233,13 +249,11 @@ def read( merge=False, only_read=True, apply_client=self.local_client, - **kwargs + **kwargs, ) def store( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, 
TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: return self._exec_callable( "store", @@ -249,13 +263,11 @@ def store( only_read=False, drop_checksums=True, apply_client=self.local_client, - **kwargs + **kwargs, ) def append( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: return self._exec_callable( "append", @@ -264,13 +276,11 @@ def append( merge=True, only_read=False, apply_client=self.local_client, - **kwargs + **kwargs, ) def update( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: return self._exec_callable( "update", @@ -279,13 +289,11 @@ def update( merge=True, only_read=False, apply_client=self.local_client, - **kwargs + **kwargs, ) def upsert( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: return self._exec_callable( "upsert", @@ -294,20 +302,16 @@ def upsert( merge=True, only_read=False, apply_client=self.local_client, - **kwargs + **kwargs, ) def exist( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> bool: return self.remote_client.exist(path, **kwargs) def drop( - self, - path: Union[str, TensorDefinition], - **kwargs + self, path: Union[str, TensorDefinition], **kwargs ) -> List[AbstractWritableDataStore]: return self._exec_callable( "drop", @@ -316,15 +320,15 @@ def drop( merge=True, only_read=False, apply_client=self.local_client, - **kwargs + **kwargs, ) - @validate_arguments + @validate_call def delete_tensor( - self, - path: str, - only_data: bool = False, - only_local: bool = False, + self, + path: str, + only_data: bool = False, + only_local: bool = False, ) -> Any: with self.tensor_lock[path]: if self.local_client.exist(path): @@ -332,12 +336,9 @@ def delete_tensor( if not only_local: self.remote_client.delete_tensor(path=path, only_data=only_data) - @validate_arguments + @validate_call def delete_tensors( - self, - paths: List[str], - only_data: bool = False, - only_local: bool = True + self, paths: List[str], only_data: bool = False, only_local: bool = True ): for path in paths: try: diff --git a/tensordb/clients/tensor_client.py b/tensordb/clients/tensor_client.py index caf4196..898a480 100644 --- a/tensordb/clients/tensor_client.py +++ b/tensordb/clients/tensor_client.py @@ -3,17 +3,12 @@ import orjson import xarray as xr -from pydantic import validate_arguments +from pydantic import validate_call from xarray.backends.common import AbstractWritableDataStore from tensordb.algorithms import Algorithms from tensordb.clients.base import BaseTensorClient -from tensordb.storages import ( - BaseStorage, - JsonStorage, - MAPPING_STORAGES, - PrefixLock -) +from tensordb.storages import BaseStorage, JsonStorage, MAPPING_STORAGES, PrefixLock from tensordb.storages.mapping import Mapping from tensordb.tensor_definition import TensorDefinition @@ -171,21 +166,21 @@ class TensorClient(BaseTensorClient, Algorithms): # TODO: Add more examples to the documentation - internal_actions = ['store', 'update', 'append', 'upsert', 'drop'] + internal_actions = ["store", "update", 
"append", "upsert", "drop"] def __init__( - self, - base_map: MutableMapping, - tmp_map: MutableMapping = None, - synchronizer: Union[Literal['process', 'thread'], None, PrefixLock] = None, - synchronize_only_write: bool = False, - **kwargs + self, + base_map: MutableMapping, + tmp_map: MutableMapping = None, + synchronizer: Union[Literal["process", "thread"], None, PrefixLock] = None, + synchronize_only_write: bool = False, + **kwargs, ): self.base_map = base_map if not isinstance(base_map, Mapping): self.base_map: Mapping = Mapping(base_map) - self.tmp_map = self.base_map.sub_map('tmp') if tmp_map is None else tmp_map + self.tmp_map = self.base_map.sub_map("tmp") if tmp_map is None else tmp_map if not isinstance(self.tmp_map, Mapping): self.tmp_map: Mapping = Mapping(tmp_map) @@ -193,8 +188,8 @@ def __init__( # TODO: Drop this parameter once this is fix https://github.com/zarr-developers/zarr-python/issues/1414 self.synchronize_only_write = synchronize_only_write self._tensors_definition = JsonStorage( - base_map=self.base_map.sub_map('_tensors_definition'), - tmp_map=self.tmp_map.sub_map('_tensors_definition'), + base_map=self.base_map.sub_map("_tensors_definition"), + tmp_map=self.tmp_map.sub_map("_tensors_definition"), ) def add_custom_data(self, path, new_data: Dict): @@ -206,7 +201,7 @@ def get_custom_data(self, path, default=None): except KeyError: return default - @validate_arguments + @validate_call def create_tensor(self, definition: TensorDefinition): """ Store the definition of tensor, which is equivalent to the creation of the tensor but without data @@ -217,9 +212,11 @@ def create_tensor(self, definition: TensorDefinition): Read the docs of the `TensorDefinition` class for more info of the definition. """ - self._tensors_definition.store(path=definition.path, new_data=definition.dict(exclude_unset=True)) + self._tensors_definition.store( + path=definition.path, new_data=definition.model_dump(exclude_unset=True) + ) - @validate_arguments + @validate_call def upsert_tensor(self, definition: TensorDefinition): """ Upsert the definition of tensor, which is equivalent to the creation of the tensor but without data, @@ -231,9 +228,11 @@ def upsert_tensor(self, definition: TensorDefinition): Read the docs of the `TensorDefinition` class for more info of the definition. """ - self._tensors_definition.upsert(path=definition.path, new_data=definition.dict()) + self._tensors_definition.upsert( + path=definition.path, new_data=definition.model_dump() + ) - @validate_arguments + @validate_call def get_tensor_definition(self, path: str) -> TensorDefinition: """ Retrieve a tensor definition. 
@@ -251,9 +250,11 @@ def get_tensor_definition(self, path: str) -> TensorDefinition: try: return TensorDefinition(**self._tensors_definition.read(path)) except KeyError: - raise KeyError(f'The tensor {path} has not been created using the create_tensor method') + raise KeyError( + f"The tensor {path} has not been created using the create_tensor method" + ) - @validate_arguments + @validate_call def update_tensor_metadata(self, path: str, new_metadata: Dict[str, Any]): tensor_definition = self.get_tensor_definition(path) tensor_definition.metadata.update(new_metadata) @@ -261,14 +262,17 @@ def update_tensor_metadata(self, path: str, new_metadata: Dict[str, Any]): def get_all_tensors_definition(self) -> List[TensorDefinition]: from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor() as executor: - results = list(executor.map( - self.get_tensor_definition, - list(self._tensors_definition.base_map.keys()) - )) + results = list( + executor.map( + self.get_tensor_definition, + list(self._tensors_definition.base_map.keys()), + ) + ) return results - @validate_arguments + @validate_call def delete_tensor(self, path: str, only_data: bool = False) -> Any: """ Delete the tensor @@ -288,7 +292,7 @@ def delete_tensor(self, path: str, only_data: bool = False) -> Any: if not only_data: self._tensors_definition.delete_file(path) - @validate_arguments + @validate_call def get_storage(self, path: Union[str, TensorDefinition]) -> BaseStorage: """ Get the storage of the tensor, by default it try to read the stored definition of the tensor. @@ -304,7 +308,7 @@ def get_storage(self, path: Union[str, TensorDefinition]) -> BaseStorage: """ definition = self.get_tensor_definition(path) if isinstance(path, str) else path - storage = MAPPING_STORAGES['zarr_storage'] + storage = MAPPING_STORAGES["zarr_storage"] storage = MAPPING_STORAGES[definition.storage.storage_name] storage = storage( @@ -312,14 +316,12 @@ def get_storage(self, path: Union[str, TensorDefinition]) -> BaseStorage: tmp_map=self.tmp_map.sub_map(definition.path), synchronizer=self.synchronizer, synchronize_only_write=self.synchronize_only_write, - **definition.storage.dict(exclude_unset=True) + **definition.storage.model_dump(exclude_unset=True), ) return storage def read( - self, - path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], - **kwargs + self, path: Union[str, TensorDefinition, xr.DataArray, xr.Dataset], **kwargs ) -> Union[xr.DataArray, xr.Dataset]: """ Calls :meth:`TensorClient.storage_method_caller` with read as method_name (has the same parameters). @@ -332,9 +334,13 @@ def read( if isinstance(path, (xr.DataArray, xr.Dataset)): return path - return self.storage_method_caller(path=path, method_name='read', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="read", parameters=kwargs + ) - def append(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def append( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: """ Calls :meth:`TensorClient.storage_method_caller` with append as method_name (has the same parameters). @@ -344,9 +350,13 @@ def append(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractW which is used as an interface for the corresponding backend that you select in xarray (the Storage). 
""" - return self.storage_method_caller(path=path, method_name='append', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="append", parameters=kwargs + ) - def update(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritableDataStore: + def update( + self, path: Union[str, TensorDefinition], **kwargs + ) -> AbstractWritableDataStore: """ Calls :meth:`TensorClient.storage_method_caller` with update as method_name (has the same parameters). @@ -356,9 +366,13 @@ def update(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritab which is used as an interface for the corresponding backend that you select in xarray (the Storage). """ - return self.storage_method_caller(path=path, method_name='update', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="update", parameters=kwargs + ) - def store(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritableDataStore: + def store( + self, path: Union[str, TensorDefinition], **kwargs + ) -> AbstractWritableDataStore: """ Calls :meth:`TensorClient.storage_method_caller` with store as method_name (has the same parameters). @@ -368,9 +382,13 @@ def store(self, path: Union[str, TensorDefinition], **kwargs) -> AbstractWritabl which is used as an interface for the corresponding backend that you select in xarray (the Storage). """ - return self.storage_method_caller(path=path, method_name='store', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="store", parameters=kwargs + ) - def upsert(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def upsert( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: """ Calls :meth:`TensorClient.storage_method_caller` with upsert as method_name (has the same parameters). @@ -380,9 +398,13 @@ def upsert(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractW which is used as an interface for the corresponding backend that you select in xarray (the Storage). """ - return self.storage_method_caller(path=path, method_name='upsert', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="upsert", parameters=kwargs + ) - def drop(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWritableDataStore]: + def drop( + self, path: Union[str, TensorDefinition], **kwargs + ) -> List[AbstractWritableDataStore]: """ Calls :meth:`TensorClient.storage_method_caller` with drop as method_name (has the same parameters). @@ -392,7 +414,9 @@ def drop(self, path: Union[str, TensorDefinition], **kwargs) -> List[AbstractWri which is used as an interface for the corresponding backend that you select in xarray (the Storage). 
""" - return self.storage_method_caller(path=path, method_name='drop', parameters=kwargs) + return self.storage_method_caller( + path=path, method_name="drop", parameters=kwargs + ) def exist(self, path: str, only_definition: bool = False, **kwargs) -> bool: """ @@ -412,12 +436,12 @@ def exist(self, path: str, only_definition: bool = False, **kwargs) -> bool: return False def read_from_formula( - self, - formula: str, - use_exec: bool = False, - original_path: str = None, - storage: BaseStorage = None, - **kwargs: Dict[str, Any] + self, + formula: str, + use_exec: bool = False, + original_path: str = None, + storage: BaseStorage = None, + **kwargs: Dict[str, Any], ) -> Union[xr.DataArray, xr.Dataset]: """ This is one of the most important methods of the `TensorClient` class, basically it allows defining @@ -510,5 +534,5 @@ def read_from_formula( use_exec=use_exec, original_path=original_path, storage=storage, - **kwargs + **kwargs, ) diff --git a/tensordb/storages/json_storage.py b/tensordb/storages/json_storage.py index 2acf6a0..23bdc2e 100644 --- a/tensordb/storages/json_storage.py +++ b/tensordb/storages/json_storage.py @@ -2,7 +2,7 @@ import orjson import xarray as xr -from pydantic.utils import deep_update +from pydantic.v1.utils import deep_update from tensordb.storages.base_storage import BaseStorage @@ -42,7 +42,11 @@ def upsert(self, new_data: Dict, path: str = None, **kwargs): def read(self, path: str = None) -> Dict: path = self.data_names if path is None else path new_name = self.to_json_file_name(path) - return orjson.loads(self.base_map[new_name]) + try: + return orjson.loads(self.base_map[new_name]) + except orjson.JSONDecodeError: + # This error can be raised if there are multiple writes and reads in parallel + return {} def exist(self, path: str = None, **kwargs): path = self.data_names if path is None else path diff --git a/tensordb/storages/lock.py b/tensordb/storages/lock.py index c028f19..247872b 100644 --- a/tensordb/storages/lock.py +++ b/tensordb/storages/lock.py @@ -4,7 +4,6 @@ class BaseLock(abc.ABC): - def __enter__(self): raise NotImplementedError diff --git a/tensordb/storages/variables.py b/tensordb/storages/variables.py index 1a71191..7db1e28 100644 --- a/tensordb/storages/variables.py +++ b/tensordb/storages/variables.py @@ -1,7 +1,4 @@ from tensordb.storages.json_storage import JsonStorage from tensordb.storages.zarr_storage import ZarrStorage -MAPPING_STORAGES = { - 'zarr_storage': ZarrStorage, - 'json_storage': JsonStorage -} +MAPPING_STORAGES = {"zarr_storage": ZarrStorage, "json_storage": JsonStorage} diff --git a/tensordb/storages/zarr_storage.py b/tensordb/storages/zarr_storage.py index 993be84..bb316b3 100644 --- a/tensordb/storages/zarr_storage.py +++ b/tensordb/storages/zarr_storage.py @@ -4,7 +4,6 @@ import xarray as xr import zarr -from tensordb.algorithms import Algorithms from tensordb.storages.base_storage import BaseStorage from tensordb.storages.lock import PrefixLock from tensordb.storages.mapping import Mapping @@ -389,7 +388,7 @@ def update( new_data = new_data.combine_first(act_data_region) # The chunks must match with the chunks of the actual data after applying the region slice - new_data = new_data.chunk(act_data_region.chunks) + new_data = new_data.chunk(act_data_region.chunksizes) delayed_write = new_data.to_zarr( self.base_map, diff --git a/tensordb/tensor_definition.py b/tensordb/tensor_definition.py index 343b2d6..811b510 100644 --- a/tensordb/tensor_definition.py +++ b/tensordb/tensor_definition.py @@ -1,6 +1,6 @@ from typing 
import Dict, List, Any, Optional, Literal -from pydantic import BaseModel, Extra, Field +from pydantic import ConfigDict, BaseModel, Field class DAGOrder(BaseModel): @@ -46,9 +46,7 @@ class StorageDefinition(BaseModel): Indicate which data storage want to be used. """ ) - - class Config: - extra = Extra.allow + model_config = ConfigDict(extra="allow") class MethodDescriptor(BaseModel): @@ -70,7 +68,7 @@ class MethodDescriptor(BaseModel): """ ) result_name: Optional[str] = Field( - title="Result Name", + None, title="Result Name", description=""" Indicate the name of the output that the method produce. """ @@ -98,9 +96,7 @@ class Definition(BaseModel): it will call the append method. """ ) - - class Config: - extra = Extra.allow + model_config = ConfigDict(extra="allow") class TensorDefinition(BaseModel): diff --git a/tensordb/tensor_translator.py b/tensordb/tensor_translator.py index 1e37bf4..208706f 100644 --- a/tensordb/tensor_translator.py +++ b/tensordb/tensor_translator.py @@ -1,7 +1,7 @@ # DEPRECATED -# @validate_arguments(config=dict(arbitrary_types_allowed=True)) +# @validate_call(config=dict(arbitrary_types_allowed=True)) # def defined_translation( # func: Callable, # dims: List[Hashable], diff --git a/tensordb/tests/conftest.py b/tensordb/tests/conftest.py index 7e12462..053a6ec 100644 --- a/tensordb/tests/conftest.py +++ b/tensordb/tests/conftest.py @@ -7,7 +7,7 @@ @pytest.fixture(scope="session") def dask_cluster() -> Generator[LocalCluster, Any, None]: with LocalCluster( - processes=False, + processes=False, ) as cluster: yield cluster diff --git a/tensordb/tests/test_algorithms.py b/tensordb/tests/test_algorithms.py index 3df1be8..7d75bbb 100644 --- a/tensordb/tests/test_algorithms.py +++ b/tensordb/tests/test_algorithms.py @@ -19,7 +19,7 @@ def test_ffill(): dims=['a', 'b'], coords={'a': list(range(3)), 'b': list(range(6))} ).chunk( - (1, 2) + a=1, b=2 ) assert Algorithms.ffill(arr, limit=2, dim='b').equals(arr.compute().ffill('b', limit=2)) assert Algorithms.ffill(arr, limit=2, dim='b', until_last_valid=True).equals( @@ -56,7 +56,7 @@ def test_rank(method, ascending): ], dims=['a', 'b'], coords={'a': list(range(5)), 'b': list(range(3))} - ).chunk((3, 1)) + ).chunk(a=3, b=1) df = pd.DataFrame(arr.values, index=arr.a.values, columns=arr.b.values) result = Algorithms.rank( arr, @@ -89,7 +89,7 @@ def test_bitmask_topk(top_size, dim): ], dims=['a', 'b'], coords={'a': list(range(6)), 'b': list(range(3))} - ).chunk((3, 1)) + ).chunk(a=3, b=1) df = pd.DataFrame(arr.values, index=arr.a.values, columns=arr.b.values) expected = df.rank( @@ -137,7 +137,7 @@ def test_bitmask_topk_tie_breaker(top_size, dim): ], dims=['c', 'a', 'b'], coords={'c': list(range(2)), 'a': list(range(6)), 'b': list(range(3))} - ).chunk((1, 3, 1)) + ).chunk(c=1, a=3, b=1) result = Algorithms.bitmask_topk( arr, @@ -202,7 +202,7 @@ def test_multi_rank(dim): ], dims=['c', 'a', 'b'], coords=coords - ).chunk((3, 1)) + ).chunk(c=3, a=1) result = Algorithms.multi_rank( new_data=arr, dim=dim, @@ -246,7 +246,7 @@ def test_rolling_along_axis(window, drop_nan, fill_method): ], dims=['a', 'b'], coords={'a': list(range(5)), 'b': list(range(3))} - ).chunk((3, 1)) + ).chunk(a=3, b=1) df = pd.DataFrame(arr.values.T, arr.b.values, arr.a.values).stack(dropna=False) for min_periods in [None] + list(range(1, window)): rolling_arr = Algorithms.rolling_along_axis( @@ -289,7 +289,7 @@ def test_replace(default_replace): ], dims=['a', 'b'], coords={'a': list(range(5)), 'b': list(range(3))} - ).chunk((3, 1)) + ).chunk(a=3, b=1) df = 
pd.DataFrame(arr.values, index=arr.a.values, columns=arr.b.values) @@ -334,7 +334,7 @@ def test_vindex(): ], dims=['a', 'b', 'c'], coords={'a': list(range(5)), 'b': list(range(3)), 'c': list(range(2))} - ).chunk((3, 2, 1)) + ).chunk(a=3, b=2, c=1) for i_coord in [None, [0, 3, 1, 4], [3, 4, 2, 1], [1, 1, 1]]: for j_coord in [None, [1, 0, 2], [2, 1, 0], [0, 0, 0], [1, 0]]: @@ -368,7 +368,7 @@ def test_apply_on_groups(dim, keep_shape, func): ], dims=['a', 'b'], coords={'a': [1, 2, 3, 4, 5], 'b': [0, 1, 2, 3, 4]} - ).chunk((3, 2)) + ).chunk(a=3, b=2) grouper = { 'a': [1, 5, 5, 0, 1], 'b': [0, 1, 1, 0, -1] @@ -382,10 +382,16 @@ def test_apply_on_groups(dim, keep_shape, func): expected = arr.to_pandas() axis = 0 if dim == "a" else 1 + if axis == 1: + expected = expected.T + if keep_shape: - expected = expected.groupby(groups, axis=axis).transform(func) + expected = expected.groupby(groups).transform(func) else: - expected = getattr(expected.groupby(groups, axis=axis), func)() + expected = getattr(expected.groupby(groups), func)() + + if axis == 1: + expected = expected.T expected = xr.DataArray( expected.values, @@ -416,7 +422,7 @@ def test_apply_on_groups_array(dim, keep_shape, func): ], dims=['a', 'b'], coords={'a': [1, 2, 3, 4, 5], 'b': [0, 1, 2, 3, 4]} - ).chunk((3, 2)) + ).chunk(a=3, b=2) groups = xr.DataArray( [ [0, 0, 2, 1, 0], @@ -427,7 +433,7 @@ def test_apply_on_groups_array(dim, keep_shape, func): ], dims=['a', 'b'], coords={'a': [1, 2, 3, 4, 5], 'b': [0, 1, 2, 3, 4]} - ).chunk((3, 2)) + ).chunk(a=3, b=2) unique_groups = np.unique(groups.values) axis = arr.dims.index(dim) @@ -440,9 +446,15 @@ def test_apply_on_groups_array(dim, keep_shape, func): x = arr.sel({iterate_dim: coord}, drop=True) grouper = groups.sel({iterate_dim: coord}, drop=True).compute() r = result.sel({iterate_dim: coord}, drop=True) - kwargs = {"method": "first", "axis": axis} if func == "rank" else {} + kwargs = {"method": "first"} if func == "rank" else {} s = x.to_pandas() + + if axis == 1: + s = s.T expected = getattr(s.groupby(grouper.to_pandas()), func)(**kwargs).to_xarray() + if axis == 1: + expected = expected.T + if dim not in expected.dims: expected = expected.rename({expected.dims[0]: dim}) if keep_shape: @@ -468,7 +480,7 @@ def test_merge_duplicates_coord(dim): ], dims=['a', 'b'], coords={'a': [1, 5, 5, 0, 1], 'b': [0, 1, 1, 0, -1]} - ).chunk((3, 2)) + ).chunk(a=3, b=2) g = arr.groupby(dim).max(dim) arr = Algorithms.merge_duplicates_coord(arr, dim, 'max') @@ -496,7 +508,7 @@ def test_cumulative_on_sort(dim, ascending, func): ], dims=['a', 'b'], coords={'a': list(range(5)), 'b': list(range(3))} - ).chunk((5, 3)) + ).chunk(a=5, b=3) result = Algorithms.cumulative_on_sort( arr, dim=dim, diff --git a/tensordb/tests/test_tools.py b/tensordb/tests/test_tools.py index 0bb3db6..59757dd 100644 --- a/tensordb/tests/test_tools.py +++ b/tensordb/tests/test_tools.py @@ -7,16 +7,15 @@ class TestTools: - @pytest.fixture(autouse=True) def setup_tests(self): self.data_array = xr.DataArray( np.arange(56).reshape((7, 8)).astype(np.float64), - dims=['a', 'b'], - coords={'a': list(range(7)), 'b': list(range(8))} + dims=["a", "b"], + coords={"a": list(range(7)), "b": list(range(8))}, ) self.dataset = xr.Dataset( - {'first': self.data_array, 'second': self.data_array + 10} + {"first": self.data_array, "second": self.data_array + 10} ) # TODO: Once this issue is fixed https://github.com/pydata/xarray/issues/7059 this lock should be dropped self.lock = dask.utils.SerializableLock() @@ -32,43 +31,61 @@ def 
read_by_coords_dataset(self, arr: xr.Dataset) -> xr.Dataset: def test_xarray_from_func_data_array(self): data = xarray_from_func( self.read_by_coords, - dims=['a', 'b'], - coords={'a': list(range(6)), 'b': list(range(8))}, + dims=["a", "b"], + coords={"a": list(range(6)), "b": list(range(8))}, chunks=[2, 3], dtypes=np.float64, - func_parameters={} + func_parameters={}, ) assert data.equals(self.data_array.sel(**data.coords)) def test_xarray_from_func_dataset(self): data = xarray_from_func( self.read_by_coords_dataset, - dims=['a', 'b'], - coords={'a': list(range(6)), 'b': list(range(8))}, + dims=["a", "b"], + coords={"a": list(range(6)), "b": list(range(8))}, chunks=[2, 3], dtypes=[np.float64, np.float64], - data_names=['first', 'second'], + data_names=["first", "second"], func_parameters={}, ) assert data.equals(self.dataset.sel(data.coords)) def test_groupby_chunks(self): - e = {'a': 0, 'b': 1, 'c': 0, 'd': 0, 'e': 0, 'm': 1, 'g': 2, 'l': 2} - result = list(groupby_chunks(list(e), {0: 2, 1: 1}, lambda x: e[x], lambda x: (e[x], x))) - assert result == [['a', 'c', 'b', 'g', 'l'], ['d', 'e', 'm']] + e = {"a": 0, "b": 1, "c": 0, "d": 0, "e": 0, "m": 1, "g": 2, "l": 2} + result = list( + groupby_chunks(list(e), {0: 2, 1: 1}, lambda x: e[x], lambda x: (e[x], x)) + ) + assert result == [["a", "c", "b", "g", "l"], ["d", "e", "m"]] - result = list(groupby_chunks(list(e), {0: 2, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x))) - assert result == [['a', 'c', 'b', 'm', 'g'], ['d', 'e', 'l']] + result = list( + groupby_chunks( + list(e), {0: 2, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x) + ) + ) + assert result == [["a", "c", "b", "m", "g"], ["d", "e", "l"]] - e['f'] = 1 - result = list(groupby_chunks(list(e), {0: 2, 1: 1, 2: 2}, lambda x: e[x], lambda x: (e[x], x))) - assert result == [['a', 'c', 'b', 'g', 'l'], ['d', 'e', 'f'], ['m']] + e["f"] = 1 + result = list( + groupby_chunks( + list(e), {0: 2, 1: 1, 2: 2}, lambda x: e[x], lambda x: (e[x], x) + ) + ) + assert result == [["a", "c", "b", "g", "l"], ["d", "e", "f"], ["m"]] - result = list(groupby_chunks(list(e), {0: 1, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x))) - assert result == [['a', 'b', 'f', 'g'], ['c', 'm', 'l'], ['d'], ['e']] + result = list( + groupby_chunks( + list(e), {0: 1, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x) + ) + ) + assert result == [["a", "b", "f", "g"], ["c", "m", "l"], ["d"], ["e"]] - result = list(groupby_chunks(list(e), {0: 3, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x))) - assert result == [['a', 'c', 'd', 'b', 'f', 'g'], ['e', 'm', 'l']] + result = list( + groupby_chunks( + list(e), {0: 3, 1: 2, 2: 1}, lambda x: e[x], lambda x: (e[x], x) + ) + ) + assert result == [["a", "c", "d", "b", "f", "g"], ["e", "m", "l"]] if __name__ == "__main__": diff --git a/tensordb/tests/test_zarr_storage.py b/tensordb/tests/test_zarr_storage.py index 930c14c..2c5b079 100644 --- a/tensordb/tests/test_zarr_storage.py +++ b/tensordb/tests/test_zarr_storage.py @@ -11,108 +11,112 @@ class TestZarrStorage: - @pytest.fixture(autouse=True) def setup_tests(self, tmpdir): sub_path = tmpdir.strpath self.storage = ZarrStorage( - base_map=fsspec.get_mapper(sub_path + '/zarr_storage'), - tmp_map=fsspec.get_mapper(sub_path + '/tmp/zarr_storage'), - data_names='data_test', - chunks={'index': 3, 'columns': 2}, + base_map=fsspec.get_mapper(sub_path + "/zarr_storage"), + tmp_map=fsspec.get_mapper(sub_path + "/tmp/zarr_storage"), + data_names="data_test", + chunks={"index": 3, "columns": 2}, # synchronizer='process', - 
synchronize_only_write=True + synchronize_only_write=True, ) self.storage_dataset = ZarrStorage( - base_map=fsspec.get_mapper(sub_path + '/zarr_dataset'), - tmp_map=fsspec.get_mapper(sub_path + '/tmp/zarr_dataset'), - data_names=['a', 'b', 'c'], - chunks={'index': 3, 'columns': 2}, - synchronizer='process', - synchronize_only_write=True + base_map=fsspec.get_mapper(sub_path + "/zarr_dataset"), + tmp_map=fsspec.get_mapper(sub_path + "/tmp/zarr_dataset"), + data_names=["a", "b", "c"], + chunks={"index": 3, "columns": 2}, + synchronizer="process", + synchronize_only_write=True, ) self.storage_sorted_unique = ZarrStorage( - base_map=fsspec.get_mapper(sub_path + '/zarr_sorted_unique'), - tmp_map=fsspec.get_mapper(sub_path + '/tmp/zarr_sorted_unique'), - data_names='data_test', - chunks={'index': 3, 'columns': 2}, + base_map=fsspec.get_mapper(sub_path + "/zarr_sorted_unique"), + tmp_map=fsspec.get_mapper(sub_path + "/tmp/zarr_sorted_unique"), + data_names="data_test", + chunks={"index": 3, "columns": 2}, unique_coords={"index": True, "columns": True}, - sorted_coords={'index': False, 'columns': False}, + sorted_coords={"index": False, "columns": False}, # synchronizer='thread', - synchronize_only_write=True + synchronize_only_write=True, ) self.storage_dataset_sorted_unique = ZarrStorage( - base_map=fsspec.get_mapper(sub_path + '/zarr_dataset_sorted_unique'), - tmp_map=fsspec.get_mapper(sub_path + '/tmp/zarr_dataset_sorted_unique'), - data_names=['a', 'b', 'c'], - chunks={'index': 3, 'columns': 2}, + base_map=fsspec.get_mapper(sub_path + "/zarr_dataset_sorted_unique"), + tmp_map=fsspec.get_mapper(sub_path + "/tmp/zarr_dataset_sorted_unique"), + data_names=["a", "b", "c"], + chunks={"index": 3, "columns": 2}, unique_coords={"index": True, "columns": True}, - sorted_coords={'index': False, 'columns': False}, + sorted_coords={"index": False, "columns": False}, # synchronizer='process', - synchronize_only_write=True + synchronize_only_write=True, ) self.arr = xr.DataArray( - data=np.array([ - [1, 2, 7, 4, 5], - [2, 3, 5, 5, 6], - [3, 3, 11, 5, 6], - [4, 3, 10, 5, 6], - [5, 7, 8, 5, 6], - ], dtype=float), - dims=['index', 'columns'], - coords={'index': [0, 1, 2, 3, 4], 'columns': [0, 1, 2, 3, 4]}, + data=np.array( + [ + [1, 2, 7, 4, 5], + [2, 3, 5, 5, 6], + [3, 3, 11, 5, 6], + [4, 3, 10, 5, 6], + [5, 7, 8, 5, 6], + ], + dtype=float, + ), + dims=["index", "columns"], + coords={"index": [0, 1, 2, 3, 4], "columns": [0, 1, 2, 3, 4]}, ) self.arr2 = xr.DataArray( - data=np.array([ - [1, 2, 7, 4, 5, 10, 13], - [2, 3, 5, 5, 6, 11, 15], - [2, 3, 5, 5, 6, 11, 15], - ], dtype=float), - dims=['index', 'columns'], - coords={'index': [6, 7, 8], 'columns': [0, 1, 2, 3, 4, 5, 6]}, + data=np.array( + [ + [1, 2, 7, 4, 5, 10, 13], + [2, 3, 5, 5, 6, 11, 15], + [2, 3, 5, 5, 6, 11, 15], + ], + dtype=float, + ), + dims=["index", "columns"], + coords={"index": [6, 7, 8], "columns": [0, 1, 2, 3, 4, 5, 6]}, ) self.arr3 = xr.DataArray( - data=np.array([ - [1, 2, 3, 4, 5], - ], dtype=float), - dims=['index', 'columns'], - coords={'index': [5], 'columns': [0, 1, 2, 3, 4]}, + data=np.array( + [ + [1, 2, 3, 4, 5], + ], + dtype=float, + ), + dims=["index", "columns"], + coords={"index": [5], "columns": [0, 1, 2, 3, 4]}, ) self.arr4 = self.arr.astype(float) + 5 self.arr5 = self.arr.astype(np.uint) + 3 - self.dataset = xr.Dataset( - data_vars=dict( - a=self.arr, - b=self.arr4, - c=self.arr5 - ) - ) + self.dataset = xr.Dataset(data_vars=dict(a=self.arr, b=self.arr4, c=self.arr5)) - @pytest.mark.parametrize('keep_order', [True, False]) 
+ @pytest.mark.parametrize("keep_order", [True, False]) def test_store_data(self, keep_order: bool): storage = self.storage_sorted_unique if keep_order else self.storage storage.store(self.arr2) if keep_order: assert storage.read().equals( - self.arr2.sel(index=self.arr2.index[::-1], columns=self.arr2.columns[::-1]) + self.arr2.sel( + index=self.arr2.index[::-1], columns=self.arr2.columns[::-1] + ) ) else: assert storage.read().equals(self.arr2) storage.delete_tensor() - @pytest.mark.parametrize('keep_order', [True, False]) - @pytest.mark.parametrize('as_dask', [True, False]) + @pytest.mark.parametrize("keep_order", [True, False]) + @pytest.mark.parametrize("as_dask", [True, False]) def test_append_data(self, keep_order: bool, as_dask: bool): storage = self.storage_sorted_unique if keep_order else self.storage arr, arr2 = self.arr, self.arr2 # TODO: Check why If the data is chunked and then stored in Zarr it add nan values if as_dask: - arr, arr2 = arr.chunk((2, 3)), arr2.chunk((1, 4)) + arr, arr2 = arr.chunk(index=2, columns=3), arr2.chunk(index=1, columns=4) for i in range(len(arr.index)): storage.append(arr.isel(index=[i])) @@ -120,13 +124,12 @@ def test_append_data(self, keep_order: bool, as_dask: bool): for i in range(len(arr2.index)): storage.append(arr2.isel(index=[i])) - total_data = xr.concat([arr, arr2], dim='index') + total_data = xr.concat([arr, arr2], dim="index") if keep_order: assert storage.read().equals( total_data.sel( - index=total_data.index[::-1], - columns=total_data.columns[::-1] + index=total_data.index[::-1], columns=total_data.columns[::-1] ) ) else: @@ -134,37 +137,31 @@ def test_append_data(self, keep_order: bool, as_dask: bool): storage.delete_tensor() - @pytest.mark.parametrize('keep_order', [True, False]) - @pytest.mark.parametrize('as_dask', [True, False]) - @pytest.mark.parametrize('only_diagonal_data', [True, False]) - @pytest.mark.parametrize('compute', [True, False]) - def test_append_diagonal(self, keep_order: bool, as_dask: bool, only_diagonal_data, compute): + @pytest.mark.parametrize("keep_order", [True, False]) + @pytest.mark.parametrize("as_dask", [True, False]) + @pytest.mark.parametrize("only_diagonal_data", [True, False]) + @pytest.mark.parametrize("compute", [True, False]) + def test_append_diagonal( + self, keep_order: bool, as_dask: bool, only_diagonal_data, compute + ): storage = self.storage_sorted_unique if keep_order else self.storage arr = self.arr if as_dask: - arr = arr.chunk((2, 3)) + arr = arr.chunk(index=2, columns=3) storage.store(arr) data = xr.DataArray( np.arange(16).reshape((4, 4)), dims=["index", "columns"], - coords={ - "index": [5, 6, 7, 8], "columns": [5, 6, 7, 8] - } + coords={"index": [5, 6, 7, 8], "columns": [5, 6, 7, 8]}, ) delayed = [] if only_diagonal_data: for i in range(4): delayed.append( - storage.append( - data.isel( - index=[i], - columns=[i] - ), - compute=compute - ) + storage.append(data.isel(index=[i], columns=[i]), compute=compute) ) diagonal = data.where(data.isin([0, 5, 10, 15])) expected = xr.concat([self.arr, diagonal], dim="index") @@ -175,7 +172,7 @@ def test_append_diagonal(self, keep_order: bool, as_dask: bool, only_diagonal_da if keep_order: expected = expected.sel( index=np.sort(expected.coords["index"])[::-1], - columns=np.sort(expected.coords["columns"])[::-1] + columns=np.sort(expected.coords["columns"])[::-1], ) if not compute: @@ -184,17 +181,9 @@ def test_append_diagonal(self, keep_order: bool, as_dask: bool, only_diagonal_da assert expected.equals(storage.read()) storage.delete_tensor() - 
@pytest.mark.parametrize('as_dask', [True, False]) - @pytest.mark.parametrize('index', [ - [0, 2, 4], - [2, 4], - [1, 4] - ]) - @pytest.mark.parametrize('columns', [ - [1, 3, 4], - [1, 4], - [0, 3] - ]) + @pytest.mark.parametrize("as_dask", [True, False]) + @pytest.mark.parametrize("index", [[0, 2, 4], [2, 4], [1, 4]]) + @pytest.mark.parametrize("columns", [[1, 3, 4], [1, 4], [0, 3]]) def test_update_data(self, as_dask, index, columns): self.storage.store(self.arr) @@ -204,7 +193,7 @@ def test_update_data(self, as_dask, index, columns): expected.loc[v, columns] = i if as_dask: - expected = expected.chunk((2, 1)) + expected = expected.chunk(index=2, columns=1) self.storage.update(expected.sel(index=index, columns=columns)) @@ -220,26 +209,32 @@ def test_update_complete_data(self): assert self.storage.read().equals(self.arr) - @pytest.mark.parametrize('keep_order', [True, False]) + @pytest.mark.parametrize("keep_order", [True, False]) def test_store_dataset(self, keep_order: bool): - storage_dataset = self.storage_dataset_sorted_unique if keep_order else self.storage_dataset + storage_dataset = ( + self.storage_dataset_sorted_unique if keep_order else self.storage_dataset + ) storage_dataset.store(self.dataset) if keep_order: assert storage_dataset.read().equals( - self.dataset.sel(index=self.dataset.index[::-1], columns=self.dataset.columns[::-1]) + self.dataset.sel( + index=self.dataset.index[::-1], columns=self.dataset.columns[::-1] + ) ) else: assert storage_dataset.read().equals(self.dataset) - @pytest.mark.parametrize('keep_order', [True, False]) + @pytest.mark.parametrize("keep_order", [True, False]) def test_append_dataset(self, keep_order: bool): - storage_dataset = self.storage_dataset_sorted_unique if keep_order else self.storage_dataset + storage_dataset = ( + self.storage_dataset_sorted_unique if keep_order else self.storage_dataset + ) storage_dataset.store(self.dataset) dataset = self.dataset.reindex( index=list(self.dataset.index.values) + [self.dataset.index.values[-1] + 1], - fill_value=1 + fill_value=1, ) storage_dataset.append(dataset.isel(index=[-1])) if keep_order: @@ -252,23 +247,24 @@ def test_append_dataset(self, keep_order: bool): def test_update_dataset(self): self.storage_dataset.store(self.dataset) - expected = xr.concat([ - self.dataset.sel(index=slice(0, 1)), - self.dataset.sel(index=slice(2, None)) + 5 - ], dim='index') + expected = xr.concat( + [ + self.dataset.sel(index=slice(0, 1)), + self.dataset.sel(index=slice(2, None)) + 5, + ], + dim="index", + ) self.storage_dataset.update(expected.sel(index=slice(2, None))) assert self.storage_dataset.read().equals(expected) def test_drop_data(self): self.storage.store(self.arr) - coords = {'index': [0, 2, 4], 'columns': [1, 3]} + coords = {"index": [0, 2, 4], "columns": [1, 3]} self.storage.drop(coords) - assert self.storage.read().equals( - self.arr.drop_sel(coords) - ) + assert self.storage.read().equals(self.arr.drop_sel(coords)) def test_keep_sorted(self): - arr = self.arr.chunk((3, 2)) + arr = self.arr.chunk(index=3, columns=2) new_data = arr.sel(index=[3, 2, 0, 4, 1], columns=[3, 4, 2, 0, 1]) result = self.storage_sorted_unique._keep_sorted_coords(new_data=new_data) assert result.chunks == ((1, 1, 1, 1, 1), (1, 1, 1, 2)) diff --git a/tensordb/utils/method_inspector.py b/tensordb/utils/method_inspector.py index 881388e..6144b0f 100644 --- a/tensordb/utils/method_inspector.py +++ b/tensordb/utils/method_inspector.py @@ -7,11 +7,9 @@ def get_parameters(func: Callable, *args: Dict[str, Any]): signature = 
inspect.signature(func) func_parameters = list(signature.parameters.keys()) - if 'kwargs' in func_parameters: + if "kwargs" in func_parameters: return { - k: v - for user_parameters in args[::-1] - for k, v in user_parameters.items() + k: v for user_parameters in args[::-1] for k, v in user_parameters.items() } # instance the parameters with the default parameters @@ -22,10 +20,12 @@ def get_parameters(func: Callable, *args: Dict[str, Any]): } # update the parameters based on the ones sent by the user, take into consideration the order - parameters.update({ - parameter: user_parameters[parameter] - for parameter in func_parameters - for user_parameters in args[::-1] - if parameter in user_parameters - }) + parameters.update( + { + parameter: user_parameters[parameter] + for parameter in func_parameters + for user_parameters in args[::-1] + if parameter in user_parameters + } + ) return parameters diff --git a/tensordb/utils/tools.py b/tensordb/utils/tools.py index 2789c28..4d1c70f 100644 --- a/tensordb/utils/tools.py +++ b/tensordb/utils/tools.py @@ -6,7 +6,7 @@ import numpy as np import xarray as xr from dask import array as da -from pydantic import validate_arguments +from pydantic import validate_call def groupby_chunks( @@ -90,7 +90,7 @@ def empty_xarray(dims, coords, chunks, dtype): ) -@validate_arguments(config=dict(arbitrary_types_allowed=True)) +@validate_call(config=dict(arbitrary_types_allowed=True)) def xarray_from_func( func: Callable, dims: List[Hashable],