diff --git a/configs/bc_processings/bc_ls_cy_debug_4gm_7bands_cfg.yaml b/configs/bc_processings/bc_ls_cy_debug_4gm_7bands_cfg.yaml
index b14d006..937acee 100644
--- a/configs/bc_processings/bc_ls_cy_debug_4gm_7bands_cfg.yaml
+++ b/configs/bc_processings/bc_ls_cy_debug_4gm_7bands_cfg.yaml
@@ -21,4 +21,18 @@ input_products:
     - swir1
     - swir2
 
+product:
+  name: bc_ls_cy_debug
+  short_name: bc_ls_cy_debug
+  version: 3.0.0
+  product_family: burncube
+  bands:
+    - wofssevere
+    - wofsseverity
+    - wofsmoderate
+    - severe
+    - severity
+    - moderate
+    - count
+task_table: 10-year-historical-processing-4year-geomad.csv
 
diff --git a/configs/bc_processings/bc_ls_cy_pre_release_4gm_7bands_cfg.yaml b/configs/bc_processings/bc_ls_cy_pre_release_4gm_7bands_cfg.yaml
index 6213a3b..e8db24c 100644
--- a/configs/bc_processings/bc_ls_cy_pre_release_4gm_7bands_cfg.yaml
+++ b/configs/bc_processings/bc_ls_cy_pre_release_4gm_7bands_cfg.yaml
@@ -21,4 +21,18 @@ input_products:
     - swir1
     - swir2
 
+product:
+  name: ga_ls8c_bc_4cyear_2020
+  short_name: ga_ls8c_bc_4cyear_2020
+  version: 3.0.0
+  product_family: burncube
+  bands:
+    - wofssevere
+    - wofsseverity
+    - wofsmoderate
+    - severe
+    - severity
+    - moderate
+    - count
+task_table: 10-year-historical-processing-4year-geomad.csv
 
diff --git a/configs/bc_processings/bc_ls_fy_pre_release_4gm_7bands_cfg.yaml b/configs/bc_processings/bc_ls_fy_pre_release_4gm_7bands_cfg.yaml
index 30e3ef3..c212add 100644
--- a/configs/bc_processings/bc_ls_fy_pre_release_4gm_7bands_cfg.yaml
+++ b/configs/bc_processings/bc_ls_fy_pre_release_4gm_7bands_cfg.yaml
@@ -21,4 +21,10 @@ input_products:
     - swir1
     - swir2
 
+product:
+  name: ga_ls8c_bc_4fyear_2020
+  short_name: ga_ls8c_bc_4fyear_2020
+  version: 3.0.0
+  product_family: burncube
+task_table: 10-year-historical-processing-4year-geomad.csv
 
diff --git a/configs/bc_products/ga_ls8c_bc_4cyear_2020.odc-product.yaml b/configs/bc_products/ga_ls8c_bc_4cyear_2020.odc-product.yaml
new file mode 100644
index 0000000..2075139
--- /dev/null
+++ b/configs/bc_products/ga_ls8c_bc_4cyear_2020.odc-product.yaml
@@ -0,0 +1,50 @@
+name: ga_ls8c_bc_4cyear_2020
+description: Geoscience Australia Landsat Nadir BRDF Adjusted Reflectance Terrain, Landsat 8 Burn Cube 4 Calendar Years Collection 3
+license: CC-BY-4.0
+metadata_type: eo3
+
+metadata:
+  properties:
+    odc:file_format: GeoTIFF
+    odc:product_family: burncube
+  product:
+    name: ga_ls8c_bc_4cyear_2020
+
+measurements:
+  - name: wofssevere
+    dtype: float64
+    nodata: -999
+    units: '1'
+  - name: wofsseverity
+    dtype: float64
+    nodata: -999
+    units: '1'
+  - name: wofsmoderate
+    dtype: float64
+    nodata: -999
+    units: '1'
+  - name: severe
+    dtype: int16
+    nodata: -999
+    units: '1'
+  - name: severity
+    dtype: float64
+    nodata: -999
+    units: '1'
+  - name: moderate
+    dtype: int16
+    nodata: -999
+    units: '1'
+  - name: count
+    dtype: int16
+    nodata: -999
+    units: '1'
+
+load:
+  crs: 'epsg:3577'
+  resolution:
+    y: -30
+    x: 30
+  align:
+    y: 0
+    x: 0
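The new per-config `product` block and the top-level `task_table` key are exactly what the metadata command added below in dea_burn_cube/__main__.py reads. A minimal sketch of how those keys are consumed, assuming a local copy of one of the configs; the real command fetches the YAML from a URL via task.load_yaml_remote:

# Minimal sketch only; the local file path is illustrative.
import yaml

with open("bc_ls_cy_pre_release_4gm_7bands_cfg.yaml") as f:
    process_cfg = yaml.safe_load(f)

product_name = process_cfg["product"]["name"]         # e.g. ga_ls8c_bc_4cyear_2020
product_version = process_cfg["product"]["version"]   # e.g. 3.0.0
bands = process_cfg["product"]["bands"]                # the 7 output measurements
task_table = process_cfg["task_table"]                 # top-level key, not under product

print(product_name, product_version, len(bands), task_table)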
diff --git a/constraints.txt b/constraints.txt
index 062c7c6..238813f 100644
--- a/constraints.txt
+++ b/constraints.txt
@@ -95,6 +95,7 @@ Pygments==2.14.0
 pyparsing==3.0.9
 pyproj==3.2.1
 pyrsistent==0.19.3
+pystac==1.7.3
 python-dateutil==2.8.2
 pyTMD==1.1.3
 pytz==2023.2
diff --git a/dea_burn_cube/__main__.py b/dea_burn_cube/__main__.py
index 0091d3c..fc76c7d 100644
--- a/dea_burn_cube/__main__.py
+++ b/dea_burn_cube/__main__.py
@@ -4,11 +4,15 @@
 """
 
 import logging
+import math
 import os
+import re
 import shutil
 import sys
 import zipfile
+from datetime import datetime, timezone
 from multiprocessing import cpu_count
+from typing import Any, Dict
 from urllib.parse import urlparse
 
 import boto3
@@ -18,11 +22,16 @@
 import numpy as np
 import pandas as pd
 import pyproj
+import pystac
 import requests
 import s3fs
 import xarray as xr
 from datacube.utils import geometry
 from datacube.utils.cog import write_cog
+from datacube.utils.dates import normalise_dt
+from odc.dscache.tools.tiling import parse_gridspec_with_name
+from pystac.extensions.eo import Band, EOExtension
+from pystac.extensions.projection import ProjectionExtension
 from shapely.geometry import Point
 from shapely.ops import unary_union
@@ -36,6 +45,14 @@
 os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"
 
 
+def format_datetime(dt: datetime, with_tz=True, timespec="microseconds") -> str:
+    dt = normalise_dt(dt)
+    dt = dt.isoformat(timespec=timespec)
+    if with_tz:
+        dt = dt + "Z"
+    return dt
+
+
 @task.log_execution_time
 def result_file_saving_and_uploading(
     burn_cube_result_apply_wofs: xr.Dataset,
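The format_datetime helper normalises a timestamp to naive UTC via datacube's normalise_dt and appends a trailing "Z" when with_tz is set. A roughly equivalent standalone sketch, assuming normalise_dt's usual behaviour of converting timezone-aware datetimes to naive UTC:

# Rough standalone equivalent for illustration only; the real helper delegates
# the UTC normalisation to datacube.utils.dates.normalise_dt.
from datetime import datetime, timezone

def format_datetime_sketch(dt: datetime, with_tz: bool = True, timespec: str = "microseconds") -> str:
    if dt.tzinfo is not None:
        # convert to UTC and drop tzinfo, which is effectively what normalise_dt does
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    text = dt.isoformat(timespec=timespec)
    return text + "Z" if with_tz else text

# prints "2020-07-01T00:00:00.000000Z"
print(format_datetime_sketch(datetime(2020, 7, 1)))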
@@ -450,6 +467,237 @@ def update_hotspot_data(
     )
 
 
+@main.command(no_args_is_help=True)
+@click.option(
+    "--task-id",
+    "-t",
+    type=str,
+    default=None,
+    help="REQUIRED. Burn Cube task id, e.g. Dec-21.",
+)
+@click.option(
+    "--region-id",
+    "-r",
+    type=str,
+    default=None,
+    help="REQUIRED. Region ID on the AU-30 grid.",
+)
+@click.option(
+    "--process-cfg-url",
+    "-p",
+    type=str,
+    default=None,
+    help="REQUIRED. URL of the Burn Cube processing config file in YAML format.",
+)
+@click.option(
+    "--overwrite/--no-overwrite",
+    default=False,
+    help="Rerun scenes that have already been processed.",
+)
+def burn_cube_add_metadata(
+    task_id,
+    region_id,
+    process_cfg_url,
+    overwrite,
+):
+    logging_setup()
+
+    process_cfg = task.load_yaml_remote(process_cfg_url)
+
+    task_table = process_cfg["task_table"]
+
+    output = process_cfg["output_folder"]
+
+    bc_running_task = task.generate_task(task_id, task_table)
+
+    mappingperiod = (
+        bc_running_task["Mapping Period Start"],
+        bc_running_task["Mapping Period End"],
+    )
+
+    odc_config = {
+        "db_hostname": os.getenv("ODC_DB_HOSTNAME"),
+        "db_password": os.getenv("ODC_DB_PASSWORD"),
+        "db_username": os.getenv("ODC_DB_USERNAME"),
+        "db_port": 5432,
+        "db_database": os.getenv("ODC_DB_DATABASE"),
+    }
+
+    odc_dc = datacube.Datacube(
+        app=f"Burn Cube K8s processing - {region_id}", config=odc_config
+    )
+
+    ard_product_names = process_cfg["input_products"]["ard_product_names"]
+
+    _, gridspec = parse_gridspec_with_name("au-30")
+
+    # parse the AU-30 tile index from the region id
+    pattern = r"x(\d+)y(\d+)"
+
+    match = re.match(pattern, region_id)
+
+    if match:
+        x = int(match.group(1))
+        y = int(match.group(2))
+    else:
+        logger.error(
+            "No match found in region id %s.",
+            region_id,
+        )
+        # Cannot derive the geobox without a valid region id, so stop here.
+        # Exit with code 0 rather than raising, to avoid triggering the Airflow/Argo retry.
+        sys.exit(0)
+
+    geobox = gridspec.tile_geobox((x, y))
+
+    geobox_wgs84 = geobox.extent.to_crs(
+        "epsg:4326", resolution=math.inf, wrapdateline=True
+    )
+
+    bbox = geobox_wgs84.boundingbox
+
+    input_datasets = odc_dc.find_datasets(
+        product=ard_product_names, geopolygon=geobox_wgs84, time=mappingperiod
+    )
+
+    data_source = process_cfg["input_products"]["platform"]
+
+    _, target_file_path = task.generate_output_filenames(
+        output, task_id, region_id, data_source
+    )
+
+    o = urlparse(output)
+
+    bucket_name = o.netloc
+    object_key = target_file_path[1:]
+
+    processing_dt = datetime.utcnow()
+
+    product_name = process_cfg["product"]["name"]
+    product_version = process_cfg["product"]["version"]
+
+    properties: Dict[str, Any] = {}
+
+    properties["title"] = f"BurnMapping-{data_source}-{task_id}-{region_id}"
+    properties["dtr:start_datetime"] = format_datetime(mappingperiod[0])
+    properties["dtr:end_datetime"] = format_datetime(mappingperiod[1])
+    properties["odc:processing_datetime"] = format_datetime(
+        processing_dt, timespec="seconds"
+    )
+    properties["odc:region_code"] = region_id
+    properties["odc:product"] = product_name
+    properties["instruments"] = ["oli", "tirs"]  # get it from ARD datasets
+    properties["gsd"] = 15  # get it from ARD datasets
+    properties["platform"] = "landsat-8"  # get it from ARD datasets
+    properties["odc:file_format"] = "GeoTIFF"  # get it from ARD datasets
+    properties["odc:product_family"] = "burncube"  # get it from ARD datasets
+    properties["odc:producer"] = "ga.gov.au"  # get it from ARD datasets
+    properties["odc:dataset_version"] = product_version  # get it from ARD datasets
+    properties["dea:dataset_maturity"] = "final"
+    properties["odc:collection_number"] = 3
+
+    uuid = task.odc_uuid(
+        product_name,
+        product_version,
+        sources=[str(e.id) for e in input_datasets],
+        tile=region_id,
+        time=str(mappingperiod),
+    )
+
+    item = pystac.Item(
+        id=str(uuid),
+        geometry=geobox_wgs84.json,
+        bbox=[bbox.left, bbox.bottom, bbox.right, bbox.top],
+        datetime=pd.Timestamp(mappingperiod[0]).replace(tzinfo=timezone.utc),
+        properties=properties,
+        collection=product_name,
+    )
+
+    ProjectionExtension.add_to(item)
+    proj_ext = ProjectionExtension.ext(item)
+    proj_ext.apply(
+        geobox.crs.epsg,
+        transform=geobox.transform,
+        shape=geobox.shape,
+    )
+
+    # Lineage last
+    item.properties["odc:lineage"] = dict(inputs=[str(e.id) for e in input_datasets])
+
+    bands = process_cfg["product"]["bands"]
+
+    # Add all the assets
+    for band_name in bands:
+        asset = pystac.Asset(
+            href=f"BurnMapping-{data_source}-{task_id}-{region_id}-{band_name}.tif",
+            media_type="image/tiff; application=geotiff",
+            roles=["data"],
+            title=band_name,
+        )
+
+        eo = EOExtension.ext(asset)
+        band = Band.create(band_name)
+        eo.apply(bands=[band])
+
+        proj = ProjectionExtension.ext(asset)
+
+        proj.apply(
+            geobox.crs.epsg,
+            transform=geobox.transform,
+            shape=geobox.shape,
+        )
+
+        item.add_asset(band_name, asset=asset)
+
+    stac_metadata_path = (
+        bucket_name + "/" + object_key.replace(".nc", ".stac-item.json")
+    )
+
+    # Add links
+    item.links.append(
+        pystac.Link(
+            rel="product_overview",
+            media_type="application/json",
+            target=f"https://explorer.dea.ga.gov.au/product/{product_name}",
+        )
+    )
+
+    item.links.append(
+        pystac.Link(
+            rel="collection",
+            media_type="application/json",
+            target=f"https://explorer.dea.ga.gov.au/stac/collections/{product_name}",
+        )
+    )
+
+    item.links.append(
+        pystac.Link(
+            rel="alternative",
+            media_type="text/html",
+            target=f"https://explorer.dea.ga.gov.au/dataset/{str(uuid)}",
+        )
+    )
+
+    item.links.append(
+        pystac.Link(
+            rel="self",
+            media_type="application/json",
+            target=stac_metadata_path,
+        )
+    )
+
+    stac_metadata = item.to_dict()
+
+    logger.info(
+        "Uploading STAC metadata file %s to S3.",
+        stac_metadata_path,
+    )
+
+    io.upload_dict_to_s3(
+        stac_metadata, bucket_name, object_key.replace(".nc", ".stac-item.json")
+    )
+
+
 @main.command(no_args_is_help=True)
 @click.option(
     "--task-id",
@@ -567,7 +815,11 @@ def burn_cube_run(
     # check the input product detail
     # TODO: can add dry-run, and it will stop after input dataset list check
     try:
-        gpgon, input_dataset_list = bc_data_loading.check_input_datasets(
+        (
+            gpgon,
+            summary_datasets,
+            ard_datasets,
+        ) = bc_data_loading.check_input_datasets(
             hnrs_dc,
             odc_dc,
             period,
@@ -599,7 +851,8 @@
             region_id,
             output,
             task_table,
-            input_dataset_list,
+            summary_datasets,
+            ard_datasets,
         )
 
     # No matter upload successful or not, should not block the main processing
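The new burn_cube_add_metadata command derives all of its output names from the existing NetCDF target path: per-band COG hrefs follow the BurnMapping-{platform}-{task}-{region}-{band}.tif pattern, and the STAC sidecar key replaces the .nc suffix. A small sketch with illustrative values only; the real paths come from task.generate_output_filenames and the config's output_folder:

# Naming-convention sketch; bucket, folder, task, region, and band values are made up.
from urllib.parse import urlparse

output = "s3://example-bucket/bc-outputs"                                # hypothetical output folder
target_file_path = "/bc-outputs/BurnMapping-Landsat-Dec-21-x17y29.nc"    # hypothetical NetCDF path

bucket_name = urlparse(output).netloc                                    # "example-bucket"
object_key = target_file_path[1:]                                        # drop the leading "/"

stac_key = object_key.replace(".nc", ".stac-item.json")                  # STAC sidecar next to the NetCDF
band_href = "BurnMapping-Landsat-Dec-21-x17y29-severity.tif"             # per-band COG asset href

print(bucket_name, stac_key, band_href)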
diff --git a/dea_burn_cube/bc_data_loading.py b/dea_burn_cube/bc_data_loading.py
index fc7f420..c0bc347 100644
--- a/dea_burn_cube/bc_data_loading.py
+++ b/dea_burn_cube/bc_data_loading.py
@@ -126,7 +126,8 @@ def check_input_datasets(
         dataset is found for any input product.
     """
 
-    overall_input_datasets = []
+    summary_datasets = []
+    ard_datasets = []
 
     gpgon = _get_gpgon(region_id)
 
@@ -155,7 +156,7 @@ def check_input_datasets(
     elif len(datasets) > 1:
         raise IncorrectInputDataError("Find one more than GeoMAD dataset")
     else:
-        overall_input_datasets.extend(
+        summary_datasets.extend(
             [{str(e.id): e.metadata_doc["label"]} for e in datasets]
         )
 
     # Load the geometry from OpenDataCube again, avoid the pixel mismatch issue
@@ -186,7 +187,7 @@ def check_input_datasets(
     elif len(datasets) > 1:
         raise IncorrectInputDataError("Find one more than WOfS summary dataset")
     else:
-        overall_input_datasets.extend(
+        summary_datasets.extend(
             [{str(e.id): e.metadata_doc["label"]} for e in datasets]
         )
 
@@ -197,10 +198,8 @@ def check_input_datasets(
     if len(datasets) == 0:
         raise IncorrectInputDataError("Cannot find any mapping ARD dataset")
-    # else:
-    #     overall_input_datasets.extend(
-    #         [{str(e.id): e.metadata_doc["properties"]["title"]} for e in datasets]
-    #     )
+    else:  # keep only the UUIDs of ARD datasets to satisfy the metadata requirements
+        ard_datasets.extend([str(e.id) for e in datasets])
 
     logger.info("Load referance ARD from %s", "-".join(ard_product_names))
     logger.info("Find %s referance ARD datasets", str(len(datasets)))
@@ -212,10 +211,8 @@ def check_input_datasets(
     if len(datasets) == 0:
         raise IncorrectInputDataError("Cannot find any mapping ARD dataset")
-    # else:
-    #     overall_input_datasets.extend(
-    #         [{str(e.id): e.metadata_doc["properties"]["title"]} for e in datasets]
-    #     )
+    else:
+        ard_datasets.extend([str(e.id) for e in datasets])
 
     logger.info("Load referance ARD from %s", "-".join(ard_product_names))
     logger.info("Find %s mapping ARD datasets", str(len(datasets)))
@@ -227,7 +224,7 @@ def check_input_datasets(
         region_polygon.geometry[0], crs="epsg:3577"
     )
 
-    return gpgon, overall_input_datasets
+    return gpgon, summary_datasets, ard_datasets
 
 
 @task.log_execution_time
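check_input_datasets now returns a three-tuple (gpgon, summary_datasets, ard_datasets) instead of a single combined list: the GeoMAD and WOfS summary datasets keep UUID-to-label dicts for the processing log, while ARD datasets are recorded as bare UUID strings for the STAC lineage. Illustrative shapes only; every UUID and label below is made up:

# Sketch of the two dataset lists returned alongside gpgon.
summary_datasets = [
    # GeoMAD and WOfS summary datasets: one {UUID: label} dict per dataset
    {"11111111-1111-1111-1111-111111111111": "hypothetical_geomad_label"},
    {"22222222-2222-2222-2222-222222222222": "hypothetical_wofs_summary_label"},
]

ard_datasets = [
    # ARD datasets: bare UUID strings only; these feed odc:lineage and task.odc_uuid()
    "33333333-3333-3333-3333-333333333333",
    "44444444-4444-4444-4444-444444444444",
]

print(len(summary_datasets), len(ard_datasets))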
diff --git a/dea_burn_cube/task.py b/dea_burn_cube/task.py
index e2bb977..513eeda 100644
--- a/dea_burn_cube/task.py
+++ b/dea_burn_cube/task.py
@@ -8,8 +8,9 @@
 import datetime
 import logging
 import time
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List, Sequence, Tuple
 from urllib.parse import urlparse
+from uuid import UUID, uuid5
 
 import boto3
 import botocore
@@ -23,6 +24,10 @@
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
 
+# A fixed random UUID used as the ODC namespace,
+# copied from the odc-stats Model.py design.
+ODC_NS = UUID("6f34c6f4-13d6-43c0-8e4e-42b6c13203af")
+
 
 def load_yaml_remote(yaml_url: str) -> Dict[str, Any]:
     """
@@ -245,7 +250,8 @@ def generate_processing_log(
     region_id: str,
     output: str,
     task_table: str,
-    input_dataset_list: List[Dict[str, Any]],
+    summary_datasets: List[Dict[str, Any]],
+    ard_datasets: List[str],
 ) -> Dict[str, Any]:
     """
     Generates a processing log dictionary for the task.
@@ -269,8 +275,10 @@ def generate_processing_log(
         Output folder path
     task_table : str
         Name of the table to store the task
-    input_dataset_list : List[Dict[str, Any]]
-        List of input datasets with metadata
+    summary_datasets : List[Dict[str, Any]]
+        List of summary datasets, each mapping dataset UUID to label
+    ard_datasets : List[str]
+        List of input ARD dataset UUIDs
 
     Returns:
         processing_log : Dict[str, Any]): A dictionary containing processing log information
@@ -289,5 +297,36 @@ def generate_processing_log(
         "output": output,
         "task_table": task_table,
         "DEA Burn Cube": version,
-        "input_dataset_list": input_dataset_list,
+        "summary_datasets": summary_datasets,
+        "ard_datasets": ard_datasets,
     }
+
+
+def odc_uuid(
+    algorithm: str,
+    algorithm_version: str,
+    sources: Sequence[UUID],
+    deployment_id: str = "",
+    **other_tags,
+) -> UUID:
+    """
+    Generate a deterministic UUID for a derived Dataset.
+
+    :param algorithm: Name of the algorithm
+    :param algorithm_version: Version string of the algorithm
+    :param sources: Sequence of input Dataset UUIDs
+    :param deployment_id: Identifier for the installation that performed the run,
+                          for example a Docker image hash, or the dea module version on the NCI.
+    :param **other_tags: Any other identifiers necessary to uniquely identify the dataset
+    """
+
+    tags = [f"{k}={str(v)}" for k, v in other_tags.items()]
+
+    stringified_sources = (
+        [str(algorithm), str(algorithm_version), str(deployment_id)]
+        + sorted(tags)
+        + [str(u) for u in sorted(sources)]
+    )
+
+    srcs_hashes = "\n".join(s.lower() for s in stringified_sources)
+    return uuid5(ODC_NS, srcs_hashes)
diff --git a/requirements.txt b/requirements.txt
index 392768c..dd017c9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,8 +8,10 @@ fiona==1.8.22
 fsspec
 geopandas
 numpy==1.23.5
+odc_dscache>=0.2.2
 pyarrow
 pyproj==3.2.1
+pystac==1.7.3
 s3fs==0.4.2
 scipy==1.9.1
 Shapely==1.8.5.post1
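The odc_uuid helper makes the Burn Cube dataset IDs reproducible: the same product name, version, source UUIDs, and tags always hash to the same UUIDv5, regardless of source ordering. A quick sketch, assuming the dea_burn_cube package is importable; the product name comes from the configs above, while the source UUIDs and tile tag are illustrative:

# Check that odc_uuid() is deterministic and order-insensitive in its sources.
from dea_burn_cube.task import odc_uuid

sources = [
    "11111111-1111-1111-1111-111111111111",
    "22222222-2222-2222-2222-222222222222",
]

a = odc_uuid("ga_ls8c_bc_4cyear_2020", "3.0.0", sources, tile="x17y29")
b = odc_uuid("ga_ls8c_bc_4cyear_2020", "3.0.0", list(reversed(sources)), tile="x17y29")

assert a == b  # sources and tags are sorted before hashing, so ordering does not matter
print(a)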