Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental code for data fragments. #282

Merged
merged 29 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
48da17d
Add experimental code for reading data from multiple sources.
JSKenyon Aug 3, 2023
3b50340
WIP on proxy reads.
JSKenyon Aug 3, 2023
c25bca9
Add further work on composite datasets.
JSKenyon Aug 4, 2023
73278e2
Rename and document xds_to/from_fragment.
JSKenyon Aug 4, 2023
3a50c17
Checkpoint further work on fragment code.
JSKenyon Aug 4, 2023
473b346
Rename experimental module.
JSKenyon Aug 4, 2023
58f8f28
Apply black.
JSKenyon Aug 8, 2023
9ed2818
Some tidying.
JSKenyon Aug 8, 2023
1713981
More tidying.
JSKenyon Aug 8, 2023
4090974
More docs.
JSKenyon Aug 8, 2023
a679d63
Clarify variable names.
JSKenyon Aug 8, 2023
2867bed
More notes about storage options.
JSKenyon Aug 8, 2023
89f6dc3
Begin adding tests. Fix bug exposed by tests.
JSKenyon Aug 8, 2023
ea17b50
Simplify.
JSKenyon Aug 8, 2023
8fcd44e
Apply black.
JSKenyon Aug 8, 2023
fcafb7e
Use xds_from_table_fragment everywhere - xds_from_ms_fragment is just…
JSKenyon Aug 8, 2023
9fd8a4f
Add test/fix for self-parenthood.
JSKenyon Aug 8, 2023
747007f
Add pytest to testing dependencies. Check with Simon.
JSKenyon Aug 8, 2023
0147ac2
Add further tests.
JSKenyon Aug 10, 2023
91bd15b
More tests and more consistent use of tmp_path_factory.
JSKenyon Aug 10, 2023
8d4d3bb
Do not merge fragment attrs into parent (for now).
JSKenyon Aug 10, 2023
7b38704
Add some tests for subtable fragments.
JSKenyon Aug 10, 2023
6792c8e
Checkpoint changes to fragments code - first determine ancestry, then…
JSKenyon Aug 11, 2023
04a82d0
Add beginnings of fragments cli.
JSKenyon Aug 11, 2023
f1cfeb1
Simplify get_ancestry code.
JSKenyon Aug 11, 2023
122776e
Update HISTORY.rst.
JSKenyon Aug 11, 2023
f0c96f0
Fix silent failure when accessing a subtable which doesn't exist in a…
JSKenyon Aug 11, 2023
4c7b856
Add root_url to DaskMSStore. Fixes incorrect stores for s3.
JSKenyon Aug 11, 2023
5cdef02
Merge branch 'master' into multisource-experimental
sjperkins Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ History

X.Y.Z (YYYY-MM-DD)
------------------
* Add experimental fragments functionality (:pr:`282`)
* Run CI weekly on Monday @ 2h30 am UTC (:pr:`288`)
* Update minio server and client versions (:pr:`287`)
* Retain ROWID coordinates during MS conversion (:pr:`286`)
Expand Down
64 changes: 64 additions & 0 deletions daskms/apps/fragments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import click
import dask
from daskms.fsspec_store import DaskMSStore
from daskms.experimental.fragments import get_ancestry
from daskms.experimental.zarr import xds_to_zarr, xds_from_zarr


@click.group(help="Base command for interacting with fragments.")
def fragments():
    # Click group entry point; subcommands (stat, rebase) are attached via
    # fragments.add_command() at module level below.
    pass


@click.command(help="List fragment and parents.")
@click.argument(
    "fragment_path",
    type=DaskMSStore,
)
@click.option(
    "-p/-np",
    "--prune/--no-prune",
    default=False,
)
def stat(fragment_path, prune):
    """Print the ancestry of a fragment, from root to target."""
    ancestors = get_ancestry(fragment_path, only_required=prune)

    click.echo("Ancestry:")

    last_index = len(ancestors) - 1

    for position, fg in enumerate(ancestors):
        # Annotate the endpoints of the chain; intermediate fragments are
        # printed without a label.
        if position == 0:
            click.echo(f" {fg.full_path} ---> root")
        elif position == last_index:
            click.echo(f" {fg.full_path} ---> target")
        else:
            click.echo(f" {fg.full_path}")


@click.command(help="Change fragment parent.")
@click.argument(
    "fragment_path",
    type=DaskMSStore,
)
@click.argument(
    "parent_path",
    type=DaskMSStore,
)
def rebase(fragment_path, parent_path):
    """Point an existing fragment at a new parent by rewriting its
    __dask_ms_parent_url__ attribute in place."""
    # columns=[] reads only dataset metadata/attrs, not the data itself.
    datasets = xds_from_zarr(fragment_path, columns=[])

    rebased = [
        ds.assign_attrs({"__dask_ms_parent_url__": parent_path.url})
        for ds in datasets
    ]

    # Write the updated attributes back to the same store.
    dask.compute(xds_to_zarr(rebased, fragment_path))


