Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 41 #42

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- [issue/11](https://github.com/danielfromearth/batchee/issues/11): Rename from concat_batcher to batchee
- [issue/21](https://github.com/danielfromearth/batchee/issues/21): Improve CICD workflows
- [issue/41](https://github.com/danielfromearth/batchee/issues/41): Change Adapter output from single to multiple STAC Catalogs
### Deprecated
### Removed
### Fixed
54 changes: 29 additions & 25 deletions batcher/harmony/service_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from pystac.item import Asset

from batcher.harmony.util import (
_get_item_url,
_get_netcdf_urls,
_get_output_bounding_box,
_get_output_date_range,
)
from batcher.tempo_filename_parser import get_batch_indices
Expand Down Expand Up @@ -48,21 +48,20 @@ def invoke(self):

return self.message, self.process_catalog(self.catalog)

def process_catalog(self, catalog: pystac.Catalog):
def process_catalog(self, catalog: pystac.Catalog) -> list[pystac.Catalog]:
"""Converts a single STAC catalog into a list of STAC catalogs, one per batch."""
self.logger.info("process_catalog() started.")
try:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()

# Get all the items from the catalog, including from child or linked catalogs
items = list(self.get_all_catalog_items(catalog))

self.logger.info(f"length of items==={len(items)}.")

# Quick return if catalog contains no items
if len(items) == 0:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()
return result

# # --- Get granule filepaths (urls) ---
Expand All @@ -79,38 +78,43 @@ def process_catalog(self, catalog: pystac.Catalog):
for k, v in zip(batch_indices, items):
grouped.setdefault(k, []).append(v)

# --- Construct a STAC Catalog that holds multiple Items (which represent each TEMPO scan),
# and each Item holds multiple Assets (which represent each granule).
result.clear_items()

# --- Construct a list of STAC Catalogs (which represent each TEMPO scan),
# and each Catalog holds multiple Items (which represent each granule).
catalogs = []
for batch_id, batch_items in grouped.items():
batch_urls: list[str] = _get_netcdf_urls(batch_items)
bounding_box = _get_output_bounding_box(batch_items)
properties = _get_output_date_range(batch_items)

self.logger.info(f"constructing new pystac.Item for batch_id==={batch_id}.")

# Construct a new pystac.Item with every granule in the batch as a pystac.Asset
output_item = Item(
str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
)
self.logger.info(f"constructing new pystac.Catalog for batch_id==={batch_id}.")
# Initialize a new, empty Catalog
batch_catalog = catalog.clone()
batch_catalog.id = str(uuid4())
batch_catalog.clear_children()
batch_catalog.clear_items()

for idx, item in enumerate(batch_items):
# Construct a new pystac.Item for each granule in the batch
output_item = Item(
str(uuid4()),
bbox_to_geometry(item.bbox),
item.bbox,
None,
_get_output_date_range([item]),
)
output_item.add_asset(
f"data_{idx}",
Asset(
batch_urls[idx],
title=batch_urls[idx],
_get_item_url(item),
title=_get_item_url(item),
media_type="application/x-netcdf4",
roles=["data"],
),
)
batch_catalog.add_item(output_item)

result.add_item(output_item)
self.logger.info("STAC catalog creation for batch_id==={batch_id} complete.")
catalogs.append(batch_catalog)

self.logger.info("STAC catalog creation complete.")
self.logger.info("All STAC catalogs are complete.")

return result
return catalogs

except Exception as service_exception:
self.logger.error(service_exception, exc_info=1)
Expand Down
Loading