Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async I/O methods [WIP] #749

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ PySTAC can be installed from pip or the source repository.

If you would like to enable the validation feature utilizing the
[jsonschema](https://pypi.org/project/jsonschema/) project, install with the optional
`validation` requirements:
`validation` extra:


```bash
Expand All @@ -40,7 +40,7 @@ If you would like to enable the validation feature utilizing the

If you would like to use the [`orjson`](https://pypi.org/project/orjson/) instead of the
standard `json` library for JSON serialization/deserialization, install with the
optional `orjson` requirements:
optional `orjson` extra:

```bash
> pip install pystac[orjson]
Expand All @@ -50,6 +50,16 @@ optional `orjson` requirements:
> `orjson` extra with Python 3.10 you will need to have the Rust nightly toolchain
> installed as your default toolchain in order to build the package wheel.*

If you would like to take advantage of asyncio I/O operations, install with the
optional `async` extra:

```bash
> pip install pystac[async]
```

This installs the [`aiofiles`](https://pypi.org/project/aiofiles/) and
[`httpx`](https://www.python-httpx.org/) dependencies.

From source repository:

```bash
Expand Down
6 changes: 5 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ as to serialize and deserialize STAC object to and from JSON.

* :class:`pystac.StacIO`: Base class that can be inherited to provide custom I/O
* :class:`pystac.stac_io.DefaultStacIO`: The default :class:`pystac.StacIO`
implementation used throughout the library.
implementation used throughout the library when ``aiofiles`` or ``httpx`` is not
installed.
* :class:`pystac.stac_io.DefaultStacIOAsync`: The default :class:`pystac.StacIO`
implementation used throughout the library when ``aiofiles`` and ``httpx`` are
installed.

Extensions
----------
Expand Down
19 changes: 19 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ Install from PyPi (recommended)

pip install pystac

.. note::
It is **highly recommended** that you install the ``aiofiles`` and ``httpx`` extras as well.

.. code-block:: bash

pip install pystac[aiofiles,httpx]

Install from conda-forge
========================

Expand Down Expand Up @@ -35,6 +42,18 @@ for the basic PySTAC library is `python-dateutil
PySTAC also has the following extras, which can be optionally installed to provide
additional functionality:

* ``aiofiles`` and ``httpx``

Installs the additional `aiofiles.open <https://pypi.org/project/aiofiles/>`__ and/or
`httpx <https://www.python-httpx.org>`__ dependencies used by the
:class:`~pystac.stac_io.DefaultStacIOAsync` default implementaiton of
:class:`~pystac.StacIO`.

.. note::
To get the best performance, it is **highly recommended** that you either install
these extras or create your own asynchronous :class:`~pystac.StacIO` implementation
using alternative async I/O libraries.

* ``validation``

Installs the additional `jsonschema
Expand Down
78 changes: 72 additions & 6 deletions pystac/catalog.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from copy import deepcopy
from pystac.errors import STACTypeError
Expand Down Expand Up @@ -731,6 +732,8 @@ def save(
self,
catalog_type: Optional[CatalogType] = None,
dest_href: Optional[str] = None,
*,
item_batch_size: int = 50,
) -> None:
"""Save this catalog and all it's children/item to files determined by the object's
self link HREF or a specified path.
Expand All @@ -752,7 +755,55 @@ def save(
If the catalog type is ``CatalogType.SELF_CONTAINED``, no self links will
be included and hierarchical links will be relative URLs.
"""
try:
loop = asyncio.get_running_loop()
loop.run_until_complete(
self.save_async(
catalog_type, dest_href, item_batch_size=item_batch_size
)
)
except RuntimeError:
asyncio.run(
self.save_async(
catalog_type, dest_href, item_batch_size=item_batch_size
)
)

async def save_async(
self,
catalog_type: Optional[CatalogType] = None,
dest_href: Optional[str] = None,
*,
item_batch_size: int = 50,
) -> None:
"""Asynchronously save this catalog and all it's children/item to files
determined by the object's self link HREF or a specified path.

This method will recursively await calls to :meth:`~pystac.Catalog.save_async`
for any child Catalogs and Collections and uses :func:`asyncio.gather` to await
batched calls to :meth:`STACObject.save_object_async
<pystac.STACObject.save_object_async>`. The number of requests per batch can be
controlled using the ``item_batch_size`` argument.

Args:
catalog_type : The catalog type that dictates the structure of
the catalog to save. Use a member of :class:`~pystac.CatalogType`.
If not supplied, the catalog_type of this catalog will be used.
If that attribute is not set, an exception will be raised.
dest_href : The location where the catalog is to be saved.
If not supplied, the catalog's self link HREF is used to determine
the location of the catalog file and children's files.
item_batch_size : The maximum number of concurrent Items to save in
each batch. Defaults to ``50``.
Note:
If the catalog type is ``CatalogType.ABSOLUTE_PUBLISHED``,
all self links will be included, and hierarchical links be absolute URLs.
If the catalog type is ``CatalogType.RELATIVE_PUBLISHED``, this catalog's
self link will be included, but no child catalog will have self links, and
hierarchical links will be relative URLs
If the catalog type is ``CatalogType.SELF_CONTAINED``, no self links will
be included and hierarchical links will be relative URLs.
"""
root = self.get_root()
if root is None:
raise Exception("There is no root catalog")
Expand All @@ -770,10 +821,11 @@ def save(
child_dest_href = make_absolute_href(
rel_href, dest_href, start_is_dir=True
)
child.save(dest_href=os.path.dirname(child_dest_href))
await child.save_async(dest_href=os.path.dirname(child_dest_href))
else:
child.save()
await child.save_async()

item_batch = []
for item_link in self.get_item_links():
if item_link.is_resolved():
item = cast(pystac.Item, item_link.target)
Expand All @@ -782,12 +834,26 @@ def save(
item_dest_href = make_absolute_href(
rel_href, dest_href, start_is_dir=True
)
item.save_object(
include_self_link=items_include_self_link,
dest_href=item_dest_href,
item_batch.append(
item.save_object_async(
include_self_link=items_include_self_link,
dest_href=item_dest_href,
)
)
else:
item.save_object(include_self_link=items_include_self_link)
item_batch.append(
item.save_object_async(
include_self_link=items_include_self_link
)
)

if len(item_batch) == item_batch_size:
await asyncio.gather(*item_batch)
item_batch = []

if len(item_batch) > 0:
await asyncio.gather(*item_batch)
item_batch = []

include_self_link = False
# include a self link if this is the root catalog
Expand Down
Loading