diff --git a/.gitignore b/.gitignore index 557b271b8..8e04b777d 100644 --- a/.gitignore +++ b/.gitignore @@ -58,6 +58,7 @@ config.yml prometheus_data grafana_data data +alembic.ini tiled/_version.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ae138eaf..c90f1ab3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Write the date in place of the "Unreleased" in the case a new version is release # Changelog +## Unreleased + +### Added + +- zarr v2 endpoints for read access + ## v0.1.0b10 (2024-10-11) - Add kwarg to client logout to auto-clear default identity. diff --git a/pyproject.toml b/pyproject.toml index 1b74b8a17..a00664062 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,7 @@ dataframe = [ # These are required for developing the package (running the tests, building # the documentation) but not necessarily required for _using_ it. dev = [ + "aiohttp", "coverage", "flake8", "importlib_resources;python_version < \"3.9\"", diff --git a/tiled/_tests/test_protocols.py b/tiled/_tests/test_protocols.py index e54921cd5..a458ae106 100644 --- a/tiled/_tests/test_protocols.py +++ b/tiled/_tests/test_protocols.py @@ -247,9 +247,13 @@ def test_sparseadapter_protocol(mocker: MockFixture) -> None: mock_call4 = mocker.patch.object(CustomSparseAdapter, "specs") mock_call5 = mocker.patch.object(CustomSparseAdapter, "metadata") - structure = COOStructure(shape=(2 * 5,), chunks=((5, 5),)) - array = numpy.random.rand(2, 512, 512) + + structure = COOStructure( + shape=(2 * 5,), + chunks=((5, 5),), + data_type=BuiltinDtype.from_numpy_dtype(array.dtype), + ) blocks: Dict[Tuple[int, ...], Tuple[NDArray[Any], Any]] = {(1,): (array, (1,))} metadata: JSON = {"foo": "bar"} anyslice = (1, 1, 1) diff --git a/tiled/_tests/test_server.py b/tiled/_tests/test_server.py index a22e8b7b8..4dde50969 100644 --- a/tiled/_tests/test_server.py +++ b/tiled/_tests/test_server.py @@ -1,7 +1,3 @@ -import contextlib -import threading -import time - import pytest import uvicorn from fastapi import APIRouter @@ -11,35 +7,10 @@ from ..client import from_uri from ..server.app import build_app from ..server.logging_config import LOGGING_CONFIG +from .utils import ThreadedServer router = APIRouter() - -class Server(uvicorn.Server): - # https://github.com/encode/uvicorn/discussions/1103#discussioncomment-941726 - - def install_signal_handlers(self): - pass - - @contextlib.contextmanager - def run_in_thread(self): - thread = threading.Thread(target=self.run) - thread.start() - try: - # Wait for server to start up, or raise TimeoutError. - for _ in range(100): - time.sleep(0.1) - if self.started: - break - else: - raise TimeoutError("Server did not start in 10 seconds.") - host, port = self.servers[0].sockets[0].getsockname() - yield f"http://{host}:{port}" - finally: - self.should_exit = True - thread.join() - - API_KEY = "secret" @@ -49,7 +20,7 @@ def server(tmpdir): app = build_app(catalog, {"single_user_api_key": API_KEY}) app.include_router(router) config = uvicorn.Config(app, port=0, loop="asyncio", log_config=LOGGING_CONFIG) - server = Server(config) + server = ThreadedServer(config) with server.run_in_thread() as url: yield url diff --git a/tiled/_tests/test_writing.py b/tiled/_tests/test_writing.py index 571aaad59..320dba4e3 100644 --- a/tiled/_tests/test_writing.py +++ b/tiled/_tests/test_writing.py @@ -26,6 +26,7 @@ from ..mimetypes import PARQUET_MIMETYPE from ..queries import Key from ..server.app import build_app +from ..structures.array import BuiltinDtype from ..structures.core import Spec, StructureFamily from ..structures.data_source import DataSource from ..structures.sparse import COOStructure @@ -245,7 +246,13 @@ def test_write_sparse_chunked(tree): "sparse", [ DataSource( - structure=COOStructure(shape=(2 * N,), chunks=((N, N),)), + structure=COOStructure( + shape=(2 * N,), + chunks=((N, N),), + data_type=BuiltinDtype.from_numpy_dtype( + numpy.dtype("float64") + ), + ), structure_family="sparse", ) ], diff --git a/tiled/_tests/test_zarr.py b/tiled/_tests/test_zarr.py new file mode 100644 index 000000000..77632eaf0 --- /dev/null +++ b/tiled/_tests/test_zarr.py @@ -0,0 +1,331 @@ +import math +import string +import warnings + +import dask.array +import numpy +import pandas.testing +import pytest +import uvicorn +import zarr +from fsspec.implementations.http import HTTPFileSystem +from httpx import ASGITransport, AsyncClient +from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND + +from ..adapters.array import ArrayAdapter +from ..adapters.dataframe import DataFrameAdapter +from ..adapters.mapping import MapAdapter +from ..server.app import build_app +from .utils import ThreadedServer + +rng = numpy.random.default_rng(seed=42) +array_cases = { + "dtype_b": (numpy.arange(10) % 2).astype("b"), + "dtype_i": numpy.arange(-10, 10, dtype="i"), + "dtype_uint8": numpy.arange(10, dtype="uint8"), + "dtype_uint16": numpy.arange(10, dtype="uint16"), + "dtype_uint64": numpy.arange(10, dtype="uint64"), + "dtype_f": numpy.arange(10, dtype="f"), + "dtype_c": (numpy.arange(10) * 1j).astype("c"), + "dtype_S": numpy.array([letter * 3 for letter in string.ascii_letters], dtype="S3"), + "dtype_U": numpy.array([letter * 3 for letter in string.ascii_letters], dtype="U3"), + "dtype_m": numpy.array( + ["2007-07-13", "2006-01-13", "2010-08-13"], dtype="datetime64" + ) + - numpy.datetime64("2008-01-01"), + "dtype_M": numpy.array( + ["2007-07-13", "2006-01-13", "2010-08-13"], dtype="datetime64" + ), + "random_2d": rng.random((10, 10)), +} +# TODO bitfield "t", void "v", and object "O" (which is not supported by default) +scalar_cases = { + k: numpy.array(v[0], dtype=v.dtype) + for k, v in array_cases.items() + if k.startswith("dtype_") +} +for v in scalar_cases.values(): + assert v.shape == () +array_tree = MapAdapter({k: ArrayAdapter.from_array(v) for k, v in array_cases.items()}) +scalar_tree = MapAdapter( + {k: ArrayAdapter.from_array(v) for k, v in scalar_cases.items()} +) + +cube_cases = { + "tiny_cube": rng.random((10, 10, 10)), + "tiny_hypercube": rng.random((10, 10, 10, 10, 10)), +} +cube_tree = MapAdapter({k: ArrayAdapter.from_array(v) for k, v in cube_cases.items()}) +arr_with_inf = numpy.array([0, 1, numpy.nan, -numpy.inf, numpy.inf]) +inf_tree = MapAdapter( + { + "example": ArrayAdapter.from_array( + arr_with_inf, + metadata={"infinity": math.inf, "-infinity": -math.inf, "nan": numpy.nan}, + ) + }, + metadata={"infinity": math.inf, "-infinity": -math.inf, "nan": numpy.nan}, +) +arr_with_zero_dim = numpy.array([]).reshape((0, 100, 1, 10)) +# Suppress RuntimeWarning: divide by zero encountered in true_divide from dask.array.core. +with warnings.catch_warnings(): + zero_tree = MapAdapter( + { + "example": ArrayAdapter.from_array( + dask.array.from_array(arr_with_zero_dim, chunks=arr_with_zero_dim.shape) + ) + } + ) +df = pandas.DataFrame( + { + "x": rng.random(size=10, dtype="float64"), + "y": rng.integers(10, size=10, dtype="uint"), + "z": rng.integers(-10, 10, size=10, dtype="int64"), + } +) +table_tree = MapAdapter( + { + # a dataframe divided into three partitions + "divided": DataFrameAdapter.from_pandas(df, npartitions=3), + # a dataframe with just one partition + "single": DataFrameAdapter.from_pandas(df, npartitions=1), + } +) + +tree = MapAdapter( + { + "nested": MapAdapter({"array": array_tree, "cube": cube_tree}), + "inf": inf_tree, + "scalar": scalar_tree, + "zero": zero_tree, + "table": table_tree, + "random_2d": array_tree["random_2d"], + } +) + + +def traverse_tree(tree, parent="", result=None): + result = result or {} + for key, val in tree.items(): + if isinstance(val, ArrayAdapter): + result.update({f"{parent}/{key}": "array"}) + elif isinstance(val, DataFrameAdapter): + result.update({f"{parent}/{key}": "group"}) + for col, _ in val.items(): + result.update({f"{parent}/{key}/{col}": "array"}) + else: + result.update({f"{parent}/{key}": "group"}) + traverse_tree(val, parent=f"{parent}/{key}", result=result) + return result + + +@pytest.fixture(scope="module") +def app(): + app = build_app(tree, authentication={"single_user_api_key": "secret"}) + return app + + +@pytest.fixture(scope="module") +def server(app): + config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="info") + server = ThreadedServer(config) + with server.run_in_thread(): + yield server + + +@pytest.fixture(scope="module") +def fs(): + headers = {"Authorization": "Apikey secret", "Content-Type": "application/json"} + fs = HTTPFileSystem(client_kwargs={"headers": headers}) + return fs + + +@pytest.mark.parametrize( + "path", ["/zarr/v2/", "/zarr/v2", "/zarr/v2/nested", "/zarr/v2/table/single"] +) +@pytest.mark.asyncio +async def test_zarr_group_routes(path, app): + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + headers={"Authorization": "Apikey secret"}, + follow_redirects=True, + ) as client: + response = await client.get(path) + assert response.status_code == HTTP_200_OK + + response = await client.get(path + "/.zarray") + assert response.status_code == HTTP_404_NOT_FOUND + + response = await client.get(path + "/.zgroup") + assert response.status_code == HTTP_200_OK + + +@pytest.mark.parametrize( + "path", ["/zarr/v2/nested/cube/tiny_cube", "/zarr/v2/table/single/x"] +) +@pytest.mark.asyncio +async def test_zarr_array_routes(path, app): + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + headers={"Authorization": "Apikey secret"}, + follow_redirects=True, + ) as client: + response = await client.get(path) + assert response.status_code == HTTP_200_OK + + response = await client.get(path + "/.zgroup") + assert response.status_code == HTTP_404_NOT_FOUND + + response = await client.get(path + "/.zarray") + assert response.status_code == HTTP_200_OK + + ndim = len(response.json().get("shape")) + indx = ".".join(["0"] * max(ndim, 0)) + response = await client.get(path + f"/{indx}") + assert response.status_code == HTTP_200_OK + + +@pytest.mark.parametrize( + "path", + [ + "/zarr/v2/", + "/zarr/v2", + "/zarr/v2/nested", + "/zarr/v2/table/single", + "/zarr/v2/nested/cube/tiny_cube", + "/zarr/v2/table/single/x", + ], +) +@pytest.mark.asyncio +async def test_authentication(path, app): + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + headers={"Authorization": "Apikey not-secret"}, + follow_redirects=True, + ) as client: + response = await client.get(path) + assert response.status_code == HTTP_401_UNAUTHORIZED + + response = await client.get(path + "/.zarray") + assert response.status_code == HTTP_401_UNAUTHORIZED + + response = await client.get(path + "/.zgroup") + assert response.status_code == HTTP_401_UNAUTHORIZED + + +def test_zarr_integration(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/" + grp = zarr.open(fs.get_mapper(url), mode="r") + + assert grp.store.fs == fs + assert set(grp.keys()) == set(tree.keys()) + assert len(set(grp.group_keys())) == 5 + assert len(set(grp.array_keys())) == 1 + + +@pytest.mark.parametrize( + "suffix, path", + [ + ("", "random_2d"), + ("", "nested/array/random_2d"), + ("nested", "array/random_2d"), + ("nested/array", "random_2d"), + ("nested/array/random_2d", ""), + ], +) +@pytest.mark.parametrize("slash", ["", "/"]) +def test_zarr_groups(suffix, path, slash, server, fs): + expected = array_cases["random_2d"] + url = f"http://localhost:{server.port}/zarr/v2/{suffix}{slash}" + arr = zarr.open(fs.get_mapper(url), mode="r") + if path: + arr = arr[path] + assert numpy.array_equal(arr[...], expected) + + +@pytest.mark.parametrize("kind", list(array_cases)) +def test_array_dtypes(kind, server, fs): + expected = array_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/nested/array" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("kind", list(scalar_cases)) +def test_scalar_dtypes(kind, server, fs): + expected = scalar_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/scalar" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("kind", list(cube_cases)) +def test_cube_cases(kind, server, fs): + expected = cube_cases[kind] + url = f"http://localhost:{server.port}/zarr/v2/nested/cube" + grp = zarr.open(fs.get_mapper(url), mode="r") + actual = grp[kind][...] + assert numpy.array_equal(actual, expected) + + +def test_infinity(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/inf/example" + actual = zarr.open(fs.get_mapper(url), mode="r")[...] + mask = numpy.isnan(arr_with_inf) + assert numpy.array_equal(actual[~mask], arr_with_inf[~mask]) + assert numpy.isnan(actual[mask]).all() + + +def test_shape_with_zero(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/zero/example" + actual = zarr.open(fs.get_mapper(url), mode="r")[...] + assert numpy.array_equal(actual, arr_with_zero_dim) + + +def test_dataframe_group(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/table" + grp = zarr.open(fs.get_mapper(url), mode="r") + assert set(grp.keys()) == set(table_tree.keys()) + + for key in grp.keys(): + for col in grp[key].keys(): + actual = grp[key][col][...] + expected = df[col] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("key", list(table_tree.keys())) +def test_dataframe_single(key, server, fs): + url = f"http://localhost:{server.port}/zarr/v2/table/{key}" + grp = zarr.open(fs.get_mapper(url), mode="r") + + for col in df.columns: + actual = grp[col][...] + expected = df[col] + assert numpy.array_equal(actual, expected) + + +@pytest.mark.parametrize("key", list(table_tree.keys())) +def test_dataframe_column(key, server, fs): + for col in df.columns: + url = f"http://localhost:{server.port}/zarr/v2/table/{key}/{col}" + arr = zarr.open(fs.get_mapper(url), mode="r") + actual = arr[...] + expected = df[col] + assert numpy.array_equal(actual, expected) + + +def test_writing(server, fs): + url = f"http://localhost:{server.port}/zarr/v2/nested/array" + + with pytest.raises(NotImplementedError): + grp = zarr.open(fs.get_mapper(url), mode="w") + + with pytest.raises(zarr.errors.ReadOnlyError): + grp = zarr.open(fs.get_mapper(url), mode="r") + grp["random_2d"][0, 0] = 0.0 diff --git a/tiled/_tests/utils.py b/tiled/_tests/utils.py index b3a4bc9df..f00bc68a4 100644 --- a/tiled/_tests/utils.py +++ b/tiled/_tests/utils.py @@ -3,12 +3,15 @@ import sqlite3 import sys import tempfile +import threading +import time import uuid from enum import IntEnum from pathlib import Path import httpx import pytest +import uvicorn from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine @@ -92,3 +95,30 @@ def sqlite_from_dump(filename): conn.executescript(path.read_text()) conn.close() yield database_path + + +class ThreadedServer(uvicorn.Server): + # https://github.com/encode/uvicorn/discussions/1103#discussioncomment-941726 + + def install_signal_handlers(self): + pass + + @contextlib.contextmanager + def run_in_thread(self): + thread = threading.Thread(target=self.run) + thread.start() + try: + # Wait for server to start up, or raise TimeoutError. + for _ in range(100): + time.sleep(0.1) + if self.started: + break + else: + raise TimeoutError("Server did not start in 10 seconds.") + + # Get the actual hostname and port number + self.host, self.port = self.servers[0].sockets[0].getsockname() + yield f"http://{self.host}:{self.port}" + finally: + self.should_exit = True + thread.join() diff --git a/tiled/adapters/sparse.py b/tiled/adapters/sparse.py index 60b122d5c..c6771f942 100644 --- a/tiled/adapters/sparse.py +++ b/tiled/adapters/sparse.py @@ -6,6 +6,7 @@ import sparse from numpy._typing import NDArray +from ..structures.array import BuiltinDtype from ..structures.core import Spec, StructureFamily from ..structures.sparse import COOStructure from .array import slice_and_shape_from_block_and_chunks @@ -49,6 +50,7 @@ def from_arrays( dims=dims, shape=shape, chunks=tuple((dim,) for dim in shape), + data_type=BuiltinDtype.from_numpy_dtype(data.dtype), resizable=False, ) return cls( @@ -133,6 +135,7 @@ def from_global_ref( dims=dims, shape=shape, chunks=chunks, + data_type=BuiltinDtype.from_numpy_dtype(data.dtype), resizable=False, ) return cls( diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index 7a914965c..e156549bd 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -1,5 +1,4 @@ import builtins -import collections.abc import os import sys from typing import Any, Iterator, List, Optional, Tuple, Union @@ -19,6 +18,11 @@ from .protocols import AccessPolicy from .type_alliases import JSON, NDSlice +if sys.version_info < (3, 9): + from typing_extensions import Mapping as MappingType +else: + from collections.abc import Mapping as MappingType + INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7")) @@ -27,17 +31,17 @@ def read_zarr( structure: Optional[ArrayStructure] = None, **kwargs: Any, ) -> Union["ZarrGroupAdapter", ArrayAdapter]: - """ + """Create an adapter for zarr Group or Array Parameters ---------- - data_uri : - structure : - kwargs : + data_uri : location of the zarr resource, e.g. 'file://localhost/data/arr1' + structure : specification of the shape, chunks, and data type + kwargs : any kwargs accepted by ZarrGroupAdapter or ZarrArrayAdapter Returns ------- - + Initialized ZarrGroupAdapter or ZarrArrayAdapter. """ filepath = path_from_uri(data_uri) zarr_obj = zarr.open(filepath) # Group or Array @@ -91,14 +95,17 @@ def init_storage(cls, data_uri: str, structure: ArrayStructure) -> List[Asset]: ] def _stencil(self) -> Tuple[slice, ...]: - """ - Trims overflow because Zarr always has equal-sized chunks. + """Trims overflow because Zarr always has equal-sized chunks. + Returns ------- """ return tuple(builtins.slice(0, dim) for dim in self.structure().shape) + def get(self, key: str) -> Union[ArrayAdapter, None]: + return None + def read( self, slice: NDSlice = ..., @@ -184,16 +191,6 @@ async def write_block( self._array[block_slice] = data -if sys.version_info < (3, 9): - from typing_extensions import Mapping - - MappingType = Mapping -else: - import collections - - MappingType = collections.abc.Mapping - - class ZarrGroupAdapter( MappingType[str, Union["ArrayAdapter", "ZarrGroupAdapter"]], IndexersMixin, diff --git a/tiled/client/container.py b/tiled/client/container.py index 7a87ce507..2434185ee 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -886,6 +886,7 @@ def write_sparse( >>> x.write_block(coords=[[2, 4]], data=[3.1, 2.8], block=(0,)) >>> x.write_block(coords=[[0, 1]], data=[6.7, 1.2], block=(1,)) """ + from ..structures.array import BuiltinDtype from ..structures.sparse import COOStructure structure = COOStructure( @@ -893,6 +894,7 @@ def write_sparse( # This method only supports single-chunk COO arrays. chunks=tuple((dim,) for dim in shape), dims=dims, + data_type=BuiltinDtype.from_numpy_dtype(data.dtype), ) client = self.new( StructureFamily.sparse, diff --git a/tiled/examples/generated.py b/tiled/examples/generated.py index 014c9c82c..0a6d1343d 100644 --- a/tiled/examples/generated.py +++ b/tiled/examples/generated.py @@ -17,24 +17,25 @@ from tiled.adapters.xarray import DatasetAdapter print("Generating large example data...", file=sys.stderr) +rng = numpy.random.default_rng(seed=42) data = { - "big_image": numpy.random.random((10_000, 10_000)), - "small_image": numpy.random.random((300, 300)), - "medium_image": numpy.random.random((1000, 1000)), - "tiny_image": numpy.random.random((50, 50)), - "tiny_cube": numpy.random.random((50, 50, 50)), - "tiny_hypercube": numpy.random.random((50, 50, 50, 50, 50)), - "high_entropy": numpy.random.random((100, 100)), - "low_entropy": numpy.ones((100, 100)), - "short_column": numpy.random.random(100), - "tiny_column": numpy.random.random(10), - "long_column": numpy.random.random(100_000), + "big_image": rng.random((10_000, 10_000)), + "small_image": rng.random((300, 300)), + "medium_image": rng.random((1000, 1000)), + "tiny_image": rng.random((50, 50)), + "tiny_cube": rng.random((50, 50, 50)), + "tiny_hypercube": rng.random((50, 50, 50, 50, 50)), + "high_entropy": rng.integers(-10, 10, size=(100, 100)), + "low_entropy": numpy.ones((100, 100), dtype="int32"), + "short_column": rng.integers(10, size=100, dtype=numpy.dtype("uint8")), + "tiny_column": rng.random(10), + "long_column": rng.random(100_000), } -temp = 15 + 8 * numpy.random.randn(2, 2, 3) -precip = 10 * numpy.random.rand(2, 2, 3) +temp = 15 + 8 * rng.normal(size=(2, 2, 3)) +precip = 10 * rng.uniform(size=(2, 2, 3)) lon = [[-99.83, -99.32], [-99.79, -99.23]] lat = [[42.25, 42.21], [42.63, 42.59]] -sparse_arr = numpy.random.random((100, 100)) +sparse_arr = rng.random((100, 100)) sparse_arr[sparse_arr < 0.9] = 0 # fill most of the array with zeros awkward_arr = awkward.Array( [[{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}], [], [{"x": 3.3, "y": [1, 2, 3]}]] @@ -43,48 +44,84 @@ print("Done generating example data.", file=sys.stderr) mapping = { - "big_image": ArrayAdapter.from_array(data["big_image"]), - "small_image": ArrayAdapter.from_array(data["small_image"]), - "medium_image": ArrayAdapter.from_array(data["medium_image"]), - "sparse_image": COOAdapter.from_coo(sparse.COO(sparse_arr)), - "awkward_array": AwkwardAdapter.from_array(awkward_arr), - "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), - "tiny_cube": ArrayAdapter.from_array(data["tiny_cube"]), - "tiny_hypercube": ArrayAdapter.from_array(data["tiny_hypercube"]), - "short_table": DataFrameAdapter.from_pandas( - pandas.DataFrame( - { - "A": data["short_column"], - "B": 2 * data["short_column"], - "C": 3 * data["short_column"], - }, - index=pandas.Index(numpy.arange(len(data["short_column"])), name="index"), - ), - npartitions=1, - metadata={"animal": "dog", "color": "red"}, + "scalars": MapAdapter( + { + "pi": ArrayAdapter.from_array(3.14159), + "e_arr": ArrayAdapter.from_array(["2.71828"]), + "fsc": ArrayAdapter.from_array("1/137"), + "fortytwo": ArrayAdapter.from_array(42), + }, + metadata={"numbers": "constants", "precision": 5}, ), - "long_table": DataFrameAdapter.from_pandas( - pandas.DataFrame( - { - "A": data["long_column"], - "B": 2 * data["long_column"], - "C": 3 * data["long_column"], - }, - index=pandas.Index(numpy.arange(len(data["long_column"])), name="index"), - ), - npartitions=5, - metadata={"animal": "dog", "color": "green"}, + "nested": MapAdapter( + { + "images": MapAdapter( + { + "tiny_image": ArrayAdapter.from_array(data["tiny_image"]), + "small_image": ArrayAdapter.from_array(data["small_image"]), + "medium_image": ArrayAdapter.from_array( + data["medium_image"], chunks=((250,) * 4, (100,) * 10) + ), + "big_image": ArrayAdapter.from_array(data["big_image"]), + }, + metadata={"animal": "cat", "color": "green"}, + ), + "cubes": MapAdapter( + { + "tiny_cube": ArrayAdapter.from_array(data["tiny_cube"]), + "tiny_hypercube": ArrayAdapter.from_array(data["tiny_hypercube"]), + }, + metadata={"animal": "dog", "color": "red"}, + ), + "sparse_image": COOAdapter.from_coo(sparse.COO(sparse_arr)), + "awkward_array": AwkwardAdapter.from_array(awkward_arr), + }, + metadata={"animal": "cat", "color": "green"}, ), - "wide_table": DataFrameAdapter.from_pandas( - pandas.DataFrame( - { - letter: i * data["tiny_column"] - for i, letter in enumerate(string.ascii_uppercase, start=1) - }, - index=pandas.Index(numpy.arange(len(data["tiny_column"])), name="index"), - ), - npartitions=1, - metadata={"animal": "dog", "color": "red"}, + "tables": MapAdapter( + { + "short_table": DataFrameAdapter.from_pandas( + pandas.DataFrame( + { + "A": data["short_column"], + "B": 2 * data["short_column"], + "C": 3 * data["short_column"], + }, + index=pandas.Index( + numpy.arange(len(data["short_column"])), name="index" + ), + ), + npartitions=1, + metadata={"animal": "dog", "color": "red"}, + ), + "long_table": DataFrameAdapter.from_pandas( + pandas.DataFrame( + { + "A": data["long_column"], + "B": 2 * data["long_column"], + "C": 3 * data["long_column"], + }, + index=pandas.Index( + numpy.arange(len(data["long_column"])), name="index" + ), + ), + npartitions=5, + metadata={"animal": "dog", "color": "green"}, + ), + "wide_table": DataFrameAdapter.from_pandas( + pandas.DataFrame( + { + letter: i * data["tiny_column"] + for i, letter in enumerate(string.ascii_uppercase, start=1) + }, + index=pandas.Index( + numpy.arange(len(data["tiny_column"])), name="index" + ), + ), + npartitions=1, + metadata={"animal": "dog", "color": "red"}, + ), + } ), "structured_data": MapAdapter( { @@ -110,7 +147,7 @@ }, metadata={"animal": "cat", "color": "green"}, ), - "flat_array": ArrayAdapter.from_array(numpy.random.random(100)), + "flat_array": ArrayAdapter.from_array(rng.random(100)), "low_entropy": ArrayAdapter.from_array(data["low_entropy"]), "high_entropy": ArrayAdapter.from_array(data["high_entropy"]), # Below, an asynchronous task modifies this value over time. diff --git a/tiled/server/app.py b/tiled/server/app.py index 1f991d666..304ed263d 100644 --- a/tiled/server/app.py +++ b/tiled/server/app.py @@ -3,14 +3,15 @@ import contextvars import logging import os +import re import secrets import sys import urllib.parse +import urllib.parse as urlparse import warnings from contextlib import asynccontextmanager from functools import lru_cache, partial from pathlib import Path -from typing import List import anyio import packaging.version @@ -59,6 +60,7 @@ get_root_url, record_timing, ) +from .zarr import router as zarr_router SAFE_METHODS = {"GET", "HEAD", "OPTIONS", "TRACE"} SENSITIVE_COOKIES = { @@ -66,6 +68,7 @@ } CSRF_HEADER_NAME = "x-csrf" CSRF_QUERY_PARAMETER = "csrf" +ZARR_PREFIX = "/zarr/v2" MINIMUM_SUPPORTED_PYTHON_CLIENT_VERSION = packaging.version.parse("0.1.0a104") @@ -349,6 +352,7 @@ async def unhandled_exception_handler( ) app.include_router(router, prefix="/api/v1") + app.include_router(zarr_router, prefix=ZARR_PREFIX) # The Tree and Authenticator have the opportunity to add custom routes to # the server here. (Just for example, a Tree of BlueskyRuns uses this @@ -427,11 +431,7 @@ async def unhandled_exception_handler( # opporunity to register custom query types before startup. app.get( "/api/v1/search/{path:path}", - response_model=schemas.Response[ - List[schemas.Resource[schemas.NodeAttributes, dict, dict]], - schemas.PaginationLinks, - dict, - ], + response_model=schemas.SearchResponse, )(patch_route_signature(search, query_registry)) app.get( "/api/v1/distinct/{path:path}", @@ -881,6 +881,32 @@ async def current_principal_logging_filter(request: Request, call_next): current_principal.set(request.state.principal) return response + @app.middleware("http") + async def resolve_zarr_uris(request: Request, call_next): + response = await call_next(request) + + # If a zarr block is requested, e.g. http://localhost:8000/zarr/v2/array/0.1.2.3, replace the block spec + # with a properly formatted query parameter: http://localhost:8000/zarr/v2/array?block=0,1,2,3 (with ',' + # safely encoded) + if request.url.path.startswith(ZARR_PREFIX) and response.status_code == 404: + # Extract the last bit of the path + zarr_path = ( + request.url.path[len(ZARR_PREFIX) :] # noqa: #E203 + .strip("/") + .split("/") + ) + zarr_block = zarr_path[-1] if len(zarr_path) > 0 else "" + if re.compile(r"^(?:\d+\.)*\d+$").fullmatch(zarr_block): + # Create a query string if the last part is in the zarr block form, e.g. `m.n.p. ... .q` + query = dict(urlparse.parse_qsl(request.url.query)) + query.update({"block": zarr_block.replace(".", ",")}) + request.scope["query_string"] = urlparse.urlencode(query).encode() + request.scope["path"] = ZARR_PREFIX + "/" + "/".join(zarr_path[:-1]) + + response = await call_next(request) + + return response + app.add_middleware( CorrelationIdMiddleware, header_name="X-Tiled-Request-ID", diff --git a/tiled/server/dependencies.py b/tiled/server/dependencies.py index f13c676fc..c756a80d3 100644 --- a/tiled/server/dependencies.py +++ b/tiled/server/dependencies.py @@ -73,7 +73,6 @@ async def inner( """ path_parts = [segment for segment in path.split("/") if segment] entry = root_tree - # If the entry/adapter can take a session state, pass it in. # The entry/adapter may return itself or a different object. if hasattr(entry, "with_session_state") and session_state: diff --git a/tiled/server/pydantic_sparse.py b/tiled/server/pydantic_sparse.py index 145f272d0..6c7d35e05 100644 --- a/tiled/server/pydantic_sparse.py +++ b/tiled/server/pydantic_sparse.py @@ -2,12 +2,14 @@ import pydantic +from ..structures.array import BuiltinDtype, StructDtype from ..structures.sparse import SparseLayout class COOStructure(pydantic.BaseModel): shape: Tuple[int, ...] # tuple of ints like (3, 3) chunks: Tuple[Tuple[int, ...], ...] # tuple-of-tuples-of-ints like ((3,), (3,)) + data_type: Optional[Union[BuiltinDtype, StructDtype]] = None dims: Optional[Tuple[str, ...]] = None # None or tuple of names like ("x", "y") resizable: Union[bool, Tuple[bool, ...]] = False layout: SparseLayout = SparseLayout.COO diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index fa7f039e9..3cc757b84 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -567,4 +567,8 @@ class PatchMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]): data_sources: Optional[List[DataSource]] +SearchResponse = Response[ + List[Resource[NodeAttributes, Dict, Dict]], PaginationLinks, Dict +] + NodeStructure.model_rebuild() diff --git a/tiled/server/utils.py b/tiled/server/utils.py index 0c4368d07..bc7e55d91 100644 --- a/tiled/server/utils.py +++ b/tiled/server/utils.py @@ -1,5 +1,6 @@ import contextlib import time +from typing import Literal from ..access_policies import NO_ACCESS from ..adapters.mapping import MapAdapter @@ -41,6 +42,13 @@ def get_base_url(request): return f"{get_root_url(request)}/api/v1" +def get_zarr_url(request, version: Literal["v2", "v3"] = "v2"): + """ + Base URL for the Zarr API + """ + return f"{get_root_url(request)}/zarr/{version}" + + def get_root_url_low_level(request_headers, scope): # We want to get the scheme, host, and root_path (if any) # *as it appears to the client* for use in assembling links to diff --git a/tiled/server/zarr.py b/tiled/server/zarr.py new file mode 100644 index 000000000..e8d536114 --- /dev/null +++ b/tiled/server/zarr.py @@ -0,0 +1,203 @@ +import json +from typing import Optional, Tuple + +import numcodecs +from fastapi import APIRouter, HTTPException, Request +from starlette.responses import Response +from starlette.status import ( + HTTP_400_BAD_REQUEST, + HTTP_404_NOT_FOUND, + HTTP_500_INTERNAL_SERVER_ERROR, +) + +from ..structures.array import StructDtype +from ..structures.core import StructureFamily +from ..utils import ensure_awaitable +from .dependencies import SecureEntry +from .utils import record_timing + +ZARR_BLOCK_SIZE = 10000 +ZARR_BYTE_ORDER = "C" +ZARR_CODEC_SPEC = { + "blocksize": 0, + "clevel": 5, + "cname": "lz4", + "id": "blosc", + "shuffle": 1, +} +ZARR_DATETIME64_PRECISION = "ns" + +zarr_codec = numcodecs.get_codec(ZARR_CODEC_SPEC) + +router = APIRouter() + + +def convert_chunks_for_zarr(tiled_chunks: Tuple[Tuple[int]]): + """Convert full tiled/dask chunk specification into zarr format + + Zarr only accepts chunks of constant size along each dimension; this function finds a unique representation of + (possibly variable-sized chunks) internal to Tiled ArrayAdapter in terms of zarr blocks. + + Zarr chunks must be at least of size 1 (even for zero-dimensional arrays). + """ + return [min(ZARR_BLOCK_SIZE, max(*tc, 1)) for tc in tiled_chunks] + + +@router.get("{path:path}.zgroup", name="Root .zgroup metadata") +@router.get("/{path:path}/.zgroup", name="Zarr .zgroup metadata") +async def get_zarr_group_metadata( + request: Request, + entry=SecureEntry( + scopes=["read:data", "read:metadata"], + structure_families={ + StructureFamily.table, + StructureFamily.container, + StructureFamily.array, + }, + ), +): + # Usual (unstructured) array; should respond to /.zarray instead + if entry.structure_family == StructureFamily.array and not isinstance( + entry.structure().data_type, StructDtype + ): + raise HTTPException(status_code=HTTP_404_NOT_FOUND) + + # Structured numpy array, Container, or Table + return Response(json.dumps({"zarr_format": 2}), status_code=200) + + +@router.get("/{path:path}/.zarray", name="Zarr .zarray metadata") +async def get_zarr_array_metadata( + request: Request, + entry=SecureEntry( + scopes=["read:data", "read:metadata"], + structure_families={StructureFamily.array, StructureFamily.sparse}, + ), +): + # Only StructureFamily.array and StructureFamily.sparse can respond to `/.zarray` querries. Zarr will try to + # request .zarray on all other nodes in Tiled (not included in SecureEntry above), in which case the server + # will return an 404 error; this is the expected behaviour, which will signal zarr to try /.zgroup instead. + structure = entry.structure() + if isinstance(structure.data_type, StructDtype): + # Structured numpy array should be treated as a DataFrame and will respond to /.zgroup instead + raise HTTPException(status_code=HTTP_404_NOT_FOUND) + try: + zarray_spec = { + "chunks": convert_chunks_for_zarr(structure.chunks), + "compressor": ZARR_CODEC_SPEC, + "dtype": structure.data_type.to_numpy_str(), + "fill_value": 0, + "filters": None, + "order": ZARR_BYTE_ORDER, + "shape": list(structure.shape), + "zarr_format": 2, + } + return Response(json.dumps(zarray_spec), status_code=200) + except Exception as err: + print(f"Can not create .zarray metadata, {err}") + raise HTTPException( + status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=err.args[0] + ) + + +@router.get( + "/{path:path}", name="Zarr group (directory) structure or a chunk of a zarr array" +) +async def get_zarr_array( + request: Request, + block: Optional[str] = None, + entry=SecureEntry( + scopes=["read:data"], + structure_families={ + StructureFamily.array, + StructureFamily.sparse, + StructureFamily.table, + StructureFamily.container, + }, + ), +): + # Remove query params and the trailing slash from the url + url = str(request.url).split("?")[0].rstrip("/") + + if entry.structure_family == StructureFamily.container: + # List the contents of a "simulated" zarr directory (excluding .zarray and .zgroup files) + if hasattr(entry, "keys_range"): + keys = await entry.keys_range(offset=0, limit=None) + else: + keys = entry.keys() + body = json.dumps([url + "/" + key for key in keys]) + + return Response(body, status_code=200, media_type="application/json") + + elif entry.structure_family == StructureFamily.table: + # List the columns of the table -- they will be accessed separately as arrays + body = json.dumps([url + "/" + key for key in entry.structure().columns]) + + return Response(body, status_code=200, media_type="application/json") + + elif entry.structure_family == StructureFamily.array and isinstance( + entry.structure().data_type, StructDtype + ): + # List the column names of the structured array -- they will be accessed separately + body = json.dumps( + [url + "/" + f.name for f in entry.structure().data_type.fields] + ) + + return Response(body, status_code=200, media_type="application/json") + + elif entry.structure_family in {StructureFamily.array, StructureFamily.sparse}: + # Return the actual array values for a single block of zarr array + if block is not None: + import numpy as np + from sparse import SparseArray + + zarr_block_indx = [int(i) for i in block.split(",")] + zarr_block_spec = convert_chunks_for_zarr(entry.structure().chunks) + if (not (zarr_block_spec == [] and zarr_block_indx == [0])) and ( + len(zarr_block_spec) != len(zarr_block_indx) + ): + # Not a scalar and shape doesn't match + raise HTTPException( + status_code=HTTP_400_BAD_REQUEST, + detail=f"Requested zarr block index {zarr_block_indx} is inconsistent with the shape of array, {entry.structure().shape}.", # noqa + ) + + # Indices of the array slices in each dimension that correspond to the requested zarr block + block_slices = tuple( + [ + slice(i * c, (i + 1) * c) + for i, c in zip(zarr_block_indx, zarr_block_spec) + ] + ) + try: + with record_timing(request.state.metrics, "read"): + array = await ensure_awaitable(entry.read, slice=block_slices) + except IndexError: + raise HTTPException( + status_code=HTTP_400_BAD_REQUEST, + detail=f"Index of zarr block {zarr_block_indx} is out of range.", + ) + + if isinstance(array, SparseArray): + array = array.todense() + + # Padd the last slices with zeros if needed to ensure all zarr blocks have same shapes + padding_size = [ + max(0, sl.stop - sh) + for sl, sh in zip(block_slices, entry.structure().shape) + ] + if sum(padding_size) > 0: + array = np.pad(array, [(0, p) for p in padding_size], mode="constant") + + # Ensure the array is contiguous and encode it; equivalent to `buf = zarr.array(array).store['0.0']` + array = array.astype(array.dtype, order=ZARR_BYTE_ORDER, copy=False) + buf = zarr_codec.encode(array) + if not isinstance(buf, bytes): + buf = array.tobytes(order="A") + + return Response(buf, status_code=200) + + else: + # TODO: + # Entire array (root uri) is requested -- never happens, but need to decide what to return here + return Response(json.dumps({}), status_code=200) diff --git a/tiled/structures/sparse.py b/tiled/structures/sparse.py index 354d150d7..da50c3a7b 100644 --- a/tiled/structures/sparse.py +++ b/tiled/structures/sparse.py @@ -2,6 +2,8 @@ from dataclasses import dataclass from typing import Optional, Tuple, Union +from .array import BuiltinDtype, StructDtype + class SparseLayout(str, enum.Enum): # Only COO is currently supported, but this lays a path @@ -13,6 +15,7 @@ class SparseLayout(str, enum.Enum): class COOStructure: chunks: Tuple[Tuple[int, ...], ...] # tuple-of-tuples-of-ints like ((3,), (3,)) shape: Tuple[int, ...] # tuple of ints like (3, 3) + data_type: Optional[Union[BuiltinDtype, StructDtype]] = None dims: Optional[Tuple[str, ...]] = None # None or tuple of names like ("x", "y") resizable: Union[bool, Tuple[bool, ...]] = False layout: SparseLayout = SparseLayout.COO @@ -20,7 +23,13 @@ class COOStructure: @classmethod def from_json(cls, structure): + data_type = structure.get("data_type", None) + if data_type is not None and "fields" in data_type: + data_type = StructDtype.from_json(data_type) + else: + data_type = BuiltinDtype.from_json(data_type) return cls( + data_type=data_type, chunks=tuple(map(tuple, structure["chunks"])), shape=tuple(structure["shape"]), dims=structure["dims"],