EP-3981 vector cube support in aggregate_spatial

soxofaan committed Apr 20, 2022
1 parent e6781fb commit aa5ec2c
Showing 7 changed files with 178 additions and 47 deletions.
7 changes: 5 additions & 2 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -427,7 +427,7 @@ def _extract_load_parameters(env: EvalEnv, source_id: tuple) -> LoadParameters:
     params.bands = constraints.get("bands", None)
     params.properties = constraints.get("properties", {})
     params.aggregate_spatial_geometries = constraints.get("aggregate_spatial", {}).get("geometries")
-    if params.aggregate_spatial_geometries == None:
+    if params.aggregate_spatial_geometries is None:
         params.aggregate_spatial_geometries = constraints.get("filter_spatial", {}).get("geometries")
     params.sar_backscatter = constraints.get("sar_backscatter", None)
     params.process_types = process_types
@@ -949,7 +949,10 @@ def aggregate_spatial(args: dict, env: EvalEnv) -> DriverDataCube:
     target_dimension = args.get('target_dimension', None)
 
     geoms = extract_arg(args, 'geometries')
-    if isinstance(geoms, dict):
+    # TODO: convert all cases to DriverVectorCube first and just work with that
+    if isinstance(geoms, DriverVectorCube):
+        geoms = geoms
+    elif isinstance(geoms, dict):
         geoms = geojson_to_geometry(geoms)
     elif isinstance(geoms, DelayedVector):
         geoms = geoms.path
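Aside — a minimal standalone sketch (not part of the commit) of the type-dispatch normalization used above: a DriverVectorCube already is the target representation and passes through unchanged, while GeoJSON-style dicts and delayed (file-backed) vectors get converted. The stub classes below stand in for the real openeo_driver types.

class DriverVectorCube:  # stub for openeo_driver.datacube.DriverVectorCube
    pass

class DelayedVector:  # stub for openeo_driver.delayed_vector.DelayedVector
    def __init__(self, path: str):
        self.path = path

def geojson_to_geometry(geojson: dict):  # stub GeoJSON-to-geometry converter
    return geojson

def normalize_geometries(geoms):
    # Vector cubes are already in the target representation: pass through.
    if isinstance(geoms, DriverVectorCube):
        return geoms
    # GeoJSON-style dict: convert to a geometry object.
    elif isinstance(geoms, dict):
        return geojson_to_geometry(geoms)
    # Delayed (file-backed) vector: hand over the path.
    elif isinstance(geoms, DelayedVector):
        return geoms.path
    raise ValueError(f"Unsupported geometries type {type(geoms)}")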
1 change: 1 addition & 0 deletions openeo_driver/backend.py
@@ -148,6 +148,7 @@ class LoadParameters(dict):
     global_extent = dict_item(default={})
     bands = dict_item(default=None)
     properties = dict_item(default={})
+    # TODO: rename this to filter_spatial_geometries (because it is used for load_collection-time filtering)?
     aggregate_spatial_geometries = dict_item(default=None)
     sar_backscatter: Union[SarBackscatterArgs, None] = dict_item(default=None)
     process_types = dict_item(default=set())
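Aside — a hypothetical usage sketch (not in the commit): LoadParameters is a dict subclass whose fields are declared with dict_item, which (as the class definition suggests) exposes each field both as an attribute and as a plain dict key. The band names and GeoJSON polygon below are invented for illustration.

from openeo_driver.backend import LoadParameters

params = LoadParameters()
params.bands = ["B02", "B03"]
params.aggregate_spatial_geometries = {
    "type": "Polygon",
    "coordinates": [[[3.0, 51.0], [3.1, 51.0], [3.1, 51.1], [3.0, 51.0]]],
}

# Assuming dict_item stores under the attribute name, the same values
# are reachable through the dict interface:
assert params["bands"] == ["B02", "B03"]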
76 changes: 66 additions & 10 deletions openeo_driver/datacube.py
@@ -1,21 +1,26 @@
 import inspect
+import logging
 import zipfile
 from pathlib import Path
-from typing import List, Union, Optional, Dict, Any
+from typing import List, Union, Optional, Dict, Any, Tuple, Sequence
 
 import geopandas as gpd
+import pandas as pd
 import shapely.geometry
 import shapely.geometry.base
 import shapely.ops
+import xarray
 
 from openeo import ImageCollection
 from openeo.metadata import CollectionMetadata
 from openeo.util import ensure_dir
 from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
-from openeo_driver.errors import FeatureUnsupportedException
+from openeo_driver.errors import FeatureUnsupportedException, InternalException
 from openeo_driver.util.ioformats import IOFORMATS
 from openeo_driver.utils import EvalEnv
 
+log = logging.getLogger(__name__)
+
 
 class DriverDataCube(ImageCollection):
     """Base class for "driver" side raster data cubes."""
@@ -103,10 +108,10 @@ def aggregate_temporal(self, intervals: list, reducer, labels: list = None,

     def aggregate_spatial(
             self,
-            geometries: Union[shapely.geometry.base.BaseGeometry, str],
+            geometries: Union[shapely.geometry.base.BaseGeometry, str, "DriverVectorCube"],
             reducer: dict,
             target_dimension: str = "result",
-    ) -> Union["AggregatePolygonResult", "AggregatePolygonSpatialResult"]:
+    ) -> Union["AggregatePolygonResult", "AggregatePolygonSpatialResult", "DriverVectorCube"]:
         self._not_implemented()
 
     def zonal_statistics(self, regions, func:str) -> 'DriverDataCube':
@@ -132,16 +137,40 @@ def resolution_merge(self, args: ResolutionMergeArgs) -> 'DriverDataCube':
         self._not_implemented()
 
 
+class VectorCubeError(InternalException):
+    code = "VectorCubeError"
+
+    def __init__(self, message="Unspecified VectorCube error"):
+        super(VectorCubeError, self).__init__(message=message)
+
+
 class DriverVectorCube:
     """
     Base class for driver-side 'vector cubes'
     Conceptually comparable to GeoJSON FeatureCollections, but possibly more advanced with more dimensions, bands, ...
+    Internal design has two components:
+    - a GeoPandas dataframe for holding the GeoJSON-style properties (possibly heterogeneously typed or sparse/free-style)
+    - optional xarray DataArray for holding the data cube data (homogeneously typed and rigorously indexed/gridded)
+    These components are "joined" on the GeoPandas dataframe's index and DataArray first dimension
     """
+    DIM_GEOMETRIES = "geometries"
 
-    def __init__(self, data: gpd.GeoDataFrame):
+    def __init__(self, geometries: gpd.GeoDataFrame, cube: Optional[xarray.DataArray] = None):
         # TODO EP-3981: consider other data containers (xarray) and lazy loading?
-        self.data = data
+        if cube is not None:
+            if cube.dims[0] != self.DIM_GEOMETRIES:
+                log.error(f"First cube dim should be {self.DIM_GEOMETRIES!r} but got dims {cube.dims!r}")
+                raise VectorCubeError("Cube's first dimension is invalid.")
+            if not geometries.index.equals(cube.indexes[cube.dims[0]]):
+                log.error(f"Invalid VectorCube components {geometries.index!r} != {cube.indexes[cube.dims[0]]!r}")
+                raise VectorCubeError("Incompatible vector cube components")
+        self._geometries = geometries
+        self._cube = cube
+
+    def with_cube(self, cube: xarray.DataArray) -> "DriverVectorCube":
+        """Create new vector cube with same geometries but new cube"""
+        log.info(f"Creating vector cube with new cube {cube.name!r}")
+        return DriverVectorCube(geometries=self._geometries, cube=cube)
 
     @classmethod
     def from_fiona(cls, paths: List[str], driver: str, options: dict):
@@ -150,14 +179,32 @@ def from_fiona(cls, paths: List[str], driver: str, options: dict):
             # TODO EP-3981: support multiple paths
             raise FeatureUnsupportedException(message="Loading a vector cube from multiple files is not supported")
         # TODO EP-3981: lazy loading like/with DelayedVector
-        return cls(data=gpd.read_file(paths[0], driver=driver))
+        return cls(geometries=gpd.read_file(paths[0], driver=driver))
+
+    def _as_geopandas_df(self) -> gpd.GeoDataFrame:
+        """Join geometries and cube as a geopandas dataframe"""
+        # TODO: avoid copy?
+        df = self._geometries.copy(deep=True)
+        if self._cube is not None:
+            # TODO: better way to combine cube with geometries
+            # Flatten multiple (non-geometry) dimensions from cube to new properties in geopandas dataframe
+            stacked = self._cube.stack(prop=self._cube.dims[1:])
+            log.info(f"Flattened cube component of vector cube to {stacked.shape[1]} properties")
+            for p in stacked.indexes["prop"]:
+                name = "~".join(str(x) for x in p)
+                # TODO: avoid column collisions?
+                df[name] = stacked.sel(prop=p)
+        return df
+
+    def to_geojson(self):
+        return shapely.geometry.mapping(self._as_geopandas_df())
 
     def write_assets(self, directory: Union[str, Path], format: str, options: Optional[dict] = None) -> Dict[str, StacAsset]:
         directory = ensure_dir(directory)
         format_info = IOFORMATS.get(format)
         # TODO: check if format can be used for vector data?
         path = directory / f"vectorcube.{format_info.extension}"
-        self.data.to_file(path, driver=format_info.fiona_driver)
+        self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)
 
         if not format_info.multi_file:
             # single file format
@@ -187,7 +234,16 @@ def write_assets(self, directory: Union[str, Path], format: str, options: Option
         return {p.name: {"href": p} for p in components}
 
     def to_multipolygon(self) -> shapely.geometry.MultiPolygon:
-        return shapely.ops.unary_union(self.data.geometry)
+        return shapely.ops.unary_union(self._geometries.geometry)
+
+    def get_bounding_box(self) -> Tuple[float, float, float, float]:
+        return self._geometries.total_bounds
+
+    def get_geometries(self) -> Sequence[shapely.geometry.base.BaseGeometry]:
+        return self._geometries.geometry
+
+    def get_geometries_index(self) -> pd.Index:
+        return self._geometries.index
 
 
 class DriverMlModel:
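Aside — an illustrative sketch (not part of the commit) of how the two DriverVectorCube components fit together: the cube's first dimension must be named "geometries" and must be indexed identically to the GeoDataFrame, otherwise the new __init__ validation raises VectorCubeError. Dimension names, band labels, and property values below are invented.

import geopandas as gpd
import numpy
import xarray
from shapely.geometry import Polygon

from openeo_driver.datacube import DriverVectorCube

# Two features with a free-style "id" property.
geometries = gpd.GeoDataFrame({
    "id": ["field_a", "field_b"],
    "geometry": [
        Polygon([(0, 0), (1, 0), (1, 1)]),
        Polygon([(2, 2), (3, 2), (3, 3)]),
    ],
})

# Cube component: first dim "geometries" aligned with the dataframe index,
# plus a second (here: "bands") dimension.
cube = xarray.DataArray(
    data=numpy.zeros((2, 3)),
    dims=[DriverVectorCube.DIM_GEOMETRIES, "bands"],
    coords={
        DriverVectorCube.DIM_GEOMETRIES: geometries.index,
        "bands": ["B02", "B03", "B04"],
    },
)

vc = DriverVectorCube(geometries=geometries, cube=cube)
# to_geojson() flattens the cube's non-geometry dimensions into per-feature
# properties (via _as_geopandas_df) before serializing.
feature_collection = vc.to_geojson()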
13 changes: 8 additions & 5 deletions openeo_driver/dry_run.py
@@ -42,7 +42,7 @@
 
 from openeo.metadata import CollectionMetadata
 from openeo_driver import filter_properties
-from openeo_driver.datacube import DriverDataCube
+from openeo_driver.datacube import DriverDataCube, DriverVectorCube
 from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs
 from openeo_driver.delayed_vector import DelayedVector
 from openeo_driver.save_result import AggregatePolygonResult, AggregatePolygonSpatialResult
@@ -439,7 +439,7 @@ def mask_polygon(self, mask, replacement=None, inside: bool = False) -> 'DriverD

     def aggregate_spatial(
             self,
-            geometries: Union[BaseGeometry, str],
+            geometries: Union[BaseGeometry, str, DriverVectorCube],
             reducer: dict,
             target_dimension: str = "result",
     ) -> Union[AggregatePolygonResult, AggregatePolygonSpatialResult]:
@@ -451,12 +451,15 @@
             geometries = GeometryCollection([geometries])
         return AggregatePolygonResult(timeseries={}, regions=geometries)
 
-    def _normalize_geometry(self, geometries):
+    def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, DelayedVector, BaseGeometry], dict]:
         """
-        Helper to preprocess geometries (as used in aggregate_spatial and mask_polygon) and apply related filter_bbox
+        Helper to preprocess geometries (as used in aggregate_spatial and mask_polygon)
+        and extract bbox (e.g. for filter_bbox)
         """
         # TODO EP-3981 normalize to vector cube instead of GeometryCollection
-        if isinstance(geometries, dict):
+        if isinstance(geometries, DriverVectorCube):
+            bbox = geometries.get_bounding_box()
+        elif isinstance(geometries, dict):
             return self._normalize_geometry(geojson_to_geometry(geometries))
         elif isinstance(geometries, str):
             return self._normalize_geometry(DelayedVector(geometries))
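Aside — a hedged sketch (not the commit's code) of the bbox extraction that _normalize_geometry now performs for vector cubes: get_bounding_box() returns GeoPandas' total_bounds, which maps naturally onto the west/south/east/north spatial-extent dict used elsewhere in openEO. The polygon and CRS below are invented.

import geopandas as gpd
from shapely.geometry import Polygon

gdf = gpd.GeoDataFrame({"geometry": [Polygon([(3.0, 50.0), (4.0, 50.0), (4.0, 51.0)])]})

# total_bounds is (minx, miny, maxx, maxy) over all geometries.
west, south, east, north = gdf.total_bounds
bbox = {"west": west, "south": south, "east": east, "north": north, "crs": "EPSG:4326"}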
63 changes: 40 additions & 23 deletions openeo_driver/dummy/dummy_backend.py
@@ -1,24 +1,25 @@
-
-import numbers
 import json
+import numbers
 import uuid
 from datetime import datetime
 from pathlib import Path
-from typing import List, Dict, Union, Tuple, Optional, Iterable, Any
+from typing import List, Dict, Union, Tuple, Optional, Iterable, Any, Sequence
 from unittest.mock import Mock
 
 import flask
+import numpy
+import xarray
 from shapely.geometry import Polygon, MultiPolygon
+from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry
 from shapely.geometry.collection import GeometryCollection
-from shapely.geometry.base import BaseGeometry
 
 from openeo.internal.process_graph_visitor import ProcessGraphVisitor
 from openeo.metadata import CollectionMetadata, Band
 from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing
 from openeo_driver.backend import (SecondaryServices, OpenEoBackendImplementation, CollectionCatalog, ServiceMetadata,
                                    BatchJobs, BatchJobMetadata, OidcProvider, UserDefinedProcesses,
                                    UserDefinedProcessMetadata, LoadParameters, Processing)
-from openeo_driver.datacube import DriverDataCube, DriverMlModel
+from openeo_driver.datacube import DriverDataCube, DriverMlModel, DriverVectorCube
 from openeo_driver.datastructs import StacAsset
 from openeo_driver.delayed_vector import DelayedVector
 from openeo_driver.dry_run import SourceConstraint
@@ -203,38 +204,53 @@ def zonal_statistics(self, regions, func, scale=1000, interval="day")\

     def aggregate_spatial(
             self,
-            geometries: Union[BaseGeometry, str],
+            geometries: Union[BaseGeometry, str, DriverVectorCube],
             reducer: dict,
             target_dimension: str = "result",
-    ) -> Union[AggregatePolygonResult, AggregatePolygonSpatialResult]:
+    ) -> Union[AggregatePolygonResult, AggregatePolygonSpatialResult, DriverVectorCube]:
 
         # TODO: support more advanced reducers too
         assert isinstance(reducer, dict) and len(reducer) == 1
         reducer = next(iter(reducer.values()))["process_id"]
         assert reducer == 'mean' or reducer == 'avg'
 
-        # TODO EP-3981 normalize to vector cube and preserve original properties
-        def assert_polygon_or_multipolygon(geometry):
-            assert isinstance(geometry, Polygon) or isinstance(geometry, MultiPolygon)
+        def assert_polygon_sequence(geometries: Union[Sequence, BaseMultipartGeometry]):
+            assert len(geometries) > 0
+            for g in geometries:
+                assert isinstance(g, Polygon) or isinstance(g, MultiPolygon)
 
-        if isinstance(geometries, str):
+        # TODO EP-3981 normalize to vector cube and preserve original properties
+        if isinstance(geometries, DriverVectorCube):
+            # Build dummy aggregation data cube
+            dims = (DriverVectorCube.DIM_GEOMETRIES,)
+            # TODO: use something else than the geopandas dataframe's index?
+            coords = {DriverVectorCube.DIM_GEOMETRIES: geometries.get_geometries_index().to_list()}
+            if self.metadata.has_temporal_dimension():
+                dims += (self.metadata.temporal_dimension.name,)
+                coords[self.metadata.temporal_dimension.name] = ["2015-07-06T00:00:00", "2015-08-22T00:00:00"]
+            if self.metadata.has_band_dimension():
+                dims += (self.metadata.band_dimension.name,)
+                coords[self.metadata.band_dimension.name] = self.metadata.band_names
+            shape = [len(coords[d]) for d in dims]
+            data = numpy.arange(numpy.prod(shape)).reshape(shape)
+            cube = xarray.DataArray(data=data, dims=dims, coords=coords, name="aggregate_spatial")
+            return geometries.with_cube(cube=cube)
+        elif isinstance(geometries, str):
             geometries = [geometry for geometry in DelayedVector(geometries).geometries]
-            assert len(geometries) > 0
-            for geometry in geometries:
-                assert_polygon_or_multipolygon(geometry)
+            assert_polygon_sequence(geometries)
         elif isinstance(geometries, GeometryCollection):
-            assert len(geometries) > 0
-            for geometry in geometries:
-                assert_polygon_or_multipolygon(geometry)
+            # TODO EP-3981: GeometryCollection is deprecated
+            assert_polygon_sequence(geometries)
+        elif isinstance(geometries, BaseGeometry):
+            assert_polygon_sequence([geometries])
         else:
-            assert_polygon_or_multipolygon(geometries)
-            geometries = [geometries]
+            assert_polygon_sequence(geometries)
 
         if self.metadata.has_temporal_dimension():
            return AggregatePolygonResult(timeseries={
                 "2015-07-06T00:00:00": [2.345],
                 "2015-08-22T00:00:00": [float('nan')]
-            }, regions=GeometryCollection())
+            }, regions=geometries)
         else:
             return DummyAggregatePolygonSpatialResult(cube=self, geometries=geometries)

@@ -397,9 +413,10 @@ def load_collection(self, collection_id: str, load_params: LoadParameters, env:
         if collection_id in _collections:
             return _collections[collection_id]
 
-        image_collection = DummyDataCube(
-            metadata=CollectionMetadata(metadata=self.get_collection_metadata(collection_id))
-        )
+        metadata = CollectionMetadata(metadata=self.get_collection_metadata(collection_id))
+        if load_params.bands:
+            metadata = metadata.filter_bands(load_params.bands)
+        image_collection = DummyDataCube(metadata=metadata)
 
         _collections[collection_id] = image_collection
         return image_collection
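Aside — a standalone sketch of the dummy cube construction above: the dummy backend returns one synthetic value per (geometry, time, band) combination, filled with a numpy.arange ramp. The dimension names and coordinate values below are invented for illustration.

import numpy
import xarray

dims = ("geometries", "t", "bands")
coords = {
    "geometries": [0, 1],
    "t": ["2015-07-06T00:00:00", "2015-08-22T00:00:00"],
    "bands": ["B02", "B03", "B04"],
}

# Shape follows the coordinate sizes: here (2 geometries, 2 dates, 3 bands).
shape = [len(coords[d]) for d in dims]
data = numpy.arange(numpy.prod(shape)).reshape(shape)
cube = xarray.DataArray(data=data, dims=dims, coords=coords, name="aggregate_spatial")
assert cube.shape == (2, 2, 3)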
8 changes: 4 additions & 4 deletions tests/data/pg/1.0/mask_with_vector_file.json
@@ -4,10 +4,10 @@
"arguments": {
"id": "S2_FOOBAR",
"bands": [
"1",
"2",
"3",
"4"
"B02",
"B03",
"B04",
"B08"
]
},
"result": false
Expand Down