Skip to content

Commit

Permalink
Issue #114/#141 convert inline GeoJSON in aggregate_spatial to Vector…
Browse files Browse the repository at this point in the history
…Cube

Also applied black code formatting style on changed/touched bits,
using the `darker` tool
  • Loading branch information
soxofaan committed Oct 10, 2022
1 parent be37238 commit aee2c00
Show file tree
Hide file tree
Showing 11 changed files with 535 additions and 205 deletions.
52 changes: 28 additions & 24 deletions openeo_driver/ProcessGraphDeserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -740,32 +740,26 @@ def fit_class_random_forest(args: dict, env: EvalEnv) -> DriverMlModel:
return DriverMlModel()

predictors = extract_arg(args, 'predictors')
if not isinstance(predictors, AggregatePolygonSpatialResult):
# TODO #114 EP-3981 add support for real vector cubes.
if not isinstance(predictors, (AggregatePolygonSpatialResult, DriverVectorCube)):
# TODO #114 EP-3981 drop AggregatePolygonSpatialResult support.
raise ProcessParameterInvalidException(
parameter="predictors", process="fit_class_random_forest",
reason=f"should be non-temporal vector-cube (got `{type(predictors)}`)."
)
target = extract_arg(args, 'target')
if not (
isinstance(target, dict)
and target.get("type") == "FeatureCollection"
and isinstance(target.get("features"), list)
):
# TODO #114 EP-3981 vector cube support
raise ProcessParameterInvalidException(
parameter="target", process="fit_class_random_forest",
reason='only GeoJSON FeatureCollection is currently supported.',
parameter="predictors",
process="fit_class_random_forest",
reason=f"should be non-temporal vector-cube, but got {type(predictors)}.",
)
if any(
# TODO: allow string based target labels too?
not isinstance(deep_get(f, "properties", "target", default=None), int)
for f in target.get("features", [])
):

target = extract_arg(args, "target")
if isinstance(target, DriverVectorCube):
pass
elif isinstance(target, dict) and target.get("type") == "FeatureCollection":
# TODO: convert to vector cube, e.g.:
# target = env.backend_implementation.vector_cube_cls.from_geojson(target)
pass
else:
raise ProcessParameterInvalidException(
parameter="target",
process="fit_class_random_forest",
reason="Each feature (from target feature collection) should have an integer 'target' property.",
reason=f"expected feature collection or vector-cube value, but got {type(target)}.",
)

# TODO: get defaults from process spec?
Expand Down Expand Up @@ -1056,7 +1050,8 @@ def aggregate_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
if isinstance(geoms, DriverVectorCube):
geoms = geoms
elif isinstance(geoms, dict):
geoms = geojson_to_geometry(geoms)
# Automatically convert inline GeoJSON to a vector cube #114/#141
geoms = env.backend_implementation.vector_cube_cls.from_geojson(geoms)
elif isinstance(geoms, DelayedVector):
geoms = geoms.path
else:
Expand Down Expand Up @@ -1321,6 +1316,12 @@ def run_udf(args: dict, env: EvalEnv):
# TODO #114 add support for DriverVectorCube
if isinstance(data, AggregatePolygonResult):
pass
if isinstance(data, DriverVectorCube):
# TODO: this is temporary stopgap measure, converting to old-style save results to stay backward compatible.
# Better have proper DriverVectorCube support in run_udf?
# How does that fit in UdfData and UDF function signatures?
data = data.to_legacy_save_result()

if isinstance(data, (DelayedVector, dict)):
if isinstance(data, dict):
data = DelayedVector.from_json_dict(data)
Expand All @@ -1338,7 +1339,10 @@ def run_udf(args: dict, env: EvalEnv):
)
else:
raise ProcessParameterInvalidException(
parameter='data', process='run_udf', reason=f"Invalid data type {type(data)!r} expected raster-cube.")
parameter="data",
process="run_udf",
reason=f"Unsupported data type {type(data)}.",
)

_log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
result_data = openeo.udf.run_udf_code(udf, data)
Expand Down Expand Up @@ -1550,7 +1554,7 @@ def to_vector_cube(args: Dict, env: EvalEnv):
# TODO: standardization of something like this? https://github.com/Open-EO/openeo-processes/issues/346
data = extract_arg(args, "data", process_id="to_vector_cube")
if isinstance(data, dict) and data.get("type") in {"Polygon", "MultiPolygon", "Feature", "FeatureCollection"}:
return DriverVectorCube.from_geojson(data)
return env.backend_implementation.vector_cube_cls.from_geojson(data)
# TODO: support more inputs: string with geojson, string with WKT, list of WKT, string with URL to GeoJSON, ...
raise FeatureUnsupportedException(f"Converting {type(data)} to vector cube is not supported")

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.20.4a1'
__version__ = "0.21.0a1"
5 changes: 4 additions & 1 deletion openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from openeo.capabilities import ComparableVersion
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import rfc3339, dict_no_none
from openeo_driver.datacube import DriverDataCube, DriverMlModel
from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
from openeo_driver.datastructs import SarBackscatterArgs
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import CollectionNotFoundException, ServiceUnsupportedException, FeatureUnsupportedException
Expand Down Expand Up @@ -570,6 +570,9 @@ class OpenEoBackendImplementation:
enable_basic_auth = True
enable_oidc_auth = True

