From 5788e60b5b71444a37e07d8ebf8a032d14c7ad07 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 10:29:40 +0100 Subject: [PATCH 01/15] mogpr to openeo_udp --- openeo_udp/mogpr/README.md | 51 +++++++++++++++++++++++ openeo_udp/mogpr/generate.py | 53 ++++++++++++++++++++++++ openeo_udp/mogpr/set_path.py | 79 ++++++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+) create mode 100644 openeo_udp/mogpr/README.md create mode 100644 openeo_udp/mogpr/generate.py create mode 100644 openeo_udp/mogpr/set_path.py diff --git a/openeo_udp/mogpr/README.md b/openeo_udp/mogpr/README.md new file mode 100644 index 0000000..2f2b14c --- /dev/null +++ b/openeo_udp/mogpr/README.md @@ -0,0 +1,51 @@ +# Multi output gaussian process regression + +## Description + +Compute an integrated timeseries based on multiple inputs. +For instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI. + +## Limitations + +The spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km). + +## Configuration & Resource Usage + +Run configurations for different ROI/TOI with memory requirements and estimated run durations. + +### Synchronous calls + +TODO: Replace with actual measurements!!! + +| Spatial extent | Run duration | +|----------------|--------------| +| 100 m x 100 m | 1 minute | +| 500m x 500 m | 1 minute | +| 1 km x 1 km | 1 minute | +| 5 km x 5 km | 2 minutes | +| 10 km x 10 km | 3 minutes | +| 50 km x 50 km | 9 minutes | + +The maximum duration of a synchronous run is 15 minutes. +For long running computations, you can use batch jobs. + +### Batch jobs + +TODO: Replace with actual measurements!!! 
+ +| Spatial extent | Temporal extent | Executor memory | Run duration | +|-----------------|-----------------|-----------------|--------------| +| 100 m x 100 m | 1 month | default | 7 minutes | +| 500 m x 500 m | 1 month | default | 7 minutes | +| 1 km x 1 km | 1 month | default | 7 minutes | +| 5 km x 5 km | 1 month | default | 10 minutes | +| 10 km x 10 km | 1 month | default | 11 minutes | +| 50 km x 50 km | 1 month | 6 GB | 20 minutes | +| 100 km x 100 km | 1 month | 7 GB | 34 minutes | +| 100 m x 100 m | 7 months | default | 10 minutes | +| 500 m x 500 m | 7 months | default | 10 minutes | +| 1 km x 1 km | 7 months | default | 14 minutes | +| 5 km x 5 km | 7 months | default | 14 minutes | +| 10 km x 10 km | 7 months | default | 19 minutes | +| 50 km x 50 km | 7 months | 6 GB | 45 minutes | +| 100 km x 100 km | 7 months | 8 GB | 65 minutes | diff --git a/openeo_udp/mogpr/generate.py b/openeo_udp/mogpr/generate.py new file mode 100644 index 0000000..a5edbae --- /dev/null +++ b/openeo_udp/mogpr/generate.py @@ -0,0 +1,53 @@ +import json +from pathlib import Path +from set_path import load_set_path +from typing import Union + +import openeo +from openeo import DataCube +from openeo.api.process import Parameter +from openeo.processes import ProcessBuilder, apply_neighborhood +from openeo.rest.udp import build_process_dict + +from fusets.openeo import load_mogpr_udf +from fusets.openeo.services.publish_mogpr import NEIGHBORHOOD_SIZE + +def get_mogpr( + input_cube: Union[DataCube, Parameter], +) -> ProcessBuilder: + return apply_neighborhood(input_cube, + lambda data: data.run_udf(udf=load_set_path()+"\n"+load_mogpr_udf(), runtime='Python', context=dict()), + size=[ + {'dimension': 'x', 'value': NEIGHBORHOOD_SIZE, 'unit': 'px'}, + {'dimension': 'y', 'value': NEIGHBORHOOD_SIZE, 'unit': 'px'} + ], overlap=[]) + + +def generate() -> dict: + connection = openeo.connect("openeofed.dataspace.copernicus.eu") + + # define parameters + input_cube = Parameter.datacube( 
name="input_raster_cube", + description="Raster cube for which to calculate the peaks and valleys" + ) + + mogpr = get_mogpr( + input_cube=input_cube, + ) + + return build_process_dict( + process_graph=mogpr, + process_id="mogpr", + summary="Integrates timeseries in data cube using multi-output gaussian process regression", + description=(Path(__file__).parent / "README.md").read_text(), + parameters=[input_cube], + returns=None, # TODO + categories=None, # TODO + ) + + +if __name__ == "__main__": + # save the generated process to a file + with open(Path(__file__).parent / "mogpr.json", "w") as f: + json.dump(generate(), f, indent=2) diff --git a/openeo_udp/mogpr/set_path.py b/openeo_udp/mogpr/set_path.py new file mode 100644 index 0000000..686d48b --- /dev/null +++ b/openeo_udp/mogpr/set_path.py @@ -0,0 +1,79 @@ +import os +import sys +import zipfile +import requests +import functools +from typing import Union +from pathlib import Path + +from openeo.udf import inspect + + +# Example constants for demonstration +DEPENDENCIES_DIR1 = 'venv' +DEPENDENCIES_DIR2 = 'venv_static' + +DEPENDENCIES_URL1 = "https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip" +DEPENDENCIES_URL2 = "https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip" + + +def download_file(url, path): + """ + Downloads a file from the given URL to the specified path. + """ + response = requests.get(url, stream=True) + with open(path, "wb") as file: + file.write(response.content) + + +def extract_zip(zip_path, extract_to): + """ + Extracts a zip file from zip_path to the specified extract_to directory. + """ + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(extract_to) + os.remove(zip_path) # Clean up the zip file after extraction + + +def add_directory_to_sys_path(directory): + """ + Adds a directory to the Python sys.path if it's not already present. 
+ """ + if directory not in sys.path: + sys.path.insert(0, directory) + +@functools.lru_cache(maxsize=5) +def setup_dependencies(dependencies_url,DEPENDENCIES_DIR): + """ + Main function to set up the dependencies by downloading, extracting, + and adding necessary directories to sys.path. + """ + + inspect(message="Create directories") + # Ensure base directories exist + os.makedirs(DEPENDENCIES_DIR, exist_ok=True) + + # Download and extract dependencies if not already present + if not os.listdir(DEPENDENCIES_DIR): + + inspect(message="Extract dependencies") + zip_path = os.path.join(DEPENDENCIES_DIR, "temp.zip") + download_file(dependencies_url, zip_path) + extract_zip(zip_path, DEPENDENCIES_DIR) + + # Add the extracted dependencies directory to sys.path + add_directory_to_sys_path(DEPENDENCIES_DIR) + inspect(message="Added to the sys path") + +setup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1) +setup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2) + + +def load_set_path() -> str: + """ + loads path setup functions + @return: + """ + import os + + return Path(os.path.realpath(__file__)).read_text() \ No newline at end of file From 615c74851cbdda3e60dde1efd5d6ae07a7e5bf81 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 11:11:07 +0100 Subject: [PATCH 02/15] changed process id --- openeo_udp/mogpr/generate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openeo_udp/mogpr/generate.py b/openeo_udp/mogpr/generate.py index a5edbae..a852a91 100644 --- a/openeo_udp/mogpr/generate.py +++ b/openeo_udp/mogpr/generate.py @@ -38,7 +38,7 @@ def generate() -> dict: return build_process_dict( process_graph=mogpr, - process_id="mogpr", + process_id="fusets_mogpr", summary="Integrates timeseries in data cube using multi-output gaussian process regression", description=(Path(__file__).parent / "README.md").read_text(), parameters=[input_cube], From cfe6b0c60cbbba101585779db6212d8aeb2e27c8 Mon 
Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 11:24:26 +0100 Subject: [PATCH 03/15] moved dir due to sys path issue --- openeo_udp/{mogpr => fusets_mogpr}/README.md | 0 openeo_udp/fusets_mogpr/fusets_mogpr.json | 55 +++++++++++++++++++ .../{mogpr => fusets_mogpr}/generate.py | 2 +- .../{mogpr => fusets_mogpr}/set_path.py | 0 4 files changed, 56 insertions(+), 1 deletion(-) rename openeo_udp/{mogpr => fusets_mogpr}/README.md (100%) create mode 100644 openeo_udp/fusets_mogpr/fusets_mogpr.json rename openeo_udp/{mogpr => fusets_mogpr}/generate.py (96%) rename openeo_udp/{mogpr => fusets_mogpr}/set_path.py (100%) diff --git a/openeo_udp/mogpr/README.md b/openeo_udp/fusets_mogpr/README.md similarity index 100% rename from openeo_udp/mogpr/README.md rename to openeo_udp/fusets_mogpr/README.md diff --git a/openeo_udp/fusets_mogpr/fusets_mogpr.json b/openeo_udp/fusets_mogpr/fusets_mogpr.json new file mode 100644 index 0000000..ac1c837 --- /dev/null +++ b/openeo_udp/fusets_mogpr/fusets_mogpr.json @@ -0,0 +1,55 @@ +{ + "process_graph": { + "applyneighborhood1": { + "process_id": "apply_neighborhood", + "arguments": { + "data": { + "from_parameter": "input_raster_cube" + }, + "overlap": [], + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "context": {}, + "data": { + "from_parameter": "data" + }, + "runtime": "Python", + "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom typing import Union\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\n# Example constants for demonstration\nDEPENDENCIES_DIR1 = 'venv'\nDEPENDENCIES_DIR2 = 'venv_static'\n\nDEPENDENCIES_URL1 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\"\nDEPENDENCIES_URL2 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\"\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file 
from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n os.remove(zip_path) # Clean up the zip file after extraction\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1)\nsetup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2)\n\n\ndef load_set_path() -> str:\n \"\"\"\n loads path setup functions \n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n 
:return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" + }, + "result": true + } + } + }, + "size": [ + { + "dimension": "x", + "value": 32, + "unit": "px" + }, + { + "dimension": "y", + "value": 32, + "unit": "px" + } + ] + }, + "result": true + } + }, + "id": "fusets_mogpr", + "summary": "Integrates timeseries in data cube using multi-output gaussian process regression", + "description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Limitations\n\nThe spatial 
extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes |\n| 10 km x 10 km | 7 months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n", + "parameters": [ + { + "name": "input_raster_cube", + "description": "Raster cube for which to calculate the peaks and valleys", + "schema": { + "type": "object", + "subtype": "datacube" + } + } + ] +} \ No newline at end of file diff --git a/openeo_udp/mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py similarity index 96% rename from openeo_udp/mogpr/generate.py rename to 
openeo_udp/fusets_mogpr/generate.py index a852a91..922a26a 100644 --- a/openeo_udp/mogpr/generate.py +++ b/openeo_udp/fusets_mogpr/generate.py @@ -49,5 +49,5 @@ def generate() -> dict: if __name__ == "__main__": # save the generated process to a file - with open(Path(__file__).parent / "mogpr.json", "w") as f: + with open(Path(__file__).parent / "fusets_mogpr.json", "w") as f: json.dump(generate(), f, indent=2) diff --git a/openeo_udp/mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py similarity index 100% rename from openeo_udp/mogpr/set_path.py rename to openeo_udp/fusets_mogpr/set_path.py From 682134dd9392298c1393b9a04d85cfa42af83c3f Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 13:23:02 +0100 Subject: [PATCH 04/15] algorithm catalog --- algorithm_catalog/fusets_mogpr.json | 136 ++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 algorithm_catalog/fusets_mogpr.json diff --git a/algorithm_catalog/fusets_mogpr.json b/algorithm_catalog/fusets_mogpr.json new file mode 100644 index 0000000..1990a74 --- /dev/null +++ b/algorithm_catalog/fusets_mogpr.json @@ -0,0 +1,136 @@ +{ + "id": "fusets_mogpr", + "type": "Feature", + "conformsTo": [ + "http://www.opengis.net/spec/ogcapi-records-1/1.0/req/record-core" + ], + "geometry": null, + "properties": { + "created": "2025-01-09T00:00:00Z", + "updated": "2025-01-09T00:00:00Z", + "type": "apex_algorithm", + "title": "Multi output gaussian process regression", + "description": "Integrates timeseries in data cube using multi-output gaussian process regression. The service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. 
It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other indicators that are correlated with each other.", + "cost_estimate": 12, + "cost_unit": "platform credits per km²", + "keywords": [ + "timeseries", + "Gaussian Process Regression (GPR)" + ], + "language": { + "code": "en-US", + "name": "English (United States)" + }, + "languages": [ + { + "code": "en-US", + "name": "English (United States)" + } + ], + "contacts": [ + { + "name": "Bram Janssen", + "position": "Researcher", + "organization": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + }, + { + "href": "https://github.com/JanssenBrm", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "Contact via VITO", + "roles": [ + "principal investigator" + ] + }, + { + "name": "Pratichhya Sharma", + "position": "Researcher", + "organization": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + }, + { + "href": "https://github.com/Pratichhya", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "Contact via VITO", + "roles": [ + "service provider" + ] + }, + { + "name": "VITO", + "links": [ + { + "href": "https://www.vito.be/", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "SEE WEBSITE", + "roles": [ + "processor" + ] + } + ], + "themes": [ + { + "concepts": [ + { + "id": "Normalised vegetation difference index (NDVI)" + }, + { + "id": "Radar Vegetation Index (RVI)" + }, + { + "id": "Multi-output Gaussian Process Regression (MOGPR)" + } + ], + "scheme": "https://gcmd.earthdata.nasa.gov/kms/concepts/concept_scheme/sciencekeywords" + } + ], + "formats": [ + { + "name": "JSON" + } + ], + "license": "other" + }, + "linkTemplates": [], + "links": [ + { + "rel": "openeo-process", + "type": "application/json", + "title": "openEO Process Definition", + "href": 
"https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/3b5a011a90f4a3050ff8fdf69ca5bc2fd1535881/openeo_udp/biopar/biopar.json" + }, + { + "rel": "service", + "type": "application/json", + "title": "CDSE openEO federation", + "href": "https://openeofed.dataspace.copernicus.eu" + }, + { + "rel": "license", + "href": "https://apex.esa.int/license" + }, + { + "rel": "example", + "type": "application/json", + "title": "Example output", + "href": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + } + ] +} \ No newline at end of file From 8711703433da1bafca8ff5379ba52dc4cfe09704 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 13:44:09 +0100 Subject: [PATCH 05/15] benchmark scenario --- benchmark_scenarios/fusets_mogpr.json | 182 ++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 benchmark_scenarios/fusets_mogpr.json diff --git a/benchmark_scenarios/fusets_mogpr.json b/benchmark_scenarios/fusets_mogpr.json new file mode 100644 index 0000000..c0b9e6d --- /dev/null +++ b/benchmark_scenarios/fusets_mogpr.json @@ -0,0 +1,182 @@ +[ + { + "id": "fusets_mogpr", + "type": "openeo", + "description": "Multi output gaussian process regression example on NDVI timeseries", + "backend": "openeofed.dataspace.copernicus.eu", + "process_graph": { + "aggregatespatial1": { + "arguments": { + "data": { + "from_node": "mogpr1" + }, + "geometries": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "reducer": { + "process_graph": { + "mean1": { + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "process_id": "mean", + "result": true + } + } + } + }, + "process_id": "aggregate_spatial" + }, + 
"loadcollection1": { + "arguments": { + "bands": [ + "B04", + "B08" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2023-07-31" + ] + }, + "process_id": "load_collection" + }, + "loadcollection2": { + "arguments": { + "bands": [ + "SCL" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2023-07-31" + ] + }, + "process_id": "load_collection" + }, + "mask1": { + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "mask": { + "from_node": "toscldilationmask1" + } + }, + "process_id": "mask" + }, + "mogpr1": { + "arguments": { + "input_raster_cube": { + "from_node": "ndvi1" + } + }, + "namespace": "https://openeo.dataspace.copernicus.eu/openeo/1.2/processes/u:3e24e251-2e9a-438f-90a9-d4500e576574/mogpr", + "process_id": "mogpr" + }, + "ndvi1": { + "arguments": { + "data": { + "from_node": "mask1" + }, + "nir": "B08", + "red": "B04" + }, + "process_id": "ndvi" + }, + "toscldilationmask1": { + "arguments": { + "data": { + "from_node": "loadcollection2" + } + }, + "process_id": "to_scl_dilation_mask" + } + }, + "reference_data": { + "job-results.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/job-results.json", + "timeseries.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + } + } + ] + \ No newline at 
end of file From 711c44db14232984720b723f2cfcd822dd25d12f Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:08:17 +0100 Subject: [PATCH 06/15] updated namespace --- algorithm_catalog/fusets_mogpr.json | 2 +- benchmark_scenarios/fusets_mogpr.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/algorithm_catalog/fusets_mogpr.json b/algorithm_catalog/fusets_mogpr.json index 1990a74..309a512 100644 --- a/algorithm_catalog/fusets_mogpr.json +++ b/algorithm_catalog/fusets_mogpr.json @@ -114,7 +114,7 @@ "rel": "openeo-process", "type": "application/json", "title": "openEO Process Definition", - "href": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/3b5a011a90f4a3050ff8fdf69ca5bc2fd1535881/openeo_udp/biopar/biopar.json" + "href": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json" }, { "rel": "service", diff --git a/benchmark_scenarios/fusets_mogpr.json b/benchmark_scenarios/fusets_mogpr.json index c0b9e6d..21006d3 100644 --- a/benchmark_scenarios/fusets_mogpr.json +++ b/benchmark_scenarios/fusets_mogpr.json @@ -151,7 +151,7 @@ "from_node": "ndvi1" } }, - "namespace": "https://openeo.dataspace.copernicus.eu/openeo/1.2/processes/u:3e24e251-2e9a-438f-90a9-d4500e576574/mogpr", + "namespace": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json", "process_id": "mogpr" }, "ndvi1": { From 044b57b2ce266bd85bae429963f9740e7fbd8c68 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:09:00 +0100 Subject: [PATCH 07/15] preetify json --- algorithm_catalog/fusets_mogpr.json | 10 +- benchmark_scenarios/fusets_mogpr.json | 349 +++++++++++----------- openeo_udp/fusets_mogpr/fusets_mogpr.json | 100 +++---- 3 files changed, 229 insertions(+), 230 deletions(-) diff --git 
a/algorithm_catalog/fusets_mogpr.json b/algorithm_catalog/fusets_mogpr.json index 309a512..0c4ecc4 100644 --- a/algorithm_catalog/fusets_mogpr.json +++ b/algorithm_catalog/fusets_mogpr.json @@ -103,7 +103,7 @@ ], "formats": [ { - "name": "JSON" + "name": "JSON" } ], "license": "other" @@ -127,10 +127,10 @@ "href": "https://apex.esa.int/license" }, { - "rel": "example", - "type": "application/json", - "title": "Example output", - "href": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + "rel": "example", + "type": "application/json", + "title": "Example output", + "href": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" } ] } \ No newline at end of file diff --git a/benchmark_scenarios/fusets_mogpr.json b/benchmark_scenarios/fusets_mogpr.json index 21006d3..00792bd 100644 --- a/benchmark_scenarios/fusets_mogpr.json +++ b/benchmark_scenarios/fusets_mogpr.json @@ -1,182 +1,181 @@ [ { - "id": "fusets_mogpr", - "type": "openeo", - "description": "Multi output gaussian process regression example on NDVI timeseries", - "backend": "openeofed.dataspace.copernicus.eu", - "process_graph": { - "aggregatespatial1": { - "arguments": { - "data": { - "from_node": "mogpr1" - }, - "geometries": { - "coordinates": [ - [ - [ - 5.170012098271149, - 51.25062964728295 - ], - [ - 5.17085904378298, - 51.24882567194015 - ], - [ - 5.17857421368097, - 51.2468515482926 - ], - [ - 5.178972704726344, - 51.24982704376254 - ], - [ - 5.170012098271149, - 51.25062964728295 - ] - ] - ], - "type": "Polygon" - }, - "reducer": { - "process_graph": { - "mean1": { + "id": "fusets_mogpr", + "type": "openeo", + "description": "Multi output gaussian process regression example on NDVI timeseries", + "backend": "openeofed.dataspace.copernicus.eu", + "process_graph": { + "aggregatespatial1": { "arguments": { - "data": { - "from_parameter": "data" - } + "data": { + "from_node": "mogpr1" + }, + "geometries": { + "coordinates": [ + [ + [ 
+ 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "reducer": { + "process_graph": { + "mean1": { + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "process_id": "mean", + "result": true + } + } + } }, - "process_id": "mean", - "result": true - } + "process_id": "aggregate_spatial" + }, + "loadcollection1": { + "arguments": { + "bands": [ + "B04", + "B08" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2023-07-31" + ] + }, + "process_id": "load_collection" + }, + "loadcollection2": { + "arguments": { + "bands": [ + "SCL" + ], + "id": "SENTINEL2_L2A", + "spatial_extent": { + "coordinates": [ + [ + [ + 5.170012098271149, + 51.25062964728295 + ], + [ + 5.17085904378298, + 51.24882567194015 + ], + [ + 5.17857421368097, + 51.2468515482926 + ], + [ + 5.178972704726344, + 51.24982704376254 + ], + [ + 5.170012098271149, + 51.25062964728295 + ] + ] + ], + "type": "Polygon" + }, + "temporal_extent": [ + "2022-05-01", + "2023-07-31" + ] + }, + "process_id": "load_collection" + }, + "mask1": { + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "mask": { + "from_node": "toscldilationmask1" + } + }, + "process_id": "mask" + }, + "mogpr1": { + "arguments": { + "input_raster_cube": { + "from_node": "ndvi1" + } + }, + "namespace": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json", + "process_id": "mogpr" + }, + 
"ndvi1": { + "arguments": { + "data": { + "from_node": "mask1" + }, + "nir": "B08", + "red": "B04" + }, + "process_id": "ndvi" + }, + "toscldilationmask1": { + "arguments": { + "data": { + "from_node": "loadcollection2" + } + }, + "process_id": "to_scl_dilation_mask" } - } - }, - "process_id": "aggregate_spatial" - }, - "loadcollection1": { - "arguments": { - "bands": [ - "B04", - "B08" - ], - "id": "SENTINEL2_L2A", - "spatial_extent": { - "coordinates": [ - [ - [ - 5.170012098271149, - 51.25062964728295 - ], - [ - 5.17085904378298, - 51.24882567194015 - ], - [ - 5.17857421368097, - 51.2468515482926 - ], - [ - 5.178972704726344, - 51.24982704376254 - ], - [ - 5.170012098271149, - 51.25062964728295 - ] - ] - ], - "type": "Polygon" - }, - "temporal_extent": [ - "2022-05-01", - "2023-07-31" - ] - }, - "process_id": "load_collection" - }, - "loadcollection2": { - "arguments": { - "bands": [ - "SCL" - ], - "id": "SENTINEL2_L2A", - "spatial_extent": { - "coordinates": [ - [ - [ - 5.170012098271149, - 51.25062964728295 - ], - [ - 5.17085904378298, - 51.24882567194015 - ], - [ - 5.17857421368097, - 51.2468515482926 - ], - [ - 5.178972704726344, - 51.24982704376254 - ], - [ - 5.170012098271149, - 51.25062964728295 - ] - ] - ], - "type": "Polygon" - }, - "temporal_extent": [ - "2022-05-01", - "2023-07-31" - ] - }, - "process_id": "load_collection" - }, - "mask1": { - "arguments": { - "data": { - "from_node": "loadcollection1" - }, - "mask": { - "from_node": "toscldilationmask1" - } - }, - "process_id": "mask" - }, - "mogpr1": { - "arguments": { - "input_raster_cube": { - "from_node": "ndvi1" - } - }, - "namespace": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json", - "process_id": "mogpr" - }, - "ndvi1": { - "arguments": { - "data": { - "from_node": "mask1" - }, - "nir": "B08", - "red": "B04" - }, - "process_id": "ndvi" - }, - "toscldilationmask1": { - "arguments": { - "data": { - "from_node": 
"loadcollection2" - } }, - "process_id": "to_scl_dilation_mask" - } - }, - "reference_data": { - "job-results.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/job-results.json", - "timeseries.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" - } + "reference_data": { + "job-results.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/job-results.json", + "timeseries.json": "https://s3.waw3-1.cloudferro.com/swift/v1/apex-examples/fusets_mogpr/timeseries.json" + } } - ] - \ No newline at end of file +] \ No newline at end of file diff --git a/openeo_udp/fusets_mogpr/fusets_mogpr.json b/openeo_udp/fusets_mogpr/fusets_mogpr.json index ac1c837..41a7dd9 100644 --- a/openeo_udp/fusets_mogpr/fusets_mogpr.json +++ b/openeo_udp/fusets_mogpr/fusets_mogpr.json @@ -1,55 +1,55 @@ { - "process_graph": { - "applyneighborhood1": { - "process_id": "apply_neighborhood", - "arguments": { - "data": { - "from_parameter": "input_raster_cube" - }, - "overlap": [], - "process": { - "process_graph": { - "runudf1": { - "process_id": "run_udf", - "arguments": { - "context": {}, + "process_graph": { + "applyneighborhood1": { + "process_id": "apply_neighborhood", + "arguments": { "data": { - "from_parameter": "data" + "from_parameter": "input_raster_cube" }, - "runtime": "Python", - "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom typing import Union\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\n# Example constants for demonstration\nDEPENDENCIES_DIR1 = 'venv'\nDEPENDENCIES_DIR2 = 'venv_static'\n\nDEPENDENCIES_URL1 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\"\nDEPENDENCIES_URL2 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\"\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = 
requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n os.remove(zip_path) # Clean up the zip file after extraction\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1)\nsetup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2)\n\n\ndef load_set_path() -> str:\n \"\"\"\n loads path setup functions \n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 
'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" - }, - "result": true + "overlap": [], + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "context": {}, + "data": { + "from_parameter": "data" + }, + "runtime": "Python", + "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom typing import Union\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\n# Example constants for demonstration\nDEPENDENCIES_DIR1 = 'venv'\nDEPENDENCIES_DIR2 = 'venv_static'\n\nDEPENDENCIES_URL1 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\"\nDEPENDENCIES_URL2 = 
\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\"\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n os.remove(zip_path) # Clean up the zip file after extraction\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1)\nsetup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2)\n\n\ndef load_set_path() -> str:\n \"\"\"\n loads path setup functions \n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom 
openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" + }, + "result": true + } + } + }, + "size": [ + { + "dimension": "x", + "value": 32, + "unit": "px" + }, + { + "dimension": "y", + "value": 32, + "unit": "px" + } + ] + }, + "result": true + } + }, + "id": "fusets_mogpr", + "summary": "Integrates timeseries in data cube using multi-output gaussian process regression", + "description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated 
timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Limitations\n\nThe spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes |\n| 10 km x 10 km | 7 months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n", + "parameters": [ + { + "name": "input_raster_cube", + "description": "Raster cube for which to calculate the peaks and valleys", + "schema": { + "type": "object", + "subtype": "datacube" } - } - }, - "size": [ - { - "dimension": "x", - "value": 32, - "unit": 
"px" - }, - { - "dimension": "y", - "value": 32, - "unit": "px" - } - ] - }, - "result": true - } - }, - "id": "fusets_mogpr", - "summary": "Integrates timeseries in data cube using multi-output gaussian process regression", - "description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Limitations\n\nThe spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes |\n| 10 km x 10 km | 7 months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 
45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n", - "parameters": [ - { - "name": "input_raster_cube", - "description": "Raster cube for which to calculate the peaks and valleys", - "schema": { - "type": "object", - "subtype": "datacube" - } - } - ] + } + ] } \ No newline at end of file From c3b998f3e7f4384149128d631961ad0b31d50ce2 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Thu, 9 Jan 2025 14:10:58 +0100 Subject: [PATCH 08/15] ruff checked --- openeo_udp/fusets_mogpr/generate.py | 3 --- openeo_udp/fusets_mogpr/set_path.py | 1 - 2 files changed, 4 deletions(-) diff --git a/openeo_udp/fusets_mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py index 922a26a..8e31fa4 100644 --- a/openeo_udp/fusets_mogpr/generate.py +++ b/openeo_udp/fusets_mogpr/generate.py @@ -3,7 +3,6 @@ from set_path import load_set_path from typing import Union -import openeo from openeo import DataCube from openeo.api.process import Parameter from openeo.processes import ProcessBuilder, apply_neighborhood @@ -24,8 +23,6 @@ def get_mogpr( def generate() -> dict: - connection = openeo.connect("openeofed.dataspace.copernicus.eu") - # define parameters input_cube = Parameter.datacube( name="input_raster_cube", diff --git a/openeo_udp/fusets_mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py index 686d48b..6fa803c 100644 --- a/openeo_udp/fusets_mogpr/set_path.py +++ b/openeo_udp/fusets_mogpr/set_path.py @@ -3,7 +3,6 @@ import zipfile import requests import functools -from typing import Union from pathlib import Path from openeo.udf import inspect From 80fd98fae76a7315ea5c59e292d2073cdb474423 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 13:37:17 +0100 Subject: [PATCH 09/15] updated the README.md file --- openeo_udp/fusets_mogpr/README.md | 108 +++++++++++++++++++----------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git 
a/openeo_udp/fusets_mogpr/README.md b/openeo_udp/fusets_mogpr/README.md index 2f2b14c..af4fdff 100644 --- a/openeo_udp/fusets_mogpr/README.md +++ b/openeo_udp/fusets_mogpr/README.md @@ -1,51 +1,83 @@ -# Multi output gaussian process regression +# Multi-output Gaussian process regression (MOGPR) -## Description +The MOGPR service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other correlated indicators. -Compute an integrated timeseries based on multiple inputs. -For instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI. +## Parameters -## Limitations +The MOGPR service requires the following parameters: -The spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km). +- `datacube`: The input datacube that contains the data to be gap-filled. -## Configuration & Resource Usage +## Usage -Run configurations for different ROI/TOI with memory requirements and estimated run durations. +The MOGPR service can be used as follows: -### Synchronous calls +```python -TODO: Replace with actual measurements!!! 
+import openeo -| Spatial extent | Run duration | -|----------------|--------------| -| 100 m x 100 m | 1 minute | -| 500m x 500 m | 1 minute | -| 1 km x 1 km | 1 minute | -| 5 km x 5 km | 2 minutes | -| 10 km x 10 km | 3 minutes | -| 50 km x 50 km | 9 minutes | +## Setup of parameters +spat_ext = { +    "type": "Polygon", +    "coordinates": [ + [ + [ +                5.170012098271149, +                51.25062964728295 + ], + [ +                5.17085904378298, +                51.24882567194015 + ], + [ +                5.17857421368097, +                51.2468515482926 + ], + [ +                5.178972704726344, +                51.24982704376254 + ], + [ +                5.170012098271149, +                51.25062964728295 + ] + ] + ] +} +temp_ext = ["2022-05-01", "2023-07-31"] -The maximum duration of a synchronous run is 15 minutes. -For long running computations, you can use batch jobs. +## Setup connection to openEO +eoconn = openeo.connect( +        "openeo.dataspace.copernicus.eu" + ).authenticate_oidc("CDSE") -### Batch jobs +## Create a base NDVI datacube that can be used as input for the service +base = eoconn.load_collection('SENTINEL2_L2A', +                                  spatial_extent=spat_ext, +                                  temporal_extent=temp_ext, +                                  bands=["B04", "B08", "SCL"]) +mask = scl.process("to_scl_dilation_mask", data=scl) +base_cloudmasked = base.mask(mask) +base_ndvi = base_cloudmasked.ndvi(red="B04", nir="B08") -TODO: Replace with actual measurements!!! 
+process_id = "fusets_mogpr" +namespace_url = "public_url"    # published URL of the process +## Create a processing graph from the MOGPR process using an active openEO connection +mogpr = eoconn.datacube_from_process( +       process_id=process_id, +       namespace= namespace_url, +       input_raster_cube=base_ndvi, + ) -| Spatial extent | Temporal extent | Executor memory | Run duration | -|-----------------|-----------------|-----------------|--------------| -| 100 m x 100 m | 1 month | default | 7 minutes | -| 500 m x 100 m | 1 month | default | 7 minutes | -| 1 km x 1 km | 1 month | default | 7 minutes | -| 5 km x 5 km | 1 month | default | 10 minutes | -| 10 km x 10 km | 1 month | default | 11 minutes | -| 50 km x 50 km | 1 month | 6 GB | 20 minutes | -| 100 km x 100 km | 1 month | 7 GB | 34 minutes | -| 100m x 100 m | 7 months | default | 10 minutes | -| 500 m x 500 m | 7 months | default | 10 minutes | -| 1 km x 1 km | 7 months | default | 14 minutes | -| 5 km x 5 km | 7 months | default | 14 minutes | -| 10 km x 10 km | 7 months | default | 19 minutes | -| 50 km x 50 km | 7 months | 6 GB | 45 minutes | -| 100 km x 100 km | 7 months | 8 GB | 65 minutes | + +## Calculate the average time series value for the given area of interest +mogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean') + +# Execute the service as a batch process +mogpr_job = mogpr.execute_batch('./mogpr.json', out_format="json", title=f'FuseTS - MOGPR') + +``` + +## Output + +The User-Defined-Process (UDP) produces a datacube that contains a gap-filled time series for all pixels within the specified temporal and spatial range. This datacube can be seamlessly integrated with other openEO processes. 
\ No newline at end of file From 7ced4d2c6d72c49a7f56507025c4b1588fd5ebab Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:20:44 +0100 Subject: [PATCH 10/15] updated requirement txt --- openeo_udp/fusets_mogpr/README.md | 2 +- openeo_udp/fusets_mogpr/generate.py | 4 ++-- qa/unittests/requirements.txt | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/openeo_udp/fusets_mogpr/README.md b/openeo_udp/fusets_mogpr/README.md index af4fdff..be62efa 100644 --- a/openeo_udp/fusets_mogpr/README.md +++ b/openeo_udp/fusets_mogpr/README.md @@ -66,7 +66,7 @@ namespace_url = "public_url"    # publised URL of the process mogpr = eoconn.datacube_from_process(        process_id=process_id,        namespace= namespace_url, -       input_raster_cube=base_ndvi, +      data=base_ndvi, ) diff --git a/openeo_udp/fusets_mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py index 8e31fa4..72a4979 100644 --- a/openeo_udp/fusets_mogpr/generate.py +++ b/openeo_udp/fusets_mogpr/generate.py @@ -25,7 +25,7 @@ def get_mogpr( def generate() -> dict: # define parameters input_cube = Parameter.datacube( - name="input_raster_cube", + name="data", description="Raster cube for which to calculate the peaks and valleys" ) @@ -46,5 +46,5 @@ def generate() -> dict: if __name__ == "__main__": # save the generated process to a file - with open(Path(__file__).parent / "fusets_mogpr.json", "w") as f: + with open(Path(__file__).parent / "fusets_mogpr2.json", "w") as f: json.dump(generate(), f, indent=2) diff --git a/qa/unittests/requirements.txt b/qa/unittests/requirements.txt index 01be142..09eb03c 100644 --- a/qa/unittests/requirements.txt +++ b/qa/unittests/requirements.txt @@ -3,3 +3,4 @@ git+https://github.com/ESA-APEx/esa-apex-toolbox-python.git@main pytest>=8.2.0 moto[s3, server]>=5.0.13 dirty-equals>=0.8.0 +fusets From c0129ca9e7ebee28ee7022a81dc47139746d9a85 Mon Sep 17 00:00:00 2001 From: Pratichhya 
<39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:07:01 +0100 Subject: [PATCH 11/15] addressed the suggested changes on the set_path function --- openeo_udp/fusets_mogpr/generate.py | 11 +++++------ openeo_udp/fusets_mogpr/set_path.py | 26 ++++---------------------- 2 files changed, 9 insertions(+), 28 deletions(-) diff --git a/openeo_udp/fusets_mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py index 72a4979..a3e8b29 100644 --- a/openeo_udp/fusets_mogpr/generate.py +++ b/openeo_udp/fusets_mogpr/generate.py @@ -1,6 +1,5 @@ import json from pathlib import Path -from set_path import load_set_path from typing import Union from openeo import DataCube @@ -9,16 +8,16 @@ from openeo.rest.udp import build_process_dict from fusets.openeo import load_mogpr_udf -from fusets.openeo.services.publish_mogpr import NEIGHBORHOOD_SIZE + def get_mogpr( input_cube: Union[DataCube, Parameter], ) -> ProcessBuilder: return apply_neighborhood(input_cube, - lambda data: data.run_udf(udf=load_set_path()+"\n"+load_mogpr_udf(), runtime='Python', context=dict()), + lambda data: data.run_udf(udf=Path("set_path.py").read_text()+"\n"+load_mogpr_udf(), runtime='Python', context=dict()), size=[ - {'dimension': 'x', 'value': NEIGHBORHOOD_SIZE, 'unit': 'px'}, - {'dimension': 'y', 'value': NEIGHBORHOOD_SIZE, 'unit': 'px'} + {'dimension': 'x', 'value': 32, 'unit': 'px'}, + {'dimension': 'y', 'value': 32, 'unit': 'px'} ], overlap=[]) @@ -46,5 +45,5 @@ def generate() -> dict: if __name__ == "__main__": # save the generated process to a file - with open(Path(__file__).parent / "fusets_mogpr2.json", "w") as f: + with open(Path(__file__).parent / "fusets_mogpr3.json", "w") as f: json.dump(generate(), f, indent=2) diff --git a/openeo_udp/fusets_mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py index 6fa803c..c393ea5 100644 --- a/openeo_udp/fusets_mogpr/set_path.py +++ b/openeo_udp/fusets_mogpr/set_path.py @@ -8,14 +8,6 @@ from openeo.udf import inspect -# Example constants 
for demonstration -DEPENDENCIES_DIR1 = 'venv' -DEPENDENCIES_DIR2 = 'venv_static' - -DEPENDENCIES_URL1 = "https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip" -DEPENDENCIES_URL2 = "https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip" - - def download_file(url, path): """ Downloads a file from the given URL to the specified path. @@ -31,7 +23,6 @@ def extract_zip(zip_path, extract_to): """ with zipfile.ZipFile(zip_path, "r") as zip_ref: zip_ref.extractall(extract_to) - os.remove(zip_path) # Clean up the zip file after extraction def add_directory_to_sys_path(directory): @@ -39,7 +30,7 @@ def add_directory_to_sys_path(directory): Adds a directory to the Python sys.path if it's not already present. """ if directory not in sys.path: - sys.path.insert(0, directory) + sys.path.append(directory) @functools.lru_cache(maxsize=5) def setup_dependencies(dependencies_url,DEPENDENCIES_DIR): @@ -59,20 +50,11 @@ def setup_dependencies(dependencies_url,DEPENDENCIES_DIR): zip_path = os.path.join(DEPENDENCIES_DIR, "temp.zip") download_file(dependencies_url, zip_path) extract_zip(zip_path, DEPENDENCIES_DIR) + os.remove(zip_path) # Add the extracted dependencies directory to sys.path add_directory_to_sys_path(DEPENDENCIES_DIR) inspect(message="Added to the sys path") -setup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1) -setup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2) - - -def load_set_path() -> str: - """ - loads path setup functions - @return: - """ - import os - - return Path(os.path.realpath(__file__)).read_text() \ No newline at end of file +setup_dependencies("https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip", 'venv') +setup_dependencies("https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip", 'venv_static') \ No newline at end of file From ce18bc8b88557adfa4180cafe6a39e8b116b63f5 Mon Sep 17 00:00:00 2001 From: Pratichhya 
<39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:15:41 +0100 Subject: [PATCH 12/15] finalised changes with udp --- openeo_udp/fusets_mogpr/fusets_mogpr.json | 8 ++++---- openeo_udp/fusets_mogpr/generate.py | 2 +- openeo_udp/fusets_mogpr/set_path.py | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/openeo_udp/fusets_mogpr/fusets_mogpr.json b/openeo_udp/fusets_mogpr/fusets_mogpr.json index 41a7dd9..2c491e9 100644 --- a/openeo_udp/fusets_mogpr/fusets_mogpr.json +++ b/openeo_udp/fusets_mogpr/fusets_mogpr.json @@ -4,7 +4,7 @@ "process_id": "apply_neighborhood", "arguments": { "data": { - "from_parameter": "input_raster_cube" + "from_parameter": "data" }, "overlap": [], "process": { @@ -17,7 +17,7 @@ "from_parameter": "data" }, "runtime": "Python", - "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom typing import Union\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\n# Example constants for demonstration\nDEPENDENCIES_DIR1 = 'venv'\nDEPENDENCIES_DIR2 = 'venv_static'\n\nDEPENDENCIES_URL1 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\"\nDEPENDENCIES_URL2 = \"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\"\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n os.remove(zip_path) # Clean up the zip file after extraction\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, 
directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(DEPENDENCIES_URL1, DEPENDENCIES_DIR1)\nsetup_dependencies(DEPENDENCIES_URL2, DEPENDENCIES_DIR2)\n\n\ndef load_set_path() -> str:\n \"\"\"\n loads path setup functions \n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n 
return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" + "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.append(directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not 
os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n os.remove(zip_path)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\", 'venv')\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\", 'venv_static')\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = 
mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" }, "result": true } @@ -41,10 +41,10 @@ }, "id": "fusets_mogpr", "summary": "Integrates timeseries in data cube using multi-output gaussian process regression", - "description": "# Multi output gaussian process regression\n\n## Description\n\nCompute an integrated timeseries based on multiple inputs.\nFor instance, combine Sentinel-2 NDVI with Sentinel-1 RVI into one integrated NDVI.\n\n## Limitations\n\nThe spatial extent is limited to a maximum size equal to a Sentinel-2 MGRS tile (100 km x 100 km).\n\n## Configuration & Resource Usage\n\nRun configurations for different ROI/TOI with memory requirements and estimated run durations.\n\n### Synchronous calls\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Run duration |\n|----------------|--------------|\n| 100 m x 100 m | 1 minute |\n| 500m x 500 m | 1 minute |\n| 1 km x 1 km | 1 minute |\n| 5 km x 5 km | 2 minutes |\n| 10 km x 10 km | 3 minutes |\n| 50 km x 50 km | 9 minutes |\n\nThe maximum duration of a synchronous run is 15 minutes.\nFor long running computations, you can use batch jobs.\n\n### Batch jobs\n\nTODO: Replace with actual measurements!!!\n\n| Spatial extent | Temporal extent | Executor memory | Run duration |\n|-----------------|-----------------|-----------------|--------------|\n| 100 m x 100 m | 1 month | default | 7 minutes |\n| 500 m x 100 m | 1 month | default | 7 minutes |\n| 1 km x 1 km | 1 month | default | 7 minutes |\n| 5 km x 5 km | 1 month | default | 10 minutes |\n| 10 km x 10 km | 1 month | default | 11 minutes |\n| 50 km x 50 km | 1 month | 6 GB | 20 minutes |\n| 100 km x 100 km | 1 month | 7 GB | 34 minutes |\n| 100m x 100 m | 7 
months | default | 10 minutes |\n| 500 m x 500 m | 7 months | default | 10 minutes |\n| 1 km x 1 km | 7 months | default | 14 minutes |\n| 5 km x 5 km | 7 months | default | 14 minutes |\n| 10 km x 10 km | 7 months | default | 19 minutes |\n| 50 km x 50 km | 7 months | 6 GB | 45 minutes |\n| 100 km x 100 km | 7 months | 8 GB | 65 minutes |\n", + "description": "# Multi-output Gaussian process regression (MOGPR)\n\nThe MOGPR service is designed to enable multi-output regression analysis using Gaussian Process Regression (GPR) on geospatial data. It provides a powerful tool for understanding and predicting spatiotemporal phenomena by filling gaps based on other correlated indicators.\n\n## Parameters\n\nThe MOGPR service requires the following parameters:\n\n- `datacube`: The input datacube that contains the data to be gap-filled.\n\n## Usage\n\nThe MOGPR service can be used as follows:\n\n```python\n\nimport openeo\n\n## Setup of parameters\nspat_ext = {\n    \"type\": \"Polygon\",\n    \"coordinates\": [\n [\n [\n                5.170012098271149,\n                51.25062964728295\n ],\n [\n                5.17085904378298,\n                51.24882567194015\n ],\n [\n                5.17857421368097,\n                51.2468515482926\n ],\n [\n                5.178972704726344,\n                51.24982704376254\n ],\n [\n                5.170012098271149,\n                51.25062964728295\n ]\n ]\n ]\n}\ntemp_ext = [\"2022-05-01\", \"2023-07-31\"]\n\n## Setup connection to openEO\neoconn = openeo.connect(\n        \"openeo.dataspace.copernicus.eu\"\n ).authenticate_oidc(\"CDSE\")\n\n## Create a base NDVI datacube that can be used as input for the service\nbase = eoconn.load_collection('SENTINEL2_L2A',\n                                  spatial_extent=spat_ext,\n                                  temporal_extent=temp_ext,\n                                  bands=[\"B04\", \"B08\", \"SCL\"])\nmask = scl.process(\"to_scl_dilation_mask\", data=scl)\nbase_cloudmasked 
= base.mask(mask)\nbase_ndvi = base_cloudmasked.ndvi(red=\"B04\", nir=\"B08\")\n\nprocess_id = \"fusets_mogpr\"\nnamespace_url = \"public_url\"    # published URL of the process\n## Create a processing graph from the MOGPR process using an active openEO connection\nmogpr = eoconn.datacube_from_process(\n       process_id=process_id,\n       namespace= namespace_url,\n      data=base_ndvi, \n )\n\n\n## Calculate the average time series value for the given area of interest\nmogpr = mogpr.aggregate_spatial(spat_ext, reducer='mean')\n\n# Execute the service as a batch process\nmogpr_job = mogpr.execute_batch('./mogpr.json', out_format=\"json\", title=f'FuseTS - MOGPR') \n\n```\n\n## Output\n\nThe User-Defined-Process (UDP) produces a datacube that contains a gap-filled time series for all pixels within the specified temporal and spatial range. This datacube can be seamlessly integrated with other openEO processes.", "parameters": [ { - "name": "input_raster_cube", + "name": "data", "description": "Raster cube for which to calculate the peaks and valleys", "schema": { "type": "object", diff --git a/openeo_udp/fusets_mogpr/generate.py b/openeo_udp/fusets_mogpr/generate.py index a3e8b29..2d4e128 100644 --- a/openeo_udp/fusets_mogpr/generate.py +++ b/openeo_udp/fusets_mogpr/generate.py @@ -45,5 +45,5 @@ def generate() -> dict: if __name__ == "__main__": # save the generated process to a file - with open(Path(__file__).parent / "fusets_mogpr3.json", "w") as f: + with open(Path(__file__).parent / "fusets_mogpr.json", "w") as f: json.dump(generate(), f, indent=2) diff --git a/openeo_udp/fusets_mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py index c393ea5..59366cf 100644 --- a/openeo_udp/fusets_mogpr/set_path.py +++ b/openeo_udp/fusets_mogpr/set_path.py @@ -3,7 +3,6 @@ import zipfile import requests import functools -from pathlib import Path from openeo.udf import inspect From 2af64c84b4a13b406f97a927af90e835c86fe508 Mon Sep 17 00:00:00 2001 From: Pratichhya
<39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:16:08 +0100 Subject: [PATCH 13/15] fusets_version --- qa/unittests/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/unittests/requirements.txt b/qa/unittests/requirements.txt index 09eb03c..b1c0ca1 100644 --- a/qa/unittests/requirements.txt +++ b/qa/unittests/requirements.txt @@ -3,4 +3,4 @@ git+https://github.com/ESA-APEx/esa-apex-toolbox-python.git@main pytest>=8.2.0 moto[s3, server]>=5.0.13 dirty-equals>=0.8.0 -fusets +fusets>=2.0.1 From 6bfda52913d4f95c31880a6741598e17786a8e83 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:37:12 +0100 Subject: [PATCH 14/15] back to insert instead of append --- openeo_udp/fusets_mogpr/fusets_mogpr.json | 2 +- openeo_udp/fusets_mogpr/set_path.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_udp/fusets_mogpr/fusets_mogpr.json b/openeo_udp/fusets_mogpr/fusets_mogpr.json index 2c491e9..dbdd837 100644 --- a/openeo_udp/fusets_mogpr/fusets_mogpr.json +++ b/openeo_udp/fusets_mogpr/fusets_mogpr.json @@ -17,7 +17,7 @@ "from_parameter": "data" }, "runtime": "Python", - "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\nfrom pathlib import Path\n\nfrom openeo.udf import inspect\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n 
sys.path.append(directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n os.remove(zip_path)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\", 'venv')\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\", 'venv_static')\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n 
cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" + "udf": "import os\nimport sys\nimport zipfile\nimport requests\nimport functools\n\nfrom openeo.udf import inspect\n\n\ndef download_file(url, path):\n \"\"\"\n Downloads a file from the given URL to the specified path.\n \"\"\"\n response = requests.get(url, stream=True)\n with open(path, \"wb\") as file:\n file.write(response.content)\n\n\ndef extract_zip(zip_path, extract_to):\n \"\"\"\n Extracts a zip file from zip_path to the specified extract_to directory.\n \"\"\"\n with zipfile.ZipFile(zip_path, \"r\") as zip_ref:\n zip_ref.extractall(extract_to)\n\n\ndef add_directory_to_sys_path(directory):\n \"\"\"\n Adds a directory to the Python sys.path if it's not already present.\n \"\"\"\n if directory not in sys.path:\n sys.path.insert(0, directory)\n\n@functools.lru_cache(maxsize=5)\ndef setup_dependencies(dependencies_url,DEPENDENCIES_DIR):\n \"\"\"\n Main function to set up the dependencies by downloading, extracting,\n and adding necessary directories to sys.path.\n \"\"\"\n\n inspect(message=\"Create directories\")\n # Ensure base directories exist\n os.makedirs(DEPENDENCIES_DIR, exist_ok=True)\n\n # Download and extract dependencies if not already present\n if not 
os.listdir(DEPENDENCIES_DIR):\n\n inspect(message=\"Extract dependencies\")\n zip_path = os.path.join(DEPENDENCIES_DIR, \"temp.zip\")\n download_file(dependencies_url, zip_path)\n extract_zip(zip_path, DEPENDENCIES_DIR)\n os.remove(zip_path)\n\n # Add the extracted dependencies directory to sys.path\n add_directory_to_sys_path(DEPENDENCIES_DIR)\n inspect(message=\"Added to the sys path\")\n\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets_venv.zip\", 'venv')\nsetup_dependencies(\"https://artifactory.vgt.vito.be:443/artifactory/auxdata-public/ai4food/fusets.zip\", 'venv_static')\nimport os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in ['tmp/venv_static', 'tmp/venv']:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ['HOME'] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv('HOME')\n set_home('/tmp')\n user_file = Path.home() / '.config' / 'GPy' / 'user.cfg'\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config['plotting'] = {\n 'library': 'none'\n }\n with open(user_file, 'w') as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n dims = cube.get_array().dims\n result = 
mogpr(cube.get_array().to_dataset(dim=\"bands\"))\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n return Path(os.path.realpath(__file__)).read_text()\n" }, "result": true } diff --git a/openeo_udp/fusets_mogpr/set_path.py b/openeo_udp/fusets_mogpr/set_path.py index 59366cf..a7ecba4 100644 --- a/openeo_udp/fusets_mogpr/set_path.py +++ b/openeo_udp/fusets_mogpr/set_path.py @@ -29,7 +29,7 @@ def add_directory_to_sys_path(directory): Adds a directory to the Python sys.path if it's not already present. """ if directory not in sys.path: - sys.path.append(directory) + sys.path.insert(0, directory) @functools.lru_cache(maxsize=5) def setup_dependencies(dependencies_url,DEPENDENCIES_DIR): From 0a18fa04118f941c1fbee2c2aa9c0a2bdb277bc2 Mon Sep 17 00:00:00 2001 From: Pratichhya <39898768+Pratichhya@users.noreply.github.com> Date: Tue, 14 Jan 2025 15:42:44 +0100 Subject: [PATCH 15/15] updated benchmark scenario and results --- benchmark_scenarios/fusets_mogpr.json | 35 ++++++++++++++++++--------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/benchmark_scenarios/fusets_mogpr.json b/benchmark_scenarios/fusets_mogpr.json index 00792bd..91f0829 100644 --- a/benchmark_scenarios/fusets_mogpr.json +++ b/benchmark_scenarios/fusets_mogpr.json @@ -8,7 +8,7 @@ "aggregatespatial1": { "arguments": { "data": { - "from_node": "mogpr1" + "from_node": "fusetsmogpr1" }, "geometries": { "coordinates": [ @@ -53,6 +53,15 @@ }, "process_id": "aggregate_spatial" }, + "fusetsmogpr1": { + "arguments": { + "data": { + "from_node": "ndvi1" + } + }, + "namespace": "https://openeo.dataspace.copernicus.eu/openeo/1.2/processes/u:3e24e251-2e9a-438f-90a9-d4500e576574/fusets_mogpr", + "process_id": "fusets_mogpr" + }, "loadcollection1": { "arguments": { "bands": [ @@ -89,7 +98,7 @@ }, "temporal_extent": [ 
"2022-05-01", - "2023-07-31" + "2022-06-01" ] }, "process_id": "load_collection" @@ -129,7 +138,7 @@ }, "temporal_extent": [ "2022-05-01", - "2023-07-31" + "2022-06-01" ] }, "process_id": "load_collection" @@ -145,15 +154,6 @@ }, "process_id": "mask" }, - "mogpr1": { - "arguments": { - "input_raster_cube": { - "from_node": "ndvi1" - } - }, - "namespace": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/mogpr_v1/openeo_udp/fusets_mogpr/fusets_mogpr.json", - "process_id": "mogpr" - }, "ndvi1": { "arguments": { "data": { @@ -164,6 +164,17 @@ }, "process_id": "ndvi" }, + "saveresult1": { + "arguments": { + "data": { + "from_node": "aggregatespatial1" + }, + "format": "JSON", + "options": {} + }, + "process_id": "save_result", + "result": true + }, "toscldilationmask1": { "arguments": { "data": {