propose run udf on vector cube #200

Closed · wants to merge 1 commit
35 changes: 34 additions & 1 deletion openeo_driver/datacube.py
@@ -17,13 +17,14 @@
import requests

from openeo.metadata import CollectionMetadata
from openeo.util import ensure_dir
from openeo.util import ensure_dir, str_truncate
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
from openeo_driver.errors import FeatureUnsupportedException, InternalException
from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
from openeo_driver.util.ioformats import IOFORMATS
from openeo_driver.util.utm import area_in_square_meters
from openeo_driver.utils import EvalEnv
from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor
Member commented:
I was trying to build a unit test for this feature, but failed on this: importing geopyspark-driver in Python driver code is not going to work.
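One way around that import problem in a unit test is to stub the geopyspark-driver module in `sys.modules` before the import runs. A minimal, self-contained sketch (the visitor below is a hypothetical stand-in, not the real `SingleNodeUDFProcessGraphVisitor`):

```python
import sys
import types
from unittest import mock

# Build a fake "openeogeotrellis.backend" module so that
# `from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor`
# succeeds in a pure-Python test environment without geopyspark installed.
fake_backend = types.ModuleType("openeogeotrellis.backend")

class StubVisitor:
    """Stub: collects the arguments of the single node in a process graph."""
    def accept_process_graph(self, process_graph: dict) -> "StubVisitor":
        self.udf_args = next(iter(process_graph.values()))["arguments"]
        return self

fake_backend.SingleNodeUDFProcessGraphVisitor = StubVisitor
fake_pkg = types.ModuleType("openeogeotrellis")
fake_pkg.backend = fake_backend

with mock.patch.dict(sys.modules, {"openeogeotrellis": fake_pkg,
                                   "openeogeotrellis.backend": fake_backend}):
    # This import now resolves against the stubbed modules.
    from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor as Visitor
    visitor = Visitor().accept_process_graph(
        {"runudf1": {"process_id": "run_udf", "arguments": {"udf": "def fct(data): ..."}}}
    )
    print(visitor.udf_args["udf"])  # def fct(data): ...
```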


log = logging.getLogger(__name__)

@@ -224,6 +225,38 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX
            geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
        )

    def apply_dimension(
        self,
        process: dict,
        *,
        dimension: str,
        target_dimension: Optional[str] = None,
        context: Optional[dict] = None,
        env: EvalEnv,
    ) -> "DriverVectorCube":
        if (
            dimension == "bands"
            and target_dimension is None
            and len(process) == 1
            and next(iter(process.values())).get('process_id') == 'run_udf'
        ):
Member commented:

Shouldn't this check for the "geometries" dimension instead of the "bands" dimension?

Contributor (author) commented:

I thought so too at first, but when you want to run a UDF for each feature and get all its properties, the dimension you are working on is "bands", I guess?
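For reference, the single-node `run_udf` detection used in the condition above can be isolated into a plain-dict helper (hypothetical name, no openEO imports):

```python
def is_single_run_udf(process: dict) -> bool:
    """True when the child process graph is exactly one node whose
    process_id is "run_udf" -- the shortcut case handled in this PR."""
    return len(process) == 1 and next(iter(process.values())).get("process_id") == "run_udf"

# A qualifying graph: one run_udf node.
pg = {"runudf1": {"process_id": "run_udf",
                  "arguments": {"udf": "def fct(data): ...", "runtime": "Python"}}}
print(is_single_run_udf(pg))  # True

# A two-node graph does not qualify, even if it contains a run_udf node.
print(is_single_run_udf({"a": {"process_id": "mean"},
                         "b": {"process_id": "run_udf"}}))  # False
```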

            visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process)
            udf = visitor.udf_args.get('udf', None)

            from openeo.udf import FeatureCollection, UdfData

            collection = FeatureCollection(id='VectorCollection', data=self._as_geopandas_df())
            data = UdfData(
                proj={"EPSG": self._geometries.crs.to_epsg()},
                feature_collection_list=[collection],
                user_context=context,
Member commented:

Only the vector data (the labels along the geometry dimension) is passed here as a feature collection, but the vector cube might also contain actual data cube values.

Contributor (author) commented:

Looking at _as_geopandas_df, I have the impression that it does merge the geometries with the data cube values?

I'm only a bit worried about from_geojson, which does not seem to extract any property values into cube values, but that's maybe not relevant to what I'm trying to solve here.
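A conceptual, self-contained sketch (plain Python, hypothetical data and column prefix — not the actual `_as_geopandas_df` implementation) of the geometry/cube merge being discussed: one row per geometry, with the cube values along the remaining dimensions flattened into prefixed per-feature columns.

```python
geometries = ["POINT (1 2)", "POINT (3 4)"]
cube = {  # cube values indexed by (geometry index, band name) -- made-up data
    (0, "B02"): 0.25, (0, "B03"): 0.50,
    (1, "B02"): 0.75, (1, "B03"): 1.00,
}
prefix = "cube"  # stand-in for the flatten_prefix used by the real class

rows = []
for i, geom in enumerate(geometries):
    row = {"geometry": geom}
    # Flatten this geometry's cube values into prefixed columns.
    for (gi, band), value in cube.items():
        if gi == i:
            row[f"{prefix}~{band}"] = value
    rows.append(row)

print(rows[0])  # {'geometry': 'POINT (1 2)', 'cube~B02': 0.25, 'cube~B03': 0.5}
```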

            )

            log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
            result_data = env.backend_implementation.processing.run_udf(udf, data)
            log.info(f"[run_udf] UDF resulted in {result_data!r}")

            if isinstance(result_data, UdfData):
                fc_list = result_data.get_feature_collection_list()
                if fc_list is not None and len(fc_list) == 1:
                    return DriverVectorCube(geometries=fc_list[0].data)
            raise ValueError(f"Could not handle UDF result: {result_data}")

        else:
            raise FeatureUnsupportedException()

@classmethod
def from_fiona(
cls,
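For context, a sketch of the kind of client-side request this branch is meant to catch (hypothetical node ids and UDF source; the shapes follow the openEO process-graph JSON conventions): an `apply_dimension` over `"bands"` whose child process graph is a single `run_udf` node.

```python
udf_source = "\n".join([
    "from openeo.udf import UdfData",
    "def apply_udf_data(udf_data: UdfData):",
    "    return udf_data  # inspect/modify the feature collection here",
])

apply_dimension_node = {
    "process_id": "apply_dimension",
    "arguments": {
        "data": {"from_node": "loadvector1"},  # hypothetical upstream node
        "dimension": "bands",
        "process": {
            "process_graph": {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {
                        "data": {"from_parameter": "data"},
                        "udf": udf_source,
                        "runtime": "Python",
                    },
                    "result": True,
                }
            }
        },
    },
}

# The child graph satisfies the single-node run_udf condition from the PR.
inner = apply_dimension_node["arguments"]["process"]["process_graph"]
print(len(inner) == 1 and next(iter(inner.values()))["process_id"] == "run_udf")  # True
```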