Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
2.1.0 (TBD)

Added:
- Add the option to get Planetary Variable subscription results as a CSV file ().
- A subscription_request.planetary_variable_source function has been added
(#976).
- The subscription_request.build_request function has a new option to clip to
the subscription's source geometry. This is a preview of the default
behavior of the next version of the Subscriptions API.
the subscription's source geometry. This is a preview of the default behavior
of the next version of the Subscriptions API (#971).

2.0.3 (2023-06-28)

Expand Down
24 changes: 17 additions & 7 deletions planet/cli/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,13 @@ async def get_subscription_cmd(ctx, subscription_id, pretty):
"success"]),
multiple=True,
default=None,
callback=lambda ctx,
param,
value: set(value),
callback=(lambda ctx, param, value: set(value)),
help="Select subscription results in one or more states. Default: all.")
@click.option('--csv',
'csv_flag',
is_flag=True,
default=False,
help="Get subscription results as an unpaged CSV file.")
@limit
# TODO: the following 3 options.
# –created: timestamp instant or range.
Expand All @@ -178,13 +181,20 @@ async def list_subscription_results_cmd(ctx,
subscription_id,
pretty,
status,
csv_flag,
limit):
"""Gets results of a subscription and prints the API response."""
async with subscriptions_client(ctx) as client:
async for result in client.get_results(subscription_id,
status=status,
limit=limit):
echo_json(result, pretty)
if csv_flag:
async for result in client.get_results_csv(subscription_id,
status=status,
limit=limit):
click.echo(result)
else:
async for result in client.get_results(subscription_id,
status=status,
limit=limit):
echo_json(result, pretty)


@subscriptions.command() # type: ignore
Expand Down
58 changes: 52 additions & 6 deletions planet/clients/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Planet Subscriptions API Python client."""

import logging
from typing import AsyncIterator, Optional, Set
from typing import AsyncIterator, Literal, Optional, Sequence

from planet.exceptions import APIError, ClientError
from planet.http import Session
Expand Down Expand Up @@ -58,7 +58,7 @@ def __init__(self,
self._base_url = self._base_url[:-1]

async def list_subscriptions(self,
status: Optional[Set[str]] = None,
status: Optional[Sequence[str]] = None,
limit: int = 100) -> AsyncIterator[dict]:
"""Iterate over list of account subscriptions with optional filtering.

Expand Down Expand Up @@ -216,16 +216,21 @@ async def get_subscription(self, subscription_id: str) -> dict:

async def get_results(self,
subscription_id: str,
status: Optional[Set[str]] = None,
status: Optional[Sequence[Literal[
"created",
"queued",
"processing",
"failed",
"success"]]] = None,
limit: int = 100) -> AsyncIterator[dict]:
"""Iterate over results of a Subscription.

Note:
Notes:
The name of this method is based on the API's method name. This
method provides iteration over results, it does not get a
single result description or return a list of descriptions.

Args:
Parameters:
subscription_id (str): id of a subscription.
status (Set[str]): pass result with status in this set,
filter out results with status not in this set.
Expand All @@ -252,7 +257,6 @@ class _ResultsPager(Paged):
resp = await self._session.request(method='GET',
url=url,
params=params)

async for sub in _ResultsPager(resp,
self._session.request,
limit=limit):
Expand All @@ -263,3 +267,45 @@ class _ResultsPager(Paged):
raise
except ClientError: # pragma: no cover
raise

async def get_results_csv(self,
subscription_id: str,
status: Optional[Sequence[Literal[
"created",
"queued",
"processing",
"failed",
"success"]]] = None,
**kwargs) -> AsyncIterator[str]:
"""Iterate over rows of results CSV for a Subscription.

Notes:
The name of this method is based on the API's method name. This
method provides iteration over results, it does not get a
single result description or return a list of descriptions.

Parameters:
subscription_id (str): id of a subscription.
status (Set[str]): pass result with status in this set,
filter out results with status not in this set.
TODO: created, updated, completed, user_id

Yields:
str: a row from a CSV file.

