From 48da17d93c6171641ad2f5e7a7a4f8c93c1520f9 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 3 Aug 2023 15:48:11 +0200 Subject: [PATCH 01/28] Add experimental code for reading data from multiple sources. --- daskms/experimental/multisource/__init__.py | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 daskms/experimental/multisource/__init__.py diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/multisource/__init__.py new file mode 100644 index 00000000..dc054c24 --- /dev/null +++ b/daskms/experimental/multisource/__init__.py @@ -0,0 +1,32 @@ +from daskms import xds_from_storage_ms +from daskms.utils import requires + +from collections.abc import Iterable + +try: + import xarray +except ImportError as e: + xarray_import_error = e +else: + xarray_import_error = None + + +@requires("pip install dask-ms[xarray] for xarray support", xarray_import_error) +def xds_from_merged_storage(stores, **kwargs): + + if not isinstance(stores, Iterable): + stores = [stores] + + storage_options = kwargs.pop("storage_options", {}) + + lxdsl = [] + + for store in stores: + lxdsl.append(xds_from_storage_ms(store, storage_options=storage_options, **kwargs)) + + assert len({len(xdsl) for xdsl in lxdsl}) == 1, ( + "xds_from_merged_storage was unable to merge datasets dynamically " + "due to conflicting lengths on the intermediary lists of datasets." + ) + + return [xarray.merge(xdss) for xdss in zip(*lxdsl)] From 3b50340b59b54cecffecf33809d042382f108e72 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 3 Aug 2023 16:47:37 +0200 Subject: [PATCH 02/28] WIP on proxy reads. --- daskms/experimental/multisource/__init__.py | 39 ++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/multisource/__init__.py index dc054c24..05f2c2dd 100644 --- a/daskms/experimental/multisource/__init__.py +++ b/daskms/experimental/multisource/__init__.py @@ -1,5 +1,7 @@ -from daskms import xds_from_storage_ms +from daskms import xds_from_storage_ms, xds_to_storage_table +from daskms.fsspec_store import DaskMSStore from daskms.utils import requires +from daskms.experimental.zarr import xds_to_zarr, xds_from_zarr from collections.abc import Iterable @@ -30,3 +32,38 @@ def xds_from_merged_storage(stores, **kwargs): ) return [xarray.merge(xdss) for xdss in zip(*lxdsl)] + + +def xds_from_proxy(store, **kwargs): + + xdsl = xds_from_storage_ms(store, **kwargs) + + parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl} + + assert len(parent_urls) == 1, ( + "Proxy has more than one parent - this is not supported." + ) + + parent_url = parent_urls.pop() + + if parent_url: + + if not isinstance(parent_url, DaskMSStore): + store = DaskMSStore(parent_url) + + xdsl_nested = xds_from_proxy(store, **kwargs) + else: + return [xdsl] + + return [xdsl, *xdsl_nested] + + + +def xds_to_proxy(xds, store, parent, **kwargs): + + if not isinstance(parent, DaskMSStore): + parent = DaskMSStore(parent) + + xds = [x.assign_attrs({"__dask_ms_parent_url__": parent.url}) for x in xds] + + return xds_to_zarr(xds, store, **kwargs) From c25bca9fb796655871bbeb2dfa05087aab88eb0c Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Fri, 4 Aug 2023 10:22:23 +0200 Subject: [PATCH 03/28] Add further work on composite datasets. 
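
A minimal usage sketch of the merge machinery introduced here. It is
illustrative only: the path is hypothetical, and merge_via_assign() is
applied internally by xds_from_proxy().

    # Read a proxy dataset together with its parent chain. Each
    # per-partition list is combined by merge_via_assign(), with data
    # variables from later (younger) datasets taking priority.
    from daskms.experimental.multisource import xds_from_proxy

    xdsl = xds_from_proxy("child.zarr", group_cols=("DATA_DESC_ID",))
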
--- daskms/experimental/multisource/__init__.py | 33 +++++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/multisource/__init__.py index 05f2c2dd..f9d51d84 100644 --- a/daskms/experimental/multisource/__init__.py +++ b/daskms/experimental/multisource/__init__.py @@ -34,7 +34,7 @@ def xds_from_merged_storage(stores, **kwargs): return [xarray.merge(xdss) for xdss in zip(*lxdsl)] -def xds_from_proxy(store, **kwargs): +def _xds_from_proxy(store, **kwargs): xdsl = xds_from_storage_ms(store, **kwargs) @@ -51,12 +51,39 @@ def xds_from_proxy(store, **kwargs): if not isinstance(parent_url, DaskMSStore): store = DaskMSStore(parent_url) - xdsl_nested = xds_from_proxy(store, **kwargs) + xdsl_nested = _xds_from_proxy(store, **kwargs) else: return [xdsl] - return [xdsl, *xdsl_nested] + return [*xdsl_nested, xdsl] + + +def merge_via_assign(xdsl): + + composite_xds = xdsl[0] + + partition_keys = [p[0] for p in composite_xds.__daskms_partition_schema__] + + for xds in xdsl[1:]: + + if not all(xds.attrs[k] == composite_xds.attrs[k] for k in partition_keys): + raise ValueError( + "merge_via_assign failed due to conflicting partition keys." + "This usually means you are attempting to merge datasets " + "which were constructed with different group_cols arguments." + ) + + composite_xds = composite_xds.assign(xds.data_vars) + composite_xds = composite_xds.assign_attrs(xds.attrs) + + return composite_xds + + +def xds_from_proxy(store, **kwargs): + + lxdsl = _xds_from_proxy(store, **kwargs) + return [merge_via_assign(xdss) for xdss in zip(*lxdsl)] def xds_to_proxy(xds, store, parent, **kwargs): From 73278e2338f5f2caf43f3c9ef90df53e95c3c564 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Fri, 4 Aug 2023 11:14:54 +0200 Subject: [PATCH 04/28] Rename and document xds_to/from_fragment. --- daskms/experimental/multisource/__init__.py | 71 ++++++++++++++++++--- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/multisource/__init__.py index f9d51d84..e4e1089c 100644 --- a/daskms/experimental/multisource/__init__.py +++ b/daskms/experimental/multisource/__init__.py @@ -1,7 +1,7 @@ -from daskms import xds_from_storage_ms, xds_to_storage_table +from daskms import xds_from_storage_ms from daskms.fsspec_store import DaskMSStore from daskms.utils import requires -from daskms.experimental.zarr import xds_to_zarr, xds_from_zarr +from daskms.experimental.zarr import xds_to_zarr from collections.abc import Iterable @@ -34,7 +34,7 @@ def xds_from_merged_storage(stores, **kwargs): return [xarray.merge(xdss) for xdss in zip(*lxdsl)] -def _xds_from_proxy(store, **kwargs): +def _xds_from_fragment(store, **kwargs): xdsl = xds_from_storage_ms(store, **kwargs) @@ -51,7 +51,7 @@ def _xds_from_proxy(store, **kwargs): if not isinstance(parent_url, DaskMSStore): store = DaskMSStore(parent_url) - xdsl_nested = _xds_from_proxy(store, **kwargs) + xdsl_nested = _xds_from_fragment(store, **kwargs) else: return [xdsl] @@ -79,14 +79,69 @@ def merge_via_assign(xdsl): return composite_xds -def xds_from_proxy(store, **kwargs): - - lxdsl = _xds_from_proxy(store, **kwargs) +def xds_from_fragment(store, **kwargs): + """ + Creates a list of xarray datasets representing the contents a composite + Measurement Set. The resulting list of datasets will consist of some root + dataset with any newer variables populated from the child fragments. 
It
+    defers to :func:`xds_from_storage_ms`, which should be consulted
+    for more information.
+
+    Parameters
+    ----------
+    store : str or DaskMSStore
+        Store or string of the child fragment of interest.
+    columns : tuple or list, optional
+        Columns present on the resulting dataset.
+        Defaults to all if ``None``.
+    index_cols : tuple or list, optional
+        Sequence of indexing columns.
+        Defaults to :code:`%(indices)s`
+    group_cols : tuple or list, optional
+        Sequence of grouping columns.
+        Defaults to :code:`%(groups)s`
+    **kwargs : optional
+
+    Returns
+    -------
+    datasets : list of :class:`xarray.Dataset`
+        xarray datasets for each group
+    """
+
+    lxdsl = _xds_from_fragment(store, **kwargs)
 
     return [merge_via_assign(xdss) for xdss in zip(*lxdsl)]
 
 
-def xds_to_proxy(xds, store, parent, **kwargs):
+def xds_to_fragment(xds, store, parent, **kwargs):
+    """
+    Generates a list of Datasets representing write operations from the
+    specified arrays in :class:`xarray.Dataset`'s into a child fragment
+    dataset.
+
+    Parameters
+    ----------
+    xds : :class:`xarray.Dataset` or list of :class:`xarray.Dataset`
+        dataset(s) containing the specified columns. If a list of datasets
+        is provided, the concatenation of the columns in
+        sequential datasets will be written.
+    store : str or DaskMSStore
+        Store or string which determines the location to which the child
+        fragment will be written.
+    parent : str or DaskMSStore
+        Store or string corresponding to the parent dataset. Can point to
+        either a root dataset or another child fragment.
+
+    **kwargs : optional arguments. See :func:`xds_to_table`.
+
+    Returns
+    -------
+    write_datasets : list of :class:`xarray.Dataset`
+        Datasets containing arrays representing write operations
+        into a CASA Table
+    table_proxy : :class:`daskms.TableProxy`, optional
+        The Table Proxy associated with the datasets
+    """
 
     if not isinstance(parent, DaskMSStore):
         parent = DaskMSStore(parent)
From 3a50c17e5a4d397210465484066fa65f8cfc781c Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 4 Aug 2023 16:17:03 +0200
Subject: [PATCH 05/28] Checkpoint further work on fragment code.
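
A rough end-to-end sketch of the fragment workflow as of this commit.
The paths and the DATA modification are hypothetical, not part of the
diff:

    import dask
    from daskms import xds_from_storage_ms
    from daskms.experimental.multisource import (
        xds_from_ms_fragment,
        xds_to_ms_fragment,
    )

    # Read the parent Measurement Set and modify a single column.
    xdsl = xds_from_storage_ms("parent.ms", group_cols=("DATA_DESC_ID",))
    updated = [xds.assign(DATA=xds.DATA + 1) for xds in xdsl]

    # Write only DATA to a fragment; the fragment records the URL of
    # its parent in the __dask_ms_parent_url__ attribute.
    writes = xds_to_ms_fragment(
        updated, "delta.zarr", "parent.ms", columns=("DATA",)
    )
    dask.compute(writes)

    # Reading the fragment walks the parent chain and consolidates,
    # the fragment's DATA taking priority over the parent's.
    merged = xds_from_ms_fragment("delta.zarr", group_cols=("DATA_DESC_ID",))
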
--- daskms/experimental/multisource/__init__.py | 126 +++++++++++++++++- .../multisource/tests/__init__.py | 0 .../multisource/tests/test_multisource.py | 5 + 3 files changed, 125 insertions(+), 6 deletions(-) create mode 100644 daskms/experimental/multisource/tests/__init__.py create mode 100644 daskms/experimental/multisource/tests/test_multisource.py diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/multisource/__init__.py index e4e1089c..42ca907a 100644 --- a/daskms/experimental/multisource/__init__.py +++ b/daskms/experimental/multisource/__init__.py @@ -1,7 +1,8 @@ -from daskms import xds_from_storage_ms +from daskms import xds_from_storage_ms, xds_from_storage_table from daskms.fsspec_store import DaskMSStore from daskms.utils import requires from daskms.experimental.zarr import xds_to_zarr +from daskms.fsspec_store import UnknownStoreTypeError from collections.abc import Iterable @@ -34,7 +35,7 @@ def xds_from_merged_storage(stores, **kwargs): return [xarray.merge(xdss) for xdss in zip(*lxdsl)] -def _xds_from_fragment(store, **kwargs): +def _xds_from_ms_fragment(store, **kwargs): xdsl = xds_from_storage_ms(store, **kwargs) @@ -51,7 +52,7 @@ def _xds_from_fragment(store, **kwargs): if not isinstance(parent_url, DaskMSStore): store = DaskMSStore(parent_url) - xdsl_nested = _xds_from_fragment(store, **kwargs) + xdsl_nested = _xds_from_ms_fragment(store, **kwargs) else: return [xdsl] @@ -79,7 +80,7 @@ def merge_via_assign(xdsl): return composite_xds -def xds_from_fragment(store, **kwargs): +def xds_from_ms_fragment(store, **kwargs): """ Creates a list of xarray datasets representing the contents a composite Measurement Set. The resulting list of datasets will consist of some root @@ -108,12 +109,125 @@ def xds_from_fragment(store, **kwargs): xarray datasets for each group """ - lxdsl = _xds_from_fragment(store, **kwargs) + lxdsl = _xds_from_ms_fragment(store, **kwargs) return [merge_via_assign(xdss) for xdss in zip(*lxdsl)] -def xds_to_fragment(xds, store, parent, **kwargs): +def xds_to_ms_fragment(xds, store, parent, **kwargs): + """ + Generates a list of Datasets representing write operations from the + specified arrays in :class:`xarray.Dataset`'s into a child fragment + dataset. + + Parameters + ---------- + xds : :class:`xarray.Dataset` or list of :class:`xarray.Dataset` + dataset(s) containing the specified columns. If a list of datasets + is provided, the concatenation of the columns in + sequential datasets will be written. + store : str or DaskMSStore + Store or string which determines the location to which the child + fragment will be written. + parent : str or DaskMSStore + Store or sting corresponding to the parent dataset. Can be either + point to either a root dataset or another child fragment. + + **kwargs : optional arguments. See :func:`xds_to_table`. + + Returns + ------- + write_datasets : list of :class:`xarray.Dataset` + Datasets containing arrays representing write operations + into a CASA Table + table_proxy : :class:`daskms.TableProxy`, optional + The Table Proxy associated with the datasets + """ + + if not isinstance(parent, DaskMSStore): + parent = DaskMSStore(parent) + + xds = [x.assign_attrs({"__dask_ms_parent_url__": parent.url}) for x in xds] + + return xds_to_zarr(xds, store, **kwargs) + + +def _xds_from_table_fragment(store, **kwargs): + + try: + # Try to open the store. However, as we are reading from a fragment, + # the subtable may not exist in the child. 
+ xdsl = xds_from_storage_table(store, **kwargs) + required = True + except UnknownStoreTypeError: + # NOTE: We don't pass kwargs - the only purpose of this read is to + # grab the parent urls (if they exist). + xdsl = xds_from_storage_table(DaskMSStore(store.root)) + required = False + + subtable = store.table + + parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl} + + assert len(parent_urls) == 1, ( + "Proxy has more than one parent - this is not supported." + ) + + parent_url = parent_urls.pop() + + if parent_url: + + if not isinstance(parent_url, DaskMSStore): + store = DaskMSStore(parent_url).subtable_store(subtable) + + xdsl_nested = _xds_from_table_fragment(store, **kwargs) + else: + return [xdsl] + + if required: + return [*xdsl_nested, xdsl] + else: + return [*xdsl_nested] + + +def xds_from_table_fragment(store, **kwargs): + """ + Creates a list of xarray datasets representing the contents a composite + Measurement Set. The resulting list of datasets will consist of some root + dataset with any newer variables populated from the child fragments. It + defers to :func:`xds_from_storage_ms`, which should be consulted + for more information. + + Parameters + ---------- + store : str or DaskMSStore + Store or string of the child fragment of interest. + columns : tuple or list, optional + Columns present on the resulting dataset. + Defaults to all if ``None``. + index_cols : tuple or list, optional + Sequence of indexing columns. + Defaults to :code:`%(indices)s` + group_cols : tuple or list, optional + Sequence of grouping columns. + Defaults to :code:`%(groups)s` + **kwargs : optional + + Returns + ------- + datasets : list of :class:`xarray.Dataset` + xarray datasets for each group + """ + + if not isinstance(store, DaskMSStore): + store = DaskMSStore(store) + + lxdsl = _xds_from_table_fragment(store, **kwargs) + + return [merge_via_assign(xdss) for xdss in zip(*lxdsl)] + + +def xds_to_table_fragment(xds, store, parent, **kwargs): """ Generates a list of Datasets representing write operations from the specified arrays in :class:`xarray.Dataset`'s into a child fragment diff --git a/daskms/experimental/multisource/tests/__init__.py b/daskms/experimental/multisource/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/daskms/experimental/multisource/tests/test_multisource.py b/daskms/experimental/multisource/tests/test_multisource.py new file mode 100644 index 00000000..da03a8d6 --- /dev/null +++ b/daskms/experimental/multisource/tests/test_multisource.py @@ -0,0 +1,5 @@ +import pytest + +def test_write_fragment(ms, tmp_path_factory): + + import ipdb; ipdb.set_trace() From 473b346f9d06a66c81d1ced1ad14cca314426d8d Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Fri, 4 Aug 2023 16:23:03 +0200 Subject: [PATCH 06/28] Rename experimental module. 
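
For downstream code the rename only changes the import path, e.g.:

    # Previously: from daskms.experimental.multisource import ...
    from daskms.experimental.fragments import (
        xds_from_ms_fragment,
        xds_to_ms_fragment,
    )
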
--- daskms/experimental/{multisource => fragments}/__init__.py | 0 daskms/experimental/{multisource => fragments}/tests/__init__.py | 0 .../test_multisource.py => fragments/tests/test_fragments.py} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename daskms/experimental/{multisource => fragments}/__init__.py (100%) rename daskms/experimental/{multisource => fragments}/tests/__init__.py (100%) rename daskms/experimental/{multisource/tests/test_multisource.py => fragments/tests/test_fragments.py} (100%) diff --git a/daskms/experimental/multisource/__init__.py b/daskms/experimental/fragments/__init__.py similarity index 100% rename from daskms/experimental/multisource/__init__.py rename to daskms/experimental/fragments/__init__.py diff --git a/daskms/experimental/multisource/tests/__init__.py b/daskms/experimental/fragments/tests/__init__.py similarity index 100% rename from daskms/experimental/multisource/tests/__init__.py rename to daskms/experimental/fragments/tests/__init__.py diff --git a/daskms/experimental/multisource/tests/test_multisource.py b/daskms/experimental/fragments/tests/test_fragments.py similarity index 100% rename from daskms/experimental/multisource/tests/test_multisource.py rename to daskms/experimental/fragments/tests/test_fragments.py From 58f8f28641a59e073f12eebe14db2d057060dc9a Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 08:52:03 +0200 Subject: [PATCH 07/28] Apply black. --- daskms/experimental/fragments/__init__.py | 23 ++++++++----------- .../fragments/tests/test_fragments.py | 4 +++- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 42ca907a..972916c3 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -16,7 +16,6 @@ @requires("pip install dask-ms[xarray] for xarray support", xarray_import_error) def xds_from_merged_storage(stores, **kwargs): - if not isinstance(stores, Iterable): stores = [stores] @@ -25,7 +24,9 @@ def xds_from_merged_storage(stores, **kwargs): lxdsl = [] for store in stores: - lxdsl.append(xds_from_storage_ms(store, storage_options=storage_options, **kwargs)) + lxdsl.append( + xds_from_storage_ms(store, storage_options=storage_options, **kwargs) + ) assert len({len(xdsl) for xdsl in lxdsl}) == 1, ( "xds_from_merged_storage was unable to merge datasets dynamically " @@ -36,19 +37,17 @@ def xds_from_merged_storage(stores, **kwargs): def _xds_from_ms_fragment(store, **kwargs): - xdsl = xds_from_storage_ms(store, **kwargs) parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl} - assert len(parent_urls) == 1, ( - "Proxy has more than one parent - this is not supported." - ) + assert ( + len(parent_urls) == 1 + ), "Proxy has more than one parent - this is not supported." parent_url = parent_urls.pop() if parent_url: - if not isinstance(parent_url, DaskMSStore): store = DaskMSStore(parent_url) @@ -60,13 +59,11 @@ def _xds_from_ms_fragment(store, **kwargs): def merge_via_assign(xdsl): - composite_xds = xdsl[0] partition_keys = [p[0] for p in composite_xds.__daskms_partition_schema__] for xds in xdsl[1:]: - if not all(xds.attrs[k] == composite_xds.attrs[k] for k in partition_keys): raise ValueError( "merge_via_assign failed due to conflicting partition keys." @@ -153,7 +150,6 @@ def xds_to_ms_fragment(xds, store, parent, **kwargs): def _xds_from_table_fragment(store, **kwargs): - try: # Try to open the store. 
However, as we are reading from a fragment, # the subtable may not exist in the child. @@ -169,14 +165,13 @@ def _xds_from_table_fragment(store, **kwargs): parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl} - assert len(parent_urls) == 1, ( - "Proxy has more than one parent - this is not supported." - ) + assert ( + len(parent_urls) == 1 + ), "Proxy has more than one parent - this is not supported." parent_url = parent_urls.pop() if parent_url: - if not isinstance(parent_url, DaskMSStore): store = DaskMSStore(parent_url).subtable_store(subtable) diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index da03a8d6..3172c292 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -1,5 +1,7 @@ import pytest + def test_write_fragment(ms, tmp_path_factory): + import ipdb - import ipdb; ipdb.set_trace() + ipdb.set_trace() From 9ed2818504edc27b79c14d34cc96abc1179ba89e Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 09:15:19 +0200 Subject: [PATCH 08/28] Some tidying. --- daskms/experimental/fragments/__init__.py | 82 +++++++---------------- 1 file changed, 23 insertions(+), 59 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 972916c3..d8cd35d8 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -36,6 +36,25 @@ def xds_from_merged_storage(stores, **kwargs): return [xarray.merge(xdss) for xdss in zip(*lxdsl)] +def consolidate(xdsl): + composite_xds = xdsl[0] + + partition_keys = [p[0] for p in composite_xds.__daskms_partition_schema__] + + for xds in xdsl[1:]: + if not all(xds.attrs[k] == composite_xds.attrs[k] for k in partition_keys): + raise ValueError( + "consolidate failed due to conflicting partition keys." + "This usually means you are attempting to merge datasets " + "which were constructed with different group_cols arguments." + ) + + composite_xds = composite_xds.assign(xds.data_vars) + composite_xds = composite_xds.assign_attrs(xds.attrs) + + return composite_xds + + def _xds_from_ms_fragment(store, **kwargs): xdsl = xds_from_storage_ms(store, **kwargs) @@ -58,25 +77,6 @@ def _xds_from_ms_fragment(store, **kwargs): return [*xdsl_nested, xdsl] -def merge_via_assign(xdsl): - composite_xds = xdsl[0] - - partition_keys = [p[0] for p in composite_xds.__daskms_partition_schema__] - - for xds in xdsl[1:]: - if not all(xds.attrs[k] == composite_xds.attrs[k] for k in partition_keys): - raise ValueError( - "merge_via_assign failed due to conflicting partition keys." - "This usually means you are attempting to merge datasets " - "which were constructed with different group_cols arguments." - ) - - composite_xds = composite_xds.assign(xds.data_vars) - composite_xds = composite_xds.assign_attrs(xds.attrs) - - return composite_xds - - def xds_from_ms_fragment(store, **kwargs): """ Creates a list of xarray datasets representing the contents a composite @@ -108,45 +108,7 @@ def xds_from_ms_fragment(store, **kwargs): lxdsl = _xds_from_ms_fragment(store, **kwargs) - return [merge_via_assign(xdss) for xdss in zip(*lxdsl)] - - -def xds_to_ms_fragment(xds, store, parent, **kwargs): - """ - Generates a list of Datasets representing write operations from the - specified arrays in :class:`xarray.Dataset`'s into a child fragment - dataset. 
- - Parameters - ---------- - xds : :class:`xarray.Dataset` or list of :class:`xarray.Dataset` - dataset(s) containing the specified columns. If a list of datasets - is provided, the concatenation of the columns in - sequential datasets will be written. - store : str or DaskMSStore - Store or string which determines the location to which the child - fragment will be written. - parent : str or DaskMSStore - Store or sting corresponding to the parent dataset. Can be either - point to either a root dataset or another child fragment. - - **kwargs : optional arguments. See :func:`xds_to_table`. - - Returns - ------- - write_datasets : list of :class:`xarray.Dataset` - Datasets containing arrays representing write operations - into a CASA Table - table_proxy : :class:`daskms.TableProxy`, optional - The Table Proxy associated with the datasets - """ - - if not isinstance(parent, DaskMSStore): - parent = DaskMSStore(parent) - - xds = [x.assign_attrs({"__dask_ms_parent_url__": parent.url}) for x in xds] - - return xds_to_zarr(xds, store, **kwargs) + return [consolidate(xdss) for xdss in zip(*lxdsl)] def _xds_from_table_fragment(store, **kwargs): @@ -214,12 +176,13 @@ def xds_from_table_fragment(store, **kwargs): xarray datasets for each group """ + # TODO: Where, when and how should we pass storage options? if not isinstance(store, DaskMSStore): store = DaskMSStore(store) lxdsl = _xds_from_table_fragment(store, **kwargs) - return [merge_via_assign(xdss) for xdss in zip(*lxdsl)] + return [consolidate(xdss) for xdss in zip(*lxdsl)] def xds_to_table_fragment(xds, store, parent, **kwargs): @@ -252,6 +215,7 @@ def xds_to_table_fragment(xds, store, parent, **kwargs): The Table Proxy associated with the datasets """ + # TODO: Where, when and how should we pass storage options? if not isinstance(parent, DaskMSStore): parent = DaskMSStore(parent) From 17139811d86d04886d0ddbf522785fd572324ee6 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 09:18:36 +0200 Subject: [PATCH 09/28] More tidying. --- daskms/experimental/fragments/__init__.py | 31 +++++------------------ 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index d8cd35d8..446cf35f 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -4,38 +4,17 @@ from daskms.experimental.zarr import xds_to_zarr from daskms.fsspec_store import UnknownStoreTypeError -from collections.abc import Iterable - try: - import xarray + import xarray # noqa except ImportError as e: xarray_import_error = e else: xarray_import_error = None - -@requires("pip install dask-ms[xarray] for xarray support", xarray_import_error) -def xds_from_merged_storage(stores, **kwargs): - if not isinstance(stores, Iterable): - stores = [stores] - - storage_options = kwargs.pop("storage_options", {}) - - lxdsl = [] - - for store in stores: - lxdsl.append( - xds_from_storage_ms(store, storage_options=storage_options, **kwargs) - ) - - assert len({len(xdsl) for xdsl in lxdsl}) == 1, ( - "xds_from_merged_storage was unable to merge datasets dynamically " - "due to conflicting lengths on the intermediary lists of datasets." 
- ) - - return [xarray.merge(xdss) for xdss in zip(*lxdsl)] +xarray_import_msg = "pip install dask-ms[xarray] for xarray support" +@requires(xarray_import_msg, xarray_import_error) def consolidate(xdsl): composite_xds = xdsl[0] @@ -55,6 +34,7 @@ def consolidate(xdsl): return composite_xds +@requires(xarray_import_msg, xarray_import_error) def _xds_from_ms_fragment(store, **kwargs): xdsl = xds_from_storage_ms(store, **kwargs) @@ -77,6 +57,7 @@ def _xds_from_ms_fragment(store, **kwargs): return [*xdsl_nested, xdsl] +@requires(xarray_import_msg, xarray_import_error) def xds_from_ms_fragment(store, **kwargs): """ Creates a list of xarray datasets representing the contents a composite @@ -147,6 +128,7 @@ def _xds_from_table_fragment(store, **kwargs): return [*xdsl_nested] +@requires(xarray_import_msg, xarray_import_error) def xds_from_table_fragment(store, **kwargs): """ Creates a list of xarray datasets representing the contents a composite @@ -185,6 +167,7 @@ def xds_from_table_fragment(store, **kwargs): return [consolidate(xdss) for xdss in zip(*lxdsl)] +@requires(xarray_import_msg, xarray_import_error) def xds_to_table_fragment(xds, store, parent, **kwargs): """ Generates a list of Datasets representing write operations from the From 4090974f7f5fe979d27250a48d6715de13206300 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 09:32:58 +0200 Subject: [PATCH 10/28] More docs. --- daskms/experimental/fragments/__init__.py | 31 ++++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 446cf35f..71431e97 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -16,22 +16,41 @@ @requires(xarray_import_msg, xarray_import_error) def consolidate(xdsl): - composite_xds = xdsl[0] + """ + Consolidates a list of xarray datasets by assigning data variables. + Priority is determined by the position within the list, with elements at + the end of the list having higher priority than those at the start. The + primary purpose of this function is the construction of a consolidated + dataset from a parent and deltas (frgaments). + + Parameters + ---------- + xdsl : tuple or list + Tuple or list of :class:`xarray.Dataset` objects to consolidate. + + Returns + ------- + consolidated_xds : :class:`xarray.Dataset` + A single :class:`xarray.Dataset`. + """ + + consolidated_xds = xdsl[0] # First element is the - partition_keys = [p[0] for p in composite_xds.__daskms_partition_schema__] + partition_keys = [p[0] for p in consolidated_xds.__daskms_partition_schema__] for xds in xdsl[1:]: - if not all(xds.attrs[k] == composite_xds.attrs[k] for k in partition_keys): + if not all(xds.attrs[k] == consolidated_xds.attrs[k] for k in partition_keys): raise ValueError( "consolidate failed due to conflicting partition keys." "This usually means you are attempting to merge datasets " "which were constructed with different group_cols arguments." ) - composite_xds = composite_xds.assign(xds.data_vars) - composite_xds = composite_xds.assign_attrs(xds.attrs) + consolidated_xds = consolidated_xds.assign(xds.data_vars) + # NOTE: Assigning the fragment's attributes may be unnecessary. 
+        consolidated_xds = consolidated_xds.assign_attrs(xds.attrs)
 
-    return composite_xds
+    return consolidated_xds
From a679d6337e31b96816bc094ce87feaf8a6a385b5 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Tue, 8 Aug 2023 09:40:43 +0200
Subject: [PATCH 11/28] Clarify variable names.

---
 daskms/experimental/fragments/__init__.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index 71431e97..b7e9d581 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -21,7 +21,7 @@ def consolidate(xdsl):
     Priority is determined by the position within the list, with elements at
     the end of the list having higher priority than those at the start. The
     primary purpose of this function is the construction of a consolidated
-    dataset from a parent and deltas (frgaments).
+    dataset from a root and deltas (fragments).
 
     Parameters
     ----------
@@ -34,12 +34,16 @@ def consolidate(xdsl):
         A single :class:`xarray.Dataset`.
     """
 
-    consolidated_xds = xdsl[0]  # First element is the
+    root_xds = xdsl[0]  # First element is the root for this operation.
 
-    partition_keys = [p[0] for p in consolidated_xds.__daskms_partition_schema__]
+    root_schema = root_xds.__daskms_partition_schema__
+
+    partition_keys = [p[0] for p in root_schema]
+
+    consolidated_xds = root_xds  # Will be replaced in the loop.
 
     for xds in xdsl[1:]:
-        if not all(xds.attrs[k] == consolidated_xds.attrs[k] for k in partition_keys):
+        if not all(xds.attrs[k] == root_xds.attrs[k] for k in partition_keys):
             raise ValueError(
From 2867bedcf4fc2092d2cff33295834e04c5ffb640 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Tue, 8 Aug 2023 09:43:30 +0200
Subject: [PATCH 12/28] More notes about storage options.

---
 daskms/experimental/fragments/__init__.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index b7e9d581..7ffd0c15 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -65,12 +65,13 @@ def _xds_from_ms_fragment(store, **kwargs):
 
     assert (
         len(parent_urls) == 1
-    ), "Proxy has more than one parent - this is not supported."
+    ), "Fragment has more than one parent - this is not supported."
 
     parent_url = parent_urls.pop()
 
     if parent_url:
         if not isinstance(parent_url, DaskMSStore):
+            # TODO: Where, when and how should we pass storage options?
             store = DaskMSStore(parent_url)
 
@@ -133,12 +134,13 @@ def _xds_from_table_fragment(store, **kwargs):
 
     assert (
         len(parent_urls) == 1
-    ), "Proxy has more than one parent - this is not supported."
+    ), "Fragment has more than one parent - this is not supported."
 
     parent_url = parent_urls.pop()
 
     if parent_url:
         if not isinstance(parent_url, DaskMSStore):
+            # TODO: Where, when and how should we pass storage options?
             store = DaskMSStore(parent_url).subtable_store(subtable)
 
From 89f6dc386168995f168a44c0e172c29c2295df7e Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Tue, 8 Aug 2023 15:28:12 +0200
Subject: [PATCH 13/28] Begin adding tests. Fix bug exposed by tests.
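
The fix: consolidate() now compares the partition key sets of the root
and each fragment (via symmetric_difference) rather than indexing each
fragment's attrs with the root's keys, so inconsistent group_cols fail
loudly. A sketch of the failure mode the new tests cover (path
hypothetical):

    from daskms.experimental.fragments import xds_from_ms_fragment

    # Fragment written with the default partitioning but read back
    # with group_cols=() - consolidate() raises a ValueError reporting
    # the conflicting partition keys.
    xds_from_ms_fragment("delta.zarr", index_cols=("TIME",), group_cols=())
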
--- daskms/experimental/fragments/__init__.py | 19 +- .../fragments/tests/test_fragments.py | 200 +++++++++++++++++- 2 files changed, 209 insertions(+), 10 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 7ffd0c15..cd69b8bb 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -37,21 +37,26 @@ def consolidate(xdsl): root_xds = xdsl[0] # First element is the root for this operation. root_schema = root_xds.__daskms_partition_schema__ - - partition_keys = [p[0] for p in root_schema] + root_partition_keys = {p[0] for p in root_schema} consolidated_xds = root_xds # Will be replaced in the loop. for xds in xdsl[1:]: - if not all(xds.attrs[k] == root_xds.attrs[k] for k in partition_keys): + + xds_schema = xds.__daskms_partition_schema__ + xds_partition_keys = {p[0] for p in xds_schema} + + if root_partition_keys.symmetric_difference(xds_partition_keys): raise ValueError( - "consolidate failed due to conflicting partition keys." - "This usually means you are attempting to merge datasets " - "which were constructed with different group_cols arguments." + f"consolidate failed due to conflicting partition keys. " + f"This usually means the partition keys of the fragments " + f"are inconsistent with the current group_cols argument. " + f"Current group_cols produces {root_partition_keys} but " + f"the fragment has {xds_partition_keys}." ) consolidated_xds = consolidated_xds.assign(xds.data_vars) - # NOTE: Assigning the fragment's attributes may be unnecessary. + # NOTE: Assigning the fragment's attributes may be unnecessary/bad. consolidated_xds = consolidated_xds.assign_attrs(xds.attrs) return consolidated_xds diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 3172c292..56bdbd4a 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -1,7 +1,201 @@ import pytest +import dask +import dask.array as da +import numpy.testing as npt +from daskms import xds_from_storage_ms +from daskms.experimental.fragments import ( + xds_to_table_fragment, + xds_from_ms_fragment, + xds_from_table_fragment, +) +# Prevent warning pollution generated by all calls to xds_from_zarr with +# unsupported kwargs. 
+pytestmark = pytest.mark.filterwarnings( + "ignore:The following unsupported kwargs were ignored in xds_from_zarr" +) -def test_write_fragment(ms, tmp_path_factory): - import ipdb - ipdb.set_trace() +@pytest.fixture( + scope="module", + params=[ + ("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), + ("DATA_DESC_ID", "FIELD_ID"), + ("DATA_DESC_ID",), + ], +) +def group_cols(request): + return request.param + + +def test_fragment_with_noop(ms, tmp_path_factory, group_cols): + """Unchanged data_vars must remain the same when read from a fragment.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment_path = tmp_path_factory.mktemp("fragment0.ms") + + writes = xds_to_table_fragment(reads, fragment_path, ms, columns=("DATA",)) + + dask.compute(writes) + + fragment_reads = xds_from_ms_fragment( + fragment_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + for rxds, frxds in zip(reads, fragment_reads): + for dv in rxds.data_vars.keys(): + npt.assert_array_equal(rxds[dv].data, frxds[dv].data) + + +def test_fragment_with_update(ms, tmp_path_factory, group_cols): + """Updated data_vars must change when read from a fragment.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment_path = tmp_path_factory.mktemp("fragment0.ms") + + updates = [ + xds.assign({"DATA": (xds.DATA.dims, da.ones_like(xds.DATA.data))}) + for xds in reads + ] + + writes = xds_to_table_fragment(updates, fragment_path, ms, columns=("DATA",)) + + dask.compute(writes) + + fragment_reads = xds_from_ms_fragment( + fragment_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + for frxds in fragment_reads: + npt.assert_array_equal(1, frxds.DATA.data) + + +def test_nonoverlapping_parents(ms, tmp_path_factory, group_cols): + """All updated data_vars must change when read from a fragment.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment0_path = tmp_path_factory.mktemp("fragment0.ms") + + updates = [ + xds.assign({"DATA": (xds.DATA.dims, da.zeros_like(xds.DATA.data))}) + for xds in reads + ] + + writes = xds_to_table_fragment(updates, fragment0_path, ms, columns=("DATA",)) + + dask.compute(writes) + + fragment0_reads = xds_from_ms_fragment( + fragment0_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment1_path = tmp_path_factory.mktemp("fragment1.ms") + + updates = [ + xds.assign({"UVW": (xds.UVW.dims, da.zeros_like(xds.UVW.data))}) + for xds in fragment0_reads + ] + + writes = xds_to_table_fragment( + updates, fragment1_path, fragment0_path, columns=("UVW",) + ) + + dask.compute(writes) + + fragment1_reads = xds_from_ms_fragment( + fragment1_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + for frxds in fragment1_reads: + npt.assert_array_equal(0, frxds.DATA.data) + npt.assert_array_equal(0, frxds.UVW.data) + + +def test_overlapping_parents(ms, tmp_path_factory, group_cols): + """Youngest child takes priority if updated data_vars overlap.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment0_path = tmp_path_factory.mktemp("fragment0.ms") + + updates = [ + xds.assign({"DATA": (xds.DATA.dims, da.ones_like(xds.DATA.data))}) + for xds in reads + ] + + writes = xds_to_table_fragment(updates, fragment0_path, ms, columns=("DATA",)) + + dask.compute(writes) + + fragment0_reads = xds_from_ms_fragment( + fragment0_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment1_path = 
tmp_path_factory.mktemp("fragment1.ms") + + updates = [ + xds.assign({"DATA": (xds.DATA.dims, da.zeros_like(xds.DATA.data))}) + for xds in fragment0_reads + ] + + writes = xds_to_table_fragment( + updates, fragment1_path, fragment0_path, columns=("DATA",) + ) + + dask.compute(writes) + + fragment1_reads = xds_from_ms_fragment( + fragment1_path, + index_cols=("TIME",), + group_cols=group_cols, + ) + + for frxds in fragment1_reads: + npt.assert_array_equal(0, frxds.DATA.data) + + +def test_inconsistent_partitioning(ms, tmp_path_factory, group_cols): + """Raises a ValueError when parititoning would be inconsistent.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + fragment_path = tmp_path_factory.mktemp("fragment0.ms") + + writes = xds_to_table_fragment(reads, fragment_path, ms, columns=("DATA",)) + + dask.compute(writes) + + with pytest.raises(ValueError, match="consolidate failed"): + xds_from_ms_fragment( + fragment_path, + index_cols=("TIME",), + group_cols=(), + ) From ea17b50d4ab79b5bd301569c99aec9b88428cb18 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 15:44:15 +0200 Subject: [PATCH 14/28] Simplify. --- daskms/experimental/fragments/__init__.py | 36 +++++------------------ 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index cd69b8bb..3a7423cc 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -1,4 +1,4 @@ -from daskms import xds_from_storage_ms, xds_from_storage_table +from daskms import xds_from_storage_table from daskms.fsspec_store import DaskMSStore from daskms.utils import requires from daskms.experimental.zarr import xds_to_zarr @@ -62,37 +62,13 @@ def consolidate(xdsl): return consolidated_xds -@requires(xarray_import_msg, xarray_import_error) -def _xds_from_ms_fragment(store, **kwargs): - xdsl = xds_from_storage_ms(store, **kwargs) - - parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl} - - assert ( - len(parent_urls) == 1 - ), "Fragment has more than one parent - this is not supported." - - parent_url = parent_urls.pop() - - if parent_url: - if not isinstance(parent_url, DaskMSStore): - # TODO: Where, when and how should we pass storage options? - store = DaskMSStore(parent_url) - - xdsl_nested = _xds_from_ms_fragment(store, **kwargs) - else: - return [xdsl] - - return [*xdsl_nested, xdsl] - - @requires(xarray_import_msg, xarray_import_error) def xds_from_ms_fragment(store, **kwargs): """ Creates a list of xarray datasets representing the contents a composite Measurement Set. The resulting list of datasets will consist of some root dataset with any newer variables populated from the child fragments. It - defers to :func:`xds_from_storage_ms`, which should be consulted + defers to :func:`xds_from_table_fragment`, which should be consulted for more information. Parameters @@ -116,7 +92,11 @@ def xds_from_ms_fragment(store, **kwargs): xarray datasets for each group """ - lxdsl = _xds_from_ms_fragment(store, **kwargs) + # TODO: Where, when and how should we pass storage options? + if not isinstance(store, DaskMSStore): + store = DaskMSStore(store) + + lxdsl = _xds_from_table_fragment(store, **kwargs) return [consolidate(xdss) for xdss in zip(*lxdsl)] @@ -146,7 +126,7 @@ def _xds_from_table_fragment(store, **kwargs): if parent_url: if not isinstance(parent_url, DaskMSStore): # TODO: Where, when and how should we pass storage options? 
- store = DaskMSStore(parent_url).subtable_store(subtable) + store = DaskMSStore(parent_url).subtable_store(subtable or '') xdsl_nested = _xds_from_table_fragment(store, **kwargs) else: From 8fcd44e4663020e557557e91bb58fdb1ceb74b8e Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 15:46:29 +0200 Subject: [PATCH 15/28] Apply black. --- daskms/experimental/fragments/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 3a7423cc..81a67c25 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -42,7 +42,6 @@ def consolidate(xdsl): consolidated_xds = root_xds # Will be replaced in the loop. for xds in xdsl[1:]: - xds_schema = xds.__daskms_partition_schema__ xds_partition_keys = {p[0] for p in xds_schema} @@ -126,7 +125,7 @@ def _xds_from_table_fragment(store, **kwargs): if parent_url: if not isinstance(parent_url, DaskMSStore): # TODO: Where, when and how should we pass storage options? - store = DaskMSStore(parent_url).subtable_store(subtable or '') + store = DaskMSStore(parent_url).subtable_store(subtable or "") xdsl_nested = _xds_from_table_fragment(store, **kwargs) else: From fcafb7eceac6805db0456e7420acda83d4761f95 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 15:48:54 +0200 Subject: [PATCH 16/28] Use xds_from_table_fragment everywhere - xds_from_ms_fragment is just a wrapper around identical functionality. --- .../fragments/tests/test_fragments.py | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 56bdbd4a..44ec3bbd 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -3,11 +3,7 @@ import dask.array as da import numpy.testing as npt from daskms import xds_from_storage_ms -from daskms.experimental.fragments import ( - xds_to_table_fragment, - xds_from_ms_fragment, - xds_from_table_fragment, -) +from daskms.experimental.fragments import xds_to_table_fragment, xds_from_table_fragment # Prevent warning pollution generated by all calls to xds_from_zarr with # unsupported kwargs. 
@@ -42,7 +38,7 @@ def test_fragment_with_noop(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment_reads = xds_from_ms_fragment( + fragment_reads = xds_from_table_fragment( fragment_path, index_cols=("TIME",), group_cols=group_cols, @@ -72,7 +68,7 @@ def test_fragment_with_update(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment_reads = xds_from_ms_fragment( + fragment_reads = xds_from_table_fragment( fragment_path, index_cols=("TIME",), group_cols=group_cols, @@ -101,7 +97,7 @@ def test_nonoverlapping_parents(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment0_reads = xds_from_ms_fragment( + fragment0_reads = xds_from_table_fragment( fragment0_path, index_cols=("TIME",), group_cols=group_cols, @@ -120,7 +116,7 @@ def test_nonoverlapping_parents(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment1_reads = xds_from_ms_fragment( + fragment1_reads = xds_from_table_fragment( fragment1_path, index_cols=("TIME",), group_cols=group_cols, @@ -150,7 +146,7 @@ def test_overlapping_parents(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment0_reads = xds_from_ms_fragment( + fragment0_reads = xds_from_table_fragment( fragment0_path, index_cols=("TIME",), group_cols=group_cols, @@ -169,7 +165,7 @@ def test_overlapping_parents(ms, tmp_path_factory, group_cols): dask.compute(writes) - fragment1_reads = xds_from_ms_fragment( + fragment1_reads = xds_from_table_fragment( fragment1_path, index_cols=("TIME",), group_cols=group_cols, @@ -194,7 +190,7 @@ def test_inconsistent_partitioning(ms, tmp_path_factory, group_cols): dask.compute(writes) with pytest.raises(ValueError, match="consolidate failed"): - xds_from_ms_fragment( + xds_from_table_fragment( fragment_path, index_cols=("TIME",), group_cols=(), From 9fd8a4fe58bc23c1cd66a540c6ca2eef79f706e3 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 15:58:49 +0200 Subject: [PATCH 17/28] Add test/fix for self-parenthood. --- daskms/experimental/fragments/__init__.py | 10 ++++++++++ .../experimental/fragments/tests/test_fragments.py | 12 ++++++++++++ 2 files changed, 22 insertions(+) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 81a67c25..7c346bb4 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -211,6 +211,16 @@ def xds_to_table_fragment(xds, store, parent, **kwargs): if not isinstance(parent, DaskMSStore): parent = DaskMSStore(parent) + # TODO: Where, when and how should we pass storage options? + if not isinstance(store, DaskMSStore): + store = DaskMSStore(store) + + if parent == store: + raise ValueError( + "store and parent arguments identical in xds_to_table_fragment. " + "This is unsupported i.e. a fragment cannot be its own parent. 
" + ) + xds = [x.assign_attrs({"__dask_ms_parent_url__": parent.url}) for x in xds] return xds_to_zarr(xds, store, **kwargs) diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 44ec3bbd..92091805 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -195,3 +195,15 @@ def test_inconsistent_partitioning(ms, tmp_path_factory, group_cols): index_cols=("TIME",), group_cols=(), ) + + +def test_mutate_parent(ms, tmp_path_factory): + """Raises a ValueError when parititoning would be inconsistent.""" + reads = xds_from_storage_ms( + ms, + index_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), + group_cols=("TIME",), + ) + + with pytest.raises(ValueError, match="store and parent arguments"): + xds_to_table_fragment(reads, ms, ms, columns=("DATA",)) From 747007fad3a4ccb112fba03ea47185faa8451593 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Tue, 8 Aug 2023 16:05:10 +0200 Subject: [PATCH 18/28] Add pytest to testing dependencies. Check with Simon. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9bdff572..44f65ee4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ xarray = ["xarray"] zarr = ["zarr"] s3 = ["s3fs"] complete = ["s3fs", "pyarrow", "xarray", "zarr"] -testing = ["minio", "pytest"] +testing = ["minio", "pytest", "xarray"] [tool.poetry.group.dev.dependencies] tbump = "^6.9.0" From 0147ac22c3bcca7c8749b321778d2b13751d1971 Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 10 Aug 2023 09:20:26 +0200 Subject: [PATCH 19/28] Add further tests. --- daskms/experimental/fragments/__init__.py | 11 +++++-- .../fragments/tests/test_fragments.py | 32 +++++++++++++++++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index 7c346bb4..e1186d88 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -109,8 +109,15 @@ def _xds_from_table_fragment(store, **kwargs): except UnknownStoreTypeError: # NOTE: We don't pass kwargs - the only purpose of this read is to # grab the parent urls (if they exist). - xdsl = xds_from_storage_table(DaskMSStore(store.root)) - required = False + root_store = DaskMSStore(store.root) + if root_store.exists(): + xdsl = xds_from_storage_table(root_store) + required = False + else: + raise FileNotFoundError( + f"No table found at {store}. This suggests that a parent is " + f"missing." 
+ ) subtable = store.table diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 92091805..073b4fa8 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -198,12 +198,38 @@ def test_inconsistent_partitioning(ms, tmp_path_factory, group_cols): def test_mutate_parent(ms, tmp_path_factory): - """Raises a ValueError when parititoning would be inconsistent.""" + """Raises a ValueError when a fragment would be its own parent.""" reads = xds_from_storage_ms( ms, - index_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), - group_cols=("TIME",), + index_cols=("TIME",), + group_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), ) with pytest.raises(ValueError, match="store and parent arguments"): xds_to_table_fragment(reads, ms, ms, columns=("DATA",)) + + +def test_missing_parent(ms, tmp_path_factory): + """Raises a ValueError when a fragment is missing a parent.""" + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), + ) + + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" + missing_parent = tmp_dir / "missing.ms" + + writes = xds_to_table_fragment( + reads, fragment_path, missing_parent, columns=("DATA",) + ) + + dask.compute(writes) + + with pytest.raises(FileNotFoundError, match="No table found at"): + xds_from_table_fragment( + fragment_path, + index_cols=("TIME",), + group_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), + ) From 91bd15b61a6aa24e42d6de13f3ee567d728f369a Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 10 Aug 2023 09:41:39 +0200 Subject: [PATCH 20/28] More tests and more consistent use of tmp_path_factory. --- .../fragments/tests/test_fragments.py | 61 +++++++++++++++---- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 073b4fa8..0e3f93cd 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -24,15 +24,20 @@ def group_cols(request): return request.param +# -----------------------------MAIN_TABLE_TESTS-------------------------------- + + def test_fragment_with_noop(ms, tmp_path_factory, group_cols): """Unchanged data_vars must remain the same when read from a fragment.""" + reads = xds_from_storage_ms( ms, index_cols=("TIME",), group_cols=group_cols, ) - fragment_path = tmp_path_factory.mktemp("fragment0.ms") + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" writes = xds_to_table_fragment(reads, fragment_path, ms, columns=("DATA",)) @@ -45,19 +50,20 @@ def test_fragment_with_noop(ms, tmp_path_factory, group_cols): ) for rxds, frxds in zip(reads, fragment_reads): - for dv in rxds.data_vars.keys(): - npt.assert_array_equal(rxds[dv].data, frxds[dv].data) + assert rxds.equals(frxds), "Datasets not identical." 
def test_fragment_with_update(ms, tmp_path_factory, group_cols): """Updated data_vars must change when read from a fragment.""" + reads = xds_from_storage_ms( ms, index_cols=("TIME",), group_cols=group_cols, ) - fragment_path = tmp_path_factory.mktemp("fragment0.ms") + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" updates = [ xds.assign({"DATA": (xds.DATA.dims, da.ones_like(xds.DATA.data))}) @@ -80,13 +86,16 @@ def test_fragment_with_update(ms, tmp_path_factory, group_cols): def test_nonoverlapping_parents(ms, tmp_path_factory, group_cols): """All updated data_vars must change when read from a fragment.""" + reads = xds_from_storage_ms( ms, index_cols=("TIME",), group_cols=group_cols, ) - fragment0_path = tmp_path_factory.mktemp("fragment0.ms") + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment0_path = tmp_dir / "fragment0.ms" + fragment1_path = tmp_dir / "fragment1.ms" updates = [ xds.assign({"DATA": (xds.DATA.dims, da.zeros_like(xds.DATA.data))}) @@ -103,8 +112,6 @@ def test_nonoverlapping_parents(ms, tmp_path_factory, group_cols): group_cols=group_cols, ) - fragment1_path = tmp_path_factory.mktemp("fragment1.ms") - updates = [ xds.assign({"UVW": (xds.UVW.dims, da.zeros_like(xds.UVW.data))}) for xds in fragment0_reads @@ -135,7 +142,9 @@ def test_overlapping_parents(ms, tmp_path_factory, group_cols): group_cols=group_cols, ) - fragment0_path = tmp_path_factory.mktemp("fragment0.ms") + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment0_path = tmp_dir / "fragment0.ms" + fragment1_path = tmp_dir / "fragment1.ms" updates = [ xds.assign({"DATA": (xds.DATA.dims, da.ones_like(xds.DATA.data))}) @@ -152,8 +161,6 @@ def test_overlapping_parents(ms, tmp_path_factory, group_cols): group_cols=group_cols, ) - fragment1_path = tmp_path_factory.mktemp("fragment1.ms") - updates = [ xds.assign({"DATA": (xds.DATA.dims, da.zeros_like(xds.DATA.data))}) for xds in fragment0_reads @@ -183,7 +190,8 @@ def test_inconsistent_partitioning(ms, tmp_path_factory, group_cols): group_cols=group_cols, ) - fragment_path = tmp_path_factory.mktemp("fragment0.ms") + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" writes = xds_to_table_fragment(reads, fragment_path, ms, columns=("DATA",)) @@ -211,6 +219,7 @@ def test_mutate_parent(ms, tmp_path_factory): def test_missing_parent(ms, tmp_path_factory): """Raises a ValueError when a fragment is missing a parent.""" + reads = xds_from_storage_ms( ms, index_cols=("TIME",), @@ -233,3 +242,33 @@ def test_missing_parent(ms, tmp_path_factory): index_cols=("TIME",), group_cols=("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER"), ) + + +def test_datavar_in_parent(ms, tmp_path_factory, group_cols): + """Datavars not present in the fragment must be read from the parent.""" + + reads = xds_from_storage_ms( + ms, + index_cols=("TIME",), + group_cols=group_cols, + ) + + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" + + writes = xds_to_table_fragment(reads, fragment_path, ms, columns=("DATA",)) + + dask.compute(writes) + + fragment_reads = xds_from_table_fragment( + fragment_path, + columns=("UVW",), # Not in fragment. 
+ index_cols=("TIME",), + group_cols=group_cols, + ) + + for rxds, frxds in zip(reads, fragment_reads): + npt.assert_array_equal(rxds.UVW.data, frxds.UVW.data) + + +# ------------------------------SUBTABLE_TESTS--------------------------------- From 8d4d3bb6d283cbf0c620faf0654faecab6e4b72c Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 10 Aug 2023 09:44:53 +0200 Subject: [PATCH 21/28] Do not merge fragment attrs into parent (for now). --- daskms/experimental/fragments/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py index e1186d88..4117b0e3 100644 --- a/daskms/experimental/fragments/__init__.py +++ b/daskms/experimental/fragments/__init__.py @@ -55,8 +55,6 @@ def consolidate(xdsl): ) consolidated_xds = consolidated_xds.assign(xds.data_vars) - # NOTE: Assigning the fragment's attributes may be unnecessary/bad. - consolidated_xds = consolidated_xds.assign_attrs(xds.attrs) return consolidated_xds From 7b387049f29cdbec1f36e6268a30fadcd4699ddb Mon Sep 17 00:00:00 2001 From: JSKenyon Date: Thu, 10 Aug 2023 10:55:03 +0200 Subject: [PATCH 22/28] Add some tests for subtable fragments. --- .../fragments/tests/test_fragments.py | 74 ++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py index 0e3f93cd..7b98181d 100644 --- a/daskms/experimental/fragments/tests/test_fragments.py +++ b/daskms/experimental/fragments/tests/test_fragments.py @@ -2,7 +2,7 @@ import dask import dask.array as da import numpy.testing as npt -from daskms import xds_from_storage_ms +from daskms import xds_from_storage_ms, xds_from_storage_table from daskms.experimental.fragments import xds_to_table_fragment, xds_from_table_fragment # Prevent warning pollution generated by all calls to xds_from_zarr with @@ -272,3 +272,75 @@ def test_datavar_in_parent(ms, tmp_path_factory, group_cols): # ------------------------------SUBTABLE_TESTS--------------------------------- + + +def test_subtable_fragment_with_noop(spw_table, tmp_path_factory): + """Unchanged data_vars must remain the same when read from a fragment.""" + + reads = xds_from_storage_table(spw_table, group_cols=("__row__",)) + + tmp_dir = tmp_path_factory.mktemp("fragments") + fragment_path = tmp_dir / "fragment.ms" + + writes = xds_to_table_fragment( + reads, fragment_path, spw_table, columns=("CHAN_FREQ",) + ) + + dask.compute(writes) + + fragment_reads = xds_from_table_fragment(fragment_path, group_cols=("__row__",)) + + for rxds, frxds in zip(reads, fragment_reads): + assert rxds.equals(frxds), "Datasets not identical." 
+
+
+def test_subtable_fragment_with_update(spw_table, tmp_path_factory):
+    """Updated data_vars must change when read from a fragment."""
+
+    reads = xds_from_storage_table(spw_table, group_cols=("__row__",))
+
+    tmp_dir = tmp_path_factory.mktemp("fragments")
+    fragment_path = tmp_dir / "fragment.ms"
+
+    updates = [
+        xds.assign(
+            {"CHAN_FREQ": (xds.CHAN_FREQ.dims, da.ones_like(xds.CHAN_FREQ.data))}
+        )
+        for xds in reads
+    ]
+
+    writes = xds_to_table_fragment(
+        updates, fragment_path, spw_table, columns=("CHAN_FREQ",)
+    )
+
+    dask.compute(writes)
+
+    fragment_reads = xds_from_table_fragment(fragment_path, group_cols=("__row__",))
+
+    for frxds in fragment_reads:
+        npt.assert_array_equal(1, frxds.CHAN_FREQ.data)
+
+
+def test_subtable_datavar_in_parent(spw_table, tmp_path_factory):
+    """Datavars not present in the fragment must be read from the parent."""
+
+    reads = xds_from_storage_table(spw_table, group_cols=("__row__",))
+
+    tmp_dir = tmp_path_factory.mktemp("fragments")
+    fragment_path = tmp_dir / "fragment.ms"
+
+    writes = xds_to_table_fragment(
+        reads, fragment_path, spw_table, columns=("CHAN_FREQ",)
+    )
+
+    dask.compute(writes)
+
+    fragment_reads = xds_from_table_fragment(
+        fragment_path, columns=("NUM_CHAN",), group_cols=("__row__",)
+    )
+
+    for rxds, frxds in zip(reads, fragment_reads):
+        npt.assert_array_equal(rxds.NUM_CHAN.data, frxds.NUM_CHAN.data)
+
+
+# -----------------------------------------------------------------------------

From 6792c8e1415d37abd49796d612eb8bf6eb5dcd28 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 11:00:43 +0200
Subject: [PATCH 23/28] Checkpoint changes to fragments code - first determine
 ancestry, then load datasets.

---
 daskms/experimental/fragments/__init__.py | 116 ++++++++++++----------
 1 file changed, 61 insertions(+), 55 deletions(-)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index 4117b0e3..98d5503f 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -3,6 +3,7 @@
 from daskms.utils import requires
 from daskms.experimental.zarr import xds_to_zarr
 from daskms.fsspec_store import UnknownStoreTypeError
+from zarr.errors import GroupNotFoundError
 
 try:
     import xarray  # noqa
@@ -14,6 +15,63 @@
 xarray_import_msg = "pip install dask-ms[xarray] for xarray support"
 
 
+def get_ancestry(store, only_required=True):
+    """Produces a list of stores needed to reconstruct the dataset at store."""
+
+    fragments = []
+
+    if not isinstance(store, DaskMSStore):
+        # TODO: Where, when and how should we pass storage options?
+        store = DaskMSStore(store)
+
+    while True:
+        try:
+            # Try to open the store. However, as we are reading from a
+            # fragment, the subtable may not exist in the child.
+            xdsl = xds_from_storage_table(store, columns=[])
+            fragments.append(store)
+        except UnknownStoreTypeError:
+            # NOTE: We don't pass kwargs - the only purpose of this read is to
+            # grab the parent urls (if they exist).
+            root_store = DaskMSStore(store.root, columns=[])
+            if root_store.exists():
+                xdsl = xds_from_storage_table(root_store, columns=[])
+                if not only_required:
+                    fragments.append(root_store)
+            else:
+                raise FileNotFoundError(
+                    f"No table found at {store}. This suggests that a parent "
+                    f"is missing."
+                )
+        except GroupNotFoundError:
+            # We are likely dealing with a subtable fragment but the user is
+            # requesting something from the main table. This can be supported.
+            subtable_name = store.subdirectories()[0].rsplit("/")[-1]
+            subtable_store = store.subtable_store(subtable_name)
+
+            xdsl = xds_from_storage_table(subtable_store, columns=[])
+            if not only_required:
+                fragments.append(subtable_store)
+
+        subtable = store.table
+
+        parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl}
+
+        assert (
+            len(parent_urls) == 1
+        ), "Fragment has more than one parent - this is not supported."
+
+        parent_url = parent_urls.pop()
+
+        if parent_url:
+            if not isinstance(parent_url, DaskMSStore):
+                # TODO: Where, when and how should we pass storage options?
+                store = DaskMSStore(parent_url).subtable_store(subtable or "")
+        else:
+            fragments = fragments[::-1]  # Flip so that root is first.
+            return fragments
+
+
 @requires(xarray_import_msg, xarray_import_error)
 def consolidate(xdsl):
     """
@@ -89,57 +147,7 @@ def xds_from_ms_fragment(store, **kwargs):
         xarray datasets for each group
     """
 
-    # TODO: Where, when and how should we pass storage options?
-    if not isinstance(store, DaskMSStore):
-        store = DaskMSStore(store)
-
-    lxdsl = _xds_from_table_fragment(store, **kwargs)
-
-    return [consolidate(xdss) for xdss in zip(*lxdsl)]
-
-
-def _xds_from_table_fragment(store, **kwargs):
-    try:
-        # Try to open the store. However, as we are reading from a fragment,
-        # the subtable may not exist in the child.
-        xdsl = xds_from_storage_table(store, **kwargs)
-        required = True
-    except UnknownStoreTypeError:
-        # NOTE: We don't pass kwargs - the only purpose of this read is to
-        # grab the parent urls (if they exist).
-        root_store = DaskMSStore(store.root)
-        if root_store.exists():
-            xdsl = xds_from_storage_table(root_store)
-            required = False
-        else:
-            raise FileNotFoundError(
-                f"No table found at {store}. This suggests that a parent is "
-                f"missing."
-            )
-
-    subtable = store.table
-
-    parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl}
-
-    assert (
-        len(parent_urls) == 1
-    ), "Fragment has more than one parent - this is not supported."
-
-    parent_url = parent_urls.pop()
-
-    if parent_url:
-        if not isinstance(parent_url, DaskMSStore):
-            # TODO: Where, when and how should we pass storage options?
-            store = DaskMSStore(parent_url).subtable_store(subtable or "")
-
-        xdsl_nested = _xds_from_table_fragment(store, **kwargs)
-    else:
-        return [xdsl]
-
-    if required:
-        return [*xdsl_nested, xdsl]
-    else:
-        return [*xdsl_nested]
+    return xds_from_table_fragment(store, **kwargs)
 
 
 @requires(xarray_import_msg, xarray_import_error)
@@ -172,11 +180,9 @@ def xds_from_table_fragment(store, **kwargs):
         xarray datasets for each group
     """
 
-    # TODO: Where, when and how should we pass storage options?
-    if not isinstance(store, DaskMSStore):
-        store = DaskMSStore(store)
+    ancestors = get_ancestry(store)
 
-    lxdsl = _xds_from_table_fragment(store, **kwargs)
+    lxdsl = [xds_from_storage_table(s, **kwargs) for s in ancestors]
 
     return [consolidate(xdss) for xdss in zip(*lxdsl)]

From 04a82d0014f2a1f819934e5083e2f08ed2cd5fc5 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 11:12:44 +0200
Subject: [PATCH 24/28] Add beginnings of fragments cli.
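
The sketch below illustrates how the new command group might be exercised.
It is illustrative only - the paths are hypothetical, while the fragments
group and its stat/rebase subcommands are the ones defined in the diff that
follows:

    # Illustrative sketch - /data/fragment.ms and /data/parent.ms are
    # hypothetical stores.
    from click.testing import CliRunner

    from daskms.apps.fragments import fragments

    runner = CliRunner()

    # List the full ancestry of a fragment. Passing --prune instead limits
    # the listing to the stores required to reconstruct the data.
    result = runner.invoke(fragments, ["stat", "--no-prune", "/data/fragment.ms"])
    print(result.output)

    # Point an existing fragment at a new parent.
    runner.invoke(fragments, ["rebase", "/data/fragment.ms", "/data/parent.ms"])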
---
 daskms/apps/fragments.py | 65 ++++++++++++++++++++++++++++++++++++++++
 pyproject.toml           |  1 +
 2 files changed, 66 insertions(+)
 create mode 100644 daskms/apps/fragments.py

diff --git a/daskms/apps/fragments.py b/daskms/apps/fragments.py
new file mode 100644
index 00000000..743efd24
--- /dev/null
+++ b/daskms/apps/fragments.py
@@ -0,0 +1,65 @@
+import click
+import dask
+from daskms.fsspec_store import DaskMSStore
+from daskms.experimental.fragments import get_ancestry
+from daskms.experimental.zarr import xds_to_zarr, xds_from_zarr
+
+
+@click.group(help="Base command for interacting with fragments.")
+def fragments():
+    pass
+
+
+@click.command(help="List fragment and parents.")
+@click.argument(
+    "fragment_path",
+    type=DaskMSStore,
+)
+@click.option(
+    "-p/-np",
+    "--prune/--no-prune",
+    default=False,
+)
+def stat(fragment_path, prune):
+    """Path to fragment."""
+
+    ancestors = get_ancestry(fragment_path, only_required=prune)
+
+    click.echo("Ancestry:")
+
+    for i, fg in enumerate(ancestors):
+        if i == 0:
+            click.echo(f" {fg.full_path} ---> root")
+        elif i == len(ancestors) - 1:
+            click.echo(f" {fg.full_path} ---> target")
+        else:
+            click.echo(f" {fg.full_path}")
+
+
+@click.command(help="Change fragment parent.")
+@click.argument(
+    "fragment_path",
+    type=DaskMSStore,
+)
+@click.argument(
+    "parent_path",
+    type=DaskMSStore,
+)
+def rebase(fragment_path, parent_path):
+    xdsl = xds_from_zarr(fragment_path, columns=[])
+
+    xdsl = [
+        xds.assign_attrs({"__dask_ms_parent_url__": parent_path.url}) for xds in xdsl
+    ]
+
+    writes = xds_to_zarr(xdsl, fragment_path)
+
+    dask.compute(writes)
+
+
+fragments.add_command(stat)
+fragments.add_command(rebase)
+
+
+def main():
+    fragments()
diff --git a/pyproject.toml b/pyproject.toml
index 44f65ee4..abacdaf8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,6 +22,7 @@ pytest = {version = "^7.1.3", optional=true}
 
 [tool.poetry.scripts]
 dask-ms = "daskms.apps.entrypoint:main"
+fragments = "daskms.apps.fragments:main"
 
 [tool.poetry.extras]
 arrow = ["pyarrow"]

From f1cfeb102fbb5b968c14dacecbc8176f8a796a15 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 11:45:32 +0200
Subject: [PATCH 25/28] Simplify get_ancestry code.

---
 daskms/experimental/fragments/__init__.py    | 53 ++++++++-----------
 .../fragments/tests/test_fragments.py        |  2 +-
 2 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index 98d5503f..0fec0ec0 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -2,7 +2,6 @@
 from daskms.fsspec_store import DaskMSStore
 from daskms.utils import requires
 from daskms.experimental.zarr import xds_to_zarr
-from daskms.fsspec_store import UnknownStoreTypeError
 from zarr.errors import GroupNotFoundError
 
 try:
@@ -25,33 +24,28 @@ def get_ancestry(store, only_required=True):
         store = DaskMSStore(store)
 
     while True:
-        try:
-            # Try to open the store. However, as we are reading from a
-            # fragment, the subtable may not exist in the child.
-            xdsl = xds_from_storage_table(store, columns=[])
-            fragments.append(store)
-        except UnknownStoreTypeError:
-            # NOTE: We don't pass kwargs - the only purpose of this read is to
-            # grab the parent urls (if they exist).
-            root_store = DaskMSStore(store.root, columns=[])
-            if root_store.exists():
-                xdsl = xds_from_storage_table(root_store, columns=[])
-                if not only_required:
-                    fragments.append(root_store)
-            else:
-                raise FileNotFoundError(
-                    f"No table found at {store}. This suggests that a parent "
-                    f"is missing."
-                )
-        except GroupNotFoundError:
-            # We are likely dealing with a subtable fragment but the user is
-            # requesting something from the main table. This can be supported.
-            subtable_name = store.subdirectories()[0].rsplit("/")[-1]
-            subtable_store = store.subtable_store(subtable_name)
-
-            xdsl = xds_from_storage_table(subtable_store, columns=[])
-            if not only_required:
-                fragments.append(subtable_store)
+        root_store = DaskMSStore(store.root)
+
+        if store.exists():
+            try:
+                # Store exists and can be read.
+                xdsl = xds_from_storage_table(store, columns=[])
+                fragments += [store]
+            except GroupNotFoundError:
+                # Store exists, but cannot be read. We may be dealing with
+                # a subtable-only fragment. NOTE: This assumes that all
+                # subtables in a fragment have the same parent, so we don't
+                # care which subtable we read.
+                subtable_name = store.subdirectories()[0].rsplit("/")[-1]
+                subtable_store = store.subtable_store(subtable_name)
+                xdsl = xds_from_storage_table(subtable_store, columns=[])
+                fragments += [] if only_required else [subtable_store]
+        elif root_store.exists():
+            # Root store exists and can be read.
+            xdsl = xds_from_storage_table(root_store, columns=[])
+            fragments += [] if only_required else [root_store]
+        else:
+            raise FileNotFoundError(f"No root/fragment found at {store}.")
 
         subtable = store.table
 
@@ -68,8 +62,7 @@ def get_ancestry(store, only_required=True):
             # TODO: Where, when and how should we pass storage options?
             store = DaskMSStore(parent_url).subtable_store(subtable or "")
         else:
-            fragments = fragments[::-1]  # Flip so that root is first.
-            return fragments
+            return fragments[::-1]  # Flip so that the root appears first.
diff --git a/daskms/experimental/fragments/tests/test_fragments.py b/daskms/experimental/fragments/tests/test_fragments.py
index 7b98181d..557f08fc 100644
--- a/daskms/experimental/fragments/tests/test_fragments.py
+++ b/daskms/experimental/fragments/tests/test_fragments.py
@@ -236,7 +236,7 @@ def test_missing_parent(ms, tmp_path_factory):
 
     dask.compute(writes)
 
-    with pytest.raises(FileNotFoundError, match="No table found at"):
+    with pytest.raises(FileNotFoundError, match="No root/fragment found at"):
         xds_from_table_fragment(
             fragment_path,
             index_cols=("TIME",),

From 122776e78ca36b7e1099bea5f08f804b602ae6aa Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 12:18:04 +0200
Subject: [PATCH 26/28] Update HISTORY.rst.

---
 HISTORY.rst              | 1 +
 daskms/apps/fragments.py | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/HISTORY.rst b/HISTORY.rst
index ba2de36b..a0d64c6f 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -4,6 +4,7 @@ History
 
 X.Y.Z (YYYY-MM-DD)
 ------------------
+* Add experimental fragments functionality (:pr:`282`)
 * Update minio server and client versions (:pr:`287`)
 
 0.2.17 (2023-08-02)
diff --git a/daskms/apps/fragments.py b/daskms/apps/fragments.py
index 743efd24..0166a30d 100644
--- a/daskms/apps/fragments.py
+++ b/daskms/apps/fragments.py
@@ -21,7 +21,6 @@ def fragments():
     default=False,
 )
 def stat(fragment_path, prune):
-    """Path to fragment."""
 
     ancestors = get_ancestry(fragment_path, only_required=prune)
 

From f0c96f03419b07701031d843a171b6256e9f69f2 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 14:27:34 +0200
Subject: [PATCH 27/28] Fix silent failure when accessing a subtable which
 doesn't exist in any parent.
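
Previously, requesting a subtable which exists in neither the fragment nor
any of its ancestors would complete without error. A minimal sketch of the
case that now raises (the path is hypothetical, the usual ::SUBTABLE
addressing is assumed, and the SOURCE subtable is assumed to be absent
throughout the ancestry):

    from daskms.experimental.fragments import xds_from_table_fragment

    # The full ancestry is traversed before concluding that the subtable
    # is missing, after which this raises:
    # FileNotFoundError: SOURCE subtable was not found in parents.
    xds_from_table_fragment("/data/fragment.ms::SOURCE")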
---
 daskms/experimental/fragments/__init__.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index 0fec0ec0..a7d34d1b 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -62,6 +62,13 @@ def get_ancestry(store, only_required=True):
             # TODO: Where, when and how should we pass storage options?
             store = DaskMSStore(parent_url).subtable_store(subtable or "")
         else:
+            if store.table and not any(f.table for f in fragments):
+                # If we are attempting to open a subtable, we don't know if
+                # it exists until we have traversed the entire ancestry.
+                raise FileNotFoundError(
+                    f"{store.table} subtable was not found in parents."
+                )
+
             return fragments[::-1]  # Flip so that the root appears first.

From 4c7b8564a5230340d58ae1d61d743a50fe248730 Mon Sep 17 00:00:00 2001
From: JSKenyon
Date: Fri, 11 Aug 2023 16:10:59 +0200
Subject: [PATCH 28/28] Add root_url to DaskMSStore. Fixes incorrect stores
 for s3.

---
 daskms/experimental/fragments/__init__.py | 2 +-
 daskms/fsspec_store.py                    | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/daskms/experimental/fragments/__init__.py b/daskms/experimental/fragments/__init__.py
index a7d34d1b..b0d5bd15 100644
--- a/daskms/experimental/fragments/__init__.py
+++ b/daskms/experimental/fragments/__init__.py
@@ -24,7 +24,7 @@ def get_ancestry(store, only_required=True):
         store = DaskMSStore(store)
 
     while True:
-        root_store = DaskMSStore(store.root)
+        root_store = DaskMSStore(store.root_url)
 
         if store.exists():
             try:
diff --git a/daskms/fsspec_store.py b/daskms/fsspec_store.py
index 56acf2a9..2c35bdc6 100644
--- a/daskms/fsspec_store.py
+++ b/daskms/fsspec_store.py
@@ -94,6 +94,10 @@ def assert_type(self, store_type):
     def url(self):
        return f"{self.fs.unstrip_protocol(self.canonical_path)}"
 
+    @property
+    def root_url(self):
+        return f"{self.fs.unstrip_protocol(self.root)}"
+
     def subdirectories(self):
         return [
             d["name"]
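
For reference, a minimal end-to-end sketch of the fragment workflow
introduced in this series (the paths, grouping columns and the DATA update
are illustrative and mirror the tests above):

    import dask
    import dask.array as da
    from daskms import xds_from_storage_ms
    from daskms.experimental.fragments import (
        xds_from_table_fragment,
        xds_to_table_fragment,
    )

    group_cols = ("DATA_DESC_ID", "FIELD_ID", "SCAN_NUMBER")

    # Read the parent Measurement Set and modify a single column.
    xdsl = xds_from_storage_ms("/data/parent.ms", group_cols=group_cols)
    updates = [
        xds.assign({"DATA": (xds.DATA.dims, da.ones_like(xds.DATA.data))})
        for xds in xdsl
    ]

    # Write only DATA to a fragment which records /data/parent.ms as its
    # parent via the __dask_ms_parent_url__ attribute.
    writes = xds_to_table_fragment(
        updates, "/data/fragment.ms", "/data/parent.ms", columns=("DATA",)
    )
    dask.compute(writes)

    # Reads consolidate the ancestry: DATA comes from the fragment, all
    # other columns from the parent.
    xdsl = xds_from_table_fragment("/data/fragment.ms", group_cols=group_cols)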