Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db)!: update collection summaries on load items #344

Merged
merged 15 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements.txt
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ jobs:
- name: Integrations tests
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s

- name: Stop services
run: docker compose stop

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements.txt
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s
Expand Down
52 changes: 50 additions & 2 deletions database/runtime/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,51 @@ def create_collection_search_functions(cursor) -> None:
cursor.execute(sql.SQL(search_collection_ids_sql))


def create_collection_extents_functions(cursor) -> None:
    """
    Create dashboard SQL functions that update the spatial and temporal extents
    of all items in a collection.

    This is slightly different from the inbuilt pgstac.update_collection_extents
    function, which describes the range of nominal datetimes; here we capture the
    maximum range, which must include the max end_datetime.

    :param cursor: an open database cursor on the pgstac database; each statement
        is executed immediately (caller owns transaction/commit semantics).
    """

    # Returns [[min(datetime), max(end_datetime)]] across all items of the
    # given collection, as a jsonb STAC temporal interval.
    # NOTE: declared STABLE (not IMMUTABLE) — the function reads the `items`
    # table, so its result can change between statements. PostgreSQL forbids
    # relying on IMMUTABLE for table-reading functions: the planner may
    # pre-evaluate and cache a stale extent.
    collection_temporal_extent_max_sql = """
    CREATE OR REPLACE FUNCTION dashboard.collection_temporal_extent_max(id text) RETURNS jsonb
    LANGUAGE sql
    STABLE PARALLEL SAFE
    SET search_path TO 'pgstac', 'public'
    AS $function$
    SELECT to_jsonb(array[array[min(datetime)::text, max(end_datetime)::text]])
    FROM items WHERE collection=$1;
    $function$
    ;
    """
    cursor.execute(sql.SQL(collection_temporal_extent_max_sql))

    # Rewrites the collection's `extent` object in place: spatial bbox from
    # pgstac's collection_bbox(), temporal interval from the max-range helper
    # above. VOLATILE by default, which is correct for an UPDATE.
    update_collection_extents_max_sql = """
    CREATE OR REPLACE FUNCTION dashboard.update_collection_extents_max(id text)
    RETURNS void
    LANGUAGE sql
    SET search_path TO 'pgstac', 'public'
    AS $function$
    UPDATE collections SET
        content = content ||
        jsonb_build_object(
            'extent', jsonb_build_object(
                'spatial', jsonb_build_object(
                    'bbox', collection_bbox(collections.id)
                ),
                'temporal', jsonb_build_object(
                    'interval', dashboard.collection_temporal_extent_max(collections.id)
                )
            )
        )
    WHERE collections.id=$1;
    $function$
    ;
    """
    cursor.execute(sql.SQL(update_collection_extents_max_sql))


def create_collection_summaries_functions(cursor) -> None:
"""
Functions to summarize datetimes and raster statistics for 'default' collections of items
Expand Down Expand Up @@ -441,10 +486,13 @@ def handler(event, context):
print("Creating dashboard schema...")
create_dashboard_schema(cursor=cur, username=user_params["username"])

print("Creating functions for summarizing collection datetimes...")
create_collection_summaries_functions(cursor=cur)

print(
"Creating functions for summarizing default collection datetimes and cog_default statistics..."
"Creating functions for setting the maximum end_datetime temporal extent of a collection..."
)
create_collection_summaries_functions(cursor=cur)
create_collection_extents_functions(cursor=cur)

except Exception as e:
print(f"Unable to bootstrap database with exception={e}")
Expand Down
4 changes: 2 additions & 2 deletions ingest_api/runtime/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116 and 117
Authlib==1.0.1
ddbcereal==2.1.1
fastapi>=0.75.1
fastapi<=0.108.0
fsspec==2023.3.0
mangum>=0.15.0
orjson>=3.6.8
Expand All @@ -16,4 +16,4 @@ stac-pydantic @ git+https://github.com/ividito/stac-pydantic.git@3f4cb381c85749b
xarray==2023.1.0
xstac==1.1.0
zarr==2.13.6
boto3==1.24.59
boto3==1.24.59
21 changes: 21 additions & 0 deletions ingest_api/runtime/requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116 and 117
Authlib==1.0.1
ddbcereal==2.1.1
fastapi<=0.108.0
fsspec==2023.3.0
mangum>=0.15.0
orjson>=3.6.8
psycopg[binary,pool]>=3.0.15
pydantic_ssm_settings>=0.2.0
pydantic>=1.10.12
pypgstac==0.7.4
python-multipart==0.0.7
requests>=2.27.1
s3fs==2023.3.0
stac-pydantic @ git+https://github.com/ividito/stac-pydantic.git@3f4cb381c85749bb4b15d1181179057ec0f51a94
xarray==2023.1.0
xstac==1.1.0
zarr==2.13.6
boto3==1.24.59
moto[dynamodb, ssm]>=4.0.9,<5.0
httpx
28 changes: 9 additions & 19 deletions ingest_api/runtime/src/vedaloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,22 @@ def update_collection_summaries(self, collection_id: str) -> None:
STAC-conformant bbox and temporal extent."""
with self.conn.cursor() as cur:
with self.conn.transaction():