# Register the subcommands on the click group.
fragments.add_command(stat)
fragments.add_command(rebase)


def main():
    # Console-script entry point for the fragments CLI.
    fragments()
237 changes: 237 additions & 0 deletions daskms/experimental/fragments/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
from daskms import xds_from_storage_table
from daskms.fsspec_store import DaskMSStore
from daskms.utils import requires
from daskms.experimental.zarr import xds_to_zarr
from zarr.errors import GroupNotFoundError

# xarray is an optional dependency: capture the ImportError (if any) here so
# that the @requires decorator can raise it lazily, only when fragment
# functionality is actually used.
try:
    import xarray  # noqa
except ImportError as e:
    xarray_import_error = e
else:
    xarray_import_error = None

xarray_import_msg = "pip install dask-ms[xarray] for xarray support"


def get_ancestry(store, only_required=True):
    """Produces a list of stores needed to reconstruct the dataset at store.

    Walks the chain of ``__dask_ms_parent_url__`` attributes from the given
    fragment up to its root, collecting the stores along the way.

    Parameters
    ----------
    store : str or DaskMSStore
        Store (or url) of the fragment of interest.
    only_required : bool
        When True (default), stores that contribute no data to the requested
        (sub)table are omitted from the result.

    Returns
    -------
    list of DaskMSStore
        Ancestry ordered root-first, target-last.

    Raises
    ------
    FileNotFoundError
        If neither the store nor its root exists, or if a requested subtable
        is absent from the entire ancestry.
    """

    fragments = []

    if not isinstance(store, DaskMSStore):
        # TODO: Where, when and how should we pass storage options?
        store = DaskMSStore(store)

    while True:
        # Store at the root url (i.e. with any subtable component stripped) -
        # consulted when the subtable itself is absent at this level.
        root_store = DaskMSStore(store.root_url)

        if store.exists():
            try:
                # Store exists and can be read.
                xdsl = xds_from_storage_table(store, columns=[])
                fragments += [store]
            except GroupNotFoundError:
                # Store exists, but cannot be read. We may be dealing with
                # a subtable only fragment. NOTE: This assumes that all
                # subtables in a fragment have the same parent, so we don't
                # care which subtable we read.
                subtable_name = store.subdirectories()[0].rsplit("/")[-1]
                subtable_store = store.subtable_store(subtable_name)
                xdsl = xds_from_storage_table(subtable_store, columns=[])
                fragments += [] if only_required else [subtable_store]
        elif root_store.exists():
            # Root store exists and can be read.
            xdsl = xds_from_storage_table(root_store, columns=[])
            fragments += [] if only_required else [root_store]
        else:
            raise FileNotFoundError(f"No root/fragment found at {store}.")

        # Remember the subtable component so it can be re-applied when
        # stepping up to the parent store.
        subtable = store.table

        # Every dataset in a fragment records its parent's url; they must all
        # agree on a single parent.
        parent_urls = {xds.attrs.get("__dask_ms_parent_url__", None) for xds in xdsl}

        assert (
            len(parent_urls) == 1
        ), "Fragment has more than one parent - this is not supported."

        parent_url = parent_urls.pop()

        if parent_url:
            if not isinstance(parent_url, DaskMSStore):
                # TODO: Where, when and how should we pass storage options?
                # Step up to the parent, preserving the subtable component.
                # NOTE(review): if parent_url were already a DaskMSStore the
                # store would not advance here - confirm that attrs only ever
                # hold url strings, otherwise this loop cannot terminate.
                store = DaskMSStore(parent_url).subtable_store(subtable or "")
        else:
            # No parent - this is the root, so the walk is complete.
            if store.table and not any(f.table for f in fragments):
                # If we are attempting to open a subtable, we don't know if
                # it exists until we have traversed the entire ancestry.
                raise FileNotFoundError(
                    f"{store.table} subtable was not found in parents."
                )

            return fragments[::-1]  # Flip so that the root appears first.


@requires(xarray_import_msg, xarray_import_error)
def consolidate(xdsl):
    """
    Consolidates a list of xarray datasets by assigning data variables.
    Priority is determined by the position within the list, with elements at
    the end of the list having higher priority than those at the start. The
    primary purpose of this function is the construction of a consolidated
    dataset from a root and deltas (fragments).

    Parameters
    ----------
    xdsl : tuple or list
        Tuple or list of :class:`xarray.Dataset` objects to consolidate.

    Returns
    -------
    consolidated_xds : :class:`xarray.Dataset`
        A single :class:`xarray.Dataset`.

    Raises
    ------
    ValueError
        If any dataset's partition keys differ from those of the first
        (root) dataset.
    """

    # The first element acts as the root; later elements overlay it.
    base_xds = xdsl[0]
    base_keys = {field[0] for field in base_xds.__daskms_partition_schema__}

    merged_xds = base_xds

    for overlay_xds in xdsl[1:]:
        overlay_keys = {
            field[0] for field in overlay_xds.__daskms_partition_schema__
        }

        # Any key present in one set but not the other is a conflict.
        if base_keys ^ overlay_keys:
            raise ValueError(
                f"consolidate failed due to conflicting partition keys. "
                f"This usually means the partition keys of the fragments "
                f"are inconsistent with the current group_cols argument. "
                f"Current group_cols produces {base_keys} but "
                f"the fragment has {overlay_keys}."
            )

        # Later datasets take priority: their variables replace earlier ones.
        merged_xds = merged_xds.assign(overlay_xds.data_vars)

    return merged_xds