# Overridable vector cube implementation
vector_cube_cls = DriverVectorCube

def __init__(
self,
secondary_services: Optional[SecondaryServices] = None,
Expand Down
54 changes: 52 additions & 2 deletions openeo_driver/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ def __init__(
def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX) -> "DriverVectorCube":
"""Create new vector cube with same geometries but new cube"""
log.info(f"Creating vector cube with new cube {cube.name!r}")
return DriverVectorCube(geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix)
return type(self)(
geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
)

@classmethod
def from_fiona(cls, paths: List[str], driver: str, options: dict) -> "DriverVectorCube":
Expand Down Expand Up @@ -225,7 +227,8 @@ def _as_geopandas_df(self) -> gpd.GeoDataFrame:

return df

def to_geojson(self):
def to_geojson(self) -> dict:
"""Export as GeoJSON FeatureCollection."""
return shapely.geometry.mapping(self._as_geopandas_df())

def to_wkt(self) -> List[str]:
Expand All @@ -242,6 +245,14 @@ def write_assets(
format_info = IOFORMATS.get(format)
# TODO: check if format can be used for vector data?
path = directory / f"vectorcube.{format_info.extension}"

if format_info.format == "JSON":
# TODO: eliminate this legacy format?
log.warning(
f"Exporting vector cube {self} to legacy, non-standard JSON format"
)
return self.to_legacy_save_result().write_assets(directory)

self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)

if not format_info.multi_file:
Expand Down Expand Up @@ -274,6 +285,36 @@ def write_assets(
def to_multipolygon(self) -> shapely.geometry.MultiPolygon:
return shapely.ops.unary_union(self._geometries.geometry)

def to_legacy_save_result(self) -> Union["AggregatePolygonResult", "JSONResult"]:
    """
    Export to legacy AggregatePolygonResult/JSONResult objects.

    Provided as temporary adaptation layer while migrating to real vector cubes.

    :return: ``AggregatePolygonResult`` when the cube has a temporal dimension
        (geometries x t, optionally x bands), ``JSONResult`` for a purely
        spatial (geometries x bands) cube.
    :raises ValueError: when the cube's dimension layout is not one of the
        supported configurations.
    """
    # TODO: eliminate these legacy, non-standard formats?
    # Local import to avoid import issues at module load time.
    from openeo_driver.save_result import AggregatePolygonResult, JSONResult

    cube = self._cube
    # TODO: more flexible temporal/band dimension detection?
    if cube.dims == (self.DIM_GEOMETRIES, "t"):
        # Add single band dimension
        cube = cube.expand_dims({"bands": ["band"]}, axis=-1)
    if cube.dims == (self.DIM_GEOMETRIES, "t", "bands"):
        cube = cube.transpose("t", self.DIM_GEOMETRIES, "bands")
        # Build {time: [[band values per geometry]]} mapping expected by AggregatePolygonResult.
        timeseries = {
            t.item(): t_slice.values.tolist()
            for t, t_slice in zip(cube.coords["t"], cube)
        }
        return AggregatePolygonResult(timeseries=timeseries, regions=self)
    elif cube.dims == (self.DIM_GEOMETRIES, "bands"):
        # This covers the legacy `AggregatePolygonSpatialResult` code path,
        # but as AggregatePolygonSpatialResult's constructor expects a folder of CSV file(s),
        # we keep it simple here with a basic JSONResult result.
        cube = cube.transpose(self.DIM_GEOMETRIES, "bands")
        return JSONResult(data=cube.values.tolist())
    # Was: stale reference to `_write_legacy_aggregate_polygon_result_json` in the message.
    raise ValueError(
        f"Unsupported cube configuration {cube.dims} for to_legacy_save_result"
    )

def get_bounding_box(self) -> Tuple[float, float, float, float]:
    """Get the overall bounds of the geometries as a 4-tuple of floats."""
    bounds = self._geometries.total_bounds
    return tuple(bounds)

Expand All @@ -293,6 +334,15 @@ def __eq__(self, other):
return (isinstance(other, DriverVectorCube)
and np.array_equal(self._as_geopandas_df().values, other._as_geopandas_df().values))

def fit_class_random_forest(
    self,
    target: "DriverVectorCube",
    num_trees: int = 100,
    max_variables: Optional[Union[int, str]] = None,
    seed: Optional[int] = None,
) -> "DriverMlModel":
    """
    Fit a random-forest classifier on this vector cube (predictors),
    using `target` as the labeled training data.

    Base-class stub: concrete backends must override this
    (e.g. `DummyVectorCube.fit_class_random_forest` in the dummy backend).

    :param target: vector cube with target labels.
    :param num_trees: number of trees in the forest.
    :param max_variables: number (or strategy name) of variables to consider
        per split — presumably mirrors the openEO process parameter; confirm
        against the `fit_class_random_forest` process spec.
    :param seed: random seed for reproducibility.
    :return: driver-side ml-model handle.
    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError


class DriverMlModel:
"""Base class for driver-side 'ml-model' data structures"""
Expand Down
6 changes: 4 additions & 2 deletions openeo_driver/dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ def get_source_constraints(self, merge=True) -> List[SourceConstraint]:
return source_constraints

def get_geometries(
self, operation="aggregate_spatial"
) -> List[Union[shapely.geometry.base.BaseGeometry, DelayedVector]]:
self, operation="aggregate_spatial"
) -> List[
Union[shapely.geometry.base.BaseGeometry, DelayedVector, DriverVectorCube]
]:
"""Get geometries (polygons or DelayedVector), as used by aggregate_spatial"""
geometries_by_id = {}
for leaf in self.get_trace_leaves():
Expand Down
34 changes: 32 additions & 2 deletions openeo_driver/dummy/dummy_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,16 @@ def assert_polygon_sequence(geometries: Union[Sequence, BaseMultipartGeometry])
dims += (self.metadata.band_dimension.name,)
coords[self.metadata.band_dimension.name] = self.metadata.band_names
shape = [len(coords[d]) for d in dims]
data = numpy.arange(numpy.prod(shape)).reshape(shape)
cube = xarray.DataArray(data=data, dims=dims, coords=coords, name="aggregate_spatial")
data = numpy.arange(numpy.prod(shape), dtype="float")
# Start with some more interesting values (e.g. to test NaN/null/None handling)
data[0] = 2.345
data[1] = float("nan")
cube = xarray.DataArray(
data=data.reshape(shape),
dims=dims,
coords=coords,
name="aggregate_spatial",
)
return geometries.with_cube(cube=cube, flatten_prefix="agg")
elif isinstance(geometries, str):
geometries = [geometry for geometry in DelayedVector(geometries).geometries]
Expand Down Expand Up @@ -276,6 +284,25 @@ def fit_class_random_forest(
)


class DummyVectorCube(DriverVectorCube):
    """Vector cube implementation for the dummy (test) backend."""

    def fit_class_random_forest(
        self,
        target: DriverVectorCube,
        num_trees: int = 100,
        max_variables: Optional[Union[int, str]] = None,
        seed: Optional[int] = None,
    ) -> "DriverMlModel":
        # TODO: handle `to_geojson` in `DummyMlModel.write_assets` instead of here?
        model_kwargs = {
            "data": self.to_geojson(),
            "target": target,
            "num_trees": num_trees,
            "max_variables": max_variables,
            "seed": seed,
        }
        return DummyMlModel(process_id="fit_class_random_forest", **model_kwargs)


class DummyMlModel(DriverMlModel):

def __init__(self, **kwargs):
Expand Down Expand Up @@ -600,6 +627,9 @@ def delete(self, user_id: str, process_id: str) -> None:


class DummyBackendImplementation(OpenEoBackendImplementation):

vector_cube_cls = DummyVectorCube

def __init__(self, processing: Optional[Processing] = None):
super(DummyBackendImplementation, self).__init__(
secondary_services=DummySecondaryServices(),
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def udp_registry(backend_implementation) -> UserDefinedProcesses:
def flask_app(backend_implementation) -> flask.Flask:
app = build_app(
backend_implementation=backend_implementation,
# error_handling=False
# error_handling=False,
)
app.config.from_mapping(TEST_APP_CONFIG)
return app
Expand Down
2 changes: 1 addition & 1 deletion tests/data/pg/1.0/no_nested_json_result.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
"result": true,
"process_id": "save_result",
"arguments": {
"format": "GTIFF",
"format": "GeoJSON",
"data": {
"from_node": "aggregatespatial1"
},
Expand Down
4 changes: 4 additions & 0 deletions tests/data/pg/1.0/run_udf_on_timeseries.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
"temporal_extent": [
"2017-11-21",
"2017-11-21"
],
"bands": [
"B02",
"B03"
]
}
},
Expand Down
Loading

0 comments on commit aee2c00

Please sign in to comment.