diff --git a/docs/source/explanations/catalog.md b/docs/source/explanations/catalog.md
index 7b6f7be3e..9bd7df587 100644
--- a/docs/source/explanations/catalog.md
+++ b/docs/source/explanations/catalog.md
@@ -54,7 +54,8 @@ and `assets`, describes the format, structure, and location of the data.
   to the Adapter
 - `management` --- enum indicating whether the data is registered `"external"`
   data or `"writable"` data managed by Tiled
-- `structure_family` --- enum of structure types (`"container"`, `"array"`, `"table"`, ...)
+- `structure_family` --- enum of structure types (`"container"`, `"array"`, `"table"`,
+  etc. -- except for `consolidated`, which cannot be assigned to a Data Source)
 - `structure_id` --- a foreign key to the `structures` table
 - `node_id` --- foreign key to `nodes`
 - `id` --- integer primary key
diff --git a/docs/source/explanations/structures.md b/docs/source/explanations/structures.md
index 65ce55deb..89039231d 100644
--- a/docs/source/explanations/structures.md
+++ b/docs/source/explanations/structures.md
@@ -11,7 +11,8 @@ The structure families are:
 
 * array --- a strided array, like a [numpy](https://numpy.org) array
 * awkward --- nested, variable-sized data (as implemented by [AwkwardArray](https://awkward-array.org/))
-* container --- a of other structures, akin to a dictionary or a directory
+* consolidated --- a container-like structure that combines tables and arrays in a common namespace
+* container --- a collection of other structures, akin to a dictionary or a directory
 * sparse --- a sparse array (i.e. an array which is mostly zeros)
 * table --- tabular data, as in [Apache Arrow](https://arrow.apache.org) or
   [pandas](https://pandas.pydata.org/)
@@ -575,3 +576,76 @@ response.
     "count": 5
 }
 ```
+
+### Consolidated
+
+This is a specialized container-like structure designed to link together multiple tables and arrays that store
+related scientific data. It does not support nesting, but it provides a common namespace across all columns of the
+contained tables along with the arrays (name collisions are therefore forbidden). This abstracts away the disparate
+internal storage mechanisms (e.g. Parquet for tables and zarr for arrays) and presents the user with a single,
+homogeneous interface for data access. Consolidated structures do not support pagination and are not recommended
+for "wide" datasets with more than ~1000 items (columns and arrays) in the namespace.
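+
+As a quick orientation, here is a minimal sketch of how such a namespace might be accessed from the Python
+client. (Illustrative only: it assumes a consolidated node named `x`, created as in the how-to guide, and a
+locally running server; the URL, key, and part names are placeholders.)
+
+```python
+from tiled.client import from_uri
+
+client = from_uri("http://localhost:8000")  # hypothetical server URL
+node = client["x"]
+
+# Columns and arrays share one flat namespace.
+list(node)                    # e.g. ["A", "B", "C", "D", "E", "F", "G"]
+node["F"].read()              # the array named "F"
+node["A"].read()              # column "A" of whichever table contains it
+
+# The constituent data sources remain individually addressable.
+node.parts["table1"].read()
+```
+
+Below is an example of a Consolidated structure that describes two tables and two arrays of various sizes. Their
+respective structures are specified in the `parts` list, and `all_keys` defines the internal namespace of directly
+addressable columns and arrays.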
+ +```json +{ + "parts": [ + { + "structure_family": "table", + "structure": { + "arrow_schema": "data:application/vnd.apache.arrow.file;base64,/////...FFFF", + "npartitions": 1, + "columns": ["A", "B"], + "resizable": false + }, + "name": "table1" + }, + { + "structure_family": "table", + "structure": { + "arrow_schema": "data:application/vnd.apache.arrow.file;base64,/////...FFFF", + "npartitions": 1, + "columns": ["C", "D", "E"], + "resizable": false + }, + "name": "table2" + }, + { + "structure_family": "array", + "structure": { + "data_type": { + "endianness": "little", + "kind": "f", + "itemsize": 8, + "dt_units": null + }, + "chunks": [[3], [5]], + "shape": [3, 5], + "dims": null, + "resizable": false + }, + "name": "F" + }, + { + "structure_family": "array", + "structure": { + "data_type": { + "endianness": "not_applicable", + "kind": "u", + "itemsize": 1, + "dt_units": null + }, + "chunks": [[5], [7], [3]], + "shape": [5, 7, 3], + "dims": null, + "resizable": false + }, + "name": "G" + } + ], + "all_keys": ["A", "B", "C", "D", "E", "F", "G"] +} +``` diff --git a/docs/source/how-to/register.md b/docs/source/how-to/register.md index 2ca73f068..835ea1cb3 100644 --- a/docs/source/how-to/register.md +++ b/docs/source/how-to/register.md @@ -72,7 +72,10 @@ Sometimes it is necessary to take more manual control of this registration process, such as if you want to take advantage of particular knowledge about the files to specify particular `metadata` or `specs`. -Use the Python client, as in this example. +#### Registering external data + +To register data from external files in Tiled, one can use the Python client and +construct Data Source object explicitly passing the list of assets, as in the following example. ```py import numpy @@ -112,3 +115,36 @@ client.new( specs=[], ) ``` + +#### Writing a consolidated structure + +Similarly, to create a consolidated container structure, one needs to specify +its constituents as separate Data Sources. For example, to consolidate a table +and an array, consider the following example + +```python +import pandas + +rng = numpy.random.default_rng(12345) +arr = rng.random(size=(3, 5), dtype="float64") +df = pandas.DataFrame({"A": ["one", "two", "three"], "B": [1, 2, 3]}) + +node = client.create_consolidated( + [ + DataSource( + structure_family=StructureFamily.table, + structure=TableStructure.from_pandas(df), + name="table1", + ), + DataSource( + structure_family=StructureFamily.array, + structure=ArrayStructure.from_array(arr), + name="C", + ) + ] +) + +# Write the data +node.parts["table1"].write(df) +node.parts["C"].write_block(arr, (0, 0)) +``` diff --git a/docs/source/reference/service.md b/docs/source/reference/service.md index 213a81d57..a28b00fb8 100644 --- a/docs/source/reference/service.md +++ b/docs/source/reference/service.md @@ -104,6 +104,8 @@ See {doc}`../explanations/structures` for more context. 
tiled.structures.array.BuiltinDtype tiled.structures.array.Endianness tiled.structures.array.Kind + tiled.structures.consolidated.ConsolidatedStructure + tiled.structures.consolidated.ConsolidatedStructurePart tiled.structures.core.Spec tiled.structures.core.StructureFamily tiled.structures.table.TableStructure diff --git a/tiled/_tests/test_consolidated.py b/tiled/_tests/test_consolidated.py new file mode 100644 index 000000000..103be7520 --- /dev/null +++ b/tiled/_tests/test_consolidated.py @@ -0,0 +1,88 @@ +import numpy +import pandas +import pandas.testing +import pytest + +from ..catalog import in_memory +from ..client import Context, from_context +from ..server.app import build_app +from ..structures.array import ArrayStructure +from ..structures.core import StructureFamily +from ..structures.data_source import DataSource +from ..structures.table import TableStructure + +rng = numpy.random.default_rng(12345) + +df1 = pandas.DataFrame({"A": ["one", "two", "three"], "B": [1, 2, 3]}) +df2 = pandas.DataFrame( + { + "C": ["red", "green", "blue", "white"], + "D": [10.0, 20.0, 30.0, 40.0], + "E": [0, 0, 0, 0], + } +) +arr1 = rng.random(size=(3, 5), dtype="float64") +arr2 = rng.integers(0, 255, size=(5, 7, 3), dtype="uint8") +md = {"md_key1": "md_val1", "md_key2": 2} + + +@pytest.fixture(scope="module") +def tree(tmp_path_factory): + return in_memory(writable_storage=tmp_path_factory.getbasetemp()) + + +@pytest.fixture(scope="module") +def context(tree): + with Context.from_app(build_app(tree)) as context: + client = from_context(context) + x = client.create_consolidated( + [ + DataSource( + structure_family=StructureFamily.table, + structure=TableStructure.from_pandas(df1), + name="table1", + ), + DataSource( + structure_family=StructureFamily.table, + structure=TableStructure.from_pandas(df2), + name="table2", + ), + DataSource( + structure_family=StructureFamily.array, + structure=ArrayStructure.from_array(arr1), + name="F", + ), + DataSource( + structure_family=StructureFamily.array, + structure=ArrayStructure.from_array(arr2), + name="G", + ), + ], + key="x", + metadata=md, + ) + # Write by data source. 
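+        # Tables are written whole; each array is written as a single block anchored at the zero origin.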
+ x.parts["table1"].write(df1) + x.parts["table2"].write(df2) + x.parts["F"].write_block(arr1, (0, 0)) + x.parts["G"].write_block(arr2, (0, 0, 0)) + + yield context + + +def test_iterate_parts(context): + client = from_context(context) + for part in client["x"].parts: + client["x"].parts[part].read() + + +def test_iterate_columns(context): + client = from_context(context) + for col in client["x"]: + client["x"][col].read() + client[f"x/{col}"].read() + + +def test_metadata(context): + client = from_context(context) + assert client["x"].metadata == md diff --git a/tiled/_tests/test_dataframe.py b/tiled/_tests/test_dataframe.py index 1df2163bf..01570356b 100644 --- a/tiled/_tests/test_dataframe.py +++ b/tiled/_tests/test_dataframe.py @@ -41,6 +41,17 @@ pandas.DataFrame({f"column_{i:03d}": i * numpy.ones(5) for i in range(10)}), npartitions=1, ), + # a dataframe with mixed types + "diverse": DataFrameAdapter.from_pandas( + pandas.DataFrame( + { + "A": numpy.array([1, 2, 3], dtype="|u8"), + "B": numpy.array([1, 2, 3], dtype=" TableStructure: """ return self._structure + + def get(self, key: str) -> Union[ArrayAdapter, None]: + return self.dataframe_adapter.get(key) diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py index 375645e5e..dab586edb 100644 --- a/tiled/adapters/table.py +++ b/tiled/adapters/table.py @@ -156,30 +156,25 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._structure.columns!r})" def __getitem__(self, key: str) -> ArrayAdapter: - """ + # Must compute to determine shape + array = self.read([key])[key].values - Parameters - ---------- - key : + # Convert (experimental) pandas.StringDtype to numpy's unicode string dtype + if isinstance(array.dtype, pandas.StringDtype): + import numpy - Returns - ------- - - """ - # Must compute to determine shape. - return ArrayAdapter.from_array(self.read([key])[key].values) + max_size = max((len(i) for i in array.ravel())) + array = array.astype(dtype=numpy.dtype(f" Iterator[Tuple[str, ArrayAdapter]]: - """ + return ArrayAdapter.from_array(array) - Returns - ------- + def get(self, key: str) -> Union[ArrayAdapter, None]: + if key not in self.structure().columns: + return None + return self[key] - """ - yield from ( - (key, ArrayAdapter.from_array(self.read([key])[key].values)) - for key in self._structure.columns - ) + def items(self) -> Iterator[Tuple[str, ArrayAdapter]]: + yield from ((key, self[key]) for key in self._structure.columns) def metadata(self) -> JSON: """ diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 13bd2353c..007dd9d11 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -62,6 +62,8 @@ ZARR_MIMETYPE, ) from ..query_registration import QueryTranslationRegistry +from ..server.pydantic_consolidated import ConsolidatedStructure +from ..server.pydantic_container import ContainerStructure from ..server.schemas import Asset, DataSource, Management, Revision, Spec from ..structures.core import StructureFamily from ..utils import ( @@ -285,6 +287,8 @@ def __init__( context, node, *, + structure_family=None, + data_sources=None, conditions=None, queries=None, sorting=None, @@ -303,12 +307,18 @@ def __init__( self.conditions = conditions or [] self.queries = queries or [] self.structure_family = node.structure_family - self.specs = [Spec.model_validate(spec) for spec in node.specs] + self.specs = [Spec.model_validate(spec) for spec in node.specs] # parse_obj??? 
self.ancestors = node.ancestors self.key = node.key self.access_policy = access_policy self.startup_tasks = [self.startup] self.shutdown_tasks = [self.shutdown] + self.structure_family = structure_family or node.structure_family + if data_sources is None: + data_sources = [ + DataSource.from_orm(ds) for ds in (self.node.data_sources or []) + ] + self.data_sources = data_sources def metadata(self): return self.node.metadata_ @@ -347,10 +357,6 @@ async def __aiter__(self): async with self.context.session() as db: return (await db.execute(statement)).scalar().all() - @property - def data_sources(self): - return [DataSource.from_orm(ds) for ds in (self.node.data_sources or [])] - async def asset_by_id(self, asset_id): statement = ( select(orm.Asset) @@ -372,6 +378,11 @@ async def asset_by_id(self, asset_id): return Asset.from_orm(asset) def structure(self): + if self.structure_family == StructureFamily.container: + # Give no inlined contents. + return ContainerStructure(contents=None, count=None) + if self.structure_family == StructureFamily.consolidated: + return ConsolidatedStructure.from_data_sources(self.data_sources) if self.data_sources: assert len(self.data_sources) == 1 # more not yet implemented return self.data_sources[0].structure @@ -399,7 +410,8 @@ async def async_len(self): return (await db.execute(statement)).scalar_one() async def lookup_adapter( - self, segments + self, + segments, ): # TODO: Accept filter for predicate-pushdown. if not segments: return self @@ -435,6 +447,13 @@ async def lookup_adapter( for i in range(len(segments)): catalog_adapter = await self.lookup_adapter(segments[:i]) + if ( + catalog_adapter.structure_family == StructureFamily.consolidated + ) and len(segments[i:]) == 1: + # All the segments but the final segment, segments[-1], resolve + # to a consolidated structure. Dispatch to the consolidated Adapter + # to get the inner Adapter for whatever type of structure it is. + return await ensure_awaitable(catalog_adapter.get, segments[-1]) if catalog_adapter.data_sources: adapter = await catalog_adapter.get_adapter() for segment in segments[i:]: @@ -444,7 +463,9 @@ async def lookup_adapter( return adapter return None return STRUCTURES[node.structure_family]( - self.context, node, access_policy=self.access_policy + self.context, + node, + access_policy=self.access_policy, ) async def get_adapter(self): @@ -644,8 +665,11 @@ async def create_node( data_source.structure_family ] data_source.parameters = {} + data_uri_path_parts = self.segments + [key] + if structure_family == StructureFamily.consolidated: + data_uri_path_parts.append(data_source.name) data_uri = str(self.context.writable_storage) + "".join( - f"/{quote_plus(segment)}" for segment in (self.segments + [key]) + f"/{quote_plus(segment)}" for segment in data_uri_path_parts ) if data_source.mimetype not in INIT_STORAGE: raise HTTPException( @@ -676,7 +700,7 @@ async def create_node( # Obtain and hash the canonical (RFC 8785) representation of # the JSON structure. 
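+                # Note: for a consolidated node, each constituent data source has its own
+                # structure family, so hash the structure of the data source itself rather
+                # than using the node-level structure family.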
structure = _prepare_structure( - structure_family, data_source.structure + data_source.structure_family, data_source.structure ) structure_id = compute_structure_id(structure) statement = ( @@ -688,6 +712,7 @@ async def create_node( await db.execute(statement) data_source_orm = orm.DataSource( structure_family=data_source.structure_family, + name=data_source.name, mimetype=data_source.mimetype, management=data_source.management, parameters=data_source.parameters, @@ -1116,6 +1141,34 @@ async def append_partition(self, *args, **kwargs): ) +class CatalogConsolidatedAdapter(CatalogNodeAdapter): + async def get(self, key): + if key not in self.structure().all_keys: + return None + for data_source in self.data_sources: + if data_source.structure_family == StructureFamily.table: + if key in data_source.structure.columns: + return await ensure_awaitable( + self.for_part(data_source.name).get, key + ) + if key == data_source.name: + return self.for_part(data_source.name) + + def for_part(self, name): + for data_source in self.data_sources: + if name == data_source.name: + break + else: + raise ValueError(f"No DataSource named {name} on this node") + return STRUCTURES[data_source.structure_family]( + self.context, + self.node, + access_policy=self.access_policy, + structure_family=data_source.structure_family, + data_sources=[data_source], + ) + + def delete_asset(data_uri, is_directory): url = urlparse(data_uri) if url.scheme == "file": @@ -1506,4 +1559,5 @@ def specs_array_to_json(specs): StructureFamily.container: CatalogContainerAdapter, StructureFamily.sparse: CatalogSparseAdapter, StructureFamily.table: CatalogTableAdapter, + StructureFamily.consolidated: CatalogConsolidatedAdapter, } diff --git a/tiled/catalog/core.py b/tiled/catalog/core.py index a57c88a6f..a81dcb1e2 100644 --- a/tiled/catalog/core.py +++ b/tiled/catalog/core.py @@ -5,6 +5,8 @@ # This is list of all valid revisions (from current to oldest). ALL_REVISIONS = [ + "0dc110294112", + "7c8130c40b8f", "ed3a4223a600", "e756b9381c14", "2ca16566d692", diff --git a/tiled/catalog/migrations/versions/0dc110294112_add_union_to_structure_family_enum.py b/tiled/catalog/migrations/versions/0dc110294112_add_union_to_structure_family_enum.py new file mode 100644 index 000000000..5dc04c83f --- /dev/null +++ b/tiled/catalog/migrations/versions/0dc110294112_add_union_to_structure_family_enum.py @@ -0,0 +1,33 @@ +"""Add 'consolidated' to structure_family enum. + +Revision ID: 0dc110294112 +Revises: 7c8130c40b8f +Create Date: 2024-02-23 09:13:23.658921 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0dc110294112" +down_revision = "7c8130c40b8f" +branch_labels = None +depends_on = None + + +def upgrade(): + connection = op.get_bind() + + if connection.engine.dialect.name == "postgresql": + with op.get_context().autocommit_block(): + op.execute( + sa.text( + "ALTER TYPE structurefamily ADD VALUE IF NOT EXISTS 'consolidated' AFTER 'table'" + ) + ) + + +def downgrade(): + # This _could_ be implemented but we will wait for a need since we are + # still in alpha releases. 
+ raise NotImplementedError diff --git a/tiled/catalog/migrations/versions/7c8130c40b8f_add_name_column_to_data_sources_table.py b/tiled/catalog/migrations/versions/7c8130c40b8f_add_name_column_to_data_sources_table.py new file mode 100644 index 000000000..4809939d5 --- /dev/null +++ b/tiled/catalog/migrations/versions/7c8130c40b8f_add_name_column_to_data_sources_table.py @@ -0,0 +1,25 @@ +"""Add 'name' column to data_sources table. + +Revision ID: 7c8130c40b8f +Revises: e756b9381c14 +Create Date: 2024-02-23 08:53:24.008576 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7c8130c40b8f" +down_revision = "ed3a4223a600" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("data_sources", sa.Column("name", sa.Unicode(1023), nullable=True)) + + +def downgrade(): + # This _could_ be implemented but we will wait for a need since we are + # still in alpha releases. + raise NotImplementedError diff --git a/tiled/catalog/orm.py b/tiled/catalog/orm.py index 051faf4f8..190969eec 100644 --- a/tiled/catalog/orm.py +++ b/tiled/catalog/orm.py @@ -369,6 +369,9 @@ class DataSource(Timestamped, Base): # This relates to the mutability of the data. management = Column(Enum(Management), nullable=False) structure_family = Column(Enum(StructureFamily), nullable=False) + # This is used by `consolidated` structures to address arrays. + # It may have additional uses in the future. + name = Column(Unicode(1023), nullable=True) # many-to-one relationship to Structure structure: Mapped["Structure"] = relationship( diff --git a/tiled/client/base.py b/tiled/client/base.py index bc47414f6..e934ca287 100644 --- a/tiled/client/base.py +++ b/tiled/client/base.py @@ -126,18 +126,13 @@ def __init__( self._metadata_revisions = None self._include_data_sources = include_data_sources attributes = self.item["attributes"] - structure_family = attributes["structure_family"] - if structure is not None: # Allow the caller to optionally hand us a structure that is already # parsed from a dict into a structure dataclass. 
self._structure = structure - elif structure_family == StructureFamily.container: - self._structure = None else: structure_type = STRUCTURE_TYPES[attributes["structure_family"]] self._structure = structure_type.from_json(attributes["structure"]) - super().__init__() def structure(self): diff --git a/tiled/client/consolidated.py b/tiled/client/consolidated.py new file mode 100644 index 000000000..38a08e228 --- /dev/null +++ b/tiled/client/consolidated.py @@ -0,0 +1,89 @@ +import copy +from urllib.parse import parse_qs, urlparse + +from .base import STRUCTURE_TYPES, BaseClient +from .utils import MSGPACK_MIME_TYPE, ClientError, client_for_item, handle_error + + +class ConsolidatedClient(BaseClient): + def __repr__(self): + return ( + f"<{type(self).__name__} {{" + + ", ".join(f"'{key}'" for key in self.structure().all_keys) + + "}>" + ) + + @property + def parts(self): + return ConsolidatedContents(self) + + def __getitem__(self, key): + if key not in self.structure().all_keys: + raise KeyError(key) + try: + self_link = self.item["links"]["self"] + if self_link.endswith("/"): + self_link = self_link[:-1] + url_path = f"{self_link}/{key}" + params = parse_qs(urlparse(url_path).query) + if self._include_data_sources: + params["include_data_sources"] = True + content = handle_error( + self.context.http_client.get( + url_path, + headers={"Accept": MSGPACK_MIME_TYPE}, + params=params, + ) + ).json() + except ClientError as err: + if err.response.status_code == 404: + raise KeyError(key) + raise + item = content["data"] + return client_for_item( + self.context, + self.structure_clients, + item, + include_data_sources=self._include_data_sources, + ) + + def __iter__(self): + yield from self.structure().all_keys + + +class ConsolidatedContents: + def __init__(self, node): + self.node = node + + def __repr__(self): + return ( + f"<{type(self).__name__} {{" + + ", ".join(f"'{item.name}'" for item in self.node.structure().parts) + + "}>" + ) + + def __getitem__(self, name): + for index, item in enumerate(self.node.structure().parts): + if item.name == name: + structure_family = item.structure_family + structure_dict = item.structure + break + else: + raise KeyError(name) + item = copy.deepcopy(self.node.item) + item["attributes"]["structure_family"] = structure_family + item["attributes"]["structure"] = structure_dict + item["links"] = item["links"]["parts"][index] + structure_type = STRUCTURE_TYPES[structure_family] + structure = structure_type.from_json(structure_dict) + return client_for_item( + self.node.context, + self.node.structure_clients, + item, + structure=structure, + include_data_sources=self.node._include_data_sources, + ) + + def __iter__(self): + for item in self.node.structure().parts: + yield item.name diff --git a/tiled/client/container.py b/tiled/client/container.py index a521ef2a9..2e011fa51 100644 --- a/tiled/client/container.py +++ b/tiled/client/container.py @@ -164,9 +164,8 @@ def __len__(self): # If the contents of this node was provided in-line, there is an # implication that the contents are not expected to be dynamic. Used the # count provided in the structure. 
- structure = self.item["attributes"]["structure"] - if structure["contents"]: - return structure["count"] + if self.structure() and (self.structure().count is not None): + return self.structure().count now = time.monotonic() if self._cached_len is not None: length, deadline = self._cached_len @@ -199,14 +198,15 @@ def __iter__(self, _ignore_inlined_contents=False): # If the contents of this node was provided in-line, and we don't need # to apply any filtering or sorting, we can slice the in-lined data # without fetching anything from the server. - contents = self.item["attributes"]["structure"]["contents"] + structure = self.structure() if ( - (contents is not None) + structure + and structure.contents and (not self._queries) and ((not self.sorting) or (self.sorting == [("_", 1)])) and (not _ignore_inlined_contents) ): - return (yield from contents) + return (yield from structure.contents) next_page_url = self.item["links"]["search"] while next_page_url is not None: content = handle_error( @@ -305,7 +305,8 @@ def __getitem__(self, keys, _ignore_inlined_contents=False): # to the node of interest without downloading information about # intermediate parents. for i, key in enumerate(keys): - item = (self.item["attributes"]["structure"]["contents"] or {}).get(key) + structure = self.structure() + item = ((structure and structure.contents) or {}).get(key) if (item is None) or _ignore_inlined_contents: # The item was not inlined, either because nothing was inlined # or because it was added after we fetched the inlined contents. @@ -642,8 +643,11 @@ def new( if structure_family == StructureFamily.container: structure = {"contents": None, "count": None} + elif structure_family == StructureFamily.consolidated: + structure = None + # To be filled in below, by server response. + # We need the server to tell us data_source_ids. else: - # Only containers can have multiple data_sources right now. (data_source,) = data_sources structure = data_source.structure item["attributes"]["structure"] = structure @@ -653,7 +657,7 @@ def new( item["attributes"]["metadata"] = document.pop("metadata") # Ditto for structure if "structure" in document: - item["attributes"]["structure"] = STRUCTURE_TYPES[structure_family]( + structure = STRUCTURE_TYPES[structure_family].from_json( document.pop("structure") ) @@ -681,7 +685,7 @@ def new( # to attempt to avoid bumping into size limits. _SUGGESTED_MAX_UPLOAD_SIZE = 100_000_000 # 100 MB - def create_container(self, key=None, *, metadata=None, dims=None, specs=None): + def create_container(self, key=None, *, metadata=None, specs=None): """ EXPERIMENTAL: Create a new, empty container. @@ -692,8 +696,6 @@ def create_container(self, key=None, *, metadata=None, dims=None, specs=None): metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. - dims : List[str], optional - A label for each dimension of the array. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. @@ -707,6 +709,31 @@ def create_container(self, key=None, *, metadata=None, dims=None, specs=None): specs=specs, ) + def create_consolidated(self, data_sources, key=None, *, metadata=None, specs=None): + """ + EXPERIMENTAL: Create a new consolidated node backed by data sources. + + Parameters + ---------- + data_sources : List[DataSources] + key : str, optional + Key (name) for this new node. If None, the server will provide a unique key. 
+ metadata : dict, optional + User metadata. May be nested. Must contain only basic types + (e.g. numbers, strings, lists, dicts) that are JSON-serializable. + specs : List[Spec], optional + List of names that are used to label that the data and/or metadata + conform to some named standard specification. + + """ + return self.new( + StructureFamily.consolidated, + data_sources, + key=key, + metadata=metadata, + specs=specs, + ) + def write_array(self, array, *, key=None, metadata=None, dims=None, specs=None): """ EXPERIMENTAL: Write an array. @@ -1054,6 +1081,9 @@ def _write_partition(x, partition_info, client): "table": _LazyLoad( ("..dataframe", Container.__module__), "DataFrameClient" ), + "consolidated": _LazyLoad( + ("..consolidated", Container.__module__), "ConsolidatedClient" + ), "xarray_dataset": _LazyLoad( ("..xarray", Container.__module__), "DatasetClient" ), @@ -1072,6 +1102,9 @@ def _write_partition(x, partition_info, client): "table": _LazyLoad( ("..dataframe", Container.__module__), "DaskDataFrameClient" ), + "consolidated": _LazyLoad( + ("..consolidated", Container.__module__), "ConsolidatedClient" + ), "xarray_dataset": _LazyLoad( ("..xarray", Container.__module__), "DaskDatasetClient" ), diff --git a/tiled/serialization/table.py b/tiled/serialization/table.py index b339e6b40..1aa717938 100644 --- a/tiled/serialization/table.py +++ b/tiled/serialization/table.py @@ -50,7 +50,7 @@ def serialize_csv(df, metadata, preserve_index=False): def deserialize_csv(buffer): import pandas - return pandas.read_csv(io.BytesIO(buffer), header=None) + return pandas.read_csv(io.BytesIO(buffer), headers=False) serialization_registry.register(StructureFamily.table, "text/csv", serialize_csv) diff --git a/tiled/server/dependencies.py b/tiled/server/dependencies.py index 0eefa0c56..ef94a5172 100644 --- a/tiled/server/dependencies.py +++ b/tiled/server/dependencies.py @@ -12,6 +12,7 @@ serialization_registry as default_serialization_registry, ) from ..query_registration import query_registry as default_query_registry +from ..structures.core import StructureFamily from ..validation_registration import validation_registry as default_validation_registry from .authentication import get_current_principal, get_session_state from .core import NoEntry @@ -59,6 +60,7 @@ def SecureEntry(scopes, structure_families=None): async def inner( path: str, request: Request, + part: Optional[str] = None, principal: str = Depends(get_current_principal), root_tree: pydantic_settings.BaseSettings = Depends(get_root_tree), session_state: dict = Depends(get_session_state), @@ -130,11 +132,33 @@ async def inner( entry.structure_family in structure_families ): return entry + # Handle consolidated structure_family + if entry.structure_family == StructureFamily.consolidated: + if not part: + raise HTTPException( + status_code=HTTP_404_NOT_FOUND, + detail=( + "A part query parameter is required on this endpoint " + "when addressing a 'consolidated' structure." 
+ ), + ) + entry_for_part = entry.for_part(part) + if entry_for_part.structure_family in structure_families: + return entry_for_part + raise HTTPException( + status_code=HTTP_404_NOT_FOUND, + detail=( + f"The data source named {part} backing the node " + f"at {path} has structure family {entry_for_part.structure_family} " + "and this endpoint is compatible only with structure families " + f"{structure_families}" + ), + ) raise HTTPException( status_code=HTTP_404_NOT_FOUND, detail=( f"The node at {path} has structure family {entry.structure_family} " - "and this endpoint is compatible with structure families " + "and this endpoint is compatible only with structure families " f"{structure_families}" ), ) diff --git a/tiled/server/links.py b/tiled/server/links.py index 76bf2616f..2d1db43a6 100644 --- a/tiled/server/links.py +++ b/tiled/server/links.py @@ -15,32 +15,59 @@ def links_for_node(structure_family, structure, base_url, path_str): return links -def links_for_array(structure_family, structure, base_url, path_str): +def links_for_array(structure_family, structure, base_url, path_str, part=None): links = {} block_template = ",".join(f"{{{index}}}" for index in range(len(structure.shape))) links["block"] = f"{base_url}/array/block/{path_str}?block={block_template}" links["full"] = f"{base_url}/array/full/{path_str}" + if part: + links["block"] += f"&part={part}" + links["full"] += f"?part={part}" return links -def links_for_awkward(structure_family, structure, base_url, path_str): +def links_for_awkward(structure_family, structure, base_url, path_str, part=None): links = {} links["buffers"] = f"{base_url}/awkward/buffers/{path_str}" links["full"] = f"{base_url}/awkward/full/{path_str}" + if part: + links["buffers"] += "?part={part}" + links["full"] += "?part={part}" return links def links_for_container(structure_family, structure, base_url, path_str): + # Cannot be used inside consolidated, so there is no part parameter. links = {} links["full"] = f"{base_url}/container/full/{path_str}" links["search"] = f"{base_url}/search/{path_str}" return links -def links_for_table(structure_family, structure, base_url, path_str): +def links_for_table(structure_family, structure, base_url, path_str, part=None): links = {} links["partition"] = f"{base_url}/table/partition/{path_str}?partition={{index}}" links["full"] = f"{base_url}/table/full/{path_str}" + if part: + links["partition"] += f"&part={part}" + links["full"] += f"?part={part}" + return links + + +def links_for_consolidated(structure_family, structure, base_url, path_str): + links = {} + # This contains the links for each structure. 
+ links["parts"] = [] + for item in structure.parts: + item_links = LINKS_BY_STRUCTURE_FAMILY[item.structure_family]( + item.structure_family, + item.structure, + base_url, + path_str, + part=item.name, + ) + item_links["self"] = f"{base_url}/metadata/{path_str}" + links["parts"].append(item_links) return links @@ -48,6 +75,7 @@ def links_for_table(structure_family, structure, base_url, path_str): StructureFamily.array: links_for_array, StructureFamily.awkward: links_for_awkward, StructureFamily.container: links_for_container, - StructureFamily.sparse: links_for_array, # spare and array are the same + StructureFamily.sparse: links_for_array, # sparse and array are the same StructureFamily.table: links_for_table, + StructureFamily.consolidated: links_for_consolidated, } diff --git a/tiled/server/pydantic_consolidated.py b/tiled/server/pydantic_consolidated.py new file mode 100644 index 000000000..5f6c49d07 --- /dev/null +++ b/tiled/server/pydantic_consolidated.py @@ -0,0 +1,49 @@ +from typing import Any, List, Optional + +import pydantic + +from ..structures.core import StructureFamily + + +class ConsolidatedStructurePart(pydantic.BaseModel): + structure_family: StructureFamily + structure: Any # Union of Structures, but we do not want to import them... + name: str + + @classmethod + def from_json(cls, item): + return cls(**item) + + +class ConsolidatedStructure(pydantic.BaseModel): + parts: List[ConsolidatedStructurePart] + all_keys: Optional[List[str]] + + @classmethod + def from_json(cls, structure): + return cls( + parts=[ + ConsolidatedStructurePart.from_json(item) for item in structure["parts"] + ], + all_keys=structure["all_keys"], + ) + + @classmethod + def from_data_sources(cls, data_sources): + all_keys = [] + for data_source in data_sources: + if data_source.structure_family == StructureFamily.table: + all_keys.extend(data_source.structure.columns) + else: + all_keys.append(data_source.name) + parts = [ + ConsolidatedStructurePart( + data_source_id=data_source.id, + structure=data_source.structure, + structure_family=data_source.structure_family, + name=data_source.name, + ) + for data_source in data_sources + ] + + return cls(parts=parts, all_keys=all_keys) diff --git a/tiled/server/pydantic_container.py b/tiled/server/pydantic_container.py new file mode 100644 index 000000000..58fe4b3b9 --- /dev/null +++ b/tiled/server/pydantic_container.py @@ -0,0 +1,8 @@ +from typing import Optional + +import pydantic + + +class ContainerStructure(pydantic.BaseModel): + contents: Optional[dict] + count: Optional[int] diff --git a/tiled/server/router.py b/tiled/server/router.py index 15e251025..b72b4bc3a 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -61,6 +61,7 @@ ) from .file_response_with_range import FileResponseWithRange from .links import links_for_node +from .pydantic_consolidated import ConsolidatedStructure from .settings import get_settings from .utils import filter_for_access, get_base_url, record_timing @@ -374,6 +375,7 @@ async def array_block( expected_shape=Depends(expected_shape), format: Optional[str] = None, filename: Optional[str] = None, + data_source: Optional[str] = None, serialization_registry=Depends(get_serialization_registry), settings: BaseSettings = Depends(get_settings), ): @@ -421,10 +423,14 @@ async def array_block( "Use slicing ('?slice=...') to request smaller chunks." 
), ) + if entry.structure_family == StructureFamily.consolidated: + structure_family = entry.data_source.structure_family + else: + structure_family = entry.structure_family try: with record_timing(request.state.metrics, "pack"): return await construct_data_response( - entry.structure_family, + structure_family, serialization_registry, array, entry.metadata(), @@ -458,7 +464,10 @@ async def array_full( """ Fetch a slice of array-like data. """ - structure_family = entry.structure_family + if entry.structure_family == StructureFamily.consolidated: + structure_family = entry.data_source.structure_family + else: + structure_family = entry.structure_family # Deferred import because this is not a required dependency of the server # for some use cases. import numpy @@ -720,10 +729,14 @@ async def table_full( "request a smaller chunks." ), ) + if entry.structure_family == StructureFamily.consolidated: + structure_family = entry.data_source.structure_family + else: + structure_family = entry.structure_family try: with record_timing(request.state.metrics, "pack"): return await construct_data_response( - entry.structure_family, + structure_family, serialization_registry, data, entry.metadata(), @@ -1146,21 +1159,41 @@ async def _create_node( body.structure_family, body.specs, ) - if structure_family == StructureFamily.container: - structure = None - else: - if len(body.data_sources) != 1: - raise NotImplementedError + metadata_modified = False + if structure_family == StructureFamily.consolidated: + structure = ConsolidatedStructure.from_data_sources(body.data_sources) + elif body.data_sources: + assert len(body.data_sources) == 1 # more not yet implemented structure = body.data_sources[0].structure + else: + structure = None - metadata_modified, metadata = await validate_metadata( - metadata=metadata, - structure_family=structure_family, - structure=structure, - specs=specs, - validation_registry=validation_registry, - settings=settings, - ) + # Specs should be ordered from most specific/constrained to least. + # Validate them in reverse order, with the least constrained spec first, + # because it may do normalization that helps pass the more constrained one. + # Known Issue: + # When there is more than one spec, it's possible for the validator for + # Spec 2 to make a modification that breaks the validation for Spec 1. + # For now we leave it to the server maintainer to ensure that validators + # won't step on each other in this way, but this may need revisiting. 
+ for spec in reversed(specs): + if spec.name not in validation_registry: + if settings.reject_undeclared_specs: + raise HTTPException( + status_code=400, detail=f"Unrecognized spec: {spec.name}" + ) + else: + validator = validation_registry(spec.name) + try: + result = validator(metadata, structure_family, structure, spec) + except ValidationError as e: + raise HTTPException( + status_code=400, + detail=f"failed validation for spec {spec.name}:\n{e}", + ) + if result is not None: + metadata_modified = True + metadata = result key, node = await entry.create_node( metadata=body.metadata, @@ -1172,10 +1205,14 @@ async def _create_node( links = links_for_node( structure_family, structure, get_base_url(request), path + f"/{key}" ) + structure = node.structure() + if structure is not None: + structure = structure.dict() response_data = { "id": key, "links": links, - "data_sources": [ds.model_dump() for ds in node.data_sources], + "data_sources": [ds.dict() for ds in node.data_sources], + "structure": structure, } if metadata_modified: response_data["metadata"] = metadata @@ -1365,7 +1402,9 @@ async def put_table_partition( async def patch_table_partition( partition: int, request: Request, - entry=SecureEntry(scopes=["write:data"]), + entry=SecureEntry( + scopes=["write:data"], structure_families={StructureFamily.table} + ), deserialization_registry=Depends(get_deserialization_registry), ): if not hasattr(entry, "write_partition"): diff --git a/tiled/server/schemas.py b/tiled/server/schemas.py index fa7f039e9..58d996edb 100644 --- a/tiled/server/schemas.py +++ b/tiled/server/schemas.py @@ -11,9 +11,10 @@ from typing_extensions import Annotated, TypedDict from ..structures.core import StructureFamily -from ..structures.data_source import Management +from ..structures.data_source import Management, validate_data_sources from .pydantic_array import ArrayStructure from .pydantic_awkward import AwkwardStructure +from .pydantic_consolidated import ConsolidatedStructure from .pydantic_sparse import SparseStructure from .pydantic_table import TableStructure @@ -145,15 +146,17 @@ class DataSource(pydantic.BaseModel): Union[ ArrayStructure, AwkwardStructure, - SparseStructure, NodeStructure, + SparseStructure, TableStructure, + ConsolidatedStructure, ] ] = None mimetype: Optional[str] = None parameters: dict = {} assets: List[Asset] = [] management: Management = Management.writable + name: Optional[str] = None model_config = pydantic.ConfigDict(extra="forbid") @@ -167,6 +170,7 @@ def from_orm(cls, orm: tiled.catalog.orm.DataSource) -> DataSource: parameters=orm.parameters, assets=[Asset.from_assoc_orm(assoc) for assoc in orm.asset_associations], management=orm.management, + name=orm.name, ) @@ -179,9 +183,10 @@ class NodeAttributes(pydantic.BaseModel): Union[ ArrayStructure, AwkwardStructure, - SparseStructure, NodeStructure, + SparseStructure, TableStructure, + ConsolidatedStructure, ] ] = None @@ -230,12 +235,20 @@ class SparseLinks(pydantic.BaseModel): block: str +class ConsolidatedLinks(pydantic.BaseModel): + self: str + parts: List[ + Union[ArrayLinks, AwkwardLinks, ContainerLinks, DataFrameLinks, SparseLinks] + ] + + resource_links_type_by_structure_family = { StructureFamily.array: ArrayLinks, StructureFamily.awkward: AwkwardLinks, StructureFamily.container: ContainerLinks, StructureFamily.sparse: SparseLinks, StructureFamily.table: DataFrameLinks, + StructureFamily.consolidated: ConsolidatedLinks, } @@ -456,6 +469,10 @@ def specs_uniqueness_validator(cls, v): raise ValueError return v + 
@pydantic.validator("data_sources", always=True) + def check_consistency(cls, v, values): + return validate_data_sources(values["structure_family"], v) + class PutDataSourceRequest(pydantic.BaseModel): data_source: DataSource @@ -463,7 +480,15 @@ class PutDataSourceRequest(pydantic.BaseModel): class PostMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]): id: str - links: Union[ArrayLinks, DataFrameLinks, SparseLinks] + links: Union[ArrayLinks, DataFrameLinks, SparseLinks, ConsolidatedLinks] + structure: Union[ + ArrayStructure, + AwkwardStructure, + NodeStructure, + SparseStructure, + TableStructure, + ConsolidatedStructure, + ] metadata: Dict data_sources: List[DataSource] diff --git a/tiled/structures/array.py b/tiled/structures/array.py index 53207b84e..901c58fc9 100644 --- a/tiled/structures/array.py +++ b/tiled/structures/array.py @@ -52,7 +52,7 @@ class Kind(str, enum.Enum): unicode = "U" # fixed-length sequence of Py_UNICODE other = "V" # "V" is for "void" -- generic fixed-size chunk of memory - # By default, do not tolerate numpy objectg arrays + # By default, do not tolerate numpy object arrays if os.getenv("TILED_ALLOW_OBJECT_ARRAYS", "0") != "0": object = "O" # Object (i.e. the memory contains a pointer to PyObject) diff --git a/tiled/structures/consolidated.py b/tiled/structures/consolidated.py new file mode 100644 index 000000000..4e5a3bfc4 --- /dev/null +++ b/tiled/structures/consolidated.py @@ -0,0 +1,50 @@ +import dataclasses +from typing import Any, List, Optional + +from .core import StructureFamily + + +@dataclasses.dataclass +class ConsolidatedStructurePart: + structure_family: StructureFamily + structure: Any # Union of Structures, but we do not want to import them... + name: Optional[str] + + @classmethod + def from_json(cls, item): + return cls(**item) + + +@dataclasses.dataclass +class ConsolidatedStructure: + parts: List[ConsolidatedStructurePart] + all_keys: List[str] + + @classmethod + def from_json(cls, structure): + return cls( + parts=[ + ConsolidatedStructurePart.from_json(item) for item in structure["parts"] + ], + all_keys=structure["all_keys"], + ) + + @classmethod + def from_data_sources(cls, data_sources): + all_keys = [] + for data_source in data_sources: + if data_source.structure_family == StructureFamily.table: + all_keys.extend(data_source.structure.columns) + else: + all_keys.append(data_source.name) + parts = [ + ConsolidatedStructurePart( + data_source_id=data_source.id, + structure=data_source.structure, + structure_family=data_source.structure_family, + name=data_source.name, + ) + for data_source in data_sources + ] + + return cls(parts=parts, all_keys=all_keys) diff --git a/tiled/structures/container.py b/tiled/structures/container.py new file mode 100644 index 000000000..c451ddb25 --- /dev/null +++ b/tiled/structures/container.py @@ -0,0 +1,12 @@ +import dataclasses +from typing import Optional + + +@dataclasses.dataclass +class ContainerStructure: + contents: Optional[dict] + count: Optional[int] + + @classmethod + def from_json(cls, structure): + return cls(**structure) diff --git a/tiled/structures/core.py b/tiled/structures/core.py index 309ada6e8..5612b1623 100644 --- a/tiled/structures/core.py +++ b/tiled/structures/core.py @@ -18,6 +18,7 @@ class StructureFamily(str, enum.Enum): container = "container" sparse = "sparse" table = "table" + consolidated = "consolidated" # can not be used in DataSources @dataclass(frozen=True) @@ -54,11 +55,17 @@ def dict(self) -> Dict[str, Optional[str]]: StructureFamily.awkward: lambda: 
importlib.import_module( "...structures.awkward", StructureFamily.__module__ ).AwkwardStructure, + StructureFamily.container: lambda: importlib.import_module( + "...structures.container", StructureFamily.__module__ + ).ContainerStructure, StructureFamily.table: lambda: importlib.import_module( "...structures.table", StructureFamily.__module__ ).TableStructure, StructureFamily.sparse: lambda: importlib.import_module( "...structures.sparse", StructureFamily.__module__ ).SparseStructure, + StructureFamily.consolidated: lambda: importlib.import_module( + "...structures.consolidated", StructureFamily.__module__ + ).ConsolidatedStructure, } ) diff --git a/tiled/structures/data_source.py b/tiled/structures/data_source.py index f1100044c..679fc0346 100644 --- a/tiled/structures/data_source.py +++ b/tiled/structures/data_source.py @@ -1,8 +1,9 @@ +import collections import dataclasses import enum from typing import Any, List, Optional -from .core import StructureFamily +from ..structures.core import StructureFamily class Management(str, enum.Enum): @@ -30,9 +31,74 @@ class DataSource: parameters: dict = dataclasses.field(default_factory=dict) assets: List[Asset] = dataclasses.field(default_factory=list) management: Management = Management.writable + name: Optional[str] = None + + def __post_init__(self): + if self.structure_family == StructureFamily.consolidated: + raise ValueError( + "DataSource can not be intialized with Consolidated StructureFamliy" + ) @classmethod def from_json(cls, d): d = d.copy() assets = [Asset(**a) for a in d.pop("assets")] return cls(assets=assets, **d) + + +def validate_data_sources(node_structure_family, data_sources): + "Check that data sources are consistent." + return validators[node_structure_family](node_structure_family, data_sources) + + +def validate_container_data_sources(node_structure_family, data_sources): + if len(data_sources) > 1: + raise ValueError( + "A container node can be backed by 0 or 1 data source, " + f"not {len(data_sources)}" + ) + return data_sources + + +def validate_consolidated_data_sources(node_structure_family, data_sources): + "Check that column names and keys of others (e.g. arrays) do not collide." + keys = set() + names = set() + for data_source in data_sources: + if data_source.name is None: + raise ValueError( + "Data sources backing a consolidated structure_family must " + "all have non-NULL names." + ) + if data_source.name in names: + raise ValueError( + "Data sources must have unique names. " + f"This name is used one more than one: {data_source.name}" + ) + names.add(data_source.name) + if data_source.structure_family == StructureFamily.table: + columns = data_source.structure.columns + if keys.intersection(columns): + raise ValueError( + f"Data sources provide colliding keys: {keys.intersection(columns)}" + ) + keys.update(columns) + else: + key = data_source.name + if key in keys: + raise ValueError(f"Data sources provide colliding keys: {key}") + keys.add(key) + return data_sources + + +def validate_other_data_sources(node_structure_family, data_sources): + if len(data_sources) != 1: + raise ValueError( + f"A {node_structure_family} node must be backed by 1 data source." + ) + return data_sources + + +validators = collections.defaultdict(lambda: validate_other_data_sources) +validators[StructureFamily.container] = validate_container_data_sources +validators[StructureFamily.consolidated] = validate_consolidated_data_sources