Raises:
APIError: on an API server error.
ClientError: on a client error.
"""
url = f'{self._base_url}/{subscription_id}/results'
params = {'status': [val for val in status or {}], 'format': 'csv'}

# Note: retries are not implemented yet. This project has
# retry logic for HTTP requests, but does not handle errors
# during streaming. We may want to consider a retry decorator
# for this entire method a la stamina:
# https://github.com/hynek/stamina.
Copy link
Contributor Author

@sgillies sgillies Jul 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that our model for retrying requests doesn't cover streaming cases.

Retrying can wait until after 2.1.0.

async with self._session._client.stream('GET', url,
params=params) as response:
async for line in response.aiter_lines():
yield line
133 changes: 110 additions & 23 deletions planet/subscription_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# the License.
"""Functionality for preparing subscription requests."""
from datetime import datetime
from typing import Any, Dict, Optional, List, Mapping
from typing import Any, Dict, Optional, List, Literal, Mapping

from . import geojson, specs
from .exceptions import ClientError
Expand Down Expand Up @@ -49,7 +49,7 @@ def build_request(name: str,
delivery: Mapping,
notifications: Optional[Mapping] = None,
tools: Optional[List[Mapping]] = None,
clip_to_source=False) -> dict:
clip_to_source: Optional[bool] = False) -> dict:
"""Construct a Subscriptions API request.