@requires(xarray_import_msg, xarray_import_error)
def xds_from_ms_fragment(store, **kwargs):
    """
    Creates a list of xarray datasets representing the contents of a
    composite Measurement Set. The resulting list consists of root datasets
    with any newer variables overlaid from the child fragments. This is a
    thin alias for :func:`xds_from_table_fragment`, which should be
    consulted for more information.

    Parameters
    ----------
    store : str or DaskMSStore
        Store or string of the child fragment of interest.
    columns : tuple or list, optional
        Columns present on the resulting dataset.
        Defaults to all if ``None``.
    index_cols : tuple or list, optional
        Sequence of indexing columns.
        Defaults to :code:`%(indices)s`
    group_cols : tuple or list, optional
        Sequence of grouping columns.
        Defaults to :code:`%(groups)s`
    **kwargs : optional

    Returns
    -------
    datasets : list of :class:`xarray.Dataset`
        xarray datasets for each group
    """

    return xds_from_table_fragment(store, **kwargs)


@requires(xarray_import_msg, xarray_import_error)
def xds_from_table_fragment(store, **kwargs):
    """
    Creates a list of xarray datasets representing the contents of a
    composite Measurement Set. The resulting list of datasets will consist
    of some root dataset with any newer variables populated from the child
    fragments. It defers to :func:`xds_from_storage_table`, which should be
    consulted for more information.

    Parameters
    ----------
    store : str or DaskMSStore
        Store or string of the child fragment of interest.
    columns : tuple or list, optional
        Columns present on the resulting dataset.
        Defaults to all if ``None``.
    index_cols : tuple or list, optional
        Sequence of indexing columns.
        Defaults to :code:`%(indices)s`
    group_cols : tuple or list, optional
        Sequence of grouping columns.
        Defaults to :code:`%(groups)s`
    **kwargs : optional

    Returns
    -------
    datasets : list of :class:`xarray.Dataset`
        xarray datasets for each group
    """

    # Determine every store required to reconstruct the target, root first.
    ancestors = get_ancestry(store)

    # Read each ancestor; lxdsl is a list (per store) of lists (per group).
    lxdsl = [xds_from_storage_table(s, **kwargs) for s in ancestors]

    # Overlay corresponding groups across the ancestry, root to target.
    return [consolidate(xdss) for xdss in zip(*lxdsl)]


@requires(xarray_import_msg, xarray_import_error)
def xds_to_table_fragment(xds, store, parent, **kwargs):
    """
    Generates a list of Datasets representing write operations from the
    specified arrays in :class:`xarray.Dataset`'s into a child fragment
    dataset.

    Parameters
    ----------
    xds : :class:`xarray.Dataset` or list of :class:`xarray.Dataset`
        dataset(s) containing the specified columns. If a list of datasets
        is provided, the concatenation of the columns in
        sequential datasets will be written.
    store : str or DaskMSStore
        Store or string which determines the location to which the child
        fragment will be written.
    parent : str or DaskMSStore
        Store or string corresponding to the parent dataset. Can point to
        either a root dataset or another child fragment.

    **kwargs : optional arguments. See :func:`xds_to_table`.

    Returns
    -------
    write_datasets : list of :class:`xarray.Dataset`
        Datasets containing arrays representing write operations
        into the fragment's zarr store.

    Raises
    ------
    ValueError
        If ``store`` and ``parent`` refer to the same location.
    """

    # TODO: Where, when and how should we pass storage options?
    if not isinstance(parent, DaskMSStore):
        parent = DaskMSStore(parent)

    # TODO: Where, when and how should we pass storage options?
    if not isinstance(store, DaskMSStore):
        store = DaskMSStore(store)

    if parent == store:
        raise ValueError(
            "store and parent arguments identical in xds_to_table_fragment. "
            "This is unsupported i.e. a fragment cannot be its own parent. "
        )

    # Accept a single Dataset as documented: iterating a bare xarray.Dataset
    # would yield variable names rather than the dataset itself.
    if not isinstance(xds, (list, tuple)):
        xds = [xds]

    # Record the parent url on each dataset so get_ancestry can walk the
    # fragment chain back to the root.
    xds = [x.assign_attrs({"__dask_ms_parent_url__": parent.url}) for x in xds]

    return xds_to_zarr(xds, store, **kwargs)
Empty file.
Loading
Loading