feat(release)!: release db, stac and ingest api, add otel (#368)
### Breaking
- #356

#### Breaking changes notes
Breaking: the `VEDA_COGNITO_DOMAIN` configuration is now required, along with
a one-time administrator step to add the ingest-api's URL to the allowed
callback URLs of the existing user pool client.

### Added
- #342
- #330
- #323

### Changed/Updated
- #355
- #340

### Fixed

- #367
- #365
- #361
- #360
- #358
- #345
- #344
- #339
- #338
- #337
- #335
- #334
- #331
- #329
- #327
- #326
- #325
- #324
botanical authored May 6, 2024
2 parents 6a6f232 + 04b0b64 commit 6fc9ded
Showing 24 changed files with 267 additions and 161 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/develop.yml
@@ -63,7 +63,7 @@ jobs:
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements.txt
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
@@ -62,6 +62,12 @@ jobs:
- name: Integrations tests
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s

- name: Stop services
run: docker compose stop

2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
@@ -75,7 +75,7 @@ jobs:
run: python -m pytest .github/workflows/tests/ -vv -s

- name: Install reqs for ingest api
run: python -m pip install -r ingest_api/runtime/requirements.txt
run: python -m pip install -r ingest_api/runtime/requirements_dev.txt

- name: Ingest unit tests
run: NO_PYDANTIC_SSM_SETTINGS=1 python -m pytest ingest_api/runtime/tests/ -vv -s
2 changes: 1 addition & 1 deletion README.md
@@ -139,7 +139,7 @@ In case of failure, all container logs will be written out to `container_logs.lo

> **Warning** PgSTAC records should be loaded in the database using [pypgstac](https://github.com/stac-utils/pgstac#pypgstac) for proper indexing and partitioning.
The VEDA ecosystem includes tools specifially created for loading PgSTAC records and optimizing data assets. The [veda-data-airflow](https://github.com/NASA-IMPACT/veda-data-airflow) project provides examples of cloud pipelines that transform data to cloud optimized formats, generate STAC metadata, and submit records for publication to the veda-backend database using the [veda-stac-ingestor](https://github.com/NASA-IMPACT/veda-stac-ingestor).
The VEDA ecosystem includes tools specifially created for loading PgSTAC records and optimizing data assets. The [veda-data-airflow](https://github.com/NASA-IMPACT/veda-data-airflow) project provides examples of cloud pipelines that transform data to cloud optimized formats, generate STAC metadata, and submit records for publication to the veda-backend database via veda-backend's ingest API. Veda-backend's integrated ingest system includes an API lambda for enqueuing collection and item records in a DynamoDB table and an ingestor lambda that batch loads DDB enqueued records into the PgSTAC database. Currently, the client id and domain of an existing Cognito user pool programmatic client must be supplied in [configuration](ingest_api/infrastructure/config.py) as `VEDA_CLIENT_ID` and `VEDA_COGNITO_DOMAIN` (the [veda-auth project](https://github.com/NASA-IMPACT/veda-auth) can be used to deploy a Cognito user pool and client). To dispense auth tokens via the ingest API swagger docs and `/token` endpoints, an administrator must add the ingest API lambda URL to the allowed callbacks of the Cognito client.

## Support scripts
Support scripts are provided for manual system operations.
2 changes: 1 addition & 1 deletion config.py
@@ -131,7 +131,7 @@ def stage_name(self) -> str:
def get_stac_catalog_url(self) -> Optional[str]:
"""Infer stac catalog url based on whether the app is configured to deploy the catalog to a custom subdomain or to a cloudfront route"""
if self.veda_custom_host and self.veda_stac_root_path:
return f"https://{veda_app_settings.veda_custom_host}{veda_app_settings.veda_stac_root_path}/"
return f"https://{veda_app_settings.veda_custom_host}{veda_app_settings.veda_stac_root_path}"
if (
self.veda_domain_create_custom_subdomains
and self.veda_domain_hosted_zone_name
52 changes: 50 additions & 2 deletions database/runtime/handler.py
@@ -250,6 +250,51 @@ def create_collection_search_functions(cursor) -> None:
cursor.execute(sql.SQL(search_collection_ids_sql))


def create_collection_extents_functions(cursor) -> None:
"""
Functions to update spatial and temporal extents off all items in a collection
This is slightly different from the inbuilt pgstac.update_collection_extents function which describes the range of nominal datetimes,
here we capture the maximum range which must include max end datetime.
"""

collection_temporal_extent_max_sql = """
CREATE OR REPLACE FUNCTION dashboard.collection_temporal_extent_max(id text) RETURNS jsonb
LANGUAGE sql
IMMUTABLE PARALLEL SAFE
SET search_path TO 'pgstac', 'public'
AS $function$
SELECT to_jsonb(array[array[min(datetime)::text, max(end_datetime)::text]])
FROM items WHERE collection=$1;
$function$
;
"""
cursor.execute(sql.SQL(collection_temporal_extent_max_sql))

update_collection_extents_max_sql = """
CREATE OR REPLACE FUNCTION dashboard.update_collection_extents_max(id text)
RETURNS void
LANGUAGE sql
SET search_path TO 'pgstac', 'public'
AS $function$
UPDATE collections SET
content = content ||
jsonb_build_object(
'extent', jsonb_build_object(
'spatial', jsonb_build_object(
'bbox', collection_bbox(collections.id)
),
'temporal', jsonb_build_object(
'interval', dashboard.collection_temporal_extent_max(collections.id)
)
)
)
WHERE collections.id=$1;
$function$
;
"""
cursor.execute(sql.SQL(update_collection_extents_max_sql))
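
The aggregation performed by `dashboard.collection_temporal_extent_max` can be sketched in plain Python. This is an illustration only, not part of the commit: the helper name `temporal_extent_max` and the sample items are hypothetical, and it assumes timestamps are ISO 8601 strings in a single uniform format and timezone so that string comparison matches chronological order.

```python
from typing import Dict, List, Optional


def temporal_extent_max(
    items: List[Dict[str, Optional[str]]]
) -> List[List[Optional[str]]]:
    """Return [[min(datetime), max(end_datetime)]] for a collection's items.

    Like SQL min()/max(), missing values are ignored, and an empty input
    yields None on that side of the interval.
    """
    starts = [i["datetime"] for i in items if i.get("datetime")]
    ends = [i["end_datetime"] for i in items if i.get("end_datetime")]
    return [[min(starts) if starts else None, max(ends) if ends else None]]


# Hypothetical items: the extent spans the earliest start to the latest end.
sample_items = [
    {"datetime": "2020-01-01T00:00:00Z", "end_datetime": "2020-12-31T00:00:00Z"},
    {"datetime": "2021-01-01T00:00:00Z", "end_datetime": "2021-12-31T00:00:00Z"},
]
print(temporal_extent_max(sample_items))
```

The upper bound comes from `end_datetime` rather than the nominal `datetime`, which is the difference from the built-in extent function that the docstring above describes.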


def create_collection_summaries_functions(cursor) -> None:
"""
Functions to summarize datetimes and raster statistics for 'default' collections of items
@@ -441,10 +486,13 @@ def handler(event, context):
print("Creating dashboard schema...")
create_dashboard_schema(cursor=cur, username=user_params["username"])

print("Creating functions for summarizing collection datetimes...")
create_collection_summaries_functions(cursor=cur)

print(
"Creating functions for summarizing default collection datetimes and cog_default statistics..."
"Creating functions for setting the maximum end_datetime temporal extent of a collection..."
)
create_collection_summaries_functions(cursor=cur)
create_collection_extents_functions(cursor=cur)

except Exception as e:
print(f"Unable to bootstrap database with exception={e}")
5 changes: 4 additions & 1 deletion ingest_api/infrastructure/config.py
@@ -2,7 +2,7 @@
from typing import Optional

import aws_cdk
from pydantic import BaseSettings, Field, constr
from pydantic import AnyHttpUrl, BaseSettings, Field, constr

AwsArn = constr(regex=r"^arn:aws:iam::\d{12}:role/.+")

@@ -34,6 +34,9 @@ class IngestorConfig(BaseSettings):
client_secret: Optional[str] = Field(
"", description="The Cognito APP client secret"
)
cognito_domain: AnyHttpUrl = Field(
description="The base url of the Cognito domain for authorization and token urls"
)
stac_db_security_group_id: str = Field(
description="ID of Security Group used by pgSTAC DB"
)
1 change: 1 addition & 0 deletions ingest_api/infrastructure/construct.py
@@ -62,6 +62,7 @@ def __init__(
"RASTER_URL": config.raster_api_url,
"ROOT_PATH": config.ingest_root_path,
"STAGE": config.stage,
"COGNITO_DOMAIN": config.cognito_domain,
}

# create lambda
7 changes: 5 additions & 2 deletions ingest_api/runtime/requirements.txt
@@ -1,13 +1,14 @@
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116 and 117
Authlib==1.0.1
cryptography>=42.0.5
ddbcereal==2.1.1
fastapi>=0.75.1
fastapi<=0.108.0
fsspec==2023.3.0
mangum>=0.15.0
orjson>=3.6.8
psycopg[binary,pool]>=3.0.15
pydantic_ssm_settings>=0.2.0
pydantic>=1.10.12
pyjwt>=2.8.0
pypgstac==0.7.4
python-multipart==0.0.7
requests>=2.27.1
@@ -17,3 +18,5 @@ xarray==2023.1.0
xstac==1.1.0
zarr==2.13.6
boto3==1.24.59
aws_xray_sdk>=2.6.0,<3
aws-lambda-powertools>=1.18.0
5 changes: 5 additions & 0 deletions ingest_api/runtime/requirements_dev.txt
@@ -0,0 +1,5 @@
-r requirements.txt
moto[dynamodb, ssm]>=4.0.9,<5.0
httpx
aws_xray_sdk>=2.6.0,<3
aws-lambda-powertools>=1.18.0
95 changes: 44 additions & 51 deletions ingest_api/runtime/src/auth.py
@@ -2,68 +2,61 @@
import hashlib
import hmac
import logging
from typing import Dict
from typing import Annotated, Any, Dict

import boto3
import requests
import src.config as config
from authlib.jose import JsonWebKey, JsonWebToken, JWTClaims, KeySet, errors
from cachetools import TTLCache, cached
import jwt
from src.config import settings

from fastapi import Depends, HTTPException, security
from fastapi import Depends, HTTPException, Security, security, status

logger = logging.getLogger(__name__)

token_scheme = security.HTTPBearer()
oauth2_scheme = security.OAuth2AuthorizationCodeBearer(
authorizationUrl=settings.cognito_authorization_url,
tokenUrl=settings.cognito_token_url,
refreshUrl=settings.cognito_token_url,
)

jwks_client = jwt.PyJWKClient(settings.jwks_url) # Caches JWKS

def get_settings() -> config.Settings:
import src.main as main

return main.settings


def get_jwks_url(settings: config.Settings = Depends(get_settings)) -> str:
return settings.jwks_url


@cached(TTLCache(maxsize=1, ttl=3600))
def get_jwks(jwks_url: str = Depends(get_jwks_url)) -> KeySet:
with requests.get(jwks_url) as response:
response.raise_for_status()
return JsonWebKey.import_key_set(response.json())


def decode_token(
token: security.HTTPAuthorizationCredentials = Depends(token_scheme),
jwks: KeySet = Depends(get_jwks),
) -> JWTClaims:
"""
Validate & decode JWT
"""
def validated_token(
token_str: Annotated[str, Security(oauth2_scheme)],
required_scopes: security.SecurityScopes,
) -> Dict:
# Parse & validate token
logger.info(f"\nToken String {token_str}")
try:
claims = JsonWebToken(["RS256"]).decode(
s=token.credentials,
key=jwks,
claims_options={
# # Example of validating audience to match expected value
# "aud": {"essential": True, "values": [APP_CLIENT_ID]}
},
token = jwt.decode(
token_str,
jwks_client.get_signing_key_from_jwt(token_str).key,
algorithms=["RS256"],
)

if "client_id" in claims:
# Insert Cognito's `client_id` into `aud` claim if `aud` claim is unset
claims.setdefault("aud", claims["client_id"])

claims.validate()
return claims
except errors.JoseError: #
logger.exception("Unable to decode token")
raise HTTPException(status_code=403, detail="Bad auth token")


def get_username(claims: security.HTTPBasicCredentials = Depends(decode_token)):
return claims["sub"]
except jwt.exceptions.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
) from e

# Validate scopes (if required)
for scope in required_scopes.scopes:
if scope not in token["scope"]:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={
"WWW-Authenticate": f'Bearer scope="{required_scopes.scope_str}"'
},
)

return token
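
A note on the scope loop above, offered as a sketch and not as the commit's implementation: if the `scope` claim is a space-delimited string (the OAuth 2.0 convention), then `scope not in token["scope"]` is a substring test, so a required scope that is a prefix of a granted one would pass. The helper below, with the hypothetical name `missing_scopes` and made-up scope names, splits the claim first and also tolerates a token that has no `scope` claim.

```python
from typing import Any, Dict, Iterable, List


def missing_scopes(claims: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not grant.

    Assumes the 'scope' claim, when present, is a space-delimited string.
    """
    granted = set(str(claims.get("scope", "")).split())
    return [scope for scope in required if scope not in granted]


# Hypothetical claims: only "stac:item:create" is granted.
claims = {"scope": "stac:item:create"}

# Substring test on the raw string: "stac:item" appears inside the claim.
print("stac:item" in claims["scope"])

# Exact match against the split scopes: "stac:item" is reported as missing.
print(missing_scopes(claims, ["stac:item"]))
```

A caller could raise the same `HTTPException` as above whenever the returned list is non-empty.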


def get_username(token: Annotated[Dict[Any, Any], Depends(validated_token)]) -> str:
result = token["username"] if "username" in token else str(token.get("sub"))
return result
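
The fallback in `get_username` can be exercised on plain dictionaries. This is a minimal sketch with made-up claim values; `username_from_claims` is a hypothetical stand-in that repeats the expression above without the FastAPI dependency.

```python
from typing import Any, Dict


def username_from_claims(claims: Dict[Any, Any]) -> str:
    """Prefer the 'username' claim, otherwise fall back to str(sub)."""
    return claims["username"] if "username" in claims else str(claims.get("sub"))


print(username_from_claims({"username": "alice", "sub": "1234"}))
print(username_from_claims({"sub": "1234"}))
# With neither claim present, str(None) yields the literal string "None".
print(username_from_claims({}))
```

The last case may be worth guarding against, since the string "None" would otherwise be used as a user name.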


def _get_secret_hash(username: str, client_id: str, client_secret: str) -> str:
26 changes: 26 additions & 0 deletions ingest_api/runtime/src/config.py
@@ -1,3 +1,5 @@
import os
from getpass import getuser
from typing import Optional

from pydantic import AnyHttpUrl, BaseSettings, Field, constr
@@ -21,14 +23,38 @@ class Settings(BaseSettings):

userpool_id: str = Field(description="The Cognito Userpool used for authentication")

cognito_domain: AnyHttpUrl = Field(
description="The base url of the Cognito domain for authorization and token urls"
)
client_id: str = Field(description="The Cognito APP client ID")
client_secret: str = Field("", description="The Cognito APP client secret")
root_path: Optional[str] = Field(description="Root path of API")
stage: Optional[str] = Field(description="API stage")

@property
def cognito_authorization_url(self) -> AnyHttpUrl:
"""Cognito user pool authorization url"""
return f"{self.cognito_domain}/oauth2/authorize"

@property
def cognito_token_url(self) -> AnyHttpUrl:
"""Cognito user pool token and refresh url"""
return f"{self.cognito_domain}/oauth2/token"

class Config(AwsSsmSourceConfig):
env_file = ".env"

@classmethod
def from_ssm(cls, stack: str):
return cls(_secrets_dir=f"/{stack}")


settings = (
Settings()
if os.environ.get("NO_PYDANTIC_SSM_SETTINGS")
else Settings.from_ssm(
stack=os.environ.get(
"STACK", f"veda-stac-ingestion-system-{os.environ.get('STAGE', getuser())}"
),
)
)
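
How the two Cognito URLs derive from `cognito_domain` can be sketched without pydantic. This is illustrative only: `cognito_urls` and the example domain are hypothetical, and stripping a trailing slash is an addition here that the properties above do not perform (a configured domain ending in `/` would otherwise produce a double slash).

```python
from typing import Dict


def cognito_urls(cognito_domain: str) -> Dict[str, str]:
    """Build the authorization and token URLs from a Cognito domain base URL."""
    base = cognito_domain.rstrip("/")  # assumption: tolerate a trailing slash
    return {
        "authorization_url": f"{base}/oauth2/authorize",
        "token_url": f"{base}/oauth2/token",
    }


# Hypothetical domain value for VEDA_COGNITO_DOMAIN.
urls = cognito_urls("https://example.auth.us-west-2.amazoncognito.com/")
print(urls["authorization_url"])
print(urls["token_url"])
```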
4 changes: 2 additions & 2 deletions ingest_api/runtime/src/dependencies.py
@@ -12,9 +12,9 @@
token_scheme = security.HTTPBearer()


def get_table(settings: config.Settings = Depends(auth.get_settings)):
def get_table():
client = boto3.resource("dynamodb")
return client.Table(settings.dynamodb_table)
return client.Table(config.settings.dynamodb_table)


def get_db(table=Depends(get_table)) -> services.Database:
5 changes: 3 additions & 2 deletions ingest_api/runtime/src/doc.py
@@ -11,10 +11,11 @@
## Auth
The auth API allows users to retrieve an access token and get information about the
current user.
To get an access token, the user must provide their username and password in the
request body to the POST `/token` API.
To get an access token, the user must provide their username and password via the Cognito UI (click the 'Authorize' button for a temporary login redirect).
The current user's information can be retrieved using the GET `/auth/me` API.
For programmatic access, post username and password in the request body to the POST `/token` API.
Before using the API, user must ask a VEDA team member to create credentials (username
and password) for VEDA auth.
The user name and password is used to get the access token from Auth API call in order
3 changes: 1 addition & 2 deletions ingest_api/runtime/src/ingestor.py
@@ -5,7 +5,6 @@

from boto3.dynamodb.types import TypeDeserializer
from pypgstac.db import PgstacDB
from src.auth import get_settings
from src.dependencies import get_table
from src.schemas import Ingestion, Status
from src.utils import IngestionType, get_db_credentials, load_into_pgstac
@@ -41,7 +40,7 @@ def update_dynamodb(
"""
# Update records in DynamoDB
print(f"Updating ingested items status in DynamoDB, marking as {status}...")
table = get_table(get_settings())
table = get_table()
with table.batch_writer(overwrite_by_pkeys=["created_by", "id"]) as batch:
for ingestion in ingestions:
batch.put_item(