# First update the spatial and temporal extents for all item records for the collection
logger.info(f"Updating extents for collection: {collection_id}.")
cur.execute(
"SELECT dashboard.update_collection_extents_max(%s)",
(collection_id,),
)

# Next update default summaries which use the collection temporal extent for summaries of periodic items
logger.info(
f"Updating dashboard summaries for collection: {collection_id}."
)
cur.execute(
"SELECT dashboard.update_collection_default_summaries(%s)",
(collection_id,),
)
logger.info(f"Updating extents for collection: {collection_id}.")
cur.execute(
"""
UPDATE collections SET
content = content ||
jsonb_build_object(
'extent', jsonb_build_object(
'spatial', jsonb_build_object(
'bbox', collection_bbox(collections.id)
),
'temporal', jsonb_build_object(
'interval', collection_temporal_extent(collections.id)
)
)
)
WHERE collections.id=%s;
""",
(collection_id,),
)

def delete_collection(self, collection_id: str) -> None:
with self.conn.cursor() as cur:
Expand Down
38 changes: 5 additions & 33 deletions ingest_api/runtime/tests/test_registration.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import base64
import json
from datetime import timedelta
from math import isclose
from typing import TYPE_CHECKING, List

import pytest
from src.schemas import Ingestion

from fastapi.encoders import jsonable_encoder

if TYPE_CHECKING:
from src import schemas, services
Expand Down Expand Up @@ -39,11 +39,11 @@ def populate_table(self, count=100) -> List["schemas.Ingestion"]:

def test_simple_lookup(self):
self.mock_table.put_item(Item=self.example_ingestion.dynamodb_dict())

ingestion = jsonable_encoder(self.example_ingestion)
response = self.api_client.get(ingestion_endpoint)
assert response.status_code == 200
assert response.json() == {
"items": [json.loads(self.example_ingestion.json(by_alias=True))],
"items": [ingestion],
"next": None,
}

Expand All @@ -60,32 +60,4 @@ def test_next_response(self):
response = self.api_client.get(ingestion_endpoint, params={"limit": limit})
assert response.status_code == 200
assert json.loads(base64.b64decode(response.json()["next"])) == expected_next
assert response.json()["items"] == [
json.loads(ingestion.json(by_alias=True))
for ingestion in example_ingestions[:limit]
]

def test_load_large_number(self):
ingestion_data = self.example_ingestion.dict()
visual_asset = ingestion_data["item"]["assets"]["visual"]
# todo: why does this need to be a float?
visual_asset["nodata"] = -3.4028234663852886e38
ingestion = Ingestion.parse_obj(ingestion_data)
self.mock_table.put_item(Item=ingestion.dynamodb_dict())

response = self.api_client.get(ingestion_endpoint)
actual = response.json()["items"]
expected = [json.loads(ingestion.json(by_alias=True))]

# first, check the nodata value
# value should match, format will not. isclose() will properly compare the values
assert isclose(
actual[0]["item"]["assets"]["visual"]["nodata"],
expected[0]["item"]["assets"]["visual"]["nodata"],
abs_tol=1,
)
expected[0]["item"]["assets"]["visual"]["nodata"] = actual[0]["item"]["assets"][
"visual"
]["nodata"]
# second, check everything else
assert actual == expected
assert response.json()["items"] == jsonable_encoder(example_ingestions[:limit])