The return value can be passed to
Expand All @@ -73,12 +73,12 @@ def build_request(name: str,
behavior.

Returns:
A Python dict representation of a Subscriptions API request for
a new subscription.
dict: a representation of a Subscriptions API request for
a new subscription.

Raises:
ClientError when a valid Subscriptions API request can't be
constructed.
ClientError: when a valid Subscriptions API request can't be
constructed.

Examples:
```python
Expand Down Expand Up @@ -152,27 +152,34 @@ def catalog_source(
end_time: Optional[datetime] = None,
rrule: Optional[str] = None,
) -> dict:
"""Catalog subscription source.
"""Construct a Catalog subscription source.

The return value can be passed to
[planet.subscription_request.build_request][].

Parameters:
item_types: The class of spacecraft and processing level of the
subscription's matching items, e.g. PSScene.
asset_types: The data products which will be delivered for all subscription
matching items. An item will only match and deliver if all specified
asset types are published for that item.
geometry: The area of interest of the subscription that will be used to
determine matches.
start_time: The start time of the subscription. This time can be in the
past or future.
filter: The filter criteria based on item-level metadata.
end_time: The end time of the subscription. This time can be in the past or
future, and must be after the start_time.
rrule: The recurrence rule, given in iCalendar RFC 5545 format. Only
monthly recurrences are supported at this time.
item_types: The class of spacecraft and processing level of the
subscription's matching items, e.g. PSScene.
asset_types: The data products which will be delivered for all
subscription matching items. An item will only match and
deliver if all specified asset types are published for that
item.
geometry: The area of interest of the subscription that will be
used to determine matches.
start_time: The start time of the subscription. This time can be
in the past or future.
filter: The filter criteria based on item-level metadata.
end_time: The end time of the subscription. This time can be in
the past or future, and must be after the start_time.
rrule: The recurrence rule, given in iCalendar RFC 5545 format.
Only monthly recurrences are supported at this time.

Returns:
dict: a representation of a subscription source.

Raises:
planet.exceptions.ClientError: If start_time or end_time are not valid
datetimes
ClientError: if a source can not be
configured.
"""
if len(item_types) > 1:
raise ClientError(
Expand Down Expand Up @@ -212,6 +219,86 @@ def catalog_source(
return {"type": "catalog", "parameters": parameters}


def planetary_variable_source(
var_type: Literal["biomass_proxy",
"land_surface_temperature",
"soil_water_content",
"vegetation_optical_depth"],
var_id: str,
geometry: Mapping,
start_time: datetime,
end_time: Optional[datetime] = None,
) -> dict:
"""Construct a Planetary Variable subscription source.

Planetary Variables come in 4 types and are further subdivided
within these types. See [Subscribing to Planetary
Variables](https://developers.planet.com/docs/subscriptions/pvs-subs/#planetary-variables-types-and-ids)
for details.

The return value can be passed to
[planet.subscription_request.build_request][].

Note: this function does not validate variable types and ids.

Parameters:
var_type: one of "biomass_proxy", "land_surface_temperature",
"soil_water_content", or "vegetation_optical_depth".
var_id: a value such as "SWC-AMSR2-C_V1.0_100" for soil water
content derived from AMSR2 C band.
geometry: The area of interest of the subscription that will be
used to determine matches.
start_time: The start time of the subscription. This time can be
in the past or future.
end_time: The end time of the subscription. This time can be in
the past or future, and must be after the start_time.

Returns:
dict: a representation of a subscription source.

Raises:
ClientError: if a source can not be
configured.

Examples:
```python
>>> source = planetary_variables_source(
... "soil_water_content",
... "SWC-AMSR2-C_V1.0_100",
... geometry={
... "type": "Polygon",
... "coordinates": [[[37.791595458984375, 14.84923123791421],
... [37.90214538574219, 14.84923123791421],
... [37.90214538574219, 14.945448293647944],
... [37.791595458984375, 14.945448293647944],
... [37.791595458984375, 14.84923123791421]]]
... },
... start_time=datetime(2021, 3, 1)
... )
>>> request = build_request(source=source, ...)
```
"""
# TODO: validation of variable types and ids.

parameters = {
"id": var_id,
"geometry": geojson.as_geom(dict(geometry)),
}

try:
parameters['start_time'] = _datetime_to_rfc3339(start_time)
except AttributeError:
raise ClientError('Could not convert start_time to an iso string')

if end_time:
try:
parameters['end_time'] = _datetime_to_rfc3339(end_time)
except AttributeError:
raise ClientError('Could not convert end_time to an iso string')

return {"type": var_type, "parameters": parameters}


def _datetime_to_rfc3339(value: datetime) -> str:
"""Converts the datetime to an RFC3339 string"""
iso = value.isoformat()
Expand Down
23 changes: 20 additions & 3 deletions tests/integration/test_subscriptions_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,25 @@ def result_pages(status=None, size=40):
# must disable the default.
res_api_mock = respx.mock(assert_all_called=False)

# 1. Request for status: created. Response has three pages.
# 1. CSV results
res_api_mock.route(
M(url__startswith=TEST_URL), M(params__contains={'format': 'csv'})).mock(
side_effect=[Response(200, text="id,status\n1234-abcd,SUCCESS\n")])

# 2. Request for status: created. Response has three pages.
res_api_mock.route(
M(url__startswith=TEST_URL),
M(params__contains={'status': 'created'})).mock(side_effect=[
Response(200, json=page)
for page in result_pages(status={'created'}, size=40)
])

# 2. Request for status: queued. Response has a single empty page.
# 3. Request for status: queued. Response has a single empty page.
res_api_mock.route(M(url__startswith=TEST_URL),
M(params__contains={'status': 'queued'})).mock(
side_effect=[Response(200, json={'results': []})])

# 3. No status requested. Response is the same as for 1.
# 4. No status requested. Response is the same as for 1.
res_api_mock.route(M(url__startswith=TEST_URL)).mock(
side_effect=[Response(200, json=page) for page in result_pages(size=40)])

Expand Down Expand Up @@ -276,6 +281,18 @@ async def test_get_results_success():
assert len(results) == 100


@pytest.mark.anyio
@res_api_mock
async def test_get_results_csv():
"""Subscription CSV fetched, has the expected items."""
async with Session() as session:
client = SubscriptionsClient(session, base_url=TEST_URL)
results = [res async for res in client.get_results_csv("42")]
import csv
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised yapf doesn't care about this import's placement.

rows = list(csv.reader(results))
assert rows == [['id', 'status'], ['1234-abcd', 'SUCCESS']]


paging_cycle_api_mock = respx.mock()

# Identical next links is a hangup we want to avoid.
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/test_subscriptions_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,11 @@ def test_request_catalog_success(invoke, geom_geojson):
])
assert json.loads(result.output) == source
assert result.exit_code == 0 # success.


@res_api_mock
def test_subscriptions_results_csv(invoke):
"""Get results as CSV."""
result = invoke(['results', 'test', '--csv'])
assert result.exit_code == 0 # success.
assert result.output.splitlines() == ['id,status', '1234-abcd,SUCCESS']
Loading