Catalog import argument documentation. (#193)
delucchi-cmu authored Dec 14, 2023
1 parent 413c8b1 commit da589dc
Showing 5 changed files with 141 additions and 99 deletions.
37 changes: 0 additions & 37 deletions docs/catalogs/advanced.rst

This file was deleted.

163 changes: 141 additions & 22 deletions docs/catalogs/arguments.rst

Catalog Import Arguments
===============================================================================

This page discusses a few topics around setting up a catalog pipeline.

At a minimum, you need arguments that include where to find the input files,
the column names for RA and DEC, and where to put the output files.
A minimal arguments block will look something like:

.. code-block:: python

    args = ImportArguments(
        ra_column="ObjectRA",          # column names and paths here are illustrative
        dec_column="ObjectDec",
        input_path="./my_data",
        output_artifact_name="my_catalog",
        output_path="./output",
    )

More details on each of these parameters are provided in the sections below.

For the curious, see the API documentation for
:py:class:`hipscat_import.catalog.arguments.ImportArguments`, and its superclass
:py:class:`hipscat_import.runtime_arguments.RuntimeArguments`.

Pipeline setup
-------------------------------------------------------------------------------

Dask
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

We will either use a user-provided dask ``Client``, or create a new one with
the following arguments:

``dask_tmp`` - ``str`` - directory for dask worker space. This should be local to
the execution of the pipeline, for speed of reads and writes. For much more
information, see :doc:`temp_files`.

``dask_n_workers`` - ``int`` - number of workers for the dask client. Defaults to 1.

``dask_threads_per_worker`` - ``int`` - number of threads per dask worker. Defaults to 1.
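
As a sketch, these dask arguments sit alongside the rest of the arguments block
(the values shown are illustrative):

.. code-block:: python

    args = ImportArguments(
        # ... input and output arguments as described elsewhere on this page ...
        dask_tmp="/local/scratch/dask",
        dask_n_workers=4,
        dask_threads_per_worker=1,
    )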

If you find that you need additional parameters for your dask client (e.g. you are
creating a SLURM worker pool), you can instead create your own dask client and pass
it along to the pipeline, ignoring the above arguments. This would look like:

.. code-block:: python

    from dask.distributed import Client
    from hipscat_import.pipeline import pipeline_with_client

    args = ...
    with Client('scheduler:port') as client:
        pipeline_with_client(args, client)

If you're running within a ``.py`` file, we recommend you use a ``main`` guard to
potentially avoid some python threading issues with dask:

.. code-block:: python

    from hipscat_import.pipeline import pipeline

    def import_pipeline():
        args = ...
        pipeline(args)

    if __name__ == '__main__':
        import_pipeline()

Resuming
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The import pipeline has the potential to be a very long-running process if
you're importing large amounts of data or performing complex transformations
on the data before writing.

While the pipeline runs, we take notes of our progress so that the pipeline can
be resumed at a later time, if the job is pre-empted or canceled for any reason.

When instantiating a pipeline, you can use the ``resume`` flag to indicate that
we can resume from an earlier execution of the pipeline.
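
For instance, a sketch of restarting an interrupted run, reusing the same
arguments block:

.. code-block:: python

    from hipscat_import.pipeline import pipeline

    args = ImportArguments(
        # ... same arguments as the original run ...
        resume=True,
    )
    pipeline(args)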

If any resume files are found, we will only proceed if you've set ``resume=True``.
Otherwise, the pipeline will terminate.

To run the pipeline from scratch instead, go to the temp directory you've specified
and remove any intermediate files created by previous runs of the ``hipscat-import``
pipeline.

Reading input files
-------------------------------------------------------------------------------

How to read them?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Specify an instance of ``InputReader`` for the ``file_reader`` parameter.

We use the ``InputReader`` class to read files in chunks and pass the chunks
along to the map/reduce stages. We've provided reference implementations for
reading CSV, FITS, and Parquet input files, but you can subclass the reader
type to suit whatever input files you've got.

You only need to provide the ``file_reader`` argument if you are using a custom file reader
or passing parameters to the file reader. For example, you might use ``file_reader=CsvReader(separator="\s+")``
to parse a whitespace-separated file.
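
For example, a sketch of passing a configured reader (assuming ``CsvReader`` is
importable from the same ``hipscat_import.catalog.file_readers`` module as
``InputReader``):

.. code-block:: python

    from hipscat_import.catalog.file_readers import CsvReader

    args = ImportArguments(
        # ... other arguments ...
        file_reader=CsvReader(separator=r"\s+"),   # whitespace-separated input
    )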

You can find the full API documentation for
:py:class:`hipscat_import.catalog.file_readers.InputReader`.

.. code-block:: python

    class StarrReader(InputReader):
        """Sketch of a custom reader for fictional ".starr" files."""
        def read(self, input_file, read_columns=None):
            # Yield the file's contents as chunked pandas DataFrames.
            yield from read_starr_file_in_chunks(input_file)  # hypothetical helper

    args = ImportArguments(
        # ... other arguments ...
        file_reader=StarrReader(),
    )

If you're reading from cloud storage, or otherwise have some filesystem credential
dict, put those in ``input_storage_options``.
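
For example, a sketch for reading from cloud storage (the path and credential
keys below are hypothetical, and the exact keys depend on your filesystem):

.. code-block:: python

    args = ImportArguments(
        # ... other arguments ...
        input_path="s3://my-bucket/my_data",
        input_storage_options={
            "key": "ACCESS_KEY",      # hypothetical fsspec-style credentials
            "secret": "SECRET_KEY",
        },
    )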

Which fields?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

There are two fields that we require in order to make a valid hipscatted
catalog: the right ascension and declination. At this time, this is the only
supported system for celestial coordinates.

If you're importing data that has previously been hipscatted, you may use
``use_hipscat_index = True``. This will use that previously computed hipscat spatial
index as the position, instead of ra/dec.
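
A sketch of these positional arguments (column names are illustrative and should
match your input files):

.. code-block:: python

    args = ImportArguments(
        # ... other arguments ...
        ra_column="ObjectRA",
        dec_column="ObjectDec",
        # or, for data that has already been hipscatted:
        # use_hipscat_index=True,
    )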

Healpix order and thresholds
-------------------------------------------------------------------------------

Details for the ``pixel_threshold``, ``highest_healpix_order``, and
``constant_healpix_order`` arguments.

When creating a new catalog through the hipscat-import process, we try to
create partitions with approximately the same number of rows per partition.
This isn't perfect, because the sky is uneven, but we still try to use smaller
pixels where the sky is denser and larger pixels where it is sparser. Roughly,
``pixel_threshold`` caps the number of rows per resulting partition, and
``highest_healpix_order`` caps how finely the sky will be subdivided; if the
threshold cannot be met at that order, the pipeline will fail at the "Binning"
stage, and you should adjust your parameters.
For more discussion of the ``pixel_threshold`` argument and a strategy for setting
this parameter, see the notebook :doc:`/notebooks/estimate_pixel_threshold`.

For more discussion of the "Binning" stage and all other stages, see :doc:`temp_files`.

Alternatively, you can use the ``constant_healpix_order`` argument. This will
**ignore** both the ``pixel_threshold`` and ``highest_healpix_order`` arguments,
and the catalog will be partitioned by healpix pixels at the
``constant_healpix_order``.
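
As a sketch, the partitioning arguments might look like (values are illustrative):

.. code-block:: python

    args = ImportArguments(
        # ... other arguments ...
        highest_healpix_order=7,
        pixel_threshold=1_000_000,
        # or, to force a uniform partitioning instead:
        # constant_healpix_order=5,
    )
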
Output
-------------------------------------------------------------------------------

Where?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You must specify a name for the catalog, using ``output_artifact_name``.

You must specify where you want your catalog data to be written, using
``output_path``. This path should be the base directory for your catalogs, as
the full path for the catalog will take the form of ``output_path/output_artifact_name``.

If there is already catalog data in the indicated directory, you can force a
new catalog to be written in the directory with the ``overwrite`` flag. It's
preferable to delete any existing contents first, however, as overwriting may
cause unexpected side effects.

If you're writing to cloud storage, or otherwise have some filesystem credential
dict, put those in ``output_storage_options``.

In addition, you can specify directories to use for various intermediate files:

- dask worker space (``dask_tmp``)
- sharded parquet files (``tmp_dir``)
- intermediate resume files (``resume_tmp``)

Most users are going to be ok with simply setting the ``tmp_dir`` for all intermediate
file use. For more information on these parameters, when you would use each,
and demonstrations of temporary file use, see :doc:`temp_files`.
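
A sketch of the output and intermediate paths (directories are illustrative):

.. code-block:: python

    args = ImportArguments(
        # ... other arguments ...
        output_artifact_name="my_catalog",
        output_path="/data/catalogs",             # catalog lands in /data/catalogs/my_catalog
        tmp_dir="/local/scratch/hipscat_import",  # intermediate files
        dask_tmp="/local/scratch/dask",           # dask worker space
    )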

How?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You may want to tweak parameters of the final catalog output, and we have helper
arguments for a few of those.

``add_hipscat_index`` - ``bool`` - whether or not to add the hipscat spatial index
as a column in the resulting catalog. The ``_hipscat_index`` field is designed to make many
dask operations more performant, but if you do not intend to publish your dataset
and do not intend to use dask, then you can suppress generation of this column to
save a little space in your final disk usage.

The ``_hipscat_index`` uses a high healpix order and a uniqueness counter to create
values that can order all points in the sky, according to a nested healpix scheme.

``sort_columns`` - ``str`` - column for survey identifier, or other sortable column.
If sorting by multiple columns, they should be comma-separated.
If ``add_hipscat_index=True``, this sorting will be used to resolve the
index counter within the same higher-order pixel space.

``use_schema_file`` - ``str`` - path to a parquet file with schema metadata.
This will be used for column metadata when writing the files, if specified.
For more information on why you would want this file and how to generate it,
check out our notebook :doc:`/notebooks/unequal_schema`.

``debug_stats_only`` - ``bool`` - If ``True``, we will not create the leaf
parquet files with the catalog data, and will only generate root-level metadata
files representing the full statistics of the final catalog. This can be useful
when probing how well the import process will handle a target dataset.

``epoch`` - ``str`` - astronomical epoch for the data. Defaults to ``"J2000"``.

``catalog_type`` - ``"object"`` or ``"source"``. Indicates the level of catalog data,
using the LSST nomenclature:

- object - things in the sky (e.g. stars, galaxies)
- source - detections of things in the sky at some point in time.

Some data providers split detection-level data into a separate catalog to keep object
catalogs smaller; this split reflects a relational data model.
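
Putting a few of these together, a sketch for a source catalog import (values are
illustrative):

.. code-block:: python

    args = ImportArguments(
        # ... other arguments ...
        catalog_type="source",
        epoch="J2000",
        sort_columns="ObjectID",       # hypothetical survey identifier column
        add_hipscat_index=True,
        # use_schema_file="./schema.parquet",
        # debug_stats_only=True,
    )
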
16 changes: 0 additions & 16 deletions docs/catalogs/debug.rst

This file was deleted.

21 changes: 0 additions & 21 deletions docs/catalogs/resume.rst

This file was deleted.

3 changes: 0 additions & 3 deletions docs/index.rst
:caption: Catalogs

catalogs/arguments
catalogs/resume
catalogs/debug
catalogs/advanced
catalogs/temp_files
catalogs/public/index

