diff --git a/algorithm_catalog/fusets_mogpr.json b/algorithm_catalog/fusets_mogpr.json new file mode 100644 index 0000000..0c4ecc4 --- /dev/null +++ b/algorithm_catalog/fusets_mogpr.json @@ -0,0 +1,136 @@ +{ + "id": "fusets_mogpr", + "type": "Feature", + "conformsTo": [ + "http://www.opengis.net/spec/ogcapi-records-1/1.0/req/record-core" + ], + "geometry": null, + "properties": { + "created": "2025-01-09T00:00:00Z", + "updated": "2025-01-09T00:00:00Z", + "type": "apex_algorithm", + "title": "Multi output gaussian process regression", + "description": "Integrates timeseries in data cube using multi-output gaussian process regression. The service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other indicators that are correlated with each other.", + "cost_estimate": 12, + "cost_unit": "platform credits per km²", + "keywords": [ + "timeseries", + "Gaussian Process Regression (GPR)" + ], + "language": { + "code": "en-US", + "name": "English (United States)" + }, + "languages": [ + { + "code": "en-US", + "name": "English (United States)" + } + ], + "contacts": [ + { + "name": "Bram Janssen", + "position": "Researcher", + "organization": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + }, + { + "href": "https://github.com/JanssenBrm", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "Contact via VITO", + "roles": [ + "principal investigator" + ] + }, + { + "name": "Pratichhya Sharma", + "position": "Researcher", + "organization": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + }, + { + "href": "https://github.com/Pratichhya", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "Contact via VITO", + "roles": [ + "service provider" + ] + }, + { + 
"name": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "SEE WEBSITE", + "roles": [ + "processor" + ] + } + ], + "themes": [ + { + "concepts": [ + { + "id": "Normalised vegetation difference index (NDVI)" + }, + { + "id": "Radar Vegetation Index (RVI)" + }, + { + "id": "Multi-output Gaussian Process Regression (MOGPR)" + } + ], + "scheme": "https://gcmd.earthdata.nasa.gov/kms/concepts/concept_scheme/sciencekeywords" + } + ], + "formats": [ + { + "name": "JSON" + } + ], + "license": "other" + }, + "linkTemplates": [], + "links": [ + { + "rel": "openeo-process", + "type": "application/json", + "title": "openEO Process Definition", + "href": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json" + }, + { + "rel": "service", + "type": "application/json", + "title": "CDSE openEO federation", + "href": "https://openeofed.dataspace.copernicus.eu" + }, + { + "rel": "license", + "href": "https://apex.esa.int/license" + }, + { + "rel": "example", + "type": "application/json", + "title": "Example output", + "href": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + } + ] +} \ No newline at end of file diff --git a/benchmark_scenarios/fusets_mogpr.json b/benchmark_scenarios/fusets_mogpr.json new file mode 100644 index 0000000..91f0829 --- /dev/null +++ b/benchmark_scenarios/fusets_mogpr.json @@ -0,0 +1,192 @@ +[ + { + "id": "fusets_mogpr", + "type": "openeo", + "description": "Multi output gaussian process regression example on NDVI timeseries", + "backend": "openeofed.dataspace.copernicus.eu", + "process_graph": { + "aggregatespatial1": { + "arguments": { + "data": { + "from_node": "fusetsmogpr1" + }, + "geometries": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + 
[ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "reducer": { + "process_graph": { + "mean1": { + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "process_id": "mean", + "result": true + } + } + } + }, + "process_id": "aggregate_spatial" + }, + "fusetsmogpr1": { + "arguments": { + "data": { + "from_node": "ndvi1" + } + }, + "namespace": "https://openeo.dataspace.copernicus.eu/openeo/1.2/processes/u:3e24e251-2e9a-438f-90a9-d4500e576574/fusets_mogpr", + "process_id": "fusets_mogpr" + }, + "loadcollection1": { + "arguments": { + "bands": [ + "B04", + "B08" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2022-06-01" + ] + }, + "process_id": "load_collection" + }, + "loadcollection2": { + "arguments": { + "bands": [ + "SCL" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2022-06-01" + ] + }, + "process_id": "load_collection" + }, + "mask1": { + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "mask": { + "from_node": "toscldilationmask1" + } + }, + "process_id": "mask" + }, + "ndvi1": { + "arguments": { + "data": { + "from_node": "mask1" + }, + "nir": "B08", + "red": "B04" + }, + "process_id": "ndvi" + }, + "saveresult1": { + "arguments": { + "data": { + "from_node": 
"aggregatespatial1" + }, + "format": "JSON", + "options": {} + }, + "process_id": "save_result", + "result": true + }, + "toscldilationmask1": { + "arguments": { + "data": { + "from_node": "loadcollection2" + } + }, + "process_id": "to_scl_dilation_mask" + } + }, + "reference_data": { + "job-results.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/job-results.json", + "timeseries.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + } + } +] \ No newline at end of file diff --git a/openeo_udp/fusets_mogpr/README.md b/openeo_udp/fusets_mogpr/README.md new file mode 100644 index 0000000..be62efa --- /dev/null +++ b/openeo_udp/fusets_mogpr/README.md @@ -0,0 +1,83 @@ +# Multi-output Gaussian process regression (MOGPR) + +The MOGPR service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other correlated indicators. + +## Parameters + +The MOGPR service requires the following parameters: + +- `datacube`: The input datacube that contains the data to be gap-filled. 
+ +## Usage + +The MOGPR service can be used as follows: + +```python + +import openeo + +## Setup of parameters +spat_ext = { +    "type": "Polygon", +    "coordinates": [ + [ + [ +                5.170012098271149, +                51.25062964728295 + ], + [ +                5.17085904378298, +                51.24882567194015 + ], + [ +                5.17857421368097, +                51.2468515482926 + ], + [ +                5.178972704726344, +                51.24982704376254 + ], + [ +                5.170012098271149, +                51.25062964728295 + ] + ] + ] +} +temp_ext = ["2022-05-01", "2023-07-31"] + +## Setup connection to openEO +eoconn = openeo.connect( +        "openeo.dataspace.copernicus.eu" + ).authenticate_oidc("CDSE") + +## Create a base NDVI datacube that can be used as input for the service +base = eoconn.load_collection('SENTINEL2_L2A', +                                  spatial_extent=spat_ext, +                                  temporal_extent=temp_ext, +                                  bands=["B04", "B08", "SCL"]) +mask = base.process("to_scl_dilation_mask", data=base.filter_bands(["SCL"])) +base_cloudmasked = base.mask(mask) +base_ndvi = base_cloudmasked.ndvi(red="B04", nir="B08") + +process_id = "fusets_mogpr" +namespace_url = "public_url"    # published URL of the process +## Create a processing graph from the MOGPR process using an active openEO connection +mogpr = eoconn.datacube_from_process( +       process_id=process_id, +       namespace= namespace_url, +      data=base_ndvi, + ) + + +## Calculate the average time series value for the given area of interest +mogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean') + +# Execute the service as a batch process +mogpr_job = mogpr.execute_batch('./mogpr.json', out_format="json", title=f'FuseTS - MOGPR') + +``` + +## Output + +The User-Defined-Process (UDP) produces a datacube that contains a gap-filled time series for all pixels within the specified temporal and spatial range. 
This datacube can be seamlessly integrated with other openEO processes. \ No newline at end of file diff --git a/openeo_udp/fusets_mogpr/fusets_mogpr.json b/openeo_udp/fusets_mogpr/fusets_mogpr.json new file mode 100644 index 0000000..dbdd837 --- /dev/null +++ b/openeo_udp/fusets_mogpr/fusets_mogpr.json @@ -0,0 +1,55 @@ +{ + "process_graph": { + "applyneighborhood1": { + "process_id": "apply_neighborhood", + "arguments": { + "data": { + "from_parameter": "data" + }, + "overlap": [], + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "context": {}, + "data": { + "from_parameter": "data" + }, + "runtime": "Python", + "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\n\nfrom openeo.udf import inspect\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n 
download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n os.remove(zip_path)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\", 'venv')\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\", 'venv_static')\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n 
return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" + }, + "result": true + } + } + }, + "size": [ + { + "dimension": "x", + "value": 32, + "unit": "px" + }, + { + "dimension": "y", + "value": 32, + "unit": "px" + } + ] + }, + "result": true + } + }, + "id": "fusets_mogpr", + "summary": "Integrates timeseries in data cube using multi-output gaussian process regression", + "description": "# Multi-output Gaussian process regression (MOGPR)\n\nThe MOGPR service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other correlated indicators.\n\n## Parameters\n\nThe MOGPR service requires the following parameters:\n\n- `datacube`: The input datacube that contains the data to be gap-filled.\n\n## Usage\n\nThe MOGPR service can be used as follows:\n\n```python\n\nimport openeo\n\n## Setup of parameters\nspat_ext = {\n    \"type\": \"Polygon\",\n    \"coordinates\": [\n [\n [\n                5.170012098271149,\n                51.25062964728295\n ],\n [\n                5.17085904378298,\n                51.24882567194015\n ],\n [\n                5.17857421368097,\n                51.2468515482926\n ],\n [\n                5.178972704726344,\n                51.24982704376254\n ],\n [\n                5.170012098271149,\n                51.25062964728295\n ]\n ]\n ]\n}\ntemp_ext = [\"2022-05-01\", \"2023-07-31\"]\n\n## Setup connection to openEO\neoconn = openeo.connect(\n        \"openeo.dataspace.copernicus.eu\"\n ).authenticate_oidc(\"CDSE\")\n\n## Create a base NDVI datacube that can be used as input for the service\nbase = eoconn.load_collection('SENTINEL2_L2A',\n                                  spatial_extent=spat_ext,\n                                  
temporal_extent=temp_ext,\n                                  bands=[\"B04\", \"B08\", \"SCL\"])\nmask = base.process(\"to_scl_dilation_mask\", data=base.filter_bands([\"SCL\"]))\nbase_cloudmasked = base.mask(mask)\nbase_ndvi = base_cloudmasked.ndvi(red=\"B04\", nir=\"B08\")\n\nprocess_id = \"fusets_mogpr\"\nnamespace_url = \"public_url\"    # published URL of the process\n## Create a processing graph from the MOGPR process using an active openEO connection\nmogpr = eoconn.datacube_from_process(\n       process_id=process_id,\n       namespace= namespace_url,\n      data=base_ndvi, \n )\n\n\n## Calculate the average time series value for the given area of interest\nmogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean')\n\n# Execute the service as a batch process\nmogpr_job = mogpr.execute_batch('./mogpr.json', out_format=\"json\", title=f'FuseTS - MOGPR') \n\n```\n\n## Output\n\nThe User-Defined-Process (UDP) produces a datacube that contains a gap-filled time series for all pixels within the specified temporal and spatial range. 
This datacube can be seamlessly integrated with other openEO processes.", + "parameters": [ + { + "name": "data", + "description": "Input raster data cube containing the time series to be gap-filled", + "schema": { + "type": "object", + "subtype": "datacube" + } + } + ] +} \ No newline at end of file diff --git a/openeo_udp/fusets_mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py new file mode 100644 index 0000000..2d4e128 --- /dev/null +++ b/openeo_udp/fusets_mogpr/generate.py @@ -0,0 +1,49 @@ +import json +from pathlib import Path +from typing import Union + +from openeo import DataCube +from openeo.api.process import Parameter +from openeo.processes import ProcessBuilder, apply_neighborhood +from openeo.rest.udp import build_process_dict + +from fusets.openeo import load_mogpr_udf + + +def get_mogpr( + input_cube: Union[DataCube, Parameter], +) -> ProcessBuilder: + return apply_neighborhood(input_cube, + lambda data: data.run_udf(udf=Path("set_path.py").read_text()+"\n"+load_mogpr_udf(), runtime='Python', context=dict()), + size=[ + {'dimension': 'x', 'value': 32, 'unit': 'px'}, + {'dimension': 'y', 'value': 32, 'unit': 'px'} + ], overlap=[]) + + +def generate() -> dict: + # define parameters + input_cube = Parameter.datacube( + name="data", + description="Input raster data cube containing the time series to be gap-filled" + ) + + mogpr = get_mogpr( + input_cube=input_cube, + ) + + return build_process_dict( + process_graph=mogpr, + process_id="fusets_mogpr", + summary="Integrates timeseries in data cube using multi-output gaussian process regression", + description=(Path(__file__).parent / "README.md").read_text(), + parameters=[input_cube], + returns=None, # TODO + categories=None, # TODO + ) + + +if __name__ == "__main__": + # save the generated process to a file + with open(Path(__file__).parent / "fusets_mogpr.json", "w") as f: + json.dump(generate(), f, indent=2) diff --git a/openeo_udp/fusets_mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py new file mode 100644 
index 0000000..a7ecba4 --- /dev/null +++ b/openeo_udp/fusets_mogpr/set_path.py @@ -0,0 +1,59 @@ +import os +import sys +import zipfile +import requests +import functools + +from openeo.udf import inspect + + +def download_file(url, path): + """ + Downloads a file from the given URL to the specified path. + """ + response = requests.get(url, stream=True) + with open(path, "wb") as file: + file.write(response.content) + + +def extract_zip(zip_path, extract_to): + """ + Extracts a zip file from zip_path to the specified extract_to directory. + """ + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(extract_to) + + +def add_directory_to_sys_path(directory): + """ + Adds a directory to the Python sys.path if it's not already present. + """ + if directory not in sys.path: + sys.path.insert(0, directory) + +@functools.lru_cache(maxsize=5) +def setup_dependencies(dependencies_url,DEPENDENCIES_DIR): + """ + Main function to set up the dependencies by downloading, extracting, + and adding necessary directories to sys.path. 
+ """ + + inspect(message="Create directories") + # Ensure base directories exist + os.makedirs(DEPENDENCIES_DIR, exist_ok=True) + + # Download and extract dependencies if not already present + if not os.listdir(DEPENDENCIES_DIR): + + inspect(message="Extract dependencies") + zip_path = os.path.join(DEPENDENCIES_DIR, "temp.zip") + download_file(dependencies_url, zip_path) + extract_zip(zip_path, DEPENDENCIES_DIR) + os.remove(zip_path) + + # Add the extracted dependencies directory to sys.path + add_directory_to_sys_path(DEPENDENCIES_DIR) + inspect(message="Added to the sys path") + +setup_dependencies("https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip", 'venv') +setup_dependencies("https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip", 'venv_static') \ No newline at end of file diff --git a/qa/unittests/requirements.txt b/qa/unittests/requirements.txt index 01be142..b1c0ca1 100644 --- a/qa/unittests/requirements.txt +++ b/qa/unittests/requirements.txt @@ -3,3 +3,4 @@ git+https://github.com/ESA-APEx/esa-apex-toolbox-python.git@main pytest>=8.2.0 moto[s3, server]>=5.0.13 dirty-equals>=0.8.0 +fusets>=2.0.1